LTS Internals -- TaskTracker Job Processing (Part 4)

Copyright notice: reposting is welcome. https://blog.csdn.net/qq924862077/article/details/82825708

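The main job of a TaskTracker is to execute jobs. It does this in two ways:

(1) After it starts, the TaskTracker periodically sends job pull requests to the JobTracker, sized by the resources it currently has available.

(2) The TaskTracker receives jobs pushed by the JobTracker and executes them.

See also: LTS Internals -- JobTracker Job Receiving and Dispatch (Part 3)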

I. Example

Source: https://github.com/ltsopensource/lts-examples/tree/master/lts-example-tasktracker

1. Configuration

  <context:component-scan base-package="com.github.ltsopensource.example"/>

    <bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
        <!-- job runner class -->
        <property name="jobRunnerClass" value="com.github.ltsopensource.example.spring.SpringAnnotationJobRunner"/>
        <property name="clusterName" value="test_cluster"/>
        <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
        <property name="nodeGroup" value="test_trade_TaskTracker"/>
        <property name="workThreads" value="64"/>
        <property name="configs">
            <props>
                <prop key="job.fail.store">mapdb</prop>
            </props>
        </property>
    </bean>

2. The job runner class implements the JobRunner interface

public class SpringAnnotationJobRunner implements JobRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringAnnotationJobRunner.class);

    @Autowired
    SpringBean springBean;

    @Override
    public Result run(JobContext jobContext) throws Throwable {
        try {
            Thread.sleep(1000L);

            springBean.hello();

            // TODO business logic
            LOGGER.info("About to run: " + jobContext.getJob());
            BizLogger bizLogger = jobContext.getBizLogger();
            // This log is sent to LTS (to the JobTracker)
            bizLogger.info("Test business log");

        } catch (Exception e) {
            LOGGER.info("Run job failed!", e);
            return new Result(Action.EXECUTE_LATER, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "Job executed successfully");
    }

}

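II. Requesting Jobs

When a TaskTracker initializes, it starts a scheduled task that sends job pull requests to the JobTracker, based on how many worker threads are free on the machine.

TaskTracker's beforeStart method creates a JobPullMachine, which is what sets up this scheduled task.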

 @Override
    protected void beforeStart() {
        appContext.setMStatReporter(new TaskTrackerMStatReporter(appContext));

        appContext.setRemotingClient(remotingClient);
        // set up the runner thread pool
        appContext.setRunnerPool(new RunnerPool(appContext));
        appContext.getMStatReporter().start();
        // set up the job pull machine
        appContext.setJobPullMachine(new JobPullMachine(appContext));
        appContext.setStopWorkingMonitor(new StopWorkingMonitor(appContext));

        appContext.getHttpCmdServer().registerCommands(
                new JobTerminateCmd(appContext));     // terminate a job that is currently running
    }

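When JobPullMachine is constructed, it subscribes to events about the JobTracker nodes in the registry. When a JobTracker becomes available it calls start, which sets up the scheduled task that sends requests to the JobTracker; when no JobTracker is available it calls stop.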

 public JobPullMachine(final TaskTrackerAppContext appContext) {
        this.appContext = appContext;
        this.jobPullFrequency = appContext.getConfig().getParameter(ExtConfig.JOB_PULL_FREQUENCY, Constants.DEFAULT_JOB_PULL_FREQUENCY);

        this.machineResCheckEnable = appContext.getConfig().getParameter(ExtConfig.LB_MACHINE_RES_CHECK_ENABLE, false);
        // Subscribe to JobTracker node events from the registry (ZooKeeper here); when a node is available, call start, which schedules the worker
        appContext.getEventCenter().subscribe(
                new EventSubscriber(JobPullMachine.class.getSimpleName().concat(appContext.getConfig().getIdentity()),
                        new Observer() {
                            @Override
                            public void onObserved(EventInfo eventInfo) {
                                if (EcTopic.JOB_TRACKER_AVAILABLE.equals(eventInfo.getTopic())) {
                                    // a JobTracker is available
                                    start();
                                } else if (EcTopic.NO_JOB_TRACKER_AVAILABLE.equals(eventInfo.getTopic())) {
                                    stop();
                                }
                            }
                        }), EcTopic.JOB_TRACKER_AVAILABLE, EcTopic.NO_JOB_TRACKER_AVAILABLE);
        this.worker = new Runnable() {
            @Override
            public void run() {
                try {
                    if (!start.get()) {
                        return;
                    }
                    if (!isMachineResEnough()) {
                        // machine resources are insufficient, so skip this pull
                        return;
                    }
                    // send a job pull request to the JobTracker
                    sendRequest();
                } catch (Exception e) {
                    LOGGER.error("Job pull machine run error!", e);
                }
            }
        };
    }
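
The guard logic of the worker can be tried in isolation. The following is a minimal sketch, not the LTS class: PullWorkerSketch, resourcesEnough and sendRequest are hypothetical stand-ins for the worker, isMachineResEnough() and sendRequest(). It also shows why the catch block matters: if an exception escaped a task scheduled with scheduleWithFixedDelay, the executor would suppress all later runs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the JobPullMachine worker; not the LTS class. */
class PullWorkerSketch implements Runnable {
    /** Flipped by start()/stop() in the real class; exposed here for the demo. */
    final AtomicBoolean started = new AtomicBoolean(false);
    private final BooleanSupplier resourcesEnough; // stand-in for isMachineResEnough()
    private final Runnable sendRequest;            // stand-in for sendRequest()

    PullWorkerSketch(BooleanSupplier resourcesEnough, Runnable sendRequest) {
        this.resourcesEnough = resourcesEnough;
        this.sendRequest = sendRequest;
    }

    @Override
    public void run() {
        try {
            if (!started.get()) {
                return; // no JobTracker available, nothing to pull from
            }
            if (!resourcesEnough.getAsBoolean()) {
                return; // machine is overloaded, skip this round
            }
            sendRequest.run();
        } catch (Exception e) {
            // Swallow and report: an exception escaping a task scheduled with
            // scheduleWithFixedDelay would suppress all later runs.
            System.err.println("pull round failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        PullWorkerSketch worker = new PullWorkerSketch(
                () -> true, () -> System.out.println("pull request sent"));
        worker.run();             // prints nothing: not started yet
        worker.started.set(true);
        worker.run();             // prints "pull request sent"
    }
}
```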

The start method sets up the scheduled task, which calls sendRequest at a fixed delay to send job pull requests to the JobTracker.

private void start() {
        try {
            if (start.compareAndSet(false, true)) {
                if (scheduledFuture == null) {
                    scheduledFuture = executorService.scheduleWithFixedDelay(worker, jobPullFrequency * 1000, jobPullFrequency * 1000, TimeUnit.MILLISECONDS);
                }
                LOGGER.info("Start Job pull machine success!");
            }
        } catch (Throwable t) {
            LOGGER.error("Start Job pull machine failed!", t);
        }
    }
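
The idempotent start can be sketched on its own as follows. This is an illustration under assumptions, not the LTS implementation: PullSchedulerSketch is a hypothetical class, the stop method is my own guess at a symmetric counterpart (the article does not show it), and unlike the original this sketch reschedules the worker after every stop.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for JobPullMachine's start/stop handling; not the LTS class. */
class PullSchedulerSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable worker;
    private final long delayMillis;
    private ScheduledFuture<?> future;

    PullSchedulerSketch(Runnable worker, long delayMillis) {
        this.worker = worker;
        this.delayMillis = delayMillis;
    }

    /** Returns true only for the call that actually schedules the worker. */
    synchronized boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false; // already running: repeated "available" events are no-ops
        }
        future = executor.scheduleWithFixedDelay(worker, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Assumed counterpart of start(): cancels the scheduled worker. */
    synchronized boolean stop() {
        if (!started.compareAndSet(true, false)) {
            return false; // not running
        }
        future.cancel(false);
        future = null;
        return true;
    }

    /** Releases the scheduler thread so the JVM can exit. */
    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        PullSchedulerSketch scheduler = new PullSchedulerSketch(() -> System.out.println("pull"), 1000L);
        System.out.println(scheduler.start()); // true
        System.out.println(scheduler.start()); // false
        scheduler.stop();
        scheduler.shutdown();
    }
}
```

The compareAndSet guard is what makes repeated JOB_TRACKER_AVAILABLE events harmless: only the first call schedules anything.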

sendRequest checks how many runner threads the TaskTracker currently has free, then sends the request to the JobTracker.

private void sendRequest() throws RemotingCommandFieldCheckException {
        // check how many runner threads are still free on this node
        int availableThreads = appContext.getRunnerPool().getAvailablePoolSize();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("current availableThreads:{}", availableThreads);
        }
        if (availableThreads == 0) {
            return;
        }
        // build the request body
        JobPullRequest requestBody = appContext.getCommandBodyWrapper().wrapper(new JobPullRequest());
        requestBody.setAvailableThreads(availableThreads);
        RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_PULL.code(), requestBody);

        try {
            // send the job pull request to the JobTracker
            RemotingCommand responseCommand = appContext.getRemotingClient().invokeSync(request);
            if (responseCommand == null) {
                LOGGER.warn("Job pull request failed! response command is null!");
                return;
            }
            if (JobProtos.ResponseCode.JOB_PULL_SUCCESS.code() == responseCommand.getCode()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Job pull request success!");
                }
                return;
            }
            LOGGER.warn("Job pull request failed! response command is null!");
        } catch (JobTrackerNotFoundException e) {
            LOGGER.warn("no job tracker available!");
        }
    }
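
The article does not show getAvailablePoolSize. One plausible way to compute it, assuming the runner pool wraps a java.util.concurrent.ThreadPoolExecutor, is the maximum pool size minus the number of threads currently running jobs. The sketch below illustrates that assumption; AvailableThreadsSketch and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of an "available threads" calculation; not the LTS class. */
class AvailableThreadsSketch {

    /** Free slots = configured maximum minus threads currently running a task. */
    static int availableThreads(ThreadPoolExecutor pool) {
        return pool.getMaximumPoolSize() - pool.getActiveCount();
    }

    /** Occupies busyJobs threads (must be <= maxThreads) and reports the free slots. */
    static int availableWhileBusy(int maxThreads, int busyJobs) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(busyJobs);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < busyJobs; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // keep the thread busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // every busy job is now running
            return availableThreads(pool);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(availableWhileBusy(4, 2)); // 2
    }
}
```

When the result is 0 the pull round is skipped, which is what keeps a saturated TaskTracker from asking for jobs it cannot run.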

RemotingClientDelegate's invokeSync method picks a JobTracker node obtained from the registry, then sends the request to it over a remoting connection.

/**
     * Synchronous invocation
     */
    public RemotingCommand invokeSync(RemotingCommand request)
            throws JobTrackerNotFoundException {

        Node jobTracker = getJobTrackerNode();

        try {
            RemotingCommand response = remotingClient.invokeSync(jobTracker.getAddress(),
                    request, appContext.getConfig().getInvokeTimeoutMillis());
            this.serverEnable = true;
            return response;
        } catch (Exception e) {
            // remove this JobTracker
            jobTrackers.remove(jobTracker);
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e1) {
                LOGGER.error(e1.getMessage(), e1);
            }
            // unless no node is available at all, keep trying the remaining nodes
            return invokeSync(request);
        }
    }
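
The retry behaviour above (drop the failing node, then try again until a node answers or none is left) can be sketched without any networking. This is an illustration under assumptions: FailoverInvokeSketch, NoNodeAvailableException and the transport function are hypothetical, nodes are tried in list order rather than by whatever selection getJobTrackerNode uses, and the loop is iterative where the original recurses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical failover sketch; not the LTS RemotingClientDelegate. */
class FailoverInvokeSketch {

    /** Stand-in for JobTrackerNotFoundException. */
    static class NoNodeAvailableException extends RuntimeException {
        NoNodeAvailableException(String message) {
            super(message);
        }
    }

    private final List<String> nodes;
    private final Function<String, String> transport; // sends to an address, throws on failure

    FailoverInvokeSketch(List<String> nodes, Function<String, String> transport) {
        this.nodes = new ArrayList<>(nodes);
        this.transport = transport;
    }

    /** Tries nodes in order, removing each one that fails, until one responds. */
    String invokeSync() {
        while (true) {
            if (nodes.isEmpty()) {
                throw new NoNodeAvailableException("no node available");
            }
            String node = nodes.get(0);
            try {
                return transport.apply(node);
            } catch (RuntimeException e) {
                nodes.remove(node); // drop the failing node and try the next one
            }
        }
    }

    int remainingNodes() {
        return nodes.size();
    }

    public static void main(String[] args) {
        List<String> addresses = new ArrayList<>();
        addresses.add("10.0.0.1:35001");
        addresses.add("10.0.0.2:35001");
        FailoverInvokeSketch client = new FailoverInvokeSketch(addresses, address -> {
            if (address.startsWith("10.0.0.1")) {
                throw new IllegalStateException("connection refused");
            }
            return "response from " + address;
        });
        System.out.println(client.invokeSync()); // response from 10.0.0.2:35001
    }
}
```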

III. Receiving and Executing Jobs

TaskTracker also provides a RemotingDispatcher class, which receives the jobs that the JobTracker sends.

public class RemotingDispatcher extends AbstractProcessor {

    private final Map<JobProtos.RequestCode, RemotingProcessor> processors = new HashMap<JobProtos.RequestCode, RemotingProcessor>();

    public RemotingDispatcher(TaskTrackerAppContext appContext) {
        super(appContext);
        processors.put(JobProtos.RequestCode.PUSH_JOB, new JobPushProcessor(appContext));
        processors.put(JobProtos.RequestCode.JOB_ASK, new JobAskProcessor(appContext));
    }
    // handle requests sent by the JobTracker
    @Override
    public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {

        JobProtos.RequestCode code = JobProtos.RequestCode.valueOf(request.getCode());
        RemotingProcessor processor = processors.get(code);
        if (processor == null) {
            return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(),
                    "request code not supported!");
        }
        return processor.processRequest(channel, request);
    }

}
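
The dispatcher boils down to a lookup table from request code to processor, with a fallback for unknown codes. A minimal sketch of that pattern, with hypothetical names and integer codes (DispatcherSketch, codes 1 and 2, and String bodies are assumptions for illustration only):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical map-based dispatcher; not the LTS RemotingDispatcher. */
class DispatcherSketch {
    private final Map<Integer, Function<String, String>> processors = new HashMap<>();

    /** Registers the processor that handles requests carrying the given code. */
    void register(int code, Function<String, String> processor) {
        processors.put(code, processor);
    }

    /** Routes the request body to its processor, or reports an unsupported code. */
    String dispatch(int code, String body) {
        Function<String, String> processor = processors.get(code);
        if (processor == null) {
            return "request code not supported!";
        }
        return processor.apply(body);
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        dispatcher.register(1, body -> "pushed: " + body); // stands in for PUSH_JOB
        dispatcher.register(2, body -> "asked: " + body);  // stands in for JOB_ASK
        System.out.println(dispatcher.dispatch(1, "job-42")); // pushed: job-42
        System.out.println(dispatcher.dispatch(9, "job-42")); // request code not supported!
    }
}
```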

JobPushProcessor's processRequest reads the jobs sent by the JobTracker and submits each one to the thread pool for execution.

@Override
    public RemotingCommand processRequest(Channel channel,
                                          final RemotingCommand request) throws RemotingCommandException {

        // read the request body
        JobPushRequest requestBody = request.getBody();

        // jobs dispatched by the JobTracker
        final List<JobMeta> jobMetaList = requestBody.getJobMetaList();
        List<String> failedJobIds = null;

        for (JobMeta jobMeta : jobMetaList) {
            try {
                // execute the job
                appContext.getRunnerPool().execute(jobMeta, jobRunnerCallback);
            } catch (NoAvailableJobRunnerException e) {
                if (failedJobIds == null) {
                    failedJobIds = new ArrayList<String>();
                }
                failedJobIds.add(jobMeta.getJobId());
            }
        }
        if (CollectionUtils.isNotEmpty(failedJobIds)) {
            // some jobs could not be accepted
            JobPushResponse jobPushResponse = new JobPushResponse();
            jobPushResponse.setFailedJobIds(failedJobIds);
            return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code(), jobPushResponse);
        }

        // all jobs were accepted
        return RemotingCommand.createResponseCommand(JobProtos
                .ResponseCode.JOB_PUSH_SUCCESS.code(), "job push success!");
    }

RunnerPool's execute method wraps the job in a JobRunnerDelegate and submits it to the thread pool, where a pool thread runs it.

public void execute(JobMeta jobMeta, RunnerCallback callback) throws NoAvailableJobRunnerException {
        try {
            threadPoolExecutor.execute(
                    new JobRunnerDelegate(appContext, jobMeta, callback));
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Receive job success ! " + jobMeta);
            }
        } catch (RejectedExecutionException e) {
            LOGGER.warn("No more thread to run job .");
            throw new NoAvailableJobRunnerException(e);
        }
    }
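
How a full pool turns into a list of failed job IDs can be reproduced with a plain ThreadPoolExecutor. The sketch below assumes a pool with no queueing capacity (a SynchronousQueue) and the AbortPolicy rejection handler; the article does not show how RunnerPool builds its executor, so treat that configuration and the PushAcceptSketch class as assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of push acceptance; not the LTS JobPushProcessor or RunnerPool. */
class PushAcceptSketch {

    /** Submits one task per job id and returns the ids the pool rejected. */
    static List<String> pushAll(ThreadPoolExecutor pool, List<String> jobIds, Runnable job) {
        List<String> failedJobIds = new ArrayList<>();
        for (String jobId : jobIds) {
            try {
                pool.execute(job);
            } catch (RejectedExecutionException e) {
                failedJobIds.add(jobId); // no free thread: report this job back as failed
            }
        }
        return failedJobIds;
    }

    /** Pushes long-running jobs into a pool of workThreads threads with no queue. */
    static List<String> demo(int workThreads, List<String> jobIds) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                workThreads, workThreads, 30, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            return pushAll(pool, jobIds, () -> {
                try {
                    release.await(); // simulate a job that is still running
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2, Arrays.asList("j1", "j2", "j3", "j4"))); // [j3, j4]
    }
}
```

With two threads and four long-running jobs, the first two occupy the pool and the rest are rejected immediately instead of waiting, so the JobTracker can hand them to another node.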

JobRunnerDelegate's run method creates an instance of the configured jobRunnerClass (via reflection) and calls its run method to execute the job.

@Override
    public void run() {

        thread = Thread.currentThread();

        try {
            blockedOn(interruptor);
            if (Thread.currentThread().isInterrupted()) {
                ((InterruptibleAdapter) interruptor).interrupt();
            }

            while (jobMeta != null) {
                long startTime = SystemClock.now();
                // set the jobId in the current context
                Response response = new Response();
                response.setJobMeta(jobMeta);

                BizLoggerAdapter logger = (BizLoggerAdapter) BizLoggerFactory.getLogger(
                        appContext.getBizLogLevel(),
                        appContext.getRemotingClient(), appContext);

                try {
                    appContext.getRunnerPool().getRunningJobManager()
                            .in(jobMeta.getJobId(), this);
                    // obtain an instance of the JobRunner implementation
                    this.curJobRunner = appContext.getRunnerPool().getRunnerFactory().newRunner();
                    // call its run method to execute the job
                    Result result = this.curJobRunner.run(buildJobContext(logger, jobMeta));

                    if (result == null) {
                        response.setAction(Action.EXECUTE_SUCCESS);
                    } else {
                        if (result.getAction() == null) {
                            response.setAction(Action.EXECUTE_SUCCESS);
                        } else {
                            response.setAction(result.getAction());
                        }
                        response.setMsg(result.getMsg());
                    }

                    long time = SystemClock.now() - startTime;
                    stat.addRunningTime(time);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Job execute completed : {}, time:{} ms.", jobMeta.getJob(), time);
                    }
                } catch (Throwable t) {
                    StringWriter sw = new StringWriter();
                    t.printStackTrace(new PrintWriter(sw));
                    response.setAction(Action.EXECUTE_EXCEPTION);
                    response.setMsg(sw.toString());
                    long time = SystemClock.now() - startTime;
                    stat.addRunningTime(time);
                    LOGGER.error("Job execute error : {}, time: {}, {}", jobMeta.getJob(), time, t.getMessage(), t);
                } finally {
                    checkInterrupted(logger);
                    appContext.getRunnerPool().getRunningJobManager()
                            .out(jobMeta.getJobId());
                }
                // record statistics
                stat(response.getAction());

                if (isStopToGetNewJob()) {
                    response.setReceiveNewJob(false);
                }
                this.jobMeta = callback.runComplete(response);
                DotLogUtils.dot("JobRunnerDelegate.run get job " + (this.jobMeta == null ? "NULL" : "NOT_NULL"));
            }
        } finally {
            blockedOn(null);
        }
    }
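
Two details of this loop are easy to miss: a null result (or null action) counts as success while any Throwable becomes EXECUTE_EXCEPTION, and the thread keeps running for as long as the callback hands back another job. A minimal sketch of both follows, with hypothetical names: RunLoopSketch is not the LTS class, jobs are plain strings, and a queue stands in for callback.runComplete(response).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of the delegate's run loop; not the LTS JobRunnerDelegate. */
class RunLoopSketch {
    enum Action { EXECUTE_SUCCESS, EXECUTE_LATER, EXECUTE_EXCEPTION }

    /** Runs one job and maps the outcome to an action the way the loop above does. */
    static Action execute(String job, Function<String, Action> runner) {
        try {
            Action action = runner.apply(job);
            return action == null ? Action.EXECUTE_SUCCESS : action; // null means success
        } catch (Throwable t) {
            return Action.EXECUTE_EXCEPTION; // any failure is reported, never rethrown
        }
    }

    /** Keeps running jobs until the stand-in callback has no further job to hand out. */
    static List<String> runAll(String firstJob, Deque<String> pending, Function<String, Action> runner) {
        List<String> outcomes = new ArrayList<>();
        String job = firstJob;
        while (job != null) {
            outcomes.add(job + ":" + execute(job, runner));
            job = pending.poll(); // stands in for callback.runComplete(response)
        }
        return outcomes;
    }

    public static void main(String[] args) {
        Function<String, Action> runner = job -> {
            if (job.equals("boom")) {
                throw new IllegalStateException("job failed");
            }
            return job.equals("later") ? Action.EXECUTE_LATER : null;
        };
        Deque<String> pending = new ArrayDeque<>(Arrays.asList("boom", "later"));
        System.out.println(runAll("a", pending, runner));
        // [a:EXECUTE_SUCCESS, boom:EXECUTE_EXCEPTION, later:EXECUTE_LATER]
    }
}
```

Reusing the same thread for the next job saves a round trip through the pool whenever the JobTracker has more work ready.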

Summary:

A TaskTracker does two main things:

(1) It actively sends requests to the JobTracker to pull jobs.

(2) It listens for jobs pushed by the JobTracker and executes them.
