截断流水线处理
如果我们有时候业务条件不满足,需要阶段流水线处理,不让进入处理下一战,我们如何做呢
通过前面的分析,我们现在整理一下
- 不调用supper.chanelXxx(ChannelHandlerContext…)方法
- 不调用ctx.fireChannelXxx
handler业务处理器的热拔插
找到chanelPipeline接口,我们有如下方法
//在头部增加一个handler
ChannelPipeline addFirst(String name, ChannelHandler handler);
//在尾部增加一个hanler
ChannelPipeline addLast(String name, ChannelHandler handler);
//在某个handler之前增加一个handler
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
//在某个handler之后增加一个handler
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
//移除一个handler
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
//移除第一个
ChannelHandler removeFirst();
//移除最后一个
ChannelHandler removeLast();
public class PipelineTest {
static class SimpleHandlerA extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("处理器A被回调");
super.channelRead(ctx, msg);
}
}
static class SimpleHandlerB extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("处理器B被回调");
super.channelRead(ctx, msg);
ctx.pipeline().remove(this);
}
}
static class SimpleHandlerC extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("处理器C被回调");
super.channelRead(ctx, msg);
}
}
public static void main(String[] args) throws InterruptedException {
ChannelInitializer<EmbeddedChannel> channelInitializer = new ChannelInitializer<EmbeddedChannel>() {
@Override
protected void initChannel(EmbeddedChannel ch) throws Exception {
ch.pipeline().addLast(new SimpleHandlerA());
ch.pipeline().addLast(new SimpleHandlerB());
ch.pipeline().addLast(new SimpleHandlerC());
}
};
EmbeddedChannel embeddedChannel = new EmbeddedChannel(channelInitializer);
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt(1);
//第一次写数据到通道
embeddedChannel.writeInbound(byteBuf);
//第二次写数据到通道
embeddedChannel.writeInbound(byteBuf);
//第三次写数据到通道
embeddedChannel.writeInbound(byteBuf);
Thread.sleep(Integer.MAX_VALUE);
}
}
结果,第二次传输时,第二个handler已被删除
处理器A被回调
处理器B被回调
处理器C被回调
处理器A被回调
处理器C被回调
处理器A被回调
处理器C被回调
回顾下前面的代码,但注册成功后会把ChannelInitialize
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
// the handler.
if (initChannel(ctx)) {
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event.
ctx.pipeline().fireChannelRegistered();
// We are done with init the Channel, removing all the state for the Channel now.
removeState(ctx);
} else {
// Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered();
}
}
Bytebuf缓冲区
相比于Nio的bytebuffer,Bytebuf相对简单,它主要维护了3个 重要属性。
readerIndex、writeIndex、maxCapacity
bytebuf的引用计数
Netty中Bytebuf是通过引用计数方式管理的。一般来说,当创建完一个ByteBuf,引用计数为1,每次调用retain()方法,引用计数加一,每次调用release()方法,引用计数减一 。引用计数为0时缓冲区将不再使用 。
如果是非池化的缓冲区,会将缓冲区放回ByteBuf池子,如果是未池化的缓冲区,此时有两种情况,一是Heap堆结构缓冲,会被JVM垃圾回收器回收。如果是direct直接缓冲区,调用本地方法释放直接内存unsafe.freeMemory方法。
bytebuf的Allocator分配器
ByteBuf buffer = null;
//方法一:默认分配器,分配初始容量为9,最大容量100的缓冲
buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
//方法二:默认分配器,分配初始为256,最大容量Integer.MAX_VALUE 的缓冲
buffer = ByteBufAllocator.DEFAULT.buffer();
//方法三:非池化分配器,分配基于Java的堆内存缓冲区
buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
//方法四:池化分配器,分配基于操作系统的管理的直接内存缓冲区
buffer = PooledByteBufAllocator.DEFAULT.directBuffer();
2种Bytebuf的使用实践
堆内存bytebuf
@Test
public void testHeapBuffer() {
//取得堆内存
ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer();
heapBuf.writeBytes("你好Netty".getBytes(UTF_8));
if (heapBuf.hasArray()) {
//取得内部数组
byte[] array = heapBuf.array();
Logger.info(new String(array, UTF_8));
}
heapBuf.release();
}
直接内存
@Test
public void testDirectBuffer() {
//直接缓冲区
ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer();
directBuf.writeBytes("你好Netty".getBytes(UTF_8));
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
//读取数据到堆内存
directBuf.getBytes(directBuf.readerIndex(), array);
Logger.info(new String(array, UTF_8));
}
directBuf.release();
}
最终会调用
池化内存与菲池化内存,此处默认未池化内存
最终执行了PooledUnsafeDirectByteBuf的newInstance方法,代码如下:
通过RECYCLER的get方法循环使用ByteBuf对象,如果是非内存池实现,则直接创建一个新的ByteBuf对象
bytebuf自动释放
一、TailHandler自动释放
调用super.chanelRead(ctx,msg),调用父类方法,父类会默认调用后一个hanler,最终调用到tailHandler
二、手动释放
byteBuffer.release()
三、handler继承SimpleChannelInboundHandler,这是一个模板方法模式,此类帮我们释放了,我们只需要重写chanelRead0方法就行。
出栈释放时再headHanler中进行释放的
@Sharable注解
我们handler上有时会有把这个注解,这个注解是干什么用的呢。
这个注解表示一个handler实例可以安全的被多个通道安全的共享。防止重复实例引起的内存开销用一个实例解决这个问题。
Decoder 与 Encoder重要组件
字节码与对象的转换
Decoder
inbound入栈类型
ByteToMessageDecoder
解码流程
首先,他将上一站传过来的bytebuf中的数据进行解码,解码出一个list对象列表,然后迭代 列表,逐个将java pojo对象传递到下一个inbound handler中。
- 继承ByteToMessageDecoder
- 实现decode方法,将pojo解码逻辑写在这里
- 存入list中
public class Byte2IntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
while (in.readableBytes() >= 4) {
int i = in.readInt();
Logger.info("解码出一个整数: " + i);
out.add(i);
}
}
}
public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Integer integer = (Integer) msg;
Logger.info("打印出一个整数: " + integer);
}
}
public class Byte2IntegerDecoderTester {
/**
* 整数解码器的使用实例
*/
@Test
public void testByteToIntegerDecoder() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new Byte2IntegerDecoder());
ch.pipeline().addLast(new IntegerProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
ByteBuf buf = Unpooled.buffer();
buf.writeInt(j);
channel.writeInbound(buf);
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最后说明 ByteToMessageDecoder 传递到下一个handler是解码后的pojo对象,不是bytebuf缓冲区。
bytebuf是如何计数和释放内存的呢。
ByteToMessageDecoder解码时会自动调用ReferenceCountUtil.release()
ReplayingDecoder
上面解码器面临一个问题,就是需要堆bytebuf长度进行判断,如果有足够的字节才能读取。这完全可以用netty帮我们解决。
- 在读取bytebuf时需要检查缓冲区是否有足够的字节
- 如果有,正常读取,反之,停止解码
public class Byte2IntegerReplayDecoder extends ReplayingDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
int i = in.readInt();
Logger.info("解码出一个整数: " + i);
out.add(i);
}
}
public class Byte2IntegerReplayDecoderTester {
@Test
public void testByte2IntegerReplayDecoder() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new Byte2IntegerReplayDecoder());
ch.pipeline().addLast(new IntegerProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
ByteBuf buf = Unpooled.buffer();
buf.writeInt(j);
channel.writeInbound(buf);
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
开箱即用的netty内置decode
LineBasedFrameDecoder
向通道写入100个入栈数据包,每一个入栈数据包都以\n\r结束。会分割成一个个入栈bytebuf,然后发送给StringDecoder,它的作用就是把bytebuf转化成string,然后发送给业务处理。
有一个最大长度,超过最大长度仍然没有发现换行符,就会抛出异常。
static String spliter = "\r\n";
static String content = "好好学习天天向上";
@Test
public void testLineBasedFrameDecoder() {
try {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
ByteBuf buf = Unpooled.buffer();
for (int k = 0; k < 3; k++) {
buf.writeBytes(content.getBytes("UTF-8"));
}
buf.writeBytes(spliter.getBytes("UTF-8"));
channel.writeInbound(buf);
}
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public class StringProcessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String s = (String) msg;
System.out.println("打印: " + s);
}
}
DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder不仅可以使用换行符,还可以使用其他特殊字符作为分隔符,例如\t。
构造方法如下
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
this(maxFrameLength, stripDelimiter, true, delimiter);
}
maxFrameLength:解码数据包最大长度
stripDelimiter:解码后数据包是否去掉分隔符,一般选择是
delimiter:分隔符
static String spliter2 = "\t";
@Test
public void testDelimiterBasedFrameDecoder() {
try {
final ByteBuf delimiter = Unpooled.copiedBuffer(spliter2.getBytes("UTF-8"));
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(1024, true, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
//1-3之间的随机数
int random = RandomUtil.randInMod(3);
ByteBuf buf = Unpooled.buffer();
for (int k = 0; k < random; k++) {
buf.writeBytes(content.getBytes("UTF-8"));
}
buf.writeBytes(spliter2.getBytes("UTF-8"));
channel.writeInbound(buf);
}
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
LengthFieldBasedFrameDecoder
自定义长度数据包解码器,传输内容中有LengthField长度字段的值,是值放在数据包中传输内容的字节数。
static String content = "好好学习天天向上";
@Test
public void testLengthFieldBasedFrameDecoder() {
try {
final LengthFieldBasedFrameDecoder spliter =
new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(spliter);
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
//1-3之间的随机数
int random = RandomUtil.randInMod(3);
ByteBuf buf = Unpooled.buffer();
byte[] bytes = content.getBytes("UTF-8");
buf.writeInt(bytes.length * random);
for (int k = 0; k < random; k++) {
buf.writeBytes(bytes);
}
channel.writeInbound(buf);
}
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
head-content协议
1、maxFrameLength:发送数据包的最大长度,例子中为1024,表示一个数据包最多可发送1024个字节数据
2、lengthFieldOffset:长度字段偏移量,表示长度字段位于整个数据包内部字节数组的的下标值,demo为0,也就是长度字段放在第一位
3、lengthFieldLength:长度所占的字节数,如果是一个int整数,则为4,如果是一个byte则为2,demo为4,表示长度字段占用的数据包的4个字节=
4、lengthAdjustment:长度矫正值 = 内容字段便宜量-长度字段偏移量-长度字段字节数=4-0-4=0
5、initialBytesToStrip:丢弃的字节数,表示最终获取内容时抛弃前面4个字节的数据
head-content协议示例图
多字段Head-Content协议
在实际使用过程中,远没有那么简单,除了长度和内容,还包含其他字段,如版本号等等
1、maxFrameLength:发送数据包的最大长度,例子中为1024,表示一个数据包最多可发送1024个字节数据
2、lengthFieldOffset:长度字段偏移量,表示长度字段位于整个数据包内部字节数组的的下标值,demo为0,也就是长度字段放在第一位
3、lengthFieldLength:长度所占的字节数,如果是一个int整数,则为4,如果是一个byte则为2,demo为4,表示长度字段占用的数据包的4个字节
4、lengthAdjustment:长度矫正值 = 内容字段便宜量-长度字段偏移量-长度字段字节数 = 6-0-4=2
5、initialBytesToStrip:丢弃的字节数,表示最终获取内容时抛弃前面4个字节的数据。这个字段现在为6,就是说抛弃前面6个字节的数据。
public static final int VERSION = 100;
static String content = "好好学习天天向上";
@Test
public void testLengthFieldBasedFrameDecoder2() {
try {
final LengthFieldBasedFrameDecoder spliter =
new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6);
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(spliter);
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 1; j <= 100; j++) {
ByteBuf buf = Unpooled.buffer();
String s = j + "次发送->" + content;
byte[] bytes = s.getBytes("UTF-8");
buf.writeInt(bytes.length);
buf.writeChar(VERSION);
buf.writeBytes(bytes);
channel.writeInbound(buf);
}
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
Encoder
解码器
MessageToByteEncoder
非常重要的一个基类,作用是将一个java对象解析成一个bytebuf数据包。这个一个抽象类,我们需要继承这个类重写encode方法。
public class Integer2ByteEncoder extends MessageToByteEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
out.writeInt(msg);
}
}
泛型表示java pojo原类型,编码完成后,基类MessageToByteEncoder会将bytebuf传递到下一个outbound handler。
@Test
public void testIntegerToByteDecoder() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new Integer2ByteEncoder());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
channel.write(j);
}
channel.flush();
//取得通道的出站数据帧
ByteBuf buf = (ByteBuf) channel.readOutbound();
while (null != buf) {
System.out.println("o = " + buf.readInt());
buf = (ByteBuf) channel.readOutbound();
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
在demo中,先将处理器加入到嵌入式通道,然后调用write方法向通道写入100个数字,写完之后调用readOutbound方法从通道模拟出站数据包,并不断循环,想数据打印出来。
解码器与编码器的结合使用
public class Byte2IntegerCodec extends ByteToMessageCodec<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
out.writeInt(msg);
System.out.println("write Integer = " + msg);
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
int i = in.readInt();
System.out.println("Decoder i= " + i);
out.add(i);
}
}
}
组合器的使用
public class IntegerDuplexHandler extends CombinedChannelDuplexHandler<
Byte2IntegerDecoder,
Integer2ByteEncoder>{
public IntegerDuplexHandler() {
super(new Byte2IntegerDecoder(), new Integer2ByteEncoder());
}
}
保证原有逻辑,将编码解码放入一个组合器中。使用时比较简洁方便。
json和ProtoBuf序列化
当我们发送一个java对象时,一般都会涉及序列化,目前我i们的编码方式有
- json,常用,性能稍差
- xml,同json,性能差,可读性高
- java内置序列化与发序列化,无法跨语言
理论上来说,对于性能不太高的服务器程序,我们可以使用json,性能要求高的服务器程序,应该选择性能更高的二进制序列化框架,目前建议是ProtoBuf。
粘包和拆包
半包:收到客户端的bytebuf被拆开了,收到多个破碎的包。只接受到客户端的一部分
全包:收到一个完成符合条件的包
粘包:收到客户端的多个bytebuf粘在一起
json
建议序列化时使用gson,反序列化时使用fastjson
客户端流程
- 通过Gson将对象转成json
- 然后StringEncoder将字符串转为二进制数组
- 使用LengthFieldPrepender将二进制数组编码成head-content格式的二进制数据包,自动对json加4个字节头长度
服务端
- LengthFieldBasedFrameDecoder解码,去除掉前4位
- StringDecoder 字节数据转字符串
- 业务处理json字符串
public class JsonSendClient {
static String content = "好好学习天天向上!";
private int serverPort;
private String serverIp;
Bootstrap b = new Bootstrap();
public JsonSendClient(String ip, int port) {
this.serverPort = port;
this.serverIp = ip;
}
public void runClient() {
//创建reactor 线程组
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 设置reactor 线程组
b.group(workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioSocketChannel.class);
//3 设置监听端口
b.remoteAddress(serverIp, serverPort);
//4 设置通道的参数
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配通道流水线
b.handler(new ChannelInitializer<SocketChannel>() {
//初始化客户端channel
protected void initChannel(SocketChannel ch) throws Exception {
// 客户端channel流水线添加2个handler处理器
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
}
});
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess()) {
Logger.info("EchoClient客户端连接成功!");
} else {
Logger.info("EchoClient客户端连接失败!");
}
});
// 阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
//发送 Json 字符串对象
for (int i = 0; i < 1000; i++) {
JsonMsg user = build(i, i + "->" + content);
channel.writeAndFlush(user.convertToJson());
Logger.info("发送报文:" + user.convertToJson());
}
channel.flush();
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
}
}
//构建Json对象
public JsonMsg build(int id, String content) {
JsonMsg user = new JsonMsg();
user.setId(id);
user.setContent(content);
return user;
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
String ip = NettyDemoConfig.SOCKET_SERVER_IP;
new JsonSendClient(ip, port).runClient();
}
}
public class JsonServer {
private final int serverPort;
ServerBootstrap b = new ServerBootstrap();
public JsonServer(int port) {
this.serverPort = port;
}
public void runServer() {
//创建reactor 线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 设置reactor 线程组
b.group(bossLoopGroup, workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
b.localAddress(serverPort);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加3个handler处理器
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new JsonMsgDecoder());
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = b.bind().sync();
Logger.info(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
//服务器端业务处理器
static class JsonMsgDecoder extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String json = (String) msg;
JsonMsg jsonMsg = JsonMsg.parseFromJson(json);
Logger.info("收到一个 Json 数据包 =》" + jsonMsg);
}
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
new JsonServer(port).runServer();
}
}
ProtoBuf
ProtoBuf是一个高性能、易扩展的序列化框架。
如何生成POJO与Builder
- 下载ProtoBuf安装包,需要用到protoc.exe文件。protoc.exe --java_out=./src/main/java/ ./ Msg.proto
- maven插件
使用Builder构造者,构造POJO消息对象
public static MsgProtos.Msg buildMsg() {
MsgProtos.Msg.Builder personBuilder = MsgProtos.Msg.newBuilder();
personBuilder.setId(1000);
personBuilder.setContent("好好学习天天向上");
MsgProtos.Msg message = personBuilder.build();
return message;
}
序列化与反序列化一
public void serAndDesr1() throws IOException {
MsgProtos.Msg message = buildMsg();
//将Protobuf对象,序列化成二进制字节数组
byte[] data = message.toByteArray();
//可以用于网络传输,保存到内存或外存
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(data);
data = outputStream.toByteArray();
//二进制字节数组,反序列化成Protobuf 对象
MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(data);
}
序列化与反序列化二
public void serAndDesr2() throws IOException {
MsgProtos.Msg message = buildMsg();
//序列化到二进制流
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
message.writeTo(outputStream);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
//从二进流,反序列化成Protobuf 对象
MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(inputStream);
}
序列化与反序列化三
public void serAndDesr3() throws IOException {
MsgProtos.Msg message = buildMsg();
//序列化到二进制流
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
message.writeDelimitedTo(outputStream);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
//从二进流,反序列化成Protobuf 对象
MsgProtos.Msg inMsg = MsgProtos.Msg.parseDelimitedFrom(inputStream);
}
Netty实战演练
public class ProtoBufServer {
private final int serverPort;
ServerBootstrap b = new ServerBootstrap();
public ProtoBufServer(int port) {
this.serverPort = port;
}
public void runServer() {
//创建reactor 线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 设置reactor 线程组
b.group(bossLoopGroup, workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
b.localAddress(serverPort);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加3个handler处理器
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufBussinessDecoder());
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = b.bind().sync();
Logger.info(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
//服务器端业务处理器
static class ProtobufBussinessDecoder extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MsgProtos.Msg protoMsg = (MsgProtos.Msg) msg;
//经过pipeline的各个decoder,到此Person类型已经可以断定
Logger.info("收到一个 MsgProtos.Msg 数据包 =》");
Logger.info("protoMsg.getId():=" + protoMsg.getId());
Logger.info("protoMsg.getContent():=" + protoMsg.getContent());
}
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
new ProtoBufServer(port).runServer();
}
}
public class ProtoBufSendClient {
static String content = "好好学习天天向上";
private int serverPort;
private String serverIp;
Bootstrap b = new Bootstrap();
public ProtoBufSendClient(String ip, int port) {
this.serverPort = port;
this.serverIp = ip;
}
public void runClient() {
//创建reactor 线程组
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 设置reactor 线程组
b.group(workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioSocketChannel.class);
//3 设置监听端口
b.remoteAddress(serverIp, serverPort);
//4 设置通道的参数
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配通道流水线
b.handler(new ChannelInitializer<SocketChannel>() {
//初始化客户端channel
protected void initChannel(SocketChannel ch) throws Exception {
// 客户端channel流水线添加2个handler处理器
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
}
});
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess()) {
Logger.info("EchoClient客户端连接成功!");
} else {
Logger.info("EchoClient客户端连接失败!");
}
});
// 阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
//发送 Protobuf 对象
for (int i = 0; i < 1000; i++) {
MsgProtos.Msg user = build(i, i + "->" + content);
channel.writeAndFlush(user);
Logger.info("发送报文数:" + i);
}
channel.flush();
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
}
}
//构建ProtoBuf对象
public MsgProtos.Msg build(int id, String content) {
MsgProtos.Msg.Builder builder = MsgProtos.Msg.newBuilder();
builder.setId(id);
builder.setContent(content);
return builder.build();
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
String ip = NettyDemoConfig.SOCKET_SERVER_IP;
new ProtoBufSendClient(ip, port).runClient();
}
}