KafkaConsumer这个类实现了同步commit和异步commit。
同步commit的实现函数如下:
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
offsets.forEach(this::updateLastSeenEpochIfNewer);
#调用commitOffsetsSync 继续
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing offsets " + offsets);
}
} finally {
release();
}
}
kafka-trunk\clients\src\main\java\org\apache\kafka\clients\consumer\internals\ConsumerCoordinator.java
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty())
return true;
do {
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
#系统会一直等在这里,调用poll函数等到future 返回成功。
client.poll(future, timer);
// We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
// the corresponding callbacks are invoked prior to returning in order to preserve the order that
// the offset commits were applied.
invokeCompletedOffsetCommitCallbacks();
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return true;
}
if (future.failed() && !future.isRetriable())
throw future.exception();
timer.sleep(retryBackoffMs);
} while (timer.notExpired());
return false;
}
而对于异步commit的实现如下:
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
log.debug("Committing offsets: {}", offsets);
offsets.forEach(this::updateLastSeenEpochIfNewer);
#调用commitOffsetsAsync来commit
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
} finally {
release();
}
}
kafka-trunk\clients\src\main\java\org\apache\kafka\clients\consumer\internals\ConsumerCoordinator.java
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
} else {
pendingAsyncCommits.incrementAndGet();
#这里添加了一个listerner,来分别添加成功请求时候的回调函数和失败请求时候的回调函数
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
client.pollNoWakeup();
}
@Override
public void onFailure(RuntimeException e) {
pendingAsyncCommits.decrementAndGet();
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
new RetriableCommitFailedException(e)));
}
});
}
}
KafkaConsumer的同步commit和异步commit。
猜你喜欢
转载自blog.csdn.net/tiantao2012/article/details/88424194
今日推荐
周排行