ReentrantReadWriteLock 从字面意思可以看出,是和重入、读写有关系的锁,实际上 ReentrantReadWriteLock 确实也是支持可重入的读写锁,并且支持公平和非公平获取锁两种模式。
为什么会出现读写锁?
普通锁可以保证共享数据在同一时刻只被一个线程访问,就算有多个线程都只是读取的操作,也还是要排队等待获取锁,我们直到数据如果只涉及到读操作,是不会出现线程安全方面的问题的,那这部分加锁是不是可以去掉?或者是加锁不互斥?如果在读多写少的情况下,使用普通的锁,在所有读的情况加锁互斥等待会是一个及其影响系统并发量的问题,如果所有的读操作不互斥,只有涉及到写的时候才互斥,这样会不会大大的提高并发量呢?答案是肯定的,ReentrantReadWriteLock 就是这样干的,读读不互斥,读写、写读、写写都是互斥的,可以大大提高系统并发量。
源码分析 类结构ReentrantReadWriteLock 仅实现了ReadWriteLock接口
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {...}
ReadWriteLock 接口仅有两个方法,分别是 readLock() 和 writeLock();
主要属性ReentrantReadWriteLock 有3个重要的属性,分别是读锁readerLock,写锁writerLock和同步器sync,源码如下:
private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync;主要内部类
有两个默认的构造方法,无参默认采用非公平锁,有参传入true使用公平锁
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }获取读写锁
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }获取读锁:readLock.lock()
读锁主要是按照共享模式来获取锁的,在前面讲AQS的例子中——基于AQS实现自己的共享锁,也是差不多的流程,只不过不同的锁的实现方法tryAcquireShared有一定的区别。ReentrantReadWriteLock 读锁获取过程源码如下:
public void lock() { // 共享模式获取锁 sync.acquireShared(1); } // acquireShared 是AQS框架里面的代码 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // tryAcquireShared 是RRWLock.Sync 里面的自己实现,所以这里没有公平和非公平所谓之称 protected final int tryAcquireShared(int unused) { // 当前想要获得锁的线程 Thread current = Thread.currentThread(); // 获取state值 int c = getState(); // 独占锁被占用了,并且不是当前线程占有的,返回-1,出去要排队 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 读锁共享锁的次数 int r = sharedCount(c); // 判断读是否要阻塞,读共享锁的次数是否超过最大值,CAS 更新锁state值 // readerShouldBlock 的返回要根据同步器是否公平的具体实现来决定 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // r==0, 设置第一次获得读锁的读者 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 持有第一个读者读锁的线程重入计数 firstReaderHoldCount++; } else { // 除第一个线程之后的其他线程获得读锁 // 每个线程每次获得读锁重入计数+1 // readHolds 就是一个ThreadLocal,里面放的HoldCounter,用来统计每个线程的重入次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } // 获得读锁,返回1 return 1; } // 上面if分支没进去时,走这里尝试获取读锁 return fullTryAcquireShared(current); }
上面代码中的readerShouldBlock()方法有两种情况下会返回true:
上面的if分支进入失败时,会进入到fullTryAcquireShared()方法再次尝试获得读锁有3种情况会进入到这个方法:
下面是 fullTryAcquireShared() 方法的分析:
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 自旋 for (;;) { int c = getState(); // != 0 已经有其他线程获得了写锁 if (exclusiveCount(c) != 0) { // 如果不是当前线程获得的写锁,返回-1,出去阻塞排队 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 要进入到这个分支,说明exclusiveCount(c) == 0 , 也就是写锁没被占用 // readerShouldBlock() == true , 公平模式下,同步队列中有其他线程在排队,非公平模式下,有即将要获得写锁的线程 // readerShouldBlock() 返回true ,也就是要阻塞当前线程的意思 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 进入到这里,说明第一个读锁不是当前线程获得的 // rh 可以理解为当前线程的重入计数 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 返回-1,阻塞当前线程,出去排队 if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 超读锁上限,抛出错误 throw new Error("Maximum lock count exceeded"); // 进入到这儿,说明线程没有其他线程获得了写锁,并且不需要阻塞当前线程 // 再次尝试CAS 获得锁,CAS 修改失败会继续自旋进行 if (compareAndSetState(c, c + SHARED_UNIT)) { // 成功获得锁 if (sharedCount(c) == 0) { // 第一个获得读锁的线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 第一个获得读锁的线程重入计数+1 firstReaderHoldCount++; } else { // 非第一个获得读锁的线程 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); // 线程重入计数 rh.count++; // 缓存成功获取readLock的最后一个线程的计数 cachedHoldCounter = rh; // cache for release } return 1; } } }
如果上面fullTryAcquireShared()方法还是没有获得锁,返回-1,就会进入下面的doAcquireShared(int arg)方法:
// doAcquireShared 方法是AQS里面的代码,非RRWLock 实现 private void doAcquireShared(int arg) { // 添加一个共享模式的节点到同步队列,并返回当前节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 中断标识 boolean interrupted = false; // for循环自旋操作 for (;;) { // 在同步队列中,当前节点的前驱结点 final Node p = node.predecessor(); if (p == head) { // 如果前驱结点是头结点,说明排队轮到当前节点获得锁 // tryAcquireShared 再次尝试获取锁,上面的逻辑一模一样 int r = tryAcquireShared(arg); if (r >= 0) { // >=0 说明成功获得了锁 // 设置新的头结点,并检查后面是否是在获得读锁,如果是就唤醒它 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) // 阻塞期间线程被中断了 selfInterrupt(); failed = false; return; } } // 阻塞中断线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { // 旧的头结点 Node h = head; // Record old head for check below // 获得锁的线程节点设置为新的头结点 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 检查获得锁的下一个节点s是否是共享模式的节点(读) if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { // 自旋 for (;;) { Node h = head; // 同步队列不为空 if (h != null && h != tail) { int ws = h.waitStatus; // -1 :表示当前节点的后继节点包含的线程需要运行,也就是unpark if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒被阻塞的下一个节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 只会唤醒一个节点,在调用上面代码过程中,如果head节点变了,就会一直自旋,直到成功 if (h == head) // loop if head changed break; } }
获取读锁过程总结
读锁释放锁的逻辑如下:
public void unlock() { // 开始释放读锁 sync.releaseShared(1); } //AQS框架中 的方法 public final boolean releaseShared(int arg) { // tryReleaseShared 在RRWLock 中的Sync里面 if (tryReleaseShared(arg)) { // 唤醒后面的读锁节点 doReleaseShared(); return true; } return false; } // RRWLock.Sync 的实现方法 protected final boolean tryReleaseShared(int unused) { // 当前线程 Thread current = Thread.currentThread(); if (firstReader == current) { // 第一个读锁线程 if (firstReaderHoldCount == 1) // 如果它只获得了一次锁,直接置为null firstReader = null; else // 第一个线程获得读锁,并且重入获取锁很多次,慢慢减,直到为1,置为null firstReaderHoldCount--; } else { // 不是第一个线程 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) // 线程没有锁,还来释放锁,会抛出异常 throw unmatchedUnlockException(); } // 减计数 --rh.count; } // 上面只是减重入的计数 // 下面是自旋,重置同步状态state值 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // CAS 修改成功,并且要state为0才是真正释放了读锁 // 如果有重入,只有释放最后一次才会返回true, 之后才会去尝试唤醒之后的节点 return nextc == 0; } } private void doReleaseShared() { // 自旋 for (;;) { Node h = head; // 同步等待的队列不为空 if (h != null && h != tail) { int ws = h.waitStatus; // 检查状态是否要唤醒下一个节点的线程 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 加入h节点是持有锁的节点,会唤醒它的下一个节点线程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 理论上唤醒一个就会退出 if (h == head) // loop if head changed break; } }
释放读锁过程总结:
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { // 当前线程 Thread current = Thread.currentThread(); int c = getState(); // 写锁计数,>0的话说明写锁已经被占用了 int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // c != 0 and w == 0 可能共享锁已经被占用了,这时候写锁获取失败 // 同一个线程先获取读锁,再获取写锁,也会在这里返回false,获取写锁出去之后会阻塞自己, // 然后自己的读锁也不会释放,其他线程也获取不了读锁,就出现了死锁 if (w == 0 || current != getExclusiveOwnerThread()) // c != 0 and w == 0 锁的持有者不是当前线程,返回false return false; if (w + exclusiveCount(acquires) > MAX_COUNT) // 超限了 65535 throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 重入获取锁,计数+1 setState(c + acquires); return true; } // writerShouldBlock的实现代码,以看上面读锁获取readerShouldBlock的分析 // 公平锁时,writerShouldBlock 调用的hasQueuedPredecessors() // 非公平锁时,只返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // CAS 修改失败,返回false return false; // 成功获取写锁,设置锁的拥有者线程 setExclusiveOwnerThread(current); return true; }
如果上面方法没有获取到写锁,会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ,这块的代码分析,可以查看之前的文章,关于AQS的分析或者ReentrantLock的分析。
释放写锁:writeLock.unlock()释放写锁的逻辑比较简单,一般加锁和解锁都是成对出现的,所以这里解锁并不需要同步互斥的手段来进行,源代码如下:
public void unlock() { sync.release(1); } // AQS 框架的代码 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // 校验是否是当前线程持有写锁 if (!isHeldExclusively()) // 释放别人的写锁,抛出异常 throw new IllegalMonitorStateException(); // 计算下一个同步状态值 int nextc = getState() - releases; // 重入的情况,是否已经完全释放了 boolean free = exclusiveCount(nextc) == 0; if (free) // 完全释放了,设置锁的持有者线程 setExclusiveOwnerThread(null); // setState(nextc); return free; }
完全释放锁成功后,唤醒下一个节点的逻辑在AQS的unparkSuccessor代码中,不需要RRWLock来实现。
死锁问题在上面获取写锁的过程中,分析了同一个线程先获取读锁,再获取写锁,写锁的逻辑会阻塞自己的线程,但是写锁和读锁又是同一个线程,相当于前面的写锁也被阻塞了,这时候写锁没地方释放,读锁也没有地方释放,其他线程读锁和写锁也都获取不了了,因为前面有个写锁在排队获取。
public static void main(String[] args) throws InterruptedException{ ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); Lock writeLock = lock.writeLock(); Lock readLock = lock.readLock(); new Thread(new Runnable(){ @SneakyThrows @Override public void run(){ TimeUnit.SECONDS.sleep(1); // 模拟1秒后其他线程来获得读锁 System.out.println(Thread.currentThread().getName()+":准备获得读锁"); readLock.lock(); System.out.println(Thread.currentThread().getName()+":线程获得读锁"); readLock.unlock(); System.out.println(Thread.currentThread().getName()+":释放了读锁"); } },"T0").start(); readLock.lock(); System.out.println(Thread.currentThread().getName()+":获得了读锁"); writeLock.lock(); System.out.println(Thread.currentThread().getName()+":获得了写锁"); readLock.unlock(); System.out.println(Thread.currentThread().getName()+":解读锁"); writeLock.unlock(); System.out.println(Thread.currentThread().getName()+":解写锁"); }
输出结果:
main:获得了读锁 T0:准备获得读锁
从上面输出结果可以看出,只有main线程获得了读锁,自己获取写锁被阻塞,其他线程也获取不了读锁,最后产生了死锁。
写线程饥饿问题ReentrantReadWriteLock 的读写是互斥的,意思就是读锁在获取锁后,在还没有释放锁的期间,获取写锁的进程来了也要阻塞自己排队,如果有大量的线程获取了读锁,之后有一个线程获取写锁,写锁就可能一直获取不到写锁,引起写锁线程“饥饿”,这就是RRWLock的写线程饥饿问题。
我们用代码来验证一下上面的结论:
private static void testWriteLockHunger() throws InterruptedException{ ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); Lock writeLock = lock.writeLock(); Lock readLock = lock.readLock(); // T0 线程先获得读锁,并持有一段时间 new Thread(new Runnable(){ @SneakyThrows @Override public void run(){ readLock.lock(); System.out.println(Thread.currentThread().getName()+":最开始线程获得读锁"); // 睡眠15秒,一直持有读锁 TimeUnit.SECONDS.sleep(15); readLock.unlock(); System.out.println(Thread.currentThread().getName()+":释放了读锁"); } },"T0").start(); // 1秒后其他线程再来获取锁,保证前面那个T0线程最先获得读锁 TimeUnit.SECONDS.sleep(1); // TW-1 来排队获取写锁,是为了让后面的读锁,写锁都入队排队 new Thread(new Runnable(){ @SneakyThrows @Override public void run(){ System.out.println(Thread.currentThread().getName()+":准备获得写锁"); writeLock.lock(); System.out.println(Thread.currentThread().getName()+":获得写锁"); TimeUnit.SECONDS.sleep(5); writeLock.unlock(); System.out.println(Thread.currentThread().getName()+":释放了写锁"); } },"TW-1").start(); TimeUnit.SECONDS.sleep(1); // 这里睡眠1秒是为了写锁排队在读锁获取的前面 IntStream.range(1,5).forEach(i->{ new Thread(new Runnable(){ @SneakyThrows @Override public void run(){ System.out.println(Thread.currentThread().getName()+":准备获取读锁"); readLock.lock(); System.out.println(Thread.currentThread().getName()+":获取了读锁"); // 持有部分时间的读锁 TimeUnit.SECONDS.sleep(i*2); readLock.unlock(); System.out.println(Thread.currentThread().getName()+":释放了读锁"); } },"T-"+i).start(); }); // 最后再来个获取写锁的线程,肯定会在所有读锁的后面获取到写锁 new Thread(new Runnable(){ @SneakyThrows @Override public void run(){ System.out.println(Thread.currentThread().getName()+":准备获取写锁"); writeLock.lock(); System.out.println(Thread.currentThread().getName()+":获取了写锁"); // 持有部分时间的读锁 TimeUnit.SECONDS.sleep(2); writeLock.unlock(); System.out.println(Thread.currentThread().getName()+":释放了写锁"); } },"TW").start(); }
上面代码输出示例:
T0:最开始线程获得读锁 TW-1:准备获得写锁 T-1:准备获取读锁 T-2:准备获取读锁 T-4:准备获取读锁 T-3:准备获取读锁 TW:准备获取写锁 T0:释放了读锁 TW-1:获得写锁 TW-1:释放了写锁 T-1:获取了读锁 T-2:获取了读锁 T-4:获取了读锁 T-3:获取了读锁 T-1:释放了读锁 T-2:释放了读锁 T-3:释放了读锁 T-4:释放了读锁 TW:获取了写锁 TW:释放了写锁
从上面输出结果可以看出,TW写锁是最后才获取到写锁的,如果前面有大量的读锁在排队的话,写锁肯定就会造成饥饿的。
如果不想让获取写锁的线程“饥饿”怎么办呢?
可以把最后获取写锁的线程TW获取锁方式改造下,代码如下:
new Thread(new Runnable(){ @SneakyThrows @Override public void run(){ System.out.println(Thread.currentThread().getName()+":准备获取写锁"); while(!writeLock.tryLock()){ // 一直尝试获得写锁,直到成功 } System.out.println(Thread.currentThread().getName()+":获取了写锁"); // 持有部分时间的读锁 TimeUnit.SECONDS.sleep(2); writeLock.unlock(); System.out.println(Thread.currentThread().getName()+":释放了写锁"); } },"TW").start();
测试输出结果:
T0:最开始线程获得读锁 TW-1:准备获得写锁 T-1:准备获取读锁 T-2:准备获取读锁 T-3:准备获取读锁 T-4:准备获取读锁 TW:准备获取写锁 T0:释放了读锁 TW-1:获得写锁 TW-1:释放了写锁 TW:获取了写锁 TW:释放了写锁 T-4:获取了读锁 T-2:获取了读锁 T-3:获取了读锁 T-1:获取了读锁 T-1:释放了读锁 T-2:释放了读锁 T-3:释放了读锁 T-4:释放了读锁
从上面输出结果可以看出,TW线程成功地在读锁前面获取到了写锁;那为什么会这样呢?因为采用lock()来获取锁,如果第一次tryAcquire没有获取到锁,就会被加入到队列等待,只要进入了队列,就只能按照队列中的顺序来获得锁了,而tryLock在获取锁失败后是不会加入到同步等待队列中去的,从而实现“插队”的功能。
总结原文链接:https://www.cnblogs.com/admol/p/13999889.html
如果觉得本文对你有帮助,可以点赞关注支持一下,也可以关注我公众号,上面有更多技术干货文章以及相关资料共享,大家一起学习进步!