类图
构造方法
//java.util.concurrent.locks.ReentrantLock //默认非公平锁 public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } //执行lock实际上是执行sync的lock方法 public void lock() { sync.lock(); }
先上流程图
我们先看非公平锁的加锁过程
//java.util.concurrent.locks.ReentrantLock NonfairSync内部类 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
compareAndSetState方法写在AQS里面
//java.util.concurrent.locks.AbstractQueuedSynchronizer public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 同步状态,初始化后为0 */ private volatile int state; private transient volatile Node head; private transient volatile Node tail; //unsafe在这获取 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; //静态代码块获取偏移量 static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } //第一次是0,所以成功设置为1,代表已经获取到锁了 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
第一次返回true,继续执行setExclusiveOwnerThread
//java.util.concurrent.locks.AbstractOwnableSynchronizer private transient Thread exclusiveOwnerThread; //设置独占线程 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
第一次lock代码很简单,再看第二次lock,此时compareAndSetState返回false,所以走acquire(1);
//java.util.concurrent.locks.AbstractQueuedSynchronizer public final void acquire(int arg) { //子类实现的,返回false则继续执行 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
调用tryAcquire
//java.util.concurrent.locks.ReentrantLock //NonfairSync内部类 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //锁已经被释放,重设锁 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //获取锁的线程和当前线程是同一个,这是重入锁关键 else if (current == getExclusiveOwnerThread()) { //锁status计数器加一 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //走到这说明需要等待锁,返回false继续往下走 return false; }
继续看acquireQueued
//java.util.concurrent.locks.AbstractQueuedSynchronizer //Node是一个内部类 static final class Node { static final Node EXCLUSIVE = null; } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //插入队列 enq(node); return node; } private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 第一次tail是null,CAS设置为头 if (compareAndSetHead(new Node())) tail = head;//tail和head都设置为上面new Node,之后重新走第二次循环 } else { //然后把当前node插入队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
接下来看acquireQueued
//java.util.concurrent.locks.AbstractQueuedSynchronizer final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //第一次执行到这,这里的p就是之前设置的head,所以条件p == head成立 //再次执行tryAcquire假设还锁没有释放,则返回false if (p == head && tryAcquire(arg)) { ... } if (shouldParkAfterFailedAcquire(p, node) &&//第二次返回true,继续执行parkAndCheckInterrupt()方法 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } Node内部方法 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//第一次执行为Node默认waitStatus为0 if (ws == Node.SIGNAL)//第二次直接返回true /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //CAS设pred为SIGNAL状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //这部分代码比较简单,只是挂起线程 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
Node结构图
lock过程基本分析完成,接下来看unlock
//java.util.concurrent.locks.ReentrantLock public void unlock() { sync.release(1); } //java.util.concurrent.locks.AbstractQueuedSynchronizer public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //如果之前等待队列生成了head,则会执行unparkSuccessor if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //java.util.concurrent.locks.ReentrantLock Sync 内部类 protected final boolean tryRelease(int releases) { int c = getState() - releases; //没有拥有锁的线程调用会直接崩溃 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //c==0说明线程已经释放了锁,可以被其他线程获取了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //java.util.concurrent.locks.AbstractQueuedSynchronizer private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //获取head的next,然后判空,此处先忽略为null或者waitStatus > 0情况 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null)//重新唤醒线程 LockSupport.unpark(s.thread); }
在这唤醒后,会返回之前挂起的地方,我们回过头看之前的代码
//返回线程的中断状态 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //如果当前Node的上一节点是head,则重设head if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果这里是true则设置interrupted状态,然后返回for循环 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //然后返回这 public final void acquire(int arg) { //如果状态状态是true则执行selfInterrupt if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //尝试把自己中断 static void selfInterrupt() { Thread.currentThread().interrupt(); }
另一个之前没解决的问题是head的next节点什么情况waitStatus > 0,CANCELLED = 1,所以证明中间节点被cancel的时候则状态变了。一般来说failed只有发生异常等情况会为true,但是有一种情况例外,我们来看下。
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //在这直接抛异常,不再执行for,也就是failed依然是true,然后执行finally throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
看cancelAcquire方法
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0)//往前遍历找到之前一个没被cancel的节点 node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. //如果现在已经是队尾,则移除自己,成功则设置后继节点为null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. // int ws; if (//此处判断较多拆开分析 //当前节点的前驱节点不是head pred != head && //前驱节点的状态是SIGNAL ((ws = pred.waitStatus) == Node.SIGNAL || //否则尝试设置前驱的status为SIGNAL (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //上面两个满足其中一个,判断前驱thread是否为null pred.thread != null) { //设置前驱节点的next节点为当前Node的next节点,等于删除了当前Node Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //假设前驱thread为null,也就是当前前驱为head,则唤醒当前Node的后继节点线程 unparkSuccessor(node); } node.next = node; // help GC } }
回到之前的问题:
为什么在unparkSuccessor的时候,如果node.next==null或者waitStatus > 0寻找节点从后往前?代码如下:
Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //我们模拟多线程的一些特殊情况 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { //假设执行到这里被挂起 //tail已经为最新的node,但是从前往后遍历找不到tail pred.next = node; return node; } } //插入队列 enq(node); return node; }
(未完待续)