记得以前有个最经典的面试题:如何用多个线程顺序的从1输出到100?
上章说了Java中锁的使用以及原理分析,上述面试题应该手到擒来了吧
本章主要说下Java中线程通信实现生产消费队列
以及Condition源码
利用共享锁的互斥实现两个线程通信,从而实现生产消费队列
//定义一个队列 static Queue<Integer> list = new LinkedList<>(); //定义队列的大小 static int size = 10; //生产这代码 放线程执行 public static void producer() throws InterruptedException { int i = 0; while (true) { i++; //给队列加锁 synchronized (list) { if (list.size() == size) { System.out.println("队列满了"); //队列满了就等待,消费者消费后又会唤醒生产者 list.wait(); } Thread.sleep(1000); list.add(i); System.out.println("生产者添加:" + i); //唤醒线程后会从wait后面接着执行 意味着又是去抢占到锁才能继续执行 list.notify(); } } } //消费者代码 public static void comsume() throws InterruptedException { while (true) { synchronized (list) { if (list.size() == 0) { System.out.println("队列空了"); //队列空了就等待,生产者生产了就会唤醒消费者 list.wait(); } Thread.sleep(1000); Integer remove = list.remove(); System.out.println("消费者消费:" + remove); list.notify(); } } }
signal/await
等价于synchronized的notify/wait
务必先了解上一篇文章中的内容
)Lock lock = new ReentrantLock(); //与synchronized不同的是Condition里面可以用多个队列 放不同的线程 Condition addCondition = lock.newCondition(); Condition removeCondition = lock.newCondition(); int count = 10; List<String> list = new ArrayList<>(count); public void producer() { int i = 0; while (true) { lock.lock(); i++; try { if (list.size() == count) { System.out.println("队列满了"); //阻塞生产者 释放锁 消费者消费时会唤醒生产者 addCondition.await(); } Thread.sleep(1000); list.add("abc"+i); System.out.println("生产者添加:" + i); //唤醒消费者 removeCondition.signal(); } catch (Exception e) { } finally { lock.unlock(); } } } public void comsume() { while (true) { lock.lock(); try { if (list.size() == 0) { System.out.println("队列空了"); //阻塞消费者 释放锁 生产者添加时会唤醒 removeCondition.await(); } Thread.sleep(1000); System.out.println("消费者消费:"+list.get(0)); list.remove(0); addCondition.signal(); } catch (Exception e) { } finally { lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ConditionDemo prod = new ConditionDemo(); new Thread(() -> { prod.producer(); }).start(); Thread.sleep(100); new Thread(() -> { prod.comsume(); }).start(); }
代码一点要自己手写跑一跑,不然眨眼忘
Condition队列有自己管理的一个单向链表的等待队列
interrupt
的操作(可能是中断标识唤醒,不是Signal唤醒
)//AbstractQueuedSynchronizer#ConditionObject#await public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //添加到Condition等待队列 与构建AQS队列差不多 但这个是单向队列 状态为CONDITION = -2; Node node = addConditionWaiter(); //完全的释放锁 可能会重入 int savedState = fullyRelease(node); int interruptMode = 0; //判断是否已经在AQS队列 然后根据状态为CONDITION会返回false阻塞当前线程 //signal唤醒会同步到AQS队列 然后isOnSyncQueue 会返回true跳出循环 while (!isOnSyncQueue(node)) { LockSupport.park(this); //等待被唤醒后,从这里开始要先处理是不是由interrupt中断唤醒 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //接下来就是AQS里的内容了 这里是拿到锁以后的内容 是线程安全的 //被唤醒后重新去竞争锁 savedState/被释放的锁重入次数 //如果没抢到就会由继续parkAndCheckInterrupt阻塞,然后等待AQS队列中的线程唤醒 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //清空无效线程 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
这个很好理解,就是在原有的AQS队列前面加了一个单向链表的等待队列。
//AbstractQueuedSynchronizer#ConditionObject#signal public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //当下第一个等待节点 Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { //取出下一个等待的节点 判断条件是transferForSignal if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //过滤无效状态的节点 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //这个方法就很熟悉了 加到AQS队列 Node p = enq(node); int ws = p.waitStatus; //如果已经是待唤醒状态 直接唤醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //唤醒Condition队列的线程 又回到LockSupport.park(this);后面继续执行 LockSupport.unpark(node.thread); return true; }
这个源码仔细看起来还是很简单的,看不懂的多看几遍慢慢的就懂了。
以上就是本章的全部内容了。
上一篇:J.U.C ReentrantLock可重入锁使用以及源码分析
下一篇:J.U.C中的阻塞队列使用及源码分析–ArrayBlockingQueue
志士惜日短,愁人知夜长