并发编程之LinkedTransferQueue

向量数据库大模型云通信

简介

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。

LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue(公平模式下转交元素)、LinkedBlockingQueue(阻塞Queue的基本方法)的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。

LinkedTransferQueue只有两个构造方法,这里不再详细介绍:

public LinkedTransferQueue() { }  
   
public LinkedTransferQueue(Collection<? extends E> c) {  
    this();  
    addAll(c);  
}  

LinkedTransferQueue源码详解

LinkedTransferQueue类定义如下:

public class LinkedTransferQueue extends AbstractQueue implements TransferQueue, java.io.Serializable

LinkedTransferQueue类继承自AbstractQueue抽象类,并且实现了TransferQueue接口:

public interface TransferQueue<E> extends BlockingQueue<E> {  
    // 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。  
    boolean tryTransfer(E e);  
    // 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则等待直到元素被消费者接收。  
    void transfer(E e) throws InterruptedException;  
    // 在上述方法的基础上设置超时时间  
    boolean tryTransfer(E e, long timeout, TimeUnit unit)  
        throws InterruptedException;  
    // 如果至少有一位消费者在等待,则返回true  
    boolean hasWaitingConsumer();  
    // 获取所有等待获取元素的消费线程数量  
    int getWaitingConsumerCount();  
}  

TransferQueue接口继承了BlockingQueue接口,并进行了扩充,自己又定义了一些LinkedTransferQueue类需要用到的方法。

TransferQueue队列中的节点都是Node类型:

static final class Node {  
    // 如果是消费者请求的节点,则isData为false,否则该节点为生产(数据)节点为true  
    final boolean isData;   // false if this is a request node  
    // 数据节点的值,若是消费者节点,则item为null  
    volatile Object item;   // initially non-null if isData; CASed to match  
    // 指向下一个节点  
    volatile Node next;  
    // 等待线程  
    volatile Thread waiter; // null until waiting  
   
    // CAS设置next  
    final boolean casNext(Node cmp, Node val) {  
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);  
    }  
   
    // CAS设置item  
    final boolean casItem(Object cmp, Object val) {  
        // assert cmp == null || cmp.getClass() != Node.class;  
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);  
    }  
   
    // 构造方法  
    Node(Object item, boolean isData) {  
        UNSAFE.putObject(this, itemOffset, item); // relaxed write  
        this.isData = isData;  
    }  
   
    // 将next指向自己  
    final void forgetNext() {  
        UNSAFE.putObject(this, nextOffset, this);  
    }  
   
    // 匹配或者节点被取消的时候会调用,设置item自连接,waiter为null  
    final void forgetContents() {  
        UNSAFE.putObject(this, itemOffset, this);  
        UNSAFE.putObject(this, waiterOffset, null);  
    }  
   
    // 节点是否被匹配过了  
    final boolean isMatched() {  
        Object x = item;  
        return (x == this) || ((x == null) == isData);  
    }  
   
    // 是否是一个未匹配的请求节点  
    // 如果是的话,则isData为false,且item为null,因为如果被匹配过了,item就不再为null,而是指向自己  
    final boolean isUnmatchedRequest() {  
        return !isData && item == null;  
    }  
   
    // 如果给定节点不能连接在当前节点后则返回true  
    final boolean cannotPrecede(boolean haveData) {  
        boolean d = isData;  
        Object x;  
        return d != haveData && (x = item) != this && (x != null) == d;  
    }  
   
    // 匹配一个数据节点  
    final boolean tryMatchData() {  
        // assert isData;  
        Object x = item;  
        if (x != null && x != this && casItem(x, null)) {  
            LockSupport.unpark(waiter);  
            return true;  
        }  
        return false;  
    }  
   
    private static final long serialVersionUID = -3375979862319811754L;  
   
    // Unsafe mechanics  
    private static final sun.misc.Unsafe UNSAFE;  
    private static final long itemOffset;  
    private static final long nextOffset;  
    private static final long waiterOffset;  
    static {  
        try {  
            UNSAFE = sun.misc.Unsafe.getUnsafe();  
            Class<?> k = Node.class;  
            itemOffset = UNSAFE.objectFieldOffset  
                (k.getDeclaredField("item"));  
            nextOffset = UNSAFE.objectFieldOffset  
                (k.getDeclaredField("next"));  
            waiterOffset = UNSAFE.objectFieldOffset  
                (k.getDeclaredField("waiter"));  
        } catch (Exception e) {  
            throw new Error(e);  
        }  
    }  
}  

匹配前后节点item的变化,其中node1为数据节点,node2为消费者请求的占位节点:

Node node1(isData-item) node2(isData-item)
匹配前 true-item false-null
匹配后 true-null false-this

