Netty中有一种特殊的Channel实现 —— EmbeddedChannel ,它是Netty专门为改进针对ChannelHandler 的单元测试而提供的。
可以将入站数据或者出站数据写入到 EmbeddedChannel 中,然后检查是否有任何东西到达了 ChannelPipeline 的尾端。以这种方式,你便可以确定消息是否已经被编码或者被解码过了,以及是否触发了任何的ChannelHandler 动作。
writeInbound(Object... msgs)
将入站消息写到 EmbeddedChannel 中。如果可以通过 readInbound() 方法从 EmbeddedChannel 中读取数据,则返回true
readInbound()
从EmbeddedChannel 中读取一个入站消息。任何返回的东西都穿越了整个ChannelPipeline。如果没有任何可供读取的,则返回null
writeOutbound(Object... msgs)
将出站消息写到EmbeddedChannel中。如果现在可以通过readOutbound()方法从EmbeddedChannel 中读取到什么东西,则返回true
readOutbound()
从EmbeddedChannel 中读取一个出站消息。任何返回的东西都穿越了整个ChannelPipeline。如果没有任何可供读取的,则返回null
finish()
将EmbeddedChannel 标记为完成,并且如果有可被读取的入站数据或者出站数据,则返回true。这个方法还将会调用EmbeddedChannel 上的close()方法。
入站数据由ChannelInboundHandler 处理,代表从远程节点读取的数据。出站数据由ChannelOutboundHandler 处理,代表将要写到远程节点的数据。
使用writeOutbound()
方法将消息写到 Channel 中,并通过 ChannelPipeline 沿着出站的方向传递。随后,你可以使用readOutbound()
方法来读取已被处理过的消息,以确定结果是否和预期一样。 类似地,对于入站数据,你需要使用writeInbound()
和readInbound()
方法。
在每种情况下,消息都将会传递过ChannelPipeline,并且被相关的ChannelInboundHandler 或者ChannelOutboundHandler 处理。
测试入站消息
展示了一个简单的 ByteToMessageDecoder 实现。给定足够的数据,这个实现将产生固定大小的帧。如果没有足够的数据可供读取,它将等待下一个数据块的到来,并将再次检查是否能够产生一个新的帧。
public class FixedLengthDecoder extends ByteToMessageDecoder {
private final int fixedLength;
public FixedLengthDecoder(int fixedLength) {
if(fixedLength <= 0)
throw new IllegalArgumentException("fixedLength must be a positive integer");
this.fixedLength = fixedLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= fixedLength){
ByteBuf buf = in.readBytes(fixedLength);
out.add(buf);
}
}
}
这个特定的解码器将产生固定为3 字节大小的帧。因此,它可能会需要多个事件来提供足够的字节数以产生一个帧。
public class FixedLengthDecoderTest {
@Test
public void testFixedLengthDecoder(){
//创建一个ByteBuf,并存储6个字节
ByteBuf byteBuf = Unpooled.buffer();
for (int i = 0; i < 6; i++) {
byteBuf.writeByte(i);
}
ByteBuf writeBuf = byteBuf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthDecoder(3));
assertFalse(channel.writeInbound(writeBuf.readBytes(1)));
assertFalse(channel.writeInbound(writeBuf.readBytes(1)));
assertTrue(channel.writeInbound(writeBuf.readBytes(1)));
assertTrue(channel.writeInbound(writeBuf.readBytes(3)));
assertTrue(channel.finish());
ByteBuf readBuf = channel.readInbound();
assertEquals(byteBuf.readSlice(3), readBuf);
readBuf.release();
readBuf = channel.readInbound();
assertEquals(byteBuf.readSlice(3), readBuf);
assertNull(channel.readInbound());
byteBuf.release();
}
}
测试出站消息
在测试出栈消息时,我们可以继承了 MessageToMessageEncoder 实现了DoubleIntegerEncoder类,主要就是将写入的Integer类型的数字,进行乘以2处理
public class DoubleIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
while (msg.readableBytes() >= 4) {
int value = msg.readInt() * 2;
out.add(value);
}
}
}
在测试时,我们先将出站消息写到EmbeddedChannel中,然后再读取出来进行比较
public class DoubleIntegerEncoderTest {
@Test
public void testDoubleIntegerEncoder() {
ByteBuf byteBuf = Unpooled.buffer();
for (int i = 0; i < 2; i++) {
byteBuf.writeInt(i);
}
EmbeddedChannel channel = new EmbeddedChannel(new DoubleIntegerEncoder());
Assert.assertTrue(channel.writeOutbound(byteBuf));
Assert.assertTrue(channel.finish());
for (int i = 0; i < 2; i++) {
Assert.assertEquals(i * 2, channel.readOutbound());
}
Assert.assertNull(channel.readOutbound());
}
}
测试异常处理
如果所读取的字节数超出了某个特定的限制,我们将会抛出一个TooLongFrameException。
public class LengthLimitDecoder extends ByteToMessageDecoder {
private final int maxLength;
public LengthLimitDecoder(int maxLength) {
this.maxLength = maxLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if(readableBytes > maxLength){
in.clear();
throw new TooLongFrameException();
}
ByteBuf byteBuf = in.readBytes(readableBytes);
out.add(byteBuf);
}
}
在测试代码中,我们先将一个符合该长度限制的入站消息写到 EmbeddedChannel 中,然后再写入一个超过长度限制的入站消息
public class LengthLimitDecoderTest {
@Test
public void testLengthLimitDecoder(){
ByteBuf byteBuf = Unpooled.buffer();
for (int i = 0; i < 5; i++) {
byteBuf.writeByte(i);
}
EmbeddedChannel channel = new EmbeddedChannel(new LengthLimitDecoder(2));
assertTrue(channel.writeInbound(byteBuf.readBytes(2)));
try {
channel.writeInbound(byteBuf.readBytes(3));
} catch (Exception e) {
e.printStackTrace();
}
}
}