版权声明:如果喜欢的话,可以撩我哟,此处没有联系方式,想要就自己找哈。 https://blog.csdn.net/qq_39384184/article/details/84861974
- NettyServer:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * Minimal Netty TCP server that exchanges UTF-8 strings with its clients.
 *
 * <p>The boss and worker event loop groups are static, so they are shared by
 * every instance of this class and {@link #shut()} stops all of them at once.
 */
public class NettyServer {
    /** TCP port that {@link #start()} binds to; assign it before calling start(). */
    public static int port;
    private static NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    private static NioEventLoopGroup workGroup = new NioEventLoopGroup();

    /**
     * Test entry point: binds the server to port 8300.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NettyServer server = new NettyServer();
        NettyServer.port = 8300;
        try {
            server.start();
        } catch (RuntimeException e) {
            // The bind failed: release the event loop threads so the JVM can exit.
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            throw e;
        }
        System.out.println("Netty服务端已启动");
    }

    /**
     * Configures the server bootstrap and binds it to {@link #port}, waiting
     * until the bind attempt has finished.
     *
     * <p>If the waiting thread is interrupted, the failure is reported on
     * standard output, the interrupt flag is restored and the method returns.
     *
     * @throws IllegalStateException if the port could not be bound; the
     *         underlying failure is attached as the cause
     */
    public void start() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        // The kernel sizes the accept queue from the smaller of somaxconn and backlog.
        // Windows NT Server 4.0+: 200
        // Linux and Mac OS X: 128
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        // Lets a restarted server rebind a port that was not released cleanly.
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        // TCP_NODELAY disables Nagle's algorithm so small messages go out at once.
        // It is a property of the accepted connections, not of the listening
        // socket, so it has to be a child option.
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        // Keep long-lived connections alive.
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // String decoder / encoder.
                pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                // Business logic.
                pipeline.addLast(new NettyServerHandler());
            }
        });

        // Bind the port and wait for the outcome. await() is used instead of
        // sync() so that a failed bind can be reported before it is rethrown.
        ChannelFuture future;
        try {
            future = bootstrap.bind(port).await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            System.out.println("端口" + port + "绑定失败");
            return;
        }
        if (future.isSuccess()) {
            System.out.println("端口" + port + "已绑定");
        } else {
            System.out.println("端口" + port + "绑定失败: " + future.cause());
            throw new IllegalStateException("Failed to bind port " + port, future.cause());
        }
    }

    /**
     * Starts a graceful shutdown of both event loop groups. The call does not
     * wait for the shutdown to complete.
     */
    public static void shut() {
        workGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        System.out.println("端口" + port + "已解绑");
    }
}
- NettyServerHandler:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
 * Server-side business handler: logs every inbound message and answers it
 * with three string replies, the last of which is {@code "quit"}.
 */
public class NettyServerHandler extends ChannelHandlerAdapter {

    /**
     * Logs the received message and writes the three replies.
     *
     * <p>NOTE(review): the replies are written back to back without any frame
     * delimiter, so the peer may receive them merged into a single read -
     * confirm whether a framing codec is needed in the pipeline.
     *
     * @param ctx context of this handler
     * @param msg the inbound message; only its {@code toString()} form is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String text = msg.toString();
        System.out.println("服务端收到消息:" + text);
        ctx.writeAndFlush("服务端收到了:" + text);
        ctx.writeAndFlush("msg from server");
        ctx.writeAndFlush("quit");
    }

    /**
     * Logs an outbound message and passes it on towards the socket.
     *
     * <p>Writes issued through this handler's own context start at the
     * preceding handler, so this method only sees writes that enter the
     * pipeline further back (for example through the channel itself).
     *
     * @param ctx     context of this handler
     * @param msg     the message being written
     * @param promise completed by the transport once the write is done
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
            ChannelPromise promise) throws Exception {
        System.out.println("服务端发送消息:" + msg.toString());
        // Forward the write; without this the message is dropped and the
        // promise is never completed.
        ctx.write(msg, promise);
    }

    /**
     * Logs a disconnect request and passes it on so it is actually carried out.
     *
     * @param ctx     context of this handler
     * @param promise completed once the disconnect has finished
     */
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        System.out.println("客户端断开连接");
        ctx.disconnect(promise);
    }

    /**
     * Called when a client connection becomes active; intentionally a no-op.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Nothing to do when a connection is established.
    }

    /**
     * Reports the failure and closes the connection instead of silently
     * ignoring it.
     *
     * @param ctx   context of this handler
     * @param cause the exception raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("exceptionCaught: " + cause);
        ctx.close();
    }
}
- NettyClient:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Minimal Netty TCP client that exchanges UTF-8 strings with the server.
 *
 * <p>All clients share the static {@link #group}; {@link #main(String[])}
 * shuts it down once every client has finished.
 */
