Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源码分析(五)从Channel获取数据写入Sink
6 从Channel获取数据写入Sink
6.1 Sink部分
Sink部分主要分为以下3个步骤:
1. 由SinkRunner不断调用SinkProcessor的process方法。
2. 根据配置的SinkProcessor的不同,会使用不同的策略来选择sink。SinkProcessor有3种,默认是DefaultSinkProcessor。
3. 调用选择的sink的process方法。
6.1.1 Sink的Process方法
以LoggerSink为例进行说明。这个方法来自Sink接口,主要用于取出数据进行处理,如果失败则回滚(takeList中内容退回quene):
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event = null;
try {
transaction.begin();
event = channel.take();//从channel中获取一条数据
if (event != null) {
if (logger.isInfoEnabled()) {
logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
//输出event到日志
}
} else {
result = Status.BACKOFF;
}
transaction.commit();//执行提交操作
} catch (Exception ex) {
transaction.rollback();//执行回滚操作
throw new EventDeliveryException("Failed to log event: " + event, ex);
} finally {
transaction.close();
}
return result;
}
6.2 Channel部分
6.2.1 doTake方法
这个方法中主要是从queue中取出事件,放到takeList中。
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
//获取take列表容量的许可,如果没有则报异常。
if (takeList.remainingCapacity() == 0) {
throw new ChannelException("");
}
//尝试获取queue数量的许可,如果没有则代表没有数据可以取,直接返回。
if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
return null;
}
Event event;
synchronized (queueLock) {
event = queue.poll();//从queue中取出一条数据
}
Preconditions.checkNotNull(event, "");
takeList.put(event);//放到takeList中
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
takeByteCounter += eventByteSize;//设置计数器
return event;
}
6.2.2 doCommit方法
前面说到put和take操作的提交都是通过这个方法来提交的。
这个步骤要做的事情有:
1. putList放入queue,完成后就代表eventList->putList->queue这个步骤完成。
2. 假如doTake过程没报错(能进到这个方法说明没报错),说明sink那边已经获取到了全部的event,这时可直接清空takeList,代表queuetakeList & sink这个步骤完成。
综上,两个事情合并在一起的话,要做的就是,把putList放入queue再清空takeList。
protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("");
}
if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized (queueLock) {
if (puts > 0) {
while (!putList.isEmpty()) {
if (!queue.offer(putList.removeFirst())) {
throw new RuntimeException("");
}
}
}
putList.clear();
takeList.clear();
}
//后面是重新设置相关计数器
}
这个方法一开始去比较takeList和putList的容量差,是为了简化申请许可的过程。正常的流程是清空takeList,释放takeList.size个许可,再申请putList.size个许可,它是两个步骤合并起来的。
6.2.3 doRollback方法
与doCommit方法类似,这里的回滚,也分为2种情况:
- 由take操作引起的
该transaction的流程如下:queue->takeList & sink,所以回滚操作要做的事情就是:把takeList放回queue。
- 由put操作引起的
该transaction的流程如下:eventList->putList->queue,由于doPut和doCommit执行出现异常就直接跳出了,还没执行清空语句,也就是eventList还没有清空,所以可以直接清空putList,这样下次循环还会重新读取该eventList中的数据。
综上,两种操作要合为一个方法的话,就把takeList放回queue,然后清理putList就可以了。代码如下:
protected void doRollback() {
int takes = takeList.size();
synchronized (queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),"");
while (!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
//后面是重新设置相关计数器
}
附注:从目前的代码看,在take操作的时候,应该已经获取到了部分数据,如果这个时候异常了,把takeList返回queue的话,会导致重复数据。