这几天一直在看netty,感觉跟之前看过的tomcat原理多多少少有点类似,看了些书,觉得还是看源码比较实在,有感觉。
这个图是 netty实战上面扣来的我觉得很实在(它是经过抽象看起来两条链,实际上就一条双向链表,无非类型不符合则不处理),在ChannelPipeLine中,维护了具体处理相关通知的成员handler,组成的双向链表,链表成员为ChannelHandlerContext。左边是链表头,右边是链表尾。
根据请求的方向分为Outboundhandler跟Inboundhandler,fireIN_EVT()即从链表头向尾传递,仅仅类型是Inboundhandler才处理,数据的出站运动则从链表尾部向头部传递,仅被Outboundhandler处理。出站数据将会达到网络层,Socket传出。
每一个Channel都会有一个对应的ChannelPipLine。
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
下面是ChannelPipLine的构造函数
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; //TailHandler是DefaultChannelPipeline的一个默认实现ChannelInboundHandler的一个内部类 TailHandler tailHandler = new TailHandler(); tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); //HeadHandler是DefaultChannelPipeline的一个默认实现ChannelOutboundHandler的一个内部类 HeadHandler headHandler = new HeadHandler(channel.unsafe()); head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); head.next = tail; tail.prev = head; }
逻辑很简单,无非构造两个头尾handler跟ChannelHandlerContext,并将headContext跟tailContext构造成双向链表。之后的addLast,addFirst无非向这两之间的添加context,双向链表的头尾依旧是以上两个。
下面看addFirst()
@Override public ChannelPipeline addFirst(String name, ChannelHandler handler) { return addFirst(null, name, handler); } @Override public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { synchronized (this) { //检查是否已添加同名的handler checkDuplicateName(name); //根据pipline,group,name,handler构建出一个新的DefaultChannelHandlerContext DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); //添加这个newCtx到pipline:通过操作链表,将该handlerContext添加到head的后面 addFirst0(name, newCtx); } return this; }
一口气贴太多可能看不过来,以上逻辑无非先检查是否有同名headler,再根据headler,pipline封装数据得到绑定的Context(其中设置了inbound/outbound属性),最后调用addFirst0,将context加入双向链表中。
private void checkDuplicateName(String name) { if (name2ctx.containsKey(name)) { throw new IllegalArgumentException("Duplicate handler name: " + name); } }
name2ctx是一个以handler名字为key,ChannelHandlerContext为值的map;
private void addFirst0(String name, DefaultChannelHandlerContext newCtx) { /** * 检查newCtx中的handler是否被重复添加: * 对于一个没有加Shareble的hanlder类,如果每次都是new出来不同的对象,是可以重复添加到 * 同一个pipline的,但是如果是同一个对象实例,是不允许重复添加到同一个pipline的 */ checkMultiplicity(newCtx); DefaultChannelHandlerContext nextCtx = head.next; newCtx.prev = head; newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; name2ctx.put(name, newCtx); callHandlerAdded(newCtx); }
可以看到,先checkMultiplicity 检查ctx中的handler是否多次添加的合法性,将新的ctx插入到head节点后。顺便将新的存入name2ctx中。并调用handlerAdded触发的事件。
private static void checkMultiplicity(ChannelHandlerContext ctx) { ChannelHandler handler = ctx.handler(); if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; /** * 1.判断当前添加的ChannelHandlerContext的handler是否加了注解@Sharable * 2.判断handler是否已经被添加过 * 如果没有添加@Sharable注解,又被添加过,那么将会抛出异常 * 如果加了@Sharable注解,那么这个handler可以被多次添加 */ if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed mul
@Override public ChannelHandlerContext fireChannelActive() { //先找到当前ChannelHandlerContext的下一个ChannelHandlerContext final DefaultChannelHandlerContext next = findContextInbound(); //得到执行器 EventExecutor executor = next.executor(); /** * 1.首先executor.inEventLoop()方法判断当前线程是不是netty创建的 */ //调用上面得到的next的真正的invokeChannelActive if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } return this; }
基本上,到这里,整个链表的构造理清了~下面看看双向链表中的事件的传递。
当ServerSocket刚刚监听端口bind的时候,这里会触发一次fireChannelActive事件;那我们从这开始跟踪~
@Override public ChannelPipeline fireChannelActive() { //从pipline的head开始触发ChannelActive事件 head.fireChannelActive(); //判断isAutoRead的值,如果为true(默认值为true),则自动调用read if (channel.config().isAutoRead()) { channel.read(); } return this; }我们可以看到,pipline的fireChannelActive是调用head的fireChannel,从head开始,并顺着链表往后调用,继续看下面代码
@Override public ChannelHandlerContext fireChannelActive() { //先找到当前ChannelHandlerContext的下一个ChannelHandlerContext final DefaultChannelHandlerContext next = findContextInbound(); //得到执行器 EventExecutor executor = next.executor(); /** * 1.首先executor.inEventLoop()方法判断当前线程是不是netty创建的 */ //调用上面得到的next的真正的invokeChannelActive if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } return this; }
这里很有意思,从这里也可以看出netty线程模型的优点~慢慢来,先得到链表下一个handlercontext
private DefaultChannelHandlerContext findContextInbound() { /** * inbound时候,从head往tail遍历handle,并找出这些handle里面所有inbound类型的handle, * 一直遍历到TailHandle,TailHandle是netty默认实现的一个inbound类型的handle,这个TailHandle默认实现的 * ChannelInboundHandler接口都是空方法,所以当调用到tail对应的方法的时候,调用链就会终止 */ DefaultChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
先得到链表上下一个inbound属性为true的ctx并返回,通过这个其实就已经把双向链表给用户抽象成两条链表的感觉,如最上面的那张图。
再继续回到上一个函数,得到执行器,然后判断当前线程是否是该执行器绑定Eventloop的指定线程,如果是的话则直接调用事件next.invokeChannelActive();否则将该事件分装成任务加入该执行器的任务队列中。(这里涉及到netty的线程模型和任务调度,在后面将eventLoop时会仔细分析)
private void invokeChannelActive() { try { ((ChannelInboundHandler) handler).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } }
上面的this为下一个ctx,如果inbound为ture,那么它一定是ChannelInboundHandler的子类,netty这里采用了适配器模型,我们直接看到对应的adapter的方法
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); }
实现很简单,调用下一个inbound ctx的firchannelActive,保证了事件在链表中传递。