Java教程

06、Java进阶--阻塞队列

本文主要是介绍06、Java进阶--阻塞队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

阻塞队列

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列有两个常见的阻塞场景,它们分别是:

(1)当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

(2)当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

支持以上两种阻塞场景的队列被称为阻塞队列。

BlockingQueue核心方法

offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里。

offer(E o,long timeout,TimeUnit unit):可以设定等待的时间。

put(anObject):将anObject加到BlockingQueue里。

poll(time):取走 BlockingQueue 里排在首位的对象。

poll(long timeout,TimeUnit unit):从BlockingQueue中取出一个队首的对象。

take():取走BlockingQueue里排在首位的对象。

drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)。

Java中的阻塞队列

在Java中提供了7个阻塞队列,它们分别如下所示。

名称 描述
ArrayBlockingQueue 由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 由链表结构组成的有界阻塞队列。
PriorityBlockingQueue 支持优先级排序的无界阻塞队列。
DelayQueue 使用优先级队列实现的无界阻塞队列。
SynchronousQueue 不存储元素的阻塞队列。
LinkedTransferQueue 由链表结构组成的无界阻塞队列。
LinkedBlockingDeque 由链表结构组成的双向阻塞队列。

下面分别介绍这些阻塞队列。

1、ArrayBlockingQueue

它是用数组实现的有界阻塞队列,并按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平地访问队列。

公平访问队列就是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列。即先阻塞的生产者线程,可以先往队列里插入元素;

先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列,如下所示:

ArrayBlockingQueue fairQueue=new ArrayBlockingQueue(2000,true);

2、LinkedBlockingQueue

它是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序,其内部也维持着一个数据缓冲队列。

如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE)。

如果生产者速度大于消费者速度,还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽。

3、PriorityBlockingQueue

它是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。

这里可以自定义实现compareTo()方法来指定元素进行排序规则;或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。

但其不能保证同优先级元素的顺序。

4、DelayQueue

它是一个支持延时获取元素的无界阻塞队列。

队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口。创建元素时,可以指定元素到期的时间,只有在元素到期时才能从队列中取走。

5、SynchronousQueue

它是一个不存储元素的阻塞队列。

每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其实没有任何一个元素,或者说容量是0,

严格来说它并不是一种容器。由于队列没有容量,因此不能调用peek操作。

6、LinkedTransferQueue

它是一个由链表结构组成的无界阻塞TransferQueue队列。

LinkedTransferQueue实现了一个重要的接口TransferQueue。该接口含有5个方法,其中有3个重要的方法,它们分别如下所示:

transfer(E e):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;如果没有消费者在等待接收数据,就会将元素插入到队列尾部,并且等待进入阻塞状态,直到有消费者线程 取走该元素。

tryTransfer(E e):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;若不存在,则返回 false,并且不进入队列,这是一个不阻塞的操作。

tryTransfer(E e,long timeout,TimeUnit unit):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;若不存在则将元素插入到队列尾部,并且等待消费者线程取走该元素。

7、LinkedBlockingDeque

它是一个由链表结构组成的双向阻塞队列。

双向队列可以从队列的两端插入和移出元素,因此在多线程同时入队时,也就减少了一半的竞争。由于是双向的,因此LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast等方法。

阻塞队列的原理

以ArrayBlockingQueue为例,我们先来看看代码,如下所示:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898426L;
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    ......
}

ArrayBlockingQueue 是维护一个 Object 类型的数组,takeIndex 和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数,lock则是一个可重入锁,notEmpty和notFull是等待条件。接下来我们看看关键方法put,代码如下所示:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

从 put 方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待。

当此线程被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。接下来看看enqueue(e)方法,如下所示:

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

插入成功后,通过notEmpty唤醒正在等待取元素的线程。再来看看take方法。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

跟put方法实现类似,put方法等待的是notFull信号,而take方法等待的是notEmpty信号。

在take方法中,如果可以取元素,则通过dequeue方法取得元素。下面是dequeue方法的实现。

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

跟enqueue方法类似,在获取元素后,通过notFull的signal方法来唤醒正在等待插入元素的线程。

阻塞队列的使用场景

除了线程池的实现使用阻塞队列外,我们还可以在生产者-消费者模式中使用阻塞队列:首先使用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者模式,代码如下所示:

public class Main {
    private static int queueSize = 10;
    private static PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    public static void main(String[] args) {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.start();
        consumer.start();
    }

    static class Consumer extends Thread {
        @Override
        public void run() {
            while (true){
                synchronized (queue){
                    while (queue.size() == 0){
                        try {
                            System.out.println("队列空,等待数据");
                            queue.wait();
                        }catch (InterruptedException e){
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    // 每次移走队首元素
                    queue.poll();
                    queue.notify();
                }
            }
        }
    }

    static class Producer extends Thread {
        @Override
        public void run() {
            while (true){
                synchronized (queue){
                    while (queue.size() == queueSize){
                        try {
                            System.out.println("队列满,等待有空余空间");
                            queue.wait();
                        }catch (InterruptedException e){
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    // 每次插入一个元素
                    queue.offer(1);
                    queue.notify();
                }
            }
        }
    }
}

下面是使用阻塞队列实现的生产者-消费者模式:

public class Main {
    private static int queueSize = 10;
    private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);

    public static void main(String[] args) {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.start();
        consumer.start();
    }

    static class Consumer extends Thread {
        @Override
        public void run() {
            while(true){
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Producer extends Thread {
        @Override
        public void run() {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

很显然,使用阻塞队列实现无须单独考虑同步和线程间通信的问题,其实现起来很简单。

这篇关于06、Java进阶--阻塞队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!