BIO、NIO实现聊天室

一、前言

最近在修改自己以前写的博客内容,隔一段时间有更新的理解,并且以前有的文章也是半懂不懂写上去的。最近打算重写一下Netty的那篇博客,本文是为了个人更加深刻的理解NIO所写,本例写完后,可以对比出NIO想较于BIO在多客户端连接时,可以看出通过多路复用的方式很好的解决了每个客户端必须创建新的线程的问题。

二、简介

  • BIO - 同步阻塞IO : 传统的IO操作,在读/写文件时线程会一直阻塞,直到文件读/写结束。

  • NIO - 同步非阻塞IO:NIO在读/写文件时并不阻塞当前线程,也就是说在读/写文件时是线程是可以继续执行其他任务的。NIO之所以是同步,是因为它的accept/read/write方法的内核I/O操作都会阻塞当前线程。IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高

    NIO多路复用主要步骤和元素:

    1. 首先,通过 Selector.open() 创建一个 Selector,作为类似调度员的角色。
    2. 然后,创建一个 ServerSocketChannel,并且向 Selector 注册,通过指定 SelectionKey.OP_ACCEPT,告诉调度员,它关注的是新的连接请求。注意,为什么我们要明确配置非阻塞模式呢?这是因为阻塞模式下,注册操作是不允许的,会抛出 IllegalBlockingModeException 异常。
    3. Selector 阻塞在 select 操作,当有 Channel 发生接入请求,就会被唤醒。
    4. 在 具体的 方法中,通过 SocketChannel 和 Buffer 进行数据操作

    NIO 的核心思路:

    1. NIO 模型中通常会有两个线程,每个线程绑定一个轮询器 selector ,在上面例子中serverSelector负责轮询是否有新的连接,clientSelector负责轮询连接是否有数据可读

    2. 服务端监测到新的连接之后,不再创建一个新的线程,而是直接将新连接绑定到clientSelector上,这样就不用BIO模型中1w 个while循环在阻塞,参见1

    3. clientSelector被一个 while 死循环包裹着,如果在某一时刻有多条连接有数据可读,那么通过clientSelector.select(1)方法可以轮询出来,进而批量处理,参见2

    4. 数据的读写面向 Buffer,参见4

  • AIO - 异步非阻塞IO:AIO是异步IO的缩写,虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身是同步的。但是对AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此AIO是不会阻塞的,此时我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。

关于这一部分的详细内容可以参考 : https://www.cnblogs.com/sxkgeek/p/9488703.html

附: 一张NIO的流转图
来源: https://my.oschina.net/andylucc/blog/614295?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
在这里插入图片描述

三、实现

关于 代码中的 Lambda.forEach、Lambda.map 请戳这里

1. BIO

BIO 网上很多例子,这里就简单写一下。BIO的实现避免不了每个客户端创建一个线程,这显然是不合理的是。所以下面写了NIO版本。

1. 服务端

public class ChatServer implements SocketMsg {
    
    
    private static final int PORT = 6666;
    private ExecutorService clientPool;
    private List<Socket> clientList;

    public ChatServer() {
    
    
        clientPool = Executors.newCachedThreadPool();
        clientList = new ArrayList<>();
    }

    /**
     * 启动服务
     *
     * @throws IOException
     */
    public void start() throws IOException {
    
    
        ServerSocket serverSocket = new ServerSocket(PORT);
        while (true) {
    
    
            // 等待客户端连接
            Socket clientSocket = serverSocket.accept();
            System.out.println("有新客户端连接:" + clientSocket.toString());
            clientList.add(clientSocket);
            // 开辟一个新线程处理客户端事件
            clientPool.execute(new MsgHandler(clientSocket, this));
        }
    }

    /**
     * 转发消息到所有客户端
     *
     * @param msg
     */
    @Override
    public void dispatchMsg(Message msg) {
    
    
        clientList.stream().forEach(Lambda.forEach(client -> {
    
    
            OutputStream outputStream = client.getOutputStream();
            outputStream.write(JSON.toJSONString(msg).getBytes());
            outputStream.flush();
        }));
    }
}

/**
 * 回调接口
 */