数据节点,则匹配前item不为null且不为自身,匹配后设置为null。  
占位请求节点,匹配前item为null,匹配后自连接。  

LinkedTransferQueue类中的重要字段如下:

// 是否为多核  
private static final boolean MP =  
Runtime.getRuntime().availableProcessors() > 1;  
   
// 作为第一个等待节点在阻塞之前的自旋次数  
private static final int FRONT\_SPINS   = 1 << 7;  
   
// 前驱节点正在处理,当前节点在阻塞之前的自旋次数  
private static final int CHAINED\_SPINS = FRONT\_SPINS >>> 1;  
   
// sweepVotes的阈值  
static final int SWEEP\_THRESHOLD = 32;  
   
// 队列首节点  
transient volatile Node head;  
   
// 队列尾节点  
private transient volatile Node tail;  
   
// 断开被删除节点失败的次数  
private transient volatile int sweepVotes;  
   
// xfer方法的how参数的可能取值  
// 用于无等待的poll、tryTransfer  
private static final int NOW   = 0; // for untimed poll, tryTransfer  
// 用于offer、put、add  
private static final int ASYNC = 1; // for offer, put, add  
// 用于无超时的阻塞transfer、take  
private static final int SYNC  = 2; // for transfer, take  
// 用于超时等待的poll、tryTransfer  
private static final int TIMED = 3; // for timed poll, tryTransfer  

我们看一看LinkedTransferQueue类的入队、出队方法:

// 入队方法  
public void put(E e) {  
    xfer(e, true, ASYNC, 0);  
}  
   
public boolean offer(E e, long timeout, TimeUnit unit) {  
    xfer(e, true, ASYNC, 0);  
    return true;  
}  
   
public boolean offer(E e) {  
    xfer(e, true, ASYNC, 0);  
    return true;  
}  
   
public boolean add(E e) {  
    xfer(e, true, ASYNC, 0);  
    return true;  
}  
   
   
// 出队方法  
public E take() throws InterruptedException {  
    E e = xfer(null, false, SYNC, 0);  
    if (e != null)  
        return e;  
    Thread.interrupted();  
    throw new InterruptedException();  
}  
   
public E poll() {  
    return xfer(null, false, NOW, 0);  
}  
   
public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));  
    if (e != null || !Thread.interrupted())  
        return e;  
    throw new InterruptedException();  
}  

我们可以看到,这些出队、入队方法都会调用xfer方法,因为LinkedTransferQueue是无界的,入队操作都会成功,所以入队操作都是ASYNC的,而出队方法,则是根据不同的要求传入不同的值,比如需要阻塞的出队方法就传入SYNC,需要加入超时控制的就传入TIMED。

除了上述方法会调用xfer方法之外,TransferQueue接口定义的方法也会调用xfer方法:

public boolean tryTransfer(E e) {  
    return xfer(e, true, NOW, 0) == null;  
}  
   
public void transfer(E e) throws InterruptedException {  
    if (xfer(e, true, SYNC, 0) != null) {  
        Thread.interrupted(); // failure possible only due to interrupt  
        throw new InterruptedException();  
    }  
}  
   
public boolean tryTransfer(E e, long timeout, TimeUnit unit)  
    throws InterruptedException {  
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)  
        return true;  
    if (!Thread.interrupted())  
        return false;  
    throw new InterruptedException();  
}  

可以看到,xfer方法是实现LinkedTransferQueue的关键方法,下面我们来仔细分析一下该方法:
xfer方法

