2.2 网络多线程(私聊、群发、发送文件、推送新闻、离线留言)

一、私聊

1.1 分析

客户端A和客户端B私聊的时候,其实服务端在中间做了一个转发

流程

  • 客户端A —> 服务端 —> 客户端B

  • 客户端B —> 服务端 —> 客户端A

服务端可以读取到客户端A发送给客户端B的消息,服务端再从管理线程的集合中获取接收者客户端B的线程,也就能获取到其socket,此线程在服务端就会将消息发送给客户端B,也就是服务器只需要做个转发即可

image-20231208224106294

1.2 客户端

1.2.1 MessageClientService 私聊类

/**
 * 该类提供和消息相关的服务方法
 */
public class MessageClientService {
    
    
    /**
     * @param content  内容
     * @param senderId 发送用户id
     * @param getterId 接收用户id
     */
    public void sendMessageToOne(String content, String senderId, String getterId) {
    
    
        //封装消息
        Message message = new Message();
        message.setContent(content);
        message.setSender(senderId);
        message.setGetter(getterId);
        message.setSendTime(new Date().toString());
        message.setMesType(MessageType.MESSAGE_COMM_MES.getCode());//普通消息
        System.out.println("用户"+senderId+"和用户"+getterId+"说:"+content);
        //获取senderId对应的socket
        ClientConnectServerThread clientConnectServerThread = ManagerClientConnectServerThread.getClientConnectServerThread(senderId);
        Socket socket = clientConnectServerThread.getSocket();

        //输出消息
        try {
    
    
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject(message);
            oos.flush();

        } catch (IOException e) {
    
    
            e.printStackTrace();
        }

    }
}

1.2.2 ClientConnectServerThread 线程类

@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ClientConnectServerThread extends Thread {
    
    
    //该线程需要持有Socket属性
    private Socket socket;


    /**
     *因为Thread需要在后台跟我们的服务器进行通信(保持一个联系),因此我们使用while循环来控制
     */
    @Override
    public void run() {
    
    
        while(true){
    
    
            //一直读取从服务器端回收的消息
            System.out.println("客户端线程,等待读取从服务端发送的消息....");

            try {
    
    
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                //如果服务器端没有发送消息过来,这个地方会堵塞,此线程会一直等待
                //这就是一个堵塞式网络编程,效率是相对比较低的
                Message message = (Message)ois.readObject();

                //判断message的类型,然后做响应的业务处理
                if (message.getMesType().equals(MessageType.MESSAGE_RETTURN_ONLINE_FRIEND.getCode())){
    
    
                    //获取在线用户,取出在线列表信息并显示
                    String[] onlineUsers = message.getContent().split(" ");
                    System.out.println("当前在线用户列表如下");
                    for (int i=0;i<onlineUsers.length;i++){
    
    
                        System.out.println("用户:"+onlineUsers[i]);
                    }
                }else if (MessageType.MESSAGE_COMM_MES.getCode().equals(message.getMesType())) {
    
    
                    //转发给指定客户端,假如说客户不在线的话,可以保存到数据库,这样就可以实现离线留言
                    System.out.println("用户"+message.getGetter()+"收到来自用户"+message.getSender()+"的消息:"+message.getContent());
                }else{
    
    
                    System.out.println("其他类型的message,暂时不处理");
                }
            } catch (IOException | ClassNotFoundException e) {
    
    
                e.printStackTrace();
            }
        }
    }
}

1.3 服务端

1.3.1 ServerConnectClientThread 线程类

/**
 * 该类对应的对象和某个客户端保持通信
 */
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServerConnectClientThread extends Thread {
    
    

    /**
     * 可以区分此socket是和哪个用户进行关联的
     */
    private String userId;//连接到服务端的这个用户id

    private Socket socket;

    /**
     * 线程处于run状态,可以发送或者接收客户端的消息
     */
    @Override
    public void run() {
    
    
        //不断的从socket中读数据和写数据
        while (true) {
    
    
            System.out.println("服务端和客户端保持通信,读取数据.... userId:" + userId);
            ObjectInputStream ois = null;
            try {
    
    
                ois = new ObjectInputStream(socket.getInputStream());
                //读取数据
                Message message = (Message) ois.readObject();

                //根据Message的类型,判断客户端想要执行什么操作
                if (MessageType.MESSAGE_GET_ONLINE_FRIEND.getCode().equals(message.getMesType())) {
    
    
                    System.out.println("用户" + userId + "获取在线用户");
                    //拉取在线用户(客户端要拉取在线用户列表)
                    Socket socket = ManagerServerConnectServerThread.getClientThread(userId).getSocket();

                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    //构建Message发送给服务端
                    Message returnMessage = new Message();
                    returnMessage.setMesType(MessageType.MESSAGE_RETTURN_ONLINE_FRIEND.getCode());
                    returnMessage.setContent(ManagerServerConnectServerThread.getOnlineUser());
                    //说明要发送给谁
                    returnMessage.setGetter(message.getSender());
                    //返回给客户端
                    oos.writeObject(returnMessage);
                    oos.flush();
                } else if (MessageType.MESSAGE_CLIENT_EXIT.getCode().equals(message.getMesType())) {
    
    
                    //说明客户端想要退出,服务端要将socket关闭并退出线程就可以了
                    //将客户端对应的线程从集合中删除
                    ManagerServerConnectServerThread.remove(userId);
                    //关闭socket
                    socket.close();
                    System.out.println("用户" + userId + "退出系统");
                    //退出循环
                    return;
                } else if (MessageType.MESSAGE_COMM_MES.getCode().equals(message.getMesType())) {
    
    
                    //转发给指定客户端,假如说客户不在线的话,可以保存到数据库,这样就可以实现离线留言
                    Socket socket = ManagerServerConnectServerThread.getClientThread(message.getGetter()).getSocket();
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

                    oos.writeObject(message);
                    oos.flush();

                } else {
    
    
                    System.out.println("其他类型暂时不处理");
                }

            } catch (IOException | ClassNotFoundException e) {
    
    
                e.printStackTrace();
            }
            //如果服务器端没有发送消息过来,这个地方会堵塞,此线程会一直等待
            //读取客户端发送的User对象

        }
    }
}

