结合CountDownLatch和CyclicBarrier了解一下AQS的共享锁部分
先看下CountDownLatch是怎么使用的
public class CountDownLatchTest { public void CountDownLatchTest() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(5); ExecutorService e = Executors.newFixedThreadPool(8); // 创建 N 个任务,提交给线程池来执行 for (int i = 1; i <= 5; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); // 等待所有的任务完成,这个方法才会返回,还没完成就阻塞在这里 doneSignal.await(); // wait for all to finish System.out.println("所有线程的任务都做完了"); e.shutdown(); System.out.println("线程池已关闭"); } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { doWork(i); // 这个线程的任务完成了,调用 countDown 方法 doneSignal.countDown(); } void doWork(int i) { System.out.println("线程" + i + "的任务做完了"); } } public static void main(String[] args) throws InterruptedException { new CountDownLatchTest().CountDownLatchTest(); } }
输出结果
线程1的任务做完了 线程2的任务做完了 线程3的任务做完了 所有线程的任务都做完了 线程4的任务做完了 线程5的任务做完了 线程池已关闭
从输出结果可以看过,5个线程只有完成了任务await()方法才会返回然后执行后面的,还没完成就会被阻塞,直到5个线程完全任务才不阻塞,实际应用就是把一个大的任务分成几个小任务然后让不同的线程并发执行,例如把一段数据分成多段,每个数据处理一部分,直到整段数据处理后才返回
说完CountDownLatch的使用就可以开始了CountDownLatch的原理了,看下CountDownLatch是怎样的使用AQS的共享锁的
构造方法的作用是初始化state的值,相当于初始化了多少把锁,不允许传入负数
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
假设现在线程1,线程2调用了await()方法
首先调用的是CountDownLatch的await()方法
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
然后调用AQS的acquireSharedInterruptibly()方法
//因为这个方法是会处理中断的,所以先判断中断状态 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //state不为0条件成立然后调用doAcquireSharedInterruptibly(arg) if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
然后调用CountDownLatch的静态内部类Sync的 tryAcquireShared()方法
protected int tryAcquireShared(int acquires) { //state为0时才返回1 return (getState() == 0) ? 1 : -1; }
tryAcquireShared返回-1,所以调用AQS的doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //把线程1入队 由于线程1入队之前队列是空的,所以线程1会创建一个head节点,然后再把线程1入队 //因为是共享锁没有线程单独持有锁所以exclusiveOwnerThread这个变量用不上 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //获取前驱节点 final Node p = node.predecessor(); //前驱结点是头结点 if (p == head) { //就可以尝试去获取锁 int r = tryAcquireShared(arg); //大于等于0表示获取成功 if (r >= 0) { //把当前节点设为新的头结点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //获取锁失败就挂起,把前驱节点的ws设为了-1 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //被中断就抛异常,不会继续尝试抢锁 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
线程1、线程2调用await()方法获取锁失败的话会先被挂起等待被唤醒
此时的队列是
每调用一次countDown()方法,state就会减1,相当于解了一把锁,直到state减为0,即表示锁解完了
线程3和线程4分别调用CountDownLatch的countDown()方法
public void countDown() { sync.releaseShared(1); }
然后调用AQS的releaseShared()方法
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
然后调用CountDownLatch静态内部类的tryReleaseShared()方法
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { //获取当前状态值 int c = getState(); //为0表示不用解锁 if (c == 0) return false; int nextc = c-1; //cas把state减1 if (compareAndSetState(c, nextc)) //减完之后state为0就返回true return nextc == 0; } }
如果tryReleaseShared()方法返回true表示锁已经全部解完了,可以调用AQS的doReleaseShared()方法唤醒所有阻塞的线程了
//只有当state的值为0或被中断了才会调用这个方法 private void doReleaseShared() { for (;;) { Node h = head;//头结点 //头结点不为空,而且队列不止一个节点 if (h != null && h != tail) { int ws = h.waitStatus; //头结点ws //如果头结点ws的值是SIGNAL-1表示头结点的后继节点要被唤醒 if (ws == Node.SIGNAL) { //把头结点的ws通过cas设为0,要用cas是因为可能有多个线程修改ws if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒头结点的后继节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //直到head后面的节点都被唤醒了,队列只剩head节点就不会继续唤醒 if (h == head) // loop if head changed break; } }
此时头结点的后继节点被唤醒了,即线程1被唤醒了
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); //被唤醒后r为1即大于等于所以会调用setHeadAndPropagate(node, r) if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //被唤醒后会先检查中断,然后再继续循环 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //把线程1设为新的头结点 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //会继续唤醒新的头结点的后继节点,即线程2,直到所有的线程被唤醒 if (s == null || s.isShared()) doReleaseShared(); } }
1、CountDownLatch使用了AQS的共享锁,共享锁的意思是可以多个线程去进行加锁和解锁,而独占锁只能由一个线程进行加锁和解锁
2、一个线程调用了await()方法,如果state不为0就会进入同步队列等待唤醒获取锁
3、其他线程调用countDown()方法后,如果state为0就会把同步队列里head后面的节点都唤醒
4、同一个线程不能同时调用await()方法和countDown()方法,因为不能自己唤醒自己