在【Java 并发编程】——AQS 源码探索之独占式一文中从源码详细介绍了 AQS 独占式的实现方式。本文将介绍 AQS 的共享式,顾名思义,共享式就是允许多个线程同时访问同一个资源。
在独占式中,AQS 中的状态用来表示可获取或者已独占(比如 0 表示可获取,1 表示已被占用)。共享式中,状态已不再是具体数值,而是一个范围:大于等于 0 表示可获取,小于 0 表示已被占满。
下面是一个自定义共享式同步工具类 TwinsLock,同一时刻最多允许两个线程访问:
public class TwinsLock implements Lock { private Sync sync = new Sync(2); private static final class Sync extends AbstractQueuedSynchronizer { private Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must larger than 0"); } // 初始化 state 的值,表示可同时访问的线程数量 setState(count); } @Override protected int tryAcquireShared(int reduceCount) { for(;;) { // 获取当前 int currentCount = getState(); // 计算剩余可用数量 int newCount = currentCount - reduceCount; if (newCount < 0 || compareAndSetState(currentCount, newCount)) { return newCount; } } } @Override protected boolean tryReleaseShared(int returnCount) { for(;;) { int currentCount = getState(); int newCount = currentCount + returnCount; if (compareAndSetState(currentCount, newCount)) { return true; } } } } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } // 省略其他方法 }
按照惯例,定义静态内部类实现 AbstractQueuedSynchronizer,共享式需要重写 tryAcquireShared() 和 tryReleaseShared() 方法。TwinsLock 类的作用是同时允许两个线程通过,其他线程需要等待。获取和释放的具体逻辑可以看上面代码注释,使用方式如下
TwinsLock lock = new TwinsLock(); lock.lock(); try { // do sth... } finaly { lock.unlock(); }
使用方式上和 ReentrantLock 一毛一样有木有。
接下来分析获取的流程,lock.lock() 调用的是 sync.acquireShared() 方法
public final void acquireShared(int arg) { // tryAcquireShared 需要子类重写 if (tryAcquireShared(arg) < 0) // 获取失败后调用 doAcquireShared(arg); } /** * 共享模式获取,不响应中断 */ private void doAcquireShared(int arg) { // 入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 为头节点 if (p == head) { // 重新获取 int r = tryAcquireShared(arg); // 获取成功 if (r >= 0) { // 设置头节点 setHeadAndPropagate(node, r); p.next = null; // help GC // 如果检测到中断,就中断自己 if (interrupted) selfInterrupt(); failed = false; return; } } // 判断是否应该阻塞 if (shouldParkAfterFailedAcquire(p, node) && // 阻塞并检查中断 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
共享式获取失败后的操作 doAcquireShared() 和独占式中操作很相似,获取的前提是前驱节点必须是头节点,否则进行阻塞。获取成功后执行 setHeadAndPropagate() 方法,并检查中断,如果需要中断,那就中断当前线程,最后返回。看看 setHeadAndPropagate 方法
private void setHeadAndPropagate(Node node, int propagate) { // 把获取成功的节点设置为头头节点 Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { // 获取下一个节点 Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
重点瞅瞅 doReleaseShared()
/** * Release action for shared mode -- signal successor and ensure * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) * * 共享模式下的唤醒动作 -- 唤醒后继者并保证这种方式传播下去 */ private void doReleaseShared() { for (;;) { Node h = head; // 1. h = null,队列还没初始化 // 2. h == tail,队列刚初始化,就一个头节点 if (h != null && h != tail) { int ws = h.waitStatus; // 头节点的状态为 SIGNAL if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果头节点没有发生变化,退出循环 if (h == head) // loop if head changed break; } }
这个方法是将头节点的的状态先从 SIGNAL 改为 0,再从 0 改为 PROPAGATE,至于为什么不一步到位,可以看看 unparkSuccessor() 中的代码
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
这里将状态小于 0 的改成 0,如果 doReleaseShared() 方法直接将头节点的状态改为 PROPAGATE,那这里相当于做了一次无用功。如果改变状态失败,说明头节点被改变了,那么进行下一次循环,重新获取头节点。从if (h == head) {break;}
可以知道每次只会唤醒头节点的后继节点。
释放代码
public final boolean releaseShared(int arg) { // tryReleaseShared() 由子类实现 if (tryReleaseShared(arg)) { // 释放操作,分析如上 doReleaseShared(); return true; } return false; }
独占式中状态表示可获取已占用,比如 0 表示可以获取,获取成功后将状态改为 1,这种改变通过 CAS 实现,代码如下
if (compareAndSetState(0, 1)) { // 获取成功 }
释放的时候将状态改为 0 即可
setState(0);
而共享式中状态一般用来表示的可用许可数量,当许可大于或等于 0 表示允许获取,每次获取成功后减掉指定许可数量并改变状态,直到状态小于 0 表示不可获取
for(;;) { int currentCount = getState(); int newCount = currentCount - reduceCount; if (newCount < 0 || compareAndSetState(currentCount, newCount)) { return newCount; } }
可以看到,不管是独占式还是共享式,核心还是状态的改变。
独占式中,队列中阻塞线程需要前驱节点唤醒,而只有前驱节点在释放操作是才会去唤醒。
而共享式中,除了释放的时候唤醒,重新获取成功的时候也会去唤醒后继节点。
独占式中获取代码如下
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
代码中可以知道,如果线程获取同步状态失败,那么就将加入队列尾部。
而共享式则不同,
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
共享式中获取失败就直接返回,不会再加入队列。因为共享式一般用来允许指定数量的线程同时访问共享资源,当同步状态小于 0,则表示访问的线程数已达上限,后来的线程只能拒之门外。