要讲AbstractQueuedSynchronizer,我觉得最好还是实际使用一个重入锁,来看内部实现,这里使用ReentrantLock
ReentrantLock NonFairLock = new ReentrantLock(); ReentrantLock fairLock = new ReentrantLock(true);
可以看到ReentrantLock构造函数有两种实现,一个是默认的非公平锁,另一个是非公平锁
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
看一下ReentrantLock内部构造
//首先实际lock,调用的是一个Sync类,Sync继承了AQS abstract static class Sync extends AbstractQueuedSynchronizer {} //具体的非公平锁,继承了Sync类 static final class NonfairSync extends Sync {} static final class FairSync extends Sync {}
以下是nonFairLock.lock()方法的大致流程
ReentrantLock.lock()
public void lock() { sync.lock(); }
这里使用了ReentrantLock的类常量,private final Sync sync,sync调用它子类NonfairSync.lock()方法
NonfairSync.lock()
final void lock() { //初次获取锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //判断是重入锁还是去竞争别人的锁 acquire(1); }
重点关注下,当使用CAS更改state变量失败后(也即是没有初次获取到锁,如果是重入锁,也会失败),AQS.acuire(1)
AQS.acquire(int arg)
public final void acquire(int arg) { if (!tryAcquire(arg) && //addWaiter(独占锁模式) acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这个方法首先会去调用子类实现的tryAcquire(int arg)方法,在非公平锁中会调用Sync的nonfairTryAcquire(int acquires)方法
ReentrantLock.Sync.nonfairTryAcquire(int acquires)
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取当前的state int c = getState(); //如果当前state == 0,那么使用CAS修改state,即获取锁 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //当前线程已经获取了锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //获取锁失败,返回false,调用后续的函数 return false; }
获取锁成功,!tryacquire(arg)相当于是false,直接返回
如果获取锁失败,!tryacquire(arg)相当于是true,需要执行if判断中的acquireQueued(final Node node, int arg)函数
这里看一看获取锁失败后的流程
首先将当前线程封装成Node节点,进入队列(实际是链表)
AQS.addWaiter(Node mode)
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //直接获取连边末尾的节点,尝试进行快速插入 //这里的条件是链表中已经有了等待节点,也即是链表初始化了 //当采用一次CAS设置末尾节点的时候,还是会调用enq方法,进行for死循环插入队列 Node pred = tail; if (pred != null) { node.prev = pred; //看,这里又用到了CAS if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果CAS设置失败 //或者链表还没有初始化,则进行初始化 enq(node); return node; }
看一下enq(final Node node) 方法
AQS.enq(final Node node)
private Node enq(final Node node) { //这里采用了for(;;)循环的方式 for (;;) { Node t = tail; //链表没有初始化,此时会生成一个dummy虚头节点 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //链表已经初始化了 //将新的node节点放入末尾 node.prev = t; //再次采用CAS设置,如果失败,则会反复for循环 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
将等待线程封装成节点后,调用acquireQueued(final Node node, int arg)方法
AQS.acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //再次for(;;)循环 for (;;) { //获取到当前节点的前继 final Node p = node.predecessor(); //当前继为头结点的时候,可以尝试使用tryAcuqire(arg)方法获取锁 //在非公平锁中会调用Sync的nonfairTryAcquire(int acquires)方法 if (p == head && tryAcquire(arg)) { //获取锁成功,将当前节点设置为头结点 setHead(node); /** AQS.setHead(node)方法 private void setHead(Node node) { head = node; //将头结点中的线程属性设位null node.thread = null; node.prev = null; } */ p.next = null; // help GC failed = false; return interrupted; } //当该节点的前继不是head节点,或者获取锁失败 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果挂起线程,检测到被中断,那么将中断标识设置为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在for循环中,如果节点的前继不是head,或者前继是head,但是获取锁失败(前继没有完全释放,state != 0),会在if()判断中调用shouldParkAfterFailedAcquire(p, node) 和 parkAndCheckInterrupt()方法
注意,如果should()方法返回是false,不会执行park()方法
AQS.shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ //前继的等待状态已经设置为SIGNAL,该节点可以调用LockSupport.park()方法挂起 return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //前继节点已经取消了,此时需要找新的前继节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //前继节点为0或者PROPAGARE,采用CAS的方式更改为SIGNAL,并返回false compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
如果should()返回true,说明前继的等待状态已经是SIGNAL了,则执行parkAndCheckInterrupt()挂起当前线程
AQS.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //判断当前线程是否被中断 return Thread.interrupted(); }
因为解锁的线程首先要求获取到了锁,所以不存在锁竞争,调用链相对简单
ReentrantLock.unlock()
public void unlock() { sync.release(1); }
调用了sync.release(int arg)方法,也即是父类AQS的release(int arg)方法
AQS.release(int arg)
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
首先会执行tryRelease(1)方法
ReentrantLock.Sync.tryRelease(int releases)
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) //如果当前线程没有获得锁,抛出异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //如果state被释放为0,则标记free,锁完全释放 free = true; //设置当前锁没有被任何线程获取 setExclusiveOwnerThread(null); } //更新state setState(c); //返回free标记,用于AQS.release()流程判断 return free; }
如果Sync.tryRelease()方法完全释放了锁,则AQS.release()方法中会执行unparkSuccessor(head)方法,去唤醒后继节点
AQS.unparkSuccessor(Node node)
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //如果node的后继节点为空或者被取消,则从后往前找下一个可以唤醒的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒后继节点(线程) LockSupport.unpark(s.thread); }
LockSupport.unpark()
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
至此,ReentrantLock的非公平加锁和解锁,流程看完了