Netty Inbound/Outbound通道处理器定义: http://donald-draper.iteye.com/blog/2387019
netty 简单Inbound通道处理器(SimpleChannelInboundHandler): http://donald-draper.iteye.com/blog/2387772
引言:
前一篇文章我们看了简单Inbound通道处理器(SimpleChannelInboundHandler),先来看回顾一下:
简单Inbound通道处理器SimpleChannelInboundHandler<I>,内部有连个变量一个为参数类型匹配器,用来判断通道是否可以处理消息,另一个变量autoRelease,用于控制是否在通道处理消息完毕时,释放消息。读取方法channelRead,首先判断跟定的消息类型是否可以被处理,如果是,则委托给channelRead0,channelRead0待子类实现;如果返回false,则将消息转递给Channel管道线的下一个通道处理器;最后,如果autoRelease为自动释放消息,且消息已处理则释放消息。
今天我们来看一下消息编码器MessageToByteEncoder:
package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.TypeParameterMatcher; /** * {@link ChannelOutboundHandlerAdapter} which encodes message in a stream-like fashion from one message to an * {@link ByteBuf}. *消息编码器MessageToByteEncoder,将高层消息对象编码为底层字节流。消息编码器实际为Outbound通道处理器。 * * Example implementation which encodes {@link Integer}s to a {@link ByteBuf}. *下面是一个编码整数到字节缓冲的例子 * <pre> * public class IntegerEncoder extends {@link MessageToByteEncoder}<{@link Integer}> { * {@code @Override} * public void encode({@link ChannelHandlerContext} ctx, {@link Integer} msg, {@link ByteBuf} out) * throws {@link Exception} { * out.writeInt(msg); * } * } * </pre> */ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { private final TypeParameterMatcher matcher;//类型参数匹配器 private final boolean preferDirect;//是否使用Direct类型buf /** * see {@link #MessageToByteEncoder(boolean)} with {@code true} as boolean parameter. */ protected MessageToByteEncoder() { this(true);//默认使用direct类型buffer,关于direct类型buffer我们在java nio相关篇有说 } /** * see {@link #MessageToByteEncoder(Class, boolean)} with {@code true} as boolean value. 根据消息类型构造消息编码器 */ protected MessageToByteEncoder(Class<? extends I> outboundMessageType) { this(outboundMessageType, true); } /** * Create a new instance which will try to detect the types to match out of the type parameter of the class. *创建一个将会尝试探测消息类型是否匹配类型参数匹配器编码实例 * @param preferDirect {@code true} if a direct {@link ByteBuf} should be tried to be used as target for * the encoded messages. If {@code false} is used it will allocate a heap * {@link ByteBuf}, which is backed by an byte array. 如果用direct类型buffer,存储消息编码后的字节流,则preferDirect为true。如果preferDirect为false将分配一个 堆类型buffer,用一个可backed的字节数组存储消息编码后的字节流。 */ protected MessageToByteEncoder(boolean preferDirect) { //这句话,看过前面的文章,应该很好理解,获取消息编码,类型参数名I对应的类型参数匹配器 matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I"); this.preferDirect = preferDirect; } /** * Create a new instance * * @param outboundMessageType The type of messages to match * @param preferDirect {@code true} if a direct {@link ByteBuf} should be tried to be used as target for * the encoded messages. If {@code false} is used it will allocate a heap * {@link ByteBuf}, which is backed by an byte array. */ protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) { //获取消息类型outboundMessageType对应的类型参数匹配器 matcher = TypeParameterMatcher.get(outboundMessageType); this.preferDirect = preferDirect; } /** * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. 如果给定的消息可以被处理,则返回true。如果返回false,则将消息传到给Channel管道线上的下一个Outbound处理器。 */ public boolean acceptOutboundMessage(Object msg) throws Exception { //通过类型参数匹配器,判断消息是否可以被处理 return matcher.match(msg); } //写消息对象 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {//如果消息可以被编码器处理 @SuppressWarnings("unchecked") I cast = (I) msg; //根据通道处理器上下文和preferDirect,分配一个字节buf buf = allocateBuffer(ctx, cast, preferDirect); try { //编码消息对象到字节buf encode(ctx, cast, buf); } finally { //释放消息对应引用参数 ReferenceCountUtil.release(cast); } //如果当前buffer可读,则通道处理器上下文写buffer,并将结果存储在promise中 if (buf.isReadable()) { ctx.write(buf, promise); } else { //否则释放buffer,写空buf buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { //释放buf buf.release(); } } } /** * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}. * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}. 分配一个字节buffer,用于#encode方法中ByteBuf参数,子类可以重写此方法,返回一个合适的初始化容量的Direct类型的buffer。 */ protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { //返回一个direct类型的buffer return ctx.alloc().ioBuffer(); } else { //返回一个heap类型buffer return ctx.alloc().heapBuffer(); } } /** * Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled * by this encoder. *编码消息到字节buf。当写消息可以被当前编码器处理时,调用此方法。 * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to 消息编码器所属的通道处理器上下文 * @param msg the message to encode 需要编码的消息 * @param out the {@link ByteBuf} into which the encoded message will be written 字节buffer * @throws Exception is thrown if an error occurs */ protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; //获取是否使用direct类型的buffer存储消息编码后的字节序列 protected boolean isPreferDirect() { return preferDirect; } }
我们来分析一下写消息对象
//写消息对象
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {//如果消息可以被编码器处理 @SuppressWarnings("unchecked") I cast = (I) msg; //根据通道处理器上下文和preferDirect,分配一个字节buf buf = allocateBuffer(ctx, cast, preferDirect); try { //编码消息对象到字节buf encode(ctx, cast, buf); } finally { //释放消息对应引用参数 ReferenceCountUtil.release(cast); } //如果当前buffer可读,则通道处理器上下文写buffer if (buf.isReadable()) { ctx.write(buf, promise); } else { //否则释放buffer,写空buf buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { //释放buf buf.release(); } } }
有一下代码片段我们需要注意:
//如果当前buffer可读,则通道处理器上下文写buffer,并将结果存储在promise中 if (buf.isReadable()) { ctx.write(buf, promise); } else { //否则释放buffer,写空buf buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); }
//ByteBuf
/** * Returns {@code true} * if and only if {@code (this.writerIndex - this.readerIndex)} is greater * than {@code 0}. 字节buf的写索引大于读索引,则可读 */ public abstract boolean isReadable();
字节buf我们后面有时间再说。
//Unpooled
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT; /** * A buffer whose capacity is {@code 0}. 空容量的字节buf,这个在后面跟字节buf一起将 */ public static final ByteBuf EMPTY_BUFFER = ALLOC.buffer(0, 0);
//ChannelHandlerContext
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
//ChannelOutboundInvoker
/** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. 通过Channel处理器上下文,请求写一个消息到Channle管道线。如果方法没有实际地刷新,如果你想请求 刷新所有等待发送的数据到实际transport,必须调用flush方法一次。 */ ChannelFuture write(Object msg, ChannelPromise promise);
再来简单看一字节buf分配函数:
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { //返回一个direct类型的buffer return ctx.alloc().ioBuffer(); } else { //返回一个heap类型buffer return ctx.alloc().heapBuffer(); } }
//ChannelHandlerContext
/** * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s. */ ByteBufAllocator alloc();
//ByteBufAllocator
public interface ByteBufAllocator { ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(); /** * Allocate a heap {@link ByteBuf}. */ ByteBuf heapBuffer(); ... }
分配字节buf,实际委托给通道处理器上下文的ByteBufAllocator。
总结:
消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,内部有一个类型参数处理器TypeParameterMatcher,用于判断消息是否可以被当前编码器处理,不能则传给Channel管道线上的下一个通道处理器;一个preferDirect参数,用于决定,当将消息编码为字节序列时,应该存储在direct类型还是heap类型的字节buffer中。消息编码器主要方法为write方法,write方法首先,判断消息是否可以被当前编码器处理,如果消息可以被编码器处理,根据通道处理器上下文和preferDirect,分配一个字节buf,委托encode方法,编码消息对象到字节buf,encode方法待子类实现;释放消息对应引用参数,如果当前buffer可读,则通道处理器上下文写buffer,否则释放buffer,写空buf,最后释放buf。消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,这个与Mina中的消息编码器是有区别的,Mina中的消息编码器要和解码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是由先后顺序的,通道Mina中编码器和通道Handler是两个概念。而Netty中编码器实际为Outbound通道处理器,主要是通过类型参数匹配器TypeParameterMatcher,来判断消息是否可以被编码器处理。