阅读须知
- Netty版本:4.1.14.Final
- 文章中使用/* */注释的方法会做深入分析
正文
提起Channel,相信熟悉java网络编程的同学都不陌生,Channel是JDK NIO类库的重要组成部分,用于非阻塞的I/O操作,同样的,Netty也提供了自己的Channel实现,为什么Netty不使用JDK NIO原生的Channel而要自己实现新的Channel呢?我们引用《Netty权威指南》这本书中对这个问题的解答:
- JDK的SocketChannel和ServerSocketChannel没有统一的Channel接口供业务开发者使用,对于用户而言,没有统一的操作视图,使用起来并不方便。
- JDK的SocketChannel和ServerSocketChannel和主要职责就是网络I/O操作,由于他们是SPI接口,由具体的虚拟机厂家提供,所以通过集成SPI功能类来扩展其功能的难度很大,直接实现ServerSocketChannel和SocketChannel抽象类,其工作量和重新开发一个新的Channel功能类是差不多的。
- Netty的Channel需要能够跟Netty的整体架构融合在一起,例如I/O模型、基于ChannelPipeline的定制模型,以及基于元数据描述配置化的TCP参数等,这些JDK的SocketChannel和ServerSocketChannel都没有提供,需要重新封装。
- 自定义的Channel,功能实现更加灵活。
基于上述4个原因,Netty重新设计了Channel接口,并且给予了很多不同的实现。它的设计原理比较简单,但是功能却比较繁杂,主要的设计理念如下:
- 在Channel接口层,采用Facade模式进行统一封装,将网络I/O操作、网络I/O相关联的其他操作封装起来,统一对外提供。
- Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现功能和接口的重用。
- 具体实现采用聚和而非包含的方式,将相关的功能类聚和在Channel中,由Channel统一负责分配和调度,功能实现更加灵活。
Channel是与网络socket或能够进行I / O操作(如读取,写入,连接和绑定)的组件的纽带。
Channel的功能比较繁杂,我们在ServerBootstrap源码分析文章的开篇部分写了一个小的启动demo,demo中使用了NioServerSocketChannel,我们就以NioServerSocketChannel作为入口进行分析,首先我们来看NioServerSocketChannel的层次结构图:
构造函数:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
这里的newSocket方法会返回JDK的ServerSocketChannel实例,实现为ServerSocketChannelImpl。
NioServerSocketChannel:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里的javaChannel().socket()会调用JDK的ServerSocketChannelImpl的socket方法返回JDK的ServerSocket实例。
AbstractNioMessageChannel:
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
AbstractNioChannel:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
AbstractChannel:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe(); // NioMessageUnsafe
pipeline = newChannelPipeline(); // DefaultChannelPipeline
}
我们在分析ServerBootstrap源码时看到了启动过程中对channel.bind方法的调用,我们来分析一下bind方法的实现:
AbstractChannel:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
这里的pipeline也就是我们刚刚看到的在构造函数中初始化的DefaultChannelPipeline。
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
这里的tail对象是一个ChannelHandlerContext接口的实例,它的作用是作为ChannelHandler和ChannelPipeline交互的上下文。
AbstractChannelHandlerContext:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
return promise; // 如果promise不合法,取消
}
// 这里会取出第一个outbound为true的ChannelHandlerContext,初始化pipeline时初始化了两个ChannelHandlerContext
// tail和head,tail的outbound属性赋值为false,head的outbound属性赋值为true,所以这里首次取出的是head
final AbstractChannelHandlerContext next = findContextOutbound();
// 获取线程组,这里获取到的是boss线程组
EventExecutor executor = next.executor();
// 判断当前线程是否正在事件循环中执行
if (executor.inEventLoop()) {
/* 绑定调用 */
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
/* 绑定调用 */
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
AbstractChannelHandlerContext:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
// 尽可能判断是否调用了ChannelHandler的handlerAdded方法
// handlerAdded方法会在将ChannelHandler添加到上下文并准备好处理事件之后调用
if (invokeHandler()) {
try {
/* 绑定 */
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
// 失败通知监听器
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise); // 递归
}
}
DefaultChannelPipeline.HeadContext:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
这里的unsafe来自于channel的unsafe。示例中我们使用的NioServerSocketChannel的unsafe为NioMessageUnsafe实例。
AbstractChannel.AbstractUnsafe:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 断言Channel未注册并且当前线程正在事件循环中执行
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// 判断Channel的活跃状态,示例中使用的NioServerSocketChannel调用的是JDK的ServerSocket的isBound方法判断绑定状态
boolean wasActive = isActive();
try {
/* 绑定操作 */
doBind(localAddress);
} catch (Throwable t) {
// 异常将指定的promise标记为失败
safeSetFailure(promise, t);
/* 如果Channel未开启,则关闭Channel */
closeIfClosed();
return;
}
// 判断Channel是否是在绑定后变为活跃状态
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// Channel激活,调用ChannelInboundHandler的channelActive方法
pipeline.fireChannelActive();
}
});
}
// 将指定的promise标记为成功
safeSetSuccess(promise);
}
这里关于promise的操作我们会在Promise源码分析的文章中进行详细分析,示例中我们使用的是NioServerSocketChannel,我们来看NioServerSocketChannel对doBind方法的实现:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
这里就是调用JDK原生的函数来实现绑定操作,java7以上的版本调用我们上文提到的ServerSocketChannelImpl的bind方法实现,java7以下的版本调用ServerSocket的bind方法实现。
AbstractChannel.AbstractUnsafe:
protected final void closeIfClosed() {
if (isOpen()) {
return;
}
close(voidPromise());
}
AbstractChannel.AbstractUnsafe:
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
AbstractChannel.AbstractUnsafe:
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// 设置promise不能被取消
if (!promise.setUncancellable()) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
if (!(promise instanceof VoidChannelPromise)) {
// 这意味着之前调用了close(),所以我们只注册一个监听器并返回
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
if (closeFuture.isDone()) {
// 已经关闭,将指定的promise标记为成功
safeSetSuccess(promise);
return;
}
final boolean wasActive = isActive();
// 不允许向outboundBuffer添加任何消息和刷新操作
this.outboundBuffer = null;
// 准备关闭Channel。如果此方法返回Executor,则调用者必须调用Executor的execute方法
// 并在返回的Executor上调用doClose()。如果此方法返回null,则必须从调用方线程调用doClose()
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
doClose0(promise); /* 执行close */
} finally {
invokeLater(new Runnable() {
@Override
public void run() {
// 将所有排队的消息置失败
outboundBuffer.failFlushed(cause, notify);
// 关闭buffer,释放缓冲区资源
outboundBuffer.close(closeCause);
/* Channel停用并注销 */
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
doClose0(promise);
} finally {
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
AbstractChannel.AbstractUnsafe:
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
NioServerSocketChannel:
protected void doClose() throws Exception {
javaChannel().close(); // JDK原生close方法
}
AbstractChannel.AbstractUnsafe:
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
AbstractChannel.AbstractUnsafe:
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
invokeLater(new Runnable() {
@Override
public void run() {
try {
/* 将Channel从它的EventLoop中注销,默认空实现,子类覆盖 */
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
// Channel停用,调用ChannelInboundHandler的channelInactive方法
pipeline.fireChannelInactive();
}
if (registered) {
registered = false;
// Channel注销,调用ChannelInboundHandler的channelUnregistered方法
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
}
AbstractNioChannel:
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey()); // 取消SelectionKey
}
这里的SelectionKey同样来自于JDK的NIO。到这里,bind流程的源码分析就完成了。