Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。
Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。
PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
①S减1;
②若S减1后仍大于或等于0,则进程继续执行;
③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V操作的主要动作是:
①S加1;
②若相加后结果大于0,则进程继续执行;
③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。
/** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
public void acquire() throws InterruptedException public boolean tryAcquire() public void release() public int availablePermits() public final int getQueueLength() public final boolean hasQueuedThreads() protected void reducePermits(int reduction)
可以用于做流量控制,特别是公用资源有限的应用场景。
public class SemaphoneTest2 { /** * 实现一个同时只能处理5个请求的限流器 */ private static Semaphore semaphore = new Semaphore(5); /** * 定义一个线程池 */ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200)); /** * 模拟执行方法 */ public static void exec() { try { semaphore.acquire(1); // 模拟真实方法执行 System.out.println("执行exec方法"); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(1); } } public static void main(String[] args) throws InterruptedException { { for (; ; ) { Thread.sleep(100); // 模拟请求以10个/s的速度 executor.execute(() -> exec()); } } } }
现在是5个5个一执行,做到了限流的功能。
关注点:
1. Semaphore的加锁解锁(共享锁)逻辑实现
2. 线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现
源码跟踪流程图:
// permits: 获得许可证的数量 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 判断剩余资源数remaining是否<0 // 如果remaining<0,表示要进行阻塞;否则表示成功获取到锁资源 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
tryAcquireShared在非公平Sync中的实现方法是:nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) { for (;;) { // semaphorer中用state表示可以用资源数 int available = getState(); // 计算剩下的资源数 int remaining = available - acquires; // 如果本次请求数超过了剩余可用资源数,返回剩余数(此时应当是负数) // 如果remaining > 0, 则cas设置剩余资源数,并返回剩余数(此时应当是>=0) if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 创建新节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 死循环保证竞争到锁 for (;;) { // 获取前趋节点 final Node p = node.predecessor(); // 如果前趋节点为头结点,即当前节点为第一个有效节点 if (p == head) { // 再次尝试获取共享锁,得到计算后的锁资源数量remaining int r = tryAcquireShared(arg); // 如果剩余锁资源数>=0,表示获取锁成功 if (r >= 0) { // 将当前节点设置为头结点,断开旧的头结点指针 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 如果当前节点前趋节点不是头结点 // shouldParkAfterFailedAcquire方法判断是否满足阻塞条件(前趋节点waitStatus=-1) if (shouldParkAfterFailedAcquire(p, node) && // 如果满足阻塞条件,阻塞线程并返回中断状态 parkAndCheckInterrupt()) // 如果是被阻塞的,抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
public final boolean releaseShared(int arg) { // 资源数state计算后(恢复资源数),开始释放线程 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); // 计算锁资源数 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 计算后cas设置资源数,设置成功后返回true if (compareAndSetState(current, next)) return true; } }
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; // 如果头结点不为空并且不是空队列 if (h != null && h != tail) { // 获取头结点等待状态 int ws = h.waitStatus; // 如果是-1,表示满足唤醒线程 if (ws == Node.SIGNAL) { // 将节点等待状态cas设置为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 成功设置为0后释放阻塞线程 unparkSuccessor(h); } // 如果等待状态是0,并且cas设置等待状态为PROPAGATE失败后,继续执行循环 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。
// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行 public void await() throws InterruptedException { }; // 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; // 会将 count 减 1,直至为 0 public void countDown() { sync.releaseShared(1); }
CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。
CountDownLatch的两种使用场景:
场景1 让多个线程等待:模拟并发,让并发线程一起执行
public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 5; i++) { new Thread(() -> { try { //准备完毕……运动员都阻塞在这,等待号令 countDownLatch.await(); String parter = "【" + Thread.currentThread().getName() + "】"; System.out.println(parter + "开始执行……"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } Thread.sleep(2000);// 裁判准备发令 System.out.println("开始!!!"); countDownLatch.countDown();// 发令枪:执行发令 } }
场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并
public class CountDownLatchTest2 { public static void main(String[] args) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { final int index = i; new Thread(() -> { try { Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000)); System.out.println(Thread.currentThread().getName() + " finish task" + index); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } // 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。 countDownLatch.await(); System.out.println("主线程:在所有任务运行完成后,进行结果汇总"); } }
底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。
而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
设置Sync中的state的值为count。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果线程被中断了 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取共享锁:判断state即设置的count是否为0 // count = 0 return 1, else return -1 if (tryAcquireShared(arg) < 0) // 阻塞当前主线程 doAcquireSharedInterruptibly(arg); // 如果count = 0,主线程继续向下执行 }
// 判断state即设置的count是否为0 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 创建一个新节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取前趋节点 final Node p = node.predecessor(); // 如果前趋节点是头结点,表明这是第一个新建的节点,不需要阻塞线程,直接去竞争锁即可 if (p == head) { // 尝试获取锁资源,这里其实就是判断count是否等于0 int r = tryAcquireShared(arg); // r >= 0表示count=0,表示不用阻塞了 if (r >= 0) { // 将当前节点设置为头结点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 如果当前节点前趋节点不是头结点,判断是否满足阻塞条件(前趋节点等待状态视为等于-1,如果不是则将其设置为-1) if (shouldParkAfterFailedAcquire(p, node) && // 如果当前节点满足阻塞条件,阻塞线程并返回中断标记 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
await方法调用之后,会调用tryAcquireShared方法区判断count是否等于0,如果是的话,则什么也不做,主线程继续向下执行;
如果count>0,则需要尝试将主线程进行阻塞。这里会有一个死循环,在死循环中首先再次尝试获取一次锁,即再判断一次count是否等于0,如果等于0跳出循环,不阻塞主线程。否则,进行阻塞。
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { // 尝试获取共享锁,将count值减去arg(1) if (tryReleaseShared(arg)) { // 减去成功后释放共享锁 doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取count int c = getState(); // 如果等于0,返回false,表示获取共享锁失败,因为count已经等于0了 if (c == 0) return false; // count-- int nextc = c-1; // cas设置count的值 if (compareAndSetState(c, nextc)) // 设置成功后,返回剩余count是否等于0 return nextc == 0; } }
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; // 队列已经初始化并且有节点 if (h != null && h != tail) { // 获取头结点的等待状态 int ws = h.waitStatus; // 如果是-1,表示当前节点可以被唤醒 if (ws == Node.SIGNAL) { // 首先尝试cas将头结点等待状态从-1修改为0,失败后continue,继续循环知道修改成功 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 头节点等待状态修改为0之后,唤醒线程 unparkSuccessor(h); } // 如果头结点等待状态=0,则cas将其设置为-3 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
// 唤醒线程,node = header private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 获取头结点等待状态 int ws = node.waitStatus; // cas将其设置为0,因为此时要唤醒第一个有效节点 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取第一个有效节点 Node s = node.next; // 如果队列为空,或者第一个有效节点的等待状态>0(大于0几位1,表示CANCELLED),应当跳过被CANCELLED的节点 if (s == null || s.waitStatus > 0) { s = null; // 从尾结点开始遍历,找到没有被取消的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread); }
cas减少state的值,并唤醒阻塞节点中的线程,让其继续执行。
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。 public CyclicBarrier(int parties) // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行) public CyclicBarrier(int parties, Runnable barrierAction)
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞 // BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时 public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //循环 通过reset()方法可以进行重置 public void reset()
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
public class CyclicBarrierTest2 { //保存每个学生的平均成绩 private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); private ExecutorService threadPool = Executors.newFixedThreadPool(3); private CyclicBarrier cb = new CyclicBarrier(3, () -> { int result = 0; Set<String> set = map.keySet(); for (String s : set) { result += map.get(s); } System.out.println("三人平均成绩为:" + (result / 3) + "分"); }); public void count() { for (int i = 0; i < 3; i++) { threadPool.execute(new Runnable() { @Override public void run() { //获取学生平均成绩 int score = (int) (Math.random() * 40 + 60); map.put(Thread.currentThread().getName(), score); System.out.println(Thread.currentThread().getName() + "同学的平均成绩为:" + score); try { //执行完运行await(),等待所有学生平均成绩都计算完毕 cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } public static void main(String[] args) { CyclicBarrierTest2 cb = new CyclicBarrierTest2(); cb.count(); } }
上面线程1和线程2会在await处阻塞,直到线程3也执行到await,这时候会执行Runnable barrierAction,计算三人的平均成绩。
利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景。
public class CyclicBarrierTest3 { public static void main(String[] args) { AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), r -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } static class Runner extends Thread { private CyclicBarrier cyclicBarrier; public Runner(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }
关注点:
1.一组线程在触发屏障之前互相等待,最后一个线程到达屏障后唤醒逻辑是如何实现的
2.删栏循环使用是如何实现的
3.条件队列到同步队列的转换实现逻辑
// The command to run when tripped private final Runnable barrierCommand; public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); // 备份一份参与者数量,因为reset之后要恢复数据 this.parties = parties; // 设置count this.count = parties; // 触发之后要执行的命令 this.barrierCommand = barrierAction; }
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 这里使用的是reentrantLock final ReentrantLock lock = this.lock; lock.lock(); try { // generation:一代。创建第一次 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); // 如果线程被中断了,抛出异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 减少count int index = --count; // 计算后如果是0,触发action if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 更新障碍物行程的状态并唤醒所有。仅在持有锁时调用 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 如果未超时,一直等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); // 开始下一代,即这次竞争结束开始新一轮 private void nextGeneration() { // signal completion of last generation // 唤醒所有,这里是使用的reentrantLock的条件队列 trip.signalAll(); // set up next generation // 设置count为初始备份数量 count = parties; generation = new Generation(); }