interface SocketMsg {
    
    
    void dispatchMsg(Message msg);
}

2. 客户端

public class ChatClient {
    
    
    private static final int PORT = 6666;
    private static final String SERVER_IP = "localhost";
    private Socket clientSocket;

    public ChatClient() throws IOException {
    
    
        clientSocket = new Socket(SERVER_IP, PORT);
        new Thread(() -> {
    
    
            try {
    
    
                while (true) {
    
    
                    InputStream inputStream = clientSocket.getInputStream();
                    int msgLength = inputStream.available();
                    if (msgLength > 0) {
    
    
                        byte[] msgBytes = new byte[msgLength];
                        inputStream.read(msgBytes);
                        System.out.println("收到消息:" + new String(msgBytes));
                    }
                }
            } catch (IOException e) {
    
    
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * 发送消息
     * @param msg
     * @return
     * @throws IOException
     */
    public void sendMsg(Message msg) throws IOException {
    
    
        OutputStream outputStream = clientSocket.getOutputStream();
        outputStream.write(JSON.toJSONString(msg).getBytes());
        outputStream.flush();
    }

    /**
     * 关闭连接
     * @throws IOException
     */
    public void close() throws IOException {
    
    
        clientSocket.close();
    }
}

3. 消息处理类

public class MsgHandler implements Runnable {
    
    
    private Socket clientSocket;
    private SocketMsg socketMsg;

    public MsgHandler(Socket clientSocket, SocketMsg socketMsg) {
    
    
        this.clientSocket = clientSocket;
        this.socketMsg = socketMsg;
    }

    @Override
    public void run() {
    
    
        try {
    
    
            while (true) {
    
    
                Thread.sleep(200);
                InputStream inputStream = clientSocket.getInputStream();
                int msgLength = inputStream.available();
                if (msgLength > 0) {
    
    
                    byte[] msgBytes = new byte[msgLength];
                    inputStream.read(msgBytes);
                    System.out.println("服务端接收到消息:" + new String(msgBytes));
                    Message message = JSON.parseObject(new String(msgBytes), Message.class);
                    if (MessageEnum.SENGMSG == message.getMessageEnum()){
    
    
                        message.setMessageEnum(MessageEnum.DISPATCHMSG);
                        System.out.println("服务端接转发消息:" +message.toString());
                        socketMsg.dispatchMsg(message);
                    }
                }

            }
        } catch (IOException | InterruptedException e) {
    
    
            e.printStackTrace();
        }
    }
}

4. 其它

消息结构

public class Message  {
    
    
    private String clientName;
    private String data;
    private MessageEnum messageEnum;   // 消息类型,0

    public String getClientName() {
    
    
        return clientName;
    }

    public void setClientName(String clientName) {
    
    
        this.clientName = clientName;
    }

    public String getData() {
    
    
        return data;
    }

    public void setData(String data) {
    
    
        this.data = data;
    }

    public MessageEnum getMessageEnum() {
    
    
        return messageEnum;
    }

    public void setMessageEnum(MessageEnum messageEnum) {
    
    
        this.messageEnum = messageEnum;
    }

    @Override
    public String toString() {
    
    
        return "Message{" +
                "clientName='" + clientName + '\'' +
                ", data='" + data + '\'' +
                ", messageEnum=" + messageEnum +
                '}';
    }
}

消息类型枚举,发送和转发类型

public enum MessageEnum {
    
    
    SENGMSG, DISPATCHMSG
}

2. NIO

2.1 讲解

整个结构图大致:

在这里插入图片描述
解读: 这里的解读会附带下面的代码。

  1. 服务端启动时会将 serverSocketChannel 注册到 selector 上,selector 将会监听 serverSocketChannel 通道上的 OP_ACCEPT 类型事件。
    在这里插入图片描述
  2. 客户端01连接时,会再打开一个Selector。将socketChannel 通道上的 OP_CONNECT 事件注册到 客户端01的Selector上
    在这里插入图片描述
  3. 当客户端连接服务器时,服务器的 serverSocketChannel 中 会出现 ON_ACCEPT 的就绪事件,这时候
    selector.select() > 0 成立,会遍历就绪事件并且调用handler方法处理。注意处理完一定要清空集合,否则会重复消费事件。
    在这里插入图片描述
  4. 服务端在Handler方法中判断,如果是 ON_ACCEPT 事件则创建到客户端的 socketChannel ,并监听这个通道中的 ON_READ 事件。当客户端向这个通道中写入数据时,Selector可以监听到并处理。
    在这里插入图片描述
  5. 当客户端连接成功后,注册到自己的Selector上ON_READ 事件,即当 客户端的socketChannel 通道中有可读事件时, Selector可以监听到。并且开启一个新线程监控控制台输入,并写入到SocketChannel中
    在这里插入图片描述
  6. 服务端检测到有客户端的可读事件,则读取数据并调用 dispatch 方法转发给所有的客户端(每个客户端都有一个Selector都监听着SocketChannel中的可读事件)。当服务端调用dispatch方法转发消息时,客户端的SocketChannel中具有可读事件,则客户端中的 Selector中发现有可读就绪事件,则读取处理。

其它:
1. SocketChannel.read(ByteBuffer dst) 返回值可能为0或者-1。具体返回情况如下:

  • 返回-1 : 客户端的数据发送完毕,并且主动的close socket。所以在这种场景下,(服务器程序)你需要关闭socketChannel并且取消key,最好是退出当前函数。注意,这个时候服务端要是继续使用该socketChannel进行读操作的话,就会抛出“远程主机强迫关闭一个现有的连接”的IO异常。

  • 返回0: 其实read返回0有3种情况,一是某一时刻socketChannel中当前(注意是当前)没有数据可以读,这时会返回0,其次是bytebuffer的position等于limit了,即bytebuffer的remaining等于0,这个时候也会返回0,最后一种情况就是客户端的数据发送完毕了(注意看后面的程序里有这样子的代码),这个时候客户端想获取服务端的反馈调用了recv函数,若服务端继续read,这个时候就会返回0。

2. socketChannel.register(selector, SelectionKey.OP_ACCEPT);

selector 将关心 socketChannel 通道中的 SelectionKey.OP_ACCEPT 类型事件。即 当 socketChannel 通道中有 OP_ACCEPT 事件就绪时,selector.select() 返回值大于0。会执行代码中的方法。

2.2 代码

代码上有详细注释。不再赘述具体内容。

服务端

public class NioChatServer {
    
    
    private static final int PORT = 8888;
    private List<SocketChannel> socketChannels = new ArrayList<>();


    /**
     * 启动服务
     *
     * @throws IOException
     */
    public void startServer() throws IOException {
    
    
        // 开启一个 ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置成非阻塞形式
        serverSocketChannel.configureBlocking(false);
        // 绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        // 将通道注册到 Selector 上。该Selector会关心 serverSocketChannel 上的 accept事件
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("## 服务器启动成功");
        while (true) {
    
    
            // 如果就绪事件数量大于1。即说明有关心事件就绪
            if (selector.select() > 0) {
    
    
                // 获取遍历事件进行处理
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                selectionKeys.stream().forEach(Lambda.forEach(key -> hanlder(key)));
                selectionKeys.clear();
            }
        }
    }

    private void hanlder(SelectionKey selectionKey) throws IOException {
    
    
        if (selectionKey.isValid()) {
    
    
            // 判断事件类型
            if (selectionKey.isAcceptable()) {
    
    
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                // 获取新接入的客户端
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println("## 有新客户端接入 : " + socketChannel.toString());
                socketChannel.configureBlocking(false);
                // 将 socketChannel 也注册到 该Selector上,关心事件类型为 Read
                socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
                socketChannels.add(socketChannel);
            } else if (selectionKey.isReadable()) {
    
         // 管道中读就绪  ->  客户端向服务端发送消息
                // 读取管道中的可读内容
                StringBuilder msg = new StringBuilder();
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int read = 0;
                while ((read = socketChannel.read(byteBuffer)) != 0) {
    
    
                    // 如果有客户端要退出
                    if (-1 == read) {
    
    
                        socketChannel.close();
                        msg.append(socketChannel.toString() + "客户端断开连接");
                        break;
                    }
                    byteBuffer.flip();
                    msg.append(new String(byteBuffer.array(), 0, read));
                    byteBuffer.clear();
                }

                System.out.println("## 服务端收到消息: " + msg.toString());
                // 转发消息给所有客户端
                dispatch(msg.toString());

            }
        }
    }

    /**
     * 转发信息给所有客户端
     *
     * @param msg
     */
    private void dispatch(String msg) throws IOException {
    
    
        ByteBuffer byteBuffer = ByteBuffer.allocate(msg.getBytes().length);
        byteBuffer.put(msg.getBytes());
        for (int i = 0; i < socketChannels.size(); i++) {
    
    
            SocketChannel client = socketChannels.get(i);
            if (client.isOpen()) {
    
    
                byteBuffer.flip();
                System.out.println("服务端转发消息: " + client.toString());
                client.write(byteBuffer);
            } else {
    
    
                socketChannels.remove(client);
            }
        }
    }
}

客户端

public class NioChatClient {
    
    
    private static final String SERVER_IP = "localhost";
    private static final int PORT = 8888;

    private String clientName;

    public NioChatClient(String clientName) {
    
    
        this.clientName = clientName;
    }

    /**
     * 连接服务器
     *
     * @throws IOException
     */
    public void connect() throws IOException {
    
    
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        // 让 Selector 关心 channal 的 connect 事件。
        socketChannel.register(selector, SelectionKeyImpl.OP_CONNECT);

        socketChannel.connect(new InetSocketAddress(SERVER_IP, PORT));
        System.out.println("## " + clientName + "客户端开始连接...");

        while (socketChannel.isOpen()) {
    
    
            if (selector.select() > 0) {
    
    
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                selectionKeys.forEach(Lambda.forEach(key -> handler(key)));
                selectionKeys.clear();
            }
        }
    }

    /**
     * 处理就绪事件
     *
     * @param key
     * @throws IOException
     */
    private void handler(SelectionKey key) throws IOException {
    
    
        if (key.isValid()) {
    
    
            if (key.isConnectable()) {
    
    
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if (socketChannel.isConnectionPending()) {
    
    
                    socketChannel.finishConnect();
                    System.out.println("## " + clientName + "客户端连接成功...");
                    socketChannel.register(key.selector(), SelectionKey.OP_READ);
                    // 开启新线程监控控制台输入内容
                    new Thread(() -> {
    
    
                        Scanner scanner = new Scanner(System.in);
                        while (true) {
    
    
                            String msg = scanner.nextLine();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(msg.getBytes().length);
                            try {
    
    
                                if ("exit".equals(msg)) {
    
    
                                    socketChannel.close();
                                    System.out.println("## 退出连接");
                                    break;
                                }
                                // 将控制台内容写入管道中。即发送消息到客户端
                                byteBuffer.put(msg.getBytes());
                                byteBuffer.flip();
                                socketChannel.write(byteBuffer);
                                System.out.println("## " + clientName + " 发送消息: " + msg);
                            } catch (IOException e) {
    
    
                                e.printStackTrace();
                            } finally {
    
    
                                byteBuffer = null;
                            }
                        }
                    }).start();
                }
            } else if (key.isReadable()) {
    
    
                // 读就绪,即管道中有消息可读,说明服务端向客户端发送了消息
                StringBuilder msg = new StringBuilder();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                SocketChannel socketChannel = (SocketChannel) key.channel();
                int read = 0;
                while ((read = socketChannel.read(byteBuffer)) != 0) {
    
    
                    if (-1 == read) {
    
    
                        socketChannel.close();
                        msg.append("客户端断开连接");
                        break;
                    }
                    byteBuffer.flip();
                    msg.append(new String(byteBuffer.array(), 0, read));
                    byteBuffer.clear();
                }
                System.out.println("## " + clientName + " 收到消息: " + msg);
            }
        }
    }
}

以上:内容部分参考
https://www.jianshu.com/p/16104564f640
https://mp.weixin.qq.com/s/wVoHfhh28Vh5sgKQbPXk8w
https://www.cnblogs.com/sxkgeek/p/9488703.html
如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正

猜你喜欢

转载自blog.csdn.net/qq_36882793/article/details/103879927