Semaphore(信号量)是通过计数器来控制线程数量,如果计数器大于0,则允许访问。 如果为0,则拒绝访问。 底层还是使用的AbstractQueuedSynchronizer那一套控制。
定义一个线程数量为8的线程池,使用for循环创建10个任务,任务中使用信号量控制。可以看出,尽管最大线程数量是8,但是实际执行过程中因为获取Semaphore资源,而Semaphore设置为4,所以同时执行的线程数量仍然是4。
private static ExecutorService executorService = Executors.newFixedThreadPool(8); public static void main(String[] args) { Semaphore semaphore = new Semaphore(4); long beginTime = System.currentTimeMillis(); System.out.println(beginTime); for(int i = 0 ; i < 10; i++){ int finalI = i; executorService.execute( () -> { try { semaphore.acquire(); System.out.println("get semaphore " + finalI + " time : "+ ((System.currentTimeMillis() - beginTime ) /1000 )); Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); }finally { semaphore.release(); } }); } } get semaphore 0 time : 0 get semaphore 2 time : 0 get semaphore 1 time : 0 get semaphore 3 time : 0 get semaphore 8 time : 3 get semaphore 4 time : 3 get semaphore 9 time : 3 get semaphore 5 time : 3 get semaphore 6 time : 6 get semaphore 7 time : 6
一般可以用来作为线程隔离使用。比如断路器Hystrix的舱壁模式中,其中一个是基于线程池的隔离,每个微服务调用创建一个线程池,服务之间的异常互不影响,但是维护线程池会带来额外的开销;另一种就是基于信号量的控制,通过控制每一个服务的信号量,来控制每个服务使用的线程数量。
Semaphore semaphore = new Semaphore(4); //底层实际创建一个sync public Semaphore(int permits) { sync = new NonfairSync(permits); } //sync 也是调用父类Sync的构造函数 NonfairSync(int permits) { super(permits); } //最终把这个参数传递给了AbstractQueuedSynchronizer的state参数,用来控制信号数量 abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } }
semaphore.acquire(); //实际调用了sync的acquireSharedInterruptibly方法,参数就是控制state自减的参数 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //acquireSharedInterruptibly 方法尝试去获取信号,即调用tryAcquireShared(arg)方法,如果state自减之后小于0,表示无信号可用,当前线程调用doAcquireSharedInterruptibly挂起 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // tryAcquireShared方法调用了semaphore静态内部类Sync的nonfairTryAcquireShared方法。 //nonfairTryAcquireShared主要使用一个自循环,对state字段做自减操作,当下列任意条件满足时退出: // 1、当前可用信号量小于0 2、自减操作成功 返回当前可用信号量 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //上一步的if对返回值进行判断,当小于0时,表示无信号可用,当前线程调用doAcquireSharedInterruptibly挂起,底层调用的还是AbstractQueuedSynchronizer的doAcquireSharedInterruptibly方法 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //新增一个node节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //返回该节点的上一个节点 final Node p = node.predecessor(); //如果是头节点,尝试去获取信号 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //获取信号成功,释放p节点,即新增节点的上一个节点,并且将新增节点置为头节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //判断节点状态,去除一些非正常状态的节点,然后将线程LockSupport.park(this)挂起,等待唤起的时候,调用一下Thread.interrupted()方法判断线程是否异常。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
semaphore.release(); public void release() { //这里调用的还是AbstractQueuedSynchronizer的releaseShared方法 sync.releaseShared(1); } public final boolean releaseShared(int arg) { //这里使用自循环对state做相反的自加操作,直到操作成功返回 if (tryReleaseShared(arg)) { //然后唤醒头节点 doReleaseShared(); return true; } return false; } private void doReleaseShared() { //这里也是使用了一个自循环来剔除异常节点,超时这些,直到获取到一个正常的Node.SIGNAL状态节点,修改他的状态为正常执行,并且unparkSuccessor唤醒该线程 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }