RemotehttpJobBean 触发任务源码分析
xxl-job 所有的任务触发最终都是通过这个类来执行 , 该类继承关系如下:
RemoteHttpJobBean > QuartzJobBean > Job
当quartz监听到有任务需要触发是,会调用 JobRunShell 的run方法, 在该类的run方法中,会调用当前任务的JOB_CLASS 的excute方法,
调用链最终会调用到remoteHttpJobBean 的 executeInternal()
@Override
protected
void
executeInternal(JobExecutionContext context)
throws
JobExecutionException {
// load jobId
JobKey jobKey = context.getTrigger().getJobKey();
Integer jobId = Integer.valueOf(jobKey.getName());
// trigger
XxlJobTrigger.trigger(jobId);
// 详细的代码分析往下看
}
|
public
static
void
trigger(
int
jobId) {
// 通过JobId从数据库中查询该任务的具体信息
XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId);
// job info
if
(jobInfo ==
null
) {
logger.warn(
">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}"
, jobId);
return
;
}
// 获取该类型的执行器信息
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());
// group info
// 匹配运行模式
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
// block strategy
// 匹配失败后的处理模式
ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM);
// fail strategy
// 获取路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),
null
);
// route strategy
// 获取该执行器的集群机器列表
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
// 判断路由策略 是否为 分片广播模式
if
(ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
for
(
int
i =
0
; i < addressList.size(); i++) {
String address = addressList.get(i);
//定义日志信息
XxlJobLog jobLog =
new
XxlJobLog();
// .....省略
ReturnT<String> triggerResult =
new
ReturnT<String>(
null
);
if
(triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam =
new
TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setBroadcastIndex(i);
// 设置分片标记
triggerParam.setBroadcastIndex(addressList.size());
// 设置分片总数
// ......省略组装参数的过程
// 根据参数以及 机器地址,向执行器发送执行信息 , 此处将会详细讲解runExecutor 这个方法
triggerResult = runExecutor(triggerParam, address);
}
// 将日志ID,放入队列,便于日志监控线程来监控任务的执行状态
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(
">>>>>>>>>>> xxl-job trigger end, jobId:{}"
, jobLog.getId());
}
}
else
{
// 出分片模式外,其他的路由策略均走这里
//定义日志信息
XxlJobLog jobLog =
new
XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
// .....省略
ReturnT<String> triggerResult =
new
ReturnT<String>(
null
);
if
(triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam =
new
TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setBroadcastIndex(
0
);
// 默认分片标记为0
triggerParam.setBroadcastTotal(
1
);
// 默认分片总数为1
// .... 省略组装参数的过程
// 此处使用了策略模式, 根据不同的策略 使用不同的实现类,此处不再详细说明
triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
}
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(
">>>>>>>>>>> xxl-job trigger end, jobId:{}"
, jobLog.getId());
}
}
|
继续往下面看, 着重看 runExecutor 这个方法 , 向执行器发送指令都是从这个方法中执行的
public
static
ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult =
null
;
try
{
//创建一个ExcutorBiz 的对象,重点在这个方法里面
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
// 这个run 方法不会最终执行,仅仅只是为了触发 proxy object 的 invoke方法,同时将目标的类型传送给服务端, 因为在代理对象的invoke的方法里面没有执行目标对象的方法
runResult = executorBiz.run(triggerParam);
}
catch
(Exception e) {
logger.error(
">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running."
, address, e);
runResult =
new
ReturnT<String>(ReturnT.FAIL_CODE,
""
+e );
}
StringBuffer runResultSB =
new
StringBuffer(I18nUtil.getString(
"jobconf_trigger_run"
) +
":"
);
runResultSB.append(
"<br>address:"
).append(address);
runResultSB.append(
"<br>code:"
).append(runResult.getCode());
runResultSB.append(
"<br>msg:"
).append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
runResult.setContent(address);
return
runResult;
}
|
通过上面可知, XxlJobDynamicScheduler.getExecutorBiz (address) , 通过机器地址,获取一个executor , 看下他的源码。
public
static
ExecutorBiz getExecutorBiz(String address)
throws
Exception {
// valid
if
(address==
null
|| address.trim().length()==
0
) {
return
null
;
}
// load-cache
address = address.trim();
//查看缓存里面是否存在,如果存在则不需要再去创建executorBiz了
ExecutorBiz executorBiz = executorBizRepository.get(address);
if
(executorBiz !=
null
) {
return
executorBiz;
}
// 创建ExecutorBiz的代理对象,重点在这个里面。
executorBiz = (ExecutorBiz)
new
NetComClientProxy(ExecutorBiz.
class
, address, accessToken).getObject();
executorBizRepository.put(address, executorBiz);
return
executorBiz;
}
|
NetComClientProxy 这是一个factoryBean , 所以我们主要看他的getObject 方法就知道怎么创建对象并返回的。
下面这个代理对象的invoke里面并没有执行目标类的方法,而是将目标类的信息包装好,发送给执行器那一端来做。
public
Object getObject()
throws
Exception {
return
Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(),
new
Class[] { iface },
new
InvocationHandler() {
@Override
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable {
if
(Object.
class
.getName().equals(method.getDeclaringClass().getName())) {
logger.error(
">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]"
, method.getDeclaringClass().getName(), method.getName());
throw
new
RuntimeException(
"xxl-rpc proxy class-method not support"
);
}
// 重点来了,创建request信息, 发送HTTP请求到执行器服务器上。
RpcRequest request =
new
RpcRequest();
request.setServerAddress(serverAddress);
// 服务器地址
request.setCreateMillisTime(System.currentTimeMillis());
// 创建时间, 用于判断请求是否超时
request.setAccessToken(accessToken);
// 数据校验
request.setClassName(method.getDeclaringClass().getName());
// 将目标类的class名称传给执行器,让那边来创建对象,并执行逻辑代码
request.setMethodName(method.getName());
// 方法名称为run
request.setParameterTypes(method.getParameterTypes());
// 参数类型
request.setParameters(args);
// 参数
RpcResponse response = client.send(request);
// 发送HTTP请求
if
(response ==
null
) {
logger.error(
">>>>>>>>>>> xxl-rpc netty response not found."
);
throw
new
Exception(
">>>>>>>>>>> xxl-rpc netty response not found."
);
}
if
(response.isError()) {
throw
new
RuntimeException(response.getError());
}
else
{
// 返回请求结果
return
response.getResult();
}
}
});
}
|
以上就是调度中心,触发任务之后执行的核心代码 , 接下来继续分析执行器服务端接收到请求之后的处理逻辑
执行器启动源码分析
服务端应用里面,实际上是在应用中,内嵌了一个jetty服务器, 服务器在xxlJobExecutor 初始化的时候启动。
本次示例代码中是由spring-boot 中截取而来, 该项目中,由XxlJobConfig 这个配置类来配置Executor
呈现代码宏出错: 参数'firstline'的值无效@Bean(initMethod = "start", destroyMethod = "destroy") public XxlJobExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobExecutor xxlJobExecutor = new XxlJobExecutor(); xxlJobExecutor.setAdminAddresses(adminAddresses); xxlJobExecutor.setAppName(appName); xxlJobExecutor.setIp(ip); xxlJobExecutor.setPort(port); xxlJobExecutor.setAccessToken(accessToken); xxlJobExecutor.setLogPath(logPath); xxlJobExecutor.setLogRetentionDays(logRetentionDays); return xxlJobExecutor; }
由上面可以看出,初始化 XxlJobExecutor 这个bean之后,会默认执行start 方法
public
void
start()
throws
Exception {
// 初始化调度中心的地址列表, 通过NetComClientProxy创建好adminBiz实例
initAdminBizList(adminAddresses, accessToken);
// 初始化所有带有@JobHandler的handle, 根据name , 放入一个ConcurrentHashMap 中
initJobHandlerRepository(applicationContext);
// 初始化本地日志路径
XxlJobFileAppender.initLogPath(logPath);
// 初始化本地jetty服务器
initExecutorServer(port, ip, appName, accessToken);
// 启动一个线程,用来清理本地日志, 默认保留最近一天的日志
JobLogFileCleanThread.getInstance().start(logRetentionDays);
}
|
上面初始化的那些方法,着重看 initExecutorServer () 这个方法
private
void
initExecutorServer(
int
port, String ip, String appName, String accessToken)
throws
Exception {
// 如果port为空,则默认9999为他的jetty服务器端口
port = port>
0
?port: NetUtil.findAvailablePort(
9999
);
// 创建一个ExecutorService 实例,放入Map中,后面会通过class获取到他的实例执行run方法
NetComServerFactory.putService(ExecutorBiz.
class
,
new
ExecutorBizImpl());
// rpc-service, base on jetty
NetComServerFactory.setAccessToken(accessToken);
// 启动jetty 服务器
serverFactory.start(port, ip, appName);
// jetty + registry
}
|
public
void
start(
final
int
port,
final
String ip,
final
String appName)
throws
Exception {
thread =
new
Thread(
new
Runnable() {
@Override
public
void
run() {
// The Server
server =
new
Server(
new
ExecutorThreadPool());
// 非阻塞
// HTTP connector
ServerConnector connector =
new
ServerConnector(server);
if
(ip!=
null
&& ip.trim().length()>
0
) {
connector.setHost(ip);
// The network interface this connector binds to as an IP address or a hostname. If null or 0.0.0.0, then bind to all interfaces.
}
connector.setPort(port);
// 设置连接器
server.setConnectors(
new
Connector[]{connector});
// 设置一个连接处理的handler
HandlerCollection handlerc =
new
HandlerCollection();
handlerc.setHandlers(
new
Handler[]{
new
JettyServerHandler()});
server.setHandler(handlerc);
try
{
// Start server
server.start();
logger.info(
">>>>>>>>>>> xxl-job jetty server start success at port:{}."
, port);
// 此处是启动一个执行器注册的线程, 该线程第一次执行的时候,将该执行器的信息注册到数据库, xxl_job_qrtz_trigger_registry 这张表中 ,
// 此后,没过30秒, 执行器就会去数据库更新数据,表示自己还在存活中
// 调度中心那边会有一个线程定期的去数据库扫描,会自动的将30秒之内未更新信息的机器剔除, 同时将新加入的服务载入到集群列表中
ExecutorRegistryThread.getInstance().start(port, ip, appName);
// 启动一个日志监控的线程,里面设置了一个队列,每次有任务结束后,都会把任务的日志ID和处理结果放入队列,
// 线程从队列里面拿到日志ID和处理结果,通过调用adminBiz的callback方法来回调给调度中心执行结果
TriggerCallbackThread.getInstance().start();
server.join();
// block until thread stopped
logger.info(
">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}"
, JettyServer.
class
.getName(), port);
}
catch
(Exception e) {
logger.error(e.getMessage(), e);
}
finally
{
//destroy();
}
}
});
thread.setDaemon(
true
);
// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
|
JettyServerHandler 接收请求后的处理流程
public
void
handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws
IOException, ServletException {
// invoke 主要在这个方法
RpcResponse rpcResponse = doInvoke(request);
// serialize response
byte
[] responseBytes = HessianSerializer.serialize(rpcResponse);
response.setContentType(
"text/html;charset=utf-8"
);
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(
true
);
// 响应结果
OutputStream out = response.getOutputStream();
out.write(responseBytes);
out.flush();
}
|
通过上面的handle中的代码可以知道,主要的执行逻辑在doInvoke 的方法中,
private
RpcResponse doInvoke(HttpServletRequest request) {
try
{
// deserialize request
// 读取请求数据
byte
[] requestBytes = HttpClientUtil.readBytes(request);
if
(requestBytes ==
null
|| requestBytes.length==
0
) {
RpcResponse rpcResponse =
new
RpcResponse();
rpcResponse.setError(
"RpcRequest byte[] is null"
);
return
rpcResponse;
}
// 通过hessian的序列化机制,反序列化得到字符串数据
RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.
class
);
// 得到参数之后, 执行
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest,
null
);
return
rpcResponse;
}
catch
(Exception e) {
logger.error(e.getMessage(), e);
RpcResponse rpcResponse =
new
RpcResponse();
rpcResponse.setError(
"Server-error:"
+ e.getMessage());
return
rpcResponse;
}
}
|
public
static
RpcResponse invokeService(RpcRequest request, Object serviceBean) {
// request中的数据结构,可以看上面源码分析中提到的 NetComClientProxy中的getObjct 方法,此处不再赘述
if
(serviceBean==
null
) {
// 这个serviceBean 就是在执行器启动的时候,initExecutorServer () 这个方法中,将一个ExecutorBiz的实例放进去了,此处通过
// classname来获取这个实例
serviceBean = serviceMap.get(request.getClassName());
}
if
(serviceBean ==
null
) {
// TODO
}
RpcResponse response =
new
RpcResponse();
// 判断是否超时
if
(System.currentTimeMillis() - request.getCreateMillisTime() >
180000
) {
response.setResult(
new
ReturnT<String>(ReturnT.FAIL_CODE,
"The timestamp difference between admin and executor exceeds the limit."
));
return
response;
}
// 数据校验,验证token是否匹配,前提是token不为空
if
(accessToken!=
null
&& accessToken.trim().length()>
0
&& !accessToken.trim().equals(request.getAccessToken())) {
response.setResult(
new
ReturnT<String>(ReturnT.FAIL_CODE,
"The access token["
+ request.getAccessToken() +
"] is wrong."
));
return
response;
}
try
{
// 获取class
Class<?> serviceClass = serviceBean.getClass();
// 拿到请求中的方法名字, 此处这个值 是 run 方法
String methodName = request.getMethodName();
//方法类型
Class<?>[] parameterTypes = request.getParameterTypes();
// 方法参数
Object[] parameters = request.getParameters();
// spring的工具类, 创建一个fastClass 实例
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// 拿到方法之后执行方法的invoke ,
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
}
catch
(Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return
response;
}
|
通过调度中心发过来的参数,以及执行器的处理逻辑,我们有理由可以得出此时是执行的是ExecutorBizImpl中的run方法
@Override
public
ReturnT<String> run(TriggerParam triggerParam) {
// 通过参数中的JobID, 从本地线程库里面获取线程 ( 第一次进来是没有线程的,jobThread为空 ,
// 如果线程运行90秒空闲之后,那么也会被移除)
// 本地线程库,本质上就是一个ConcurrentHashMap<Integer, JobThread>
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=
null
?jobThread.getHandler():
null
;
String removeOldReason =
null
;
//匹配任务类型, BEAN是我们自定义JOBHANDLE的模式
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if
(GlueTypeEnum.BEAN == glueTypeEnum) {
// 通过参数中的handlerName从本地内存中获取handler实例 (在执行器启动的时候,是把所有带有@JobHandler的实例通过name放入到一个map中的 )
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// 如果修改了任务的handler, name此处会默认把以前老的handler清空,后面会以最新的newJobHandler为准
if
(jobThread!=
null
&& jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason =
"更换JobHandler或更换任务模式,终止旧任务线程"
;
jobThread =
null
;
jobHandler =
null
;
}
// valid handler
if
(jobHandler ==
null
) {
jobHandler = newJobHandler;
if
(jobHandler ==
null
) {
return
new
ReturnT<String>(ReturnT.FAIL_CODE,
"job handler ["
+ triggerParam.getExecutorHandler() +
"] not found."
);
}
}
}
else
if
(GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// .... 省略一段代码
// 此处说的是,任务模式为 GLUE JAVA版, 最后是通过GROOVY的方式,将代码生成class类,最终执行,最终原理和上面一直
}
else
if
(glueTypeEnum!=
null
&& glueTypeEnum.isScript()) {
// 其他脚本执行模式,shell , python等
}
else
{
return
new
ReturnT<String>(ReturnT.FAIL_CODE,
"glueType["
+ triggerParam.getGlueType() +
"] is not valid."
);
}
// executor block strategy
if
(jobThread !=
null
) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(),
null
);
if
(ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// 这种阻塞策略说的是,丢弃后续调度, 如果这个线程正在执行的话,那么当前这个任务就不执行了,直接返回
if
(jobThread.isRunningOrHasQueue()) {
return
new
ReturnT<String>(ReturnT.FAIL_CODE,
"阻塞处理策略-生效:"
+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
}
else
if
(ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// 覆盖之前的调度, 如果当前线程以及执行了的话,那么中断这个线程, 直接将jobThread销毁
if
(jobThread.isRunningOrHasQueue()) {
removeOldReason =
"阻塞处理策略-生效:"
+ ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread =
null
;
}
}
else
{
// just queue trigger
}
}
// 如果jobThread为空,那么这个时候,就要注册一个线程到本地线程库里面去。 同时启动这个线程。
if
(jobThread ==
null
) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// 将本次任务的参数 ,放入到队列里面去,供线程调度。
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return
pushResult;
}
|
通过上面我们可以发现, 执行executorBiz的run 方法的时候, 首先会通过JOBID,从本地线程库里面获取该任务对应的线程,同时,如果任务的JobHandler有更新的话,
那么会自动使用最新的jobHandler , 同时根据任务的阻塞策略。 执行不同的操作。 最终,如果是第一次执行任务的时候,系统会分配给改任务一个线程,同时启动该线程。
接下来,可以在具体看一下JobThread 的run方法,看下最终的任务是如何执行的。
@Override
public
void
run() {
// init
try
{
// 执行IJobHandler 中的init方法,以后如果有一些,在执行handler之前的初始化的工作,可以覆写这个方法
handler.init();
}
catch
(Throwable e) {
logger.error(e.getMessage(), e);
}
// stop 为fasle的时候执行
while
(!toStop){
running =
false
;
// 执行次数
idleTimes++;
TriggerParam triggerParam =
null
;
ReturnT<String> executeResult =
null
;
try
{
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
// 从linkBlockingQueue中获取数据,如果3秒获取不到,则返回null
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if
(triggerParam!=
null
) {
running =
true
;
// 将运行次数清空,保证运行90秒空闲之后会被移除
idleTimes =
0
;
// 获取数据
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
// 创建日志
String logFileName = XxlJobFileAppender.makeLogFileName(
new
Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
// 写入分片信息, 将当前机器的分片标记和分片总数写入到ShardingUtil中,到时候,可以在handler中通过这个工具类获取
ShardingUtil.setShardingVo(
new
ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
// execute
XxlJobLogger.log(
"<br>----------- xxl-job job execute start -----------<br>----------- Param:"
+ triggerParam.getExecutorParams());
// 执行。。。
executeResult = handler.execute(triggerParam.getExecutorParams());
if
(executeResult ==
null
) {
executeResult = IJobHandler.FAIL;
}
XxlJobLogger.log(
"<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:"
+ executeResult);
}
else
{
if
(idleTimes >
30
) {
// 每3秒获取一次数据,获取30次都没有获取到数据之后,则现场被清除
XxlJobExecutor.removeJobThread(jobId,
"excutor idel times over limit."
);
}
}
}
catch
(Throwable e) {
if
(toStop) {
XxlJobLogger.log(
"<br>----------- JobThread toStop, stopReason:"
+ stopReason);
}
StringWriter stringWriter =
new
StringWriter();
e.printStackTrace(
new
PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult =
new
ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log(
"<br>----------- JobThread Exception:"
+ errorMsg +
"<br>----------- xxl-job job execute end(error) -----------"
);
}
finally
{
if
(triggerParam !=
null
) {
// callback handler info
if
(!toStop) {
// handler执行完成之后,将结果写入到日志里面去, 就是在执行器启动的时候,会建立一个线程,用来实时处理日志,此处是将结果和logID放入到队列里面去,
// 让日志线程异步的去处理
TriggerCallbackThread.pushCallBack(
new
HandleCallbackParam(triggerParam.getLogId(), executeResult));
}
else
{
// is killed
ReturnT<String> stopResult =
new
ReturnT<String>(ReturnT.FAIL_CODE, stopReason +
" [业务运行中,被强制终止]"
);
TriggerCallbackThread.pushCallBack(
new
HandleCallbackParam(triggerParam.getLogId(), stopResult));
}
}
}
}
// 当现场被终止之后,队列里面剩余的未执行的任务,将被终止的这些任务放入队列,供日志监控线程来处理,回调给调度中心
while
(triggerQueue !=
null
&& triggerQueue.size()>
0
){
TriggerParam triggerParam = triggerQueue.poll();
if
(triggerParam!=
null
) {
// is killed
ReturnT<String> stopResult =
new
ReturnT<String>(ReturnT.FAIL_CODE, stopReason +
" [任务尚未执行,在调度队列中被终止]"
);
TriggerCallbackThread.pushCallBack(
new
HandleCallbackParam(triggerParam.getLogId(), stopResult));
}
}
// destroy
try
{
handler.destroy();
}
catch
(Throwable e) {
logger.error(e.getMessage(), e);
}
logger.info(
">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}"
, Thread.currentThread());
}
|
最后来看一下,TriggerCallbackThread.pushCallBack ()这个方法,将本次任务记录的日志ID和处理结果放入队列中去了,
private
void
doCallback(List<HandleCallbackParam> callbackParamList){
// 获取调度中心的adminBiz列表,在执行器启动的时候,初始化的,
for
(AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try
{
// 这里的adminBiz 调用的callback方法,因为是通过NetComClientProxy 这个factoryBean创建的代理对象,
// 在getObject方法中,最终是没有调用的目标类方法的invoke的。 只是将目标类的方法名,参数,类名,等信息发送给调度中心了
// 发送的地址调度中心的接口地址是 :“调度中心IP/api” 这个接口 。 这个是在执行器启动的时候初始化设置好的。
// 调度中心的API接口拿到请求之后,通过参数里面的类名,方法,参数,反射出来一个对象,然后invoke, 最终将结果写入数据库
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if
(callbackResult!=
null
&& ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackResult = ReturnT.SUCCESS;
// 因为调度中心是集群式的,所以只要有一台机器返回success,那么就算成功,直接break
logger.info(
">>>>>>>>>>> xxl-job callback success, callbackParamList:{}, callbackResult:{}"
,
new
Object[]{callbackParamList, callbackResult});
break
;
}
else
{
logger.info(
">>>>>>>>>>> xxl-job callback fail, callbackParamList:{}, callbackResult:{}"
,
new
Object[]{callbackParamList, callbackResult});
}
}
catch
(Exception e) {
logger.error(
">>>>>>>>>>> xxl-job callback error, callbackParamList:{}"
, callbackParamList, e);
//getInstance().callBackQueue.addAll(callbackParamList);
}
}
}
|
以上就是XXL-JOB从调用到处理的核心代码分析。