关注:王有志,一个分享硬核Java技术的互金摸鱼侠。
欢迎你加入Java人的提桶跑路群:共同富裕的Java人
今天我们来聊一聊AQS家族中另一个重要成员Semaphore,我只收集到了一道关于Semaphore的面试题,问了问“是什么”和“如何实现的”:
按照我们的惯例,依旧是按照“是什么”,“怎么用”和“如何实现的”这3步来分析Semaphore。另外,今天提供了题解。
Semaphore直译过来是信号量,是计算机科学中非常Old School的处理同步与互斥的机制,与互斥锁不同的是它允许指定数量的线程或进程访问共享资源。
Semaphore处理同步与互斥的机制和我们平时过地铁站的闸机非常相似。刷卡打开闸机(acquire
操作),通过后(访问临界区)闸机关闭(release
操作),后面的人才能够继续刷卡,而在前一个人通过前,后面的人只能排队等候(队列机制)。当然,地铁站不可能只有一个闸机,拥有几个闸机,就允许几个人同时通过。
信号量也是这样的,通过构造函数定义许可数量,使用时申请许可,处理完业务逻辑后释放许可:
// 信号量中定义1个许可 Semaphore semaphore = new Semaphore(1); // 申请许可 semaphore.acquire(); ...... // 释放许可 semaphore.release();
当我们为Semaphore定义一个许可时,它和互斥锁相同,同一时间只允许一个线程进入临界区。但是当我们定义了多个许可时,它与互斥锁的差异就体现出来了:
Semaphore semaphore = new Semaphore(3); for(int i = 1; i < 5; i++) { int finalI = i; new Thread(()-> { try { semaphore.acquire(); System.out.println("第[" + finalI + "]个线程获取到semaphore"); TimeUnit.SECONDS.sleep(10); semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }).start(); }
执行这段代码可以看到,同一时间3个线程都进入了临界区,只有第4个线程被挡在了临界区外。
还记得在《AQS的今生,构建出JUC的基础》中提到的同步状态吗?我们当时说它是某些同步器的计数器:
AQS中,state不仅用作表示同步状态,也是某些同步器实现的计数器,如:Semaphore中允许通过的线程数量,ReentrantLock中可重入特性的实现,都依赖于
state
作为计数器的特性。
先来看Semaphore与AQS的关系:
与ReentrantLock一样,Semaphore内部实现了继承自AQS的同步器抽象类Sync
,并有FairSync
和NonfairSync
两个实现类。接下来我们就通过剖析Semaphore的源码,来验证我们之前的说法。
Semaphore提供了两个构造方法:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
可以看到Semaphore和ReentrantLock的设计思路是一致的,Semaphore内部也实现了两个同步器FairSync
和NonfairSync
,分别实现公平模式和非公平模式,而Semaphore的构造本质上是构造同步器的实现。我们以非公平模式的NonfairSync
的实现为例:
public class Semaphore implements java.io.Serializable { static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } } abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected final void setState(int newState) { state = newState; } }
追根溯源,构造器的参数permits
最终还是回归到了AQS的state
身上,借助了state
作为计数器的特性来实现Semaphore的功能。
现在我们已经为Semaphore设置了一定数量的许可(permits),接下来我们就需要通过Semaphore#acquire
方法获取许可,进入Semaphore所“守护”的临界区:
public class Semaphore implements java.io.Serializable { public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } if (tryAcquireShared(arg) < 0) { doAcquireSharedInterruptibly(arg); } } }
这两步和ReentrantLock非常相似,先通过tryAcquireShared
尝试直接获取许可,失败后通过doAcquireSharedInterruptibly
加入到等待队列中。
Semaphore中直接获取许可的逻辑非常简单:
static final class NonfairSync extends Sync { protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { for (;;) { // 获取可用许可数量 int available = getState(); // 计算许可数量 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } }
首先是获取并减少可用许可的数量,当许可数量小于0时返回一个负数,或通过CAS更新许可数量成功后,返回一个正数。此时doAcquireSharedInterruptibly
会将当前的申请Semaphore许可的线程添加到AQS的等待队列中。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 创建共享模式的等待节点 final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次尝试获取许可,并返回剩余许可数量 int r = tryAcquireShared(arg); if (r >= 0) { // 获取成功,更新头节点 setHeadAndPropagate(node, r); p.next = null; return; } } // 获取失败进入等待状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException(); } } } catch (Throwable t) { cancelAcquire(node); throw t; } } }
Semaphore的使用的doAcquireSharedInterruptibly
与ReentrantLock
使用的acquireQueued
方法核心逻辑一直,但是有细微的实现差别:
创建节点使用Node.SHARED
模式;
更新头节点使用了setHeadAndPropagate
方法。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); // 是否要唤醒等待中的节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) { // 唤醒等待中的节点 doReleaseShared(); } } }
我们知道在ReentrantLock中执行acquireQueued
,当成功获取锁后,只需要执行setHead(node)
即可,那么为什么Semaphore还要再进行唤醒?
假设有3个许可的Semaphore同时有T1,T2,T3和T4总计4个线程竞争:
它们同时进入nonfairTryAcquireShared
方法,假设只有T1通过compareAndSetState(available, remaining)
成功修改有效的许可数量,T1进入临界区;
T2,T3和T4进入doAcquireSharedInterruptibly
方法,通过addWaiter(Node.SHARED)
构建出AQS的等待队列(参考AQS的今生中关于addWaiter
方法的分析);
假设T2成为了头节点的直接后继节点,T2再次执行tryAcquireShared
尝试获取许可,T3和T4执行parkAndCheckInterrupt
;
T2成功获取许可并进入临界区,此时Semaphore剩余1个许可,而T3和T4处于暂停状态中。
这种场景中,只有两个许可产生了作用,显然不符合我们对的初衷,因此在执行setHeadAndPropagate
更新头节点时,判断剩余许可的数量,当数量大于0时继续唤醒后继节点。
Tips:
Semaphore在获取许可的流程与ReentrantLock加锁的过程高度相似~~
下文分析doReleaseShared
是如何唤醒等待中节点的。
Semaphore的release方法就非常简单了:
public class Semaphore implements java.io.Serializable { public void release() { sync.releaseShared(1); } abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); // 计算许可数量 int next = current + releases; if (next < current) { throw new Error("Maximum permit count exceeded"); } // 通过CAS更新许可数量 if (compareAndSetState(current, next)) { return true; } } } } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; // 判断AQS的等待队列是否为空 if (h != null && h != tail) { int ws = h.waitStatus; // 判断当前节点是否处于待唤醒的状态 if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){ continue; } unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) { // 状态为0时,更新节点的状态为无条件传播 continue; } } if (h == head) { break; } } } }
我们可以看到Semaphore的release
方法分了两部分:
tryReleaseShared
方法更新Semaphore的有效许可数量;
doReleaseShared
唤醒处于等待中的节点。
唤醒的逻辑并不复杂,依旧是对节点状态waitStatus
的判断,来确定是否需要执行unparkSuccessor
,当状态为ws == 0
,会将节点的状态更新为Node.PROPAGAT
,即无条件传播。
Tips:与ReentrantLock所不同的是,Semaphore并不支持Node.CONDITION
状态,同样的ReentrantLock也不支持Node.PROPAGATE
状态。
关于Semaphore的内容到这里就结束了,今天我们只具体分析了非公平模式下核心方法的实现,至于公平模式的实现,以及其它方法的实现,就留个大家自行探索了。
好了,希望本文能够带给你一些帮助,我们下次再见!最后欢迎大家关注王有志的专栏《Java面试都问啥?》。