上节,我们讲了AQS的阻塞与释放实现原理,线程间通信(Condition)的原理。这次,我们就讲讲基于AQS实现的ReentrantLock(重入锁)。
结合上面的ReentrantLock类图,ReentrantLock实现了Lock接口,它的内部类Sync继承自AQS,绝大部分使用AQS的子类需要自定义的方法存在Sync中。而ReentrantLock有公平与非公平的区别,即'是否先阻塞就先获取资源',它的主要实现就是FairSync与NonfairSync,后面会从源码角度看看它们的区别。
Sync是ReentrantLock控制同步的基础。它的子类分为了公平与非公平。使用AQS的state代表获取锁的数量
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); ... }
我们可以看出内部类Sync是一个抽象类,继承它的子类(FairSync与NonfairSync)需要实现抽象方法lock。
下面我们先从非公平锁的角度来看看获取资源与释放资源的原理
故事就从就两个变量开始:
// 获取一个非公平的独占锁 /** * public ReentrantLock() { * sync = new ReentrantLock.NonfairSync(); * } */ private Lock lock = new ReentrantLock(); // 获取条件变量 private Condition condition = lock.newCondition();
lock.lock()
public void lock() { sync.lock(); }
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; // 获取资源 final void lock() { // 若此时没有线程获取到资源,直接设置当前线程独占访问资源。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // AQS的方法 acquire(1); } protected final boolean tryAcquire(int acquires) { // 实现在父类Sync中 return nonfairTryAcquire(acquires); } }
AQS的acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上一节已经剖析过AQS的acquire的整个流程了,就差子类如何去实现tryAcquire了。
// Sync实现的非公平的tryAcquire final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 此时若没有线程获取到资源,当前线程就直接占用该资源 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 若当前线程已经占用了该资源,可以再次获取该资源 ->这个行为就是可重入锁的支撑 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
尝试获取资源的过程是非常简单的,这里再贴一下acquire的流程图
lock.unlock();
public void unlock() { // AQS的方法 sync.release(1); }
AQS的release
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
release的流程已经剖析过了,接下来看看tryRelease的实现
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 这里可以看出若没有持有锁,就释放资源,就会报错 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
tryRelease的实现也很简单,这里再贴一下release的流程图
公平锁与非公平锁,即'是否先阻塞就先获取资源', ReentrantLock中公平与否的控制就在tryAcquire中。下面我们看看,公平锁的tryAcquire
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // (2.3.1) // sync queue中是否存在前驱结点 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
区别在代码(2.3.1)
hasQueuedPredecessors
判断当前线程的前面有无其他线程排队;若当前线程在队列头部或者队列为空返回false
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
本文介绍了利用AQS实现的锁ReetrantLock