Java教程

Java多线程批量处理、线程池的使用

本文主要是介绍Java多线程批量处理、线程池的使用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、引言

在开发中,有时会遇到批量处理的业务。如果单线程处理,速度会非常慢,可能会导致上游超时。这是就需要使用多线程开发。

创建线程时,应当使用线程池。一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

可以使用J.U.C提供的线程池:ThreadPoolExecutor类。在Spring框架中,也可以使用ThreadPoolTaskExecutor类。ThreadPoolTaskExecutor其实是对ThreadPoolExecutor的一种封装。

2、使用ThreadPoolExecutor类

假设现有业务,输入Input类,输出Output类:

@Data
@AllArgsConstructor
public class Input {
    int i;
}

@Data
@AllArgsConstructor
public class Output {
    boolean success;
    String s;
}

这里@Data与@AllArgsConstrutor使用了Lombok工具

处理方法:

public Output singleProcess(Input input) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        return new Output(false, null);
    }
    return new Output(true, String.valueOf(2 * input.getI() + 1))
}

 

现在该业务需要批量处理,输入List<Input>,输出List<Output>。那么可以创建一个核心线程数为4的线程池,每个线程把执行结果添加到线程安全的List中。这里List应当使用SynchronizedList而不是CopyOnWriteArrayList,因为这里写操作有多次,而读操作只有一次。并使用CountDownLatch等待所有线程执行结束:

public List<Output> multiProcess(List<Input> inputList) {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    CountDownLatch countDownLatch = new CountDownLatch(inputList.size());
    List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));

    for (Input input : inputList) {
        executorService.submit(() -> {
            try {
                // 单个处理
                Output output = singleProcess(input);
                outputList.add(ouput);
            } catch (Exception e) {
                // 处理异常
            } finally {
                countDownLatch.countDown();
            }
        })
    }

    // 等待所有线程执行完成
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    return outputList;
}

 

但是这样还是有很大的问题:

  1. 阿里巴巴开发手册不建议我们使用Executors创建线程池,因为Executors.newFixedThreadPool方法没有限制线程队列的容量,如果input数量过多,可能导致OOM。
  2. multiProcess不适合被多次调用,不适合用在大多数业务场景。

3、在Spring框架中使用ThreadPoolTaskExecutor类

为了应对大多数业务场景,配合Spring Boot框架,我们可以使用ThreadPoolTaskExecutor创建线程池,并把它注入到ioc容器中,全局都可以使用。

首先,配置线程池参数

@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolProperties {
    private int corePoolSize;
    private int maxPoolSize;
    private int queueCapacity;
    private int keepAliveSeconds;
}

  

在配置文件application.yml中

thread-pool:
  core-pool-size: 4
  max-pool-size: 16
  queue-capacity: 80
  keep-alive-seconds: 120

 

这里线程池各参数的意义可以参考Java线程池实现原理及其在美团业务中的实践

其次,将ThreadPoolTaskExecutor加入至ioc容器中

  

@EnableAsync
@Configuration
public class ThreadPoolConfig {
    private final ThreadPoolProperties threadPoolProperties;

    @Autowired
    public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
        this.threadPoolProperties = threadPoolProperties;
    }

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
        executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
        executor.setThreadNamePrefix("thread-pool-");
        return executor;
    }
}

 

这里@EnableAsync是与@Async配合使用,用于执行异步任务,后面会给出示例

最后,在业务类中通过自定义SpringUtils类获取bean或使用@Async,来使用线程池。

/**
 * 业务实现类
 */
@Service
@Slf4j
public class Input2OutputServiceImpl implements Input2OutputService {
    /**
     * 单个处理
     * @param input 输入对象
     * @return 输出对象
     */
    @Override
    public Output singleProcess(Input input) {
        log.info("Processing...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return new Output(false, null);
        }
        return new Output(true, String.valueOf(2 * input.getI() + 1));
    }

    /**
     * 批量处理
     * @param inputList 输入对象列表
     * @return 输出对象列表
     */
    @Override
    public List<Output> multiProcess(List<Input> inputList) {
        ThreadPoolTaskExecutor executor
                = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
        CountDownLatch latch = new CountDownLatch(inputList.size());
        List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));

        for (Input input : inputList) {
            executor.execute(() -> {
                try {
                    Output output = singleProcess(input);
                    outputList.add(output);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return outputList;
    }

    /**
     * 异步处理
     * @param input 输入对象
     * @return 输出Future对象
     */
    @Async("threadPoolTaskExecutor")
    @Override
    public Future<Output> asyncProcess(Input input) {
        return new AsyncResult<>(singleProcess(input));
    }
}

 

以上代码的完整代码包括测试代码在笔者的GitHub项目thread-pool-demo,在项目中用到ThreadPoolTaskExecutor可参考。

这篇关于Java多线程批量处理、线程池的使用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!