Java源码解析之LinkedBlockingQueue
LinkedBlockingQueue是BlockingQueue基于链表的实现,正如其名称的自描述一样“链表,阻塞,队列”。是面向生产者-消费者模式的典型例子。LinkedBlockingQueue的实现使用了入队和出队的概念管理同步。所有的入队操作使用一把锁,所有的出队操作使用另一把锁。同一时间入队和出队都各自只能有一个线程处于活跃状态,但入队与出队是可以并发运行的。先了解下LinkedBlockingQueue的实现约定。
- 保证线程安全,关于锁,阻塞和唤醒的操作由内部封装
- 以FIFO(先进先出)的方式操作元素,元素被添加在尾部,从头部移除或查看
- 不接受null,入队会进行非空检查
- 如果不显式的设置,最大值将是
Integer.MAX_VALUE
,理论上的无界队列
- | 不阻塞,抛出异常 | 不阻塞,返回特殊值 | 阻塞 | 阻塞但允许超时 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出队 | remove() | poll() | take() | poll(time ,unit) |
查看 | element() | peek() | 不支持 | 不支持 |
文中的代码基于oracle-jdk-13版本,不同的版本可能略有差异。
链表节点的静态内部类
// 单向链表
static class Node<E> {
/** 元素的泛型引用 */
E item;
/** 下一个节点的引用,如果是null,则是尾节点 */
Node<E> next;
Node(E x) { item = x; }
}
成员属性
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 最大容量,默认值Integer.MAX_VALUE */
private final int capacity;
/** 当前元素数量的原子计数器 */
private final AtomicInteger count = new AtomicInteger();
/** 链表头部,头部的item始终为null,不保存元素,不参与数量计算 */
transient Node<E> head;
/** 链表尾部 */
private transient Node<E> last;
/** 出队操作的可重入锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 出队操作的阻塞、唤醒监视器 */
private final Condition notEmpty = takeLock.newCondition();
/** 入队操作的可重入锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 入队操作的阻塞、唤醒监视器 */
private final Condition notFull = putLock.newCondition();
}
count
AtomicInteger提供了基于Integer加减运算的原子级实现。入队和出队用的是两把锁,使得两种类型的操作可以并发进行。使用AtomicInteger可以保证并发运算的准确性。
head
对链表的出队和查看操作实际上都是在获取
head.next.item
,head
起到的是一个标识的作用。在入队的时候不用进行头部非空检查和初始化操作,因为head
是始终存在的,不参与元素个数计算,也不保存元素。ReentrantLock
LinkedBlockingQueue中主要使用
lock()
获取锁,如果锁已经被其他线程获取,则会阻塞直到被其他线程唤醒,再次尝试获取锁,如此往复。直到成功获取到锁,进行相应操作之后通过调用unlock()
释放锁。与Condition不同的是ReentrantLock是以锁的争夺权为目的而阻塞线程的。Condition
对于入队和出队拥有各自的监视器。线程的阻塞与唤醒就是通过它的
await()
和signal()
实现的。与ReentrantLock
不同的是Condition
是以节点数量的控制为目的而阻塞线程的。对于入队而言,如果当前数量等于最大值,则入队线程被阻塞。
对于出队而言,如果队列中没有元素,则出队线程被阻塞。
入队和出队的最终实现
关于入队和出队的操作最终都是调用enqueue
和dequeue
这两个函数,先来介绍下它们,方便后面的展开。
入队
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
参数是新增节点的实例。需要添加到尾部,所以现有的尾部的
last.next
直接指向它,last
也指向它,就成为了新的尾节点。函数是私有的,由入队相关函数使用,不关心锁的获取和元素个数的计算,由调用者实现。关于源码上的两行注释,是调用者必须保证的。
assert putLock.isHeldByCurrentThread();
调用者必须保证当前线程是
putLock
的取得者。这是为了保证线程安全性,如果没有拿到锁的线程也能调用这个函数,那么相当于会有复数线程同时操作尾节点的情况存在。assert last.next == null;
node
会成为新的尾节点,尾节点的定义要求。假设这段代码Node<E> willBeLost = new Node<E>(other); Node<E> node = new Node<E>(e); node.next = willBeLost; enqueue(node);
对
enqueue(node)
的多次调用都会覆盖willBeLost
节点。使得该节点从队列中丢失。
出队
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; // h = 当前头 Node<E> h = head; // first = 实际拥有元素的头部 Node<E> first = h.next; h.next = h; // help GC // 更新头部 head = first; // 获得要返回的值 E x = first.item; // 保证head约定 first.item = null; return x; }
因为
head
不保存元素,所以实际出队的是head.next.item
,head
被移出队列,h.next = h
保持其不与任何节点产生引用关系,GC调度不用再通过额外的逻辑判断目标是否可以被回收。关于源码上的两行注释,与入队一样,只不过需要取得的锁是
takeLock
。
构造函数
基础构造函数,被其他构造函数使用
public LinkedBlockingQueue(int capacity) { // 参数检查 if (capacity <= 0) throw new IllegalArgumentException(); // 接受最大容量参数,队列成为有限容量队列。 this.capacity = capacity; // 初始化node值为null,主要用于head。此时计数器是0,head不参与计数,不保存元素。 last = head = new Node<E>(null); }
默认构造函数。初始化一个理论上的无界队列。直接复用上一个构造函数的初始化方式
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
最后一个构造函数
public LinkedBlockingQueue(Collection<? extends E> c) { // 初始化head和capacity this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; // 获取入队锁 putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; // 遍历参数 for (E e : c) { // 非空检查 if (e == null) throw new NullPointerException(); // 最大值检查 if (n == capacity) throw new IllegalStateException("Queue full"); // 入队 enqueue(new Node<E>(e)); ++n; } // 设置元素数量 count.set(n); } finally { // 释放入队锁 putLock.unlock(); } }
这个构造函数为我们展示了几个编码最佳实践
安全的初始化
putLock.lock(); // Never contended, but necessary for visibility
正如其注释所说,因为在构造函数中,这把锁不会被多个线程同时竞争。但在构造未完成之前,实例可能已经对其他线程可见。
以下代码摘自《java并发编程》3.5章节(安全发布)
public class Holder { private int n; public Holder(int n) { this.n = n; } public void assertSanity() { if (n != n) throw new AssertionError("This statement is false."); } }
public Holder holder; public void initialize() { holder = new Holder(42); }
书中提到多线程的情况下,访问
assertSanity
函数是有可能触发异常的。JVM可以对字节码指令重排序,其他线程可能先看到holder的引用存在于内存中,而此时构造函数并没有完成。在第一次获取n的时候还是0,第二次获取到了更新的值。虽然挺违反直觉,而且也没有复现出来过。异常情况的锁释放和栈封闭
final ReentrantLock putLock = this.putLock; putLock.lock(); try { // ... } finally { putLock.unlock(); }
try
块中可能会发生异常,异常不一定是代码抛出的,也可能是JVM抛出的,无论是否处理异常,都应该有一个finally
块,在其中释放当前获得的锁。并且try
的前一句总是获取锁的代码。在jdk类库中使用到锁的代码,都是以这个写法来使用锁的。final ReentrantLock putLock = this.putLock;
尽管LinkedBlockingQueue保证锁不会被改变。但拥有良好的编码习惯总是促使程序员把代码当成艺术品。从获取锁到释放锁之间程序到底会运行多久我们不得而知,也可能有人恶作剧般的通过反射修改了锁的实例。在调用栈中存储锁,以保证获取与释放的锁一定是同一个。放弃直接使用API
对
count
的设置并没有直接使用Collection.size()
,传递的参数只是接口,它可能是jdk类库的标准实现,也可能是第三方框架的扩展,还可能是用户自定义的实现。非阻塞队列java.util.concurrent.ConcurrentLinkedQueue
对size的实现是遍历整个队列。如果直接通过size()
获取元素个数,则构造函数中实际上遍历了两次参数。如果队列很大的话,就会成为不必要的性能损失。另一种情况是实现并没有经过严格的测试,它提供的size可能与实际数量不符。
唤醒线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 获取出队锁
takeLock.lock();
try {
// 唤醒被阻塞在出队(队列中没有元素可被获取而阻塞)的线程,如果有。
notEmpty.signal();
} finally {
// 释放出队锁
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
// 获取入队锁
putLock.lock();
try {
// 唤醒被阻塞在入队(队列长度达到最大值而阻塞)的线程,如果有。
notFull.signal();
} finally {
// 释放入队锁
putLock.unlock();
}
}
这两个函数用于元素数量发生变化时,唤醒相应线程。每次只唤醒一个。对于ReentrantLock与Condition有一个概念是必须要了解的。
notEmpty
是由takeLock.newCondition()
实例化的。这两个实例之间具有绑定关系,notEmpty
可以操作takeLock
内部的线程队列。对notEmpty
的所有操作必须要先取得takeLock
的所有权,否则会抛出IllegalMonitorStateException异常。这一点与Object.wait()
和Object.notify()
相同。notFull
则对应putLock
。
入队和出队API
阻塞入队
public void put(E e) throws InterruptedException { // 非空检查 if (e == null) throw new NullPointerException(); // 计数器 final int c; // 新节点 final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 获取入队锁,接受中断信号,直接抛出 putLock.lockInterruptibly(); try { // 旋转判断是否达到最大元素数 while (count.get() == capacity) { // 如果成立则阻塞 notFull.await(); } // 入队 enqueue(node); // 计数器原子递增,并获取递增前的值 c = count.getAndIncrement(); // 判断当前元素数是否小于最大值 if (c + 1 < capacity) // 如果小于,则还能继续入队操作,唤醒下一条入队线程,如果存在。 // 当前线程已经持有入队锁,无需调用signalNotFull notFull.signal(); } finally { // 释放入队锁 putLock.unlock(); } // 判断入队之前队列是否没有元素 if (c == 0) // 如果成立,则可能会有出队线程处于阻塞,执行唤醒操作 signalNotEmpty(); }
入队的锁获取允许被中断,并且直接向调用者抛出。之后是自旋。如果满足条件,进入
notFull.await();
阻塞当前线程。而后被唤醒,重新判断count.get() == capacity
条件。这个过程可能发生不止一次。入队之后的递增操作把递增前的值保存了下来。出队的唤醒判断时已经放弃了入队锁。假设当前线程在获得锁后,计数器的值是0,出队线程由于判断队列中没有元素而阻塞,在当前线程释放入队锁之后,
if (c == 0)
之前,另一个入队线程完成了递增操作,此时使用if (count.get() == 0)
希望唤醒出队线程,实际情况是count.get() = 1
。被阻塞的出队线程就很难被唤醒或是永远不会被唤醒。阻塞出队
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取出队锁,接受中断信号 takeLock.lockInterruptibly(); try { // 旋转判断是否没有元素 while (count.get() == 0) { // 如果成立,则阻塞 notEmpty.await(); } // 出队 x = dequeue(); // 计数器原子递减,并获取递减前的值 c = count.getAndDecrement(); // 判断是否还有元素 if (c > 1) // 如果有,则还能继续出队操作,唤醒下一条出队线程,如果存在。 notEmpty.signal(); } finally { // 释放出队锁 takeLock.unlock(); } // 判断出队之前队列是否已满 if (c == capacity) // 如果成立,则可能会有入队线程处于阻塞,执行唤醒操作 signalNotFull(); return x; }
阻塞超时入队
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 非空检查 if (e == null) throw new NullPointerException(); // 获得超时纳秒数 long nanos = unit.toNanos(timeout); final int c; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 获取入队锁,接受中断信号 putLock.lockInterruptibly(); try { while (count.get() == capacity) { // 如果参数很奇怪,返回false。 // 程序无法决定超时时间,也不能无限阻塞(违反API约定) if (nanos <= 0L) return false; // 等待相应的时间,超时或被唤醒后,继续旋转 nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
不阻塞的返回值出队
public E poll() { final AtomicInteger count = this.count; // 判断是否没有元素 if (count.get() == 0) // 成立,返回null return null; final E x; final int c; final ReentrantLock takeLock = this.takeLock; // 获取入队锁,不接受中断 takeLock.lock(); try { // 双重检查 if (count.get() == 0) return null; // 出队 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
不阻塞的异常入队出队
public boolean add(E e) { // 非阻塞入队且判断是否成功 if (offer(e)) return true; else // 入队失败,抛出异常 throw new IllegalStateException("Queue full"); } public E remove() { // 非阻塞出队 E x = poll(); // 判断是否是有效的元素 if (x != null) return x; else // 队列中没有元素,抛出异常 throw new NoSuchElementException(); }
基于异常的API是由AbstractQueue实现的,通过调用
offer
和poll
根据失败抛出异常。
Collection相关API
BlogkcingQueue是为生产者消费者模式设计的,队列同时也可以是容器。所以它支持Collection的API。LinkedBlockingQueue在对Collection的实现上,使用的是完全同步的方式。同时获取入队和出队锁。并且不响应中断请求。
void fullyLock() {
putLock.lock(); // 获取入队锁
takeLock.lock(); // 获取出队锁
}
void fullyUnlock() {
// 以获取时相反的顺序
takeLock.unlock(); // 释放出队锁
putLock.unlock(); // 释放入队锁
}
之后是两个接下来会被使用的函数
// 这个函数被(Iterator)迭代器的remove和LinkedBlockingQueue的remove同时使用
// 迭代器因为每次迭代都只记录当前节点,实现时需要对节点是否已经被其他线程移除进行假设性考虑
void unlink(Node<E> p, Node<E> pred) {
// assert putLock.isHeldByCurrentThread(); 调用者需要拥有入队锁
// assert takeLock.isHeldByCurrentThread(); 调用者需要拥有出队锁
// 不改变p.next,假设p是迭代器的当前节点,不改变next可以使迭代器正确运行
p.item = null;
pred.next = p.next; // 上一个节点的next不再指向p,而指向p.next,从而将p移出了链表
if (last == p)
// 如果被移除的p是尾节点,更改尾节点的指向
last = pred;
if (count.getAndDecrement() == capacity)
// 计数器原子递减前的值如果达到最大限制,则递减后允许入队,唤醒入队线程
notFull.signal();
}
// 从ancestor的位置往下查找p的上一个节点,unlink函数移除节点,需要使用到被移除节点和其上一个节点
Node<E> findPred(Node<E> p, Node<E> ancestor) {
// ancestor节点已经被移除,从head开始
if (ancestor.item == null)
ancestor = head;
// 从ancestor开始,依次向下找,满足ancestor.next == p就会停止。此时ancestor就是p的上一个节点
for (Node<E> q; (q = ancestor.next) != p; )
ancestor = q;
return ancestor;
}
contains(Object o)
public boolean contains(Object o) { // null值直接返回 if (o == null) return false; // 获取锁 fullyLock(); try { // 从head.next开始遍历 for (Node<E> p = head.next; p != null; p = p.next) // Object.equals判断 if (o.equals(p.item)) // 条件成立,返回true return true; // 不包含,返回false return false; } finally { // 释放锁 fullyUnlock(); } }
remove(Object o)
public boolean remove(Object o) { // null值直接返回 if (o == null) return false; // 获取锁 fullyLock(); try { // pred始终指向上一个节点。 // p = 当前节点。如果是null,表示已经遍历完整个链表 for (Node<E> pred = head, p = pred.next; p != null; pred = p, p = p.next) { // Object.equals判断 if (o.equals(p.item)) { // 条件成立,从链表中移除p unlink(p, pred); return true; } } // 没有匹配的元素 return false; } finally { // 释放锁 fullyUnlock(); } }
iterator()
public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { private Node<E> next; // 当前节点的下一个节点 private E nextItem; // 当前节点保存的值,防止外部类的remove()操作而丢失item private Node<E> lastRet; // 当前节点 // 最近一次被remove节点的遍历状态。再次remove可以从这个节点开始,无需从头查找。 private Node<E> ancestor; // 构造函数 Itr() { // 获取锁 fullyLock(); try { // 拿到head.next第一个节点 if ((next = head.next) != null) // 保存节点的值,初始化时next是当前节点 nextItem = next.item; } finally { // 释放锁 fullyUnlock(); } } public boolean hasNext() { return next != null; } public E next() { Node<E> p; // 无效的调用检查,即使hasNext返回false,next依然可以被调用 if ((p = next) == null) throw new NoSuchElementException(); // p即将指向p.next,保存当前节点,用于remove lastRet = p; // 获取当前节点的值 E x = nextItem; // 获取锁 fullyLock(); try { E e = null; // p = next.next,移动到下一节点 // p.item == null,测试下一节点是否已经被其他线程移除 for (p = p.next; p != null && (e = p.item) == null; ) // 如果被移除,继续往下找,直到p.item != null p = succ(p); // 修改成员属性,使next指向有效的next节点 next = p; // 修改成员属性,使next.item被保存,以防止其他线程执行unlink nextItem = e; } finally { // 释放锁 fullyUnlock(); } // 返回当前节点的值 return x; } public void forEachRemaining(Consumer<? super E> action) { // 非空检查 Objects.requireNonNull(action); Node<E> p; // 忽略无效的调用 if ((p = next) == null) return; // 保存当前节点,用于remove lastRet = p; // 成员属性next = null防止其他线程重复调用next或forEachRemaining next = null; // 分批执行的最大长度,每次从队列中获取64个节点执行action。 final int batchSize = 64; // 分批处理的节点元素存储数组 Object[] es = null; int n, len = 1; do { // 获取锁 fullyLock(); try { // 判断节点数组是否被初始化 if (es == null) { // 实始化操作 // 移动节点 p = p.next; // 向下遍历,获取有效节点的个数 // 节点是有效节点,len递增 for (Node<E> q = p; q != null; q = succ(q)) if (q.item != null && ++len == batchSize) break; // 初始化数组,按有效节点的长度,最大64 es = new Object[len]; // 当前节点的值放入首位 es[0] = nextItem; nextItem = null; // 从1开始设置元素 n = 1; } else // 数组已经被初始化,直接从第0个位置开始 n = 0; // 从p开始向下移动,有效的节点值存入数组 for (; p != null && n < len; p = succ(p)) if ((es[n] = p.item) != null) { // 保存当前节点,用于remove lastRet = p; n++; } } finally { // 释放锁 fullyUnlock(); } // 对数组执行一次action。n <= 64 for (int i = 0; i < n; i++) { @SuppressWarnings("unchecked") E e = (E) es[i]; action.accept(e); } // 循环条件判断,直到所有节点都执行过action } while (n > 0 && p != null); } public void remove() { // 获取当前节点 Node<E> p = lastRet; // 非法值,迭代器可能还没有调用过next,抛出异常 if (p == null) throw new IllegalStateException(); // 清理当前节点状态,避免对remove的多次调用 lastRet = null; // 获取锁 fullyLock(); try { // 当前节点已经被移除,其他线程调用了外部类的remove if (p.item != null) { // 如果ancestor已经被初始化,直接从保存的值开始,O(1) // ancestor有可能被移除,这种情况只能从head开始,O(n) if (ancestor == null) // 首次remove操作,从head开始 ancestor = head; // 查找p节点的上一个节点,ancestor.next == p ancestor = findPred(p, ancestor); // 移除节点 unlink(p, ancestor); } } finally { // 释放锁 fullyUnlock(); } } }
LinkedBlockingQueue的主要函数介绍完毕。其他的函数基本上大同小异,思路与写法是差不多的。鉴于篇幅原因,不再详细展开。