1.4功能演示

客户端A

image-20231208233050369

客户端B

image-20231208233107188

服务端

image-20231208233117506

二、群发消息

将消息发送给所有的在线用户

2.1 分析

客户端A群发消息后,服务端会遍历线程集合,将消息发送给除了客户端A以外的所有客户端,完成群发功能

2.2 客户端

2.2.1 MessageClientService类

/**
 * 群发消息
 * @param userId 发送消息的用户id
 * @param content 需要发送的内容
 */
public void sendMessageToOnlineUser(String userId, String content) {
    
    
    Message message = new Message();
    message.setContent(content);
    message.setSender(userId);
    message.setSendTime(new Date().toString());
    message.setMesType(MessageType.MESSAGE_TO_ALL_EXIT.getCode());//普通消息
    System.out.println("用户"+userId+"群发消息说:"+content);

    ClientConnectServerThread clientConnectServerThread = ManagerClientConnectServerThread.getClientConnectServerThread(userId);
    Socket socket = clientConnectServerThread.getSocket();

    //输出消息
    try {
    
    
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
        oos.writeObject(message);
        oos.flush();

    } catch (IOException e) {
    
    
        e.printStackTrace();
    }
}

2.2.2 ClientConnectServerThread 线程类

else if (MessageType.MESSAGE_TO_ALL_EXIT.getCode().equals(message.getMesType())) {
    
    
    //群发消息
    System.out.println("\n用户"+message.getGetter()+"收到来自用户"+message.getSender()+"的群发消息:"+message.getContent());

}

2.3 服务端

2.3.1 ServerConnectClientThread 线程类

else if (MessageType.MESSAGE_TO_ALL_EXIT.getCode().equals(message.getMesType())) {
    
    
    //群发消息
    //遍历线程集合取出所有线程对应的socket发送消息即可
    HashMap<String, ServerConnectClientThread> hm = ManagerServerConnectServerThread.getHm();
    Iterator<String> iterator = hm.keySet().iterator();
    while (iterator.hasNext()) {
    
    
        //取出在线人的id
        String onlineId = iterator.next();
        if (!onlineId.equals(message.getSender())) {
    
    
            ObjectOutputStream oos = new ObjectOutputStream(
                    hm.get(onlineId).getSocket().getOutputStream()
            );
            oos.writeObject(message);
            oos.flush();
        }

    }

}

2.4 测试

客户端A

客户端B

image-20231209001512979

服务端

image-20231209001526468

三、发送文件

3.0 消息类扩展

@Data
public class Message implements Serializable {
    
    
    private static final long serialVersionUID = -3567747187962510012L;

    /**
     * 消息类型:发送文件、纯文本、视频聊天....
     */
    private String mesType;

    /*
     *发送者
     */
    private String sender;

    /**
     * 接收者
     */
    private String getter;

    /**
     * 消息内容
     */
    private String content;

    /**
     * 发送时间
     */
    private String sendTime;

    /**
     * 扩展好文件香菇那的成员变量
     */
    //字节数组存储文件字节
    private byte[] fileBytes;

    //文件大小的长度初始化为0
    private int fileLen = 0;

    //文件的目的地是哪个位置
    private String dest;

    //传输的是哪个文件(原文件路径)
    private String src;


}

3.1 分析

image-20231209003348777

3.2 客户端

3.2.1 FileClientService 文件传输类

向服务器发送文件

/**
 * 该类完成文件的传输
 */
public class FileClientService {
    
    

    public void sendFileToOne(String src, String dest, String sender, String getter) {
    
    
        //读取src文件
        Message message = new Message();
        message.setMesType(MessageType.MESSAGE_FILE_MES.getCode());
        message.setSender(sender);
        message.setGetter(getter);
        message.setSrc(src);
        message.setDest(dest);
        //需要将文件从客户端读取
        FileInputStream fileInputStream = null;
        byte[] fileBytes = new byte[(int) new File(src).length()];
        // 二进制流

        try {
    
    
            //读取文件
            fileInputStream = new FileInputStream(src);
            //将src文件读入到程序的字节数组中
            fileInputStream.read(fileBytes);

            //将文件对应的字节数粗设置到message
            message.setFileBytes(fileBytes);


        } catch (Exception e) {
    
    
            e.printStackTrace();
        }finally {
    
    
            if (fileInputStream!=null){
    
    
                try {
    
    
                    fileInputStream.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }
        }

        //提示信息
        System.out.println("用户" + sender + "向用户" + getter + "发送文件" + src + "并存储到对方电脑目录" + dest);

        //向服务端发送Message
        try {
    
    
            ObjectOutputStream oos = new ObjectOutputStream(ManagerClientConnectServerThread.getClientConnectServerThread(sender).getSocket().getOutputStream());
            oos.writeObject(message);
            oos.flush();
        } catch (Exception e) {
    
    
            e.printStackTrace();
        }
        System.out.println("发送文件完毕");
    }

}

3.2.2 ClientConnectServerThread 线程类接收文件

