Netty做一个简单的群聊服务端
一个服务端管理多个客户端进行通信。
例如:
1、 服务端监听 客户端上线,下线提醒。
2、 客户端加入群聊提醒。
3、 客户端发送消息,其他客户端都可以接收。
演示:
开始写代码
1.引入依赖
maven引入Netty依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.34.Final</version>
</dependency>
2.服务端启动代码
package com.liuqi.chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 群聊服务器
*
* <br>
* create by liuqi 2020/10/6
**/
public class ChatServer {
public final static int PORT = 8088;
public static void main(String[] args) {
/* 处理客户端连接的线程组 */
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
/* 处理读写的线程组 */
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
/* 创建服务端的启动对象 */
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup) // 设置 bossGroup workGroup这两个线程组
.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器通道实现
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new StringDecoder()) // 字符串解码处理器
.addLast(new StringEncoder()) // 字符串编码处理器
.addLast(new ChatServerHandler()); // 自定义的处理器
}
});
/* 绑定一个端口并且同步 */
ChannelFuture cf = serverBootstrap.bind(PORT).sync();
System.out.println("服务启动成功");
/* 阻塞当前线程 对通道关闭进行监听*/
cf.channel().closeFuture().sync();
} catch (Exception e) {
System.out.println("启动失败 " + e.getMessage());
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
3.对自定义 ChatServerHandler 处理器的编写
服务器接收客户端连接事件,断开连接事件,接收客户端消息,发送消息给客户端的事件处理。
channel_group 管理所有的客户端通道,由这个集合群发消息给客户端。
package com.liuqi.chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
*
* 聊天服务器 处理器
* 因为指定了 StringDecoder 解码处理器,SimpleChannelInboundHandler<T> 的泛型直接指定String类型
* 重写 channelRead0()方法接收到客户端发送的消息会解析成 String
*
* <br>
* create by liuqi 2020/10/6
**/
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个 ChannelGroup 管理所有的 Channel(客户端连接)
* GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
*/
private final static ChannelGroup channel_group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 成功建立连接
* 第一次执行
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
/**
* 会自动遍历 为每一个通道发送消息
*/
channel_group.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() + " 加入聊天");
/* 添加当前通道 */
channel_group.add(channel);
}
/**
* 关闭连接 (调用 close(), 或终止连接 )
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channel_group.writeAndFlush(sdf.format(new Date()) + "[客户端]" + channel.remoteAddress() + " 离开了");
System.out.println("channelGroup size " + channel_group.size());
// 会自动移除
// channel_group.remove(channel);
}
/**
* 通道处于活跃状态
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(sdf.format(new Date()) + "[客户端]" + ctx.channel().remoteAddress() + " 上线了~");
}
/**
* 通道处于 不活跃状态
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(sdf.format(new Date()) + "[客户端]" + ctx.channel().remoteAddress() + " 离线了~");
}
/**
* 消息解码后读取 (也就是接收到的消息处理过了)
*/
protected void channelRead0(final ChannelHandlerContext ctx, String msg) throws Exception {
channel_group.forEach(channel -> {
if (channel != ctx.channel()) {
channel.writeAndFlush(sdf.format(new Date()) + " " + channel.remoteAddress() + " 说: " + msg);
} else {
channel.writeAndFlush(sdf.format(new Date()) + " [我发送了] " + msg);
}
});
}
/**
* 通道异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("连接异常 " + cause.getMessage());
/* 关闭通道 */
ctx.close();
}
}
4.客户端启动代码
启动客户端,连接到服务端成功后,进行阻塞,监听键盘输入。每输入一行就发送消息给服务端,
服务端在将消息群发给其他客户端。
注意:连接服务端成功后需要阻塞,不然会断开连接.
可以使用 channelFuture.channel().closeFuture().sync()进行阻塞.
Channel channel = channelFuture.channel() 得到的通道可以保存起来,进行发送消息。
package com.liuqi.chat.client;
import com.liuqi.chat.ChatServer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
* <br>
* create by liuqi 2020/10/6
**/
public class ChatClient {
public static void main(String[] args) {
/* 客户端需要一个读写事件循环组 */
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
/* 创建客户端启动对象 */
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(eventExecutors) // 设置处理读写的线程组
.channel(NioSocketChannel.class) // 通道处理器
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new StringDecoder()) // 字符串解码处理器
.addLast(new StringEncoder()) // 字符串编码处理器
.addLast(new ChatClientHandler()); // 自定义的处理器
}
});
/* 绑定一个端口并且同步 */
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", ChatServer.PORT).sync();
System.out.println("建立连接成功");
/* 得到连接的通道 */
Channel channel = channelFuture.channel();
/**
* 阻塞 监听键盘输入,
* 方便调试发送消息
*/
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
/* 通过channel 发送到服务器端 */
channel.writeAndFlush(msg);
}
} catch (Exception e) {
System.out.println("启动客户端失败 " + e.getMessage());
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
5 ChatClientHandler 客户端的消息处理器
这个客户端处理器很简单,就接收到服务器发送的消息,输出在控制台。
package com.liuqi.chat.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 聊天客户端处理
*
* <br>
* create by liuqi 2020/10/6
**/
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 解码后读取数据
* @param ctx
* @param msg
* @throws Exception
*/
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
/**
* 连接异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("连接异常 " + cause.getMessage());
ctx.close();
}
}
总结
到此这个简单的群聊服务就做好了
可以多复制几个客户端启动进行调试。