基于netty 4.0官方文档
http://netty.io/4.0/api/index.html
Netty-ChannelPipeline
实现的接口
java.lang.Iterable<java.util.Map.Entry<java.lang.String,ChannelHandler>>
1.简介
public interface ChannelPipeline extends java.lang.Iterable<java.util.Map.Entry<java.lang.String,ChannelHandler>>
ChannelPipeline
就像工厂里的流水线一样,可以在上面安装多个ChannelHanler
,输入输出的事件和操作就会从ChannelPipeline
的一侧流向另一侧,流经相应的ChannelHanler
时得到加工。
ChannelPipeline
实现了拦截器的一种高级形式,使得用户可以对事件的处理以及ChannelHanler
之间交互获得完全的控制权。
2.创建pipeline
每个Channel
都有一个属于它的pipeline
,在创建Channel
时会自动创建一个pipeline
。
3.事件是怎样在pipeline中流动的
流入的I/O事件会被ChannelInboundHandler
或者 ChannelOutboundHandler
处理,并在处理完成之后,依靠定义在ChannelHandlerContext
中的事件冒泡方法(e.g. ChannelHandlerContext.fireChannelRead(Object)
或者 ChannelHandlerContext.write(Object)
)来将事件传递给下一个最近的ChannelHandler
。
在上图中,输入事件由inbound handlers
从下而上的处理。inbound handler
通常负责处理上图底部的I/O线程生成的数据(这些数据通常来源于实际的输入操作,e.g. SokcetChannel.read(ByteBuffer)
)。如果一个输入事件流到了pipeline
的末端,那么它要么被直接丢弃,要么被记录到日志中。
而输出事件由outbound handlers
由上而下的处理。outbound handlers
通常负责生成或者传输写请求之类的操作。一个输出事件流到pipeline
的末端时就会得到Channel
对应的I/O线程的处理(e.g. SocketChannel.write(ByteBuffer)
).
例如,有如下的pipeline
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
如上的pipeline
对于输入事件的流过的顺序为1,2,3,4,5(Channelpipeline
实际上会跳过不相关的handler
,所以实际应该是1,2,5),对于输出事件的顺序即为5,4,3,2,1(实际为5,4,3)。
4.将事件传递给下一个handler
如前文提到的,事件在pipeline
传递需要调用ChannelHandlerContext
中定义的事件冒泡方法,这些方法包括:
// Inbound event propagation methods:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
// Outbound event propagation methods:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
一个简单的例子:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
5.构造一个pipeline
用户应该在pipline
中添加一个或多个handler
来处理各种I/O事件。例如,一个典型的服务端应该至少包括以下几种handler
:
- Protocol Decoder - 将二进制数据转化为Java对象的解码器
- Protocol Encoder - 将java对象转化为二进制数据的编码器
- Business Logic Handler - 负责具体服务逻辑的
handler
一个简单的例子:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
/**
* 告知pipeline将MyBusinessLogicHandler运行在另一个非I/O的线程以防止阻塞I/O线程,
* 如果MyBusinessLogicHandler中的逻辑是纯异步的或者需要的时间很短,那么就不必这样做
*/
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
6.线程安全
ChannelHandler
可以在任意时刻添加会删除,因为ChannelPipeline
是线程安全的。例如你可以在敏感数据即将被交换时添加一个加密的handler
,并在交换完成后移除掉。