Semaphore 意为"信号量",作用是控制同时访问特定资源的线程数量,底层依赖 AQS 的同步状态 state,是生产环境中比较常用的一个并发工具类。
/**
 * Demo of using a {@link Semaphore} as a pool of permits that caps how many
 * worker threads may hold a shared resource at the same time.
 *
 * <p>Each worker waits a bounded time for a permit. If it gets one it holds the
 * permit for a while and then returns it; otherwise it runs a fallback.
 */
public class SemaphoreRunner {

    /** Size of the permit pool, i.e. the semaphore's initial state. */
    private static final int PERMITS = 2;

    /** Number of worker threads competing for the permits. */
    private static final int WORKERS = 10;

    /** How long a worker waits for a permit before falling back, in milliseconds. */
    private static final long ACQUIRE_TIMEOUT_MS = 500;

    /** How long a worker holds its permit, in milliseconds. */
    private static final long HOLD_TIME_MS = 5000;

    /**
     * Starts {@link #WORKERS} named workers that all share one semaphore.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Initial state = PERMITS: the total capacity of the pool.
        Semaphore semaphore = new Semaphore(PERMITS);
        for (int i = 0; i < WORKERS; i++) {
            // Task is itself a Thread, so start it directly. Wrapping it in another
            // Thread would run it under that wrapper's auto-generated name instead
            // of the name passed to the constructor here.
            new Task(semaphore, "haxisaite" + i).start();
        }
    }

    /** Worker thread that tries to take one permit, holds it, and gives it back. */
    static class Task extends Thread {
        final Semaphore semaphore;

        /**
         * @param semaphore permit pool shared by all workers
         * @param tname     name given to this thread
         */
        public Task(Semaphore semaphore, String tname) {
            super(tname);
            this.semaphore = semaphore;
        }

        /**
         * Waits up to {@link #ACQUIRE_TIMEOUT_MS} for one permit. On success the
         * permit is held for {@link #HOLD_TIME_MS} and then released; on timeout
         * {@link #fallBack()} is invoked and nothing is released.
         */
        @Override
        public void run() {
            boolean acquired = false;
            try {
                // Take one permit out of the pool (state - 1), waiting a bounded time
                // so the worker can degrade instead of queueing for too long.
                acquired = semaphore.tryAcquire(ACQUIRE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (!acquired) {
                    fallBack();
                    return;
                }
                System.out.println(Thread.currentThread().getName()
                        + " :acquire() at time: " + System.currentTimeMillis());
                Thread.sleep(HOLD_TIME_MS);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the interrupt.
                Thread.currentThread().interrupt();
                System.err.println(Thread.currentThread().getName() + " interrupted: " + e);
            } finally {
                // Return the permit to the pool (state + 1) only if this worker holds
                // one. Doing it here guarantees an interrupt cannot leak the permit.
                if (acquired) {
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName()
                            + " :release() at time: " + System.currentTimeMillis());
                }
            }
        }

        /** Degraded path taken when no permit became available within the timeout. */
        public void fallBack() {
            System.out.println("降级!");
        }
    }
}
/**
 * Creates a semaphore whose synchronizer is a {@code NonfairSync} built
 * from the given permit count.
 *
 * @param permits value passed to the {@code NonfairSync} constructor
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

/**
 * Creates a semaphore with the given permit count, selecting the
 * synchronizer implementation from the {@code fair} flag.
 *
 * @param permits value passed to the chosen synchronizer's constructor
 * @param fair    {@code true} to use {@code FairSync}, {@code false} to use
 *                {@code NonfairSync}
 */
public Semaphore(int permits, boolean fair) {
    if (fair) {
        sync = new FairSync(permits);
    } else {
        sync = new NonfairSync(permits);
    }
}
/**
 * Requests a single permit by delegating to
 * {@code sync.acquireSharedInterruptibly(1)}.
 *
 * @throws InterruptedException propagated from the synchronizer call
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/**
 * Requests {@code permits} permits by delegating to
 * {@code sync.acquireSharedInterruptibly(permits)}.
 *
 * @param permits number of permits requested; must not be negative
 * @throws InterruptedException     propagated from the synchronizer call
 * @throws IllegalArgumentException if {@code permits} is negative
 */
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) {
        throw new IllegalArgumentException();
    }
    sync.acquireSharedInterruptibly(permits);
}

/**
 * Gives back a single permit by delegating to {@code sync.releaseShared(1)}.
 */
public void release() {
    sync.releaseShared(1);
}

/**
 * Gives back {@code permits} permits by delegating to
 * {@code sync.releaseShared(permits)}.
 *
 * @param permits number of permits returned; must not be negative
 * @throws IllegalArgumentException if {@code permits} is negative
 */
public void release(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException();
    }
    sync.releaseShared(permits);
}