根据类名,可以知道这个数据结构是队列,因此数据的进出顺序是FIFO;阻塞的含义为,当需要生产/消费时,队列没有空间/数据,则对应的操作会阻塞住,直到其他操作解除这个阻塞状态
ArrayBlockingQueue作为阻塞队列,特点是内部使用Object数组(使用成了环形),并且创建时需要指定大小,有两个指针分别对应生产和消费操作
里面的迭代器弄个专题一起看吧
成员变量:
/** 队列里元素数量 */ int count; /** 存储结构 */ final Object[] items; /** 为下一次执行 take, poll, peek or remove 操作提供的index */ int takeIndex; /** 为下一次执行 put, offer, or add 操作提供的index */ int putIndex; /** 进行并发控制,以及线程间通信的锁(通信主要靠condition实现) */ final ReentrantLock lock; /** 当队列为空时获取等待 */ private final Condition notEmpty; /** 当队列满时插入等待 */ private final Condition notFull;
主要方法:
// 在不超过队列容量的情况下立即插入指定的元素,成功后返回true,如果队列已满则抛出IllegalStateException。 public boolean add(E e) // 在不超过队列容量的情况下立即在队列末尾插入指定的元素,如果成功则返回true,如果队列已满则返回false。此方法通常比add(E)方法更好,后者插入元素失败只能抛出异常。 public boolean offer(E e) // 将指定的元素插入到此队列的末尾,如果队列已满则等待直到有可用的空间。 public void put(E e) throws InterruptedException // 将指定的元素插入到此队列的末尾,如果队列已满,则在指定的超时时间之内等待空间可用,超时返回false。 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException // 检索并删除此队列的头,如果此队列为空,则返回null。 public E poll() // 检索并删除此队列的头,如有必要则等待,直到某个元素可用为止。 public E take() throws InterruptedException // 检索并删除此队列的头,如果有必要则在指定的等待时间之内等待元素可用,超时返回null。 public E poll(long timeout, TimeUnit unit) throws InterruptedException // 检索但不删除此队列的头,或在此队列为空时返回null。 public E peek() // 返回此队列中的元素数量。 public int size() // 返回此队列在理想情况下(在没有内存或资源约束的情况下)可以不阻塞地接受的新元素的数量。它总是等于这个队列的初始容量减去这个队列的当前大小。 public int remainingCapacity() // 如果指定元素存在,则从此队列中移除该元素的单个实例。更正式地说,如果队列中包含一个或多个这样的元素,则只删除匹配到的第一个元素 public boolean remove(Object o) // 如果此队列包含至少一个指定的元素,则返回true。 public boolean contains(Object o) // 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列。返回的数组将是“安全的”,因为此队列不维护对它的引用。 public Object[] toArray() // 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列;返回数组的运行时类型是指定数组的运行时类型。 public <T> T[] toArray(T[] a) // 返回此集合的字符串表示形式。 public String toString() // 删除此队列中的所有元素。此调用返回后,队列将为空。 public void clear() // 从此队列中删除所有可用元素并将它们添加到给定集合中。此操作可能比重复轮询此队列更有效。在试图将元素添加到集合c时遇到失败抛出相关异常时可能会导致:元素不在原集合或者集合c中,或者两个集合中都没有。 public int drainTo(Collection<? super E> c) // 从该队列中最多删除给定数量的可用元素,并将它们添加到给定集合中。异常情况同上 public int drainTo(Collection<? super E> c, int maxElements) // 按适当的顺序返回此队列中元素的迭代器。元素将按从第一个(head)到最后一个(tail)的顺序返回。返回的迭代器是弱一致的。 public Iterator<E> iterator() // 返回该队列中元素的Spliterator。返回的spliterator是弱一致的。 public Spliterator<E> spliterator()
方法中能和阻塞联系起来的,是put和take,同时offer和poll,也提供了对应的超时控制,重点关注这四个方法
/** * Throws NullPointerException if argument is null. * * @param v the element */ private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //获取到Object[]的引用 final Object[] items = this.items; //将putIndex位置上位置为x items[putIndex] = x; //把数组当成了一个环形的去使用 if (++putIndex == items.length) putIndex = 0; count++; //通知说现在队列有数据了 notEmpty.signal(); }
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { //对e进行非空判断 checkNotNull(e); final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try { //核心是这个while循环,阻塞在notFull上,等待notFull.signal //关于conditon,还需要再搞搞AQS的源码,不然不懂为啥这里是while,不能用if while (count == items.length) notFull.await(); //将e入队 //这里为什么不需要对迭代器进行操作呢? enqueue(e); } finally { //释放锁 lock.unlock(); } }
/** * Inserts the specified element at the tail of this queue, waiting * up to the specified wait time for space to become available if * the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; //这个方法也是在AQS里面实现的,后面再专门弄下吧 //我猜测是等待nanos时间,到点了就返回一个《=0的值 nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") //保存takeIndex对应的数据 E x = (E) items[takeIndex]; items[takeIndex] = null; //当作一个环形数组使用 if (++takeIndex == items.length) takeIndex = 0; count--; //这里对我来说也是很不熟练的地方,关于迭代器,后面也得专门弄一弄,比如ArrayList,Map之类的 if (itrs != null) itrs.elementDequeued(); //发送notFull的信号 notFull.signal(); //返回保存的中间结果 return x; }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //当前队列没有数据,则等待直到notEmpty发来信号 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
和前面offer一样,都只是加上了一个等待时间的机制
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
/** * Deletes item at array index removeIndex. * Utility for remove(Object) and iterator.remove. * Call only when holding lock. */ void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; //如果要删除的下标刚好是takeIndex,当作一次普通的出队即可 if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; //对迭代器进行操作 if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; //把removeIndex后面的数据,都向前挪动一位 for (int i = removeIndex;;) { int next = i + 1; //根据环形队列,来获取i的next下标 if (next == items.length) next = 0; //如果next不是putIndex,说明不为null,向前挪动 if (next != putIndex) { items[i] = items[next]; i = next; } else { //此时 i == putIndex,将i处的数据置为null(上一步已经向前挪动了) //将putIndex更新为i items[i] = null; this.putIndex = i; break; } } count--; //更新迭代器 if (itrs != null) itrs.removedAt(removeIndex); } //发送notFull的信号 notFull.signal(); }
特定删除,服用了removeAt()方法
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { //先找到o对应的下标i if (o.equals(items[i])) { //针对具体的下标,进行删除 removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex) //如果i遍历到了putIndex,说明队列中没有o; } return false; } finally { lock.unlock(); } }