前言
上一篇匆匆讲了两个类,这次接着讲MasterExecThread类,继续往下分析。
一、MasterExecThread
上一篇提到,任务提交的入口是submitStandByTask()方法,代码如下。前面的流程会把解析后满足提交条件的任务放入readyToSubmitTaskList这个Map中;这里逐个取出其中的任务:依赖结果为成功、且已超过重试间隔的任务会传给submitTaskExec(task)方法提交,然后从待提交列表中移除;依赖结果为失败的任务不会提交,而是放入dependFailedTask,并从待提交列表中移除。
/**
 * Walks the stand-by (ready-to-submit) tasks and acts on each task's dependency result.
 *
 * A task whose dependencies succeeded is submitted and then dropped from the stand-by
 * list, but only once retryTaskIntervalOverTime(task) returns true. A task whose
 * dependencies failed is not submitted: it is recorded in dependFailedTask under its
 * key and dropped from the stand-by list. Tasks with any other dependency result are
 * left untouched for a later pass.
 */
private void submitStandByTask(){
    for (Map.Entry<String, TaskInstance> standBy : readyToSubmitTaskList.entrySet()) {
        TaskInstance candidate = standBy.getValue();
        DependResult result = getDependResultForTask(candidate);

        if (DependResult.FAILED == result) {
            // dependency failed: this node is not submitted and is remembered as failed instead
            dependFailedTask.put(standBy.getKey(), candidate);
            removeTaskFromStandbyList(candidate);
            logger.info("task {},id:{} depend result : {}", candidate.getName(), candidate.getId(), result);
            continue;
        }

        boolean readyNow = DependResult.SUCCESS == result && retryTaskIntervalOverTime(candidate);
        if (!readyNow) {
            continue;
        }
        submitTaskExec(candidate);
        removeTaskFromStandbyList(candidate);
    }
}
submitTaskExec()会先判断任务类型(子流程、依赖、条件或普通任务),再用该任务实例构造对应的MasterBaseTaskExecThread子类;MasterBaseTaskExecThread实现了Callable接口。
接着把实例化好的子类对象提交给线程池taskExecService执行,并把该对象和提交后返回的Future一起保存到activeTaskNode这个Map中。
/**
 * Wraps a task instance in the exec thread that matches its type, hands that thread to
 * taskExecService, and records the thread together with its Future in activeTaskNode.
 * Sub-process, dependent and conditions tasks get their dedicated thread classes; every
 * other task runs in a MasterTaskExecThread.
 *
 * @param taskInstance task instance to submit
 * @return the task instance held by the newly created exec thread
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    final MasterBaseTaskExecThread execThread;
    if (taskInstance.isSubProcess()) {
        execThread = new SubProcessTaskExecThread(taskInstance);
    } else if (taskInstance.isDependTask()) {
        execThread = new DependentTaskExecThread(taskInstance);
    } else if (taskInstance.isConditionsTask()) {
        execThread = new ConditionsTaskExecThread(taskInstance);
    } else {
        execThread = new MasterTaskExecThread(taskInstance);
    }

    Future<Boolean> pending = taskExecService.submit(execThread);
    activeTaskNode.putIfAbsent(execThread, pending);
    return execThread.getTaskInstance();
}
再看看保存在activeTaskNode中的这些节点有什么用。查看它被调用的地方就能明白:一是用来监控当前任务节点是否已处理完成,以便进行下一步处理;二是当用户需要杀死该任务时,也要通过它找到对应的节点。
二、MasterBaseTaskExecThread
然后接着看MasterBaseTaskExecThread的call方法,内容如下。
/**
 * Executor entry point: loads the owning process instance from the database by the
 * task's process-instance id, then delegates to submitWaitComplete().
 *
 * @return whatever submitWaitComplete() returns
 * @throws Exception exception
 */
@Override
public Boolean call() throws Exception {
    processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
    final Boolean quitResult = submitWaitComplete();
    return quitResult;
}
其子类重写了submitWaitComplete()方法。这里选MasterTaskExecThread看一下它的实现,主要由两部分组成:先提交任务,再等待任务运行结束后退出。
/**
 * Submits the task instance, waits for it to finish unless it is already in a finished
 * state, then stamps its end time and persists it via processService.updateTaskInstance.
 *
 * @return the result of waitTaskQuit() when the task had to be waited for;
 *         {@code false} when submission failed or the submitted task was already
 *         finished. If the task instance is gone after waiting, the end time is not
 *         updated and the wait result is returned as-is.
 */
@Override
public Boolean submitWaitComplete() {
    Boolean result = false;
    this.taskInstance = submit();
    if (this.taskInstance == null) {
        logger.error("submit task instance to mysql and queue failed , please check and fix it");
        return result;
    }
    if (!this.taskInstance.getState().typeIsFinished()) {
        result = waitTaskQuit();
    }
    // waitTaskQuit() may reassign this.taskInstance from a database lookup; guard against
    // it having become null so setEndTime below does not throw an NPE
    if (this.taskInstance == null) {
        logger.error("task instance not exists after waiting, skip updating end time");
        return result;
    }
    taskInstance.setEndTime(new Date());
    processService.updateTaskInstance(taskInstance);
    // processInstance can be null (waitTaskQuit() handles that case explicitly), so do not
    // dereference it unconditionally just to build a log line
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(), taskInstance.getId(),
            processInstance == null ? null : processInstance.getId());
    return result;
}
然后再看一下其父类中负责提交的submit()方法:
* submit master base task exec thread
* @return TaskInstance
*/
protected TaskInstance submit() {
Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
int retryTimes = 1;
boolean submitDB = false;
boolean submitTask = false;
TaskInstance task = null;
while (retryTimes <= commitRetryTimes){
try {
if(!submitDB){
// submit task to db
task = processService.submitTask(taskInstance);
if(task != null && task.getId() != 0){
submitDB = true;
}
}
if(submitDB && !submitTask){
// dispatch task
submitTask = dispatchTask(task);
}
if(submitDB && submitTask){
return task;
}
if(!submitDB){
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
}else if(!submitTask){
logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
}
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
logger.error("task commit to mysql and dispatcht task failed",e);
}
retryTimes += 1;
}
return task;
}
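任务信息成功保存到数据库后,再看它是如何把任务分发出去的,也就是下面的dispatchTask()方法。可以看到,它把任务的关键信息(流程实例优先级、流程实例id、任务优先级、任务id以及默认worker分组)拼成taskPriorityInfo,放入taskUpdateQueue队列,由另一端从队列中消费该任务。条件、依赖、子流程类任务,以及已结束、正在运行或延迟执行的任务不会入队,直接视为分发成功。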
/**
 * Dispatches a task instance by putting its priority info onto taskUpdateQueue.
 *
 * Conditions, dependent and sub-process tasks are never queued, and neither are tasks
 * whose state is already finished, RUNNING_EXECUTION or DELAY_EXECUTION; all of those
 * are reported as dispatched without touching the queue.
 *
 * @param taskInstance taskInstance
 * @return true if the task was queued or did not need queuing; false if an exception
 *         was thrown while building or queuing the task priority info
 */
public Boolean dispatchTask(TaskInstance taskInstance) {
    try {
        if (taskInstance.isConditionsTask()
                || taskInstance.isDependTask()
                || taskInstance.isSubProcess()) {
            return true;
        }
        if (taskInstance.getState().typeIsFinished()) {
            logger.info("submit task , but task [{}] state [{}] is already finished. ",
                    taskInstance.getName(), taskInstance.getState());
            return true;
        }
        // task cannot be submitted because its execution state is RUNNING or DELAY.
        if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
                || taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) {
            logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
            return true;
        }
        logger.info("task ready to submit: {}", taskInstance);
        // taskPriorityInfo: process instance priority, process instance id, task priority,
        // task id and the default worker group, combined by buildTaskPriorityInfo
        String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
                processInstance.getId(),
                taskInstance.getProcessInstancePriority().getCode(),
                taskInstance.getId(),
                org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
        taskUpdateQueue.put(taskPriorityInfo);
        logger.info("master submit success, task : {}", taskInstance.getName());
        return true;
    } catch (Exception e) {
        logger.error("submit task Exception: ", e);
        // SLF4J substitutes "{}" placeholders; the former "%s" was printed literally and
        // the JSON argument was silently dropped
        logger.error("task error : {}", JSONUtils.toJsonString(taskInstance));
        return false;
    }
}
然后再看等待任务退出的waitTaskQuit()方法。它是一个轮询循环:只要Stopper.isRunning()为真,就每隔SLEEP_TIME_MILLIS从数据库刷新一次任务实例和流程实例的状态,直到任务进入结束状态才跳出循环。这说明正常情况下,只有任务运行完毕后,它占用的线程资源才会被释放。
/**
 * polling db
 *
 * Waits for the task instance to reach a finished state by polling the database every
 * Constants.SLEEP_TIME_MILLIS milliseconds. Each round it:
 * calls cancelTaskInstance() when this.cancel is set or the process instance is
 * READY_STOP; calls pauseTask() when the process instance is READY_PAUSE; calls
 * alertTimeout() when checkTaskTimeout() is true; then reloads both the task instance
 * and the process instance.
 *
 * The wait also ends when Stopper.isRunning() turns false, when the process instance or
 * the task instance can no longer be found, or when this thread is interrupted (the
 * interrupt flag is restored in that case).
 *
 * @return always true, including the early-exit cases above
 */
public Boolean waitTaskQuit(){
    // query new state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    if (taskInstance == null) {
        logger.error("task instance not exists , master task exec thread exit");
        return true;
    }
    logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
            this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // the reload at the end of the previous round may have returned null
            if (this.taskInstance == null) {
                logger.error("task instance not exists , master task exec thread exit");
                return true;
            }
            // task instance add queue , waiting worker to kill
            if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
                cancelTaskInstance();
            }
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                pauseTask();
            }
            // task instance finished
            if (taskInstance.getState().typeIsFinished()) {
                // if task is final result , then remove taskInstance from cache
                taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
                break;
            }
            if (checkTaskTimeout()) {
                this.checkTimeoutFlag = !alertTimeout();
            }
            // updateProcessInstance task instance
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
        } catch (Exception e) {
            logger.error("exception", e);
            // both may be null here; dereferencing them unguarded would throw from the handler
            if (processInstance != null && taskInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
        // sleep outside the try so that a failing round still waits instead of busy-looping,
        // and so that an interrupt is not swallowed by the generic catch above
        try {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("wait task quit interrupted, master task exec thread exit");
            break;
        }
    }
    return true;
}
总结
Master端先把工作流实例解析成DAG,再从DAG中找出满足提交条件的任务,放入待提交集合。接着遍历该集合,根据任务类型创建对应的MasterBaseTaskExecThread子类实例,交给线程池执行。执行时先把任务信息写入数据库保存,再通过任务队列分发出去,供另一端消费。之后该线程会一直轮询,等到任务运行结束,才把占用的线程资源归还给线程池。(一个线程只服务一个任务,如果遇到耗时很长的任务,线程就会被长时间占用,感觉有点浪费。)