在真正底层IO操作的时候我们只能发送ByteBuffer,在netty中,转换为ChannelBuffer,因此如果我们想发送字符串/对象或者自定义格式的数据,就需要编写自己的编码/.解码器,进行转换,这部分被netty统一抽象为pipeline&handler
Netty中的pipeline可以理解为连接两台计算机之间的管道,管道中有多个部分,即handler,假设A/B两台主机,当A向B发送的时候,对A来说,表示数据往下流入管道,经过层层处理(encode)之后发送出去,对B来说,数据从管道网上层层处理(decode)之后,到达接收端,因此handler分两类,分别是UpStream和DownStream(数据统一以event表示)
(MessageEvent)
Handler的处理流程如下:
收到的消息走的是UpStream,表示数据是经过层层剥离以后上来的,发出去的消息走的是DownStream,表示数据是经过层层封装以后发出去的,比如下面的代码:
- pipeline.addLast(“framer”, new DelimiterBasedFrameDecoder(
- 8192, Delimiters.lineDelimiter()));
- pipeline.addLast(“decoder”, new StringDecoder());
- pipeline.addLast(“encoder”, new StringEncoder());
- // and then business logic.
- pipeline.addLast(“handler”, new TelnetClientHandler());
- pipeline.addLast(“encoder2″, new StringEncoder());
发消息的时候,会经过encode,顺序是encode2—–>encoder
收消息的时候,会经过decoder,顺序是framer、decoder、handler
(感觉都是同个顺序出去的话,是不是顺手点呢?)
内部结构
从上面的图可以看到,所有的handler被加入以后,会被封装成HandlerContext(包括PreHandler、NextHandler等),然后组成一个链状。UpStream消息从队头开始,DownStream从队尾开始
- DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
- DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
数据的拆分/组装和常用的几个Handler
因为TCP/IP是面向流的协议,数据会被拆分成小的包,并且在接收端重新组装,比如我们发送:
+—–+—–+—–+
| ABC | DEF | GHI |
+—–+—–+—–+
接收端可能受到如下:
+—-+——-+—+—+
| AB | CDEFG | H | I |
+—-+——-+—+—+
因此我们需要将接收到的数据进行整理成可识别的数据格式,在netty中这类属于FrameDecoder,另外还有一类OneToOneHandler,这类的decoder一般要和一个FrameDecoder一起使用。
可以看到都继承自FrameDecoder类,在这个类中有一个成员变量cumulation,因为底层的缓冲区都是每次读完以后就马上释放的,因此就需要一个Buffer来累积的数据:
- public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
- private ChannelBuffer cumulation;
- }
下面是处理累计数据的时候可能遇到的场景:
1.读到的数据不足以解析成一条逻辑数据,继续等待
2.读到的数据包含多条逻辑数据
3.读完一条完整数据之后可能还会有剩余的数据
FrameDecoder主要处理边界逻辑,至于什么时候能够解析出一条完整的逻辑数据则交给子类去实现,这里使用了ChannerBuffer中的CompositeChannelBuffer实现0拷贝
- //cumulation表示缓存的数据
- //1.如果之前没有缓存数据
- if (cumulation == null) {
- // 1.1直接尝试decode
- callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
- if (input.readable()) {
- //1.2 decode之后还有剩余,则将剩下的放到新的ChannelBuffer
- //如果在callDecode并且完整解析出消息之后,抛出异常,可能会使得input剩余的数据丢失 https://github.com/netty/netty/issues/364
- (this.cumulation = newCumulationBuffer(ctx,
- input.readableBytes())).writeBytes(input);
- }
- } else {
- //2.之前就有缓存数据
- assert cumulation.readable();
- boolean fit = false;
- //假设新到的数据有20字节可写
- int readable = input.readableBytes();
- //cumlation只剩余10可写(即还需要10字节的空间)
- int writable = cumulation.writableBytes();
- int w = writable - readable;
- //缓冲区可写的小于新写入的
- if (w < 0) {
- //看之前已经读到了哪里,也就是说之前有多少数据已经废弃
- int readerIndex = cumulation.readerIndex();
- //如果丢弃已经读过的部分,可以将新写入的数据写入,则丢弃
- if (w + readerIndex >= 0) {
- // the input will fit if we discard all read bytes, so do it
- cumulation.discardReadBytes();
- fit = true;
- }
- } else {
- // ok the input fit into the cumulation buffer
- fit = true;
- }
- //cumulation中是否有足够的空间,如果没有,则拼成一个CompositeChannelBuffer,如果有则写入
- ChannelBuffer buf;
- if (fit) {
- // the input fit in the cumulation buffer so copy it over
- buf = this.cumulation;
- buf.writeBytes(input);
- } else {
- // wrap the cumulation and input
- buf = ChannelBuffers.wrappedBuffer(cumulation, input);
- this.cumulation = buf;
- }
- callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress());
- //如果数据全部解析完毕,清空缓冲区,否则将剩下的部分取出
- //如果在callDecode并且完整解析出消息之后,抛出异常;cumulation没有清空,那么下次来的时候assert会失败;或者剩余数据丢失 https://github.com/netty/netty/issues/364
- if (!buf.readable()) {
- // nothing readable left so reset the state
- this.cumulation = null;
- } else {
- // create a new buffer and copy the readable buffer into it
- this.cumulation = newCumulationBuffer(ctx, buf.readableBytes());
- this.cumulation.writeBytes(buf);
- }
- }
- ==============================================================
- private void callDecode(
- ChannelHandlerContext context, Channel channel,
- ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
- while (cumulation.readable()) {
- int oldReaderIndex = cumulation.readerIndex();
- // 是否解析成功的逻辑由子类实现
- Object frame = decode(context, channel, cumulation);
- if (frame == null) {
- if (oldReaderIndex == cumulation.readerIndex()) {
- // Seems like more data is required.
- // Let us wait for the next notification.
- break;
- } else {
- // Previous data has been discarded.
- // Probably it is reading on.
- continue;
- }
- } else if (oldReaderIndex == cumulation.readerIndex()) {
- throw new IllegalStateException(
- “decode() method must read at least one byte ” +
- “if it returned a frame (caused by: ” + getClass() + “)”);
- }
- // 如果解析出来多条数据,判断是否分开发送还是一起发送
- unfoldAndFireMessageReceived(context, remoteAddress, frame);
- }
- }
下面是几个常用的FrameDecoder
1.FixLengthFrameDecoder,定长的解码器
- if (buffer.readableBytes() < frameLength) {
- return null;
- } else {
- return buffer.readBytes(frameLength);
- }
2.DelimiterBasedFrameDecoder,指定分隔符的decoder
基于分隔符(可以多个,以最前面的为准)的解码器,可以指定最大长度
假设指定\n 为分隔符,最大长度100,
a.在超过100长度以后还没有找到\n的,当前的数据以及之后的数据区全部丢弃,直到找到\n
b.没有超过最大长度,继续保留数据
3.LengthFieldBasedFrameDecoder
基于长度的解码器,比如”长度+内容“,主要是下面4个属性
lengthFieldOffset = 0 在某些协议可能会以固定的特殊字符开头,这表示这些字符的长度
lengthFieldLength = 2 表示长度的字节
lengthAdjustment = 0 某些协议长度可能包括头信息,这里可以出去头
initialBytesToStrip = 0 (= do not strip header) 跳过多少个头字节
其他一些decoder
1.OneToOneHandler,比如StringEncoder/StringDecoder,如果经常要发送字符串,那这个比较有用
2)ReplayingDecoder: 它是FrameDecoder的一个变种子类,它相对于FrameDecoder是非阻塞解码。也就是说,使用 FrameDecoder时需要考虑到读到的数据有可能是不完整的,而使用ReplayingDecoder就可以假定读到了全部的数据。
3)ObjectEncoder 和ObjectDecoder:编码解码序列化的Java对象。
4)HttpRequestEncoder和 HttpRequestDecoder:http协议处理。
(http://www.kafka0102.com/2010/06/167.html )
经过decode出完整的逻辑数据后,会进行真正的处理,或者在将数据encode成字节数据,就会进行发送。后面再看看真正进行IO read的时候一些高效的缓冲区操作,以及write的时候流控等。