 else if (MessageType.MESSAGE_FILE_MES.getCode().equals(message.getMesType())) {
    
    
    System.out.println("用户" + message.getGetter() + "收到用户" + message.getSender() + "发送的文件" + message.getSrc() + "并存储到我方电脑目录" + message.getDest());
    FileOutputStream fileOutputStream = new FileOutputStream(message.getDest());
    fileOutputStream.write(message.getFileBytes());
    fileOutputStream.flush();
    fileOutputStream.close();
    System.out.println("保存文件成功");
}

3.3 服务端

3.3.1 ServerConnectClientThread 线程类

服务端起到一个转发的作用而已

else if (MessageType.MESSAGE_FILE_MES.getCode().equals(message.getMesType())){
    
    
                    System.out.println("用户" + message.getSender() + "向用户" + message.getGetter() + "发送文件" + message.getSrc() + "并存储到对方电脑目录" + message.getDest());
                    //发送文件
                    Socket socket = ManagerServerConnectServerThread.getClientThread(message.getGetter()).getSocket();
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

                    oos.writeObject(message);
                    oos.flush();
                }

3.4 测试结果

客户端A

image-20231209205819824

客户端B

这个地方的用户名错了,就不截取第二次了

image-20231209205836682

服务端

image-20231209205852008

四、服务端推送新闻

4.1 分析

服务端推送新闻本质就是群发消息

在服务器启动一条独立的线程,专门负责发送推送新闻

image-20231209213009521

4.2 客户端

4.2.1 ClientConnectServerThread 线程类

这个方法我们之前使用过

else if (MessageType.MESSAGE_TO_ALL_EXIT.getCode().equals(message.getMesType())) {
    
    
    //群发消息
    System.out.println("\n用户收到来自用户" + message.getSender() + "的群发消息:" + message.getContent());

}

4.3 服务端

4.3.1 SendNewsAllService推送消息

/**
 * 发送新闻
 */
public class SendNewsAllService implements Runnable {
    
    


    @Override
    public void run() {
    
    
        //多次推送新闻,使用while循环
        while (true) {
    
    
            System.out.println("请输入服务器要推送的信息/消息【输入exit表示退出】");
            String content = Utility.readString(500);
            if ("exit".equals(content)) {
    
    
                break;
            }
            //构建消息类型
            Message message = new Message();
            message.setSender("服务器");
            message.setMesType(MessageType.MESSAGE_TO_ALL_EXIT.getCode());
            message.setContent(content);
            message.setSendTime(new Date().toString());
            System.out.println("服务器推送消息给所有人 说:" + content);

            //遍历当前所有的通信线程得到socket
            HashMap<String, ServerConnectClientThread> hm = ManagerServerConnectServerThread.getHm();
            Iterator<String> iterator = hm.keySet().iterator();
            while (iterator.hasNext()) {
    
    
                String next = iterator.next();
                ServerConnectClientThread serverConnectClientThread = hm.get(next);
                try {
    
    
                    //给每个用户发送消息
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(serverConnectClientThread.getSocket().getOutputStream());
                    objectOutputStream.writeObject(message);
                    objectOutputStream.flush();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }
        }

    }
}

4.3.2 QQServer启动线程

/**
 * 这是服务器,在监听9999,等待客户端的连接,并保持通信
 */
@Data
public class QQServer {
    
    

    //创建一个集合存放多个用户,如果是此用户登录,便认为是合法的
    //也可以使用ConcurrentHashMap,可以在并发的环境下处理(没有线程安全问题)
    //HashMap是没有处理线程安全的,因此在多线程情况下是不安全的
    private static HashMap<String, User> validUser = new HashMap<>();

    private ServerSocket serverSocket = null;

    /**
     * 进行类加载的时候会执行下面这个代码
     */
    static {
    
    
        validUser.put("100", new User("100", "123456"));
        validUser.put("200", new User("200", "123456"));
        validUser.put("300", new User("300", "123456"));
        validUser.put("至尊宝", new User("至尊宝", "123456"));
        validUser.put("紫霞仙子", new User("紫霞仙子", "123456"));
        validUser.put("菩提老祖", new User("菩提老祖", "123456"));
    }

