EchoServer:
package messagepack;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
/**
 * MessagePack echo server.
 *
 * <p>Accepts TCP connections, decodes length-prefixed MessagePack frames and
 * hands every decoded message to {@link EchoServerHandler}, which writes it
 * back to the client.
 */
public class EchoServer {

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     *
     * <p>Both event loop groups are always shut down before this method returns.
     * If the calling thread is interrupted while waiting, the interrupt flag is
     * restored so that callers can still observe the interruption.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port){
        // One group accepts incoming connections, the other performs the
        // socket reads and writes of the accepted connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Helper class used to configure and start the NIO server.
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    // NOTE(review): a connect timeout is normally a client-side
                    // setting; kept as in the original, confirm it is needed here.
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .childHandler(new ChildChannelHandler());
            // Bind the listening port and wait for the bind to complete.
            ChannelFuture f = b.bind(port).sync();
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Graceful shutdown of both thread groups.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Builds the pipeline of every accepted connection.
     *
     * <p>Declared {@code static} because it does not use the enclosing
     * {@code EchoServer} instance.
     */
    private static class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
        @Override
        protected void initChannel(SocketChannel serverSocket) throws Exception {
            // Inbound: split the byte stream into frames using a 2-byte length
            // prefix (max frame 65535 bytes) and strip that prefix.
            serverSocket.pipeline().addLast("frameDecode",
                    new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
            serverSocket.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
            // Outbound: prepend a 2-byte length field to every encoded message.
            serverSocket.pipeline().addLast("frameEncode", new LengthFieldPrepender(2));
            serverSocket.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
            serverSocket.pipeline().addLast(new EchoServerHandler());
        }
    }

    /**
     * Starts the echo server on port 8080.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        int port = 8080;
        new EchoServer().bind(port);
    }
}
EchoServerHandler:
package messagepack;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
/**
 * Server-side handler that echoes every decoded message back to the client.
 */
public class EchoServerHandler extends ChannelHandlerAdapter {

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the cause before closing; closing silently hides decode errors.
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Prints the decoded message and writes it back to the client.
     *
     * <p>The message is echoed as received. It is intentionally not cast to
     * {@code List<UserInfo>}: the decoder in this file forwards whatever
     * {@code MessagePack.read(byte[])} returns, so such a cast is unchecked
     * and would fail for payloads that are not list-like.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg message produced by the preceding decoder
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server receive message from client =" + msg);
        ctx.writeAndFlush(msg);
    }
}
解码器MsgpackDecoder:
package messagepack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.util.List;
/**
 * Decodes one complete frame (a {@link ByteBuf}) into a MessagePack object.
 *
 * <p>The {@link MessagePack} instance is created once per handler instead of
 * once per frame, so the {@code UserInfo} registration is not repeated for
 * every message. The pipelines in this file create a new decoder for each
 * channel, so the instance is not shared between channels.
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /** Reused for every frame handled by this decoder instance. */
    private final MessagePack messagePack = createMessagePack();

    /** Creates the MessagePack instance with {@code UserInfo} registered. */
    private static MessagePack createMessagePack() {
        MessagePack mp = new MessagePack();
        mp.register(UserInfo.class);
        return mp;
    }

    /**
     * Copies the readable bytes of {@code msg} and deserializes them.
     *
     * @param ctx context of the channel the frame arrived on
     * @param msg one complete frame, as produced by the preceding frame decoder
     * @param out receives the deserialized object
     * @throws Exception if the bytes cannot be deserialized
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // Copy the payload without moving the buffer's reader index.
        final int length = msg.readableBytes();
        final byte[] array = new byte[length];
        msg.getBytes(msg.readerIndex(), array, 0, length);
        // Deserialize the bytes and pass the result to the next handler.
        out.add(messagePack.read(array));
    }
}
编码器MsgpackEncoder:
package messagepack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
/**
 * Serializes outbound objects to MessagePack bytes.
 *
 * <p>The {@link MessagePack} instance is created once per handler instead of
 * once per message. The pipelines in this file create a new encoder for each
 * channel, so the instance is not shared between channels.
 */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {

    /** Reused for every message handled by this encoder instance. */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Writes the MessagePack representation of {@code msg} into {@code out}.
     *
     * @param ctx context of the channel being written to
     * @param msg object to serialize
     * @param out buffer that receives the serialized bytes
     * @throws Exception if the object cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        byte[] raw = messagePack.write(msg);
        out.writeBytes(raw);
    }
}
传输对象UserInfo:
package messagepack;
import org.msgpack.annotation.Message;
@Message
public class UserInfo{
    // Field order and names are kept exactly as declared originally.
    private String name;
    private int age;

