目录
Semaphore源码解读
前言
源码解读
Semaphore字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。构造方法:
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
permits 表示许可线程的数量fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程 。
主要方法:
public void acquire() throws InterruptedException
public void release()
tryAcquire(long timeout, TimeUnit unit)
acquire() 表示阻塞并获取许可,release() 表示释放许可,tryAcquire尝试获取,不会阻塞,超时后返回。具体的使用方法非常简单,就不细讲了,下面直接进入源码。
和其他juc中的组件一样,Semaphore内部同样维护了一个Sync继承AbstractQueuedSynchronizer作为同步器的实现,下面看获得许可acquire():
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
会调用AbstractQueuedSynchronizer.acquireSharedInterruptibly(1):
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //如果线程由中断唤醒,直接抛出中断异常 if (tryAcquireShared(arg) < 0) //尝试获取许可,如果返回值不小于0.则说明获取成功 doAcquireSharedInterruptibly(arg); }
进入tryAcquireShared(arg),这个方法有不同的实现,这里看非公平版本,tryAcquireShared(int acquires)--->Semaphore.Sync.nonfairTryAcquireShared(int acquires):
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //获取state资源状态 int remaining = available - acquires; //计算剩余资源 /** *如果剩余资源小于0,说明无剩余资源,直接返回。否则cas修改资源状态,只要还有剩余资源,则会无限自旋,不断获取 */ if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
如果上面的方法返回值小于0,则说明无剩余资源,进入AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(arg),这个方法的代码和Reentranlock中acquireQueued()大同小异(传送门:Reentantlock源码解读_w7sss的博客-CSDN博客),下面就挑不一样的讲讲:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //入队,同Reentranlock boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); //如果前驱节点是头节点再尝试一次获取资源 if (r >= 0) { setHeadAndPropagate(node, r); //成功获得资源,修改节点状态为propagate p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //这边线程会正式阻塞,被唤醒后如果判断是由中断唤醒的会抛出中断异常,这里和Reentranlock不一样,Reentranlock的lock方法不会响应中断 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这里主要和Reentranlock不同的是Semaphore是共享锁,区别在这个方法:AbstractQueuedSynchronizer.setHeadAndPropagate(node, r):
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
这个方法的主要逻辑在if中,这里
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0)
同时出现了一串判断,我把Doug Lea的原文注释贴上了,也说明这个if确实没那么好理解,我们暂且先看if中的逻辑,这要有助于理解这个if判断:
if (s == null || s.isShared())
doReleaseShared(); //这里如果node节点的下一个节点是共享节点,就继续Release,即在接下来的代码有可能继续能唤醒后继节点,这在propagate > 0的时候是理所应当的
进入 doReleaseShared():
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //如果头节点waitStatus是SIGNAL,则唤醒后继节点,这种情况是最好理解的,相信大家自己按正常流程跟代码(不用考虑并发),就能走到这里 } /** * 如果头节点waitStatus是0,则cas修改为PROPAGATE,这里就要考虑并发了,在一个线程 *正在获取资源的时候,又有一个线程释放了资源,也走到这个方法,执行unparkSuccessor(), *head的waitStatus被改为0,说明啥?说明即将又有资源被释放了,所以在这里把waitStatus改为PROPAGATE, *告诉执行到setHeadAndPropagate的线程,你可以进入doReleaseShared()了,因为我马上要把坑空出来了,这时如果有新线程入队,就可以有机会马上被唤醒。某种意义上这有自旋的含义 */ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) // loop if head changed break; } }
讲到这里,那setHeadAndPropagate中的那个if判断就好理解了:
先看第一个h.waitStatus < 0,因为在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0,表明有剩余资源了,所以这里可以判断进入doReleaseShared()操作直接唤醒可能马上就会入队的线程。
再看第二个h.waitStatus < 0,这时第一个h.waitStatus == 0,说明执行到第一个的时候还没有线程释放资源,但是到了第二个的时候h又被赋值成当前的head了,此时一般h.waitStatus ==0,但是如果此时又发生了在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0的情况,那么也因该判断进入doReleaseShared()操作唤醒可能马上就会入队的线程。
当然了,这里是有可能导致不必要的唤醒的,因为h.waitStatus同样可能是-1,这里这么写的目的估计也是为了让资源尽可能地被充分利用。这段代码要结合多个线程并发的情况看。个人感觉关键还是要看
在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0的情况
是发生在 setHead(node)之前还是之后,这个对应了两处h.waitStatus < 0的判断。
笔者讲的可能也不是太清楚,如有错误请指正。
好了acquire操作讲完了,下面是release操作:
public void release() { sync.releaseShared(1); } 进入 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
我想这个就不用讲了,都是之前的代码。
再次申明:如有错误,欢迎指正!