    /**
     * 这是一个循环监听的过程
     * 并不是客户端A发送完信息服务器接收到后此服务器就关闭,而是一直监听,因为还有可能其他客户端发送过来信息
     */
    public QQServer() {
    
    
        System.out.println("服务端在9999端口监听....");
        //启动推送新闻的线程
        new Thread(new SendNewsAllService()).start();

        ObjectInputStream ois = null;
        ObjectOutputStream oos = null;
        try {
    
    
            this.serverSocket = new ServerSocket(9999);

            //监听是一直进行,当和某个客户端连接后,会继续监听,因此使用while循环
            while (true) {
    
    
                //没有客户端连接9999端口时,程序会堵塞,等待连接
                Socket socket = serverSocket.accept();

                ois = new ObjectInputStream(socket.getInputStream());
                //如果服务器端没有发送消息过来,这个地方会堵塞,此线程会一直等待
                //读取客户端发送的User对象
                User user = (User) ois.readObject();

                //创建Message对象,准备恢复客户端
                Message message = new Message();
                oos = new ObjectOutputStream(socket.getOutputStream());
                //验证用户是否合法
                User userValid = validUser.get(user.getUserId());
                if (userValid != null && userValid.getUserId().equals(user.getUserId()) && userValid.getPasswd().equals(user.getPasswd())) {
    
    
                    //合法用户
                    message.setMesType(MessageType.find(1));
                    //给客户端进行回复
//                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    oos.writeObject(message);
                    oos.flush();

                    //创建一个线程,和客户端保持通信。
                    //该线程需要持有Socket对象
                    ServerConnectClientThread serverConnectClientThread = new ServerConnectClientThread(user.getUserId(), socket);
                    serverConnectClientThread.start();

                    //把该线程对象放入到一个集合中
                    ManagerServerConnectServerThread.addClientThread(user.getUserId(), serverConnectClientThread);

                } else {
    
    
                    //登录失败
                    message.setMesType(MessageType.find(2));
                    oos.writeObject(message);
                    oos.flush();

                    socket.close();
                }
            }


        } catch (IOException | ClassNotFoundException e) {
    
    
            e.printStackTrace();
        } finally {
    
    
//          如果服务端退出了while循环,说明服务器端不再监听了,因此需要关闭资源
            if (serverSocket != null) {
    
    
                try {
    
    
                    serverSocket.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }
            if (ois != null) {
    
    
                try {
    
    
                    ois.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }

            if (oos != null) {
    
    
                try {
    
    
                    oos.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }

        }
    }
}

4.4 测试

服务端

image-20231209223129728

客户端

image-20231209223147358

五、离线消息

客户端A给离线客户端B发送消息

我们可以在服务端创建一个集合,集合(HashMap就行)存放离线Message

对于集合的Key接收者的id,value是一个ArrayList,此ArrayList存放Message,因为客户端A可以给离线用户客户端B发送多条消息

当客户端B登录之后,服务端会首先到此HashMap集合中读取看看有没有离线消息,如果有的话从服务端发送到客户端B即可

image-20231209225316565

六、代码总结

6.1 公共类代码

6.1.1 消息类

@Data
public class Message implements Serializable {
    
    
    private static final long serialVersionUID = -3567747187962510012L;

    /**
     * 消息类型:发送文件、纯文本、视频聊天....
     */
    private String mesType;

    /*
     *发送者
     */
    private String sender;

    /**
     * 接收者
     */
    private String getter;

    /**
     * 消息内容
     */
    private String content;

    /**
     * 发送时间
     */
    private String sendTime;

    /**
     * 扩展好文件香菇那的成员变量
     */
    //字节数组存储文件字节
    private byte[] fileBytes;

    //文件大小的长度初始化为0
    private int fileLen = 0;

    //文件的目的地是哪个位置
    private String dest;

    //传输的是哪个文件(原文件路径)
    private String src;


}

6.1.2 消息类型类

/**
 * 消息类型
 * 不同行亮的值表示不同的消息类型
 */
@Getter
public enum MessageType {
    
    
    /**
     * 登录成功
     */
    MESSAGE_LOGIN_SUCCEED("1"),
    /**
     * 登录失败
     */
    MESSAGE_LOGIN_FAIL("2"),
    /**
     * 普通信息对象
     */
    MESSAGE_COMM_MES("3"),

    /**
     * 获取在线用户
     * 要求服务器返回在线用户列表
     */
    MESSAGE_GET_ONLINE_FRIEND("4"),

    /**
     * 服务器返回在线用户列表
     */
    MESSAGE_RETTURN_ONLINE_FRIEND("5"),

    /**
     * 客户端请求退出
     */
    MESSAGE_CLIENT_EXIT("6"),
    /**
     * 群发消息
     */
    MESSAGE_TO_ALL_EXIT("7"),
    /**
     * 发送文件
     */
    MESSAGE_FILE_MES("8"),
    ;

    private final String code;


    MessageType(String code) {
    
    
        this.code = code;
    }


    public static String find(Integer code) {
    
    
        for (MessageType value : MessageType.values()) {
    
    
            if (code.toString().equals(value.getCode())) {
    
    
                return value.getCode();
            }
        }
        return null;
    }
}

6.1.3 客户类

/**
 * 客户信息
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    
    

    private static final long serialVersionUID = 4300366482842276408L;
    private String userId; //用户id
    private String passwd; //用户密码
}

6.1.4 控制台读取类

public class Utility {
    
    

    private static Scanner scanner;

    static {
    
    
        scanner = new Scanner(System.in);
    }

    public Utility() {
    
    

    }

    public static char readMenuSelection() {
    
    
        while (true) {
    
    
            String str = readKeyBoard(1, false);
            char c = str.charAt(0);
            if (c == '1' || c == '2' || c == '3' || c == '4' || c == '5') {
    
    
                return c;
            }

            System.out.print("选择错误,请重新输入:");
        }
    }

    public static char readChar() {
    
    
        String str = readKeyBoard(1, false);
        return str.charAt(0);
    }

    public static char readChar(char defaultValue) {
    
    
        String str = readKeyBoard(1, true);
        return str.length() == 0 ? defaultValue : str.charAt(0);
    }

    public static int readInt() {
    
    
        while (true) {
    
    
            String str = readKeyBoard(2, false);
            try {
    
    
                int n = Integer.parseInt(str);
                return n;
            } catch (NumberFormatException var3) {
    
    
                System.out.println("数字输入错误,请重新输入:");
            }
        }
    }

    public static int readInt(int defaultValue) {
    
    
        while (true) {
    
    
            String str = readKeyBoard(2, true);
            if (str.equals("")) {
    
    
                return defaultValue;
            }

            try {
    
    
                int n = Integer.parseInt(str);
                return n;
            } catch (NumberFormatException var4) {
    
    
                System.out.print("数字输入错误,请重新输入:");
            }
        }
    }

    private static String readKeyBoard(int limit, boolean blankReturn) {
    
    
        String line = "";

        while (scanner.hasNextLine()) {
    
    
            line = scanner.nextLine();
            if (line.length() == 0) {
    
    
                if (blankReturn) {
    
    
                    return line;
                }
            } else {
    
    
                if (line.length() >= 1 && line.length() <= limit) {
    
    
                    break;
                }
                System.out.println("输入长度(不大于" + limit + ")错误,请重新输入:");
            }
        }
        return line;
    }
    public static String readString(int limit) {
    
    
        return readKeyBoard(limit, false);
    }



    public static char readConfirmSelection(){
    
    
        while (true){
    
    
            String str=readKeyBoard(1,false).toUpperCase();
            char c=str.charAt(0);
            if(c=='Y'||c=='N'){
    
    
                return c;
            }
            System.out.print("选择错误,请重新输入:");
        }
    }

}

6.2 客户端代码

6.2.1 QQView 客户端页面

/**
 * 菜单界面
 */
public class QQView {
    
    

    /**
     * 控制是否显示菜单
     */
    private boolean loop = true;
    /**
     * 接收用户的键盘输入
     */
    private String key = "";

    /**
     * 完成用户登录验证和用户注册等功能
     */
    public UserClientService userClientService = new UserClientService();

    public MessageClientService messageClientService = new MessageClientService();

    private FileClientService fileClientService = new FileClientService();

    public static void main(String[] args) {
    
    
        QQView qqView = new QQView();
        qqView.mainMenu();
        System.out.println("退出客户端系统");
    }

    /**
     * 显示主菜单
     */
    private void mainMenu() {
    
    
        while (loop) {
    
    
            System.out.println("***********欢迎登录网络通信系统*************");
            System.out.println("\t\t 1 登录系统");
            System.out.println("\t\t 9 退出系统");
            System.out.print("请输入你的选择:");
            key = Utility.readString(1);

            //根据用户的输入来处理不同的逻辑
            switch (key) {
    
    
                case "1":
                    System.out.print("请输入用户号");
                    String userId = Utility.readString(50);
                    System.out.print("请输入密  码");
                    String password = Utility.readString(50);

                    //TODO 到服务端验证用户是否合法
                    if (userClientService.checkUser(userId,password)) {
    
    
                        //进入二级菜单
                        System.out.println(String.format("网络通信系统二级菜单(用户%s)", userId));
                        while (loop) {
    
    
                            System.out.println(String.format("\n========网络通信系统二级菜单(用户%s)===========", userId));
                            System.out.println("\t\t 1.显示在线用户列表");
                            System.out.println("\t\t 2.群发消息");
                            System.out.println("\t\t 3.私聊消息");
                            System.out.println("\t\t 4.发送文件");
                            System.out.println("\t\t 9.退出系统");

                            System.out.print("请输入你的选择:");
                            key = Utility.readString(1);
                            switch (key) {
    
    
                                case "1":
                                    //获取在线用户列表
                                    userClientService.onlineFriendList();
                                    break;
                                case "2":
                                    //群发消息
                                    System.out.print("请输入想说的话:");
                                    String content = Utility.readString(100);
                                    messageClientService.sendMessageToOnlineUser(userId,content);
                                    break;
                                case "3":
                                    //私发消息
                                    System.out.print("请输入想聊天的在线用户号:");
                                    //用户号最长为50
                                    String getterId = Utility.readString(50);
                                    System.out.print("请输入想说的话:");
                                    String contentToAll = Utility.readString(100);
                                    messageClientService.sendMessageToOne(contentToAll,userId,getterId);
                                    break;
                                case "4":
                                    System.out.println("正在发送文件....");
                                    System.out.print("请输入文件接收者:");
                                    String getter = Utility.readString(50);
                                    System.out.print("\n请输入想要发送文件的路径:");
                                    String src = Utility.readString(50);
                                    System.out.print("\n请输入想要将文件存储在对方哪里:");
                                    String dest = Utility.readString(50);
                                    fileClientService.sendFileToOne(src,dest,userId,getter);
                                    break;
                                case "9":
                                    loop = false;
                                    //调用方法,给服务器发送一个退出系统的Message
                                    System.out.println("退出系统");
                                    userClientService.logout();
                                    break;
                            }
                        }
                    }else {
    
    
                        System.out.println("登录服务器失败,用户名或密码存在问题");
                    }
                    break;
                case "9":
                    loop = false;
                    System.out.println("退出系统");
            }
        }
    }
}

6.2.2 ClientConnectServerThread线程类

@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ClientConnectServerThread extends Thread {
    
    
    //该线程需要持有Socket属性
    private Socket socket;


    /**
     * 因为Thread需要在后台跟我们的服务器进行通信(保持一个联系),因此我们使用while循环来控制
     */
    @Override
    public void run() {
    
    
        while (true) {
    
    
            //一直读取从服务器端回收的消息
            System.out.println("客户端线程,等待读取从服务端发送的消息....");

            try {
    
    
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                //如果服务器端没有发送消息过来,这个地方会堵塞,此线程会一直等待
                //这就是一个堵塞式网络编程,效率是相对比较低的
                Message message = (Message) ois.readObject();

                //判断message的类型,然后做响应的业务处理
                if (message.getMesType().equals(MessageType.MESSAGE_RETTURN_ONLINE_FRIEND.getCode())) {
    
    
                    //获取在线用户,取出在线列表信息并显示
                    String[] onlineUsers = message.getContent().split(" ");
                    System.out.println("当前在线用户列表如下");
                    for (int i = 0; i < onlineUsers.length; i++) {
    
    
                        System.out.println("用户:" + onlineUsers[i]);
                    }
                } else if (MessageType.MESSAGE_COMM_MES.getCode().equals(message.getMesType())) {
    
    
                    //转发给指定客户端,假如说客户不在线的话,可以保存到数据库,这样就可以实现离线留言
                    System.out.println("\n用户" + message.getGetter() + "收到来自用户" + message.getSender() + "的消息:" + message.getContent());
                } else if (MessageType.MESSAGE_TO_ALL_EXIT.getCode().equals(message.getMesType())) {
    
    
                    //群发消息
                    System.out.println("\n用户收到来自用户" + message.getSender() + "的群发消息:" + message.getContent());

                } else if (MessageType.MESSAGE_FILE_MES.getCode().equals(message.getMesType())) {
    
    
                    System.out.println("用户" + message.getGetter() + "收到用户" + message.getSender() + "发送的文件" + message.getSrc() + "并存储到我方电脑目录" + message.getDest());
                    FileOutputStream fileOutputStream = new FileOutputStream(message.getDest());
                    fileOutputStream.write(message.getFileBytes());
                    fileOutputStream.flush();
                    fileOutputStream.close();
                    System.out.println("保存文件成功");
                } else {
    
    
                    System.out.println("其他类型的message,暂时不处理");
                }
            } catch (IOException | ClassNotFoundException e) {
    
    
                e.printStackTrace();
            }
        }
    }
}

6.2.3 ManagerClientConnectServerThread线程管理类

/**
 * 管理客户端连接到服务端线程的一个类
 */
public class ManagerClientConnectServerThread {
    
    
    //把多个线程放入一个HashMap中进行管理,key是用户id,value是客户端与服务端通信的线程
    private static HashMap<String, ClientConnectServerThread> hm = new HashMap<>();

    //将某个线程加入到集合中
    public static void addClientConnectServerThread(String userId, ClientConnectServerThread clientConnectServerThread) {
    
    
        hm.put(userId, clientConnectServerThread);
    }

    public static ClientConnectServerThread getClientConnectServerThread(String userId) {
    
    
        return hm.get(userId);
    }
}

6.2.4 MessageClientService发送消息类

/**
 * 该类提供和消息相关的服务方法
 */
public class MessageClientService {
    
    
    /**
     * @param content  内容
     * @param senderId 发送用户id
     * @param getterId 接收用户id
     */
    public void sendMessageToOne(String content, String senderId, String getterId) {
    
    
        //封装消息
        Message message = new Message();
        message.setContent(content);
        message.setSender(senderId);
        message.setGetter(getterId);
        message.setSendTime(new Date().toString());
        message.setMesType(MessageType.MESSAGE_COMM_MES.getCode());//普通消息
        System.out.println("用户"+senderId+"和用户"+getterId+"说:"+content);
        //获取senderId对应的socket
        ClientConnectServerThread clientConnectServerThread = ManagerClientConnectServerThread.getClientConnectServerThread(senderId);
        Socket socket = clientConnectServerThread.getSocket();

        //输出消息
        try {
    
    
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject(message);
            oos.flush();

        } catch (IOException e) {
    
    
            e.printStackTrace();
        }

    }

    /**
     * 群发消息
     * @param userId 发送消息的用户id
     * @param content 需要发送的内容
     */
    public void sendMessageToOnlineUser(String userId, String content) {
    
    
        Message message = new Message();
        message.setContent(content);
        message.setSender(userId);
        message.setSendTime(new Date().toString());
        message.setMesType(MessageType.MESSAGE_TO_ALL_EXIT.getCode());//普通消息
        System.out.println("用户"+userId+"群发消息说:"+content);

        ClientConnectServerThread clientConnectServerThread = ManagerClientConnectServerThread.getClientConnectServerThread(userId);
        Socket socket = clientConnectServerThread.getSocket();

        //输出消息
        try {
    
    
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject(message);
            oos.flush();

        } catch (IOException e) {
    
    
            e.printStackTrace();
        }
    }
}

6.2.5 FileClientService发送文件类

/**
 * 该类完成文件的传输
 */
public class FileClientService {
    
    

    public void sendFileToOne(String src, String dest, String sender, String getter) {
    
    
        //读取src文件
        Message message = new Message();
        message.setMesType(MessageType.MESSAGE_FILE_MES.getCode());
        message.setSender(sender);
        message.setGetter(getter);
        message.setSrc(src);
        message.setDest(dest);
        //需要将文件从客户端读取
        FileInputStream fileInputStream = null;
        byte[] fileBytes = new byte[(int) new File(src).length()];
        // 二进制流

        try {
    
    
            //读取文件
            fileInputStream = new FileInputStream(src);
            //将src文件读入到程序的字节数组中
            fileInputStream.read(fileBytes);

            //将文件对应的字节数粗设置到message
            message.setFileBytes(fileBytes);


        } catch (Exception e) {
    
    
            e.printStackTrace();
        }finally {
    
    
            if (fileInputStream!=null){
    
    
                try {
    
    
                    fileInputStream.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }
        }

        //提示信息
        System.out.println("用户" + sender + "向用户" + getter + "发送文件" + src + "并存储到对方电脑目录" + dest);

        //向服务端发送Message
        try {
    
    
            ObjectOutputStream oos = new ObjectOutputStream(ManagerClientConnectServerThread.getClientConnectServerThread(sender).getSocket().getOutputStream());
            oos.writeObject(message);
            oos.flush();
        } catch (Exception e) {
    
    
            e.printStackTrace();
        }
        System.out.println("发送文件完毕");
    }

}

6.2.6 UserClientService 用户登录验证类

/**
 * 完成用户登录验证和用户注册等功能
 */
@Data
public class UserClientService {
    
    

    //其他地方也会使用user信息,所以将其作为一个属性
    private User user = new User();

    private Socket socket = null;

    /**
     *根据userId和pwd到服务器验证该用户是否合法
     */
    public boolean checkUser(String userId, String pwd) {
    
    
        //临时变量b,用户是否合法的标志
        boolean b = false;

        //TODO 创建User对象
        user.setUserId(userId);
        user.setPasswd(pwd);

        try {
    
    
            //TODO 连接到服务端,发送User对象
            socket = new Socket(InetAddress.getByName("127.0.0.1"), 9999);
            //得到ObjectOutputStream对象流(序列化流,也是字节流中一种)
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject(user);
            oos.flush();

            //TODO 读取从服务器回复的Message对象
            ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
            Message msg = (Message) ois.readObject();

            if (MessageType.find(1).equals(msg.getMesType())) {
    
    

                //登录成功
                //一旦登录成功,我们需要启动一个线程维护或者持有此socket,保持此线程可以跟我们服务器端一直进行通信
                //不启动线程的话此Socket不好维护。如果我们有数据发送或者接收,我们可以从这个线程里面进行拉取
                //为什么将Socket放入一个线程中管理?
                // 1.如果不创建这个线程的话,一个客户端会有多个socket,socket管理起来就比较麻烦
                // 2.需要socket不断的从数据通道中读写数据,所以也必须做成一个线程
                ClientConnectServerThread ccst = new ClientConnectServerThread(socket);
                //启动客户端的线程
                ccst.start();
                //为了后面客户端的扩展,我们将线程放入到集合中管理
                ManagerClientConnectServerThread.addClientConnectServerThread(userId, ccst);

                b = true;
            } else {
    
    
                //登录失败
                //我们是有Socket的,但是没有线程,即登录失败,不能启动和服务器通信的线程
                //关闭socket
                socket.close();
            }

        } catch (IOException | ClassNotFoundException e) {
    
    
            e.printStackTrace();
        }

        return b;
    }

    /**
     * 向服务器端请求在线用户列表
     */
    public void onlineFriendList(){
    
    
        //发送一个message,并且消息的类型是MESSAGE_GET_ONLINE_FRIEND
        Message message = new Message();
        message.setMesType(MessageType.MESSAGE_GET_ONLINE_FRIEND.getCode());
        message.setSender(user.getUserId());
        //发送给服务器
        //得到当前线程的Socket对应的ObjectOutputStream
        //clientConnectServerThread线程一直在运行过程中,监听从服务器传输过来的消息
        ClientConnectServerThread clientConnectServerThread = ManagerClientConnectServerThread.getClientConnectServerThread(user.getUserId());
        try {
    
    

            ObjectOutputStream oos = new ObjectOutputStream(clientConnectServerThread.getSocket().getOutputStream());
            oos.writeObject(message);
            oos.flush();
        } catch (IOException e) {
    
    
            e.printStackTrace();
        }
    }


    /**
     * 编写方法退出客户端,并给服务端发送一个退出系统的Message对象
     */
    public void logout(){
    
    
        Message message = new Message();
        message.setMesType(MessageType.MESSAGE_CLIENT_EXIT.getCode());
        // 要退出这个用户
        message.setSender(user.getUserId());
        ClientConnectServerThread clientConnectServerThread = ManagerClientConnectServerThread.getClientConnectServerThread(user.getUserId());
        try {
    
    

            ObjectOutputStream oos = new ObjectOutputStream(clientConnectServerThread.getSocket().getOutputStream());
            oos.writeObject(message);
            oos.flush();
           System.exit(0);
        } catch (IOException e) {
    
    
            e.printStackTrace();
        }
    }

    /**
     *
     */
}

6.3 服务端代码

6.3.1 后台启动

/**
 * 此类创建一个QQServer对象,启动后台的服务
 */
public class QQFrame {
    
    
    public static void main(String[] args) {
    
    
        //创建QQServer对象,会启动QQServer构造器
        QQServer qqServer = new QQServer();

    }
}

6.3.2 服务器

/**
 * 这是服务器,在监听9999,等待客户端的连接,并保持通信
 */
@Data
public class QQServer {
    
    

    //创建一个集合存放多个用户,如果是此用户登录,便认为是合法的
    //也可以使用ConcurrentHashMap,可以在并发的环境下处理(没有线程安全问题)
    //HashMap是没有处理线程安全的,因此在多线程情况下是不安全的
    private static HashMap<String, User> validUser = new HashMap<>();

    private ServerSocket serverSocket = null;

    /**
     * 进行类加载的时候会执行下面这个代码
     */
    static {
    
    
        validUser.put("100", new User("100", "123456"));
        validUser.put("200", new User("200", "123456"));
        validUser.put("300", new User("300", "123456"));
        validUser.put("至尊宝", new User("至尊宝", "123456"));
        validUser.put("紫霞仙子", new User("紫霞仙子", "123456"));
        validUser.put("菩提老祖", new User("菩提老祖", "123456"));
    }

    /**
     * 这是一个循环监听的过程
     * 并不是客户端A发送完信息服务器接收到后此服务器就关闭,而是一直监听,因为还有可能其他客户端发送过来信息
     */
    public QQServer() {
    
    
        System.out.println("服务端在9999端口监听....");
        //启动推送新闻的线程
        new Thread(new SendNewsAllService()).start();

        ObjectInputStream ois = null;
        ObjectOutputStream oos = null;
        try {
    
    
            this.serverSocket = new ServerSocket(9999);

            //监听是一直进行,当和某个客户端连接后,会继续监听,因此使用while循环
            while (true) {
    
    
                //没有客户端连接9999端口时,程序会堵塞,等待连接
                Socket socket = serverSocket.accept();

                ois = new ObjectInputStream(socket.getInputStream());
                //如果服务器端没有发送消息过来,这个地方会堵塞,此线程会一直等待
                //读取客户端发送的User对象
                User user = (User) ois.readObject();

                //创建Message对象,准备恢复客户端
                Message message = new Message();
                oos = new ObjectOutputStream(socket.getOutputStream());
                //验证用户是否合法
                User userValid = validUser.get(user.getUserId());
                if (userValid != null && userValid.getUserId().equals(user.getUserId()) && userValid.getPasswd().equals(user.getPasswd())) {
    
    
                    //合法用户
                    message.setMesType(MessageType.find(1));
                    //给客户端进行回复
//                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    oos.writeObject(message);
                    oos.flush();

                    //创建一个线程,和客户端保持通信。
                    //该线程需要持有Socket对象
                    ServerConnectClientThread serverConnectClientThread = new ServerConnectClientThread(user.getUserId(), socket);
                    serverConnectClientThread.start();

                    //把该线程对象放入到一个集合中
                    ManagerServerConnectServerThread.addClientThread(user.getUserId(), serverConnectClientThread);

                } else {
    
    
                    //登录失败
                    message.setMesType(MessageType.find(2));
                    oos.writeObject(message);
                    oos.flush();

                    socket.close();
                }
            }


        } catch (IOException | ClassNotFoundException e) {
    
    
            e.printStackTrace();
        } finally {
    
    
//          如果服务端退出了while循环,说明服务器端不再监听了,因此需要关闭资源
            if (serverSocket != null) {
    
    
                try {
    
    
                    serverSocket.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }
            if (ois != null) {
    
    
                try {
    
    
                    ois.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }

            if (oos != null) {
    
    
                try {
    
    
                    oos.close();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }

        }
    }
}

6.3.3 ServerConnectClientThread线程类

/**
 * 该类对应的对象和某个客户端保持通信
 */
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServerConnectClientThread extends Thread {
    
    

    /**
     * 可以区分此socket是和哪个用户进行关联的
     */
    private String userId;//连接到服务端的这个用户id

    private Socket socket;

    /**
     * 线程处于run状态,可以发送或者接收客户端的消息
     */
    @Override
    public void run() {
    
    
        //不断的从socket中读数据和写数据
        while (true) {
    
    
            System.out.println("服务端和客户端保持通信,读取数据.... userId:" + userId);
            ObjectInputStream ois = null;
            try {
    
    
                ois = new ObjectInputStream(socket.getInputStream());
                //读取数据
                Message message = (Message) ois.readObject();

                //根据Message的类型,判断客户端想要执行什么操作
                if (MessageType.MESSAGE_GET_ONLINE_FRIEND.getCode().equals(message.getMesType())) {
    
    
                    System.out.println("用户" + userId + "获取在线用户");
                    //拉取在线用户(客户端要拉取在线用户列表)
                    Socket socket = ManagerServerConnectServerThread.getClientThread(userId).getSocket();

                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    //构建Message发送给服务端
                    Message returnMessage = new Message();
                    returnMessage.setMesType(MessageType.MESSAGE_RETTURN_ONLINE_FRIEND.getCode());
                    returnMessage.setContent(ManagerServerConnectServerThread.getOnlineUser());
                    //说明要发送给谁
                    returnMessage.setGetter(message.getSender());
                    //返回给客户端
                    oos.writeObject(returnMessage);
                    oos.flush();
                } else if (MessageType.MESSAGE_CLIENT_EXIT.getCode().equals(message.getMesType())) {
    
    
                    //说明客户端想要退出,服务端要将socket关闭并退出线程就可以了
                    //将客户端对应的线程从集合中删除
                    ManagerServerConnectServerThread.remove(userId);
                    //关闭socket
                    socket.close();
                    System.out.println("用户" + userId + "退出系统");
                    //退出循环
                    return;
                } else if (MessageType.MESSAGE_COMM_MES.getCode().equals(message.getMesType())) {
    
    
                    //转发给指定客户端,假如说客户不在线的话,可以保存到数据库,这样就可以实现离线留言
                    Socket socket = ManagerServerConnectServerThread.getClientThread(message.getGetter()).getSocket();
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

                    oos.writeObject(message);
                    oos.flush();

                } else if (MessageType.MESSAGE_TO_ALL_EXIT.getCode().equals(message.getMesType())) {
    
    
                    //群发消息
                    //遍历线程集合取出所有线程对应的socket发送消息即可
                    HashMap<String, ServerConnectClientThread> hm = ManagerServerConnectServerThread.getHm();
                    Iterator<String> iterator = hm.keySet().iterator();
                    while (iterator.hasNext()) {
    
    
                        //取出在线人的id
                        String onlineId = iterator.next();
                        if (!onlineId.equals(message.getSender())) {
    
    
                            ObjectOutputStream oos = new ObjectOutputStream(
                                    hm.get(onlineId).getSocket().getOutputStream()
                            );
                            oos.writeObject(message);
                            oos.flush();
                        }

                    }

                }else if (MessageType.MESSAGE_FILE_MES.getCode().equals(message.getMesType())){
    
    
                    System.out.println("用户" + message.getSender() + "向用户" + message.getGetter() + "发送文件" + message.getSrc() + "并存储到对方电脑目录" + message.getDest());
                    //发送文件
                    Socket socket = ManagerServerConnectServerThread.getClientThread(message.getGetter()).getSocket();
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

                    oos.writeObject(message);
                    oos.flush();
                }else {
    
    
                    System.out.println("其他类型暂时不处理");
                }

            } catch (IOException | ClassNotFoundException e) {
    
    
                e.printStackTrace();
            }
            //如果服务器端没有发送消息过来,这个地方会堵塞,此线程会一直等待
            //读取客户端发送的User对象

        }
    }
}

6.3.4 ManagerServerConnectServerThread管理线程类

/**
 * 该类用于管理和客户端通信的线程
 */
@Data
public class ManagerServerConnectServerThread {
    
    
    private static HashMap<String, ServerConnectClientThread> hm = new HashMap<>();


    public static HashMap<String, ServerConnectClientThread> getHm() {
    
    
        return hm;
    }

    /**
     * 添加线程对象到hm集合
     */
    public static void addClientThread(String userId, ServerConnectClientThread clientConnectServerThread) {
    
    
        hm.put(userId, clientConnectServerThread);
    }

    /**
     * 从集合中获取对应线程对象
     */
    public static ServerConnectClientThread getClientThread(String userId) {
    
    
        return hm.get(userId);
    }

    /**
     * 获取在线用户
     */
    public static String getOnlineUser() {
    
    
        //集合遍历,遍历hashMap的key
        Iterator<String> iterator = hm.keySet().iterator();
        String onlineUserList = "";

        while (iterator.hasNext()) {
    
    
            onlineUserList += iterator.next().toString() + " ";
        }
        return onlineUserList;
    }

    /**
     * 从集合中删除掉某个线程对象
     */
    public static void remove(String userId) {
    
    
       hm.remove(userId);
    }

}

6.3.5 SendNewsAllService 推送新闻类

/**
 * 发送新闻
 */
public class SendNewsAllService implements Runnable {
    
    


    @Override
    public void run() {
    
    
        //多次推送新闻,使用while循环
        while (true) {
    
    
            System.out.println("请输入服务器要推送的信息/消息【输入exit表示退出】");
            String content = Utility.readString(500);
            if ("exit".equals(content)) {
    
    
                break;
            }
            //构建消息类型
            Message message = new Message();
            message.setSender("服务器");
            message.setMesType(MessageType.MESSAGE_TO_ALL_EXIT.getCode());
            message.setContent(content);
            message.setSendTime(new Date().toString());
            System.out.println("服务器推送消息给所有人 说:" + content);

            //遍历当前所有的通信线程得到socket
            HashMap<String, ServerConnectClientThread> hm = ManagerServerConnectServerThread.getHm();
            Iterator<String> iterator = hm.keySet().iterator();
            while (iterator.hasNext()) {
    
    
                String next = iterator.next();
                ServerConnectClientThread serverConnectClientThread = hm.get(next);
                try {
    
    
                    //给每个用户发送消息
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(serverConnectClientThread.getSocket().getOutputStream());
                    objectOutputStream.writeObject(message);
                    objectOutputStream.flush();
                } catch (IOException e) {
    
    
                    e.printStackTrace();
                }
            }
        }

    }
}

猜你喜欢

转载自blog.csdn.net/weixin_51351637/article/details/134902569
2.2