在高并发的场景下,出于对系统的保护会对流量进行限制。
提到限流器的实现方式,很容易可以想到信号量是与之类似的原理,都是允许一定数量的线程访问临界区,具体实现代码如下所示,同一时刻只允许两个线程访问临界区域,其它线程等待实现限流目的。
// 定义信号量对象 指定允许同时访问临界区的线程数 Semaphore semaphore = new Semaphore(2); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i <20 ; i++) { int finalI = i; executorService.submit(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "===进入===" + finalI); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + "===结束===" + finalI); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); }
但是信号量的实现还需要类似互斥锁的加锁解锁操作,能不能进一步优化呢?
Guava 是谷歌开源的工具类库,它提供了RateLimiter作为限流工具类,使用前需要引入它的依赖如下。
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.1-jre</version> </dependency>
简单使用如下
// 定义限流器流速:2个请求/秒 RateLimiter limiter = RateLimiter.create(2.0); // 执行任务的线程池 ExecutorService es = Executors.newFixedThreadPool(1); // 记录上一次执行时间 AtomicLong prev = new AtomicLong(System.nanoTime()); // 测试执行20次 for (int i = 0; i < 20; i++) { //限流器限流 limiter.acquire(); //提交任务异步执行 es.execute(() -> { long cur = System.nanoTime(); //打印时间间隔:毫秒 System.out.println((cur - prev.get()) / 1000_000); prev.set(cur); }); }
请求结果如下,可以很明显的看到大多数请求维持在500毫秒左右,可以得出第一个结论限流的另外一个体现应该是均速,比如限流器定义流速为2,那么就是每秒两个请求,500毫秒就是一个请求,符合请求结果。
Guava 在使用上还是比较简单,那么Guava限流的原理是什么呢?其实就是我们熟知的令牌桶算法,以一定速率向桶中放入令牌,想要通过限流器的线程需要拿到令牌桶中的令牌才让放行,也就是说我们只要限制令牌的放入速度,就能控制流速,达到限流的目的,详细描述如下:
我们可以采用生产者消费者模式,其中生产者以一定频率向任务队列中添加令牌,而试图通过限流器的线程作为消费者只有从任务队列中取出令牌才能通过限流器,基于这个原理,演示代码如下。
// 自定义限流器 public class CustomRateLimiter { // 任务队列,任务队列的数量就代表限流器允许通过的最大线程数 private static BlockingQueue blockingQueue = new ArrayBlockingQueue(3); static { try { blockingQueue.put(new Object()); blockingQueue.put(new Object()); blockingQueue.put(new Object()); } catch (InterruptedException e) { e.printStackTrace(); } } // 生产者 public void product(){ ScheduledExecutorService ses = Executors.newScheduledThreadPool(3); // scheduleAtFixedRate 以固定的频率 上一个任务开始的时间计时,一个period后,检测上一个任务是否执行完毕, // 如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行 // scheduleWithFixedDelay 以固定的延时 ,delay(延时)指的是一次执行终止和下一次执行开始之间的延迟 ses.scheduleWithFixedDelay(()->{ try { blockingQueue.put(new Object()); } catch (InterruptedException e) { e.printStackTrace(); } },4,4, TimeUnit.SECONDS); } // 限流器 public void acquire(){ try { // 阻塞获取 blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }
测试代码如下
class Test3{ public static void main(String[] args) { CustomRateLimiter customRateLimiter = new CustomRateLimiter(); // 调用生产者 customRateLimiter.product(); ExecutorService es = Executors.newFixedThreadPool(10); //记录上一次执行时间 AtomicLong prev = new AtomicLong(System.nanoTime()); // 去除random方法,和static中的初始化逻辑 时间会稳定在2秒左右 匀速 for (int i = 0; i <20 ; i++) { random(); customRateLimiter.acquire(); es.execute(()->{ long cur = System.nanoTime(); // 打印时间间隔:毫秒 System.out.println(Thread.currentThread().getName()+"==="+(cur - prev.get()) / 1000_000); prev.set(cur); }); } } public static void random(){ Random random = new Random(); int i = random.nextInt(5); try { System.out.println(Thread.currentThread().getName()+"==睡眠了=="+i); TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } }
去除random方法和static中的初始化逻辑线程池的访问都保持在两秒以内,如下所示效果
当加上random方法和static中的初始化逻辑,就能明显看到限流器的效果,如下所示
用生产者消费者实现都是很完美的写法,那么为什么Guava,不是采用这个实现的呢?原因很简单因为在高并发下机器的CPU基本上是忙碌状态,这时定时任务去占用CPU大概率会抢占不到,就又可能导致定时任务延时。
那Guava又是如何处理的呢?由于篇幅问题下期再聊。