public class NettyClient {
    /**
     * Runs the delayed reconnect hook. Its thread is a daemon so that an idle
     * scheduler never keeps the JVM alive after the clients are done.
     */
    private ScheduledExecutorService executor = Executors
            .newScheduledThreadPool(1, new java.util.concurrent.ThreadFactory() {
                @Override
                public Thread newThread(Runnable task) {
                    Thread thread = new Thread(task, "netty-client-reconnect");
                    thread.setDaemon(true);
                    return thread;
                }
            });
    public static final int PORT = 8300;
    public static final String IP = "localhost";
    public static EventLoopGroup group = new NioEventLoopGroup();

    /**
     * Connects to the given server and blocks until the connection is closed.
     *
     * @param port  server port
     * @param host  server host name or address
     * @param index number used to tell this client apart in the log output
     * @throws Exception if the connection cannot be established or the
     *         calling thread is interrupted while waiting
     */
    public void connect(int port, String host, final int index)
            throws Exception {
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // String decoder / encoder.
                            pipeline.addLast(new StringDecoder(
                                    CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(
                                    CharsetUtil.UTF_8));
                            // Business logic.
                            pipeline.addLast(new NettyClientHandler(index));
                        }
                    });
            // Wait for the connect attempt to finish; sync() rethrows the
            // failure, so an unreachable server is no longer silently ignored.
            ChannelFuture future = b.connect(new InetSocketAddress(host, port)).sync();
            // Block until the channel has been closed.
            future.channel().closeFuture().sync();
        } finally {
            // Reconnect hook, run one second after the connection has ended.
            // The delay is scheduled rather than slept so no pool thread is
            // blocked while waiting.
            executor.schedule(new Runnable() {
                @Override
                public void run() {
                    // Automatic reconnection is disabled. To enable it, call
                    // connect(PORT, IP, index) here and handle its Exception.
                }
            }, 1, TimeUnit.SECONDS);
        }
    }

    /**
     * Runs three clients one after another, then releases the shared event
     * loop group so the JVM can exit.
     *
     * @param args unused
     * @throws Exception if a client fails to connect or is interrupted
     */
    public static void main(String[] args) throws Exception {
        try {
            for (int i = 0; i < 3; i++) {
                new NettyClient().connect(PORT, IP, (i + 1));
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}
- NettyClientHandler:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
 * Client-side business handler: greets the server once the connection is
 * active, answers every inbound message, and closes the channel when the
 * server sends {@code "quit"}.
 */
public class NettyClientHandler extends ChannelHandlerAdapter {
    /** Number used to tell this client apart in the log output. */
    public int index;

    /**
     * @param index number used to tell this client apart in the log output
     */
    public NettyClientHandler(int index) {
        this.index = index;
    }

    /**
     * Logs the inbound message; closes the channel on {@code "quit"},
     * otherwise sends a reply.
     *
     * @param ctx context of this handler
     * @param msg the inbound message; only its {@code toString()} form is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String text = msg.toString();
        System.out.println("客户端" + index + "收到消息:" + text);
        if ("quit".equals(text)) {
            // Only this channel is closed here: the event loop group is
            // shared by all clients and must outlive any single connection.
            ctx.channel().close();
        } else {
            ctx.writeAndFlush("msg from client");
        }
    }

    /**
     * Logs an outbound message and passes it on towards the socket.
     *
     * <p>Writes issued through this handler's own context start at the
     * preceding handler, so this method only sees writes that enter the
     * pipeline further back (for example through the channel itself).
     *
     * @param ctx     context of this handler
     * @param msg     the message being written
     * @param promise completed by the transport once the write is done
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
            ChannelPromise promise) throws Exception {
        System.out.println("客户端" + index + "发送消息:" + msg.toString());
        // Forward the write; without this the message is dropped and the
        // promise is never completed.
        ctx.write(msg, promise);
    }

    /**
     * Logs a disconnect request and passes it on so it is actually carried out.
     *
     * @param ctx     context of this handler
     * @param promise completed once the disconnect has finished
     */
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        System.out.println("客户端" + index + "断开连接");
        ctx.disconnect(promise);
    }

    /**
     * Sends the initial greeting as soon as the connection is established.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端" + index + "连接上了");
        ctx.writeAndFlush("客户端发送的消息,服务端接收到了吗?");
    }

    /**
     * Reports the failure together with its cause and closes the connection.
     *
     * @param ctx   context of this handler
     * @param cause the exception raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("exceptionCaught: " + cause);
        ctx.close();
    }
}