效果图
功能
从项目代码结构来,代码主要分为简单的服务端和客户端。当运行服务端后,可运行多个客户端连接到服务端。某个客户端发送消息,都会经由服务端转发到除了自己的其他客户端。
代码虽然不多,而且直接使用原生java手写,但是却很大程度接近聊天室的功能,这比网上很多关于Socket的入门案例都要有含金量。甚至你可以对应代码打包成jar包。如下图,并将server.jar放到云服务器上运行。那么其他人都可以通过cmd的方式运行client.jar来加入聊天室,并进行群聊天。
体验
https://www.ysqorz.top/uploaded/file/client.jar
想试一下效果的同学,可通过上述链接,下载client.jar。然后在jar包所在目录,以管理员身份运行cmd。通过命令:java -jar client.jar 运行jar包。即可连接到服务器。如果有其他人也运行jar包,连接到服务器,那么就可以群聊了。当然,你可以运行多个cmd来模拟多人的情况。
实现思路
1、多个客户端与服务端建立起TCP连接,等待连接的过程是阻塞状态。因此需要一个独立的线程Connector来专门处理客户端的连接,并且该线程只负责这件事,也就是说这个线程一旦等到有客户端连接,就建立其连接,之后就不管了。客户端的消息收发右其他线程完成。
2、对于每一个单独的客户端,它的消息的收发需要交由专门的类ClientHandler来完成。顾名思义,ClientHandler意为客户端处理者。每一个ClientHandler专门负责一个客户端的消息收发。因此有多少个建立连接的客户端,服务端就持有多少个ClientHandler。因此服务端需要持有并维护一个ClientHandler列表。
3、对于每个客户端进行读写分离。这一点是非常有必要的。在read客户端消息时,会处于阻塞状态,如果不分离,就不能对客户端进行写操作了。网上很多案例都是,read的时候阻塞,然后读到客户端的消息后,再自动回送消息。要进行读写分离,客户端的读写操作就必须由单独的线程来负责。因此每一个ClientHandler里面都持有读、写两个线程。
上面分析了实现聊天室案例的难点和主体思路,更多坑点和细节,参照源码!该案例涉及Socket的使用、多线程、Java面向对象的思想,对功能职责进行划分。
转载请注明出处
完整代码
Package server
package server;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import utils.CloseUtil;
/**
* 客户端处理者
* @author passerbyYSQ
* @date 2020-9-23 9:18:31
*/
public class ClientHandler {
private Socket socket;
private ClientInfo clientInfo;
// 对客户端进行读写分离
// 负责从客户端读的单独线程
private ReadHandler reader;
// 负责向客户端写的单独线程
private WriteHandler writer;
// 事件回调。当某个客户端的事件触发时,回调给TcpServer
private ClientEventCallback callback;
public ClientHandler(Socket socket, ClientEventCallback callback) throws IOException {
this.socket = socket;
this.callback = callback;
clientInfo = new ClientInfo(socket.getLocalAddress(), socket.getPort());
System.out.println("新客户端链接:" + clientInfo);
reader = new ReadHandler(socket.getInputStream());
reader.start();
writer = new WriteHandler(socket.getOutputStream());
}
public void sendMsg(String msg) {
writer.send(msg);
}
public void exit() {
reader.exit();
writer.exit();
CloseUtil.close(socket);
}
public ClientInfo getClientInfo() {
return clientInfo;
}
interface ClientEventCallback {
void onMsgReceived(ClientHandler handler, String msg);
void onClientExit(ClientHandler handler);
}
class ClientInfo {
InetAddress inetAddr;
Integer port;
ClientInfo(InetAddress inetAddr, Integer port) {
this.inetAddr = inetAddr;
this.port = port;
}
@Override
public String toString() {
return "client [ip=" + this.inetAddr.getHostAddress() + ", port=" + this.port + "]";
}
}
class ReadHandler extends Thread {
DataInputStream in;
boolean running = true;
ReadHandler(InputStream inputStream) {
in = new DataInputStream(inputStream);
}
public void run() {
while(running) {
try {
String msg = in.readUTF();
if ("bye".equalsIgnoreCase(msg)) {
exit();
callback.onClientExit(ClientHandler.this);
break;
}
callback.onMsgReceived(ClientHandler.this, msg);
} catch (IOException e) {
System.out.println(clientInfo.toString() + "关闭:" +
"异常-" + e.getCause() + ",信息-" + e.getMessage());
exit();
}
}
}
void exit() {
running = false;
CloseUtil.close(in);
}
}
/**
* 负责向客户端写的单独线程。
* 不继承Thread类,而是巧妙地采用一个单例线程池来发送消息
*/
class WriteHandler {
DataOutputStream out;
ExecutorService executor;
WriteHandler(OutputStream outputStream) {
out = new DataOutputStream(outputStream);
executor = Executors.newSingleThreadExecutor();
}
void send(final String msg) {
executor.execute(new Runnable() {
public void run() {
try {
out.writeUTF(msg);
out.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
void exit() {
CloseUtil.close(out);
executor.shutdownNow();
}
}
}
package server;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import server.ClientHandler.ClientEventCallback;
import utils.CloseUtil;
public class TcpServer implements ClientEventCallback {
private static ServerSocket serverSocket;
// 客户端处理者的列表
private List<ClientHandler> clientHandlerList = new ArrayList<>();
private Connector connector;
public void setup() throws IOException {
serverSocket = new ServerSocket(8096);
connector = new Connector();
connector.start();
System.out.println("服务器启动成功,服务器信息:ip-" +
serverSocket.getInetAddress().getHostAddress() +
", port-" + serverSocket.getLocalPort());
}
public void exit() {
exitAllClients();
connector.exit();
CloseUtil.close(serverSocket);
}
public void exitAllClients() {
System.out.println("客户端数量:" + clientHandlerList.size());
for (ClientHandler handler : clientHandlerList) {
handler.exit();
}
clientHandlerList.clear();
}
/**
* 消息到达时的转发
*/
@Override
public void onMsgReceived(ClientHandler handler, String msg) {
msg = handler.getClientInfo().toString() + ":" + msg;
System.out.println(msg);
broadcast(handler, msg);
}
/**
* 客户端退出时的回调
*/
@Override
public void onClientExit(ClientHandler handler) {
handler.exit();
clientHandlerList.remove(handler);
String msg = handler.getClientInfo() +
"已退出群聊。当前客户端数量:" + clientHandlerList.size();
broadcast(handler, msg);
System.out.println(msg);
}
/**
* 转发消息
* @param handler 消息所来自的客户端
* @param msg
*/
private void broadcast(ClientHandler handler, String msg) {
for (ClientHandler clientHandler : clientHandlerList) {
// 转发消息时跳过自己
if (clientHandler == handler) {
continue;
}
clientHandler.sendMsg(msg);
}
}
/**
* 负责处理客户端连接的单独线程
*/
class Connector extends Thread {
boolean running = true;
public Connector() {
super("负责处理客户端连接的单独线程");
this.setPriority(MAX_PRIORITY);
}
public void run() {
while(running) {
try {
Socket socket = serverSocket.accept();
ClientHandler handler = new ClientHandler(socket, TcpServer.this);
clientHandlerList.add(handler);
broadcast(handler, handler.getClientInfo() +
"加入群聊。当前客户端数量:" + clientHandlerList.size());
} catch (IOException e) {
System.out.println("ServerSocket异常关闭:异常-" +
e.getCause() + ",信息-" + e.getMessage());
exit();
}
}
}
void exit() {
running = false;
}
}
}
package server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class Server {
public static void main(String[] args) throws IOException {
TcpServer tcpServer = new TcpServer();
tcpServer.setup();
BufferedReader bufReader = new BufferedReader(new InputStreamReader(System.in));
boolean flag = true;
do {
String command = bufReader.readLine();
if (command == null) {
break;
}
switch (command.toLowerCase()) {
case "exit clients": {
tcpServer.exitAllClients();
break;
}
case "exit": {
tcpServer.exit();
flag = false;
break;
}
default: {
System.out.println("Unsupport commond!");
}
}
} while(flag);
tcpServer.exit();
}
}
Package client
package client;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import utils.CloseUtil;
public class TcpClient {
public static final String SERVER_IP = "119.45.164.115";
public static final int SERVER_PORT = 8096;
private Socket socket = new Socket(SERVER_IP, SERVER_PORT);
private ReadHandler reader;
private WriteHandler writer;
public TcpClient() throws IOException {
System.out.println("连接服务器成功,服务器信息:ip-" +
socket.getInetAddress().getHostAddress() +
", port-" + socket.getPort());
System.out.println("客户端信息:ip-" + socket.getLocalAddress() +
", port-" + socket.getLocalPort());
reader = new TcpClient.ReadHandler(socket.getInputStream());
reader.start();
writer = new TcpClient.WriteHandler(socket.getOutputStream());
}
public void exit() {
reader.exit();
writer.exit();
CloseUtil.close(socket);
}
public void send(String msg) {
writer.send(msg);
}
class ReadHandler extends Thread {
DataInputStream in;
boolean running = true;
ReadHandler(InputStream inputStream) {
this.in = new DataInputStream(inputStream);
}
public void run() {
while(running) {
try {
String msg = this.in.readUTF();
System.out.println(msg);
} catch (IOException e) {
System.out.println("客户端关闭:异常-" + e.getCause() +
",信息-" + e.getMessage());
exit();
}
}
}
void exit() {
running = false;
CloseUtil.close(in);
}
}
class WriteHandler {
DataOutputStream out;
ExecutorService executor;
WriteHandler(OutputStream outputStream) {
out = new DataOutputStream(outputStream);
executor = Executors.newSingleThreadExecutor();
}
void send(final String msg) {
executor.execute(new Runnable() {
public void run() {
try {
out.writeUTF(msg);
out.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
void exit() {
CloseUtil.close(out);
executor.shutdownNow();
}
}
}
package client;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class Client {
public static void main(String[] args) throws IOException {
TcpClient tcpClient = new TcpClient();
BufferedReader bufReader = new BufferedReader(
new InputStreamReader(System.in));
boolean isClosed = false;
while(true) {
String msg = bufReader.readLine();
// socket异常断开,可能造成msg为null,引发下面的空指针异常
if (msg == null) {
break;
}
// 空字符串不发送
if (msg.length() == 0) {
continue;
}
// 退出客户端
if ("exit".equalsIgnoreCase(msg)) {
break;
}
// 连接未断开时,才发送
if (!isClosed) {
// 断开连接
if ("bye".equalsIgnoreCase(msg)) {
System.out.println("已请求服务器断开连接,输入exit退出客户端!");
isClosed = true;
}
tcpClient.send(msg);
}
}
tcpClient.exit();
}
}
Package utils
package utils;
import java.io.Closeable;
import java.io.IOException;
/**
* 关闭资源的工具类
* @author passerbyYSQ
* @date 2020-9-23 8:55:33
*/
public class CloseUtil {
public static void close(Closeable... closeableArr) {
if (closeableArr == null) {
return;
}
for (Closeable closeable : closeableArr) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}