Sender.run -> client.ready -> initiateConnect -> selector.connect
/* TODO 这个selector是kafka主机封装的一个selector
* 他是基于java NIO里面的selector去封装的。
*/
public class Selector implements Selectable {
public static final long NO_IDLE_TIMEOUT_MS = -1;
private static final Logger log = LoggerFactory.getLogger(Selector.class);
//这个对象就是javaNIO里面的Selector
//Selector是负责网络的建立,发送网络请求,处理实际的网络IO。
//所以他是最最核心的这么样的一个组件。
private final java.nio.channels.Selector nioSelector;
//broker 和 KafkaChannel(SocketChnnel)的映射
//这儿的kafkaChannel大家暂时可以理解为就是SocketChannel
//代表的就是一个网络连接。
private final Map<String, KafkaChannel> channels;
//已经完成发送的请求
private final List<Send> completedSends;
//已经接收到的,并且处理完了的响应。
private final List<NetworkReceive> completedReceives;
//已经接收到了,但是还没来得及处理的响应。
//一个连接,对应一个响应队列
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
//没有建立连接的主机
private final List<String> disconnected;
//完成建立连接的主机
private final List<String> connected;
//建立连接失败的主机。
private final List<String> failedSends;
private final Time time;
private final SelectorMetrics sensors;
private final String metricGrpPrefix;
private final Map<String, String> metricTags;
private final ChannelBuilder channelBuilder;
private final int maxReceiveSize;
private final boolean metricsPerConnection;
private final IdleExpiryManager idleExpiryManager;
//TODO 我们认为这个KafkaChannel就是对javaNIO里面的SocketChannel
//进行了封装。
public class KafkaChannel {
//一个broker就对应一个KafkaChannel
//这个id就是broker的id
private final String id;
//我们推测这个里面应该会有SocketChannel
private final TransportLayer transportLayer;
private final Authenticator authenticator;
private final int maxReceiveSize;
//接收到的响应
private NetworkReceive receive;
//发送出去的请求
private Send send;
public interface TransportLayer{
/**
* returns underlying socketChannel
*
* 这个核心的组件,就是javaNIO里面的SocketChannel
*/
SocketChannel socketChannel();
}
网络连接流程
initiateConnect
selector.connect