从这一篇博客开始,开始总结线程间协作,并发流程控制的工具类,这一篇主要介绍Semaphore和Condition
Semaphore中文译文为信号量,操作系统中也有同样的概念。类似于生活中常见的许可证的概念。在执行指定业务逻辑之前,需要先获取相关的许可证。在限流的使用场景中也有Semaphore的影子。
在Java中,信号量的作用是维护一个"许可证"的计数,线程可以获取许可证,获取成功信号量的个数就减一,同时线程也可以释放一个许可证,释放一个信号量就加一,当信号量所拥有的许可证数量为0,那么其他想要获取许可证的线程就需要等待,直到有另外的线程释放了许可证。
在初始化Semaphore的时候,需要初始化信号量个数,同时也可以设置对应的公平策略,如果指定为公平,那么Semaphore会把之前的等待线程放入到自己维护的一个FIFO队列中,信号量的分发会根据在FIFO队列中等待的时长来进行。
通过Semaphore获取信号量的方法有多个,大体上分为以下三类
方法 | 作用 |
---|---|
acquire() acquire(int permits) | 获取信号量的时候阻塞,同时响应中断。这里的permits参数指的是想要获取信号量的个数 |
acquireUninterruptibly() acquireUninterruptibly(int permits) | 在获取信号量的时候不响应中断 |
tryAcquire() tryAcquire(int permits) tryAcquire(int permits, long timeout, TimeUnit unit) tryAcquire(long timeout, TimeUnit unit) | 尝试获取信号量,如果没有则不阻塞,可以指定获取信号量的时间 |
如果执行完逻辑,可以调用release来释放信号量
简单实例
/** * autor:liman * createtime:2021/11/28 * comment:多个线程等待许可证 */ public class SemaphoreDemo01 { static Semaphore semaphore = new Semaphore(3, true);//3个许可证,公平模式 public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 100; i++) { executorService.submit(new Task()); } //判断线程池是否停止 executorService.shutdown(); while(!executorService.isTerminated()){ //线程池没有执行完成任务,主线程就空转,直到线程池运行结束 } } static class Task implements Runnable { @Override public void run() { try { semaphore.acquire(); //获取多个信号量 //semaphore.acquire(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "拿到了许可证"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); //释放多个信号量 //semaphore.release(3); System.out.println(Thread.currentThread().getName() + "释放了许可证"); } } }
信号量的获取个数可以灵活指定,这在一定程度上可以根据线程耗费资源的程度进行侧重,比如有两个任务,一个是TaskA ,一个是TaskB,TaskA的执行需要耗费较多的资源,TaskB需要耗费较少的资源,我们可以定义3个信号量。执行TaskA需要获取全部信号量才能执行,而执行TaskB只需要获取一个信号量,这样TaskA和TaskB就永远不可能同时执行。
信号量需要注意的是,在获取和释放的时候,数量必须要保证一致,每次获取多少信号量就要释放多少信号量,如果某个线程获取了2个信号量,但是释放的时候,只释放了1个,随着时间的推移,总会有一个时间点,其他线程的信号量不够用了,这样会导致程序卡死。
信号量的获取和释放,可以不在同一个线程中执行,也许是线程A获取了3个信号量,线程B释放了3个信号量,这也是可行的,但要保证逻辑合理。
Condition是一个接口,其通过ReentrantLock创建,其作用非常类似于Object中的wait和notify的组合,在Conditon中变成了await和signal。关于wait和notify这两者可以参考之前的博客Thread和Object中的方法。
简单基本实例
/** * autor:liman * createtime:2021/11/28 * comment:普通的condition用法实例 */ @Slf4j public class ConditionDemo { private ReentrantLock lock = new ReentrantLock(); //condition由锁来实例化 private Condition condition = lock.newCondition(); public void method01(){ lock.lock(); try{ System.out.println("条件不满足,进入await"); //这里 condition.await(); System.out.println("条件满足,开始继续执行"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void method02(){ lock.lock(); try{ System.out.println("准备工作完成,唤醒其他线程"); condition.signal(); }finally { lock.unlock(); } } public static void main(String[] args) { ConditionDemo conditionDemo = new ConditionDemo(); new Thread(()->{ try { Thread.sleep(1000); conditionDemo.method02();//唤醒在condition上等待的线程 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); //主线程调用method01先行阻塞,在1秒之后,被上面新建的线程唤醒 conditionDemo.method01(); } }
与notify相对应的,Condition也存在signal和signalAll两个方法,其中signalAll会唤醒Condition上等待的所有线程,而signal是公平的,只会唤醒等待时间最长的线程。
利用Condition实现的生产者消费者模式
/** * autor:liman * createtime:2021/11/28 * comment:condition实现生产者和消费者 */ @Slf4j public class ConditionProducerAndCustomer { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); class Customer extends Thread{ @Override public void run() { consume(); } private void consume(){ while(true){//消费者一直消费 lock.lock(); try{ while(queue.size() == 0){ System.out.println("队列为空,消费者进入等待"); try { notEmpty.await();//用notEmpty进行等待 } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); notFull.signal();//取出数据,同志爱生产者继续生产 System.out.println("从队列取走了一个数据,队列剩余"+queue.size()+"个数据"); }finally { lock.unlock(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce(){ while(true){//生产者一直生产 lock.lock(); try{ while(queue.size() == queueSize){ System.out.println("队列已满,生产者进入等待"); try { notFull.await();//用 notFull 进行等待 } catch (InterruptedException e) { e.printStackTrace(); } } queue.offer(new Random().nextInt(100)); notEmpty.signalAll();//取出数据,同志爱生产者继续生产 System.out.println("生产者向消费者插入了一个数据,队列剩余"+queue.size()+"个数据"); }finally { lock.unlock(); } } } } public static void main(String[] args) { ConditionProducerAndCustomer conditionProducerAndCustomer = new ConditionProducerAndCustomer(); Producer producer = conditionProducerAndCustomer.new Producer(); Customer customer = conditionProducerAndCustomer.new Customer(); customer.start(); producer.start(); } }
如果说Lock是用来替代synchronized关键字的,那么Condition其实可以理解为用来替代Object.wati/notify的,在用法和性质上,二者几乎没有区别。
await方法会释放lock的锁,和Object.wait一样,不需要自己手动先行释放
调用await的时候,也必须要持有lock锁,否则会出现异常(虽然编译的时候不会出现异常)
简单总结了一下Semaphore和Condition,下一篇总结一下CountDownLatch和CyclicBarrier