Acker
//Acker相当于一个bolt,用于处理事件
public class Acker implements IBolt {
private RotatingMap<Object, AckObject> pending = null;
@Override
public void execute(Tuple input) {
Object id = input.getValue(0);
AckObject curr = pending.get(id);
String stream_id = input.getSourceStreamId();
if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) {//处理开始追踪事件,放进自己的pending队列。只有spout会发送该事件
if (curr == null) {
curr = new AckObject();
curr.val = input.getLong(1);
curr.spout_task = input.getInteger(2);
pending.put(id, curr);
} else {
// bolt's ack first come
curr.update_ack(input.getValue(1));
curr.spout_task = input.getInteger(2);
}
} else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) {//bolt发送过来的ack事件
if (curr != null) {
curr.update_ack(input.getValue(1));//bolt发送过来的值是它要ack的tup的ID和它产生的tup的ID的异或值。
} else {
// two case
// one is timeout
// the other is bolt's ack first come
curr = new AckObject();
curr.val = input.getLong(1);
pending.put(id, curr);
}
} else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) {//bolt发送过来的失败
if (curr == null) {
// do nothing
// already timeout, should go fail
return;
}
curr.failed = true;
} else {
LOG.info("Unknow source stream, " + stream_id + " from task-" + input.getSourceTask());
return;
}
Integer task = curr.spout_task;
if (task != null) {
if (curr.val == 0) {//如果校验值为0,则证明发送成功
pending.remove(id);
List values = JStormUtils.mk_list(id);
collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values);
} else {
if (curr.failed) {//将失败的tup直接发送给对应的spout task
pending.remove(id);
List values = JStormUtils.mk_list(id);
collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values);
}
}
} else {
}
// add this operation to update acker's ACK statics
collector.ack(input);
long now = System.currentTimeMillis();
if (now - lastRotate > rotateTime) {
lastRotate = now;
Map<Object, AckObject> tmp = pending.rotate();
LOG.info("Acker's timeout item size:{}", tmp.size());
}
}
}
Spout创建一个新的tuple的时候给acker发送消息
(spout-tuple-id, task-id)。
这是告诉acker, 一个新的spout-tuple出来了, 你跟踪一下,它是由id为task-id的task创建的(这个task-id在后面会被acker用来通知这个task:你的tuple处理成功了/失败了)。处理完这个消息之后, acker会在它的pending这个map(类型为TimeCacheMap)里面添加这样一条记录:
{spout-tuple-id {:spout-tasktask-id :valack-val)}
这就是acker对spout-tuple进行跟踪的核心数据结构, 对于每个spout-tuple所产生的tuple树的跟踪都只需要保存上面这条记录。acker后面会检查:val什么时候变成0,变成0, 说明这个spout-tuple产生的tuple都处理完成了。此时,acker将该条信息从自己的map里移除。
每个tuple在被ack的时候,会给acker发送一个消息,消息格式是(spout-tuple-id, tmp-ack-val),
Tuple处理失败的时候会给acker发送失败消息。acker会对该spout-tuple-id标记上失败。