阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列有两个常见的阻塞场景,它们分别是:
(1)当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
(2)当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
支持以上两种阻塞场景的队列被称为阻塞队列。
BlockingQueue核心方法
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里。
offer(E o,long timeout,TimeUnit unit):可以设定等待的时间。
put(anObject):将anObject加到BlockingQueue里。
poll(time):取走 BlockingQueue 里排在首位的对象。
poll(long timeout,TimeUnit unit):从BlockingQueue中取出一个队首的对象。
take():取走BlockingQueue里排在首位的对象。
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)。
在Java中提供了7个阻塞队列,它们分别如下所示。
名称 | 描述 |
---|---|
ArrayBlockingQueue | 由数组结构组成的有界阻塞队列。 |
LinkedBlockingQueue | 由链表结构组成的有界阻塞队列。 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列。 |
DelayQueue | 使用优先级队列实现的无界阻塞队列。 |
SynchronousQueue | 不存储元素的阻塞队列。 |
LinkedTransferQueue | 由链表结构组成的无界阻塞队列。 |
LinkedBlockingDeque | 由链表结构组成的双向阻塞队列。 |
下面分别介绍这些阻塞队列。
1、ArrayBlockingQueue
它是用数组实现的有界阻塞队列,并按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平地访问队列。
公平访问队列就是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列。即先阻塞的生产者线程,可以先往队列里插入元素;
先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列,如下所示:
ArrayBlockingQueue fairQueue=new ArrayBlockingQueue(2000,true);
2、LinkedBlockingQueue
它是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序,其内部也维持着一个数据缓冲队列。
如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
如果生产者速度大于消费者速度,还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽。
3、PriorityBlockingQueue
它是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。
这里可以自定义实现compareTo()方法来指定元素进行排序规则;或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。
但其不能保证同优先级元素的顺序。
4、DelayQueue
它是一个支持延时获取元素的无界阻塞队列。
队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口。创建元素时,可以指定元素到期的时间,只有在元素到期时才能从队列中取走。
5、SynchronousQueue
它是一个不存储元素的阻塞队列。
每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其实没有任何一个元素,或者说容量是0,
严格来说它并不是一种容器。由于队列没有容量,因此不能调用peek操作。
6、LinkedTransferQueue
它是一个由链表结构组成的无界阻塞TransferQueue队列。
LinkedTransferQueue实现了一个重要的接口TransferQueue。该接口含有5个方法,其中有3个重要的方法,它们分别如下所示:
transfer(E e):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;如果没有消费者在等待接收数据,就会将元素插入到队列尾部,并且等待进入阻塞状态,直到有消费者线程 取走该元素。
tryTransfer(E e):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;若不存在,则返回 false,并且不进入队列,这是一个不阻塞的操作。
tryTransfer(E e,long timeout,TimeUnit unit):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;若不存在则将元素插入到队列尾部,并且等待消费者线程取走该元素。
7、LinkedBlockingDeque
它是一个由链表结构组成的双向阻塞队列。
双向队列可以从队列的两端插入和移出元素,因此在多线程同时入队时,也就减少了一半的竞争。由于是双向的,因此LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast等方法。
以ArrayBlockingQueue为例,我们先来看看代码,如下所示:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; ...... }
ArrayBlockingQueue 是维护一个 Object 类型的数组,takeIndex 和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数,lock则是一个可重入锁,notEmpty和notFull是等待条件。接下来我们看看关键方法put,代码如下所示:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
从 put 方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待。
当此线程被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。接下来看看enqueue(e)方法,如下所示:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
插入成功后,通过notEmpty唤醒正在等待取元素的线程。再来看看take方法。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
跟put方法实现类似,put方法等待的是notFull信号,而take方法等待的是notEmpty信号。
在take方法中,如果可以取元素,则通过dequeue方法取得元素。下面是dequeue方法的实现。
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
跟enqueue方法类似,在获取元素后,通过notFull的signal方法来唤醒正在等待插入元素的线程。
除了线程池的实现使用阻塞队列外,我们还可以在生产者-消费者模式中使用阻塞队列:首先使用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者模式,代码如下所示:
public class Main { private static int queueSize = 10; private static PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize); public static void main(String[] args) { Producer producer = new Producer(); Consumer consumer = new Consumer(); producer.start(); consumer.start(); } static class Consumer extends Thread { @Override public void run() { while (true){ synchronized (queue){ while (queue.size() == 0){ try { System.out.println("队列空,等待数据"); queue.wait(); }catch (InterruptedException e){ e.printStackTrace(); queue.notify(); } } // 每次移走队首元素 queue.poll(); queue.notify(); } } } } static class Producer extends Thread { @Override public void run() { while (true){ synchronized (queue){ while (queue.size() == queueSize){ try { System.out.println("队列满,等待有空余空间"); queue.wait(); }catch (InterruptedException e){ e.printStackTrace(); queue.notify(); } } // 每次插入一个元素 queue.offer(1); queue.notify(); } } } } }
下面是使用阻塞队列实现的生产者-消费者模式:
public class Main { private static int queueSize = 10; private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize); public static void main(String[] args) { Producer producer = new Producer(); Consumer consumer = new Consumer(); producer.start(); consumer.start(); } static class Consumer extends Thread { @Override public void run() { while(true){ try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Producer extends Thread { @Override public void run() { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }
很显然,使用阻塞队列实现无须单独考虑同步和线程间通信的问题,其实现起来很简单。