在互联网的高并发场景下,请求会非常多,但是数据库连接池比较少,或者说需要减少CPU压力,减少处理逻辑的,需要把单个查询,用某些手段,改为批量查询多个后返回。 如:支付宝中,查询“个人信息”,用户只会触发一次请求,查询自己的信息,但是多个人同时这样做就会产生多次数据库连接。为了减少连接,需要在JAVA服务端进行合并请求,把多个“个人信息”查询接口,合并为批量查询多个“个人信息”接口,然后以个人信息在数据库的id作为Key返回给上游系统或者页面URL等调用方。
在上一章已经说了第一二种,鉴于有同学没有SpringCloud,所以使用第3种来做请求合并,并一起分析请求合并的原理。
建议先看第一章,第二章相当于为HystrixCollapser的内部原理描述 高并发场景-请求合并(一)SpringCloud中Hystrix请求合并
本章节为利用JDK原生包开发,所以没有SpringCloud那么多东西要配置,编写代码只有一个类。
只需要暴露单个查询的接口,业务逻辑层里做请求合并的逻辑。
@RestController public class UserController { @Autowired private UserBatchWithFutureServiceImpl userBatchWithFutureServiceImpl; @RequestMapping(method = RequestMethod.GET,value = "/userbyMergeWithFuture/{id}") public User userbyMergeWithFuture(@PathVariable Long id) throws InterruptedException, ExecutionException { User user = this.userBatchWithFutureServiceImpl.getUserById(id); return user; } } 复制代码
@Component public class UserBatchWithFutureServiceImpl { /** 积攒请求的阻塞队列 */ private LinkedBlockingDeque<UserQueryDto> requestQueue = new LinkedBlockingDeque<>(); public User getUserById(Long id) throws InterruptedException, ExecutionException { UserQueryDto userQueryDto = new UserQueryDto(); userQueryDto.setId(id); CompletableFuture<User> completedFuture = new CompletableFuture<>(); userQueryDto.setCompletedFuture(completedFuture); requestQueue.add(userQueryDto); User user = completedFuture.get(); return user; } 复制代码
HystrixCollapser也是利用这种办法来做异步通知的手段,让请求接口主线程在获得真正结果前阻塞等待。
在相同的类下创建定时任务,利用@PostConstruct让当前类的Bean构造完后执行该方法,生成一个5秒定时任务。 大家可以设定定时的时间,我为了比较方便测试,而用了5秒。
/** 线程池数量 */ private int threadNum = 1; /** 定时间隔时长 */ private long period = 5000; @PostConstruct public void init() { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum); // 每5秒执行一次 scheduledExecutorService.scheduleAtFixedRate(new UserBatchThread(), 0, createDeviceMergePeriod, TimeUnit.MILLISECONDS); } 复制代码
HystrixCollapser的每隔n毫秒就会处理一次执行单个方法转批量方法,也是通过这类来实现的。
创建内部类为了定时任务执行此逻辑,并且为了代码整洁,不在创建线程池时编写大方法块的代码。
在内部类里面主要逻辑:
public class UserBatchThread implements Runnable { @Override public void run() { List<UserQueryDto> requestQueueTmp = new ArrayList<>(); // 存放批量查询的入参 List<Long> requestId = new ArrayList<>(); // 把出请求层放入的消息queue的元素取出来 int size = requestQueue.size(); for (int i = 0; i < size; i++) { UserQueryDto request = requestQueue.poll(); if (Objects.nonNull(request)) { requestQueueTmp.add(request); requestId.add(request.getId()); } } if (!requestId.isEmpty()) { try { List<User> response = getUserBatchById(requestId); Map<Long, User> collect = response.stream().collect( Collectors.toMap(detail -> detail.getId(), Function.identity(), (key1, key2) -> key2)); // 通知请求的线程 for (UserQueryDto request : requestQueueTmp) { request.getCompletedFuture().complete(collect.get(request.getId())); } } catch (Exception e) { // 通知请求的线程-异常 requestQueueTmp.forEach(request -> request.getCompletedFuture().obtrudeException(e)); } } } } public List<User> getUserBatchById(List<Long> ids) { System.out.println("进入批量处理方法" + ids); List<User> ps = new ArrayList<>(); for (Long id : ids) { User p = new User(); p.setId(id); p.setUsername("dizang" + id); ps.add(p); } return ps; } 复制代码
请求接口中入队列的元素,就会从这里取出,HystrixCollasper也是利用这种poll方法原子性的获取队列里面元素,不会被定时任务的多次触发而重复的获取,只要满足有至少一个都会做批量查询,所以HystrixCollasper合并请求时,即使n毫秒内只有一个请求,也会去处理。
到这里相信大家都已经完成了合并请求了。这次没有依赖框架,基于原生做法,利用队列存查询所需的入参,然后利用线程池定时地获取队列的入参,再批量处理,利用线程的Future做异步返回结果。这样我们就理解了SpringCloud的HystrixCollasper的内部流程了。希望能够帮助没有框架的项目,或者公司技术栈不合适的情况下的同学。
都在我springcloud的demo里面了,看provider-hystrix-request-merge这个工程下的内容,在UserBatchWithFutureServiceImpl类中。
我的公众号 :地藏思维
掘金:地藏Kelvin
简书:地藏Kelvin
我的Gitee: 地藏Kelvin gitee.com/kelvin-cai