前面我们在提到ChannelPipeline的地方只是简单的描述了一下,这里我们再进一步深入到ChannelPipeline的内部中。
在netty中每一个channel都会只有一个ChannelPipeline与之对应.
在看看AbstractChannel类的构造函数吧,channel的初始化都会到这个父类的构造函数中。
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
DefaultChannelPipeline的构造函数:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
保存与之关联的channel到DefaultChannelPipeline对象中。创建了tail和head字段,这个是双向链表的头和尾,在DefaultChannelPipeline维护了一个以AbstractChannelHandlerContext为节点的双向链表。TailContext继承了AbstractChannelHandlerContext实现了ChannelInboundHandler。
HeadContext继承了AbstractChannelHandlerContext实现了ChannelOutboundHandler和ChannelInboundHandler。
看看HeadContext的构造函数:
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); }
HeadContext调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=false,outbound= true。TailContext的构造器与HeadContext的相反。传入参数inbound=true,outbound=false。
HeadContext是一个outboundHandler,TailContext是一个inboundHandler。HeadContext和TailContext即是一个ChannelHandler,又是一个ChannelHandlerContext。
以上就是一个通道的DefaultChannelPipeline自身的组成参数。那么我们在使用管道的时候,都会向里面添加自定义的ChannelHandler。这样才会发挥出DefaultChannelPipeline的魅力。
在EchoClient中:
EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } p.addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect(HOST, PORT).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); }
在调用.handler方法时候传入了ChannelInitializer对象,它最终实现了ChannelHandler接口,那么在ChannelInitializer对象是怎么添加到DefaultChannelPipeline的?
handler方法会调用到AbstractBootstrap实例的handler方法:
public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return (B) this; }
这个方法做了一件事件就是把传入的handler实例赋值给AbstractBootstrap的handler变量。
还记得我们在第三节讲到的initAndRegister方法不。在此方法中调用了channel = channelFactory().newChannel()方法创建了一个新的channel后,就会调用init方法:
这个方法在Bootstrap类中会重写:
@Override @SuppressWarnings("unchecked") void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler()); final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
handler()获取我们在刚刚赋值的handler变量的值(ChannelInitializer),然后添加到管道的最后:调用的addLast方法。看看DefaultChannelPipeline的addLast方法:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
newContext(group, filterName(name, handler), handler);为这个handler创建一个与之关联的DefaultChannelHandlerContext实例。在DefaultChannelHandlerContext中有一个handler变量保存传入关联的handler实例(ChannelInitializer)。
看看DefaultChannelHandlerContext的构造函数:
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; }
看2个方法:
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
这两个方法就是判断了Handler是不是实现了ChannelOutboundHandler或者是ChannelInboundHandler接口。这个2个方法返回的true或者是false都会传入到父类中:AbstractChannelHandlerContext。
ChannelInitializer实现了ChannelInboundHandler接口,因此这里实例化的 DefaultChannelHandlerContext的inbound = true,outbound = false。这两个字段关系到在pipeline中的事件的流向与分类。
在创建好DefaultChannelHandlerContext后( 在pipeline的链表中只会加入ChannelHandlerContext对象),会将他插入到pipeline的双向链表中。
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
这个是典型的双向链表的插入操作。
还记得channel的注册过程不,在第三节的时候,我们讨论的channel的注册过程,在此过程中调用了AbstractUnsafe.register0方法中调用了pipeline.fireChannelRegistered() :
@Override public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; }
AbstractChannelHandlerContext中的invokeChannelRegistered:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }
next变量的实例就是HeadContext,invokeChannelRegistered调用的AbstractChannelHandlerContext的:
private void invokeChannelRegistered() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }
这里handler()调用的是DefaultChannelHandlerContext中的handler()方法。返回的就是ChannelInitializer对象。这就是和前面关联起来了。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); } else { ctx.fireChannelRegistered(); } }
initChannel方法:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; }
initChannel((C) ctx.channel());熟悉吧,这个就会调用我们在EchoClient类中覆盖的方法:
.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } p.addLast(new EchoClientHandler()); } });
再调用remove(ctx)后,pipeline里面就是head---->EchoClientHandler---->tail。