前几天在对比Synchronized和ReentrantLock的关系和区别时,以及学习使用Semaphore、CountDownLatch和CyclicBarrier时,发现里面底层都有这样一个同步器。这让我觉得学习它们的底层原理,就不得不学习AQS自身的底层原理,那么,我们就来吧。
这里参考了队列同步器(AQS)详解 和 这才是图文并茂:我写了1万多字,就是为了让你了解AQS是怎么运行的
目录
成员变量
Node
独占式同步组件的设计
同步器提供的模板方法
供子类重写的方法
具体方法实现
acquire方法
Release
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; /** 创建一个新的AbstractQueuedSynchronizer实例,初始同步状态为零。 */ protected AbstractQueuedSynchronizer() { } static final class Node{...} //数据结构中的节点,下面细说。 /** 等待队列的头部,延迟初始化。 除初始化外,仅通过 setHead 方法进行修改。 注意:如果 head 存在,则保证其 waitStatus 不会被 CANCELLED。 */ private transient volatile Node head; /** 等待队列的尾部,延迟初始化。 仅通过方法 enq 修改以添加新的等待节点 */ private transient volatile Node tail; /** 同步状态。 */ private volatile int state; }
可以看出同步器本身其实就是一个双向链表,我们直接能获取的节点只有head和tail。主要是我们要考擦这个节点Node的含义。其中head节点为正在运行线程的的节点。
static final class Node { /** 指示节点在共享模式下等待的标记 */ static final Node SHARED = new Node(); /** 指示节点正在以独占模式等待的标记 */ static final Node EXCLUSIVE = null; /** 指示线程已取消的 waitStatus 值*/ static final int CANCELLED = 1; /** 指示后继节点需要被唤醒的waitStatus 值 */ static final int SIGNAL = -1; /** waitStatus 值指示线程正在等待条件(进入等待队列) */ static final int CONDITION = -2; /**指示下一个acquireShared 应无条件传播的waitStatus 值*/ static final int PROPAGATE = -3; volatile int waitStatus; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 该节点装载的线程 在构造时初始化并在使用后归零。 */ volatile Thread thread; /** 链接到下一个等待条件的节点,或特殊值 SHARED。 因为条件队列只有在独占模式下才会被访问,所以我们只需要一个简单的链接队列来保存节点,因为它们正在等待条件。 然后将它们转移到队列以重新获取。 并且因为条件只能是独占的,所以我们通过使用特殊值来表示共享模式来保存字段。 */ Node nextWaiter; /** 如果节点在共享模式下等待,则返回 true。 */ final boolean isShared() { return nextWaiter == SHARED; } /** 上一个节点*/ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
代码李定义了一个表示当前Node节点等待状态的字段waitStatus,取值总共有CANCELLED(-1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,这5个值代表了不同的特定场景。
当waitStatus为负值表示结点处于有效等待状态,为正值的时候表示结点已被取消。
//独占式获取同步状态,如果当前线程获取同步状态成功,立即返回。否则,将会进入同步队列等待, //该方法将会重复调用重写的tryAcquire(int arg)方法 public final void acquire(int arg) {...} //与acquire(int arg)基本相同,但是该方法响应中断。 public final void acquireInterruptibly(int arg){...} //独占式释放同步状态,该方法会在释放同步状态后,将同步队列中第一个节点包含的线程唤醒 public final boolean release(int arg) {...}
//独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 protected boolean tryAcquire(int arg) //独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 protected boolean tryRelease(int arg) //当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 protected boolean isHeldExclusively()
/** 以独占模式获取,忽略中断。 通过至少调用一次tryAcquire ,成功返回。 否则线程会排队,可能会反复阻塞和解除阻塞,调用tryAcquire直到成功。 此方法可用于实现方法Lock.lock 。 */ public final void acquire(int arg) { if (!tryAcquire(arg) && //如果tryacquire失败 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//生成节点并加入同步队列 selfInterrupt(); } /** 为当前线程和给定mode创建节点并排入队列 */ private Node addWaiter(Node mode) { //共享/独占性,null为独占 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { // node.prev = pred;//连接到尾部之后 if (compareAndSetTail(pred, node)) { //CAS设置,期望当前尾部是pred,是的话改为node。 //如果CAS尝试成功,就说明"设置当前节点node的前驱"与"CAS设置tail"之间没有别的线程设置tail 成功 //只需要将"之前的tail"的后继节点指向node即可 pred.next = node; return node; } } enq(node);//否则,通过死循环来保证节点的正确添加 return node; } /** 每个节点以死循环方式来判断自身是否头结点。 */ final boolean acquireQueued(final Node node, int arg) { //自旋 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//前驱节点是头节点并且获取到了同步状态 setHead(node); //设置其为首节点 p.next = null; // help GC 断开引用 failed = false; return interrupted; //自旋退出 } if (shouldParkAfterFailedAcquire(p, node) && //获取同步状态失败后判断是否需要阻塞或中断 parkAndCheckInterrupt()) //阻塞当前线程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上述逻辑可由下图表示:
当前线程获取同步状态失败时,同步器会将当前线程、等待状态等信息构造成一个Node并加入同步器队列中,同时会阻塞当前线程。
同时可以看出每个同步器队列中的节点都在进行自旋,自省地观察自己的前驱节点是否为head节点,如果是就尝试tryAcquire,成功即可退出自旋。
可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO。并且也便于对过早通知的处理(过早通知是指:前驱节点不是头节点的线程由于中断而被唤醒)。
head节点的线程在释放同步状态时,将会唤醒后续节点,后续节点将在获取同步状态成功时将自己设置为head节点。
public final boolean release(int arg) { if (tryRelease(arg)) { //释放同步状态成功的话 Node h = head; if (h != null && h.waitStatus != 0) //独占即为SIGNAL -1 要唤醒后面的 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; //当前节点waitStatus if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//更新为0 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) //从尾部往前找 if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); //唤醒 }