一个阻塞队列,注意只要有Blocking,都是阻塞的,要阻塞,那容量必须是固定的,在构造方法中指定数量.
如果已经Full,那么put操作会一直等待有空位置出来。
如果已经Empty,那么take会一直等有新元素进来.
package com.abc.test; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 注意:以下测试用例只能单独执行. */ public class ArrayBlockingQueueTest { public static class MyElement { final int order; final String name; public MyElement(String name, int order) { this.name = name; this.order = order; } } ArrayBlockingQueue<MyElement> arrayBlockingQueue = new ArrayBlockingQueue<>(5); //注意线程池数目不能设置为1个. ExecutorService pools = Executors.newFixedThreadPool(10); /** * 先准备5个元素,供每个测试用例使用. */ @Before public void init() { arrayBlockingQueue.add(new MyElement("1", 1)); arrayBlockingQueue.add(new MyElement("3", 3)); arrayBlockingQueue.add(new MyElement("5", 5)); arrayBlockingQueue.add(new MyElement("10", 10)); arrayBlockingQueue.add(new MyElement("15", 15)); } /** * 只能放5个元素,继续add会报错. */ @Test(expected = IllegalStateException.class) public void testFullAndAdd() { arrayBlockingQueue.add(new MyElement("16", 16)); } /** * 只能放5个元素,offer不成功会返回false. */ @Test public void testFullAndOffer() { Assert.assertFalse(arrayBlockingQueue.offer(new MyElement("16", 16))); } /** * poll. */ @Test public void testPoll() throws InterruptedException { MyElement element = arrayBlockingQueue.poll(); Assert.assertEquals(element.order, 15); Assert.assertEquals(arrayBlockingQueue.size(), 4); MyElement element2 = arrayBlockingQueue.poll(); Assert.assertEquals(element2.order, 1); Assert.assertEquals(arrayBlockingQueue.size(), 3); } /** * put的时候如果已经full会等待有多余位置出来. */ @Test public void testFullAndPut() throws InterruptedException { pools.execute(new Runnable() { @Override public void run() { try { arrayBlockingQueue.put(new MyElement("16", 16)); System.out.println("after put"); } catch (InterruptedException e) { e.printStackTrace(); } } }); pools.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000 * 5); } catch (InterruptedException e) { e.printStackTrace(); } arrayBlockingQueue.poll(); System.out.println("after poll"); } }); Thread.sleep(2000); { MyElement[] array = new MyElement[5]; arrayBlockingQueue.toArray(array); MyElement last = array[array.length - 1]; Assert.assertEquals(last.order, 15); } pools.awaitTermination(8, TimeUnit.SECONDS); { MyElement[] array = new MyElement[5]; arrayBlockingQueue.toArray(array); MyElement last = array[array.length - 1]; Assert.assertEquals(last.order, 16); } MyElement element = arrayBlockingQueue.poll(); Assert.assertEquals(element.order, 3); Assert.assertEquals(arrayBlockingQueue.size(), 4); } /** * take如果为空会等待. */ @Test public void testTake() throws InterruptedException { arrayBlockingQueue.clear(); pools.execute(new Runnable() { @Override public void run() { try { MyElement element = arrayBlockingQueue.take(); Assert.assertEquals(element.order, 100); System.out.println("after put"); } catch (InterruptedException e) { e.printStackTrace(); } } }); pools.execute(new Runnable() { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } try { arrayBlockingQueue.put(new MyElement("100", 100)); } catch (InterruptedException e) { e.printStackTrace(); } } }); pools.awaitTermination(8, TimeUnit.SECONDS); } }