private E xfer(E e, boolean haveData, int how, long nanos) {  
    // 如果haveData但是e为null,则抛出NullPointerException异常  
    if (haveData && (e == null))  
        throw new NullPointerException();  
    // s是将要被添加的节点,如果需要  
    Node s = null;                        // the node to append, if needed  
   
    retry:  
    for (;;) {                            // restart on append race  
        // 从首节点开始匹配  
        for (Node h = head, p = h; p != null;) { // find & match first node  
            boolean isData = p.isData;  
            Object item = p.item;  
            // 判断节点是否被匹配过  
            // item != null有2种情况:一是put操作,二是take的item被修改了(匹配成功)  
            // (itme != null) == isData 要么表示p是一个put操作,要么表示p是一个还没匹配成功的take操作  
            if (item != p && (item != null) == isData) { // unmatched  
                // 节点与此次操作模式一致,无法匹配  
                if (isData == haveData)   // can't match  
                    break;  
                // 匹配成功  
                if (p.casItem(item, e)) { // match  
                    for (Node q = p; q != h;) {  
                        Node n = q.next;  // update by 2 unless singleton  
                        // 更新head为匹配节点的next节点  
                        if (head == h && casHead(h, n == null ? q : n)) {  
                            // 将旧节点自连接  
                            h.forgetNext();  
                            break;  
                        }                 // advance and retry  
                        if ((h = head)   == null ||  
                            (q = h.next) == null || !q.isMatched())  
                            break;        // unless slack < 2  
                    }  
                    // 匹配成功,则唤醒阻塞的线程  
                    LockSupport.unpark(p.waiter);  
                    // 类型转换,返回匹配节点的元素  
                    return LinkedTransferQueue.<E>cast(item);  
                }  
            }  
            // 若节点已经被匹配过了,则向后寻找下一个未被匹配的节点  
            Node n = p.next;  
            // 如果当前节点已经离队,则从head开始寻找  
            p = (p != n) ? n : (h = head); // Use head if p offlist  
        }  
   
        // 若整个队列都遍历之后,还没有找到匹配的节点,则进行后续处理  
        // 把当前节点加入到队列尾  
        if (how != NOW) {                 // No matches available  
            if (s == null)  
                s = new Node(e, haveData);  
            // 将新节点s添加到队列尾并返回s的前驱节点  
            Node pred = tryAppend(s, haveData);  
            // 前驱节点为null,说明有其他线程竞争,并修改了队列,则从retry重新开始  
            if (pred == null)  
                continue retry;           // lost race vs opposite mode  
            // 不为ASYNC方法,则同步阻塞等待  
            if (how != ASYNC)  
                return awaitMatch(s, pred, e, (how == TIMED), nanos);  
        }  
        // how == NOW,则立即返回  
        return e; // not waiting  
    }  
}  

xfer方法的整个操作流程如下所示:

1、寻找和操作匹配的节点

从head开始向后遍历寻找未被匹配的节点,找到一个未被匹配并且和本次操作的模式不同的节点,匹配节点成功就通过CAS 操作将匹配节点的item字段设置为e,若修改失败,则继续向后寻找节点。

通过CAS操作更新head节点为匹配节点的next节点,旧head节点进行自连接,唤醒匹配节点的等待线程waiter,返回匹配的 item。如果CAS失败,并且松弛度大于等于2,就需要重新获取head重试。

2、如果在上述操作中没有找到匹配节点,则根据参数how做不同的处理:

NOW:立即返回,也不会插入节点  
SYNC:插入一个item为e(isData = haveData)到队列的尾部,然后自旋或阻塞当前线程直到节点被匹配或者取消。  
ASYNC:插入一个item为e(isData = haveData)到队列的尾部,不阻塞直接返回。  
TIMED:插入一个item为e(isData = haveData)到队列的尾部,然后自旋或阻塞当前线程直到节点被匹配或者取消或者超时。  

上面提到了一个松弛度的概念,它是什么作用呢?

在节点被匹配(被删除)之后,不会立即更新head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个“松弛阀值”之后才会更新(在LinkedTransferQueue中,这个值为 2)。这个“松弛阀值”一般为1-3,如果太大会降低缓存命中率,并且会增加遍历链的长度;太小会增加 CAS 的开销。

入队操作则是调用了tryAppend方法:

private Node tryAppend(Node s, boolean haveData) {  
    // 从尾节点开始  
    for (Node t = tail, p = t;;) {        // move p to last node and append  
        Node n, u;                        // temps for reads of next & tail  
        // 队列为空,则将s设置为head并返回s  
        if (p == null && (p = head) == null) {  
            if (casHead(null, s))  
                return s;                 // initialize  
        }  
        else if (p.cannotPrecede(haveData))  
            return null;                  // lost race vs opposite mode  
        // 不是最后一个节点  
        else if ((n = p.next) != null)    // not last; keep traversing  
            p = p != t && t != (u = tail) ? (t = u) : // stale tail  
                (p != n) ? n : null;      // restart if off list  
        // CAS失败  
        else if (!p.casNext(null, s))  
            p = p.next;                   // re-read on CAS failure  
        else {  
            // 更新tail  
            if (p != t) {                 // update if slack now >= 2  
                while ((tail != t || !casTail(t, s)) &&  
                        (t = tail)   != null &&  
                        (s = t.next) != null && // advance and retry  
                        (s = s.next) != null && s != t);  
            }  
            return p;  
        }  
    }  
}  

该方法主要逻辑为:添加节点s到队列尾并返回s的前继节点,失败时(与其他不同模式线程竞争失败)返回null,没有前继节点则返回自身。

