前言
在看完 ReentrantLock 之后,在高并发场景下 ReentrantLock 已经足够使用,但是因为 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而很多应用场景都是读多写少,这时候使用 ReentrantLock 就不太合适了。读多写少的场景该如何使用?在 JUC 包下同样提供了读写锁 ReentrantReadWriteLock 来应对读多写少的场景。
公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生活中的见闻感悟。欢迎大佬来指导!
支持类似 ReentrantLock 语义的 ReadWriteLock 的实现。
具有以下属性:
此类不会将读取优先或写入优先强加给锁访问的排序。但是,它确实支持可选的公平 策略。
支持公平模式和非公平模式,默认为非公平模式。
允许 reader 和 writer 按照 ReentrantLock
的样式重新获取读锁或写锁。在写线程释放持有的所有写锁后,reader 才允许重入使用它们。此外,writer 可以获取读锁,但反过来则不成立。
重入还允许从写锁降级为读锁,通过先获取写锁,然后获取读锁,最后释放写锁的方式降级。但是,从读锁升级到写锁是不可能的。
读锁和写锁都支持锁获取期间的中断。
Condition
支持写锁提供了一个 Condition
实现,对于写锁来说,该实现的方式与 ReentrantLock.newCondition()
提供的 Condition
实现对 ReentrantLock
所做的行为相同。当然,此 Condition
只能用于写锁。读锁不支持 Condition
。
此类支持一些确定是保持锁还是争用锁的方法。这些方法设计用于监视系统状态,而不是同步控制。
锁最多支持 65535 个递归写锁和 65535 个读锁
以上为 Java Api 官方文档[1] 的解释,总结一下内容如下:
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { // 读锁加锁 rwl.readLock().lock(); if (!cacheValid) { // 获取写锁之前必须释放读锁 rwl.readLock().unlock(); // 写锁加锁 rwl.writeLock().lock(); try { // 重新检查状态,因为另一个线程可能 // 在执行操作之前获取了写锁定并更改了状态 if (!cacheValid) { data = ... cacheValid = true; } // 通过在释放写锁之前获取读锁来降级 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { rwl.readLock().unlock(); } } } 复制代码
上面只是官方文档提供的一个 demo。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; /** 提供读锁的内部类 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 提供写锁的内部类 */ private final ReentrantReadWriteLock.WriteLock writerLock; /** 执行所有同步机制 */ final Sync sync; } 复制代码
之前在阅读 ReentrantLock 源码的时候 state 代表了锁的状态,0 表示没有线程持有锁,大于 1 表示已经有线程持有锁及其重入的次数。而在 ReentrantReadWriteLock 是读写锁,那就需要保存读锁和写锁两种状态的,那是怎么样表示的呢?
在 ReentrantReadWriteLock 中同样存在一个 Sync 继承了 AbstractQueuedSynchronizer,也是 FairSync、NonfairSync 的父类。内部定义了 state 的一些操作。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; // 移位数 static final int SHARED_SHIFT = 16; // 单位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 最大数量 1 << 16 -> 65536 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 计算独占数使用 1 << 16 -> 65536 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 返回共享保留数 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 返回独占保留数 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } } 复制代码
在 AQS 中定义 state 为 int 类型,而在 ReentrantReadWriteLock 中,将 state 的 高 16 位和低 16 位拆开表示读写锁。其中高 16 位表示读锁,低 16 位表示写锁。分别使用 sharedCount 和 exclusiveCount 方法获取读锁和写锁的当前状态。
下面分别从读锁和写锁的角度来看如何进行加锁和释放锁的?
public static class ReadLock implements Lock, java.io.Serializable { /** * 获取读取锁。 * 如果写锁没有被另一个线程持有,则获取读锁并立即返回。 * 如果写锁由另一个线程持有,则出于线程调度目的, * 当前线程将被禁用,并处于休眠状态,直到获取读锁为止。 */ public void lock() { // 调用 AQS 获取共享资源 sync.acquireShared(1); } } 复制代码
获取共享资源,这块使用的 AQS 的逻辑,其中 tryAcquireShared(arg) 是在 ReentrantReadWriteLock.Sync 中实现的。并且 AQS 中有规定,tryAcquireShared 分为三种返回值:
abstract static class Sync extends AbstractQueuedSynchronizer { protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); // 获取 state 值 int c = getState(); // 独占计数不为 0 且 不是当前线程, 说明已经有写锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取共享计数(读锁计数) int r = sharedCount(c); // 不需要阻塞读锁 && 共享计数小于最大值 && state 更新成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 当前读锁计数为 0 // firstReader是获得读锁的第一个线程 // firstReaderHoldCount是firstReader的保持计数 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 读锁重入 firstReaderHoldCount++; } else { // 当前缓存计数 HoldCounter rh = cachedHoldCounter; // 当前线程没有计数 或者 没有创建计数器 if (rh == null || rh.tid != getThreadId(current)) // 创建计数,基于 ThreadLocal cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); // 计数累加 rh.count++; } return 1; } // 完整地获取共享锁方法,作为tryAcquireShared方法因CAS获取锁失败后的处理。 // 因为前面可能失败 CAS 失败, 队列策略失败等原因。 return fullTryAcquireShared(current); } } 复制代码
firstReader:firstReader是获得读锁的第一个线程; firstReaderHoldCount:firstReaderHoldCount是firstReader的保持计数。即获得读锁的第一个线程的重入次数。 cachedHoldCounter:最后一个获得读锁的线程获得读锁的重入次数。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 无限循环 for (;;) { int c = getState(); // 是否有写锁 if (exclusiveCount(c) != 0) { // 有写锁,但是不是当前线程,直接返回失败 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 需要阻塞 // 没有写锁,确保没有重新获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 当前线程的读锁计数 ThreadLocal 中 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); // 计数结束,remove 掉 if (rh.count == 0) readHolds.remove(); } } // 为 0 直接失败 if (rh.count == 0) return -1; } } // 到达上限 抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS 设置读锁 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 复制代码
当存在写锁(独占锁)时,方法会返回 -1 失败,后续会调用 AQS 的 doAcquireShared 方法,循环获取资源。doAcquireShared 方法会不断循环,尝试获取读锁,一旦获取到读锁,当前节点会立即唤醒后续节点,后续节点开始尝试获取读锁,依次传播。
public static class ReadLock implements Lock, java.io.Serializable { public void unlock() { sync.releaseShared(1); } } 复制代码
调用 AQS 的 releaseShared 释放共享资源方法。
其中 tryReleaseShared 有 ReadLock 实现。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 第一个线程是当前线程 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 第一个线程不是当前线程,更新自己的 ThreadLocal 里面的计数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 循环 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; // 使用 CAS 更新 state if (compareAndSetState(c, nextc)) // 但是如果现在读和写锁都已释放, // 它可能允许等待的写程序继续进行。 return nextc == 0; } } 复制代码
public static class WriteLock implements Lock, java.io.Serializable { /** * 获取写入锁。 * 如果没有其他线程持有读锁或写锁,会直接返回,并将写锁计数设置为1。 * 如果当前线程持有写锁,则将写锁计数 +1,然后返回。 * 如果锁正在被其他线程持有,则当前线程用于线程调度目的, * 当前线程将被禁用,并处于休眠状态,直到获取读锁并将写锁计数设置为1。 */ public void lock() { sync.acquire(1); } } 复制代码
tryAcquire 方法由 Write 自己实现,方式和 ReentrantLock 类似。
protected final boolean tryAcquire(int acquires) { // 如果读锁计数为非零或写锁计数为非零,并且所有者是另一个线程,则失败。 // 如果计数饱和,则失败。只有在count不为零时,才可能发生这种情况。 // 否则,如果该线程是可重入获取或队列策略允许的话,则有资格进行锁定。 // 如果是这样,请更新状态并设置所有者。 Thread current = Thread.currentThread(); int c = getState(); // 写锁计数 int w = exclusiveCount(c); // c != 0 说明有有线程获取锁了 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 判断是不是自己,不是自己 返回 false if (w == 0 || current != getExclusiveOwnerThread()) return false; // 判断有没有超过上限 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 重入 setState(c + acquires); return true; } // 不需要阻塞,或者 CAS 更新 state 失败 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 复制代码
public static class WriteLock implements Lock, java.io.Serializable { // 如果当前线程是此锁的持有者,则保持计数递减。 // 如果保持现在的计数为零,则解除锁定。 // 如果当前线程不是此锁的持有者则IllegalMonitorStateException异常。 public void unlock() { sync.release(1); } } 复制代码
同样这块代码是使用 AQS 的逻辑,tryRelease 部分由 WriteLock 自己实现。
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } 复制代码
Q: 在 ReentrantReadWriteLock 中 state 代表什么?
A: state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。
Q: 线程获取锁的流程是怎么样的?
A: 可以参考上面的源码笔记,以及后面的流程图。
Q: 读锁和写锁的可重入性是如何实现的?
A: 在加锁的时候,判断是否为当前线程,如果是当前线程,则直接累加计数。值得注意的是:读锁重入计数使用的 ThreadLocal 在线程中缓存计数,而写锁则直接用的 state 进行累加(其实和 state 低 16 位进行累加一样)。
Q: 当前线程获取锁失败,被阻塞的后续操作是什么?
A: 获取失败,会放到 AQS 等待队列中,在队列中不断循环,监视前一个节点是否为 head ,是的话,会重新尝试获取锁。
Q: 锁降级是怎么降级的?
A:
如图,在圈出部分 fullTryAcquireShared 代码中,可以看出来,在获取读锁的时候,如果当前线程持有写锁,是可以获取读锁的。这块就是指锁降级,比如线程 A 获取到了写锁,当线程 A 执行完毕时,它需要获取当前数据,假设不支持锁降级,就会导致 A 释放写锁,然后再次请求读锁。而在这中间是有可能被其他阻塞的线程获取到写锁的。从而导致线程 A 在一次执行过程中数据不一致。[1] Java Api:docs.oracle.com/javase/8/do…