```java
@Service
public class MasterSchedulerService extends Thread {
    // ...
}
```
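The declaration shows that the class extends Thread, so it can run on a thread of its own. Because Thread also implements Runnable, an instance could likewise be handed to a thread pool.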
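The following minimal sketch illustrates the two ways a Thread subclass can be executed. `WorkerThread` and `ThreadAsRunnableDemo` are hypothetical names used only for illustration. When the object is submitted to a pool, the pool uses it purely as a Runnable: `run()` executes on a pool worker thread. Calling `start()` instead creates a new thread of execution for the object itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical Thread subclass standing in for a class such as MasterSchedulerService.
class WorkerThread extends Thread {
    // Name of the thread that actually executed run().
    final AtomicReference<String> ranOn = new AtomicReference<>();

    @Override
    public void run() {
        ranOn.set(Thread.currentThread().getName());
    }
}

class ThreadAsRunnableDemo {
    // Submits the Thread object to a pool as a plain Runnable.
    static String runViaPool() {
        WorkerThread worker = new WorkerThread();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(worker); // accepted because Thread implements Runnable
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.ranOn.get();
    }

    // Starts the Thread object as its own thread of execution.
    static String runViaStart() {
        WorkerThread worker = new WorkerThread();
        worker.setName("own-thread");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(runViaPool());  // a pool thread's name, e.g. pool-1-thread-1
        System.out.println(runViaStart()); // own-thread
    }
}
```

When a Thread object goes through a pool, only its `run()` method matters. The object's own name and its `start()` method are not used.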
```java
/** logger of MasterSchedulerService */
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);

/** dolphinscheduler database interface */
@Autowired
private ProcessService processService;

/** zookeeper master client */
@Autowired
private MasterRegistryClient masterRegistryClient;

/** master config */
@Autowired
private MasterConfig masterConfig;

/** alert manager */
@Autowired
private ProcessAlertManager processAlertManager;

/** netty remoting client */
private NettyRemotingClient nettyRemotingClient;

/** master exec service */
private ThreadPoolExecutor masterExecService;
```
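The class holds several fields:

- a ProcessService, which aggregates many mapper classes and provides the data-access (DAO) layer;
- a ProcessAlertManager, which manages alerts;
- a MasterRegistryClient for ZooKeeper, together with the MasterConfig settings;
- a Netty remoting client and a thread pool.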
```java
@PostConstruct
public void init() {
    // fixed-size daemon pool that will run the MasterExecThread instances
    this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor(
            "Master-Exec-Thread", masterConfig.getMasterExecThreads());
    // Netty remoting client, later handed to each MasterExecThread
    NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
```
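Because `init()` is annotated with @PostConstruct, Spring calls it once dependency injection has finished. The method creates the thread pool, a fixed-size pool of daemon threads with the name prefix Master-Exec-Thread, sized by masterExecThreads. It then creates a Netty client.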
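DolphinScheduler's `ThreadUtils.newDaemonFixedThreadExecutor` is not reproduced here. The following is a JDK-only sketch of a pool with the same described shape: a fixed number of daemon threads whose names share a prefix. The class name `DaemonPools` and the `<prefix>-<n>` naming scheme are assumptions made for illustration. The real utility may name its threads differently.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DaemonPools {
    // JDK-only sketch of a fixed-size pool whose threads are daemons named "<prefix>-<n>".
    // Hypothetical stand-in: DolphinScheduler's ThreadUtils may be implemented differently.
    static ThreadPoolExecutor newDaemonFixedThreadExecutor(String prefix, int nThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true); // daemon threads do not keep the JVM alive by themselves
            return t;
        };
        // Same shape as Executors.newFixedThreadPool: core size == max size, unbounded queue.
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newDaemonFixedThreadExecutor("Master-Exec-Thread", 4);
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        // Wait explicitly: daemon workers would not keep the JVM alive after main returns.
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The original code casts the pool to ThreadPoolExecutor because `getActiveCount()`, which `scheduleProcess` relies on, is defined on ThreadPoolExecutor and not on the ExecutorService interface.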
Since the class extends Thread, its actual work lives in the overridden run method. Here it is:
```java
/**
 * run of MasterSchedulerService
 */
@Override
public void run() {
    logger.info("master scheduler started");
    while (Stopper.isRunning()) {
        try {
            boolean runCheckFlag = OSUtils.checkResource(
                    masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
            if (!runCheckFlag) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            scheduleProcess();
        } catch (Exception e) {
            logger.error("master scheduler thread error", e);
        }
    }
}
```
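The run method loops for as long as Stopper.isRunning() returns true. On each pass it first calls OSUtils.checkResource with the configured maximum CPU load average and reserved memory. If not enough resources are free, the thread sleeps for SLEEP_TIME_MILLIS and checks again. Once resources are sufficient, it calls scheduleProcess. Any exception thrown during a pass is logged, and the loop continues.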
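The loop structure can be sketched in isolation as shown below. This is a simplified illustration, not the DolphinScheduler code. `SchedulerLoop` is a hypothetical class, and the resource check, the scheduling step, and the stop flag are injected stand-ins for `OSUtils.checkResource`, `scheduleProcess`, and `Stopper`. One deliberate difference: the original catches InterruptedException along with every other exception and keeps looping, whereas this sketch exits when interrupted.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class SchedulerLoop implements Runnable {
    private final AtomicBoolean running;         // stand-in for Stopper.isRunning()
    private final BooleanSupplier hasResources;  // stand-in for OSUtils.checkResource(...)
    private final Runnable scheduleStep;         // stand-in for scheduleProcess()
    private final long sleepMillis;              // stand-in for Constants.SLEEP_TIME_MILLIS

    SchedulerLoop(AtomicBoolean running, BooleanSupplier hasResources,
                  Runnable scheduleStep, long sleepMillis) {
        this.running = running;
        this.hasResources = hasResources;
        this.scheduleStep = scheduleStep;
        this.sleepMillis = sleepMillis;
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                if (!hasResources.getAsBoolean()) {
                    Thread.sleep(sleepMillis); // not enough CPU/memory: back off, then check again
                    continue;
                }
                scheduleStep.run();
            } catch (InterruptedException e) {
                // Unlike the original, this sketch stops when interrupted instead of looping on.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // As in the original, a failed pass is logged and the loop keeps going.
                System.err.println("master scheduler thread error: " + e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger checks = new AtomicInteger();
        AtomicInteger rounds = new AtomicInteger();
        // Pretend resources are short for the first two checks; stop after three rounds.
        SchedulerLoop loop = new SchedulerLoop(running,
                () -> checks.incrementAndGet() > 2,
                () -> { if (rounds.incrementAndGet() == 3) running.set(false); },
                10L);
        loop.run();
        System.out.println(checks.get() + " checks, " + rounds.get() + " rounds"); // 5 checks, 3 rounds
    }
}
```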
Next, let's trace into the scheduleProcess method.
```java
private void scheduleProcess() throws Exception {
    try {
        // distributed mutex shared by all masters (via the ZooKeeper registry client)
        masterRegistryClient.blockAcquireMutex();

        int activeCount = masterExecService.getActiveCount();
        // make sure to scan and delete command table in one transaction
        Command command = processService.findOneCommand();
        if (command != null) {
            logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());
            try {
                // the number of free executor threads is passed along with the command
                ProcessInstance processInstance = processService.handleCommand(logger,
                        getLocalAddress(),
                        this.masterConfig.getMasterExecThreads() - activeCount,
                        command);
                if (processInstance != null) {
                    logger.info("start master exec thread , split DAG ...");
                    masterExecService.execute(new MasterExecThread(
                            processInstance,
                            processService,
                            nettyRemotingClient,
                            processAlertManager,
                            masterConfig));
                }
            } catch (Exception e) {
                logger.error("scan command error ", e);
                processService.moveToErrorCommand(command, e.toString());
            }
        } else {
            // no command found: sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    } finally {
        // always release the mutex, even if handling the command failed
        masterRegistryClient.releaseLock();
    }
}
```
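1. Acquire the masters' distributed lock (blockAcquireMutex), so that only one master scans the command table at a time.
2. Check how many threads in the master's executor pool are currently active.
3. Fetch one command from the database (findOneCommand).
4. Let processService.handleCommand build the ProcessInstance entity for that command, passing in the number of free executor threads (masterExecThreads - activeCount).
5. If an instance was created, run a MasterExecThread for it in this class's thread pool.

If handling the command throws, the command is moved to the error-command table. If no command is found, the thread sleeps for one second. In every case the lock is released in the finally block.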
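The lock / fetch / hand-off / release pattern above can be modeled in a single JVM, as in the sketch below. This is a rough illustration under several assumptions. A local ReentrantLock stands in for the ZooKeeper-backed mutex, a queue of strings stands in for the command table, and a list stands in for the error-command table. `ScheduleRound` and its `handleCommand` are hypothetical: the latter only rejects commands whose name starts with "bad" and does not model what DolphinScheduler does with the free-thread count.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ScheduleRound {
    private final Lock mutex = new ReentrantLock(); // local stand-in for the distributed master mutex
    private final Queue<String> commands;          // stand-in for the command table
    private final int maxThreads;
    private final ThreadPoolExecutor execService;
    final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    final List<String> errorCommands = Collections.synchronizedList(new ArrayList<>());

    ScheduleRound(Queue<String> commands, int maxThreads) {
        this.commands = commands;
        this.maxThreads = maxThreads;
        this.execService = new ThreadPoolExecutor(maxThreads, maxThreads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // One scheduling round; returns false when no command was waiting.
    boolean scheduleOnce() {
        mutex.lock(); // blockAcquireMutex()
        try {
            int activeCount = execService.getActiveCount();
            String command = commands.poll(); // findOneCommand()
            if (command == null) {
                return false; // the original sleeps here while still holding the lock
            }
            try {
                String instance = handleCommand(command, maxThreads - activeCount);
                if (instance != null) {
                    execService.execute(() -> processed.add(instance)); // stand-in for MasterExecThread
                }
            } catch (RuntimeException e) {
                errorCommands.add(command); // moveToErrorCommand(...)
            }
            return true;
        } finally {
            mutex.unlock(); // releaseLock(), always reached
        }
    }

    // Hypothetical handler: rejects commands whose name starts with "bad".
    // The free-thread count is passed along as in the original but not interpreted here.
    private String handleCommand(String command, int freeThreads) {
        if (command.startsWith("bad")) {
            throw new IllegalArgumentException("cannot handle " + command);
        }
        return "instance:" + command;
    }

    // Stops accepting work and waits for submitted instances to finish.
    boolean shutdownAndWait() {
        execService.shutdown();
        try {
            return execService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduleRound round = new ScheduleRound(new ArrayDeque<>(Arrays.asList("c1", "bad", "c2")), 2);
        while (round.scheduleOnce()) {
            // keep scheduling until the command queue is empty
        }
        round.shutdownAndWait();
        System.out.println(round.processed);     // both instances, in completion order
        System.out.println(round.errorCommands); // [bad]
    }
}
```

Releasing the lock in `finally` keeps a failed command from blocking the other masters.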