ReentrantReadWriteLock是基于AQS的,对AQS不太了解的建议先看下我之前写的关于AQS源码分析的文章 AQS源码详细分析,让你掌握AQS原理,独占锁、共享锁、Condition
对于写少读多的场景,读写锁相对于独占锁ReentrantLock有着很大的提升。因为每次读都要加锁解锁,耗费资源。
读写锁通过读锁和写锁的配合,多个线程可以同时拥有读锁,而同时只有一个线程拥有写锁,并且写锁和读锁是冲突的,写锁和写锁也冲突。
如上图所示,ReentrantReadWriteLock内置了一个ReadLock和WriteLock,ReadLock和WriteLock是共用一个Sync的,Sync是AQS的一个子类。而NonfairSync和FairSync又是Sync的子类,分别对应着非公平锁和公平锁。
1、初始化
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
ReentrantReadWriteLock提供了两个构造函数,空的构造函数默认生成的非公平锁,另一个可以传入boolean变量来控制是否创建公平锁。
然后会创建对应的ReadLock和WriteLock,从代码可以看出,ReadLock和WriteLock是共用一个Sync对象。
2、state
state是锁的当前状态的判断依据,我们先说下tryAcquireShared()方法中的state。
因为同时要统计写锁的重入次数和拥有读锁的线程个数,ReentrantReadWriteLock将int类型的state的前16位来表示读锁的线程个数,用后16位表示写锁的重入次数。
通过sharedCount()来获取对应的share个数,代码如下
static final int SHARED_SHIFT = 16; static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
通过exclusiveCount()来获取对应的写锁重入次数,代码如下:
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
3、读锁的申请
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//返回小于0的数,代表锁不可用,需要将线程挂起 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //当前state的值 int c = getState(); //如果其他线程拥有了写锁,直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取共享线程的个数 int r = sharedCount(c); //此时对应两种情况: 1、读锁状态 2、当前线程拥有写锁,这时申请读锁,相当于一次重入,但是写锁仍不会释放 if (!readerShouldBlock() && //读锁的次数小于最大次数 //static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; r < MAX_COUNT && //cas将state++ compareAndSetState(c, c + SHARED_UNIT)) { //如果r==0,说明当前线程是第一个拥有读锁的线程 if (r == 0) { //第一个线程设置 firstReader = current; //第一个线程读锁的重入次数 firstReaderHoldCount = 1; //如果是线程的重入 } else if (firstReader == current) { //线程重入次数+1 firstReaderHoldCount++; } else { //static final class HoldCounter { // int count = 0; // final long tid = getThreadId(Thread.currentThread()); //} // static final class ThreadLocalHoldCounter // extends ThreadLocal<HoldCounter> { // public HoldCounter initialValue() { // return new HoldCounter(); // } //} //private transient ThreadLocalHoldCounter readHolds; //如上述代码所示,HoldCounter是用来统计拥有读锁线程的Id和重入次数 //每个线程都对应着一个ThreadLocalMap,存放线程的私有变量,然后通过ThreadLocal来管理线程对应的ThreadLocalMap。HoldCounter是存到线程中的ThreadLocalMap中的,取一次优点麻烦,所以弄了一个缓存cachedHoldCounter HoldCounter rh = cachedHoldCounter; //如果缓存为空,或者是缓存并不是当前线程对应的HoldCounter if (rh == null || rh.tid != getThreadId(current)) //从ThreadLocalHoldCounter中取出HoldCounter cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //将count++ rh.count++; } //请求读锁成功,返回1 return 1; } //1、CAS失败 //2、需要被阻塞 //3、读锁的个数超过了最大个数 return fullTryAcquireShared(current); }
readerShouldBlock()是用来判断当前读锁是否应该被阻塞。公平锁和非公平锁的实现不一样,我们接下来对比一下
首先看非公平锁
//非公平锁 final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; //如果queue中第二个节点是写锁就阻塞,否则的话,有可能会造成写锁的饥饿,写锁一直拿不到 return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
公平锁
final boolean readerShouldBlock() { return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; //只要第二个节点对应的申请线程(包括null)不是当前线程就阻塞,老老实实去排队去,这才公平嘛。第二个节点为当前对象就说明线程已经入队过了,并且被唤醒了 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
可以看出,为了防止写锁的饥饿,读锁即使能够申请到锁,也会放弃锁,直接入队等待。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { //写锁模式 //当前线程没有拥有写锁,赶紧去排队 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) {//当前是读锁模式并且满足阻塞条件 //如果是第一个读锁线程,不做处理,此时当前线程的读锁重入数肯定不为0. if (firstReader == current) { } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { //获取对应的计数器 rh = readHolds.get(); //去过计数器为0,代表当前线程并没有拥有读锁,将其从ThreadLocalMap中移除 if (rh.count == 0) readHolds.remove(); } } //如果当前线程没有拥有读锁,并且满足阻塞条件,就返回-1,让线程进入queue去排队,别让写锁饿死。 if (rh.count == 0) return -1; } } //如果超过了读锁数量限制,就抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //申请锁成功的两个条件 //1、当前为写锁模式,锁降级 //2、当前为读锁模式,但是当前线程已经拥有了读锁,属于读锁重入。 if (compareAndSetState(c, c + SHARED_UNIT)) { //如果是第一个拥有读锁的线程 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; //如果第一个读锁线程重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { //之前说过,就是将计数器+1 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } //返回1,表示申请锁成功 return 1; } } }
//将读锁线程加入到queue中,之前在AQS源码详解中说过,不在叙述 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //如果head的第二个节点是读锁,则将其唤醒 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
读锁的释放
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {如果是第一个申请锁的线程 如果firstReaderHoldCount==1,说明需要释放锁了 if (firstReaderHoldCount == 1) firstReader = null; else//不需要释放锁,只是将重入-1 firstReaderHoldCount--; } else { //从ThreadLocal中获取对应的计数器,将count-- HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { //从ThreadLocalMap中去除 readHolds.remove(); //出异常了 if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; //将state减去SHARED_UNIT,效果等价于sharestate-1 if (compareAndSetState(c, nextc)) //当所有读锁都释放的时候,才会唤醒后继节点 return nextc == 0; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //如果有待唤醒的线程 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //唤醒h的后继节点 unparkSuccessor(h); }//这时CAS失败,说明head的后继节点将head的waitStatus改为SIGNAL了,还需要继续循环 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
写锁的申请
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) {//当前有锁,读锁或者写锁 //当前是读锁或者是别的线程拥有写锁,直接返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //当前线程拥有写锁,此时是锁的重入,超过MAX_COUNT报错 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 设置state setState(c + acquires); return true; } //读锁和写锁都没有,如果被阻塞,或者CAS更新失败返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //更新成功设置独占线程 setExclusiveOwnerThread(current); return true; }
对于公平锁和非公平锁,writerShouldBlock是不一样的
公平锁
final boolean writerShouldBlock() { return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; //只要第二个节点对应的申请线程(包括null)不是当前线程就阻塞,老老实实去排队去,这才公平嘛。第二个节点为当前对象就说明线程已经入队过了,并且被唤醒了 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
非公平锁
final boolean writerShouldBlock() { return false; // 返回false ,永远不阻塞 }
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))我们之前在AQS中已经说烂了。。。就不再详细说了,感兴趣的可以看我之前写的文章!
写锁释放
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //如果state == 0,说明该释放写锁了,free = true boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
代码贴这里了,我也不展开说了,之前在AQS说过好多遍了。。。。
之前我们在AQS中说过了,锁工具依赖AQS来加锁,需要以下几步
1、继承AQS
2、重写tryReleaseShared来制定获取共享锁的逻辑
3、重写tryReleaseShared来指定释放共享锁的逻辑
4、重写tryAcquire来制定获取独占锁的逻辑
5、重写tryRelease来制定释放独占锁的逻辑
当我们掌握了AQS的源码之后,再看基于AQS的锁工具时,仔细分析上面的四个方法就可以了。读写锁同时实现了共享锁和独占锁,重写了上面的四个方法。实现什么锁就对应着重写什么方法就ok了。
写锁的降级
当线程拥有写锁的时候,可以在不释放写锁的前提下,直接获取读锁,这叫做锁的降级。这个时候不会释放写锁,
读锁是不能升级为写锁的,因为这个时候可能会有很多线程拥有读锁,这时候申请写锁是不合适的。
而写锁降级的时候,只有当前线程拥有写锁,这个时候去申请读锁是没毛病的。
ReentrantReadWriteLock的核心:
1、ReentrantReadWriteLock中同时实现了共享锁和独占锁,共享锁对应着读锁,独占锁对应着写锁,读锁和写锁 共同使用一个state。
state是一个int类型的变量,前十六位表示拥有读锁的线程个数,后十六位表示写锁的重入次数。
2、因为将读锁和写锁集合到了一块,写锁和读锁都对应着有公平锁和非公平锁,公平锁和非公平锁对应着不同的抢锁方式,和ReentrantLock还不一样的是,读锁的抢锁方式和写锁的抢锁方式也会互相影响。读锁如果一直抢占,那么写锁就永远无法获取到锁。
读锁和写锁的公平锁对应的抢占策略都是:直接排队到队列末尾
读锁的非公平锁抢占策略:如果队列的第二个节点不是写锁线程就申请,要不就去排队
写锁的非公平策略:直接抢占,如果抢不到才去排队
读锁加锁总结:
公平锁
1、如果当前是读锁模式,当前队列不为空,并且第二个节点的线程不等于当前线程,并且当前线程不持有读锁,就直接入队
其他情况可以自旋夺锁,直到成功
2、如果当前是写锁模式,并且写锁是由当前线程拥有的,就直接获得读锁,但是不会释放写锁。
3、如果当前是写锁模式,但是写锁不是由当前线程拥有的,就直接去排队。
4、如果是无锁模式,可以尝试夺锁,夺锁失败就再判断情况,看符合1、2、3哪条规定
非公平锁
1、如果当前是读锁模式,队列不为空,并且第二个节点为写锁对象,并且当前对象没有持有读锁,就直接入队
其他情况就自旋直到加锁成功。
2、如果当前是写锁模式,并且写锁是由当前线程拥有的,就直接获得读锁,但是不会释放写锁。
3、如果当前是写锁模式,但是写锁不是由当前线程拥有的,就直接去排队。
4、如果是无锁模式,可以尝试夺锁,夺锁失败就再判断情况,看符合1、2、3哪条规定
读锁释放总结
1、只有当前所有读锁都释放的时候,才会唤醒后继的节点来进行夺锁
2、当读锁得到锁的时候,如果后继节点是读锁请求的话,也会唤醒后继节点来尝试加锁。
写锁加锁总结
公平锁
1、如果是读锁模式,就直接入队。
2、如果是写锁模式,并且不是当前线程拥有的写锁,也是直接入队
3、如果是写锁模式,是当前线程拥有的写锁,就直接重入,将重入次数+1
4、如果当前是无锁模式,如果当前队列的第二个节点不等于当前线程就直接入队。
如果第二个节点是当前线程,就尝试夺锁,失败也入队。
非公平锁
1、如果是读锁模式,就直接入队
2、如果是写锁模式,并且不是当前线程拥有的写锁,也是直接入队
3、如果是写锁模式,是当前线程拥有的写锁,就直接重入,将重入次数+1
4、如果当前是无锁模式,就尝试夺锁,夺锁失败就入队。
写锁释放总结
如果当前写锁的重入次数=0,就唤醒后继节点来夺锁。