阅读须知
- Netty版本:4.1.14.Final
- 文章中使用/* */注释的方法会做深入分析
正文
我们在之前的文章中分析了Bootstrap的源码,ServerBootstrap和Bootstrap都继承了AbstractBootstrap,所以它们有相似的逻辑,Bootstrap用户客户端,而ServerBootstrap用于服务端,我们还是先来看一个简单的服务端启动的方法:
private void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
// ...
}
});
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
同样有为各个属性赋值的链式调用,这里的group方法传入了两个EventLoopGroup(它是一个线程组),也就是我们经常提到的boss线程组和worker线程组了,它们一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写,它们也是我们经常听到的Reactor模式的核心,我们会在后续的文章中详细介绍它们的作用。下面我们来看绑定Channel的流程:
AbstractBootstrap:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
AbstractBootstrap:
public ChannelFuture bind(SocketAddress localAddress) {
validate(); // 校验group和channelFactory不能为null
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
/* 绑定操作 */
return doBind(localAddress);
}
AbstractBootstrap:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 注册和初始化Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 到这里说明注册成功
ChannelPromise promise = channel.newPromise();
/* 绑定操作 */
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 注册future几乎总是已经完成,但以防万一
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// EventLoop上的注册失败会导致ChannelPromise直接失败,导致我们尝试访问Channel的EventLoop时不会导致IllegalStateException。
promise.setFailure(cause);
} else {
// 注册成功
promise.registered();
/* 绑定操作 */
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
方法中调用的注册和初始化Channel的initAndRegister方法我们在分析Bootstrap源码时已经分析过,方法中会调用init方法初始化Channel,我们来看ServerBootstrap对init方法的实现:
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
// 将用户配置的option选项设置到Channel的ChannelConfig对象中
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
// 将用户配置的attr属性设置到Channel的Attribute属性中
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
// 这里为Channel popeline添加ChannelHandler的是用户指定的,示例中传入的是ChannelInitializer对象
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加接收器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
下面我们来看Channel绑定操作:
AbstractBootstrap:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 绑定channel,添加监听器,在失败时关闭Channel
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
Channel的绑定我们会用单独的文章分析,到这里,ServerBootstrap引导启动的源码分析就完成了。