最近在设计一个项目,项目里面涉及到了任务创建和任务运行,这个就让我想到做一个单独的执行器服务。按照以往的经验,项目里的数据量也不会很高,那么任务的创建运行实际上单台机器就能应付,好像也没必要硬上分布式执行器吧。但是呢,虽然以往的经验如此,万一这个项目就“运气”好的爆表数据量很大呢,那后面改造就挺费事儿了,而且就算前期数据量一般,那么顶多也就是部署单节点咯。
平时上网划水也听说过xxl-job这个分布式任务调度平台,感觉也挺适合我这个项目的,所以就打算直接用这个了。clone了它的代码,然后按照教程学习了操作,之后就基本决定用它了。既然是为项目设计架构,好像我也没道理不去更深入了解下xxl-job的一些实现原理了吧,因此在学习完操作后我就开始看xxl-job的源码了。
在看xxl-job的源码时(主要是执行器部分),发现到了两个东西,一个是bug一个是设计逻辑,然后我从设计逻辑倒推了下执行器的适用场景。
先说结论:个人认为xxl-job比较适合运行耗时比较长的大任务(定时大任务的估计也差不多),或者数量少耗时短的微任务;对于那种数量多或是运行时间较短的微任务可能不太适合(我的项目正好是后面这种...)。
再具体点,可以细分为两类:
以上结论是我的个人观点,如有错误请多指正。
说完结论,我来分析下我得出结论的依据。
首先来看ExecutorBizImpl.java的实现,这是执行器端的具体实现,.run()方法就是在接收到admin端调度任务后执行器端执行的方法。
我们看该方法的第一行,这里首先就根据jobId来获取指定任务(job)所在的执行线程(JobThread,继承自Thread)。从这里我们可以猜测,相同jobId的任务最终在单个执行器服务里面是只被分配了一个线程的。
@Override public ReturnT<String> run(TriggerParam triggerParam) { // load old:jobHandler + jobThread JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); // 忽略后面的... }
接着我们看到该方法的最后几行,首先会根据jobThread来判断jobId指定的任务是否“正在”运行中,如果是的那就复用线程,如果不是那就通过.registJobThread()新建一个JobThread实例。之后把运行参数(triggerParam)给放到了jobThread实例的队列里(JobThread会不断从内置队列中获取运行参数来执行任务)。这几行代码也验证了我上面猜测的,那也就是说相同jobId的任务在并发执行时是会通过Queue作为中介排队运行的。
@Override public ReturnT<String> run(TriggerParam triggerParam) { // 忽略前面的... // replace thread (new or exists invalid) if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // push data to queue ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; } }
所以以上也就是我为什么说xxl-job执行器不适合执行jobId相同且数量多耗时长的任务了,因为后面来的任务都在Queue中排队了。
至于我为什么一直说不适合耗时短的微任务呢?首先我们先了解一个前提,那就是JobThread在不执行任务90秒后(也就是有idle timeout)会被回收掉。所以假设jobId不同,那么jobId越多就势必会创建大量线程,如果jobId不多,那么就有频繁创建和销毁线程的开销的可能了,这个好像就有点浪费资源;再假设jobId相同,如果微任务很多,那么排队执行相对来说还是比较慢的,如果微任务不多,那还是有浪费资源的可能。
除了上述ExecutorBizImpl.java:.run()开头和结尾的代码,它中间还有些其它逻辑,我选择一部分出来解释下。
比如这个是在判断任务job(也就是这个JobHandler)有没有在运行时被更新过,如果有,那么就要把运行中的旧job给停掉,然后运行新job。
@Override public ReturnT<String> run(TriggerParam triggerParam) { // 忽略前面的... // valid:jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = newJobHandler; if (jobHandler == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); } } } // 忽略后面的... }
这里这个是job的阻塞策略,在创建任务时可选,阻塞策略其实是跟前文讲的相同jobId执行要排队有关的。当BlockStrategy是SERIAL_EXECUTION时,那么相同jobId的任务就是排队执行;当是DISCARD_LATER时,那么就抛弃后面来的job;当是COVER_EARLY时,那么就停止之前的job,开始运行新来的这个job。
@Override public ReturnT<String> run(TriggerParam triggerParam) { // 忽略前面的... // executor block strategy if (jobThread != null) { ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } } // 忽略后面的... }
那么怎么解决不适合执行大量相同jobId任务的这个问题呢?我的思路就在于增加一种阻塞模式:单机并行(xxl-job应该也考虑过,不过没有实现,可见ExecutorBlockStrategyEnum.java中注释掉的类型)。当属于单机并行的任务在创建线程时,实际是去创建一个线程池,这样数量较多的相同jobId的任务就能并发执行了。然后这里打个广告,贴出我的魔改线程池版xxl-job的项目,跪求star