加入队列后,如果how还不是ASYNC则调用awaitMatch()方法阻塞等待:

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {  
    // 计算超时时间点  
    final long deadline = timed ? System.nanoTime() + nanos : 0L;  
    // 获取当前线程对象  
    Thread w = Thread.currentThread();  
    // 自旋次数  
    int spins = -1; // initialized after first item and cancel checks  
    // 随机数  
    ThreadLocalRandom randomYields = null; // bound if needed  
   
    for (;;) {  
        Object item = s.item;  
        // 若有其它线程匹配了该节点  
        if (item != e) {                  // matched  
            // assert item != s;  
            // 撤销该节点,并返回匹配值  
            s.forgetContents();           // avoid garbage  
            return LinkedTransferQueue.<E>cast(item);  
        }  
        // 线程中断或者超时,则将s的节点item设置为s  
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&  
                s.casItem(e, s)) {        // cancel  
            // 断开节点  
            unsplice(pred, s);  
            return e;  
        }  
   
        // 自旋  
        if (spins < 0) {                  // establish spins at/near front  
            // 计算自旋次数  
            if ((spins = spinsFor(pred, s.isData)) > 0)  
                randomYields = ThreadLocalRandom.current();  
        }  
        else if (spins > 0) {             // spin  
            --spins;  
            // 生成随机数来让出CPU时间  
            if (randomYields.nextInt(CHAINED\_SPINS) == 0)  
                Thread.yield();           // occasionally yield  
        }  
        // 将s的waiter设置为当前线程  
        else if (s.waiter == null) {  
            s.waiter = w;                 // request unpark then recheck  
        }  
        // 超时阻塞  
        else if (timed) {  
            nanos = deadline - System.nanoTime();  
            if (nanos > 0L)  
                LockSupport.parkNanos(this, nanos);  
        }  
        // 非超时阻塞  
        else {  
            LockSupport.park(this);  
        }  
    }  
}  

当前操作为同步操作时,会调用awaitMatch方法阻塞等待匹配,成功返回匹配节点 item,失败返回给定参数e(s.item)。在等待期间如果线程被中断或等待超时,则取消匹配,并调用unsplice方法解除节点s和其前继节点的链接。

final void unsplice(Node pred, Node s) {  
    // 设置item自连接,waiter为null  
    s.forgetContents(); // forget unneeded fields  
      
    if (pred != null && pred != s && pred.next == s) {  
        // 获取s的后继节点  
        Node n = s.next;  
        // s的后继节点为null,或不为null,就将s的前驱节点的后继节点设置为n  
        if (n == null ||  
            (n != s && pred.casNext(s, n) && pred.isMatched())) {  
            for (;;) {               // check if at, or could be, head  
                Node h = head;  
                if (h == pred || h == s || h == null)  
                    return;          // at head or list empty  
                if (!h.isMatched())  
                    break;  
                Node hn = h.next;  
                if (hn == null)  
                    return;          // now empty  
                if (hn != h && casHead(h, hn))  
                    h.forgetNext();  // advance head  
            }  
            if (pred.next != pred && s.next != s) { // recheck if offlist  
                for (;;) {           // sweep now if enough votes  
                    int v = sweepVotes;  
                    if (v < SWEEP\_THRESHOLD) {  
                        if (casSweepVotes(v, v + 1))  
                            break;  
                    }  
                    // 达到阀值,进行“大扫除”,清除队列中的无效节点  
                    else if (casSweepVotes(v, 0)) {  
                        sweep();  
                        break;  
                    }  
                }  
            }  
        }  
    }  
}  

如果s的前继节点pred还是指向s(pred.next == s),尝试解除s的链接,若s不是自连接节点,就把pred的next引用指向s的next节点。如果s不能被解除(由于它是尾节点或者pred可能被解除链接,并且pred和s都不是head节点或已经出列),则添加到sweepVotes,sweepVotes累计到阀值SWEEP_THRESHOLD之后就调用sweep()对队列进行一次“大扫除”,清除队列中所有的无效节点:

private void sweep() {  
    for (Node p = head, s, n; p != null && (s = p.next) != null; ) {  
        if (!s.isMatched())  
            // Unmatched nodes are never self-linked  
            p = s;  
        else if ((n = s.next) == null) // trailing node is pinned  
            break;  
        else if (s == n)    // stale  
            // No need to also check for p == s, since that implies s == n  
            p = head;  
        else  
            p.casNext(s, n);  
    }  
}  

xfer的主要过程如下图所示:

picture.image和SynchronousQueue相比,LinkedTransferQueue多了一个可以存储的队列,与LinkedBlockingQueue相比,LinkedTransferQueue多了直接传递元素,少了用锁来同步。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
如何利用云原生构建 AIGC 业务基石
AIGC即AI Generated Content,是指利用人工智能技术来生成内容,AIGC也被认为是继UGC、PGC之后的新型内容生产方式,AI绘画、AI写作等都属于AIGC的分支。而 AIGC 业务的部署也面临着异构资源管理、机器学习流程管理等问题,本次分享将和大家分享如何使用云原生技术构建 AIGC 业务。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论