JavaNIO--2.实现ECHO服务器

JavaNIO技术实现ECHO服务器

所谓ECHO服务器就是客户端发送到服务器端什么内容,服务器端就返回什么内容的一种服务器,者几乎是最简单的网络服务器(当然还有更简单的抛弃服务器)

阅读需要基础:JavaNIO基础

1.NIO核心组件的使用

NIO核心组件主要包括SelectorChannel,而Buffer主要用于和Channel进行数据交互,所以不在此作详细的使用介绍。

1.1初始化NIO组件

public class NioServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private int port;

    public NioServer(int port) throws IOException {
        // 打开一个ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        // 设置为非阻塞模式才能注册到Selector
        serverSocketChannel.configureBlocking(false);
        // 打开一个选择器
        selector = Selector.open();
        this.port = port;   
    }

    // 启动服务器的方法
    private void startServer() {
        try {
            serverSocketChannel.bind(new InetSocketAddress(port));
            // 注册该通道到选择器,注意兴趣操作是SelectionKey.OP_ACCEPT
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectLoop();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

这里写图片描述

  1. 需要了解的是Channel需要设置为非阻塞模式才能注册到选择器

  2. Channel调用register()方法时需要指定兴趣操作,意思就是该选择器会监听这个通道有没有准备好可以执行的操作,兴趣操作有:SelectionKey.OP_ACCEPTSelectionKey.OP_READSelectionKey.OP_WRITESelectionKey.OP_CONNECT,分别对应的是ServerSocketChannelaccept()方法可以执行(不需阻塞),SocketChannelread()/write()方法可以执行(不需阻塞),以及SocketChannel内含的Socketconnect()方法可以调用(不需阻塞)。

如果不太了解NIO对应的操作模型,可以去参考我的上一篇博客:IO多路复用和NIO

1.2Accept组件

    private void acceptClient(SelectionKey selectionKey) throws IOException {
        // 与对端Socket建立连接
        SocketChannel socketChannel = serverSocketChannel.accept();

        if (socketChannel != null) {
            System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
        }
        // 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

使用和传统ServerSocketaccept()方法流程一致,需要注意的是,传统的accept()调用时会阻塞直到建立一个TCP连接,而使用Selector选择器可以避免阻塞,确保调用该方法时一定有一个(或多个)Socket连接已经在等待建立。

1.3SelectLoop(核心组件)

可以看到一个java.nio.channels.Selector可以注册多个通道,Selector可以监听注册到自身的通道的状态。

    private void selectLoop() throws IOException {
        while(true) {
            // select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件准备完毕
            selector.select();
            // selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while(iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 循环判断其中的key
                if (selectionKey.isAcceptable()) {
                    // 如果key处于可接受状态,就进入接收函数
                    acceptClient(selectionKey);
                }else if(selectionKey.isReadable()) {
                    // 如果key处于可读状态,就进入读函数
                    readDate(selectionKey);
                }
            }
            // 每次处理完通道事件以后,要进行一次清空
            selectionKeys.clear();
        }
    }

可以看到,通过调用选择器的select()会不断的得到将要发生事件通道,只要是注册到该选择器的通道,都会被轮询一次,而我们通过while循环,可以做到单线程无阻塞I/O。

2.NIO通道读写(Buffer)

2.1读取通道内容

    private void readDate(SelectionKey selectionKey) throws IOException {

        // 每一次都先获取之前绑定在这个key上的buffer
        ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();

        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer newBuffer = ByteBuffer.allocate(64);

        int read;
        while((read = socketChannel.read(newBuffer))<=0) {
            return;
        }

        newBuffer.flip();
        // 读取Buffer,看是否有换行符
        String line = readLine(newBuffer);
        if (line != null) {

            // 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行
            String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
            if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接
                socketChannel.close();
                return;
            }
            // 然后直接发送回到客户端
            ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8"));
            while (sendBuffer.hasRemaining()) {
                socketChannel.write(sendBuffer);
            }
            selectionKey.attach(null);
        }else {
            // 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上
            selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
        }

    }

这里写图片描述

2.2Buffer处理辅助方法

/**
     * 读取ByteBuffer直到一行的末尾
     * 返回这一行的内容,包括换行符
     * 
     * @param buffer
     * @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
     * @throws UnsupportedEncodingException
     */
    private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException {
        // windows中的换行符表示手段 "\r\n"
        // 基于windows的软件发送的换行符是会是CR和LF
        char CR = '\r';
        char LF = '\n';

        boolean crFound = false;
        int index = 0;
        int len = buffer.limit();
        buffer.rewind();
        while(index < len) {
            byte temp = buffer.get();
            if (temp == CR) {
                crFound = true;
            }
            if (crFound && temp == LF) {
                // Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组
                return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8");
            }
            index ++;
        }
        return null;
    }

    /**
     * 获取一行的内容,不包括换行符
     * @param buffer
     * @return String 行的内容
     * @throws UnsupportedEncodingException
     */
    private String readLineContent(String line) throws UnsupportedEncodingException {
        return line.substring(0, line.length() - 2);
    }

    /**
     * 对传入的Buffer进行拼接
     * @param oldBuffer
     * @param newBuffer
     * @return ByteBuffer 拼接后的Buffer
     */
    public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
        // 如果原来的Buffer是null就直接返回
        if (oldBuffer == null) {
            return newBuffer;
        }
        // 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接
        newBuffer.rewind();
        if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
            return oldBuffer.put(newBuffer);
        }

        // 如果不是以上两种情况就构建新的Buffer进行拼接
        int oldSize = oldBuffer != null?oldBuffer.limit():0;
        int newSize = newBuffer != null?newBuffer.limit():0;
        ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);

        result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize));
        result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize));

        return result;
    }

