在实际业务开发中，为了避免调用超时、规避对方接口的限制等，通常需要将支持批量处理的接口分批调用。
比如 List 参数的 size 可能有几十个甚至上百个，但如果对方的 Dubbo 接口比较慢、一次传入 50 个以上就会超时，那么可以每次只传入 20 个，分批执行。
很多人会直接手写 for 循环或 while 循环来实现。这样写既不优雅、无法复用，又容易出错（例如最后一批不足 size 个时的边界处理）。
下面结合 Java 8 的 Stream、Function、Consumer 等特性，封装一个分批调用的工具类，并编写单元测试进行自测。
并给出 CompletableFuture 的异步改进方案。
工具类:
package com.chujianyun.common.java8.function;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Helpers for invoking batch-capable operations on a list in consecutive, fixed-size partitions
 * (e.g. to keep each downstream call below a timeout or an item-count limit).
 *
 * @author 明明如月
 */
public class ExecuteUtil {

    /**
     * Splits {@code dataList} into consecutive sublists of at most {@code size} elements and passes
     * each sublist, in order, to {@code consumer}.
     *
     * @param dataList elements to process; {@code null} or empty means the consumer is never called
     * @param size     maximum number of elements per batch; must be positive
     * @param consumer action invoked once per batch
     * @param <T>      element type
     * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
     */
    public static <T> void partitionRun(List<T> dataList, int size, Consumer<List<T>> consumer) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        checkSize(size);
        Lists.partition(dataList, size).forEach(consumer);
    }

    /**
     * Calls {@code function} once per batch of at most {@code size} elements and concatenates the
     * returned lists in batch order. Batches for which {@code function} returns {@code null} are skipped.
     *
     * @param dataList elements to process; {@code null} or empty yields an empty list
     * @param size     maximum number of elements per batch; must be positive
     * @param function call to perform for each batch
     * @param <T>      input element type
     * @param <V>      result element type
     * @return a new mutable {@link ArrayList} containing all batch results
     * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
     */
    public static <T, V> List<V> partitionCall2List(List<T> dataList, int size, Function<List<T>, List<V>> function) {
        if (CollectionUtils.isEmpty(dataList)) {
            return new ArrayList<>(0);
        }
        checkSize(size);
        // Mutable reduction via collect(): reduce() must not mutate its identity value.
        return Lists.partition(dataList, size)
                .stream()
                .map(function)
                .filter(Objects::nonNull)
                .flatMap(List::stream)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Calls {@code function} once per batch of at most {@code size} elements and merges the returned
     * maps in batch order. When a key appears in several batch results, the later batch wins.
     * Batches for which {@code function} returns {@code null} are skipped.
     *
     * @param dataList elements to process; {@code null} or empty yields an empty map
     * @param size     maximum number of elements per batch; must be positive
     * @param function call to perform for each batch
     * @param <T>      input element type (also the key type of the result)
     * @param <V>      value type of the result
     * @return a new mutable {@link HashMap} containing all batch results
     * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
     */
    public static <T, V> Map<T, V> partitionCall2Map(List<T> dataList, int size, Function<List<T>, Map<T, V>> function) {
        if (CollectionUtils.isEmpty(dataList)) {
            return new HashMap<>(0);
        }
        checkSize(size);
        // Mutable reduction via collect(): reduce() must not mutate its identity value.
        return Lists.partition(dataList, size)
                .stream()
                .map(function)
                .filter(Objects::nonNull)
                .collect(HashMap::new, Map::putAll, Map::putAll);
    }

    /** Rejects non-positive batch sizes (zero is invalid too, not just negative values). */
    private static void checkSize(int size) {
        Preconditions.checkArgument(size > 0, "size must be greater than 0, but was %s", size);
    }
}
待调用的服务(模拟)
package com.chujianyun.common.java8.function;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simulated downstream service exposing batch-capable methods. Every method is a stub with no real
 * behavior; tests mock it to control what each call returns.
 */
public class SomeManager {

    /**
     * Batch operation without a result. Does nothing.
     *
     * @param id   caller-supplied identifier (unused)
     * @param data one batch of input (unused)
     */
    public void aRun(Long id, List<String> data) {
        // intentionally a no-op
    }

    /**
     * Batch query returning a list.
     *
     * @param id   caller-supplied identifier (unused)
     * @param data one batch of input (unused)
     * @return a new, empty, mutable list on every call
     */
    public List<Integer> aListMethod(Long id, List<String> data) {
        final List<Integer> none = new ArrayList<>(0);
        return none;
    }

