Entry point of the analysis
ignite.compute().call(IgniteCallable<R> job)
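For orientation, here is a minimal usage sketch of the entry point being traced (assuming an Ignite node has already been started; Ignition.ignite() and the callable body are purely illustrative):

// Minimal usage sketch (assumption: an Ignite node was started elsewhere).
Ignite ignite = Ignition.ignite();
Integer sum = ignite.compute().call(new IgniteCallable<Integer>() {
    @Override public Integer call() {
        return 1 + 1; // runs on whichever node the job is mapped to
    }
});

The call descends through the following chain: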
1. IgniteComputeImpl.callAsync0
2. GridClosureProcessor.callAsync
3. GridTaskProcessor.execute -> startTask. This method is fairly long; most of its first half deals with taskClass handling and deployment.
How the client's compute closure is handled

When execute runs, the mode and the job are wrapped into a T7. T7 extends GridPeerDeployAwareTaskAdapter and implements ComputeTask (map/reduce).
In GridClosureProcessor:
ctx.task().execute(new T7<>(mode, job), null, sys, execName);
T7 implements ComputeTask's map and reduce methods:
@Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
// The goal of this method is to map the job onto the node(s) it will execute on,
// depending on whether the call mode is balanced (load-balanced) or broadcast.
return outMap(t.get1(), F.asList(t.get2()), subgrid, lb);
}
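For context, the ComputeTask contract that T7 fulfills looks roughly like this in user code (a hypothetical example task, not Ignite source; imports from org.apache.ignite.compute.*, org.apache.ignite.cluster.* and java.util.* assumed):

// Hypothetical user task illustrating the map/reduce contract that T7 implements internally.
class SumTask extends ComputeTaskAdapter<Integer, Integer> {
    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Integer arg) {
        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();

        // One job per node: map() decides which job runs where.
        for (ClusterNode node : subgrid)
            jobs.put(new ComputeJobAdapter() {
                @Override public Object execute() {
                    return arg; // each job simply returns the argument
                }
            }, node);

        return jobs;
    }

    @Override public Integer reduce(List<ComputeJobResult> results) {
        int sum = 0;

        // reduce() aggregates the per-job results into the task result.
        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();

        return sum;
    }
}

Back in GridTaskProcessor, the startTask signature is: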
private <T, R> ComputeTaskInternalFuture<R> startTask(
@Nullable String taskName,
@Nullable Class<?> taskCls,
@Nullable ComputeTask<T, R> task,
IgniteUuid sesId,
@Nullable T arg,
boolean sys,
@Nullable String execName)
The third argument of startTask receives the T7; the first two arguments are null. In the first half of the method, taskName and taskCls are initialized from the task instance.
4. Create the TaskSession:
// Creates task session with task name and task version.
GridTaskSessionImpl ses = ctx.session().createTaskSession(
sesId,
ctx.localNodeId(),
taskName,
dep,
taskCls == null ? null : taskCls.getName(),
top,
topPred,
startTime,
endTime,
Collections.<ComputeJobSibling>emptyList(),
Collections.emptyMap(),
fullSup,
internal,
subjId,
execName);
5. Create the GridTaskWorker:
GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>(
ctx,
arg,
ses,
fut,
taskCls,
task,
dep,
new TaskEventListener(),
map,
subjId);
6. Call taskWorker.run() to start the job.

Next, let's step into GridTaskWorker and see what run() does. run() is defined in the abstract class GridWorker, and it delegates to body(). Below is the body() implementation in GridTaskWorker:
@Override protected void body() {
evtLsnr.onTaskStarted(this);
try {
// Use either user task or deployed one.
if (task == null) {
assert taskCls != null;
assert ComputeTask.class.isAssignableFrom(taskCls);
try {
task = newTask((Class<? extends ComputeTask<T, R>>)taskCls);
}
catch (IgniteCheckedException e) {
// If cannot instantiate task, then assign internal flag based
// on information available.
internal = dep.internalTask(null, taskCls);
recordTaskEvent(EVT_TASK_STARTED, "Task started.");
throw e;
}
}
internal = ses.isInternal();
recordTaskEvent(EVT_TASK_STARTED, "Task started.");
initializeSpis();
ses.setClassLoader(dep.classLoader());
// Nodes are ignored by affinity tasks.
final List<ClusterNode> shuffledNodes =
affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
// Load balancer.
ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
continuous = ctx.resource().isAnnotationPresent(dep, task, TaskContinuousMapperResource.class);
if (log.isDebugEnabled())
log.debug("Injected task resources [continuous=" + continuous + ']');
// Inject resources.
ctx.resource().inject(dep, task, ses, balancer, mapper);
Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),
new Callable<Map<? extends ComputeJob, ClusterNode>>() {
@Override public Map<? extends ComputeJob, ClusterNode> call() {
return task.map(shuffledNodes, arg);
}
});
if (log.isDebugEnabled())
log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) +
", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');
if (F.isEmpty(mappedJobs)) {
synchronized (mux) {
// Check if some jobs are sent from continuous mapper.
if (F.isEmpty(jobRes))
throw new IgniteCheckedException("Task map operation produced no mapped jobs: " + ses);
}
}
else
processMappedJobs(mappedJobs);
synchronized (mux) {
lockRespProc = false;
}
processDelayedResponses();
}
catch (ClusterGroupEmptyCheckedException e) {
U.warn(log, "Failed to map task jobs to nodes (topology projection is empty): " + ses);
finishTask(null, e);
}
catch (IgniteException | IgniteCheckedException e) {
if (!fut.isCancelled()) {
if (!(e instanceof VisorClusterGroupEmptyException))
U.error(log, "Failed to map task jobs to nodes: " + ses, e);
finishTask(null, e);
}
else if (log.isDebugEnabled())
log.debug("Failed to map task jobs to nodes due to task cancellation: " + ses);
}
// Catch throwable to protect against bad user code.
catch (Throwable e) {
String errMsg = "Failed to map task jobs to nodes due to undeclared user exception" +
" [cause=" + e.getMessage() + ", ses=" + ses + "]";
U.error(log, errMsg, e);
finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
if (e instanceof Error)
throw e;
}
}
Core method 1: processMappedJobs(mappedJobs)

This method dispatches the jobs: a job mapped to the local node is executed locally, while a remote one is sent to a topic. The snippet below (quoted again from body()) is the direct call that produced the result set, i.e. the already-wrapped ComputeJob instances:
Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),
new Callable<Map<? extends ComputeJob, ClusterNode>>() {
@Override public Map<? extends ComputeJob, ClusterNode> call() {
return task.map(shuffledNodes, arg);
}
});
Each mapped job -> node pair is wrapped into a GridJobResultImpl (the ComputeJob implementation class is C2):
jobResList.add(new GridJobResultImpl(job, jobId, node, sib));
Then the sendRequest method is called to send the request:
/**
* @param res Job result.
*/
private void sendRequest(ComputeJobResult res) {
assert res != null;
GridJobExecuteRequest req = null;
ClusterNode node = res.getNode();
try {
ClusterNode curNode = ctx.discovery().node(node.id());
// Check if node exists prior to sending to avoid cases when a discovery
// listener notified about node leaving after topology resolution. Note
// that we make this check because we cannot count on exception being
// thrown in case of send failure.
if (curNode == null) {
U.warn(log, "Failed to send job request because remote node left grid (if fail-over is enabled, " +
"will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node));
onResponse(fakeRes);
}
else {
long timeout = ses.getEndTime() == Long.MAX_VALUE ? Long.MAX_VALUE :
ses.getEndTime() - U.currentTimeMillis();
if (timeout > 0) {
boolean loc = node.id().equals(ctx.discovery().localNode().id()) &&
!ctx.config().isMarshalLocalJobs();
Map<Object, Object> sesAttrs = ses.isFullSupport() ? ses.getAttributes() : null;
Map<? extends Serializable, ? extends Serializable> jobAttrs =
(Map<? extends Serializable, ? extends Serializable>)res.getJobContext().getAttributes();
boolean forceLocDep = internal || !ctx.deploy().enabled();
try {
MarshallerUtils.jobReceiverVersion(node.version());
req = new GridJobExecuteRequest(
ses.getId(),
res.getJobContext().getJobId(),
ses.getTaskName(),
ses.getUserVersion(),
ses.getTaskClassName(),
loc ? null : U.marshal(marsh, res.getJob()),
loc ? res.getJob() : null,
ses.getStartTime(),
timeout,
ses.getTopology(),
loc ? ses.getTopologyPredicate() : null,
loc ? null : U.marshal(marsh, ses.getTopologyPredicate()),
loc ? null : U.marshal(marsh, ses.getJobSiblings()),
loc ? ses.getJobSiblings() : null,
loc ? null : U.marshal(marsh, sesAttrs),
loc ? sesAttrs : null,
loc ? null : U.marshal(marsh, jobAttrs),
loc ? jobAttrs : null,
ses.getCheckpointSpi(),
dep.classLoaderId(),
dep.deployMode(),
continuous,
dep.participants(),
forceLocDep,
ses.isFullSupport(),
internal,
subjId,
affCacheIds,
affPartId,
mapTopVer,
ses.executorName());
}
finally {
MarshallerUtils.jobReceiverVersion(null);
}
// Local invocation: execute directly.
if (loc)
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
// Remote invocation: send to the GridTopic.
else {
byte plc;
if (internal)
plc = MANAGEMENT_POOL;
else {
Byte ctxPlc = getThreadContext(TC_IO_POLICY);
if (ctxPlc != null)
plc = ctxPlc;
else
plc = PUBLIC_POOL;
}
// Send job execution request.
ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);
if (log.isDebugEnabled())
log.debug("Sent job request [req=" + req + ", node=" + node + ']');
}
if (!loc)
ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);
}
else
U.warn(log, "Job timed out prior to sending job execution request: " + res.getJob());
}
}
catch (IgniteCheckedException e) {
IgniteException fakeErr = null;
try {
boolean deadNode = e instanceof ClusterTopologyCheckedException || isDeadNode(res.getNode().id());
// Avoid stack trace if node has left grid.
if (deadNode) {
U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
"will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
}
else
U.error(log, "Failed to send job request: " + req, e);
}
catch (IgniteClientDisconnectedCheckedException e0) {
if (log.isDebugEnabled())
log.debug("Failed to send job request, client disconnected [node=" + node +
", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +
res.getJobContext().getJobId() + ']');
fakeErr = U.convertException(e0);
}
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
if (fakeErr == null)
fakeErr = U.convertException(e);
fakeRes.setFakeException(fakeErr);
onResponse(fakeRes);
}
}
For a local dispatch, a new GridJobWorker is created and initialized; execution then flows run() -> body() -> execute0():
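// In GridJobWorker.execute0(): run the job under the task's deployment class loader
// so that peer-deployed job classes resolve correctly.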
res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {
@Nullable @Override public Object call() {
try {
if (internal && ctx.config().isPeerClassLoadingEnabled())
ctx.job().internal(true);
return job.execute();
}
finally {
if (internal && ctx.config().isPeerClassLoadingEnabled())
ctx.job().internal(false);
}
}
});
This is where execution ultimately happens: the job's execute() method is invoked, and job is a ComputeJob implementation obtained from GridJobExecuteRequest.getJob().

Going back to sendRequest to trace where the GridJobExecuteRequest is constructed: its seventh argument is the ComputeJob, which leads back to processMappedJobs. Look again at the U.wrapThreadLoader call quoted above: the task.map call it wraps is what produces the ComputeJob instances, whose implementation class is C2. C2's execute method is as follows:
/** {@inheritDoc} */
@Override public Object execute() {
try {
return c.call();
}
catch (Exception e) {
throw new IgniteException(e);
}
}
In other words, local execution boils down to invoking call() on the wrapped Callable.
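Conceptually, C2 is just a thin adapter from Callable to ComputeJob. A minimal sketch of that idea (illustrative only; the real C2 lives in GridClosureProcessor and additionally deals with peer deployment):

// Illustrative adapter: wrap a Callable as a ComputeJob (not Ignite's actual C2).
class CallableJob<R> implements ComputeJob {
    private final Callable<R> c;

    CallableJob(Callable<R> c) {
        this.c = c;
    }

    @Override public Object execute() {
        try {
            return c.call(); // local execution is exactly this invocation
        }
        catch (Exception e) {
            throw new IgniteException(e);
        }
    }

    @Override public void cancel() {
        // No-op in this sketch; real jobs may honor cancellation.
    }
}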
If the job is sent to a remote server, the following is invoked:
// Send job execution request.
ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);
/**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param topicOrd GridTopic enumeration ordinal.
* @param msg Message to send.
* @param plc Type of processing.
* @param ordered Ordered flag.
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @param ackC Ack closure.
* @param async If {@code true} message for local node will be processed in pool, otherwise in current thread.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
private void send(
ClusterNode node,
Object topic,
int topicOrd,
Message msg,
byte plc,
boolean ordered,
long timeout,
boolean skipOnTimeout,
IgniteInClosure<IgniteException> ackC,
boolean async
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
assert msg != null;
assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;
GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
if (locNodeId.equals(node.id())) {
assert plc != P2P_POOL;
CommunicationListener commLsnr = this.commLsnr;
if (commLsnr == null)
throw new IgniteCheckedException("Trying to send message when grid is not fully started.");
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
else if (async)
processRegularMessage(locNodeId, ioMsg, plc, NOOP);
else
processRegularMessage0(ioMsg, locNodeId);
if (ackC != null)
ackC.apply(null);
}
else {
if (topicOrd < 0)
ioMsg.topicBytes(U.marshal(marsh, topic));
try {
if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
else
getSpi().sendMessage(node, ioMsg);
}
catch (IgniteSpiException e) {
if (e.getCause() instanceof ClusterTopologyCheckedException)
throw (ClusterTopologyCheckedException)e.getCause();
if (!ctx.discovery().alive(node))
throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id(), e);
throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
"TCP connection cannot be established due to firewall issues) " +
"[node=" + node + ", topic=" + topic +
", msg=" + msg + ", policy=" + plc + ']', e);
}
}
}
The data is sent using the class GridTcpNioCommunicationClient, which implements the interface GridCommunicationClient; this interface has one other implementation, GridShmemCommunicationClient.
Core method 2: processDelayedResponses. Debugging it at first suggested it was useless: the data taken from the queue was null. The method fetches response data from the delayedRess queue (a ConcurrentLinkedDeque):
GridJobExecuteResponse res = delayedRess.poll();
poll() returns null when the queue is empty, so for a response to come out, delayedRess must first have been populated.
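The poll() behavior relied on here is plain JDK semantics (java.util.concurrent.ConcurrentLinkedDeque):

// JDK semantics the reasoning above relies on: poll() on an empty deque returns null, it never blocks.
ConcurrentLinkedDeque<String> q = new ConcurrentLinkedDeque<>();
assert q.poll() == null;            // empty queue -> null
q.offer("response");                // producer side (cf. onResponse enqueuing a GridJobExecuteResponse)
assert "response".equals(q.poll()); // consumer side (cf. processDelayedResponses draining)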
So when is the response put into this queue? Debugging shows that the processDelayedResponses() call right after processMappedJobs (at the end of body()) does in fact do the work: the result is ultimately retrieved through it.
Now let's recap the entire call chain and how the result is obtained:

IgniteComputeImpl -> call(IgniteCallable<R> job): returns a value
IgniteComputeImpl -> callAsync0(IgniteCallable<R> job): returns IgniteInternalFuture<R>
GridClosureProcessor -> callAsync: returns ComputeTaskInternalFuture
GridTaskProcessor -> execute: returns ComputeTaskInternalFuture<R>
GridTaskProcessor -> startTask: returns ComputeTaskInternalFuture. This is the final return value, and internally it calls taskWorker.run()

In other words, once startTask runs, the eventual result is placed into the ComputeTaskInternalFuture, which is awaited asynchronously. When the GridTaskWorker is constructed, the ComputeTaskInternalFuture instance is passed into it. ComputeTaskInternalFuture's get() method inspects the completion state: when the state is set and is not an error, it returns that state, and that state is the result.
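Seen from the public API, the same future-based flow can also be driven explicitly; a minimal sketch (assuming the Ignite 2.x async API):

// Sketch of the async counterpart (assumption: Ignite 2.x IgniteCompute.callAsync).
// The returned IgniteFuture is backed by the ComputeTaskInternalFuture chain above.
IgniteFuture<Integer> fut = ignite.compute().callAsync(new IgniteCallable<Integer>() {
    @Override public Integer call() {
        return 42;
    }
});
Integer result = fut.get(); // blocks until onResponse() completes the future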
In the onResponse method, the code below is where the result gets deserialized:
Object res0 = loc ? res.getJobResult() : U.unmarshal(marsh, res.getJobResultBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
In the previous article (https://mp.csdn.net/postedit/89333644), the "Cluster group is empty" error was caused exactly by this returned result: the list showed 9 entries, but only one was real and the rest were null. The cause of that problem is analyzed further in the previous article; this one covers only the call path.