Java教程

Java多线程分片数据处理

本文主要是介绍Java多线程分片数据处理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

java对于数据量较大的数据插入处理或者业务逻辑调用多个远程接口出现性能瓶颈,如何用多线程优化

示例一、对于插入百万级批量数据的处理

1、基于java jdk并发包的实现数据分片处理
//线程池的定义
    private static final int corePoolSize = Runtime.getRuntime().availableProcessors();
    private static final int maximumPoolSize = corePoolSize * 4 + 1;
    private static final ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
            3L, TimeUnit.HOURS,
            new ArrayBlockingQueue<>(10000), new DefaultThreadFactory("async-data-worker"),
            new ThreadPoolExecutor.CallerRunsPolicy());
 //业务数据分片
  List<T> datas= ...获取业务数据,若数据量过大,可以只获取id集分片,然后再查询整个model数据处理
              
  List<List<T>> partitions = Lists.partition(datas, 1000);

   List<CompletableFuture> futures = new ArrayList<>();
                    for (int i = 0; i < partitions.size(); i++) {
                        List<T> pDatas = partitions.get(i);
                        final int j =i;
                    futures.add(CompletableFuture.supplyAsync(() -> saveBatch(pDatas), executorService)
                            //不需要处理返回值不需要执行thenAccept方法
                            .thenAccept((s) -> results.add(j)));
                }
//                CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();//不需要后续处理直接join等待所有子任务执行完成
                CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).thenRun(() -> {
                    results.forEach(item->{
                            log.info("批量数据执行结果={}",item);
                    });
                }).get();

从log打印的结果来看,整个返回的结果数据集是无序的

2、基于Google 的Guava的并发包实现数据分片处理

//业务数据分片
                List<T> datas= ...获取业务数据,若数据量过大,可以只获取id集分片,然后再查询整个model数据处理

                List<List<T>> partitions = Lists.partition(datas, 1000);
                // 定义监听执行服务
                ListeningExecutorService listeningExecutorService =
                        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
                List<ListenableFuture<Integer>> listenableFutures = new ArrayList<>();

                for (int i = 0; i < partitions.size(); i++) {
                    List<T> pDatas= partitions.get(i);
                    final int j =i;
                    listenableFutures.add(listeningExecutorService.submit(()->{
                        saveBatch(pDatas);
                        return j;
                    }));
                }
                ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(
                        Lists.newArrayList(listenableFutures));
                List<Integer> lists = listListenableFuture.get();
                lists.forEach(item->{
                    System.out.println(item);
                });

Guava的实现是要简洁一些的,从log打印的结果来看,整个返回的结果数据集是有序的

多接口并行调用用多线程优化也是同理

这篇关于Java多线程分片数据处理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!