ReentrantLock加锁释放锁,都是通过其一个对象属性Sync来实现的,Sync继承了AbstractQueuedSynchronizer,也就是大名鼎鼎的aqs了。而Sync本身有两个子类,一个是非公平实现,一个是公平实现,ReentrantLock中默认采用非公平锁。本文只探讨非公平实现逻辑
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
lock方法会调用sync的lock方法
final void lock() { //cas尝试修改state变量,从0变为1 //这里其实就能体现所谓的非公平了,上来就直接尝试获取锁,不会老老实实去同步队列尾部等着 if (compareAndSetState(0, 1)) //修改成功,设置当前持有锁的线程 setExclusiveOwnerThread(Thread.currentThread()); else //失败,这段代码的实现在父类AQS中 acquire(1); }
state是aqs中的一个属性,加锁时就是使其+1,释放锁就是-1
public final void acquire(int arg) { //tryAcquire方法会走子类实现,最终执行nonfairTryAcquire //如果tryAcquire返回了false,那么就会往下走 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取state int c = getState(); //等于0说明没有线程持有锁 if (c == 0) { //cas加锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //不等于0,锁被持有了,看看是否是自己持有的,是的话,state+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //获取锁失败,返回false return false; }
tryAcquire方法就是尝试去获取锁了,lock加锁失败,到这里可能锁又被释放了,或者是自己加的锁,现在是重入,所以tryAcquire就是重新尝试一下,如果还是失败了,那么就需要加入到同步队列尾部了,也就是接下来的addWaiter方法。
这里先说说这个同步队列。这个同步队列的节点是一个Node类,是AQS的一个静态内部类,也是AQS的核心。里面包装了当前类,以及waitStatus,如等待唤醒、取消等
static final class Node{ static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; //已取消状态,要等阻塞队列里某个线程被唤醒抢锁失败抛异常才标记为这个 static final int CANCELED = 1; //被加入阻塞队列后,等待唤醒的状态 static final SIGNAL = -1; //等待状态,调用Condition.await方法,会加入到等待队列,状态就是这个 static final CONDITION = -2; //这是用以读写重入锁的 static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; }
//先看addWaiter方法 private Node addWaiter(Node mode) { //首先构造一个node,状态是EXCLUSIVE,意思就是排它锁了 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //这里如果为null,说明链表还不存在,不为null,加入到尾部,这里使用cas操作进行抢占,因为可能有别的线程也在尝试 //如果链表不存在,或者抢占失败,走enq if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //加入到链表尾部 enq(node); return node; } private Node enq(final Node node) { for (;;) { //自旋,不断尝试。 Node t = tail; //如果链表不存在,那么先创建一个头结点,这个头结点不存储线程信息 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //否则的话,尝试加入尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //这是一个中断标志,如果线程在自旋尝试获取锁的过程中被阻塞过,会返回true,然后再acquire方法中再次中断一下,目的是恢复中断标志位 //因为在下面parkAndCheckInterrupt中会调用interrupted方法,这个方法会清除中断标志 boolean interrupted = false; for (;;) { //自旋 //拿到这个node的前一个结点 final Node p = node.predecessor(); //如果p是头结点 //前面tryAcquire方法我们知道,这里是去尝试抢占锁,或者重入锁,如果失败了,看下面 //如果成功了呢,把这个节点变成头结点,原来的头结点置为null,这里会把这个node的thread=null,prev=null if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果p的waitStatus不是-1,返回的false,那么自旋重新来,如果是-1了,就走parkAndCheckInterrupt,阻塞线程 //那么假设现在我们的这个线程被阻塞在这个地方了,不会往下走了,我们看看当它的前一个线程释放锁会怎样,unlock //!!!看完了吗!前一个节点unlock释放锁后,会唤醒后一个节点或是从后往前第一个可执行节点,这个线程被唤醒,那么它再次自旋去尝试抢占锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //报错了,就会进入这个方法了 if (failed) cancelAcquire(node); } }
可以看到,addWaiter方法实际上就构造了一个同步队列(阻塞队列),每一个tryAcquire尝试获取锁失败的线程,都加入到这个队列尾部去。之后在acquireQueued方法中,自旋,看看自己是不是head的下一个节点,是的话尝试获取锁,不是的话在shouldParkAfterFailedAcquire方法中会判断前一个节点的waitStatus是否是signal,也就是-1,是的话就阻塞自己,同时这个方法内也会清除该节点前面的状态为CANCELLED的节点
而finally中的cancelAcquire,主要就是清除被取消的节点
public void unlock() { sync.release(1); } public final boolean release(int arg) { //如果state现在为0了,就是返回true了,看看怎么走 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒之前被阻塞的线程,注意传入的节点是head节点 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { //释放锁就是-1嘛,如果c==0,说明现在没人持有锁了,就设置拥有线程为null,放回true。 //否则说明之前是重入的,需要继续释放,返回false int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //如果下一个节点是null或者状态是已取消 if (s == null || s.waitStatus > 0) { s = null; //从后往前找到一个可执行节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //如果是一个可执行节点,唤醒它! if (s != null) LockSupport.unpark(s.thread); }
一段lock与unlock流程看下来,就是lock的时候尝试获取锁,获取锁是通过cas修改state从0到1,修改成功的即获取锁成功并且标识出当前拥有锁的线程,同一个线程多次lock,state会+1,最后释放锁也需要多次释放直到state变为0。
而获取锁失败的线程,就会被封装成一个node节点,加入到同步队列的尾部,期间会通过自旋反复尝试获取锁,最终当前一个节点状态为待唤醒时就会阻塞自己,等待唤醒。
而释放锁就是cas让state-1了,当state为0时就是锁完全释放,唤醒头结点的下一个节点,或者是尾部往前第一个可执行节点。