服务端
- 在server包下创建ChatServer.java,该类用监听有没有任何的客户端发起建立连接的要求,如果有的话接收连接的要求并接收任意用户发来的消息转发给在线的所有用户
public class ChatServer {
//端口号
private int DEFAULT_PORT = 8888;
//用户输入quit时退出聊天
private final String QUIT = "quit";
//线程池
private ExecutorService executorService;
//接收各个客户端发送的连接请求
private ServerSocket serverSocket;
//保存目前为止所有客户发送的消息。
// key:客户端的端口号 value:服务端向对应的客户端发送信息时的writer
private Map<Integer, Writer> connectedClients;
//初始化参数
public ChatServer() {
//创建含10个线程的线程池
executorService = Executors.newFixedThreadPool(10);
connectedClients = new HashMap<>();
}
/**
* 接收客户端的socket,建立客户端与服务端的连接
* 使用synchronized ,确保同一时间内只有一个线程来调用该函数
* (防止多个函数同时调用更改Map集合,确保线程安全)下面几个函数也是要使用synchronized
* @param socket
*/
public synchronized void addClient(Socket socket) throws IOException {
//判断该socket是可以进行操作的值
if (socket!=null){
int port = socket.getPort();//Key
BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);//Value
//将key,value写入Map中
connectedClients.put(port,writer);
System.out.println("客户端["+port+"]已经连接到服务器");
}
}
/**
* 断开指定客户端与服务器的连接
* @param socket
*/
public synchronized void removeClient(Socket socket) throws IOException {
if (socket!=null){
int port = socket.getPort();
//先判断当前客户端有没有在map中(之前有没有与客户端建立连接)
if (connectedClients.containsKey(port)){
//如果在,直接将writer流关闭
connectedClients.get(port).close();
}
//并将对应的值从map中去除
connectedClients.remove(port);
System.out.println("客户端["+port+"]断开连接");
}
}
/**
* 当服务器接收到某一客户发送够来的消息时要将消息转发到各个客户的控制台上显示
* @param socket 发送消息的用户
* @param fwdMsg 客户端需要转发的消息
*/
public synchronized void forwordMessage(Socket socket,String fwdMsg) throws IOException {
//通过map遍历当前在线的所有客户(只要该客户不是发送消息的人,客户端都要把消息转发给他)
for (Integer id:connectedClients.keySet()){
if (!id.equals(socket.getPort())){
//如果该端口与发送消息的用户端口号不一致,则转发消息
Writer writer = connectedClients.get(id);
//将消息转发给客户
writer.write(fwdMsg);
writer.flush();
}
}
}
/**
* 检查用户是否退出
*/
public boolean readyToQuit(String msg){
return QUIT.equals(msg)?true:false;
}
/**
* 关闭流
*/
public synchronized void close(){
if (serverSocket!=null){
try {
serverSocket.close();
System.out.println("关闭ServerSocket");
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 启动服务器端
*/
public void start(){
//绑定监听端口
try {
serverSocket = new ServerSocket(DEFAULT_PORT);
System.out.println("服务器已启动,监听端口:"+DEFAULT_PORT+"......");
while (true){
//等待是否有客户端需要尝试建立连接
Socket socket = serverSocket.accept();
//创建ChatHandler线程
executorService.execute(new ChatHandler(this,socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close();
}
}
public static void main(String[] args) {
ChatServer server = new ChatServer();
server.start();
}
}
- 在server包下创建ChatHandler.java,用于建立客户端与用户一对一的交换(一个用户对应一个线程)
public class ChatHandler implements Runnable {
//用操作存放在服务器端的map集合
private ChatServer server;
//建立连接的客户端socket
private Socket socket;
public ChatHandler(ChatServer server, Socket socket) {
this.server = server;
this.socket = socket;
}
@Override
public void run() {
try {
//存储新上线用户
server.addClient(socket);
//读取用户发送来的消息
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
String msg = null;
while ((msg=reader.readLine())!=null){
String forMsg = "客户端["+socket.getPort()+"]:"+msg+"\n";
System.out.println(forMsg);
//将收到的消息转发给聊天室里在线的其他用户
//由于该方法读取的消息也是由readLine读取,而该方法要读取到\n才可以接受读取
//因此需要在msg后面加上\n,用于使读取接受
server.forwordMessage(socket,forMsg);
//检查用户是否准备退出
if (server.readyToQuit(msg)){
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
//如果该用户退出聊天,也要将该用户的key、value从map中删除
server.removeClient(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
用户端
- 在server包下创建ChatClient.java,该类用于使用Socket与服务器端建立连接,并不停地监听从服务器端中聊天室发来的其他人的消息并把这个消息显示在屏幕上
public class ChatClient {
private final String DEFAULT_SERVER_HOST = "127.0.0.1";
private final int DEFAULT_SERVER_PORT = 8888;
private final String QUIT = "quit";
private Socket socket;
private BufferedReader reader;
private BufferedWriter writer;
/**
* 发送消息给服务器
*/
public void send(String msg) throws IOException {
//判断该socket的输出流是否为开放的状态
if (!socket.isOutputShutdown()){
writer.write(msg+"\n");
writer.flush();
}
}
/**
* 从服务器接收消息
*/
public String receive() throws IOException {
String msg = null;
if (!socket.isInputShutdown()){
msg = reader.readLine();
}
return msg;
}
/**
* 检查用户是否准备退出
*/
public boolean readyToQuit(String msg){
return QUIT.equals(msg);
}
/**
* 关闭流
*/
public void close(){
if (writer!=null){
try {
System.out.println("关闭socket");
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start(){
try {
//创建socket
socket = new Socket(DEFAULT_SERVER_HOST,DEFAULT_SERVER_PORT);
//创建用来发送和接收信息的io流
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
//处理用户的输入
new Thread(new UserInputHander(this)).start();
//读取服务器转发的各种信息
String msg = null;
while ((msg=receive())!=null){
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close();
}
}
public static void main(String[] args) {
ChatClient chatClient = new ChatClient();
chatClient.start();
}
}
- 在server包下创建UserInputHander.java,该类用于*处理用户在控制台上的输入(因为等待用户的输入本身就是阻塞式的调用,我们不能让他阻塞ChatClient的线程)
public class UserInputHander implements Runnable {
private ChatClient chatClient;
public UserInputHander(ChatClient chatClient) {
this.chatClient = chatClient;
}
@Override
public void run() {
try {
//等待用户输入信息
BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
while (true){
String input = consoleReader.readLine();
//先服务器发送消息
chatClient.send(input);
//检查用户是否准备退出
if (chatClient.readyToQuit(input)){
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
效果演示
- 先运行ChatServer.java,开启服务器,控制台输入如下结果
- 然后再运行ChatClient.java,用于创建一个用户端,此时在ChatServer控制台那显示该用户已连接到服务器中
- 然后在用户端输入一串信息,发现服务端有显示出用户输入的内容
- 接下来运行一个新的ChatClient.java,同时输入一串信息,发现另一个用户控制台上也显示出该用户输入的信息,同时服务端也显示该用户连接上了,并也输出该用户的信息
用户2
用户1
服务端
- 如果用户1要退出的话,直接输入quit即可结束进程