My day-to-day work involves a large amount of background data processing, so I built a multi-threaded task processing component on top of Redis and Kafka. The usage pattern is roughly the same for both:

1. An ETL tool (Kettle) extracts the data to be processed into Redis/Kafka;
2. A background job pulls a task queue from the Redis/Kafka data and executes the tasks with multiple threads.

Note: the job runs about one round every 30 minutes; each round covers roughly 30,000 target records; each record carries several calculation tasks (5-10); a single task takes anywhere from 10 seconds to 5 minutes; and tasks for different records have no dependencies or ordering constraints between them.
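Just for orientation, here is a hypothetical sketch of what step 1 amounts to on the Redis side: the ETL job fills a Redis list with the target IDs (vessel MMSI numbers in the example service further below), and the consumers later pop them from the other end. In reality this is done inside Kettle, and the value serialization has to match whatever RedisTemplate configuration the consumer uses; `QueueProducerSketch` is an illustrative name, not part of the component.

```java
import java.util.List;
import org.springframework.data.redis.core.RedisTemplate;

// Hypothetical sketch of the producer side: push target IDs onto a Redis list.
// rightPush here plus lPop on the consumer side gives FIFO ordering.
public class QueueProducerSketch {

    private final RedisTemplate<String, Integer> redisTemplate;

    public QueueProducerSketch(RedisTemplate<String, Integer> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void enqueue(String queueName, List<Integer> mmsiList) {
        for (Integer mmsi : mmsiList) {
            redisTemplate.opsForList().rightPush(queueName, mmsi);
        }
    }
}
```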
Given these requirements, the multi-threaded task processing component needs to provide the following core features:

1. Tasks and their data sources are configurable;
2. The number of threads executing each task is configurable.

A summary of the implementation follows; only the key ideas are recorded.
## Task configuration - each task can define its own data source and its own concurrency

```yaml
redis:
  enable: true
  max-thread: 4
  queue-consumers:
    - consumer-name: expectCalculateQueueConsumer
      queue-name: sdc-vessel-expect-cal
      enable: true
      batch-size: 24
      max-thread: 24
    - consumer-name: vesselEfficiencyQueueConsumer
      queue-name: sdc-vessel-efficiency-cal
      enable: false
      batch-size: 200
      max-thread: 4
```
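The configuration above is bound to two properties classes, `SdcRedisProperties` and `SdcRedisQueueConsumerProperties`, which the code below relies on but whose definitions are not shown in this post. The following is a minimal sketch: the `redis` prefix, the Lombok `@Data` usage, and the exact field layout are assumptions inferred from the YAML keys and the getters used later.

```java
import java.util.List;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Binds the "redis" section of the YAML above (sketch; prefix and fields are assumptions).
@Data
@Component
@ConfigurationProperties(prefix = "redis")
public class SdcRedisProperties {
    private Boolean enable;                                       // global switch
    private Integer maxThread;                                    // default worker-thread count
    private List<SdcRedisQueueConsumerProperties> queueConsumers; // one entry per queue consumer
}

// In a separate file: per-consumer settings.
@Data
public class SdcRedisQueueConsumerProperties {
    private String consumerName; // Spring bean name of the QueueConsumer implementation
    private String queueName;    // Redis list key holding the task records
    private Boolean enable;
    private Integer batchSize;   // records pulled per pipelined pop round
    private Integer maxThread;   // per-consumer worker-thread count (overrides the default)
}
```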
Startup class:
```java
@Component
@Order(11)
@Slf4j
public class RedisServiceExecutor implements ApplicationRunner, DisposableBean {

    @Autowired
    private SdcRedisProperties sdcRedisProperties;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        if (Boolean.TRUE.equals(sdcRedisProperties.getEnable())
                && !CollectionUtils.isEmpty(sdcRedisProperties.getQueueConsumers())) {
            for (SdcRedisQueueConsumerProperties redisConsumers : sdcRedisProperties.getQueueConsumers()) {
                if (Boolean.TRUE.equals(redisConsumers.getEnable())) {
                    startRedisQueueConsumer(redisConsumers, sdcRedisProperties);
                }
            }
        }
    }

    private void startRedisQueueConsumer(SdcRedisQueueConsumerProperties redisConsumers,
                                         SdcRedisProperties sdcRedisProperties) {
        QueueConsumer queueConsumer = ApplicationContextUtil.getBean(redisConsumers.getConsumerName(), QueueConsumer.class);
        if (queueConsumer != null) {
            queueConsumer.initConsumer(redisConsumers, sdcRedisProperties);
            queueConsumer.startConsume();
        }
    }

    @Override
    public void destroy() throws Exception {
        if (Boolean.TRUE.equals(sdcRedisProperties.getEnable())
                && !CollectionUtils.isEmpty(sdcRedisProperties.getQueueConsumers())) {
            for (SdcRedisQueueConsumerProperties redisConsumers : sdcRedisProperties.getQueueConsumers()) {
                if (Boolean.TRUE.equals(redisConsumers.getEnable())) {
                    destroyConsumer(redisConsumers, sdcRedisProperties);
                }
            }
        }
    }

    private void destroyConsumer(SdcRedisQueueConsumerProperties redisQueueConsumerProperties,
                                 SdcRedisProperties sdcRedisProperties) {
        QueueConsumer queueConsumer = ApplicationContextUtil.getBean(redisQueueConsumerProperties.getConsumerName(), QueueConsumer.class);
        if (queueConsumer != null) {
            queueConsumer.destroy(redisQueueConsumerProperties, sdcRedisProperties);
        }
    }
}
```
The QueueConsumer definition:
```java
public interface QueueConsumer<T> {

    Integer getBatchSize();

    Integer getMaxThread();

    RedisTemplate getRedisTemplate();

    void initConsumer(SdcRedisQueueConsumerProperties serviceProperties, SdcRedisProperties sdcRedisProperties);

    void startConsume();

    void destroy(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties);
}
```
The abstract implementation of QueueConsumer:
```java
public abstract class AbstractQueueConsumer<T> implements QueueConsumer<T> {
    private static final Log logger = LogFactory.getLog(AbstractQueueConsumer.class);

    protected SdcRedisProperties sdcRedisProperties;
    protected SdcRedisQueueConsumerProperties sdcRedisQueueConsumerProperties;
    public static final Integer QUEUE_CAPACITY = 128;
    protected ThreadPoolExecutor consumerTaskExecutor;

    @Override
    public void initConsumer(SdcRedisQueueConsumerProperties serviceProperties, SdcRedisProperties sdcRedisProperties) {
        this.sdcRedisProperties = sdcRedisProperties;
        this.sdcRedisQueueConsumerProperties = serviceProperties;
        getThreadPool();
    }

    @Override
    public void startConsume() {
        Thread puller = new Thread(new Runnable() {
            @Override
            public void run() {
                ThreadPoolExecutor threadPoolTaskExecutor = getThreadPool();
                while (true) {
                    try {
                        if (!workerQueueFull(threadPoolTaskExecutor)) {
                            List<T> lstData = batchPopByPipeline(sdcRedisQueueConsumerProperties.getQueueName());
                            if (!CollectionUtils.isEmpty(lstData)) {
                                lstData = cleanBeforeConsume(lstData);
                                consumeRecords(lstData, threadPoolTaskExecutor);
                            } else {
                                logger.info("no record pulled from queue " + sdcRedisQueueConsumerProperties.getQueueName() + ", sleep 30 seconds");
                                threadSleep(30000);
                            }
                        } else {
                            threadSleep(3000);
                            logger.info(sdcRedisQueueConsumerProperties.getConsumerName() + " Wait data pulling on full executor pool for 3 seconds");
                        }
                    } catch (Exception e) {
                        logger.error(ExceptionUtils.getStackTrace(e));
                    } finally {
                        getThreadPool().purge();
                    }
                }
            }
        });
        puller.start();
    }

    protected boolean workerQueueFull(ThreadPoolExecutor threadPoolTaskExecutor) {
        int queueSize = threadPoolTaskExecutor.getQueue().size();
        logger.info(sdcRedisQueueConsumerProperties.getConsumerName() + " Task Queue Size: " + queueSize
                + ", Worker Pool Size: " + threadPoolTaskExecutor.getPoolSize());
        return queueSize >= QUEUE_CAPACITY/* || threadPoolTaskExecutor.getPoolSize() >= getMaxThread()*/;
    }

    private List<T> cleanBeforeConsume(List<T> lstData) {
        List<T> toConsume = null;
        if (!CollectionUtils.isEmpty(lstData)) {
            toConsume = new ArrayList<>();
            for (T datum : lstData) {
                if (datum != null && !toConsume.contains(datum)) {
                    toConsume.add(datum);
                }
            }
        }
        return toConsume;
    }

    private List<T> batchPopByPipeline(String queueName) {
        Long size = getRedisTemplate().opsForList().size(queueName);
        Long fetchSize = Math.min(size, getBatchSize());
        if (fetchSize > 0L) {
            return getRedisTemplate().executePipelined(new RedisCallback<T>() {
                @Override
                public T doInRedis(RedisConnection connection) throws DataAccessException {
                    for (int i = 0; i < fetchSize; i++) {
                        connection.lPop(queueName.getBytes());
                    }
                    return null;
                }
            });
        } else {
            return null;
        }
    }

    protected void consumeRecords(List<T> records, ThreadPoolExecutor threadPoolTaskExecutor) {
        if (!CollectionUtils.isEmpty(records)) {
            logger.info("-------------- records pulled from redis queue: " + StringUtils.join(records));
            if (getMaxThread() == null || getMaxThread() <= 1) {
                try {
                    if (getQueueService().getTimeoutOfSingleTaskInSecond() > 0) {
                        TimedExecutor.timedInvoke(getQueueService(), "execute", new Class[]{List.class}, new Object[]{records},
                                getQueueService().getTimeoutOfSingleTaskInSecond(), TimeUnit.SECONDS);
                    } else {
                        getQueueService().execute(records);
                    }
                } catch (Exception e) {
                    logger.warn(String.format("Queue consumer unexpect finished for [%s]: ", StringUtil.join(records, ",")) + ExceptionUtils.getStackTrace(e));
                }
            } else {
                int taskNum = records.size();
                int workerNum = getMaxThread();
                if (taskNum <= workerNum) {
                    for (int i = 0; i < Math.min(taskNum, workerNum); i++) {
                        while (workerQueueFull(threadPoolTaskExecutor)) {
                            threadSleep(3000);
                            logger.warn(sdcRedisQueueConsumerProperties.getConsumerName() + " Wait task submitting on full executor pool for 3 seconds");
                        }
                        QueueConsumeTask consumeTask = new QueueConsumeTask<T>(records.subList(i, i + 1), getQueueService());
                        consumerTaskExecutor.submit(consumeTask);
                    }
                } else {
                    int batchSize = taskNum / workerNum;
                    if (taskNum % workerNum != 0) {
                        batchSize = batchSize + 1;
                    }
                    for (int i = 0; i < Math.min(workerNum, taskNum / batchSize + 1); i++) {
                        int endIndex = Math.min(i * batchSize + batchSize, taskNum);
                        while (workerQueueFull(threadPoolTaskExecutor)) {
                            threadSleep(3000);
                            logger.warn(sdcRedisQueueConsumerProperties.getConsumerName() + " Wait submitting on full pool for 3 seconds");
                        }
                        QueueConsumeTask consumeTask = new QueueConsumeTask<T>(records.subList(i * batchSize, endIndex), getQueueService());
                        consumerTaskExecutor.submit(consumeTask);
                    }
                }
            }
        }
    }

    private void threadSleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            logger.error(ExceptionUtils.getStackTrace(e));
        }
    }

    @Override
    public void destroy(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties) {
        getThreadPool().shutdown();
    }

    protected abstract RedisQueueService getQueueService();

    public synchronized ThreadPoolExecutor getThreadPool() {
        if (consumerTaskExecutor == null) {
            initTaskExecutor();
        }
        return consumerTaskExecutor;
    }

    public void initTaskExecutor() {
        consumerTaskExecutor = new ThreadPoolExecutor(getMaxThread(), getMaxThread(), 1, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(QUEUE_CAPACITY), new ThreadPoolExecutor.DiscardPolicy());
        consumerTaskExecutor.prestartAllCoreThreads();
        consumerTaskExecutor.allowCoreThreadTimeOut(true);
    }

    public Integer getMaxThread() {
        Integer max = this.sdcRedisQueueConsumerProperties.getMaxThread() != null
                ? this.sdcRedisQueueConsumerProperties.getMaxThread()
                : this.sdcRedisProperties.getMaxThread();
        return max != null ? max : 1;
    }

    @Override
    public Integer getBatchSize() {
        return this.sdcRedisQueueConsumerProperties.getBatchSize();
    }
}
```
A few design points in the abstract implementation:

1. Scheduling thread: a dedicated puller thread fetches task data from Redis and hands it over to the thread pool for processing;
2. Thread pool definition: corePool and maxPool are set to the same size, and tasks are buffered in a fixed-capacity queue; when that queue is full, the component neither pulls nor submits new tasks, so that data already popped from Redis is not lost if the process dies unexpectedly;
3. Batch pulling is supported, and the pulled records are distributed according to the configured concurrency (the partitioning is illustrated below).
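Point 3 deserves a closer look: in `consumeRecords`, when the number of pulled records exceeds the worker count, the records are split into at most `workerNum` sublists of size `ceil(taskNum / workerNum)`, and each sublist becomes one task. The standalone snippet below restates just that partitioning arithmetic for illustration; `BatchSplitDemo` is not part of the component.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the partitioning used in consumeRecords:
// taskNum records are split into at most workerNum sublists of size ceil(taskNum / workerNum).
public class BatchSplitDemo {

    static <T> List<List<T>> split(List<T> records, int workerNum) {
        int taskNum = records.size();
        List<List<T>> batches = new ArrayList<>();
        if (taskNum <= workerNum) {
            // one record per task
            for (int i = 0; i < taskNum; i++) {
                batches.add(records.subList(i, i + 1));
            }
        } else {
            int batchSize = taskNum / workerNum;
            if (taskNum % workerNum != 0) {
                batchSize++; // round up so no record is left behind
            }
            for (int i = 0; i < Math.min(workerNum, taskNum / batchSize + 1); i++) {
                int endIndex = Math.min(i * batchSize + batchSize, taskNum);
                batches.add(records.subList(i * batchSize, endIndex));
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 30; i++) records.add(i);
        // 30 records, 4 workers -> batchSize = 8 -> sublists of 8, 8, 8, 6
        split(records, 4).forEach(b -> System.out.println(b.size() + " -> " + b));
    }
}
```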
The core idea of the QueueConsumer implementation is that each worker execution must return within a configurable time limit: the execution time of redisQueueService must not be allowed to block the polling of the workQueue.
```java
public class QueueConsumeTask<T> implements Callable<Long> {
    private static final Log logger = LogFactory.getLog(QueueConsumeTask.class);

    private final List<T> records;
    private final RedisQueueService redisQueueService;

    public QueueConsumeTask(List<T> records, RedisQueueService redisQueueService) {
        this.records = records;
        this.redisQueueService = redisQueueService;
    }

    @Override
    public Long call() {
        Long ret = 0L;
        try {
            if (redisQueueService.getTimeoutOfSingleTaskInSecond() > 0) {
                ret = (Long) TimedExecutor.timedInvoke(redisQueueService, "execute", new Class[]{List.class}, new Object[]{records},
                        redisQueueService.getTimeoutOfSingleTaskInSecond(), TimeUnit.SECONDS);
            } else {
                ret = redisQueueService.execute(records);
            }
        } catch (Exception e) {
            logger.warn(String.format("Queue consumer unexpect finished for [%s]: ", StringUtil.join(records, ",")) + ExceptionUtils.getStackTrace(e));
        }
        return ret;
    }
}
```
Following the principle of bounded execution time, a TimedExecutor is defined:
```java
public class TimedExecutor {
    private static final Log logger = LogFactory.getLog(TimedExecutor.class);

    public static Object timedInvoke(final Object target, final String methodName, final Class<?>[] parameterTypes,
                                     final Object[] params, long timeout, TimeUnit timeUnit) throws TimeoutException {
        Object ret = null;
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        FutureTask<?> futureTask = new FutureTask<>(new Callable<Object>() {
            public Object call() throws Exception {
                try {
                    Method method = target.getClass().getDeclaredMethod(methodName, parameterTypes);
                    return method.invoke(target, params);
                } catch (Exception e) {
                    throw e;
                }
            }
        });
        try {
            executorService.execute(futureTask);
            ret = futureTask.get(timeout, timeUnit);
        } catch (TimeoutException e) {
            logger.warn("timedInvoke timeout, try to cancel future");
            futureTask.cancel(true);
            throw e;
        } catch (Exception e) {
            logger.warn("timedInvoke: " + ExceptionUtils.getStackTrace(e));
            throw new RuntimeException(e);
        } finally {
            executorService.shutdownNow();
        }
        return ret;
    }
}
```
RedisQueueService holds the concrete business logic:
```java
public interface RedisQueueService<T> {

    Long execute(List<T> records);

    default long getTimeoutOfSingleTaskInSecond() {
        return 0L;
    }
}
```
An example implementation:
```java
@Service
@Slf4j
public class VesselExpectQueueService implements RedisQueueService<Integer> {

    @Autowired
    DtVesselEstimateArrivalCalcService dtVesselEstimateArrivalCalcService;

    @Override
    public Long execute(List<Integer> records) {
        if (records != null) {
            for (Integer mmsi : records) {
                dtVesselEstimateArrivalCalcService.calcExpectArrAutoByJob(mmsi);
            }
            return (long) records.size();
        }
        return 0L;
    }

    @Override
    public long getTimeoutOfSingleTaskInSecond() {
        return 60;
    }
}
```
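One piece not shown in this post is the concrete QueueConsumer that ties a queue to its RedisQueueService. For the `expectCalculateQueueConsumer` referenced in the configuration, it could look roughly like the sketch below; the class name, the field wiring, and the RedisTemplate type are assumptions.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

// Sketch of a concrete consumer. The bean name must match "consumer-name"
// in the configuration so RedisServiceExecutor can look it up by name.
@Component("expectCalculateQueueConsumer")
public class ExpectCalculateQueueConsumer extends AbstractQueueConsumer<Integer> {

    @Autowired
    private VesselExpectQueueService vesselExpectQueueService;

    @Autowired
    private RedisTemplate<String, Integer> redisTemplate;

    @Override
    protected RedisQueueService getQueueService() {
        return vesselExpectQueueService;
    }

    @Override
    public RedisTemplate getRedisTemplate() {
        return redisTemplate;
    }
}
```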