    /**
     * @return the user's name, or {@code null} if none has been set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the user's name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the user's age
     */
    public int getAge() {
        return this.age;
    }

    /**
     * @param age the user's age
     */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders the object as {@code UserInfo{name='<name>', age=<age>}}.
     *
     * @return text form containing both fields
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserInfo{");
        text.append("name='").append(name).append('\'');
        text.append(", age=").append(age);
        text.append('}');
        return text.toString();
    }
}
EchoClient:
package messagepack;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Netty client that connects to the echo server and exchanges
 * length-prefixed MessagePack messages with it.
 */
public class EchoClient {
    private final String host;
    private final int port;
    private final int sendNum;

    /**
     * @param host    server host name or address
     * @param port    server TCP port
     * @param sendNum number of messages the client handler sends after connecting
     */
    private EchoClient(String host, int port, int sendNum){
        this.host = host;
        this.port = port;
        this.sendNum = sendNum;
    }

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * <p>The event loop group is always shut down before this method returns.
     * If the calling thread is interrupted while waiting, the interrupt flag is
     * restored so that callers can still observe the interruption.
     */
    public void run(){
        // Thread group that performs the client's network I/O.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Helper class used to configure and start the client.
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Inbound: the frame decoder in front of the MessagePack
                            // decoder reassembles split or merged packets, so the
                            // decoder always receives one complete message. It reads
                            // a 2-byte length prefix (max frame 65535 bytes) and strips it.
                            socketChannel.pipeline().addLast("frameDecode",
                                    new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                            socketChannel.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                            // Outbound: prepend a 2-byte length field to every encoded message.
                            socketChannel.pipeline().addLast("frameEncode", new LengthFieldPrepender(2));
                            socketChannel.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
                            socketChannel.pipeline().addLast(new EchoClientHandler(sendNum));
                        }
                    });
            // Start the asynchronous connect and wait until it has completed.
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the client channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Connects to 127.0.0.1:8080 and sends 10 messages.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 8080;
        new EchoClient(host, port, 10).run();
    }
}
EchoClientHandler:
package messagepack;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Client-side handler: sends {@code sendNum} {@code UserInfo} messages once the
 * connection is active and prints every message received from the server.
 */
public class EchoClientHandler extends ChannelHandlerAdapter {
    private final int sendNum;

    /**
     * @param sendNum number of messages to send after connecting; must not be negative
     * @throws IllegalArgumentException if {@code sendNum} is negative
     */
    public EchoClientHandler(int sendNum){
        // Fail early with a clear message instead of a later NegativeArraySizeException.
        if (sendNum < 0) {
            throw new IllegalArgumentException("sendNum must not be negative: " + sendNum);
        }
        this.sendNum = sendNum;
    }

    /**
     * Sends all messages as soon as the connection to the server is established.
     *
     * @param ctx context of the connected channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (UserInfo userInfo : userInfo()) {
            ctx.writeAndFlush(userInfo);
        }
    }

    /** Builds {@code sendNum} messages whose age and name suffix are the index. */
    private UserInfo [] userInfo(){
        UserInfo [] userInfos = new UserInfo[sendNum];
        for (int i = 0; i < sendNum; i++){
            UserInfo userInfo = new UserInfo();
            userInfo.setAge(i);
            userInfo.setName("ABCDEF------->" + i);
            userInfos[i] = userInfo;
        }
        return userInfos;
    }

    /**
     * Prints a message received from the server.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg message produced by the preceding decoder
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Client receive the msgpack messag:" + msg);
    }

    /**
     * Flushes pending writes once the current batch of reads is complete.
     *
     * @param ctx context of the channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the cause before closing; closing silently hides the error.
        cause.printStackTrace();
        ctx.close();
    }
}