引言
上篇文章中主要对Netty
启动过程中,涉及的NioEventLoopGroup
相关细节进行了详细介绍,本篇文章主要介绍启动过程中其他一些初始化步骤。
Channel
的创建和初始化过程- 总结
一、Channel
的创建和初始化过程
Channel
是Netty
对于网络实现层的抽象,可以对应于JDK
中的NIO
包实现,Netty
服务端的Channel
类型是 NioServerSocketChannel
。下面来分析NioServerSocketChannel
的创建和初始化。
1、端口绑定
启动过程中重要的操作为绑定端口,而NioServerSocketChannel
的创建就是在此过程中进行的。ServerBootStrap
的bind()
方法
ChannelFuture f = b.bind(PORT).sync();
查看对应的源码,实际调用的是AbstractBootstrap
类中的bind
方法:
...
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
...
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
...
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//若注册成功,则进行bind操作
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
2、创建NioServerSocketChannel
initAndRegister()
中完成NioServerSocketChannel
初始化以及注册的操作,当初始化以及注册完成后,进行实际的绑定操作,我们继续往下看:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//channel工厂创建新的channel
channel = channelFactory.newChannel();
//初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//注册channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
创建channel
,实际调用的是ReflectiveChannelFactory
中的newChannel
方法。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
...
}
从上述代码中我们可以看出,clazz.getConstructor().newInstance()
通过反射的方式创建了Channel
。
Channel
创建好之后,进行Channel
的初始化操作,
@Override
void init(Channel channel) {
//设置channle选项
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
//设置channel属性
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
二、总结
本文主要讲述了NioServerSocketChannel
创建过程以及源码分析,后面文章继续说明channel创建之后的操作,如等待连接等等。