Netty中的ChannelPipeline和ChannelHandler机制类似于Servlet和Filter过滤器,这类拦截器实际上是责任链模式的一种变种,主要是为了方便事件的拦截和用户自定义逻辑的定义。
一、ChannelPipeline
(一)简单介绍
ChannelPipeline是ChannelHandler的容器,它负责ChannelHandler的管理和事件拦截与调度;
(二)事件处理
上图展示了一个消息被ChannelPipeline的ChannelHandler链拦截和处理的全过程 这个图是在ChannelPipeline里面截取出来的。
- 底层的SocketChannel.read()方法读取ByteBuf,触发ChannelRead方法,由I/O线程NioEventLoop调用ChannelPipeline的fireChannelRead(Object msg)方法,将消息传送到ChannelPipeline中;
- 消息依次被ChannelHandler1、ChannelHandler2、…等等进行拦截和处理,在这个过程中,任何ChannelHandler都可以中断流程来结束消息的传递;
- 写出消息,依次被这些ChannelHandler进行处理;
- 我们发现,读取信息的时候,触发的是InboundHandler,写数据的时候,触发的是OutboundHanler。
(三)I/O事件分类
Netty中的事件分为两种:inbound事件和outbound事件。
inbound事件:通常由i/o线程触发,这些方法被封装在ChannelInboundHandler里面;
outbound事件:通常由用户代码触发,这些方法被封装在ChannelOutboundHandler里面;
(四)继承体系
1. DefaultChannelPipeline解析
(1)构造器
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
可以看到,这里面形成了一个指针,每个元素的类型是AbstractChannelHandlerContext 查看tail和head的定义,
(2)addLast方法
这里我们查看ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法,因为所有的addLast最终都会调用这个方法。
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) { //①
checkMultiplicity(handler); //②
newCtx = newContext(group, filterName(name, handler), handler); //③
addLast0(newCtx); //④
....
- 注意到,这里使用了synchronized来保证了多线程的安全性 ①
- 对handler进行了去重检查 ②,其实就是检查是否存在没有多个没有添加@Sharable的同类handler
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
- 创建出一个新的AbstractChannelHandlerContext ③
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
查看DefaultChannelHandlerContext的定义:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
.....
可以看到,里面存放了一个ChannelHandler,构造的时候,也传入了一个handler,也就是说,他实际上是对ChannelHandler进行了包装
- 把创建的AbstractChannelHandlerContext添加到链表尾部 ④
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
- 综合上述代码分析,我们可以理解为,在ChannelPipeline里面存放了一个ChannelHandler的链表(以AbstractChannelHandlerContext的形式进行了包装)
二、ChannelHandler
(一)功能介绍
ChannelHandler的功能类似于Servlet的Filter过滤器,我们可以根据自己的业务逻辑编写自己的ChannelHandler。
(二)支持的注解
- @Sharable:多个ChannelPipeline共享同一个ChannelHandler
(三)系统预置ChannelHandler
Netty为我们准备了很多好用的ChannelHandler供我们使用,其中大多数是编解码的处理器。
具体可以参看
核心API
(四)用户自定义ChannelHandler
实际上,如果我们需要自己定义ChannelHandler,那么我们需要根据自己的需要来实现ChannelInboundHandler接口或者是ChannelOutboundHandler,具体实现哪个接口,需要跟我们关心的事件进行区分,这个可以参看ChannelPipeline中的事件分类,当然,一般情况下,我们都会去实现ChannelInboundHandler。
ChannelInboundHandler里面的方法比较多,编写起来比较麻烦,所以Netty给我们提供了ChannelInboundHandlerAdapter,对所有的方法都有默认的实现,我们只需要继承他然后重写我们关心的方法即可。