Semaphore(信号量)是用来控制同时访问特定资源的线程数量,调用acquire方法尝试获取一个许可(如果没有许可则等待,直到有许可出现),调用release释放一个许可。
Semaphore通常用来控制同时访问特定资源的线程数量,通过协调各个线程以保证资源的合理应用。
比如说停车场的例子,停车场的车位是有限的,同一时间内可以停的车的最大数量是一定的,当停车场停的车的数量满了的时候,这个时候如果又有车想要停的时候,必须等待停车场中的车离开车位才可以,接下来我们来模拟一下停车场的例子。
// 用Semaphore模拟停车场的例子 package Semaphore; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class Demo01 { public static void main(String[] args) { // 代表现在停车场有三个车位 Semaphore semaphore = new Semaphore(3); // 现在有5辆车 int n = 5; for (int i = 1; i <= n; i++) { new Thread(() -> { try { // 车辆尝试获取许可,也就是尝试停车 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到车位"); TimeUnit.SECONDS.sleep(2); // 之后离开车位 System.out.println(Thread.currentThread().getName() + "离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 离开车位要释放许可 semaphore.release(); } }, "线程" + i).start(); } } }
结果:
从上面的例子我们可以更加理解Semaphore的作用。Semaphore构造时会初始化固定数量n的许可,调用acquire()会尝试获取一个许可,调用release()会释放一个许可,同一时刻最多只能有n个线程会拿到许可,当没有许可时,只有等到其他线程释放许可才可以拿到许可。
Semaphore中有三个内部类,分别是Sync、NonfairSync、FairSync,其中Sync继承了AQS,用来获取一系列多线程访问共享资源的模板,NonfairSync、FairSync是Sync的子类,其中NonfairSync和FairSync最主要的区别是前者是非公平的获取资源、后者是公平的获取资源,在实现Semaphore的功能时,Sync起着非常重要的作用,继承了AQS,又可以利用多态的特性使用非公平模式或公平模式,非常方便。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 设置state Sync(int permits) { setState(permits); } // 获取state的值 final int getPermits() { return getState(); } // 非公平的获取资源(直接尝试和AQS中首部接点的后继节点争抢资源) final int nonfairTryAcquireShared(int acquires) { for (;;) { // // 获取当前state的值 int available = getState(); // 获取资源时state的值要减小,这里得到减小后state的值 int remaining = available - acquires; // 如果减小后的state的值小于0直接返回减小后state的值,否则设置state的值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 尝试释放资源 protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取当前state的值 int current = getState(); // 释放资源state的值要增加,获得释放资源后state的值 int next = current + releases; // 这里说明释放资源时传的参数为负数,非法抛出异常 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // CAS操作设置state的值 if (compareAndSetState(current, next)) return true; } } // 减少许可 final void reducePermits(int reductions) { for (;;) { // 获取当前state的值 int current = getState(); // 获取减小许可之后state的值 int next = current - reductions; // 如果next > current说明传入的参数为负,抛出异常 if (next > current) // underflow throw new Error("Permit count underflow"); // 原子设置state的值 if (compareAndSetState(current, next)) return; } } // 销毁许可(清空许可) final int drainPermits() { for (;;) { // 获取当前state的值 int current = getState(); // 当前state的值为0直接返回state的值,否则原子更新state的值为0再返回state的值(也就是0) if (current == 0 || compareAndSetState(current, 0)) return current; } } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; // 调用父类Sync的构造方法 NonfairSync(int permits) { super(permits); } // 非公平的获取资源,这个方法的实现其实父类Sync已经实现了,所以直接调用即可 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; // 调用父类的构造方法 FairSync(int permits) { super(permits); } // 公平的获取资源 protected int tryAcquireShared(int acquires) { for (;;) { // 判断AQS队列中1. 头部结点的后继节点是否为空或者2. 头部结点的后继节点是否不是当前线程 // 如果阻塞队列不为空直接返回-1 if (hasQueuedPredecessors()) return -1; // 执行到这里说明阻塞队列为空或者当前线程是AQS中的head的后继节点,那当前线程就可以尝试获取资源了 // 得到当前state的值 int available = getState(); // 得到获取资源后state的值 int remaining = available - acquires; // 得到资源后state的值小于0直接返回state的值,否则CAS更新state的值再返回state的值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } // 这个方法位于AQS中 // 1. 阻塞队列为空或者2. 当前正在执行线程的后继节点不是当前线程返回true // 否则返回false public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
// SemaphoreSemaphore中的成员变量 // 记录版本号 private static final long serialVersionUID = -3222578661600680210L; // SemaphoreSemaphore内部中最重要的成员变量便是Sync变量也是除了版本变量外的唯一一个变量 private final Sync sync;
// 只传入一个int类型参数的构造方法采用的是非公平竞争资源的方式 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 传入一个int类型参数和一个boolean类型参数的构造方法可以指定竞争资源的方式是公平方式还是非公平方式 // 传入true代表是公平方式传入false代表是公平方式 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
获取资源方法
// 获取一个资源(许可),里面调用acquireSharedInterruptibly(int permits)方法 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 获取指定数量的资源(许可),里面调用acquireSharedInterruptibly(int permits)方法 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
获取资源模板
// acquireSharedInterruptibly(int arg)方法位于AQS中 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 有中断就抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取共享资源,失败返回小于0的数,之后调用doAcquireSharedInterruptibly(arg)方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
尝试获取资源
// 这个是内部类FairSync的成员方法,公平竞争资源 protected int tryAcquireShared(int acquires) { for (;;) { // 判断阻塞队列是否有等待者线程,有的话直接返回-1结束方法 if (hasQueuedPredecessors()) return -1; // 没有等待者线程就尝试获取资源 // 得到state的值 int available = getState(); // 得到获取资源后state的值 int remaining = available - acquires; // state小于0说明获取资源失败直接返回state的值否则CAS更新state的值并返回state if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 这个是内部类NonfairSync的成员方法,它会调用父类Sync的成员方法nonfairTryAcquireShared(acquires)方法,非公平竞争资源 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // 这个是内部类Sync的成员方法 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 如果是非公平竞争方式,就不用看有没有等待者线程了,直接尝试获取资源 // 得到当前state的值 int available = getState(); // 得到获取资源后state的值 int remaining = available - acquires; // 获取资源后state值小于0直接返回state的值否则CAS更新state的值并返回state if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
如果获取资源失败返回一个小于0的数就会执行这个方法
// 如果获取资源失败返回一个小于0的数就会执行这个方法,这个方法位于AQS中 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 将当前线程包装成Node结点加入AQS队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 如果当前节点是head结点的后继节点则当前线程可以继续尝试获取资源 int r = tryAcquireShared(arg); if (r >= 0) { // 获取资源成功就设置新的头结点之后清空原来的头结点就直接返回 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 当前节点不是head结点的后继节点判断是否应该挂起,应该挂起就把当前线程挂起(等待状态) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
// 这两个方法都位于Semaphore中,会调用内部类Sync的成员方法releaseShared(arg) public void release() { sync.releaseShared(1); } public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
释放资源模板
// 该方法位于AQS中 public final boolean releaseShared(int arg) { // 尝试释放资源,释放资源成功,调用doReleaseShared()方法 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
尝试释放资源
// 该方法位于Semaphore的内部类Sync中 protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取当前state的值 // 释放资源state值要增加 int current = getState(); // 得到释放资源后state的值 int next = current + releases; // next < current说明传入参数是负数,非法,抛出异常 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 尝试CAS更新state的值,更新成功返回true if (compareAndSetState(current, next)) return true; } }
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 当前节点的状态是唤醒后继节点 if (ws == Node.SIGNAL) { // 更新头结点状态为0(默认状态),更新失败自旋,更新成功唤醒后继结点 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒head的后继节点 unparkSuccessor(h); } // 当前头结点状态是0(默认状态)就更新当前头结点状态为共享锁需要无条件的传播(更新失败自旋) else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
非中断获取资源,获取资源失败进入AQS队列排队
public void acquireUninterruptibly() { sync.acquireShared(1); }
直接获取资源,会调用非公平竞争获取资源方法,直接竞争,返回的参数大于等于0代表获取资源成功
// 直接尝试获取资源 public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } // 在一定时间内获取资源 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
得到当前许可的值
public int availablePermits() { return sync.getPermits(); }
销毁(清空)许可,调用内部类Sync的drainPermits()方法
public int drainPermits() { return sync.drainPermits(); }
减少指定数量的许可,调用内部类Sync的reducePermits(reduction)方法
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
判断竞争资源方式是否是公平的
public boolean isFair() { return sync instanceof FairSync; }