概述
去Google的protocol buffers简单地绕了一圈终于回来了,大致了解了protobuf的用法,现在回来和Netty集成在一起看看到底有多香!
Netty集成protobuf
Netty的客户端和服务端传递数据的demo已经写了几个了,对Netty在使用上的套路已经有了很清楚的认识:注意各种场景下的Handler使用、重写ChannelInboundHandlerAdapter(或其子类SimpleChannelInboundHandler)中各个管道状态的回调方法(channelRegistered、channelUnregistered、channelRead0等)......这里就不再赘述客户端和服务端怎么编写的了。
服务端:
package com.leolee.netty.sixthExample;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
/**
 * @ClassName TestServer
 * @Description: Protobuf-based Netty server; binds port 8899 and blocks until the server channel is closed
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestServer {

    public static void main(String[] args) throws InterruptedException {
        // Two event-loop groups: the acceptor (boss) group only accepts incoming
        // connections and hands them over to the I/O (worker) group for processing.
        // A single group could do both jobs, but that is not recommended.
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            // handler(...) applies to the acceptor group, childHandler(...) to the I/O group.
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new TestServerInitializer());

            // Bind the listening port and wait for the bind to complete.
            ChannelFuture bindFuture = bootstrap.bind(new InetSocketAddress(8899)).sync();

            // Block until the server channel is closed.
            bindFuture.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown provided by Netty.
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
package com.leolee.netty.sixthExample;
import com.leolee.protobuf.DataInfo;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
 * @ClassName TestServerInitializer
 * @Description: Builds the server-side pipeline for DataInfo.Student protobuf messages
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Registers the protobuf codec handlers followed by the business handler,
     * in the same order for every accepted channel.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Decoding side: varint32 frame decoder, then protobuf decoder using Student as prototype.
                .addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder())
                .addLast("protobufDecoder", new ProtobufDecoder(DataInfo.Student.getDefaultInstance()))
                // Encoding side: varint32 length prepender and protobuf encoder.
                .addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender())
                .addLast("protobufEncoder", new ProtobufEncoder())
                // Business logic.
                .addLast(new TestServerHandler());
    }
}
package com.leolee.netty.sixthExample;
import com.leolee.protobuf.DataInfo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @ClassName TestServerHandler
 * @Description: Prints the fields of every DataInfo.Student message received by the server
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestServerHandler extends SimpleChannelInboundHandler<DataInfo.Student> {

    /**
     * Writes name, age and address of the received student to standard output, one per line.
     *
     * @param ctx the handler context (unused)
     * @param msg the decoded Student message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Student msg) throws Exception {
        String studentName = msg.getName();
        System.out.println(studentName);

        int studentAge = msg.getAge();
        System.out.println(studentAge);

        String studentAddress = msg.getAddress();
        System.out.println(studentAddress);
    }
}
主要注意的就是TestServerInitializer中如下代码
pipeline.addLast("protobufDecoder", new ProtobufDecoder(DataInfo.Student.getDefaultInstance()));
以及TestServerHandler继承类SimpleChannelInboundHandler的泛型类型的变化:
public class TestServerHandler extends SimpleChannelInboundHandler<DataInfo.Student> {
}
这里用到了我在《protobuf学习(3):编译.proto文件生成Java代码,以及序列化和反序列化message》中生成的Java代码:DataInfo类。
该类提供了对Student message的序列化、反序列化等一系列操作方法,帮助我们极其简单地构造Netty消息传递所需要的数据,同时Netty对protobuf也有很好的支持。
客户端:
package com.leolee.netty.sixthExample;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * @ClassName TestClient
 * @Description: Protobuf-based Netty client; connects to localhost:8899 and blocks until the channel is closed
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestClient {

    public static void main(String[] args) throws InterruptedException {
        // The client needs only one event-loop group.
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Configure the client bootstrap.
            Bootstrap clientBootstrap = new Bootstrap();
            clientBootstrap.group(workerGroup);
            clientBootstrap.channel(NioSocketChannel.class);
            clientBootstrap.handler(new TestClientInitializer());

            // Connect and wait for the connection to be established.
            ChannelFuture connectFuture = clientBootstrap.connect("localhost", 8899).sync();

            // Block until the channel is closed.
            connectFuture.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown.
            workerGroup.shutdownGracefully();
        }
    }
}
package com.leolee.netty.sixthExample;
import com.leolee.protobuf.DataInfo;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
 * @ClassName TestClientInitializer
 * @Description: Builds the client-side pipeline for DataInfo.Student protobuf messages
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Registers the protobuf codec handlers followed by the client business handler.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Decoding side: varint32 frame decoder, then protobuf decoder using Student as prototype.
                .addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder())
                .addLast("protobufDecoder", new ProtobufDecoder(DataInfo.Student.getDefaultInstance()))
                // Encoding side: varint32 length prepender and protobuf encoder.
                .addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender())
                .addLast("protobufEncoder", new ProtobufEncoder())
                // Business logic.
                .addLast("TestClientHandler", new TestClientHandler());
    }
}
package com.leolee.netty.sixthExample;
import com.leolee.protobuf.DataInfo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @ClassName TestClientHandler
* @Description: TODO
* @Author LeoLee
* @Date 2020/9/2
* @Version V1.0
**/
public class TestClientHandler extends SimpleChannelInboundHandler<DataInfo.Student> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Student msg) throws Exception {
}
/**
* 功能描述: <br> 连接建立变为活跃状态后,马上向服务端写入Student message
* 〈〉
* @Param: [ctx]
* @Return: void
* @Author: LeoLee
* @Date: 2020/9/2 21:11
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DataInfo.Student student = DataInfo.Student
.newBuilder()
.setName("LeoLee")
.setAge(25)
.setAddress("上海")
.build();
ctx.channel().writeAndFlush(student);
}
}
运行结果:
九月 02, 2020 10:14:20 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x034b9076] REGISTERED
九月 02, 2020 10:14:20 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x034b9076] BIND: 0.0.0.0/0.0.0.0:8899
九月 02, 2020 10:14:20 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x034b9076, L:/0:0:0:0:0:0:0:0:8899] ACTIVE
九月 02, 2020 10:14:30 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x034b9076, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x054993bd, L:/127.0.0.1:8899 - R:/127.0.0.1:49587]
九月 02, 2020 10:14:30 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x034b9076, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
LeoLee
25
上海
多message类型的传递
上面的例子很好的演示了Netty集成protobuf的解析和序列化,但是写死了DataInfo.Student.getDefaultInstance()和SimpleChannelInboundHandler<DataInfo.Student>,这样就存在一个问题,就是客户端只能传递DataInfo.Student类型的message,实际应用场景客户端和服务端数据沟通肯定是多种多样的,Netty又没有提供类似于SpringMVC那样的请求路由功能,实际上就是客户端和服务端建立了一个可保持的通讯通道,所有的数据都要从这个通道传递,那么针对于这种实际情况应该怎么处理呢?
在 protobuf 中有一种类型的字段叫做 oneof ,官网是这么解释的:
如果你想在同一时刻在一个多字段的message中只设置其中一个值,你可以使用 oneof 特性来解决,并且节省内存。
Oneof 字段除了共享内存这一特性之外就像其他optional修饰的字段一样,而且在同一时间只能设置一个 Oneof 字段,设置任何成员字段的同时 oneof 将清除其他字段,示例如下:
// Example of a oneof: name and sub_message belong to the same oneof (test_oneof),
// so only one of them can be set at a time; setting one clears the other.
message SampleMessage {
oneof test_oneof {
string name = 4;
SubMessage sub_message = 9;
}
}
所以我们来创建一下我们 .proto 文件。
syntax = "proto2";

package com.leolee.protobuf;

// optimize_for can be set to SPEED, CODE_SIZE or LITE_RUNTIME; it affects the
// C++ and Java code generators (and possibly third-party generators).
option optimize_for = SPEED;
option java_package = "com.leolee.protobuf";
option java_outer_classname = "DataInfo2";

// Command to generate the Java code:
//   protoc --java_out=src/main/java/ src/protobuf/Person.proto

// ---------------- Root message that wraps every concrete message type
message DataPackage {
    // Type of the wrapped payload. Declared inside DataPackage so that the
    // generated Java type is DataInfo2.DataPackage.PackageType, which is the
    // name the client handler refers to.
    enum PackageType {
        STUDENT = 0;
        DOG = 1;
    }

    optional PackageType package_type = 1;

    // Exactly one payload is carried per package.
    // The field name "sudent" is kept as-is: the handlers call the generated
    // setSudent()/getSudent() accessors.
    oneof Package {
        Student sudent = 2;
        Dog dog = 3;
    }
}

// ---------------- Concrete message types
message Student {
    optional string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

message Dog {
    optional string dog_name = 1;
    optional int32 dog_age = 2;
}
使用命令生成Java code:
protoc --java_out=src/main/java/ src/protobuf/Person.proto
关于 protobuf 生成代码的教程请看这里:protobuf学习(3):编译.proto文件生成Java代码,以及序列化和反序列化message
修改之前的客户端和服务端的代码
客户端:
package com.leolee.netty.sixthExample.multiProtocol;
import com.leolee.protobuf.DataInfo;
import com.leolee.protobuf.DataInfo2;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
 * @ClassName TestClientInitializer
 * @Description: Builds the client-side pipeline for DataInfo2.DataPackage protobuf messages
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Registers the protobuf codec handlers followed by the client business handler.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Decoding side: varint32 frame decoder, then protobuf decoder using DataPackage as prototype.
                .addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder())
                .addLast("protobufDecoder", new ProtobufDecoder(DataInfo2.DataPackage.getDefaultInstance()))
                // Encoding side: varint32 length prepender and protobuf encoder.
                .addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender())
                .addLast("protobufEncoder", new ProtobufEncoder())
                // Business logic.
                .addLast("TestClientHandler", new TestClientHandler());
    }
}
package com.leolee.netty.sixthExample.multiProtocol;
import com.leolee.protobuf.DataInfo2;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * @ClassName TestClientHandler
 * @Description: Once connected, sends a randomly chosen Student or Dog DataPackage to the server every second
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestClientHandler extends SimpleChannelInboundHandler<DataInfo2.DataPackage> {

    /** Runs the periodic send task; shut down when the channel becomes inactive. */
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /** Single generator reused across ticks instead of creating a new one every second. */
    private final Random random = new Random();

    /**
     * Inbound DataPackage messages are ignored by this client.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataInfo2.DataPackage msg) throws Exception {
    }

    /**
     * Once the connection becomes active, starts a task that writes a random
     * Student or Dog package to the server once per second.
     *
     * @param ctx the handler context whose channel receives the messages
     * @Author: LeoLee
     * @Date: 2020/9/2 21:11
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Runs immediately, then once per second.
        executor.scheduleAtFixedRate(() -> sendRandomPackage(ctx), 0, 1, TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic task when the connection is gone. Without this the
     * task would keep writing to a closed channel and the executor's thread
     * would keep the JVM alive after the client has shut down.
     *
     * @param ctx the handler context of the closed channel
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        executor.shutdownNow();
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("客户端出现异常已关闭");
        cause.printStackTrace();
        // Closing the channel triggers channelInactive, which stops the executor.
        ctx.close();
    }

    /** Picks STUDENT or DOG at random and writes the matching package to the channel. */
    private void sendRandomPackage(ChannelHandlerContext ctx) {
        if (!ctx.channel().isActive()) {
            // The channel may already be closing; do not write to it.
            return;
        }
        // Randomly yields 0 or 1, i.e. the numbers of the two package types.
        int packType = random.nextInt(2);
        switch (DataInfo2.DataPackage.PackageType.forNumber(packType)) {
            case STUDENT:
                System.out.println("发送student");
                ctx.channel().writeAndFlush(buildStudentPackage());
                break;
            case DOG:
                System.out.println("发送dog");
                ctx.channel().writeAndFlush(buildDogPackage());
                break;
        }
    }

    /** Builds a DataPackage of type STUDENT carrying the sample student. */
    private static DataInfo2.DataPackage buildStudentPackage() {
        DataInfo2.Student student = DataInfo2.Student.newBuilder()
                .setName("LeoLee")
                .setAge(25)
                .setAddress("上海")
                .build();
        return DataInfo2.DataPackage.newBuilder()
                .setPackageType(DataInfo2.DataPackage.PackageType.STUDENT)
                .setSudent(student)
                .build();
    }

    /** Builds a DataPackage of type DOG carrying the sample dog. */
    private static DataInfo2.DataPackage buildDogPackage() {
        DataInfo2.Dog dog = DataInfo2.Dog.newBuilder()
                .setDogName("恶霸犬")
                .setDogAge(3)
                .build();
        return DataInfo2.DataPackage.newBuilder()
                .setPackageType(DataInfo2.DataPackage.PackageType.DOG)
                .setDog(dog)
                .build();
    }
}
服务端:
package com.leolee.netty.sixthExample.multiProtocol;
import com.leolee.protobuf.DataInfo2;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
 * @ClassName TestServerInitializer
 * @Description: Builds the server-side pipeline for DataInfo2.DataPackage protobuf messages
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Registers the protobuf codec handlers followed by the server business handler.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Decoding side: varint32 frame decoder, then protobuf decoder using DataPackage as prototype.
                .addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder())
                .addLast("protobufDecoder", new ProtobufDecoder(DataInfo2.DataPackage.getDefaultInstance()))
                // Encoding side: varint32 length prepender and protobuf encoder.
                .addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender())
                .addLast("protobufEncoder", new ProtobufEncoder())
                // Business logic.
                .addLast(new TestServerHandler());
    }
}
package com.leolee.netty.sixthExample.multiProtocol;
import com.leolee.protobuf.DataInfo2;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @ClassName TestServerHandler
 * @Description: Prints the payload of every DataInfo2.DataPackage received, dispatching on its package type
 * @Author LeoLee
 * @Date 2020/9/2
 * @Version V1.0
 **/
public class TestServerHandler extends SimpleChannelInboundHandler<DataInfo2.DataPackage> {

    /**
     * Prints the package type number, then the fields of the Student or Dog
     * payload depending on that type.
     *
     * @param ctx the handler context (unused)
     * @param msg the decoded DataPackage
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataInfo2.DataPackage msg) throws Exception {
        System.out.println("msg.getPackageType().getNumber():" + msg.getPackageType().getNumber());

        // STUDENT has number 0 and DOG has number 1 in the .proto definition.
        switch (msg.getPackageType()) {
            case STUDENT: {
                DataInfo2.Student student = msg.getSudent();
                System.out.println(student.getName());
                System.out.println(student.getAge());
                System.out.println(student.getAddress());
                break;
            }
            case DOG: {
                DataInfo2.Dog dog = msg.getDog();
                System.out.println(dog.getDogName());
                System.out.println(dog.getDogAge());
                break;
            }
        }
    }

    /**
     * Reports the failure, prints the stack trace and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("服务端出现异常已关闭");
        cause.printStackTrace();
        ctx.close();
    }
}
执行结果就不贴出来了,自己感受一下。
需要代码的来这里拿嗷:demo项目地址
未完待续,有几个内置handler没解释?