Java源码解析之LinkedBlockingQueue


LinkedBlockingQueueBlockingQueue基于链表的实现,正如其名称的自描述一样“链表,阻塞,队列”。是面向生产者-消费者模式的典型例子。LinkedBlockingQueue的实现使用了入队出队的概念管理同步。所有的入队操作使用一把锁,所有的出队操作使用另一把锁。同一时间入队和出队都各自只能有一个线程处于活跃状态,但入队与出队是可以并发运行的。先了解下LinkedBlockingQueue的实现约定。

  1. 保证线程安全,关于锁,阻塞和唤醒的操作由内部封装
  2. FIFO(先进先出)的方式操作元素,元素被添加在尾部,从头部移除或查看
  3. 不接受null,入队会进行非空检查
  4. 如果不显式的设置,最大值将是Integer.MAX_VALUE,理论上的无界队列
-不阻塞,抛出异常不阻塞,返回特殊值阻塞阻塞但允许超时
入队add(e)offer(e)put(e)offer(e, time, unit)
出队remove()poll()take()poll(time ,unit)
查看element()peek()不支持不支持

文中的代码基于oracle-jdk-13版本,不同的版本可能略有差异。

链表节点的静态内部类

// 单向链表
static class Node<E> {

    /** 元素的泛型引用 */
    E item;

    /** 下一个节点的引用,如果是null,则是尾节点 */
    Node<E> next;

    Node(E x) { item = x; }
}

成员属性

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 最大容量,默认值Integer.MAX_VALUE */
    private final int capacity;

    /** 当前元素数量的原子计数器 */
    private final AtomicInteger count = new AtomicInteger();

    /** 链表头部,头部的item始终为null,不保存元素,不参与数量计算 */
    transient Node<E> head;

    /** 链表尾部 */
    private transient Node<E> last;

    /** 出队操作的可重入锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 出队操作的阻塞、唤醒监视器 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 入队操作的可重入锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 入队操作的阻塞、唤醒监视器 */
    private final Condition notFull = putLock.newCondition();

}
  • count

    AtomicInteger提供了基于Integer加减运算的原子级实现。入队和出队用的是两把锁,使得两种类型的操作可以并发进行。使用AtomicInteger可以保证并发运算的准确性。

  • head

    对链表的出队和查看操作实际上都是在获取head.next.itemhead起到的是一个标识的作用。在入队的时候不用进行头部非空检查和初始化操作,因为head是始终存在的,不参与元素个数计算,也不保存元素。

  • ReentrantLock

    LinkedBlockingQueue中主要使用lock()获取锁,如果锁已经被其他线程获取,则会阻塞直到被其他线程唤醒,再次尝试获取锁,如此往复。直到成功获取到锁,进行相应操作之后通过调用unlock()释放锁。与Condition不同的是ReentrantLock是以锁的争夺权为目的而阻塞线程的。

  • Condition

    对于入队和出队拥有各自的监视器。线程的阻塞与唤醒就是通过它的await()signal()实现的。与ReentrantLock不同的是Condition是以节点数量的控制为目的而阻塞线程的。

    对于入队而言,如果当前数量等于最大值,则入队线程被阻塞。

    对于出队而言,如果队列中没有元素,则出队线程被阻塞。

入队和出队的最终实现

关于入队和出队的操作最终都是调用enqueuedequeue这两个函数,先来介绍下它们,方便后面的展开。

  • 入队

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
    

    参数是新增节点的实例。需要添加到尾部,所以现有的尾部的last.next直接指向它,last也指向它,就成为了新的尾节点。函数是私有的,由入队相关函数使用,不关心锁的获取和元素个数的计算,由调用者实现。

    关于源码上的两行注释,是调用者必须保证的。

    • assert putLock.isHeldByCurrentThread();

      调用者必须保证当前线程是putLock的取得者。这是为了保证线程安全性,如果没有拿到锁的线程也能调用这个函数,那么相当于会有复数线程同时操作尾节点的情况存在。

    • assert last.next == null;

      node会成为新的尾节点,尾节点的定义要求。假设这段代码

      Node<E> willBeLost = new Node<E>(other);
      Node<E> node = new Node<E>(e);
      node.next = willBeLost;
      enqueue(node);
      

      enqueue(node)的多次调用都会覆盖willBeLost节点。使得该节点从队列中丢失。

  • 出队

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        // h = 当前头
        Node<E> h = head;
        // first = 实际拥有元素的头部
        Node<E> first = h.next;
        h.next = h; // help GC
        // 更新头部
        head = first;
        // 获得要返回的值
        E x = first.item;
        // 保证head约定
        first.item = null;
        return x;
    }
    

    因为head不保存元素,所以实际出队的是head.next.itemhead被移出队列,h.next = h保持其不与任何节点产生引用关系,GC调度不用再通过额外的逻辑判断目标是否可以被回收。

    关于源码上的两行注释,与入队一样,只不过需要取得的锁是takeLock