    /**
     * Batch query returning a map.
     *
     * @param id   caller-supplied identifier (unused)
     * @param data one batch of input (unused)
     * @return a new, empty, mutable map on every call
     */
    public Map<String, Integer> aMapMethod(Long id, List<String> data) {
        final Map<String, Integer> none = new HashMap<>(0);
        return none;
    }
}
单元测试:
package com.chujianyun.common.java8.function;

import org.apache.commons.lang3.RandomUtils;
import org.jeasy.random.EasyRandom;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;

/**
 * Unit tests for {@link ExecuteUtil}'s partitioned-call helpers, driven against a mocked
 * {@link SomeManager}.
 */
@RunWith(PowerMockRunner.class)
public class ExecuteUtilTest {

    private final EasyRandom easyRandom = new EasyRandom();

    @Mock
    private SomeManager someManager;

    // test data, rebuilt before each test
    private List<String> mockDataList;

    // number of elements in mockDataList
    private final int total = 30;

    @Before
    public void init() {
        // build `total` random strings
        mockDataList = easyRandom.objects(String.class, total).collect(Collectors.toList());
    }

    @Test
    public void test_a_run_partition() {
        PowerMockito.doNothing().when(someManager).aRun(anyLong(), any());

        // batches of 10
        int size = 10;
        ExecuteUtil.partitionRun(mockDataList, size, eachList -> someManager.aRun(1L, eachList));

        // exactly one call per batch
        Mockito.verify(someManager, Mockito.times(turns(size))).aRun(anyLong(), any());
    }

    @Test
    public void test_call_return_list_partition() {
        // Every call returns the same eachReturnSize values. aListMethod returns List<Integer>, so the
        // stub must hold Integers: doReturn is untyped and would silently accept a List<String>.
        int eachReturnSize = 2;
        PowerMockito
                .doReturn(easyRandom.objects(Integer.class, eachReturnSize).collect(Collectors.toList()))
                .when(someManager)
                .aListMethod(anyLong(), any());

        int size = 4;
        List<Integer> resultList = ExecuteUtil.partitionCall2List(mockDataList, size,
                eachList -> someManager.aListMethod(2L, eachList));

        int turns = turns(size);
        Mockito.verify(someManager, Mockito.times(turns)).aListMethod(anyLong(), any());
        Assert.assertEquals(turns * eachReturnSize, resultList.size());
    }

    @Test
    public void test_call_return_map_partition() {
        // If doReturn were configured only once, every call would return the same map (same keys) and
        // the merged result would collapse. To avoid overwrites, configure one freshly generated map per
        // expected invocation.
        int eachReturnSize = 3;
        PowerMockito
                .doReturn(mockMap(eachReturnSize))
                .doReturn(mockMap(eachReturnSize))
                .when(someManager).aMapMethod(anyLong(), any());

        // 30 elements in batches of 16 -> 2 invocations, matching the two doReturn calls above
        int size = 16;
        Map<String, Integer> resultMap = ExecuteUtil.partitionCall2Map(mockDataList, size,
                eachList -> someManager.aMapMethod(2L, eachList));

        int turns = turns(size);
        Mockito.verify(someManager, Mockito.times(turns)).aMapMethod(anyLong(), any());
        Assert.assertEquals(turns * eachReturnSize, resultMap.size());
    }

    /** Number of batches needed to cover {@code total} elements in chunks of {@code size} (ceiling division). */
    private int turns(int size) {
        return (total + size - 1) / size;
    }

