服务端
创建一个server包,并在该包下创建ChatServer.java用于开启服务器,与用户进行连接、以及将转发用户的信息到其他用户中
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ChatServer {
private static final String LOCALHOST = "localhost";
private static final int DEFAULT_PORT = 8888;
private static final String QUIT = "quit";
private static final int BUFFER = 1024;
private static final int THREADPOOL_SIZE = 8;
private AsynchronousChannelGroup channelGroup;
private AsynchronousServerSocketChannel serverSocketChannel;
private Charset charset = Charset.forName("utf-8");
private List<ClientHandler> connectedClient;
private int port;
public ChatServer(int port) {
this.port = port;
this.connectedClient = new ArrayList<>();
}
public ChatServer() {
this(DEFAULT_PORT);
}
/**
* 当输入"quit"时表示客户退出
* @param msg
* @return
*/
private boolean readyToQuit(String msg){
return QUIT.equals(msg);
}
/**
* 关闭相对应的流并释放与之相关联的任何系统资源,如果流已关闭,则调用此方法将不起任何作用
* @param closeable
*/
private void close(Closeable closeable){
if (closeable!=null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 添加一个新的客户端进客户端列表(list集合)
* @param handler
*/
private synchronized void addClient(ClientHandler handler) {
connectedClient.add(handler);
System.out.println(getClientName(handler.clientChannel)+"已经连接到服务器");
}
/**
* 将该客户(下线)从列表中删除
* @param clientHandler
*/
private synchronized void removeClient(ClientHandler clientHandler) {
connectedClient.remove(clientHandler);
System.out.println(getClientName(clientHandler.clientChannel)+"已断开连接");
//关闭该客户对应流
close(clientHandler.clientChannel);
}
/**
* 服务器其实客户端发送的信息,并将该信息进行utf-8解码
* @param buffer
* @return
*/
private synchronized String receive(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return String.valueOf(charBuffer);
}
/**
* 服务器端转发该客户发送的消息到其他客户控制室上(转发信息)
* @param clientChannel
* @param fwdMsg
*/
private synchronized void fwdwordMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {
for (ClientHandler handler:connectedClient){
//该信息不用再转发到发送信息的那个人那
if (!handler.clientChannel.equals(clientChannel)){
try {
//将要转发的信息写入到缓冲区中
ByteBuffer buffer = charset.encode(getClientName(handler.clientChannel)+":"+fwdMsg);
//将相应的信息写入到用户通道中,用户再通过获取通道中的信息读取到对应转发的内容
handler.clientChannel.write(buffer,null,handler);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 获取客户端的端口号并打印出来
* @param clientChannel
* @return
*/
private String getClientName(AsynchronousSocketChannel clientChannel) {
int clientPort = -1;
try {
InetSocketAddress inetSocketAddress = (InetSocketAddress) clientChannel.getRemoteAddress();
clientPort = inetSocketAddress.getPort();
} catch (IOException e) {
e.printStackTrace();
}
return "客户端["+clientPort+"]:";
}
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
@Override
public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
if (serverSocketChannel.isOpen()){
serverSocketChannel.accept(null,this);
}
if (clientChannel!=null&&clientChannel.isOpen()){
//为该新连接的用户创建handler,用于读写操作
ClientHandler handler = new ClientHandler(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
//将新用户添加到在线用户列表
addClient(handler);
//第一个buffer表示从clientChannel中读取的信息写入到buffer缓冲区中
//第二个buffer:但handler回调函数被调用时,buffer会被当做一个attachment参数传入到该回调函数中
clientChannel.read(buffer,buffer,handler);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败:"+exc);
}
}
private class ClientHandler implements CompletionHandler<Integer,Object> {
private AsynchronousSocketChannel clientChannel;
public ClientHandler(AsynchronousSocketChannel clientChannel) {
this.clientChannel=clientChannel;
}
@Override
public void completed(Integer result, Object attachment) {
ByteBuffer buffer = (ByteBuffer) attachment;
if (buffer!=null){
//写操作
if (result<=0){
//客户端异常
//TODO 将客户移除在线客户列表
}else {
buffer.flip();
String fwdMsg = receive(buffer);
System.out.println(getClientName(clientChannel)+fwdMsg);
//转发给在线的所有客户
fwdwordMessage(clientChannel,fwdMsg);
//清除缓冲区
buffer.clear();
//检查用户是否退出
if (readyToQuit(fwdMsg)){
//将客户从在线客户列表中去除
removeClient(this);
}else {
//如果不是则继续等待读取用户输入的信息
clientChannel.read(buffer,buffer,this);
}
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读写失败:"+exc);
}
}
/**
* 启动服务器
*/
private void start(){
try {
//创建并绑定
//初始化线程此
ExecutorService executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE);
//将线程池加入到异步通道中
channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
//打开通道
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
//为通道绑定本地主机和端口
serverSocketChannel.bind(new InetSocketAddress(LOCALHOST,port));
System.out.println("启动服务器,监听端口:"+port);
while (true){
//一直调用accept函数,接收要与服务端建立连接的用户
serverSocketChannel.accept(null,new AcceptHandler());
//阻塞式调用,防止占用系统资源
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 主函数入口
* @param args
*/
public static void main(String[] args) {
ChatServer server = new ChatServer(7777);
server.start();
}
}
客户端
- 创建client包,在该包下创建ChatClient.java,用于打开通道与服务器进行连接,以及接收和发送信息
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ChatServer {
private static final String LOCALHOST = "localhost";
private static final int DEFAULT_PORT = 8888;
private static final String QUIT = "quit";
private static final int BUFFER = 1024;
private static final int THREADPOOL_SIZE = 8;
private AsynchronousChannelGroup channelGroup;
private AsynchronousServerSocketChannel serverSocketChannel;
private Charset charset = Charset.forName("utf-8");
private List<ClientHandler> connectedClient;
private int port;
public ChatServer(int port) {
this.port = port;
this.connectedClient = new ArrayList<>();
}
public ChatServer() {
this(DEFAULT_PORT);
}
/**
* 当输入"quit"时表示客户退出
* @param msg
* @return
*/
private boolean readyToQuit(String msg){
return QUIT.equals(msg);
}
/**
* 关闭相对应的流并释放与之相关联的任何系统资源,如果流已关闭,则调用此方法将不起任何作用
* @param closeable
*/
private void close(Closeable closeable){
if (closeable!=null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 添加一个新的客户端进客户端列表(list集合)
* @param handler
*/
private synchronized void addClient(ClientHandler handler) {
connectedClient.add(handler);
System.out.println(getClientName(handler.clientChannel)+"已经连接到服务器");
}
/**
* 将该客户(下线)从列表中删除
* @param clientHandler
*/
private synchronized void removeClient(ClientHandler clientHandler) {
connectedClient.remove(clientHandler);
System.out.println(getClientName(clientHandler.clientChannel)+"已断开连接");
//关闭该客户对应流
close(clientHandler.clientChannel);
}
/**
* 服务器其实客户端发送的信息,并将该信息进行utf-8解码
* @param buffer
* @return
*/
private synchronized String receive(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return String.valueOf(charBuffer);
}
/**
* 服务器端转发该客户发送的消息到其他客户控制室上(转发信息)
* @param clientChannel
* @param fwdMsg
*/
private synchronized void fwdwordMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {
for (ClientHandler handler:connectedClient){
//该信息不用再转发到发送信息的那个人那
if (!handler.clientChannel.equals(clientChannel)){
try {
//将要转发的信息写入到缓冲区中
ByteBuffer buffer = charset.encode(getClientName(handler.clientChannel)+":"+fwdMsg);
//将相应的信息写入到用户通道中,用户再通过获取通道中的信息读取到对应转发的内容
handler.clientChannel.write(buffer,null,handler);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 获取客户端的端口号并打印出来
* @param clientChannel
* @return
*/
private String getClientName(AsynchronousSocketChannel clientChannel) {
int clientPort = -1;
try {
InetSocketAddress inetSocketAddress = (InetSocketAddress) clientChannel.getRemoteAddress();
clientPort = inetSocketAddress.getPort();
} catch (IOException e) {
e.printStackTrace();
}
return "客户端["+clientPort+"]:";
}
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
@Override
public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
if (serverSocketChannel.isOpen()){
serverSocketChannel.accept(null,this);
}
if (clientChannel!=null&&clientChannel.isOpen()){
//为该新连接的用户创建handler,用于读写操作
ClientHandler handler = new ClientHandler(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
//将新用户添加到在线用户列表
addClient(handler);
//第一个buffer表示从clientChannel中读取的信息写入到buffer缓冲区中
//第二个buffer:但handler回调函数被调用时,buffer会被当做一个attachment参数传入到该回调函数中
clientChannel.read(buffer,buffer,handler);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败:"+exc);
}
}
private class ClientHandler implements CompletionHandler<Integer,Object> {
private AsynchronousSocketChannel clientChannel;
public ClientHandler(AsynchronousSocketChannel clientChannel) {
this.clientChannel=clientChannel;
}
@Override
public void completed(Integer result, Object attachment) {
ByteBuffer buffer = (ByteBuffer) attachment;
if (buffer!=null){
//写操作
if (result<=0){
//客户端异常
//TODO 将客户移除在线客户列表
}else {
buffer.flip();
String fwdMsg = receive(buffer);
System.out.println(getClientName(clientChannel)+fwdMsg);
//转发给在线的所有客户
fwdwordMessage(clientChannel,fwdMsg);
//清除缓冲区
buffer.clear();
//检查用户是否退出
if (readyToQuit(fwdMsg)){
//将客户从在线客户列表中去除
removeClient(this);
}else {
//如果不是则继续等待读取用户输入的信息
clientChannel.read(buffer,buffer,this);
}
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读写失败:"+exc);
}
}
/**
* 启动服务器
*/
private void start(){
try {
//创建并绑定
//初始化线程此
ExecutorService executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE);
//将线程池加入到异步通道中
channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
//打开通道
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
//为通道绑定本地主机和端口
serverSocketChannel.bind(new InetSocketAddress(LOCALHOST,port));
System.out.println("启动服务器,监听端口:"+port);
while (true){
//一直调用accept函数,接收要与服务端建立连接的用户
serverSocketChannel.accept(null,new AcceptHandler());
//阻塞式调用,防止占用系统资源
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 主函数入口
* @param args
*/
public static void main(String[] args) {
ChatServer server = new ChatServer(7777);
server.start();
}
}
- 再在client包下创建UserInputHandler.java,用于客户端向服务器发送信息
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class UserInputHandler implements Runnable {
private ChatClient chatClient;
public UserInputHandler(ChatClient chatClient) {
this.chatClient = chatClient;
}
@Override
public void run() {
try {
//等待用户输入消息
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
while (true){
String input = br.readLine();
//向服务器发送消息
chatClient.send(input);
//检查用户是否准备退出
if (chatClient.readyToQuit(input)){
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行效果:
- 运行ChatServer.java用于开启服务器等待用户连接
- 运行ChatClient.java,并输入信息
- 此时再运行一个ChatClient.java,同样输入消息