信号量(Semaphore)允许多个线程同时访问临界区资源,而 ReentrantLock 这类锁同时只允许一个线程访问临界区资源。
Semaphore 就是共享锁,它的构造方法可以传一个许可数 permits,表示临界区资源数量,多少个资源同时也最多只能由多少个线程来访问。当 permits 等于 1 时,Semaphore 就等价于 ReentrantLock 这类互斥锁。
代码示例:
@Test public void test() throws InterruptedException { // 定义一个线程池 ExecutorService executor = Executors.newFixedThreadPool(3); int userNuber = 10; CountDownLatch countDownLatch = new CountDownLatch(userNuber); // 三个许可,表示资源总数,这里表示只有三本书籍,所以最多只能由三个人借阅,即最多只能有三个线程获取到锁 int permits = 3; Semaphore semaphore = new Semaphore(permits); IntStream.range(0, userNuber).forEach(i -> { executor.submit(() -> { try { System.out.println("学生" + i + "等待借阅书籍......"); // 获取一个许可(加锁) semaphore.acquire(1); System.out.println("学生" + i + "借阅到了一个本书籍,当前还剩余" + semaphore.availablePermits() + "本书籍"); Thread.sleep(i * 500); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放一个许可(解锁) semaphore.release(1); System.out.println("学生" + i + "归还了一个本书籍,当前还剩余" + semaphore.availablePermits() + "本书籍"); countDownLatch.countDown(); } }); }); countDownLatch.await(); executor.shutdown(); /** * 执行结果: * * 学生0等待借阅书籍...... * 学生0借阅到了一个本书籍,当前还剩余2本书籍 * 学生1等待借阅书籍...... * 学生1借阅到了一个本书籍,当前还剩余1本书籍 * 学生2等待借阅书籍...... * 学生2借阅到了一个本书籍,当前还剩余0本书籍 * 学生0归还了一个本书籍,当前还剩余1本书籍 * 学生3等待借阅书籍...... * 学生3借阅到了一个本书籍,当前还剩余0本书籍 * 学生1归还了一个本书籍,当前还剩余1本书籍 * 学生4等待借阅书籍...... * 学生4借阅到了一个本书籍,当前还剩余0本书籍 * 学生2归还了一个本书籍,当前还剩余1本书籍 * 学生5等待借阅书籍...... * 学生5借阅到了一个本书籍,当前还剩余0本书籍 * 学生3归还了一个本书籍,当前还剩余1本书籍 * 学生6等待借阅书籍...... * 学生6借阅到了一个本书籍,当前还剩余0本书籍 * 学生4归还了一个本书籍,当前还剩余1本书籍 * 学生7等待借阅书籍...... * 学生7借阅到了一个本书籍,当前还剩余0本书籍 * 学生5归还了一个本书籍,当前还剩余1本书籍 * 学生8等待借阅书籍...... * 学生8借阅到了一个本书籍,当前还剩余0本书籍 * 学生6归还了一个本书籍,当前还剩余1本书籍 * 学生9等待借阅书籍...... * 学生9借阅到了一个本书籍,当前还剩余0本书籍 * 学生7归还了一个本书籍,当前还剩余1本书籍 * 学生8归还了一个本书籍,当前还剩余2本书籍 * 学生9归还了一个本书籍,当前还剩余3本书籍 */ }
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // Semaphore 构造函数的传参 permits 赋值给 state Sync(int permits) { setState(permits); } // 获取许可数(即 state 的值) final int getPermits() { return getState(); } // 非公平锁方式获取共享锁 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 当前可用的许可数 int available = getState(); // 当前申请锁后剩余的许可数 int remaining = available - acquires; // 如果剩余许可数小于 0 直接返回剩余许可数 // 如果剩余许可数大于 0,将其设置为 state,如果成功,则返回剩余许可数 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 释放锁 protected final boolean tryReleaseShared(int releases) { for (;;) { // 当前可用的许可数 int current = getState(); // (释放锁后)增加许可数 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 将增加后的许可数赋值给 state,成功则返回,否则自旋重试 if (compareAndSetState(current, next)) return true; } } // 减少许可数 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } // 清空许可数 final int drainPermits() { for (;;) { // 当前许可数 int current = getState(); // 将 state 设置为 0 if (current == 0 || compareAndSetState(current, 0)) // 返回当前许可数 return current; } } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // 如果当前线程节点还有前序节点,则获取锁失败 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
// 默认使用非公平锁 public Semaphore(int permits) { sync = new NonfairSync(permits); } // permits 表示同时允许访问临界区资源的线程数 // fair 表示使用公平锁实现还是非公平锁实现 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
// 可中断地阻塞获取锁 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
// 不可中断地阻塞获取锁 public void acquireUninterruptibly() { sync.acquireShared(1); }
public boolean tryAcquire() { // nonfairTryAcquireShared 返回剩余可用许可数 // 剩余可用许可数大于等于 0,表示可以获取锁 return sync.nonfairTryAcquireShared(1) >= 0; } public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
// 释放锁(一个许可) public void release() { sync.releaseShared(1); }
// 一次获取 permits 个许可(可中断,阻塞获取) public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
// 一次获取 permits 个许可(不可中断、阻塞获取) public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
// 一次获取 permits 个许可(可中断,非阻塞获取) public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } // 一次获取 permits 个许可(可中断,非阻塞获取、超时可退出) public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
// 释放多个许可 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }