什么是执行器
- 执行器就是指跑任务逻辑的节点
- 官方提供了执行器的samples,位于xxl-job\xxl-job-executor-samples
简析启动过程
- 以springboot版本的执行器为例子来解析
- 首先看com.xxl.job.executor.core.config.XxlJobConfig
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
// 设置调度中心的地址
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
// 执行器名字
xxlJobSpringExecutor.setAppname(appname);
// 注册到调度中心的地址,如果为空默认使用ip:port
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
// 如果admin配置了accesstoken 那么执行器也需要配置
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
复制代码
- XxlJobSpringExecutor
- XxlJobSpringExecutor是一个典型的spring管理的bean,需要分解一下它的继承实现结构以及它的生命周期方法
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean
复制代码
- XxlJobExecutor这是执行的关键类,我的理解XxlJobSpringExecutor只是用来适配spring的生命周期来达到XxlJobExecutor生命周期方法的调用
- 初始化方法
// start
// com.xxl.job.core.executor.impl.XxlJobSpringExecutor
@Override
public void afterSingletonsInstantiated() {
// 这一步是初始化被xxl-job注解的方法,这些方法其实就是任务逻辑
initJobHandlerMethodRepository(applicationContext);
// 刷新胶水代码的工厂?先保留悬念,我猜它是处理admin上直接编写代码的逻辑
// 这部分解析放到下一个章节
GlueFactory.refreshInstance(1);
try {
// 调用XxlJobExecutor生命周期的方法
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// 省略代码N行
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
// 1.遍历上下文所有的bean
// 2.通过MethodIntrospector筛选所有被@XxlJob标注的的方法
// 3.读取注解的属性,拿到任务名字,初始化方法,销毁方法
// 4.构造一个MethodJobHandler注册到jobHandlerRepository(一个map变量)
}
复制代码
- XxlJobExecutor
public void start() throws Exception {
// 初始化日志文件,如果没有指定,默认是/data/applogs/xxl-job/jobhandler
XxlJobFileAppender.initLogPath(logPath);
// 初始化调度中心的列表
// 本质上就是构造AdminBizClient,然后保存到列表里
initAdminBizList(adminAddresses, accessToken);
// 如方法名,日志文件清理线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 触发器回调,启动方法里会创建2个线程去执行轮询逻辑
TriggerCallbackThread.getInstance().start();
// 初始化内置服务器
initEmbedServer(address, ip, port, appname, accessToken);
}
复制代码
//com.xxl.job.core.thread.TriggerCallbackThread#start
//省略N行代码
public void start() {
// 1.判断调度中心列表是否为空,空则推出
// 2.启动回调线程
// callback
// 用一个toStop变量来控制循环是否退出,推出后后面还有一段兜底逻辑,会把
// 队列里所有剩下的元素拉出来处理
// 这里主要分析回调是啥
while(!toStop){
try {
// com.xxl.job.core.thread.TriggerCallbackThread#callBackQueue
// callBackQueue是一个阻塞型的队列
// HandleCallbackParam (handleCode,handleMsg)
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// 一次性取出队列所有元素组装成的一个list取调用doCallback
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// 启动重试线程,主要是从回调失败的记录文件里拿出记录重新回调
}
// 回调的主要代码
private void doCallback(List<HandleCallbackParam> callbackParamList){
// callback, will retry if error
// 这里会对所有的调度中心进行回调,只要有一个回调失败就会重试
// 如果没记错,官方提倡提供一个域名,屏蔽掉调度中心的地址列表
// 所以这段代码我理解是“不同”的调度中心
// 回调的逻辑是:一个http请求,至于这个请求会触发admin什么逻辑,以后会分析的
// 目前猜测是任务执行完毕,回调调度中心,类似一个ACK的东西
// return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
// 失败的话会插入到失败重试的文件
}
复制代码
- 启动的最后一步,EmbedServer start!!!
public void start(final String address, final int port, final String appname, final String accessToken) {
// ExecutorBiz 这个接口定义了心跳,任务kill,任务运行的接口
// Impl实现了里面的“真正逻辑”,还有一个实现类主要是http请求ExecutorBizClient
executorBiz = new ExecutorBizImpl();
// 构建一条线程
// 然后启动了一个http服务器,实现是基于Netty(后面也会有netty相对应的学习笔记)
// 省略N行代码
// 前面3个handler都是netty内置的,有兴趣的同学可以学一下netty相关的东西
// 处理空闲连接
addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
// http请求的编解码
addLast(new HttpServerCodec())
// http对象聚合器
addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
// 这个就是xxl-job自己实现的http处理器了,传入刚才的业务处理类和一个max=200的线程池
addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
复制代码
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// 省略N行代码
// 丢进线程池去执行,持有ctx可以进行异步写回,不会阻塞netty处理器的线程
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// 这段代码会对请求做分发
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// 序列化对象
// 写回去
}
});
}
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// 省略N行前置验证
// services mapping
try {
// 心跳处理
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
// 空闲心跳处理
// 这个参数里带了个jobid,任务id
// 利用jobid去查找有没有对应的jobThread,如果没有代表该任务是空闲的
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
// 任务处理
// TriggerParam 参数有点多,感兴趣的伙伴自己去看下
// 后面我们会重点讲这段里面的逻辑
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
// 停止任务
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
// log请求
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
复制代码
- com.xxl.job.core.biz.impl.ExecutorBizImpl#run
// 这段代码的分析以第一次执行来备注
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// 获取管理这个jobid的job线程
// job线程持有一个LinkedBlockingQueue,和一个id集合(集合是用来避免admin重复投递相同的任务的)
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
// 第一次获取不到job线程,所以绑定的job处理器也是空
// 这个处理器就是最开始初始化的时候,扫描xxljob注解的方法组装的类
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// 这里就根据name直接去jobHandlerRepository获取一个处理器
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// 校验
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// 变量替换
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
}
// 省略非Bean模式的代码执行,后面讲glue的时候再回过头来看
// 第一次进来,所以job线程为null,所以这段逻辑先绕过
// 后面会回过头来讲这一段东西
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// 注册一个job线程
// 注册线程的代码主要是传入jobid和处理器,然后启动(启动代码下面分析,目前猜测它不是马上执行任务的)
// 同时会把旧的关联jobid的线程拿出来停止和中断
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// 把任务推到job线程的队列和集合
// jobthread启动是不断拉取队列的元素进行消费的
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
复制代码
- 接下来看JobThread启动后干了什么(com.xxl.job.core.thread.JobThread#run)
// 篇幅原因我会适当省略代码
@Override
public void run() {
// 初始化方法执行
// 注意这个地方,这个初始化方法就是@XxlJob的init()
// jobThread启动的时候才会执行一次,并不是每次执行execute方法都会执行init和destroy
handler.init();
// 接下里是一个循环不断从队列中拉取元素
// toStop依然是一个volatile变量用来控制jobThread是否运行的
while(!toStop){
// 接下来会创建一个针对当前日期的当前jobid的日志文件,把本次job的日志打到对应的文本
// 这也是为什么admin后台可以精准针对每个job的日志rolling了
// 接下来就是任务执行,先检查传过来的执行超时时间
if (triggerParam.getExecutorTimeout() > 0) {
Thread futureThread = null;
try {
// 如果存在执行超时时间,其实本质上就是利用一个线程单独去处理,返回future,然后对future结果
// 获取执行超时设置
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
XxlJobContext.setXxlJobContext(xxlJobContext);
handler.execute();
return true;
}
});
futureThread = new Thread(futureTask);
futureThread.start();
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 省略N行
} finally {
futureThread.interrupt();
}
} else {
// 没超时直接执行
handler.execute();
}
// 上面的逻辑都是从队列获取到的元素不为空
// 那么有个变量 idleTimes 是判断连续30次都从队列获取不到元素,那就推出这个jobthread了
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
// 后面还有一段finally逻辑,会把任务处理结果丢进一个队列,然后有一条异步线程拉取然后http推回admin
// 就是上文提到的TriggerCallbackThread
TriggerCallbackThread.pushCallBack
}
}
复制代码
结尾
- 上面留了一些问题,后面的文章会继续解析
- callback回调后admin做了什么?
- glue代码是怎么执行的?
- 上面简单分析了一整个启动过程,总结一下
- 我们可以把xxl-job这种分布式任务调度框架简单理解为:每个执行器其实就是一个http服务器,然后依然是请求应答模式,只是现在的请求是admin发出的,然后执行器就执行任务
- 以往的单点任务调度把任务和调度2件事放在了一起,所以不方便做集群,分布式处理。现在就是调度抽离出去到admin,执行器就做任务处理,分而治之