一、Channel简介
Channel是Netty的网络抽象类,聚合了很多的功能,包括网络读、写,客户端发起连接,主动关闭连接,链路关闭,获取通讯双方的网络地址等。也包括了Netty框架自身的一些功能,如获取该Channel的EventLoop,获取缓冲分配器ByteBufferAllocator和pipeline等等。
二、Channel功能介绍
以上是Channel接口提供的一些方法,很多比较重要,所以这里简单的进行介绍:
- alloc():获取当前Channel使用的ByteBuf缓冲区分配器
- read():从Channel中读取数据,如果数据被读取成功,那么就会触发
io.netty.channel.ChannelInboundHandler#channelRead
方法,读取操作调用完成之后,会调用io.netty.channel.ChannelInboundHandler#channelReadComplete
- flush():将之前写入到唤醒消息数组的数据全部写到目标Channel中;
- disconnect(ChannelPromise var1):请求断开远程的连接并使用ChannelPromise来处理操作的结果
- bind(SocketAddress localAddress):绑定指定的本地Socket地址
- config():获取当前Channel的配置信息
- isOpen():判断Channel是否已经打开
- isRegistered():判断当前Channel是否已经注册到EventLoop上;
- isActive():是否已经处于激活状态
- metadata():获取当前Channel的元数据信息,包括TCP参数配置等等
- localAddress():获取当前Channel绑定的本地地址;
- remoteAddress():获取当前Channel绑定的远程地址;
- eventLoop():获取当前Channel注册到的EventLoop,本质上是Reactor线程组;
- parent():对于服务端来说,会返回空,对于客户端来说,会返回创建他的ServerSocketChannel;
- id():返回一个ChannelId对象,是Channel的唯一标志,生成策略如下:
- 机器的MAC地址,可以代表全局唯一的信息;
- 当前的进程ID;
- 当前系统时间的毫秒数;
- 当前系统时间纳秒数;
- 32位整形随机数;
- 32位自增的序列数;
参看DefaultChannelId原代码:
private DefaultChannelId() {
this.data = new byte[MACHINE_ID.length + 4 + 4 + 8 + 4];
int i = 0;
System.arraycopy(MACHINE_ID, 0, this.data, i, MACHINE_ID.length);
int i = i + MACHINE_ID.length;
i = this.writeInt(i, PROCESS_ID);
i = this.writeInt(i, nextSequence.getAndIncrement());
i = this.writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
int random = PlatformDependent.threadLocalRandom().nextInt();
i = this.writeInt(i, random);
assert i == this.data.length;
this.hashCode = Arrays.hashCode(this.data);
}
三、Channel继承体系
这里分别查看服务端的NioServerSocketChannel和客户端的NioSocketChannel
(一)NIOServerSocketChannel继承体系:
(二)NioSocketChannel继承体系:
(三)AbstractChannel
1. 成员变量
可以看到,除了前面定义的一些异常之外,就是对Channel的一些方法对应的变量,所以这里不做过多讲解。
2. 成员方法
比较重要的是:
- write(Object msg):写出数据,不过并没有直接写出,而是放到了环形缓冲区,当调用了flush方法的时候,才会真正的写出
- writeAndFlush(Object msg):写出数据,同时flush
- connect:建立连接
(二)AbstractNioChannel
1. 成员变量
- protected final int readInterestOp:代表的是SelectionKey的OP_READ;
- volatile SelectionKey selectionKey:注册到EventLoop返回的选择键,由于Channel会面临并发写情况,需要修改selectionKey,需要让其他线程知道,所以使用volatile修饰;
- private ChannelPromise connectPromise:处理注册结果;
- ScheduledFuture<?> connectTimeoutFuture:连接超时定时器
2. 核心API源码
(1) Channel的注册
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
这里定义了一个boolean的变量selected代表是否注册成功,然后循环调用SelectableChannel的register方法进行注册,直到成功为止。
注册的时候需要指定监听的网络操作位来表示Channel对哪些网络事件感兴趣,网络事事件定义在SelectionKey里面,定义如下:
- public static final int OP_READ = 1 << 0:读操作位
- public static final int OP_WRITE = 1 << 2:写操作位
- public static final int OP_CONNECT = 1 << 3:客户端连接服务器操作位
- public static final int OP_ACCEPT = 1 << 4:服务端接受客户端操作连接为
我们看到,这里面传入的是0,表示不对任何事件感兴趣,仅仅是完成事件的注册。注册时候可以携带附件(这里携带的是当前Channel实例),后续Channel接收到之后,可以重新获取到之前的附件信息进行处理。
(三)AbstractNioByteChannel
1. 成员变量
就一个flushTask,用来负责继续写半包消息。
2. 核心API源码
(1) doWrite方法
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = this.config().getWriteSpinCount();
do {
Object msg = in.current(); //①
if (msg == null) { //②
this.clearOpWrite();
return;
}
writeSpinCount -= this.doWriteInternal(in, msg); //③
} while(writeSpinCount > 0); //④
this.incompleteWrite(writeSpinCount < 0);
}
这里会从循环消息队列中弹出一条消息 ①。判断是否为null。
如果为空,则证明数组中所有等待发送的消息全部发送完毕,就清空半包标志 ②,并且退出循环。清空半包标志代码如下:
protected final void clearOpWrite() {
SelectionKey key = this.selectionKey();
if (key.isValid()) {
int interestOps = key.interestOps(); // ⑤
if ((interestOps & 4) != 0) { //⑥
key.interestOps(interestOps & -5); //⑦
}
}
}
从当前的SelectionKey中获取网路操作位 ⑤,然后与Selection_OP_WRITE(4)做按位与 ⑥,如果结果不为0,证明SelectionKey是isWritable的,那么就需要进行清除写操作位,清除的方法很简单,就是对操作位和 ~Selection_OP_WRITE(-5)进行按位与运算 ⑦,说白了,就是把写操作位置成0。
如果不为空,进行写操作 ③。查看doWriteInternal方法可以发现,写出的是ByteBuf或者是FileRegion,这里不做过多赘述
(四)AbstractNioMessageChannel
1. 成员变量
- inputShutdown:用来表示输入是否已经关闭
2. 核心API解析
比较重要的也是doWrite方法,如下:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SelectionKey key = this.selectionKey();
int interestOps = key.interestOps();
while(true) {
Object msg = in.current();
if (msg == null) {
if ((interestOps & 4) != 0) {
key.interestOps(interestOps & -5);
}
break;
}
try {
boolean done = false;
for(int i = this.config().getWriteSpinCount() - 1; i >= 0; --i) {
if (this.doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (!done) {
if ((interestOps & 4) == 0) {
key.interestOps(interestOps | 4);
}
break;
}
in.remove();
} catch (Exception var7) {
if (!this.continueOnWriteError()) {
throw var7;
}
in.remove(var7);
}
}
}
其实核心逻辑和AbstractNioByteChannel类似,不同的地方在于写出的是POJO对象,而不是ByteBuf和FieldRegion,具体由子类对doWriteMessage的实现来决定。
(五)NioServerSocketChannel
1. 成员变量
- ChannelMetadata METADATA:元数据信息;
- ServerSocketChannelConfig config:用于配置TCP参数的配置对象;
2. 核心API解析
(1)打开ServerSocketChannel通道
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel(); // ①
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}
- 默认情况下①处使用的provider是KQueueSelectorProvider
public class KQueueSelectorProvider extends SelectorProviderImpl {
public KQueueSelectorProvider() {
}
public AbstractSelector openSelector() throws IOException {
return new KQueueSelectorImpl(this);
}
}
- 我们发现,并没有openServerSocketChannel方法,所以实际上是在父类SelectorProviderImpl中
public abstract class SelectorProviderImpl extends SelectorProvider {
......
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this); //这是NIO的ServerSocket实现
}
......
}
(2)读取数据
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(this.javaChannel()); //①
try {
if (ch != null) { //
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable var6) {
logger.warn("Failed to create a new channel from an accepted socket.", var6);
try {
ch.close();
} catch (Throwable var5) {
logger.warn("Failed to close a socket.", var5);
}
}
return 0;
}
- 接受客户端的SocketChannel ①;
- 如果不为空,则利用接收到的SocketChannel创建NioSocketChannel;
- 把创建好的NioSocketChannel添加到
List<Object>
中(在NIOServerSocketChannel中到这里就算完成了); - 返回1,表示消息读取成功;
- 也就是说,NioServerSocketChannel的读取操作就是接受客户端的连接,创建NioSocketChannel对象。
(六)NioSocketChannel
1. 成员变量
- SocketChannelConfig config : 客户端TCP配置实例
2. 核心API解析
四、Unsafe介绍
Unsafe实际上Channel接口的辅助接口,它不应该被用户代码直接调用,而不是说他不安全。实际上Channel接口的很多操作都是由它来实现的。