上一篇:Java并发之AQS原理解读(二)
本文从源码角度分析AQS
共享锁工作原理,并介绍下使用共享锁的子类如何工作的。
共享锁与独占锁的不同之处在于,获取锁和释放锁成功后,都会循环唤醒从表头开始的第一个阻塞结点,直到表头没有改变。
doReleaseShared
方法存在无效的调用,即存在无效的线程唤醒,但为了避免程序出现问题,无伤大雅。PROPAGATE
状态用于setHeadAndPropagate
方法判断是否唤醒阻塞结点。
1、先尝试 tryAcquireShared
,如果获取锁成功直接返回;
2、否则,addWaiter
先将线程封装成Node
入队,再判断当前节点的前驱是否是head
头结点,是的话尝试tryAcquireShared
获取锁,如果锁资源>=0
则将当前节点设置为head
并循环唤醒从表头开始的第一个等待结点,直到表头节点没有改变;不是头结点的话,将当前节点的前驱的waitStatus
设为SIGNAL
,并阻塞当前节点。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { // 结点入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 设置为头结点并唤醒后继结点 setHeadAndPropagate(node, r); p.next = null; // help GC // 当发生线程中断时,补偿设置中断标识 if (interrupted) selfInterrupt(); failed = false; return; } } // 设置前驱结点等待状态 waitStatus 为 SIGNAL,并阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); /** * 当锁资源大于 0 时,或者旧头结点 waitStatus 为 PROPAGATE 或者新节点 waitStatus 为 PROPAGATE * 并且新节点的下一节点是可共享的时,唤醒下一个结点 * * h == null 和 (h = head) == null 一定不成立,因为之前调用了 addWaiter */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; // 当队列存储等待结点时 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 将过渡状态 0 设为可传播 PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 如果头结点发生了改变,则继续唤醒下一个等待结点 if (h == head) break; } }
1、直接尝试释放锁,如果失败则返回;
2、如果释放锁成功,则循环唤醒从表头开始的第一个等待结点,直到表头节点没有改变。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; // 当队列存储等待结点时 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 将过渡状态 0 设为可传播 PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 如果头结点发生了改变,则继续唤醒下一个等待结点 if (h == head) break; } }
将state
设置为初始化倒数,每次countDown
都递减state
。如果state != 0
时,tryAcquireShared
都返回没有资源,线程执行到await
就写入队列阻塞;直到某个线程执行完state == 0
,tryAcquireShared
返回有资源后,就会唤醒等待队列中的第一个线程往下执行。
// 设置state public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); } // 递减 state public void countDown() { sync.releaseShared(1); } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } /** * 阻塞等待,直到 state == 0 * 因为当 state == 0 时,tryAcquireShared 才返回正数,表示有锁资源 * 从而 sync#acquireSharedInterruptibly 可以执行成功,不再阻塞 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
参考:
setHeadAndPropagate源码分析