netty(二)–使用Marshalling编解码java对象
文章目录
一、简介
JBoss Marshalling 是 JBoss 提供的 Java 对象序列化框架,它修正了 JDK 原生序列化存在的一些问题,同时保持了对 java.io.Serializable 接口的兼容。本文介绍如何在 Netty 中使用 Marshalling 作为编解码工具来传输 Java 对象。
二、代码示例
2.1 添加maven依赖
<!-- Netty itself. The handlers in this article extend ChannelHandlerAdapter and override
     channelActive/channelRead on it, so keep this version in step with that code. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<!-- JBoss Marshalling; presumably supplies the "serial" marshaller factory that
     MarshallingFactory looks up by name (confirm against the artifact's contents). -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.6.Final</version>
</dependency>
<!-- Lombok, used for the @Data annotation on Msg; "provided" scope because it is only
     needed at compile time. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
<!-- fastjson, used only to print received Msg objects via JSON.toJSONString. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
2.2 定义消息对象
package com.dragon.study.netty;
import lombok.Data;
import java.io.Serializable;
/**
 * Message object exchanged between the client and the server.
 *
 * <p>Instances are serialized onto the wire, so the class implements {@link Serializable}.
 * Getters, setters, {@code equals}, {@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}.
 */
@Data
public class Msg implements Serializable {
    /**
     * Explicit stream version. Without it the value is derived from the compiled class shape,
     * so a client and a server built separately (different compiler, or a regenerated Lombok
     * method set) could reject each other's messages. Both peers must run a build that
     * declares the same value.
     */
    private static final long serialVersionUID = 1L;

    /** Identifier set by the sender (the examples use "one" for the client, "two" for the server). */
    private String id;

    /** Text payload of the message. */
    private String content;

    /**
     * Creates a message.
     *
     * @param id      identifier of the message/sender
     * @param content text payload
     */
    public Msg(String id, String content) {
        this.id = id;
        this.content = content;
    }
}
2.3 定义marshalling编解码工具
package com.dragon.study.netty;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
 * Factory for the Netty handlers that encode and decode Java objects with JBoss Marshalling.
 *
 * <p>The encoder is a single shared instance. The decoder is created per call because
 * {@link MarshallingDecoder} is a stateful frame decoder: it buffers partially received
 * frames for one connection and is not {@code @Sharable}, so adding the same instance to a
 * second channel's pipeline is rejected by Netty.
 */
public class MarshallingFactory {
    /** Name of the JBoss Marshalling implementation to look up. */
    private static final String PROTOCOL_NAME = "serial";

    /** Stream version handed to the marshalling configuration. */
    private static final int PROTOCOL_VERSION = 5;

    /** Largest serialized object, in bytes, that a decoder will accept. */
    private static final int MAX_OBJECT_SIZE = 1024;

    // Marshaller factory resolved by name; null when no matching provider is available.
    private static final MarshallerFactory mf = Marshalling.getProvidedMarshallerFactory(PROTOCOL_NAME);

    private static final MarshallingConfiguration conf = new MarshallingConfiguration();

    // Shared by every decoder instance; the provider itself keeps no per-connection state here.
    private static final UnmarshallerProvider unmarshallerProvider;

    // Encoder, shared across pipelines.
    private static final MarshallingEncoder encoder;

    static {
        // Fail fast with a readable message instead of a NullPointerException on first use.
        if (mf == null) {
            throw new IllegalStateException(
                    "No JBoss Marshalling provider named \"" + PROTOCOL_NAME + "\" was found");
        }
        conf.setVersion(PROTOCOL_VERSION);
        unmarshallerProvider = new DefaultUnmarshallerProvider(mf, conf);
        MarshallerProvider marshallerProvider = new DefaultMarshallerProvider(mf, conf);
        encoder = new MarshallingEncoder(marshallerProvider);
    }

    /**
     * Returns the encoder to put into a channel pipeline.
     *
     * @return the shared encoder instance
     */
    public static MarshallingEncoder getEncoder() {
        return encoder;
    }

    /**
     * Creates a decoder for one channel pipeline.
     *
     * @return a new decoder on every call; do not reuse the returned instance across channels
     */
    public static MarshallingDecoder getDecoder() {
        return new MarshallingDecoder(unmarshallerProvider, MAX_OBJECT_SIZE);
    }
}
2.4 定义服务端
package com.dragon.study.netty;
import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Scanner;
public class NettyMarshallingServerMain {
public static void main(String[] args) throws Exception {
int port = 7001;
//主线程组,接收网络请求
EventLoopGroup bossGroup = new NioEventLoopGroup();
//worker线程组,对接收到的请求进行读写处理
EventLoopGroup workerGroup = new NioEventLoopGroup();
//启动服务的启动类(辅助类)
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 添加主线程组和worker线程组
.channel(NioServerSocketChannel.class) //设置channel为服务端NioServerSocketChannel
.childHandler(new ChannelInitializer<NioSocketChannel>() { //绑定io事件处理类
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
//设置编解码
pipeline.addLast(MarshallingFactory.getEncoder());
pipeline.addLast(MarshallingFactory.getDecoder());
pipeline.addLast(new IODisposeHandler()); //添加io处理器
}
})
.option(ChannelOption.SO_BACKLOG, 128) //设置日志
.option(ChannelOption.SO_SNDBUF, 32 * 1024) //设置发送缓存
.option(ChannelOption.SO_RCVBUF, 32 * 1024) //接收缓存
.childOption(ChannelOption.SO_KEEPALIVE, true); //是否保持连接
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("服务启动,等待连接");
//关闭监听端口,同步等待
future.channel().closeFuture().sync();
//退出,释放线程资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
/**
* io事件处理
*/
static class IODisposeHandler extends ChannelHandlerAdapter {
WriteThread writeThread;
/**
* 建立连接
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("收到连接:" + ctx.channel());
//新起写数据线程
writeThread = new WriteThread(ctx);
writeThread.start();
}
/**
* 消息读取
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Msg m = (Msg) msg;
System.out.println("server receive msg:" + JSON.toJSONString(m));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("连接出错");
writeThread.runFlag = false;
ctx.close();
}
}
/**
* 写数据线程
*/
static class WriteThread extends Thread {
ChannelHandlerContext ctx;
//线程关闭标志位
volatile boolean runFlag = true;
public WriteThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
Scanner scanner = new Scanner(System.in);
while (runFlag) {
System.out.print("server send msg:");
String msg = scanner.nextLine();
Msg m = new Msg("two", msg);
//发送数据
ctx.channel().writeAndFlush(m);
}
} catch (Exception e) {
}
}
}
}
2.5 定义客户端
package com.dragon.study.netty;
import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Scanner;
public class NettyMarshallingClientMain {
public static void main(String[] args) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
//设置编解码
pipeline.addLast(MarshallingFactory.getEncoder());
pipeline.addLast(MarshallingFactory.getDecoder());
pipeline.addLast(new IODisposeHandler());//添加io处理器
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 7001).sync();
future.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
}
/**
* io事件处理
*/
static class IODisposeHandler extends ChannelHandlerAdapter {
WriteThread writeThread;
/**
* 建立连接
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("收到连接:" + ctx.channel());
//新起写数据线程
writeThread = new WriteThread(ctx);
writeThread.start();
}
/**
* 消息读取
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Msg m = (Msg) msg;
System.out.println("server receive msg:" + JSON.toJSONString(m));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("连接出错");
writeThread.runFlag = false;
ctx.close();
}
}
/**
* 写数据线程
*/
static class WriteThread extends Thread {
ChannelHandlerContext ctx;
//线程关闭标志位
volatile boolean runFlag = true;
public WriteThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
Scanner scanner = new Scanner(System.in);
while (runFlag) {
System.out.print("server send msg:");
String msg = scanner.nextLine();
Msg m = new Msg("one", msg);
//发送数据
ctx.channel().writeAndFlush(m);
}
} catch (Exception e) {
}
}
}
}