服务端:
package com.jym.groupchat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Netty-based group chat server.
 *
 * <p>Listens for TCP connections on the configured port, converts traffic to and from
 * {@code String} with {@link StringDecoder}/{@link StringEncoder}, and hands every inbound
 * message to a {@link JymGroupChatHandler}.
 *
 * <p>Project: NettyPro. Author: jym. Created: 2020/02/09.
 */
public class JymGroupChatSever {
    /** Host supplied by the caller. Stored only; {@link #run()} binds by port alone. */
    private final String host;
    /** TCP port the server listens on. */
    private final int port;

    /**
     * Creates a server description; nothing is bound until {@link #run()} is called.
     *
     * @param host host name or address (kept for reference, not used when binding)
     * @param port TCP port to listen on
     */
    public JymGroupChatSever(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel is closed.
     *
     * <p>Both event loop groups are shut down when this method exits, whether normally or
     * because of an error.
     */
    public void run() {
        // One group for the server channel, one for the accepted child channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Inbound bytes -> String.
                            pipeline.addLast("decoder", new StringDecoder());
                            // Outbound String -> bytes.
                            pipeline.addLast("encoder", new StringEncoder());
                            // Chat logic.
                            pipeline.addLast(new JymGroupChatHandler());
                        }
                    });
            System.out.println("服务器 is ready");

            // Register the listener BEFORE waiting on the future: sync() rethrows the bind
            // failure, so a listener attached afterwards could never report the failure case.
            ChannelFuture bindFuture = serverBootstrap.bind(port);
            bindFuture.addListener((ChannelFutureListener) future -> {
                // Report the port that was actually requested rather than a fixed number.
                if (future.isSuccess()) {
                    System.out.println("监听端口 " + port + " 成功");
                } else {
                    System.out.println("监听端口 " + port + " 失败");
                }
            });
            bindFuture.sync();

            // Block until the server channel is closed.
            bindFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Launches the chat server on 127.0.0.1:6668.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        JymGroupChatSever jymGroupChatSever = new JymGroupChatSever("127.0.0.1", 6668);
        jymGroupChatSever.run();
    }
}
服务端自定义handler
package com.jym.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
/**
 * Server-side handler for the group chat: tracks connected channels and relays every
 * received message to all of them.
 *
 * <p>Project: NettyPro. Author: jym. Created: 2020/02/09.
 */
public class JymGroupChatHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Group of all connected channels (a channel group, not a thread group), shared by every
     * handler instance. It is driven by {@link GlobalEventExecutor#INSTANCE}.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // Not used anywhere in this class.
    // NOTE(review): "hh" is the 12-hour clock field; "HH" (24-hour) is probably what is wanted
    // if this formatter is ever used for timestamps - confirm before relying on it.
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

    /**
     * Called when this handler is added to a channel's pipeline, i.e. when a client connects.
     * Announces the newcomer to the clients already in the group, then adds the new channel.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Write before add: the announcement goes to the existing members only.
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天\n");
        channelGroup.add(channel);
    }

    /**
     * Called when the channel becomes active; logs that the client is online.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+"上线了");
    }

    /**
     * Called when the channel becomes inactive; logs that the client is offline.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+"离线了");
    }

    /**
     * Called when this handler is removed from the pipeline (the client disconnected).
     * Tells the channels still in the group that the client left.
     *
     * <p>The channel is not removed from the group explicitly here; per the Netty
     * {@code ChannelGroup} documentation, closed channels are dropped from the group
     * automatically.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"离开了\n");
    }

    /**
     * Relays a received message to every channel in the group. The sender gets an echo
     * labelled as its own message; everyone else gets the message tagged with the sender's
     * remote address.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param msg decoded text of the message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, final String msg) throws Exception {
        final Channel sender = channelHandlerContext.channel();
        channelGroup.forEach(ch -> {
            if (ch != sender) {
                // Another client: forward with the sender's address.
                ch.writeAndFlush("[客户]"+sender.remoteAddress()+"发送消息:"+msg+"\n");
            } else {
                // The sender itself: echo back.
                ch.writeAndFlush("[自己]发送了消息:"+msg+"\n");
            }
        });
    }

    /**
     * Reports the failure and closes the affected channel.
     *
     * @param ctx context of the channel on which the error occurred
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause instead of dropping it silently, then close the connection.
        cause.printStackTrace();
        ctx.close();
    }
}
客户端:
package com.jym.groupchat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
 * Console client for the Netty group chat: connects to the server and sends every line
 * typed on standard input.
 *
 * <p>Project: NettyPro. Author: jym. Created: 2020/02/09.
 */
public class JymGroupChatClient {
    /** Server host to connect to. */
    private final String host;
    /** Server TCP port to connect to. */
    private final int port;

    /**
     * Creates a client description; no connection is made until {@link #run()} is called.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public JymGroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server, then forwards each line read from {@code System.in} until the
     * input is exhausted. The event loop group is shut down when this method exits.
     */
    public void run() {
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventExecutors)
                    // Channel implementation to instantiate for the connection.
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Inbound bytes -> String.
                            pipeline.addLast("decoder", new StringDecoder());
                            // Outbound String -> bytes.
                            pipeline.addLast("encoder", new StringEncoder());
                            // Client-side message handling.
                            pipeline.addLast(new JymGroupChatClientHandler());
                        }
                    });
            System.out.println("客户端 is ok");

            // Wait for the connection to be established.
            ChannelFuture connectFuture = bootstrap.connect(host, port).sync();
            Channel channel = connectFuture.channel();
            System.out.println("=========="+channel.localAddress()+"============");

            // System.in is intentionally not closed, so the scanner is left open as well.
            Scanner scanner = new Scanner(System.in);
            // hasNextLine() pairs with nextLine(); the token-based hasNext() would stall on a
            // blank line until some later non-blank input arrived.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg+"\r\n");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

    /**
     * Launches a chat client against 127.0.0.1:6668.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        JymGroupChatClient jymGroupChatClient = new JymGroupChatClient("127.0.0.1", 6668);
        jymGroupChatClient.run();
    }
}
客户端自定义handler
package com.jym.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
/**
 * Client-side inbound handler for the group chat: prints every message received from the
 * server to standard output.
 *
 * <p>This is the handler that {@code JymGroupChatClient} installs in its pipeline under the
 * name {@code JymGroupChatClientHandler}. The listing previously shown here was a copy of the
 * server-side {@code JymGroupChatHandler}, which both redeclared that class and left the
 * client's handler undefined.
 *
 * <p>Project: NettyPro. Author: jym. Created: 2020/02/09.
 */
public class JymGroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints a message received from the server.
     *
     * @param ctx context of the client channel
     * @param msg decoded text sent by the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // Trim surrounding whitespace (including any trailing line break) because println
        // adds its own newline.
        System.out.println(msg.trim());
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx context of the client channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
测试结果:
这里我们启动三个客户端和一个服务端:
服务端:
客户端: