Netty 实现客户端与服务端的简单通信
这里就不多说了,直接上代码。
Server端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Minimal Netty server that listens on port 9999 and hands every accepted
 * connection to a {@code ServerChildHandler}.
 */
public class Server {
    /** Channels of the connected clients; the child handler registers channels here and broadcasts through it. */
    public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Starts the server and blocks until it stops.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Server server = new Server();
        server.serverStart();
    }

    /**
     * Binds the server to port 9999 and blocks until the server channel is closed
     * or the calling thread is interrupted. Both event loop groups are shut down
     * before this method returns.
     */
    public void serverStart() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts incoming connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(2); // handles the accepted connections
        ServerBootstrap sb = new ServerBootstrap();            // bootstrap helper
        try {
            ChannelFuture f = sb.group(bossGroup, workerGroup) // assign the two thread pools
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ServerChildHandler());
                        }
                    })
                    .bind(9999)
                    .sync();
            // Wait here until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
/**
 * Inbound handler installed on every accepted connection. It keeps the shared
 * {@code Server.clients} group up to date and relays each received buffer to
 * all channels in that group.
 */
class ServerChildHandler extends ChannelInboundHandlerAdapter {

    /** Registers the channel in the shared client group once it becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel connected = ctx.channel();
        Server.clients.add(connected);
    }

    /**
     * Logs the received buffer and broadcasts it through the client group.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead");
        final ByteBuf received = (ByteBuf) msg;
        System.out.println(received);
        // Relay what the server received to the channels registered in the group.
        Server.clients.writeAndFlush(received);
    }

    /**
     * Prints the failure, drops the failing channel from the client group and
     * closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        final Channel failed = ctx.channel();
        Server.clients.remove(failed);
        ctx.close();
    }
}
Client端
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Minimal Netty client that connects to {@code localhost:9999} and stays
 * connected until the channel is closed.
 */
public class Client {

    /**
     * Connects to the server, reports the outcome of the connect attempt on
     * standard output, and then blocks until the channel is closed or the calling
     * thread is interrupted. The event loop group is shut down before returning.
     */
    public void connect() {
        // Event loop group (thread pool that processes channel events).
        NioEventLoopGroup group = new NioEventLoopGroup();
        // Bootstrap helper.
        Bootstrap b = new Bootstrap();
        try {
            ChannelFuture cf = b.group(group) // hand the thread pool to the bootstrap
                    .channel(NioSocketChannel.class)
                    .handler(new ClientChannelInitializer())
                    .connect("localhost", 9999);
            cf.addListener((ChannelFutureListener) channelFuture -> {
                if (!channelFuture.isSuccess()) {
                    System.out.println("not connected!");
                } else {
                    System.out.println("connected!");
                }
            }).sync();
            // Wait here until the connection is closed.
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Creates a client and connects it.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws Exception {
        Client c = new Client();
        c.connect();
    }
}
ClientChannelInitializer 与 ClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ReferenceCountUtil;
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ClientHandler()); // 处理channelRead,read()时才经过这个
}
}
/**
 * Client-side inbound handler: sends a greeting once the connection is up and
 * prints every buffer received from the server.
 */
class ClientHandler extends ChannelInboundHandlerAdapter {

    /** Writes the string {@code "hello"} to the server as soon as the channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Encode with an explicit charset instead of the platform default.
        ByteBuf buf = Unpooled.copiedBuffer("hello", java.nio.charset.StandardCharsets.UTF_8);
        ctx.writeAndFlush(buf); // the buffer is released automatically
    }

    /**
     * Prints the received buffer and releases the message.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println(buf);
        } finally {
            // Release the message itself so it is freed even when the cast above fails.
            ReferenceCountUtil.release(msg);
        }
    }
}
以上例子能够简单地实现客户端与服务端之间的通信。