构造函数

  1. 基础构造函数,被其他构造函数使用

    public LinkedBlockingQueue(int capacity) {
        // 参数检查
        if (capacity <= 0) throw new IllegalArgumentException();
        // 接受最大容量参数,队列成为有限容量队列。
        this.capacity = capacity;
        // 初始化node值为null,主要用于head。此时计数器是0,head不参与计数,不保存元素。
        last = head = new Node<E>(null);
    }
    
  2. 默认构造函数。初始化一个理论上的无界队列。直接复用上一个构造函数的初始化方式

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
  3. 最后一个构造函数

    public LinkedBlockingQueue(Collection<? extends E> c) {
        // 初始化head和capacity
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        // 获取入队锁
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            // 遍历参数
            for (E e : c) {
                // 非空检查
                if (e == null)
                    throw new NullPointerException();
                // 最大值检查
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                // 入队
                enqueue(new Node<E>(e));
                ++n;
            }
            // 设置元素数量
            count.set(n);
        } finally {
            // 释放入队锁
            putLock.unlock();
        }
    }
    

    这个构造函数为我们展示了几个编码最佳实践

    1. 安全的初始化

      putLock.lock(); // Never contended, but necessary for visibility
      

      正如其注释所说,因为在构造函数中,这把锁不会被多个线程同时竞争。但在构造未完成之前,实例可能已经对其他线程可见。

      以下代码摘自《java并发编程》3.5章节(安全发布)

      public class Holder {
          private int n;
      
          public Holder(int n) { this.n = n; }
      
          public void assertSanity() {
              if (n != n)
                  throw new AssertionError("This statement is false.");
          }
      }
      
      public Holder holder;
      
      public void initialize() {
          holder = new Holder(42);
      }
      

      书中提到多线程的情况下,访问assertSanity函数是有可能触发异常的。JVM可以对字节码指令重排序,其他线程可能先看到holder的引用存在于内存中,而此时构造函数并没有完成。在第一次获取n的时候还是0,第二次获取到了更新的值。虽然挺违反直觉,而且也没有复现出来过。

    2. 异常情况的锁释放和栈封闭

      final ReentrantLock putLock = this.putLock;
      putLock.lock();
      try {
          // ...
      } finally {
          putLock.unlock();
      }
      

      try块中可能会发生异常,异常不一定是代码抛出的,也可能是JVM抛出的,无论是否处理异常,都应该有一个finally块,在其中释放当前获得的锁。并且try的前一句总是获取锁的代码。在jdk类库中使用到锁的代码,都是以这个写法来使用锁的。

      final ReentrantLock putLock = this.putLock;尽管LinkedBlockingQueue保证锁不会被改变。但拥有良好的编码习惯总是促使程序员把代码当成艺术品。从获取锁到释放锁之间程序到底会运行多久我们不得而知,也可能有人恶作剧般的通过反射修改了锁的实例。在调用栈中存储锁,以保证获取与释放的锁一定是同一个。

    3. 放弃直接使用API

      count的设置并没有直接使用Collection.size(),传递的参数只是接口,它可能是jdk类库的标准实现,也可能是第三方框架的扩展,还可能是用户自定义的实现。非阻塞队列java.util.concurrent.ConcurrentLinkedQueue对size的实现是遍历整个队列。如果直接通过size()获取元素个数,则构造函数中实际上遍历了两次参数。如果队列很大的话,就会成为不必要的性能损失。另一种情况是实现并没有经过严格的测试,它提供的size可能与实际数量不符。

