NioSelector和KafkaSelector有什么区别?
先说结论,KafkaSelector(org.apache.kafka.common.network.selector)是对NioSelector(java.nio.channels.Selector)的进一步封装。回想一下NioSelector,它参与了IO中的哪些过程?
1、创建一个通道,并将通道注册到NioSelector上,我们可以得到一个SelectionKey
2、轮询NioSelector中的ready集合,拿到对应的SelectionKey,并根据这个SelectionKey所关注的事件去执行对应的操作
实际上,KafkaSelector也是在调用NioSelector去执行这些操作,待补充……
一、创建连接
KafkaSelector创建连接,和普通的NioSelector并没有什么不同,首先创建一个通道,并将其设置为非阻塞式的长连接,设置完毕后,执行连接操作。
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);// 非阻塞模式
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);// 设置为长连接
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
socket.setSendBufferSize(sendBufferSize);// 设置SO_SNDBUF 大小
}
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
socket.setReceiveBufferSize(receiveBufferSize);// 设置 SO_RCVBUF 大小
}
socket.setTcpNoDelay(true);
boolean connected;
try {
connected = socketChannel.connect(address);// 因为是非阻塞模式,所以方法可能会在连接正式连接之前返回
} catch (UnresolvedAddressException e) {
socketChannel.close();
throw new IOException("Can't resolve address: " + address, e);
} catch (IOException e) {
socketChannel.close();
throw e;
}
创建完通道后,将其注册到NioSelector上,并关注OP_CONNECT,再以节点Id,SelectionKey来创建KafkaChannel,这里先不详细说明KafkaChannel,它是对通道的进一步封装。在创建完KafkaChannel后,将KafkaChannel与SelectionKey、节点ID做进一步绑定。
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 将当前这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 创建KafkaChannel
key.attach(channel);// 将channel绑定到key上
this.channels.put(id, channel);// 将 nodeId 和 Channel绑定
这样有一个好处,首先KafkaChannel中包含了节点ID与SelectionKey,而我们也可以根据节点ID来拿到KafkaChannel,同样可以根据SelectionKey来拿到KafkaChannel,这就意味着,我们只要拿到了KafkaChannel、SelectionKey、节点ID中的任意一个,都可以通过这些引用关系拿到彼此,从而进行相关操作。
二、预发送
实际上就是将要发送的ByteBuffer扔进KafkaChannel,此时并未进行IO操作,这里的Send对象,实际上就是对ByteBuffer的进一步封装,它主要包含了将要发往的节点ID、ByteBuffer大小、是否发送完毕等信息。我们这里根据节点ID,从我们刚才的channels中,取出KafkaChannel。
public void send(Send send) {
// 看看send要发的这个nodeId在不在
KafkaChannel channel = channelOrFail(send.destination());
try {
// 把数据扔进KafkaChannel中(只能放一个,放多个会报错),并关注write事件
channel.setSend(send);
} catch (CancelledKeyException e) {
// 失败了加一条node_id的失败记录
this.failedSends.add(send.destination());
close(channel);
}
}
三、进行IO操作
来到了我们比较熟悉的轮询环节,从NioSelector中取出所有SelectionKey进行轮询。
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false);// 处理I/O的核心方法
pollSelectionKeys(immediatelyConnectedKeys, true);
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// 创建连接时(connect)将kafkaChannel注册到key上,就是为了在这里获取
KafkaChannel channel = channel(key);
……………………
1、判断一下key 连接好了没有,因为我们用的是非阻塞连接,所以到了轮询阶段,还没有完成连接是正常的。
if (isImmediatelyConnected || key.isConnectable()) {
// finishConnect方法会先检测socketChannel是否建立完成,建立后,会取消对OP_CONNECT事件关注,//TODO 并开始关注OP_READ事件
if (channel.finishConnect()) {
this.connected.add(channel.id());// 将当前channel id 添加到已连接的集合中
this.sensors.connectionCreated.record();
} else {
continue;// 代表连接未完成,则跳过对此Channel的后续处理
}
}
2、身份验证(略过)
3、判断KafkaChannel有没有准备好,有没有关注OP_READ,能不能读之类的,并进行读操作。
这里有一个判断,就是判断当前的KafkaChannel是不是在StagedReceives里。我们往后看看,在从网络上读取数据时,我们会将KafkaChannel扔进StagedReceives里,也就是说,如果这个KafkaChannel已经在StagedReceives里了,那么代表它已经在读数据了。
if (channel.ready() // 连接的三次握手完成,并且 todo 权限验证通过
&& key.isReadable() // key是否关注了read事件
&& !hasStagedReceive(channel)) {// todo 这个通道不能是正在读数据的,因为在读的时候,会把这个channel扔进stagedReceives里面
NetworkReceive networkReceive;
/**
* 实际上这里就是分多次去一个channel取数据,直到取完,并将其保存在key:channel value:new ArrayDeque<NetworkReceive> 中
*/
while ((networkReceive = channel.read()) != null) {
addToStagedReceives(channel, networkReceive);
}
}
4、判断KafkaChannel有没有准备好,有没有关注OP_WRITE,并进行写操作
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
// 这里会将KafkaChannel的send字段发送出去,
// 如果未完成发送,或者没发完,则返回null
// 发送成功则返回send对象
if (send != null) {
this.completedSends.add(send);// 添加到completedSends集合
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
四、关闭空闲连接
在每一次IO操作完毕后,KafkaSelector都会调用一个方法,去关闭掉那些没怎么用的连接,实际上它就是一个基于时间戳的断连机制。
KafkaSelector中维护了一个哈希表,
LinkedHashMap<String, Long> lruConnections (new LinkedHashMap<>(16, .75F, true);
在每次进行IO操作时,将Key:节点ID,Value:当前时间戳扔进哈希表里面,在IO操作进行完毕时,检查一下,最大的那个节点,它的最后一次IO时间+connectionsMaxIdleNanos(创建KafkaSelector时指定),是否超过了当前的时间。
如果是,这个连接就会被关掉。
比如说connectionsMaxIdleNanos被指定成了1分钟,那么如果这个有序哈希表的最后一个节点的时间是一分钟之前,那么这个节点ID的通道将会被关掉。
private void maybeCloseOldestConnection() {
if (currentTimeNanos > nextIdleCloseCheckTime) {
if (lruConnections.isEmpty()) {
nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
} else {
Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet()
.iterator()
.next();
Long connectionLastActiveTime = oldestConnectionEntry.getValue();
nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
if (currentTimeNanos > nextIdleCloseCheckTime) {
String connectionId = oldestConnectionEntry.getKey();
if (log.isTraceEnabled()) {
log.trace("About to close the idle connection from " + connectionId
+ " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
}
disconnected.add(connectionId);
close(connectionId);
}
}
}
}