这些代码是为了实现ECHO返回而实现的辅助方法,主要是进行Buffer的处理。

3.测试结果

这里写代码片
这里写图片描述
这里写图片描述

使用telnet进行连接测试,实现了ECHO服务器的功能,而且输入exit会关闭该连接。

4.完整代码

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;

public class NioServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private int port;

    public NioServer(int port) throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        selector = Selector.open();
        this.port = port;   
    }

    private void selectLoop() throws IOException {
        while(true) {
            // select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件发生
            selector.select();
            // selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while(iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 循环判断其中的key
                if (selectionKey.isAcceptable()) {
                    // 如果key处于可接受状态,就进入接收函数
                    acceptClient(selectionKey);
                }else if(selectionKey.isReadable()) {
                    // 如果key处于可读状态,就进入读函数
                    readDate(selectionKey);
                }
            }
            selectionKeys.clear();
        }
    }

    /**
     * 接收连接并将建立的通道注册到选择器
     * 
     * @param selectionKey
     * @throws IOException
     */
    private void acceptClient(SelectionKey selectionKey) throws IOException {
        // 与对端Socket建立连接
        SocketChannel socketChannel = serverSocketChannel.accept();

        if (socketChannel != null) {
            System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
        }
        // 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private void readDate(SelectionKey selectionKey) throws IOException {

        // 每一次都先获取之前绑定在这个key上的buffer
        ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();

        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer newBuffer = ByteBuffer.allocate(64);

        int read;
        while((read = socketChannel.read(newBuffer))<=0) {
            return;
        }

        newBuffer.flip();
        String line = readLine(newBuffer);
        if (line != null) {

            // 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行
            String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
            if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接
                socketChannel.close();
                return;
            }
            // 然后直接发送回到客户端
            ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8"));
            while (sendBuffer.hasRemaining()) {
                socketChannel.write(sendBuffer);
            }
            selectionKey.attach(null);
        }else {
            // 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上
            selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
        }

    }

    /**
     * 读取ByteBuffer直到一行的末尾
     * 返回这一行的内容,包括换行符
     * 
     * @param buffer
     * @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
     * @throws UnsupportedEncodingException
     */
    private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException {
        // windows中的换行符表示手段 "\r\n"
        // 基于windows的软件发送的换行符是会是CR和LF
        char CR = '\r';
        char LF = '\n';

        boolean crFound = false;
        int index = 0;
        int len = buffer.limit();
        buffer.rewind();
        while(index < len) {
            byte temp = buffer.get();
            if (temp == CR) {
                crFound = true;
            }
            if (crFound && temp == LF) {
                // Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组
                return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8");
            }
            index ++;
        }
        return null;
    }

    /**
     * 获取一行的内容,不包括换行符
     * @param buffer
     * @return String 行的内容
     * @throws UnsupportedEncodingException
     */
    private String readLineContent(String line) throws UnsupportedEncodingException {
        return line.substring(0, line.length() - 2);
    }

    /**
     * 对传入的Buffer进行拼接
     * @param oldBuffer
     * @param newBuffer
     * @return ByteBuffer 拼接后的Buffer
     */
    public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
        // 如果原来的Buffer是null就直接返回
        if (oldBuffer == null) {
            return newBuffer;
        }
        // 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接
        newBuffer.rewind();
        if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
            return oldBuffer.put(newBuffer);
        }

        // 如果不是以上两种情况就构建新的Buffer进行拼接
        int oldSize = oldBuffer != null?oldBuffer.limit():0;
        int newSize = newBuffer != null?newBuffer.limit():0;
        ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);

        result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize));
        result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize));

        return result;
    }

    private void startServer() {
        try {
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectLoop();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }


    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            new NioServer(12345).startServer();

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

猜你喜欢

转载自blog.csdn.net/CringKong/article/details/80172531