唤醒线程

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 获取出队锁
    takeLock.lock();
    try {
        // 唤醒被阻塞在出队(队列中没有元素可被获取而阻塞)的线程,如果有。
        notEmpty.signal();
    } finally {
        // 释放出队锁
        takeLock.unlock();
    }
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    // 获取入队锁
    putLock.lock();
    try {
        // 唤醒被阻塞在入队(队列长度达到最大值而阻塞)的线程,如果有。
        notFull.signal();
    } finally {
        // 释放入队锁
        putLock.unlock();
    }
}

这两个函数用于元素数量发生变化时,唤醒相应线程。每次只唤醒一个。对于ReentrantLockCondition有一个概念是必须要了解的。

notEmpty是由takeLock.newCondition()实例化的。这两个实例之间具有绑定关系,notEmpty可以操作takeLock内部的线程队列。对notEmpty的所有操作必须要先取得takeLock的所有权,否则会抛出IllegalMonitorStateException异常。这一点与Object.wait()Object.notify()相同。notFull则对应putLock

入队和出队API

  • 阻塞入队

    public void put(E e) throws InterruptedException {
        // 非空检查
        if (e == null) throw new NullPointerException();
        // 计数器
        final int c;
        // 新节点
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 获取入队锁,接受中断信号,直接抛出
        putLock.lockInterruptibly();
        try {
            // 旋转判断是否达到最大元素数
            while (count.get() == capacity) {
                // 如果成立则阻塞
                notFull.await();
            }
            // 入队
            enqueue(node);
            // 计数器原子递增,并获取递增前的值
            c = count.getAndIncrement();
            // 判断当前元素数是否小于最大值
            if (c + 1 < capacity)
                // 如果小于,则还能继续入队操作,唤醒下一条入队线程,如果存在。
                // 当前线程已经持有入队锁,无需调用signalNotFull
                notFull.signal();
        } finally {
            // 释放入队锁
            putLock.unlock();
        }
        // 判断入队之前队列是否没有元素
        if (c == 0)
            // 如果成立,则可能会有出队线程处于阻塞,执行唤醒操作
            signalNotEmpty();
    }
    

    入队的锁获取允许被中断,并且直接向调用者抛出。之后是自旋。如果满足条件,进入notFull.await();阻塞当前线程。而后被唤醒,重新判断count.get() == capacity条件。这个过程可能发生不止一次。

    入队之后的递增操作把递增前的值保存了下来。出队的唤醒判断时已经放弃了入队锁。假设当前线程在获得锁后,计数器的值是0,出队线程由于判断队列中没有元素而阻塞,在当前线程释放入队锁之后,if (c == 0)之前,另一个入队线程完成了递增操作,此时使用if (count.get() == 0)希望唤醒出队线程,实际情况是count.get() = 1。被阻塞的出队线程就很难被唤醒或是永远不会被唤醒。

  • 阻塞出队

    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 获取出队锁,接受中断信号
        takeLock.lockInterruptibly();
        try {
            // 旋转判断是否没有元素
            while (count.get() == 0) {
                // 如果成立,则阻塞
                notEmpty.await();
            }
            // 出队
            x = dequeue();
            // 计数器原子递减,并获取递减前的值
            c = count.getAndDecrement();
            // 判断是否还有元素
            if (c > 1)
                // 如果有,则还能继续出队操作,唤醒下一条出队线程,如果存在。
                notEmpty.signal();
        } finally {
            // 释放出队锁
            takeLock.unlock();
        }
        // 判断出队之前队列是否已满
        if (c == capacity)
            // 如果成立,则可能会有入队线程处于阻塞,执行唤醒操作
            signalNotFull();
        return x;
    }
    
  • 阻塞超时入队

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 非空检查
        if (e == null) throw new NullPointerException();
        // 获得超时纳秒数
        long nanos = unit.toNanos(timeout);
        final int c;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 获取入队锁,接受中断信号
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                // 如果参数很奇怪,返回false。
                // 程序无法决定超时时间,也不能无限阻塞(违反API约定)
                if (nanos <= 0L)
                    return false;
                // 等待相应的时间,超时或被唤醒后,继续旋转
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    
  • 不阻塞的返回值出队

    public E poll() {
        final AtomicInteger count = this.count;
        // 判断是否没有元素
        if (count.get() == 0)
            // 成立,返回null
            return null;
        final E x;
        final int c;
        final ReentrantLock takeLock = this.takeLock;
        // 获取入队锁,不接受中断
        takeLock.lock();
        try {
            // 双重检查
            if (count.get() == 0)
                return null;
            // 出队
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
  • 不阻塞的异常入队出队

    public boolean add(E e) {
        // 非阻塞入队且判断是否成功
        if (offer(e))
            return true;
        else
            // 入队失败,抛出异常
            throw new IllegalStateException("Queue full");
    }
    
    public E remove() {
        // 非阻塞出队
        E x = poll();
        // 判断是否是有效的元素
        if (x != null)
            return x;
        else
            // 队列中没有元素,抛出异常
            throw new NoSuchElementException();
    }
    

    基于异常的API是由AbstractQueue实现的,通过调用offerpoll根据失败抛出异常。

Collection相关API

BlogkcingQueue是为生产者消费者模式设计的,队列同时也可以是容器。所以它支持Collection的API。LinkedBlockingQueue在对Collection的实现上,使用的是完全同步的方式。同时获取入队和出队锁。并且不响应中断请求。

void fullyLock() {
    putLock.lock(); // 获取入队锁
    takeLock.lock(); // 获取出队锁
}

void fullyUnlock() {
    // 以获取时相反的顺序
    takeLock.unlock(); // 释放出队锁
    putLock.unlock(); // 释放入队锁
}

之后是两个接下来会被使用的函数

// 这个函数被(Iterator)迭代器的remove和LinkedBlockingQueue的remove同时使用
// 迭代器因为每次迭代都只记录当前节点,实现时需要对节点是否已经被其他线程移除进行假设性考虑
void unlink(Node<E> p, Node<E> pred) {
    // assert putLock.isHeldByCurrentThread(); 调用者需要拥有入队锁
    // assert takeLock.isHeldByCurrentThread(); 调用者需要拥有出队锁

    // 不改变p.next,假设p是迭代器的当前节点,不改变next可以使迭代器正确运行
    p.item = null;
    pred.next = p.next; // 上一个节点的next不再指向p,而指向p.next,从而将p移出了链表
    if (last == p)
        // 如果被移除的p是尾节点,更改尾节点的指向
        last = pred;
    if (count.getAndDecrement() == capacity)
        // 计数器原子递减前的值如果达到最大限制,则递减后允许入队,唤醒入队线程
        notFull.signal();
}
// 从ancestor的位置往下查找p的上一个节点,unlink函数移除节点,需要使用到被移除节点和其上一个节点
Node<E> findPred(Node<E> p, Node<E> ancestor) {
    // ancestor节点已经被移除,从head开始
    if (ancestor.item == null)
        ancestor = head;
    // 从ancestor开始,依次向下找,满足ancestor.next == p就会停止。此时ancestor就是p的上一个节点
    for (Node<E> q; (q = ancestor.next) != p; )
        ancestor = q;
    return ancestor;
}
  • contains(Object o)

    public boolean contains(Object o) {
        // null值直接返回
        if (o == null) return false;
        // 获取锁
        fullyLock();
        try {
            // 从head.next开始遍历
            for (Node<E> p = head.next; p != null; p = p.next)
                // Object.equals判断
                if (o.equals(p.item))
                    // 条件成立,返回true
                    return true;
            // 不包含,返回false
            return false;
        } finally {
            // 释放锁
            fullyUnlock();
        }
    }
    
  • remove(Object o)

    public boolean remove(Object o) {
        // null值直接返回
        if (o == null) return false;
        // 获取锁
        fullyLock();
        try {
            // pred始终指向上一个节点。
            // p = 当前节点。如果是null,表示已经遍历完整个链表
            for (Node<E> pred = head, p = pred.next;
                 p != null;
                 pred = p, p = p.next) {
                // Object.equals判断
                if (o.equals(p.item)) {
                    // 条件成立,从链表中移除p
                    unlink(p, pred);
                    return true;
                }
            }
            // 没有匹配的元素
            return false;
        } finally {
            // 释放锁
            fullyUnlock();
        }
    }
    
  • iterator()

    public Iterator<E> iterator() {
        return new Itr();
    }
    
    private class Itr implements Iterator<E> {
        private Node<E> next; // 当前节点的下一个节点
        private E nextItem; // 当前节点保存的值,防止外部类的remove()操作而丢失item
        private Node<E> lastRet; // 当前节点
        // 最近一次被remove节点的遍历状态。再次remove可以从这个节点开始,无需从头查找。
        private Node<E> ancestor; 
    
        // 构造函数
        Itr() {
            // 获取锁
            fullyLock();
            try {
                // 拿到head.next第一个节点
                if ((next = head.next) != null)
                    // 保存节点的值,初始化时next是当前节点
                    nextItem = next.item;
            } finally {
                // 释放锁
                fullyUnlock();
            }
        }
    
        public boolean hasNext() {
            return next != null;
        }
    
        public E next() {
            Node<E> p;
            // 无效的调用检查,即使hasNext返回false,next依然可以被调用
            if ((p = next) == null)
                throw new NoSuchElementException();
            // p即将指向p.next,保存当前节点,用于remove
            lastRet = p;
            // 获取当前节点的值
            E x = nextItem;
            // 获取锁
            fullyLock();
            try {
                E e = null;
                // p = next.next,移动到下一节点
                // p.item == null,测试下一节点是否已经被其他线程移除
                for (p = p.next; p != null && (e = p.item) == null; )
                    // 如果被移除,继续往下找,直到p.item != null
                    p = succ(p);
                // 修改成员属性,使next指向有效的next节点
                next = p;
                // 修改成员属性,使next.item被保存,以防止其他线程执行unlink
                nextItem = e;
            } finally {
                // 释放锁
                fullyUnlock();
            }
            // 返回当前节点的值
            return x;
        }
    
        public void forEachRemaining(Consumer<? super E> action) {
            // 非空检查
            Objects.requireNonNull(action);
            Node<E> p;
            // 忽略无效的调用
            if ((p = next) == null) return;
            // 保存当前节点,用于remove
            lastRet = p;
            // 成员属性next = null防止其他线程重复调用next或forEachRemaining
            next = null;
            // 分批执行的最大长度,每次从队列中获取64个节点执行action。
            final int batchSize = 64;
            // 分批处理的节点元素存储数组
            Object[] es = null;
            int n, len = 1;
            do {
                // 获取锁
                fullyLock();
                try {
                    // 判断节点数组是否被初始化
                    if (es == null) {
                        // 实始化操作
                        // 移动节点
                        p = p.next;
                        // 向下遍历,获取有效节点的个数
                        // 节点是有效节点,len递增
                        for (Node<E> q = p; q != null; q = succ(q))
                            if (q.item != null && ++len == batchSize)
                                break;
                        // 初始化数组,按有效节点的长度,最大64
                        es = new Object[len];
                        // 当前节点的值放入首位
                        es[0] = nextItem;
                        nextItem = null;
                        // 从1开始设置元素
                        n = 1;
                    } else
                        // 数组已经被初始化,直接从第0个位置开始
                        n = 0;
                    // 从p开始向下移动,有效的节点值存入数组
                    for (; p != null && n < len; p = succ(p))
                        if ((es[n] = p.item) != null) {
                            // 保存当前节点,用于remove
                            lastRet = p;
                            n++;
                        }
                } finally {
                    // 释放锁
                    fullyUnlock();
                }
                // 对数组执行一次action。n <= 64
                for (int i = 0; i < n; i++) {
                    @SuppressWarnings("unchecked") E e = (E) es[i];
                    action.accept(e);
                }
    
                // 循环条件判断,直到所有节点都执行过action
            } while (n > 0 && p != null);
        }
    
        public void remove() {
            // 获取当前节点
            Node<E> p = lastRet;
            // 非法值,迭代器可能还没有调用过next,抛出异常
            if (p == null)
                throw new IllegalStateException();
            // 清理当前节点状态,避免对remove的多次调用
            lastRet = null;
            // 获取锁
            fullyLock();
            try {
                // 当前节点已经被移除,其他线程调用了外部类的remove
                if (p.item != null) {
                    // 如果ancestor已经被初始化,直接从保存的值开始,O(1)
                    // ancestor有可能被移除,这种情况只能从head开始,O(n)
                    if (ancestor == null)
                        // 首次remove操作,从head开始
                        ancestor = head;
                    // 查找p节点的上一个节点,ancestor.next == p 
                    ancestor = findPred(p, ancestor);
                    // 移除节点
                    unlink(p, ancestor);
                }
            } finally {
                // 释放锁
                fullyUnlock();
            }
        }
    }
    

LinkedBlockingQueue的主要函数介绍完毕。其他的函数基本上大同小异,思路与写法是差不多的。鉴于篇幅原因,不再详细展开。