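This is an original article; please credit the source when reposting.
Netty 4 source code analysis: socket
The first step in starting a server is to create a listening socket, a ServerSocketChannel. This is triggered by the bind call in ChannelFuture f = b.bind(port). The process is analyzed in detail below.
The source of bind is shown below. It lives in AbstractBootstrap, the parent class of ServerBootstrap.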
// AbstractBootstrap
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
validate() checks that bossGroup, BootstrapChannelFactory and childHandler are non-null. If childGroup is null, bossGroup is reused: it is assigned to childGroup.
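The checks described above can be modelled in a few lines. This is a simplified sketch, not Netty's code: BootstrapModel and its Object-typed fields are hypothetical stand-ins for the real ServerBootstrap state, and the exception type and messages are assumptions.

```java
// Simplified model of the validation step; not Netty's actual classes.
final class BootstrapModel {
    Object group;          // stands in for bossGroup
    Object channelFactory; // stands in for BootstrapChannelFactory
    Object childHandler;
    Object childGroup;

    // Fails fast on missing mandatory settings, then applies the fallback.
    BootstrapModel validate() {
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) {
            throw new IllegalStateException("channelFactory not set");
        }
        if (childHandler == null) {
            throw new IllegalStateException("childHandler not set");
        }
        if (childGroup == null) {
            // No separate worker group configured: reuse the boss group.
            childGroup = group;
        }
        return this;
    }
}
```

With only group, channelFactory and childHandler set, validate() succeeds and childGroup ends up referring to the same object as group.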
Next, look at the logic of doBind:
// AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regPromise = initAndRegister();
    final Channel channel = regPromise.channel();
    final ChannelPromise promise = channel.newPromise();
    if (regPromise.isDone()) {
        doBind0(regPromise, channel, localAddress, promise);
    } else {
        regPromise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(future, channel, localAddress, promise);
            }
        });
    }
    return promise;
}
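doBind follows a common asynchronous pattern: if registration has already finished, bind right away; otherwise bind from a completion callback. The sketch below reproduces that shape with the JDK's CompletableFuture standing in for ChannelFuture. DeferredBindDemo, bindWhenRegistered and runBind are hypothetical names, and the failure handling in runBind is an assumption, since doBind0 is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class DeferredBindDemo {
    // Runs bindAction once registration is done: immediately if it has
    // already completed, otherwise later from a completion callback.
    static CompletableFuture<Void> bindWhenRegistered(
            CompletableFuture<Void> registration, Runnable bindAction) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        if (registration.isDone()) {
            runBind(registration, bindAction, promise);
        } else {
            registration.whenComplete((ignored, error) ->
                    runBind(registration, bindAction, promise));
        }
        return promise;
    }

    // Assumed behaviour: a failed registration fails the bind promise too.
    private static void runBind(CompletableFuture<Void> registration,
                                Runnable bindAction,
                                CompletableFuture<Void> promise) {
        if (registration.isCompletedExceptionally()) {
            promise.completeExceptionally(
                    new IllegalStateException("registration failed"));
            return;
        }
        try {
            bindAction.run();
            promise.complete(null);
        } catch (RuntimeException e) {
            promise.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<Void> bound =
                bindWhenRegistered(registration, () -> events.add("bind"));
        events.add("registering");
        registration.complete(null); // triggers the deferred bind
        System.out.println(events + " done=" + bound.isDone());
    }
}
```

Either way the caller gets the promise back immediately, which is why bind can return before the socket is actually bound.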
The key part to analyze is the initAndRegister() method:
// AbstractBootstrap
final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }

    ChannelPromise regPromise = channel.newPromise();
    group().register(channel, regPromise);
    if (regPromise.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regPromise;
}
a) First, consider the following code:
final Channel channel = channelFactory().newChannel()
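channelFactory() returns the BootstrapChannelFactory created earlier. Its newChannel() method uses reflection to create a NioServerSocketChannel, which in turn wraps the JDK ServerSocketChannel: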
// BootstrapChannelFactory
public T newChannel() {
    try {
        return clazz.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
Note: clazz is set in the server startup code, b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
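The factory idea is easy to try outside Netty. The following is a minimal sketch of a reflection-based factory, assuming a class with an accessible no-argument constructor. ReflectiveFactory is a hypothetical name, and StringBuilder is used only because it ships with the JDK. Class.newInstance() is deprecated since Java 9, so the sketch goes through getDeclaredConstructor() instead.

```java
final class ReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        this.clazz = clazz;
    }

    // Creates a fresh instance through the no-argument constructor.
    T newInstance() {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + clazz, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory =
                new ReflectiveFactory<>(StringBuilder.class);
        CharSequence created = factory.newInstance();
        System.out.println(created.getClass().getName());
    }
}
```

Storing only the Class object lets the bootstrap defer creation until bind is called, and lets each call produce a new channel instance.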
clazz.newInstance() invokes the default constructor of NioServerSocketChannel:
// NioServerSocketChannel
public NioServerSocketChannel() {
    super(null, newSocket(), SelectionKey.OP_ACCEPT);
    config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}

private static ServerSocketChannel newSocket() {
    try {
        return ServerSocketChannel.open();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}
Note this line in newSocket:
return ServerSocketChannel.open();
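This is where the server's listening socket, the ServerSocketChannel, is created.
Since NIO is being used, where is the newly created ServerSocketChannel switched to non-blocking mode? Look at this line: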
super(null, newSocket(), SelectionKey.OP_ACCEPT);
It initializes the parent class of NioServerSocketChannel. That parent is AbstractNioMessageChannel, whose constructor does nothing but initialize its own parent, AbstractNioChannel. The AbstractNioChannel constructor is:
// AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
The call ch.configureBlocking(false) is where the previously created ServerSocketChannel is put into non-blocking mode.
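The same two JDK calls can be exercised on their own. The sketch below uses plain java.nio only and is not Netty's code path: it opens a ServerSocketChannel, switches it to non-blocking mode, and registers it with a Selector for accept events. NonBlockingListenDemo and openNonBlockingAndRegister are hypothetical names, and the channel is deliberately left unbound.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class NonBlockingListenDemo {
    // Opens a listening channel, makes it non-blocking, registers it for
    // OP_ACCEPT and returns the interest ops stored in the selection key.
    static int openNonBlockingAndRegister() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Must happen before register(): a blocking channel is rejected
            // with IllegalBlockingModeException.
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // OP_ACCEPT is defined as 1 << 4.
        System.out.println(openNonBlockingAndRegister());
    }
}
```

A selectable channel must be in non-blocking mode before it can be registered with a Selector, so switching modes in the constructor guarantees the channel is always ready for registration.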
Three more points in this constructor are worth noting:
1. super(parent) calls the constructor of AbstractChannel, the parent class of AbstractNioChannel:
// AbstractChannel.java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}
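newUnsafe() is implemented by the subclass AbstractNioMessageChannel and instantiates the inner class NioMessageUnsafe. (Note: this class is important. It defines the read method that triggers the accept call, and it is analyzed in detail later.)
2. this.readInterestOp = readInterestOp records SelectionKey.OP_ACCEPT (value 16) as the channel's interest op.
3. pipeline = new DefaultChannelPipeline(this) creates the pipeline that serves the ServerSocketChannel: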
// DefaultChannelPipeline
public DefaultChannelPipeline(Channel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    TailHandler tailHandler = new TailHandler();
    tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);

    HeadHandler headHandler = new HeadHandler(channel.unsafe());
    head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);

    head.next = tail;
    tail.prev = head;
}
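DefaultChannelPipeline maintains a doubly linked list whose elements are DefaultChannelHandlerContext objects. head is an outbound handler and tail is an inbound handler. After this step, the handler chain in the pipeline is: head->tail.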
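The linked-list structure can be illustrated with a stripped-down model. This is a sketch only: MiniPipeline and Node are hypothetical names, nodes carry just a name instead of a handler, and addLast is assumed to insert immediately before the tail sentinel.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipeline {
    // One link in the chain; stands in for a handler context.
    private static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        // The two sentinels are linked to each other from the start.
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a new node just before the tail sentinel.
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walks the list from head to tail and collects the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline();
        System.out.println(p.names());
        System.out.println(p.addLast("initializer").names());
    }
}
```

Because head and tail are always present, insertion never has to handle an empty list.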
b) Next, consider the following code:
init(channel)
This method is implemented by the subclass ServerBootstrap:
// ServerBootstrap.java
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}
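This method does two main things:
1. It sets the options and attrs of the NioServerSocketChannel, and saves the child options and attrs to be applied later to each accepted SocketChannel.
2. It adds an inbound handler, a ChannelInitializer, to the pipeline of the NioServerSocketChannel. After this step, the handler chain in the pipeline is: head(outbound)->ChannelInitializer(inbound)->tail(inbound). Note the initChannel method implemented by the ChannelInitializer: it runs when the channelRegistered event fires and adds the ServerBootstrapAcceptor to the pipeline.
c) Finally, consider the following code: group().register(channel, regPromise);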
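Assuming, as the event name suggests, that registration is what fires channelRegistered, it helps to see what the ChannelInitializer from step b) does at that moment. The sketch below is a toy model, not Netty's implementation: InitializerDemo, Handler and the list-based pipeline are hypothetical. The self-removal step is not shown in the excerpt above and is included as an assumption about how a one-shot initializer behaves.

```java
import java.util.ArrayList;
import java.util.List;

final class InitializerDemo {
    // A handler that may react to the registration event.
    interface Handler {
        String name();

        default void channelRegistered(List<Handler> pipeline) {
            // Most handlers ignore the event in this model.
        }
    }

    static Handler named(String name) {
        return () -> name;
    }

    // One-shot handler: installs the real handler, then removes itself.
    static final class Initializer implements Handler {
        @Override
        public String name() {
            return "ChannelInitializer";
        }

        @Override
        public void channelRegistered(List<Handler> pipeline) {
            // "addLast": insert just before the tail entry.
            pipeline.add(pipeline.size() - 1, named("ServerBootstrapAcceptor"));
            pipeline.remove(this); // assumed self-removal
        }
    }

    static List<Handler> initialPipeline() {
        List<Handler> pipeline = new ArrayList<>();
        pipeline.add(named("head"));
        pipeline.add(new Initializer());
        pipeline.add(named("tail"));
        return pipeline;
    }

    // Fires channelRegistered on every handler and returns the final order.
    static List<String> registerAndListNames(List<Handler> pipeline) {
        // Iterate over a snapshot because handlers modify the pipeline.
        for (Handler h : new ArrayList<>(pipeline)) {
            h.channelRegistered(pipeline);
        }
        List<String> names = new ArrayList<>();
        for (Handler h : pipeline) {
            names.add(h.name());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(registerAndListNames(initialPipeline()));
    }
}
```

In this model the chain goes from head->ChannelInitializer->tail before registration to head->ServerBootstrapAcceptor->tail afterwards.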