    /** Builds a map with {@code size} random entries whose keys are very unlikely to collide. */
    private Map<String, Integer> mockMap(int size) {
        Map<String, Integer> result = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
            // random string plus random int suffix to make key collisions practically impossible
            result.put(easyRandom.nextObject(String.class) + RandomUtils.nextInt(), easyRandom.nextInt());
        }
        return result;
    }
}
注意:
1 判空
.filter(Objects::nonNull)
这一步非常重要，可以避免某一次调用返回 null 而导致空指针异常。
2 实际使用时可以结合 Apollo 配置中心灵活设置每批执行的数量，一旦出现超时即可随时调整。
3 用到的类库
集合工具类: commons-collections4、guava (可以不用)
这里将 List 划分为子 List 的逻辑也可以借助 Stream 的 skip、limit 自行实现；集合判空也可以不借助 CollectionUtils。
构造数据:easy-random
单元测试框架：JUnit 4、PowerMockito、Mockito
4 大家可以加一些更强大的功能,如允许设置每次调用的时间间隔、并行或并发调用等。
以上面的List接口为例,将其改为异步版本:
/**
 * Asynchronous variant of partitionCall2List: submits one task per batch of at most {@code size}
 * elements, waits for all of them, then concatenates their results in batch order (not completion
 * order). Empty or {@code null} batch results are skipped.
 *
 * @param dataList        elements to process; {@code null} or empty yields an empty list
 * @param size            maximum number of elements per batch; must be positive
 * @param executorService executor that runs the batch calls; when {@code null}, CompletableFuture's
 *                        default async executor is used (normally {@code ForkJoinPool.commonPool()},
 *                        which is a poor fit for blocking remote calls)
 * @param function        call to perform for each batch
 * @return a new mutable {@link ArrayList} with all batch results
 * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
 * @throws RuntimeException         if any batch call fails or the waiting thread is interrupted
 */
public static <T, V> List<V> partitionCall2ListAsync(List<T> dataList, int size, ExecutorService executorService, Function<List<T>, List<V>> function) {
    if (CollectionUtils.isEmpty(dataList)) {
        return new ArrayList<>(0);
    }
    Preconditions.checkArgument(size > 0, "size must be greater than 0, but was %s", size);

    List<CompletableFuture<List<V>>> completableFutures = Lists.partition(dataList, size)
            .stream()
            .map(eachList -> executorService == null
                    ? CompletableFuture.supplyAsync(() -> function.apply(eachList))
                    : CompletableFuture.supplyAsync(() -> function.apply(eachList), executorService))
            .collect(Collectors.toList());

    CompletableFuture<Void> allFinished = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
    try {
        allFinished.get();
    } catch (InterruptedException e) {
        // restore the interrupt flag so callers further up can still observe the interruption
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // Every future is complete at this point, so join() does not block. flatMap + collect appends each
    // element once; the previous reduce() copied the whole accumulated list on every step (O(n^2)).
    return completableFutures.stream()
            .map(CompletableFuture::join)
            .filter(CollectionUtils::isNotEmpty)
            .flatMap(List::stream)
            .collect(Collectors.toCollection(ArrayList::new));
}
测试代码:
// 测试数据 private List<String> mockDataList; private int total = 300; private AtomicInteger atomicInteger; @Before public void init() { // 构造total条数据 mockDataList = easyRandom.objects(String.class, total).collect(Collectors.toList()); } @Test public void test_call_return_list_partition_async() { ExecutorService executorService = Executors.newFixedThreadPool(10); atomicInteger = new AtomicInteger(0); Stopwatch stopwatch = Stopwatch.createStarted(); // 分批执行 int size = 2; List<Integer> resultList = ExecuteUtil.partitionCall2ListAsync(mockDataList, size, executorService, (eachList) -> someCall(2L, eachList)); Stopwatch stop = stopwatch.stop(); log.info("执行时间: {} 秒", stop.elapsed(TimeUnit.SECONDS)); Assert.assertEquals(total, resultList.size()); // 正好几轮 int turns; if (total % size == 0) { turns = total / size; } else { turns = total / size + 1; } log.info("共调用了{}次", turns); Assert.assertEquals(turns, atomicInteger.get()); // 顺序也一致 for(int i =0; i< mockDataList.size();i++){ Assert.assertEquals((Integer) mockDataList.get(i).length(), resultList.get(i)); } } /** * 模拟一次调用 */ private List<Integer> someCall(Long id, List<String> strList) { log.info("当前-->{},strList.size:{}", atomicInteger.incrementAndGet(), strList.size()); try { TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } return strList.stream() .map(String::length) .collect(Collectors.toList()); }
通过异步可以尽可能快地拿到执行结果。例如上面的测试共有 150 次调用，每次耗时 2 秒：同步串行执行约需 300 秒，而使用 10 个线程的线程池约 30 秒即可完成。
1 要灵活运用Java 8 的 特性简化代码
2 要注意代码的封装来使代码更加优雅,复用性更强
3 要善于利用 java-faker、easy-random 等测试数据构造框架，来提高构造单元测试数据的效率
4 要了解性能改进的常见思路:合并请求、并发、并行、缓存等。