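Analysis of Kafka's Sender
Contents
Kafka Producer Source Code Analysis, Part 1: KafkaProducer
Kafka Producer Source Code Analysis, Part 2: RecordAccumulator
Kafka Producer Source Code Analysis, Part 3: NIO
Kafka Producer Source Code Analysis, Part 4: Sender
Kafka Producer Source Code Analysis, Part 5: Summary
Recap
In the previous articles we started from the main thread. We walked through how Kafka sends messages and how the core NIO components are initialized. This article starts from the other, auxiliary thread (the Sender I/O thread) and explains in detail what it does.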
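Starting the thread
The KafkaProducer constructor creates the Sender, wraps it in a thread, and starts that thread:
// Create the Sender (a Runnable, not a Thread)
this.sender = newSender(logContext, kafkaClient, this.metadata);
// Give the thread a name
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// Wrap the Sender Runnable in a daemon KafkaThread
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// Start the thread. KafkaThread does not override run(), so Sender.run() is executed
this.ioThread.start();
...
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
// Call Thread's constructor, passing the Sender as the Runnable target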
super(runnable, name);
configureThread(name, daemon);
}
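The delegation described above can be reproduced with plain JDK classes. The sketch below is not Kafka's code. NamedDaemonThread is a hypothetical stand-in for KafkaThread. It only calls the Thread(Runnable, String) constructor and setDaemon. The "kafka-producer-network-thread" prefix is an assumed value for NETWORK_THREAD_PREFIX, used for illustration. Because run() is not overridden, Thread.run() invokes the Runnable. This is the same mechanism by which ioThread.start() ends up in Sender.run().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaThread: run() is not overridden,
// so Thread.run() delegates to the Runnable passed to the constructor.
class NamedDaemonThread extends Thread {
    NamedDaemonThread(String name, Runnable runnable, boolean daemon) {
        super(runnable, name);   // Thread(Runnable target, String name)
        setDaemon(daemon);       // must be called before start()
    }
}

class ThreadStartDemo {
    // Assumed prefix, for illustration only.
    static final String PREFIX = "kafka-producer-network-thread";

    // Starts a daemon thread around a Sender-like Runnable and reports which thread ran it.
    static String runAndReportThreadName(String clientId) {
        final String[] seenName = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        Runnable senderLike = () -> {            // plays the role of Sender.run()
            seenName[0] = Thread.currentThread().getName();
            done.countDown();
        };
        Thread ioThread = new NamedDaemonThread(PREFIX + " | " + clientId, senderLike, true);
        ioThread.start();
        try {
            done.await(5, TimeUnit.SECONDS);     // countDown/await gives the needed happens-before
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seenName[0];
    }
}
```

For example, runAndReportThreadName("client-1") returns "kafka-producer-network-thread | client-1". That is the name of the wrapper thread that executed the Runnable, not the caller's thread.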
The run method
Starting the thread runs Sender.run(), so let's take a closer look at it.
/**
* The main run loop for the sender thread
*/
public void run() {
// This log line tells us this is the thread that performs the Kafka producer's I/O
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
// Keep looping until the thread is closed; controlled by the running flag
while (running) {
try {
// The core run(long) method, see 2.3
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// Log: beginning shutdown of the producer I/O thread, sending the remaining records
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
// Continue only if forceClose is false; a forced close ignores all unfinished work and shuts the Sender thread down
// hasUndrained() is covered in 2.1
// inFlightRequestCount() is covered in 2.2
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// If this is a forced close
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
// See 2.4
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
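To make the two loops and the forceClose check concrete, here is a deliberately simplified model. It assumes that a single counter, pendingWork, can stand in for accumulator.hasUndrained() || client.inFlightRequestCount() > 0. MiniSenderLoop and its methods are hypothetical. In the real code, each iteration sends batches and polls the network instead of decrementing a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the Sender lifecycle (not Kafka's code).
// pendingWork stands in for "accumulator.hasUndrained() || inFlightRequestCount() > 0".
class MiniSenderLoop implements Runnable {
    private volatile boolean running = true;
    private volatile boolean forceClose = false;
    final AtomicInteger pendingWork = new AtomicInteger();
    final AtomicInteger iterations = new AtomicInteger();
    volatile int abortedOnForceClose = 0;

    // One iteration: pretend to complete one unit of pending work.
    private void runOnce() {
        iterations.incrementAndGet();
        pendingWork.updateAndGet(n -> n > 0 ? n - 1 : 0);
    }

    @Override
    public void run() {
        while (running) {                              // main loop until a close is initiated
            runOnce();
        }
        while (!forceClose && pendingWork.get() > 0) { // graceful shutdown: finish pending work
            runOnce();
        }
        if (forceClose) {                              // forced shutdown: fail whatever is left
            abortedOnForceClose = pendingWork.getAndSet(0);
        }
    }

    void initiateClose() { running = false; }

    void forceClose() {
        forceClose = true;
        initiateClose();
    }
}
```

With three pending units, initiateClose() followed by run() performs three drain iterations. forceClose() instead skips the drain loop and counts the remaining units as aborted, which is the role abortIncompleteBatches() plays in the real code.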
2.1 The hasUndrained method of RecordAccumulator
From the second article in this series we know that accumulator is a RecordAccumulator. The Sender's field declaration confirms this:
/* the record accumulator that batches records */
private final RecordAccumulator accumulator;
Next, let's look at the hasUndrained method.
/**
* Check whether there are any batches which haven't been drained
*/
// i.e. whether any partition deque still holds a batch that has not been drained for sending
public boolean hasUndrained() {
// Iterate over the accumulator's batches field, a thread-safe map from partition to deque
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
// Get the partition's deque; deques are not thread-safe, so each is read under its own lock
Deque<ProducerBatch> deque = entry.getValue();
synchronized (deque) {
// If the deque is not empty, return true
if (!deque.isEmpty())
return true;
}
}
return false;
}
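This method checks whether any partition deque in the accumulator still holds batches that have not yet been drained for sending.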
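Here is a minimal model of the same check. It assumes String partition names and Integer batch ids in place of TopicPartition and ProducerBatch. MiniAccumulator is hypothetical. It uses a ConcurrentHashMap for the map and locks each ArrayDeque individually, which is the locking pattern hasUndrained() relies on. Note that a partition whose deque exists but is empty does not count as undrained.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of RecordAccumulator.batches: partition name -> deque of batch ids.
class MiniAccumulator {
    private final ConcurrentMap<String, Deque<Integer>> batches = new ConcurrentHashMap<>();

    void append(String partition, int batchId) {
        Deque<Integer> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {            // ArrayDeque is not thread-safe: lock per deque
            dq.addLast(batchId);
        }
    }

    // Removes and returns the oldest batch of a partition, or null if there is none.
    Integer drainOne(String partition) {
        Deque<Integer> dq = batches.get(partition);
        if (dq == null) return null;
        synchronized (dq) {
            return dq.pollFirst();
        }
    }

    // Same shape as hasUndrained(): true as soon as any deque is non-empty.
    boolean hasUndrained() {
        for (Map.Entry<String, Deque<Integer>> entry : batches.entrySet()) {
            Deque<Integer> dq = entry.getValue();
            synchronized (dq) {
                if (!dq.isEmpty()) return true;
            }
        }
        return false;
    }
}
```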
2.2 The inFlightRequestCount method of NetworkClient
From the previous article on NIO initialization we know that the Sender is constructed with a NetworkClient object. So let's look at the inFlightRequestCount method in that class:
/**
* Get the number of in-flight requests
*/
@Override
// Number of requests that have been sent, or are being sent, but have not yet received a response
public int inFlightRequestCount() {
return this.inFlightRequests.count();
}
This method returns the number of requests that have been sent, or are being sent, but have not yet received a response.
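For intuition about what is being counted, here is a hypothetical, simplified version of InFlightRequests. Requests are tracked per node in FIFO order, and a running total backs count(). The class, the add/completeNext names, and the use of plain integers as correlation ids are assumptions for this sketch. Kafka's own bookkeeping may differ between versions. The per-node count(nodeId) corresponds to the inFlightRequests.count(node.idString()) call used later in leastLoadedNode().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified InFlightRequests: per-node FIFO deques of request ids plus a running total.
class MiniInFlightRequests {
    private final Map<String, Deque<Integer>> requests = new HashMap<>();
    private int total = 0;

    // Called when a request is handed to the network layer for sending.
    void add(String nodeId, int correlationId) {
        requests.computeIfAbsent(nodeId, n -> new ArrayDeque<>()).addFirst(correlationId);
        total++;
    }

    // Called when the response to this node's oldest request arrives; returns its id.
    int completeNext(String nodeId) {
        Deque<Integer> dq = requests.get(nodeId);
        if (dq == null || dq.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + nodeId);
        total--;
        return dq.pollLast();               // oldest request sits at the tail
    }

    int count() { return total; }           // what inFlightRequestCount() reports

    int count(String nodeId) {              // per-node count, as used by leastLoadedNode()
        Deque<Integer> dq = requests.get(nodeId);
        return dq == null ? 0 : dq.size();
    }
}
```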
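2.3 The core run method
Both loops in run() call run(long now). The main loop calls it on every iteration while running is true. The shutdown loop calls it as long as this is not a forced close and either the accumulator still holds batches or the number of sent but unacknowledged requests is greater than 0. Here it is: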
/**
* Run a single iteration of sending
*
* @param now The current POSIX time in milliseconds
*/
void run(long now) {
// transactionManager is non-null for idempotent or transactional producers
if (transactionManager != null) {
try {
if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
// Check if the previous run expired batches which requires a reset of the producer state.
transactionManager.resetProducerId();
// No transactional.id configured: this is an idempotent producer
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
transactionManager.transitionToFatalError(
new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
// A transactional request is in flight, or one was just sent
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional requests, we simply wait for them to return
// See 3.1
client.poll(retryBackoffMs, now);
return;
}
// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
// If the transaction manager is in a fatal error state, or there is no producer id
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
// Get the last error recorded by the transaction manager
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
// Abort all incomplete batches because of the error: remove them from their deques, release their buffers, and so on
maybeAbortBatches(lastError);
// See 3.1
client.poll(retryBackoffMs, now);
return;
// The transaction has an abortable error
} else if (transactionManager.hasAbortableError()) {
// Abort every batch that has not been drained yet
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request: {}", e);
// Record the authentication failure in the transaction manager
transactionManager.authenticationFailed(e);
}
}
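// Send producer data. The returned poll timeout is the smaller of the time until the next batch expires and the delay for checking data availability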
long pollTimeout = sendProducerData(now);
// See 3.1
client.poll(pollTimeout, now);
}
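The branching at the top of run(now) is easier to follow as a pure decision function. The sketch below is a hypothetical condensation, not Kafka's code. Each boolean in TxnState stands in for the TransactionManager query named in its comment. The resetProducerId() step and the AuthenticationException handler are left out. The function returns which of three paths run(now) takes.

```java
// Hypothetical model of the decision at the top of Sender.run(now); not Kafka's code.
class SenderRunFlow {
    static final class TxnState {
        boolean transactional;            // isTransactional()
        boolean unresolvedSequences;      // hasUnresolvedSequences()
        boolean fatalError;               // hasFatalError()
        boolean pendingTxnRequest;        // hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)
        boolean hasProducerId = true;     // hasProducerId()
        boolean abortableError;           // hasAbortableError()
    }

    enum Step {
        POLL_RETRY_BACKOFF,               // client.poll(retryBackoffMs, now); return
        ABORT_UNDRAINED_THEN_SEND,        // accumulator.abortUndrainedBatches(...), then send
        SEND_PRODUCER_DATA                // sendProducerData(now), then client.poll(pollTimeout, now)
    }

    // state == null models "transactionManager == null".
    static Step decide(TxnState state) {
        if (state == null) return Step.SEND_PRODUCER_DATA;
        boolean fatal = state.fatalError;
        if (state.transactional) {
            if (state.unresolvedSequences && !fatal) {
                fatal = true;             // transitionToFatalError(...)
            } else if (state.pendingTxnRequest) {
                return Step.POLL_RETRY_BACKOFF;
            }
        }
        if (fatal || !state.hasProducerId) {
            return Step.POLL_RETRY_BACKOFF;   // after maybeAbortBatches(lastError) if lastError != null
        }
        if (state.abortableError) return Step.ABORT_UNDRAINED_THEN_SEND;
        return Step.SEND_PRODUCER_DATA;
    }
}
```

For instance, a transactional producer with a transactional request in flight gets POLL_RETRY_BACKOFF. An idempotent producer that still has no producer id gets the same result, because sending data makes no sense until the id is available.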
3.1 The poll method of NetworkClient
Official description: do the actual reads and writes to sockets.
/**
* Do actual reads and writes to sockets.
*
* @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
* must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
* metadata timeout
* @param now The current time in milliseconds
* @return The list of responses received
*/
@Override
public List<ClientResponse> poll(long timeout, long now) {
// Make sure the client is still active; throw otherwise
ensureActive();
// Are there any aborted sends?
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
// Move the responses of the aborted sends into responses and clear this.abortedSends
handleAbortedSends(responses);
// Complete the responses by invoking their callbacks
completeResponses(responses);
return responses;
}
// See 3.2
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
// See 3.3
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
// Handle sends that have completed
handleCompletedSends(responses, updatedNow);
// Handle all completed receives and add the received responses to the list
handleCompletedReceives(responses, updatedNow);
// Handle any disconnected connections
handleDisconnections(responses, updatedNow);
// Record any newly completed connections
handleConnections();
// Send ApiVersions requests to newly connected nodes; this sets a NetworkSend (a subclass of ByteBufferSend) as the KafkaChannel's send
handleInitiateApiVersionRequests(updatedNow);
// Iterate over all timed-out requests and disconnect the nodes they were sent to
handleTimedOutRequests(responses, updatedNow);
// Complete the responses by invoking their callbacks
completeResponses(responses);
return responses;
}
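How long poll() may block is the minimum of three bounds. The sketch below restates that arithmetic with a local min helper that plays the role of Utils.min. The class and method names are hypothetical, and the numbers in the usage note are example values only.

```java
// Minimal sketch of how NetworkClient.poll() bounds the time spent in Selector.poll().
class PollTimeout {
    // Local helper standing in for Utils.min: smallest of one or more longs.
    static long min(long first, long... rest) {
        long m = first;
        for (long r : rest) {
            if (r < m) m = r;
        }
        return m;
    }

    // timeout: passed in by the Sender; metadataTimeout: from metadataUpdater.maybeUpdate(now)
    static long selectTimeout(long timeout, long metadataTimeout, long defaultRequestTimeoutMs) {
        return min(timeout, metadataTimeout, defaultRequestTimeoutMs);
    }
}
```

With timeout = 100, metadataTimeout = 300000 and defaultRequestTimeoutMs = 30000, selectTimeout returns 100. If the first two are both Long.MAX_VALUE, the request timeout caps the wait at 30000.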
3.2 The maybeUpdate method of DefaultMetadataUpdater
Why is the metadataUpdater field of NetworkClient above a DefaultMetadataUpdater? Let's look at where the NetworkClient constructor initializes metadataUpdater:
if (metadataUpdater == null) {
if (metadata == null)
throw new IllegalArgumentException("`metadata` must not be null");
this.metadataUpdater = new DefaultMetadataUpdater(metadata);
} else {
this.metadataUpdater = metadataUpdater;
}
From the earlier articles we know that the KafkaProducer constructor creates a NetworkClient and passes null for metadataUpdater. So metadataUpdater here is a newly created DefaultMetadataUpdater.
Next, let's look at its maybeUpdate method:
@Override
public long maybeUpdate(long now) {
// should we update our metadata?
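// Time until the next metadata update: the max of the time until the current metadata expires and the time until an update is allowed (i.e. the backoff has passed); if an update has been requested, the expiry time is now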
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
// If a metadata request has been sent but no response has arrived yet, wait for the default single-request timeout
long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
// Take the larger of the two
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) {
return metadataTimeout;
}
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
// See 4.1
Node node = leastLoadedNode(now);
if (node == null) {
// No node is available, so give up sending a metadata request
log.debug("Give up sending metadata request since no node is available");
// Return the time (ms) to wait before retrying a connection to the server
return reconnectBackoffMs;
}
return maybeUpdate(now, node);
}
/**
* Add a metadata request to the list of sends if we can make one
*/
private long maybeUpdate(long now, Node node) {
// Get the node's connection id
String nodeConnectionId = node.idString();
// Are we connected and ready to send more requests on this connection?
if (canSendRequest(nodeConnectionId, now)) {
// Mark that a metadata fetch is in progress
this.metadataFetchInProgress = true;
MetadataRequest.Builder metadataRequest;
// Do we need metadata for all topics?
if (metadata.needMetadataForAllTopics())
// Build a metadata request for all topics
metadataRequest = MetadataRequest.Builder.allTopics();
else
// Build a metadata request for the specified topics
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
metadata.allowAutoTopicCreation());
log.debug("Sending metadata request {} to node {}", metadataRequest, node);
// Send the metadata request to the node
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
// Return the default timeout for a single request to be acknowledged by the server
return defaultRequestTimeoutMs;
}
// If there's any connection establishment underway, wait until it completes. This prevents
// the client from unnecessarily connecting to additional nodes while a previous connection
// attempt has not been completed.
// Is any connection currently being established?
if (isAnyNodeConnecting()) {
// Strictly the timeout we should return here is "connect timeout", but as we don't
// have such application level configuration, using reconnect backoff instead.
// Return the time (ms) to wait before retrying a connection to the server
return reconnectBackoffMs;
}
// True if we can start a connection to this node now (no connection yet and not in reconnect backoff)
if (connectionStates.canConnect(nodeConnectionId, now)) {
// we don't have a connection to this node right now, make one
log.debug("Initialize connection to node {} for sending metadata request", node);
// Start connecting to the node: open the channel, register it with the selector, etc.
initiateConnect(node, now);
// Return the time (ms) to wait before retrying a connection to the server
return reconnectBackoffMs;
}
// connected, but can't send more OR connecting
// In either case, we just need to wait for a network event to let us know the selected
// connection might be usable again.
return Long.MAX_VALUE;
}
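The two halves of maybeUpdate can be summarized as small functions. metadataTimeout computes how long metadata can wait. chooseAction picks an action in the same order as the checks in maybeUpdate(now, node). returnedTimeout gives the value handed back to poll() for that action. MetadataUpdateSketch and its Action enum are hypothetical. The boolean inputs stand in for canSendRequest, isAnyNodeConnecting and connectionStates.canConnect.

```java
// Hypothetical model of DefaultMetadataUpdater.maybeUpdate(now) and maybeUpdate(now, node).
class MetadataUpdateSketch {
    enum Action { SEND_METADATA_REQUEST, WAIT_FOR_PENDING_CONNECT, INITIATE_CONNECT, WAIT_FOR_NETWORK_EVENT }

    // First half: how long until metadata needs attention (0 = act now).
    static long metadataTimeout(long timeToNextUpdate, boolean fetchInProgress, long defaultRequestTimeoutMs) {
        long waitForMetadataFetch = fetchInProgress ? defaultRequestTimeoutMs : 0;
        return Math.max(timeToNextUpdate, waitForMetadataFetch);
    }

    // Second half: checks are evaluated in the same order as in maybeUpdate(now, node).
    static Action chooseAction(boolean canSendRequest, boolean anyNodeConnecting, boolean canConnect) {
        if (canSendRequest) return Action.SEND_METADATA_REQUEST;
        if (anyNodeConnecting) return Action.WAIT_FOR_PENDING_CONNECT;
        if (canConnect) return Action.INITIATE_CONNECT;
        return Action.WAIT_FOR_NETWORK_EVENT;
    }

    // Value handed back to NetworkClient.poll() for each action.
    static long returnedTimeout(Action action, long defaultRequestTimeoutMs, long reconnectBackoffMs) {
        switch (action) {
            case SEND_METADATA_REQUEST:
                return defaultRequestTimeoutMs;
            case WAIT_FOR_PENDING_CONNECT:
            case INITIATE_CONNECT:
                return reconnectBackoffMs;
            default:
                return Long.MAX_VALUE;   // wait for a network event
        }
    }
}
```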
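4.1 The leastLoadedNode method of NetworkClient (called from DefaultMetadataUpdater)
Choose the node with the fewest outstanding requests that is at least eligible for connection.
This method prefers a node with an existing connection. It may choose a node with no connection yet if all existing connections are in use.
It never chooses a node that has no existing connection and was disconnected within the reconnect backoff period.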
@Override
public Node leastLoadedNode(long now) {
// Get all nodes of the current cluster
List<Node> nodes = this.metadataUpdater.fetchNodes();
if (nodes.isEmpty())
throw new IllegalStateException("There are no nodes in the Kafka cluster");
int inflight = Integer.MAX_VALUE;
Node found = null;
// Pick a random starting offset within the list size
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
// Compute the list index of the node to check
int idx = (offset + i) % nodes.size();
// Get that node
Node node = nodes.get(idx);
// Number of in-flight requests to this node
int currInflight = this.inFlightRequests.count(node.idString());
// No in-flight requests and the connection is ready
if (currInflight == 0 && isReady(node, now)) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
// Return this node
return node;
// The node is not in reconnect backoff and has fewer in-flight requests than the best node so far
} else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
// Update the smallest in-flight count
inflight = currInflight;
// Remember this node
found = node;
} else if (log.isTraceEnabled()) {
// Otherwise log that this node is excluded from the selection
log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}",
node, this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
}
}
if (found != null)
log.trace("Found least loaded node {}", found);
else
log.trace("Least loaded node selection failed to find an available node");
// Return the node with the fewest in-flight requests found (may be null)
return found;
}
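The selection rule can be tested in isolation. This is a simplified restatement over node id strings. It assumes that the three collections passed in stand in for inFlightRequests.count(id), isReady(node, now) and connectionStates.isBlackedOut(id, now). LeastLoaded.pick is a hypothetical name. Starting at a random offset spreads requests across brokers instead of always hitting the first node in the list.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Simplified restatement of NetworkClient.leastLoadedNode(now) over node id strings.
class LeastLoaded {
    static String pick(List<String> nodes, Map<String, Integer> inFlight,
                       Set<String> ready, Set<String> blackedOut, Random rand) {
        if (nodes.isEmpty()) throw new IllegalStateException("There are no nodes in the Kafka cluster");
        int best = Integer.MAX_VALUE;
        String found = null;
        int offset = rand.nextInt(nodes.size());          // random start spreads load across nodes
        for (int i = 0; i < nodes.size(); i++) {
            String node = nodes.get((offset + i) % nodes.size());
            int curr = inFlight.getOrDefault(node, 0);
            if (curr == 0 && ready.contains(node)) {
                return node;                              // idle, ready connection: stop immediately
            } else if (!blackedOut.contains(node) && curr < best) {
                best = curr;                              // best candidate seen so far
                found = node;
            }
        }
        return found;                                     // null if every node is blacked out
    }
}
```

Because the comparison is a strict curr < best, ties go to whichever eligible node is visited first, and that order depends on the random offset.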
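3.3 The poll method of network.Selector
Official description: do whatever non-blocking I/O can be done on each connection. This includes completing connections, completing disconnections, initiating new sends, or making progress on in-progress sends or receives.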
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
// Did the previous poll make progress reading buffered data (e.g. there was enough memory)?
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
// Clear the results of the previous poll
clear();
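// Are there keys whose channels still have buffered data to read?
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
// Don't block in select() if there is already work that needs no new socket data
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;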
// Recovered from memory pressure: the pool is no longer out of memory but the outOfMemory flag is still set
if (!memoryPool.isOutOfMemory() && outOfMemory) {
//we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.maybeUnmute();
}
}
outOfMemory = false;
}
/* check ready keys */
// Time at which channel selection starts
long startSelect = time.nanoseconds();
// Select ready channels; returns the number of ready keys
int numReadyKeys = select(timeout);
// Time at which channel selection ends
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
// There are ready keys, immediately connected channels, or channels with buffered data
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
// The selected-key set (keys whose channels are ready)
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
// Remove keys that are already in the ready set, because they are polled below
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
// Poll the channels with buffered data; see 4.2
pollSelectionKeys(toPoll, false, endSelect);
}
// Poll from channels where the underlying socket has more data
// Poll the ready channels; see 4.2
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
// Clear the selected keys
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// Close channels that were delayed and are now ready to be closed
// Close delayed channels that are now ready to be closed
completeDelayedChannelClose(endIo);
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
// Close the oldest connection if it has exceeded the idle timeout
maybeCloseOldestConnection(endSelect);
// Add to completedReceives after closing expired connections to avoid removing
// channels with completed receives until all staged receives are completed.
// Move any staged receives into completedReceives
addToCompletedReceives();
}
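Two details of Selector.poll are worth isolating: when poll may skip blocking, and why the ready keys are removed from keysWithBufferedRead before polling. The sketch below models SelectionKeys as strings. SelectorPollSketch and its parameters are hypothetical and mirror the variables in the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of two decisions in Selector.poll(timeout); strings stand in for SelectionKeys.
class SelectorPollSketch {
    // poll() does not block when there is already work that needs no new socket data.
    static long effectiveTimeout(long timeout, boolean hasStagedReceives, boolean hasImmediatelyConnected,
                                 boolean madeReadProgressLastCall, boolean dataInBuffers) {
        if (hasStagedReceives || hasImmediatelyConnected || (madeReadProgressLastCall && dataInBuffers))
            return 0;
        return timeout;
    }

    // Order in which keys are processed: buffered-only keys first, then the ready keys.
    // A key that is both buffered and ready is processed once, together with the ready keys.
    static List<String> processingOrder(Set<String> keysWithBufferedRead, Set<String> readyKeys) {
        Set<String> toPoll = new LinkedHashSet<>(keysWithBufferedRead);
        toPoll.removeAll(readyKeys);       // so no channel gets polled twice
        List<String> order = new ArrayList<>(toPoll);
        order.addAll(readyKeys);
        return order;
    }
}
```

With buffered keys {a, b} and ready keys {b, c}, processingOrder gives [a, b, c], so b is handled only once. Requiring madeReadProgressLastCall avoids a busy loop with timeout 0 when buffered data cannot be read, for example under memory pressure.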
4.2 The pollSelectionKeys method of network.Selector
Handle any ready I/O on a set of selection keys.
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
// Get the KafkaChannel attached to the key
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
boolean sendFailed = false;
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
// Immediately connected key, or a key that is ready for OP_CONNECT
if (isImmediatelyConnected || key.isConnectable()) {
// Finish connecting; false means the connection is still pending
if (channel.finishConnect()) {
// Add the channel id to the set of connected ids
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
// Get the channel for this key
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else {
continue;
}
}
/* if channel is not ready finish prepare */
// The channel is connected but not yet ready
if (channel.isConnected() && !channel.ready()) {
try {
// Perform the transport-layer handshake and authentication with the configured authenticator; the channel becomes ready once both complete
channel.prepare();
} catch (AuthenticationException e) {
sensors.failedAuthentication.record();
throw e;
}
if (channel.ready())
sensors.successfulAuthentication.record();
}
// Read from the channel if it is ready, has bytes to read from the socket or its buffers, and no earlier receive is staged or in progress
attemptRead(key, channel);
if (channel.hasBytesBuffered()) {
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
//next poll call otherwise data may be stuck in said buffers forever. If we attempt
//to process buffered data and no progress is made, the channel buffered status is
//cleared to avoid the overhead of checking every time.
keysWithBufferedRead.add(key);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send;
try {
// Write buffered data to the channel; returns the Send once it has been fully written, otherwise null
send = channel.write();
} catch (Exception e) {
sendFailed = true;
throw e;
}
if (send != null) {
// Record the completed send
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid())
// Close the connection
close(channel, CloseMode.GRACEFUL);
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else if (e instanceof AuthenticationException) // will be logged later as error by clients
log.debug("Connection with {} disconnected due to authentication exception", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
if (e instanceof DelayedResponseAuthenticationException)
maybeDelayCloseOnAuthenticationFailure(channel);
else
close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
}
}
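pollSelectionKeys builds on the standard java.nio select loop. The example below uses only JDK classes (Selector, Pipe, SelectionKey) rather than KafkaChannel or TransportLayer. It uses a Pipe instead of a socket, so it runs without a broker. It illustrates the pattern under those assumptions: isWritable() corresponds to channel.write(), isReadable() corresponds to attemptRead(), and removing each processed key corresponds to readyKeys.clear() in Selector.poll.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Plain java.nio select loop (not Kafka code): a Pipe stands in for a socket.
class NioPipeDemo {
    static String roundTrip(String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false);                  // register() requires non-blocking mode
            source.configureBlocking(false);
            sink.register(selector, SelectionKey.OP_WRITE);
            source.register(selector, SelectionKey.OP_READ);

            ByteBuffer out = ByteBuffer.wrap(payload);
            ByteBuffer in = ByteBuffer.allocate(256);
            ByteArrayOutputStream received = new ByteArrayOutputStream();

            // Bounded number of rounds so a bug cannot hang forever.
            for (int round = 0; round < 100 && received.size() < payload.length; round++) {
                selector.select(1000);                      // wait up to 1s for ready keys
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();                            // processed keys leave the selected set
                    if (key.isWritable()) {                 // cf. channel.write() in pollSelectionKeys
                        sink.write(out);
                        if (!out.hasRemaining()) key.interestOps(0);  // nothing left to send
                    }
                    if (key.isReadable()) {                 // cf. attemptRead(key, channel)
                        int n = source.read(in);
                        if (n > 0) {
                            received.write(in.array(), 0, n);
                            in.clear();
                        }
                    }
                }
            }
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```

roundTrip("hello kafka") writes the bytes through the sink while it is writable and reads them back once the source becomes readable. It then returns the same string. Setting interestOps(0) after the write stops the selector from reporting the sink as writable again, a step any client needs once it has nothing left to send.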
2.4 The close method of NetworkClient
Close the network client.
/**
* Close the network client
*/
@Override
public void close() {
// If the state is ACTIVE, atomically move it to CLOSING
state.compareAndSet(State.ACTIVE, State.CLOSING);
// If the state is now CLOSING (set above or earlier), move it to CLOSED and return true; this fails if the client is already CLOSED
if (state.compareAndSet(State.CLOSING, State.CLOSED)) {
// Close the org.apache.kafka.common.network.Selector object; see the code below
this.selector.close();
// Close the metadata updater
this.metadataUpdater.close();
} else {
log.warn("Attempting to close NetworkClient that has already been closed.");
}
}
The close method of org.apache.kafka.common.network.Selector:
/**
* Close this selector and all associated connections
*/
@Override
public void close() {
List<String> connections = new ArrayList<>(channels.keySet());
// Close every connection (KafkaChannel)
for (String id : connections)
close(id);
try {
// Close the NIO selector
this.nioSelector.close();
} catch (IOException | SecurityException e) {
log.error("Exception closing nioSelector:", e);
}
sensors.close();
// Close the ChannelBuilder that creates the channels
channelBuilder.close();
}
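The two compareAndSet calls make close() idempotent. The sketch below models only the state machine. ClosableClient, resourcesClosed and beginClosing() are hypothetical, and System.out stands in for log.warn. beginClosing() covers the case where some other code path has already moved the state to CLOSING. The first CAS then fails, but the second still succeeds, and resources are released exactly once.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the state transitions in NetworkClient.close() (not Kafka's code).
class ClosableClient {
    enum State { ACTIVE, CLOSING, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);
    int resourcesClosed = 0;   // how often selector/metadataUpdater would have been closed

    // Returns true if this call actually released the resources.
    boolean close() {
        state.compareAndSet(State.ACTIVE, State.CLOSING);    // no-op unless still ACTIVE
        if (state.compareAndSet(State.CLOSING, State.CLOSED)) {
            resourcesClosed++;                                // selector.close(); metadataUpdater.close();
            return true;
        }
        System.out.println("Attempting to close NetworkClient that has already been closed.");
        return false;
    }

    // Hypothetical: another path marks the client as closing without releasing resources.
    void beginClosing() { state.compareAndSet(State.ACTIVE, State.CLOSING); }

    State state() { return state.get(); }
}
```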
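2.5 Summary
This article started from the Sender thread and walked through its overall workflow. My understanding is limited, so some details are left unexplained. Where the official comments were already thorough, I simply translated them. Corrections are welcome.
The next article gives an overall summary of the Kafka producer.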