LinkedBlockingQueue是一种基于链表实现的可选边界的阻塞队列,该队列排序元素FIFO。队列的队首是在该队列上停留时间最长的元素,队列的队尾是在该队列上停留最短时间的元素。在队列尾部插入新的元素,队列检索操作在队列的头部获取元素。
在大多数并发应用程序中,基于链表实现的队列通常具有比基于数组实现的队列更高的吞吐量,但性能上未必占优势。
LinkedBlockingQueue在初始化时可以指定容量也可以不指定容量。当初始化LinkedBlockingQueue指定容量时,是有界队列;当初始化LinkedBlockingQueue未指定容量时,其内部会以Integer.MAX_VALUE值作为容量。当然,因为Integer.MAX_VALUE值非常大,近似无限大,因此LinkedBlockingQueue未指定容量时也可以近似认为是无界队列。
为防止队列的过度的扩展,建议在LinkedBlockingQueue初始化时指定容量。LinkedBlockingQueue内部的链接节点在每次入队元素时动态创建,除非这会使队列超过容量。
LinkedBlockingQueue类及其迭代器实现了Collection和Iterator接口的所有可选方法。LinkedBlockingQueue是Java Collections Framework的一个成员。
1. LinkedBlockingQueue的声明
LinkedBlockingQueue的接口和继承关系如下
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
…
}
完整的接口继承关系如下图所示。
从上述代码可以看出,LinkedBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,此处不再赘述。
2. LinkedBlockingQueue的成员变量和构造函数
以下是LinkedBlockingQueue的构造函数和成员变量。
// 容量
private final int capacity;
// 当前元素个数
private final AtomicInteger count = new AtomicInteger();
// 链表头结点
// 不变式: head.item == null
transient Node<E> head;
// 链表尾结点
// 不变式: last.next == null
private transient Node<E> last;
// 用于锁住take、poll等操作
private final ReentrantLock takeLock = new ReentrantLock();
// 队列非空,唤醒消费者
private final Condition notEmpty = takeLock.newCondition();
// 用于锁住put、offer等操作
private final ReentrantLock putLock = new ReentrantLock();
// 队列非满,唤醒生产者
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 只锁可见,不互斥
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下
l capacity用于设置队列容量。该参数是可选的,如果未设置,则取Integer.MAX_VALUE值作为容量
l c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加
类成员last和head分别指代链表的尾结点和头结点。链表中的结点用Node类型表示,代码如下:
static class Node<E> {
E item;
/**
* next有以下三种场景:
* - 真正的后继结点
* - 当前结点是头结点,则后继结点是head.next
* - 值为null,表示当前结点是尾结点,没有后继结点
*/
Node<E> next;
Node(E x) { item = x; }
}
访问策略是通过ReentrantLock来实现的。通过两个加锁条件notEmpty、notFull来实现并发控制。与ArrayBlockingQueue所不同的是,LinkedBlockingQueue使用了takeLock和putLock两把锁来分别锁住出队操作和入队操作。
count用于记录当前队列里面的元素个数。
3. LinkedBlockingQueue的核心方法
以下对LinkedBlockingQueue常用核心方法的实现原理进行解释。
3.1. offer(e)
执行offer(e)方法后有两种结果
l 队列未满时,返回 true
l 队列满时,返回 false
LinkedBlockingQueue的offer (e)方法源码如下:
public boolean offer(E e) {
if (e == null) throw new NullPointerException(); // 判空
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 加锁
try {
if (count.get() == capacity)
return false;
enqueue(node); // 入队
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); // 标识当前队列非满
} finally {
putLock.unlock(); // 解锁
}
if (c == 0)
signalNotEmpty(); // 标识当前队列已经是非空
return true;
}
从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:
l 判断待入队的元素e是否为null。为null则抛出NullPointerException异常。
l 判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false。
l 为了确保并发操作的安全先做了加锁处理。
l 再次判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false;否则将元素e做入队处理,并返回true。
l 解锁。
l c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。
enqueue(node)方法代码如下:
private void enqueue(Node<E> node) {
last = last.next = node;
}
enqueue(node)方法就在链表的尾部插入数据元素。
signalNotEmpty()方法代码如下:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
思考:细心的读者可能会发现,在offer (e)方法方法中做了两次判断count是否超过了容量的限制。那么为什么要判断两次呢?
3.2. put(e)
执行put(e)方法后有两种结果:
•
l 队列未满时,直接插入没有返回值
l 队列满时,会阻塞等待,一直等到队列未满时再插入
LinkedBlockingQueue的put(e)方法源码如下:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // 获取锁
try {
while (count.get() == capacity) {
notFull.await(); // 使线程等待
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); // 标识当前队列非满
} finally {
putLock.unlock(); // 解锁
}
if (c == 0)
signalNotEmpty(); // 标识当前队列已经是非空
}
从上面代码可以看出,put(e)方法的实现,分为以下几个步骤:
l 先是要获取锁。
l 而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。
l 解锁。
l c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。
3.3. offer(e,time,unit)
offer(e,time,unit)方法与offer(e)方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false。执行offer(e,time,unit)方法有两种结果:
•
l 队列未满时,返回 true
l 队列满时,会阻塞等待,如果在指定时间内还不能往队列中插入数据则返回 false
LinkedBlockingQueue的offer(e,time,unit)方法源码如下:
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final int c;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // 获取锁
try {
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos); // 使线程等待指定的时间
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); // 标识当前队列非满
} finally {
putLock.unlock(); // 解锁
}
if (c == 0)
signalNotEmpty(); // 标识当前队列已经是非空
return true;
}
从上面代码可以看出,offer(e,time,unit)方法的实现,分为以下几个步骤:
l 先是要获取锁。
l 而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。
l 解锁。
l c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。
3.4. add(e)
执行add(e)方法后有两种结果
l 队列未满时,返回 true
l 队列满时,则抛出异常
ArrayBlockingQueue的add(e)方法源码如下:
public boolean add(E e) {
return super.add(e);
}
从上面代码可以看出,add(e)方法的实现,直接是调用了父类AbstractQueue的add(e)方法。而AbstractQueue的add(e)方法源码如下:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
从上面代码可以看出,add(e)方法又调用了offer(e)方法。offer(e)方法此处不再赘述。
3.5. poll ()
执行poll ()方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,返回 null
LinkedBlockingQueue的poll ()方法源码如下:
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
final E x;
final int c;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 加锁
try {
if (count.get() == 0)
return null;
x = dequeue(); // 出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); // 标识当前队列非空
} finally {
takeLock.unlock(); // 解锁
}
if (c == capacity)
signalNotFull(); // 标识当前队列已经是非满
return x;
}
从上面代码可以看出,执行poll()方法时,分为以下几个步骤:
l 先是判断count是否等于0,如果等于0则证明队列为空,直接返回null。
l 为了确保并发操作的安全先做了加锁处理。
l 再次判断count是否等于0,如果等于0则证明队列为空,直接返回null;否则执行dequeue()方法做元素的出队。
l 解锁。
l c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。
dequeue()方法源码如下:
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // 利于GC
head = first;
E x = first.item;
first.item = null;
return x;
}
上面代码比较简单,就是移除链表的头结点。
3.6. take()
执行take()方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值
LinkedBlockingQueue的take ()方法源码如下:
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 获取锁
try {
while (count.get() == 0) {
notEmpty.await(); // 使线程等待
}
x = dequeue(); // 出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); // 标识当前队列非空
} finally {
takeLock.unlock(); // 解锁
}
if (c == capacity)
signalNotFull(); // 标识当前队列已经是非满
return x;
}
从上面代码可以看出,执行take()方法时,分为以下几个步骤:
l 先是要获取锁。
l 而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。
l 解锁。
l c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。
dequeue()和signalNotFull ()方法此处不再赘述。
3.7. poll(time,unit)
poll(time,unit)方法与poll()方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内队列还为空,则返回null。执行poll(time,unit)方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null
LinkedBlockingQueue的poll(time,unit)方法源码如下:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
final E x;
final int c;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 获取锁
try {
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos); // 使线程等待指定的时间
}
x = dequeue(); // 出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); // 标识当前队列非空
} finally {
takeLock.unlock(); // 解锁
}
if (c == capacity)
signalNotFull(); // 标识当前队列已经是非满
return x;
}
从上面代码可以看出,执行poll(time,unit)方法时,分为以下几个步骤:
l 先是要获取锁。
l 而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。
l 解锁。
l c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。
dequeue()和signalNotFull ()方法此处不再赘述。
3.8. remove()
执行remove()方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,抛出异常
LinkedBlockingQueue的remove()方法其实是调用了父类AbstractQueue的remove ()方法,源码如下:
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
从上面代码可以看出,remove()直接调用了poll()方法。如果poll()方法返回结果为null,则抛出NoSuchElementException异常。
poll()方法此处不再赘述。
3.9. peek()
执行peek()方法后有两种结果:
l 队列不为空时,返回队首值但不移除
l 队列为空时,返回null
peek()方法源码如下:
public E peek() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 加锁
try {
return (count.get() > 0) ? head.next.item : null; // 空则返回null
} finally {
takeLock.unlock(); // 解锁
}
}
从上面代码可以看出,peek()方法比较简单,直接就是获取了链表里面头结点的元素值。
3.10. element()
执行element()方法后有两种结果:
l 队列不为空时,返回队首值但不移除
l 队列为空时,抛出异常
element()方法其实是调用了父类AbstractQueue的element()方法,源码如下:
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
从上面代码可以看出,执行element()方法时,先是获取peek()方法的结果,如果结果是null,则抛出NoSuchElementException异常。
4. LinkedBlockingQueue的单元测试
LinkedBlockingQueue的单元测试如下:
package com.waylau.java.demo.datastructure;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
/**
* LinkedBlockingQueue Test
*
* @since 1.0.0 2020年5月24日
* @author <a href="https://waylau.com">Way Lau</a>
*/
class LinkedBlockingQueueTests {
@Test
void testOffer() {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列未满时,返回 true
boolean resultNotFull = queue.offer("Java");
assertTrue(resultNotFull);
// 测试队列满则,返回 false
queue.offer("C");
queue.offer("Python");
boolean resultFull = queue.offer("C++");
assertFalse(resultFull);
}
@Test
void testPut() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列未满时,直接插入没有返回值;
queue.put("Java");
// 测试队列满则, 会阻塞等待,一直等到队列未满时再插入。
queue.put("C");
queue.put("Python");
queue.put("C++"); // 阻塞等待
}
@Test
void testOfferTime() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列未满时,返回 true
boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);
assertTrue(resultNotFull);
// 测试队列满则,返回 false
queue.offer("C");
queue.offer("Python");
boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 等5秒
assertFalse(resultFull);
}
@Test
void testAdd() {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列未满时,返回 true
boolean resultNotFull = queue.add("Java");
assertTrue(resultNotFull);
// 测试队列满则抛出异常
queue.add("C");
queue.add("Python");
Throwable excpetion = assertThrows(IllegalStateException.class, () -> {
queue.add("C++");// 抛异常
});
assertEquals("Queue full", excpetion.getMessage());
}
@Test
void testPoll() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列为空时,返回 null
String resultEmpty = queue.poll();
assertNull(resultEmpty);
// 测试队列不为空时,返回队首值并移除
queue.put("Java");
queue.put("C");
queue.put("Python");
String resultNotEmpty = queue.poll();
assertEquals("Java", resultNotEmpty);
}
@Test
void testTake() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并移除
queue.put("Java");
queue.put("C");
queue.put("Python");
String resultNotEmpty = queue.take();
assertEquals("Java", resultNotEmpty);
// 测试队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值
queue.clear();
String resultEmpty = queue.take(); // 阻塞等待
assertNotNull(resultEmpty);
}
@Test
void testPollTime() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并移除
queue.put("Java");
queue.put("C");
queue.put("Python");
String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);
assertEquals("Java", resultNotEmpty);
// 测试队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null
queue.clear();
String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒
assertNull(resultEmpty);
}
@Test
void testRemove() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列为空时,抛出异常
Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {
queue.remove();// 抛异常
});
assertEquals(null, excpetion.getMessage());
// 测试队列不为空时,返回队首值并移除
queue.put("Java");
queue.put("C");
queue.put("Python");
String resultNotEmpty = queue.remove();
assertEquals("Java", resultNotEmpty);
}
@Test
void testPeek() throws InterruptedException {
// 初始化队列
Queue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并但不移除
queue.add("Java");
queue.add("C");
queue.add("Python");
String resultNotEmpty = queue.peek();
assertEquals("Java", resultNotEmpty);
resultNotEmpty = queue.peek();
assertEquals("Java", resultNotEmpty);
resultNotEmpty = queue.peek();
assertEquals("Java", resultNotEmpty);
// 测试队列为空时,返回null
queue.clear();
String resultEmpty = queue.peek();
assertNull(resultEmpty);
}
@Test
void testElement() throws InterruptedException {
// 初始化队列
Queue<String> queue = new LinkedBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并但不移除
queue.add("Java");
queue.add("C");
queue.add("Python");
String resultNotEmpty = queue.element();
assertEquals("Java", resultNotEmpty);
resultNotEmpty = queue.element();
assertEquals("Java", resultNotEmpty);
resultNotEmpty = queue.element();
assertEquals("Java", resultNotEmpty);
// 测试队列为空时,抛出异常
queue.clear();
Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {
queue.element();// 抛异常
});
assertEquals(null, excpetion.getMessage());
}
}
5. LinkedBlockingQueue的应用案例
以下是一个生产者-消费者的示例。该示例模拟了1个生产者,2个消费者。当队列满时,则会阻塞生产者生产;当队列空时,则会阻塞消费者消费。
package com.waylau.java.demo.datastructure;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* LinkedBlockingQueue Demo
*
* @since 1.0.0 2020年5月23日
* @author <a href="https://waylau.com">Way Lau</a>
*/
public class LinkedBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 1个生产者
Producer p = new Producer(queue);
// 2个消费者
Consumer c1 = new Consumer("c1", queue);
Consumer c2 = new Consumer("c2", queue);
// 启动线程
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
class Producer implements Runnable {
private final BlockingQueue<String> queue;
Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
// 模拟耗时操作
Thread.sleep(1000L);
queue.put(produce());
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
String produce() {
String apple = "apple: " + System.currentTimeMillis();
System.out.println("produce " + apple);
return apple;
}
}
class Consumer implements Runnable {
private final BlockingQueue<String> queue;
private final String name;
Consumer(String name, BlockingQueue<String> queue) {
this.queue = queue;
this.name = name;
}
public void run() {
try {
while (true) {
// 模拟耗时操作
Thread.sleep(2000L);
consume(queue.take());
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
void consume(Object x) {
System.out.println(this.name + " consume " + x);
}
}
运行上述程序,输出内容如下:
produce apple: 1590308520134
c1 consume apple: 1590308520134
produce apple: 1590308521135
c2 consume apple: 1590308521135
produce apple: 1590308522142
c1 consume apple: 1590308522142
produce apple: 1590308523147
c2 consume apple: 1590308523147
produce apple: 1590308524156
c1 consume apple: 1590308524156
produce apple: 1590308525157
c2 consume apple: 1590308525157
produce apple: 1590308526157
c1 consume apple: 1590308526157
produce apple: 1590308527157
c2 consume apple: 1590308527157
6. 参考引用
本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action
《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html