【通信】NIO实现+源码解析

目录

1.简介

2.最简demo使用

2.1 网络线程模型图

2.2 demo代码

3 源码分析

3.1 Selector选择器

3.1.1 open()创建实例

3.1.2 select()遍历fd选择就绪

3.2 Channel

3.2.1 read读取到buffer

write写入

3.3 Buffer缓冲区

3.3.1 核心字段

3.3.2 demo测试

3.3.3 总结


1.简介

java NIO(Non-blocking IO,非阻塞IO),本质是多路复用,以此提高服务端的并发和吞吐量。 多路复用是一个选择器Selector可以监听多个通道Channel的多种事件,如下图所示。

   

核心概念包括 Selector(选择器)、 Channel(通道)、Buffer(缓冲区),如读、写、Accept事件等。下面我们通过一个最简单的TCP服务端实现demo讲解下NIO的这些组件一般用法。

Channels通道

与Socket等支持非阻塞IO的连接

Buffers缓冲

可由Channel通道直接读取或写入的类数组对象

Selectors

指示Channel通道有IO事件

SelectionKeys

维护IO事件状态和绑定

2.最简demo使用

用nio写个最简单的TCPServer,客户端可以连接服务端,以echo打头的指令可以响应回响,否则响应BAD_REQUEST,效果如图所示

2.1 网络线程模型图

为实现上面的服务效果,设计demo使用主线程接收客户端连接,用ThreadPoolExecutor线程池做异步IO处理。

  1. Acceptor线程(主线程)使用ServerSocketChannel监听8888端口,并将其accept事件注册到选择器上;
  2. 当客户端连接后(如上图的telnet连接),触发accept事件,服务通道接收连接获取对应的SocketChannel,并在其上注册RW读写事件,监听该通道的读写事件
  3. 当发生读写事件,读写线程Reactor线程(EchoServerTask进行相应的读写处理

2.2 demo代码

public class EchoServer {
    private static ExecutorService executor; // 用于异步读写的线程池
    static {
        executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
    }
    public static void main(String[] args) throws IOException {

        ServerSocketChannel ssc = ServerSocketChannel.open();// 静态工厂创建服务通道
        ssc.socket().bind(new InetSocketAddress(8888));
        ssc.configureBlocking(false); // !!一定要
        Selector selector = Selector.open(); // 静态工厂模式
        ssc.register(selector, ssc.validOps()); // 注册该服务通道的Accept操作

        while (true) { // 在选择器上轮询
            int readyCount = selector.select(1000);
            if(readyCount==0){
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 就绪的事件

            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()){
                SelectionKey selectionKey = keyIterator.next();
                if(selectionKey.isValid()){
                    // ServerSocketChannel有连接事件
                    if(selectionKey.isAcceptable()){
                        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); // key与通道关联

                        SocketChannel socketChannel = server.accept(); // 接收连接,获得传输通道
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE); // 注册通道的读写事件
                    }
                    // SocketChannel数据通道可读
                    if(selectionKey.isReadable()){
                        executor.submit(new EchoServerTask(selectionKey));  // 异步处理读事件
                    }
                    keyIterator.remove();
                }
            }
        }
    }
}
public class EchoServerTask implements Runnable{
    private SelectionKey selectionKey;

    public EchoServerTask(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    @Override
    public void run() {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
        try {
            int count =0;
            while ((count = channel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                byte[] request=new byte[byteBuffer.remaining()];
                byteBuffer.get(request);
                String requestStr=new String(request);
                byteBuffer.clear();
                // 处理客户端请求
                if (requestStr.startsWith("echo")) {
                    // 放入缓冲区
                    byteBuffer.put(("response:" + requestStr).getBytes());
                    byteBuffer.flip(); //此处需要 做byteBuffer模式切换,否则会采用读模式
                    channel.write(byteBuffer);
                } else {
                    byteBuffer.put("BAD_REQUEST".getBytes());
                    byteBuffer.flip();
                    channel.write(byteBuffer);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            selectionKey.cancel();
        }
    }
}

3 源码分析

3.1 Selector选择器

一般使用方法是在选择器上注册多个通道的多个事件,选择器可以监听多个事件,其select()是个阻塞方法,但当其任意一个事件发生都将唤醒返回。

  使用模式  
注册 ssc.register(selector, ssc.validOps()); 注册该服务通道的Accept操作
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);  注册通道的读写事件

选择

 while (true) { // 在选择器上轮询
            int readyCount = selector.select(1000);
            Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 就绪的事件

            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()){
                SelectionKey selectionKey = keyIterator.next();
                SelectableChannel channel = selectionKey.channel(); 

                .....do something with channel ......
            }
        }

轮询选择

select(timeout)阻塞等待

注册的事件任意一个发生,唤醒返回

处理事件

3.1.1 open()创建实例

selector实例一般使用DefaultSelectorProvider.create()根据底层操作系统版本选择不同的选择器实现,因为不同操作系统支持不同的多路复用,如linux 2.6以上才支持Epoll模型,一般使用poll模型。

//Selector
    public static Selector open() throws IOException { //静态工厂方法
        return SelectorProvider.provider().openSelector();
    }
//SelectorProvider
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();//
                            return provider;
                        }
                    });
        }
    }
//DefaultSelectorProvider
    public static SelectorProvider create() {
        String osname = AccessController
            .doPrivileged(new GetPropertyAction("os.name"));
        if (osname.equals("SunOS"))
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        if (osname.equals("Linux"))
            return createProvider("sun.nio.ch.EPollSelectorProvider");
        return new sun.nio.ch.PollSelectorProvider();
    }
// PollSelectorProvider
public AbstractSelector openSelector() throws IOException {
    return new PollSelectorImpl(this);
}

3.1.2 select()遍历fd选择就绪

最终调用Native方法,对JNI有兴趣的可以参考JNI讲解,最终调用到PollArrayWrapper#poll0(long pollAddress, int numfds, long timeout),启动调用到jdk中动态连接库中的PollArrayWrapper.c的Java_sun_nio_ch_PollArrayWrapper_poll0方法,方法实现如下:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,
                                       jlong address, jint numfds,
                                       jlong timeout)
{
    struct pollfd *a;
    int err = 0;
    a = (struct pollfd *) jlong_to_ptr(address);
    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE (poll(a, numfds, timeout), err);
    } else {                     /* Bounded wait; bounded restarts */
        err = ipoll(a, numfds, timeout);
    }
    if (err < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
    }
    return (jint)err;
}

其中<poll.h>的poll()方法其实是调用操作系统的接口,对描述符进行遍历,查看是否有描述符就绪,如果有就返回就绪文件描述符的个数。

3.2 Channel

如果是读写事件的话,选择器触发唤醒线程去处理事件,一般的处理就是从通道channel中读数据到缓冲Buffer去,或者从缓冲区写入到通道中。

使用模式 图解
count =channel.read(byteBuffer)
byteBuffer.flip();
channel.write(byteBuffer);

3.2.1 read读取到buffer

IOUtil读取和src/solaris/native/sun/nio/ch/FileDispatcherImpl.c实现如下,映射文件描述符,从文件描述符去读取内容到buf所在内存地址

// IOUtil
    static int read(FileDescriptor fd, ByteBuffer dst, long position,
                    NativeDispatcher nd)
        throws IOException
    {
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if (dst instanceof DirectBuffer)
            return readIntoNativeBuffer(fd, dst, position, nd);

        // Substitute a native buffer
        ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
        try {
            int n = readIntoNativeBuffer(fd, bb, position, nd); //从buf的potition处开始存放
            bb.flip();
            if (n > 0)
                dst.put(bb);
            return n;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }
    private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                            long position, NativeDispatcher nd)
        throws IOException
    {
        int pos = bb.position();
        int lim = bb.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        if (rem == 0)
            return 0;
        int n = 0;
        if (position != -1) {
            n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
                         rem, position);
        } else {
            n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem); // 计算内存地址,加上了position
        }
        if (n > 0)
            bb.position(pos + n);
        return n;
    }
// C实现
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
                            jlong address, jint len, jlong offset)
{
    jint fd = fdval(env, fdo);
    void *buf = (void *)jlong_to_ptr(address);

    return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}

write写入

写入也是类似,都是调用IOUtil

3.3 Buffer缓冲区

程序读写数据时,需要有个缓冲区取放 于通道交互的数据。

3.3.1 核心字段

字段\模式

0 <= mark <= position <= limit <= capacity

  写模式 读模式

position

下一个可写的位置 下一个可读的位置
limit 可写的最大位置limit==capacity 可读的最大位置,后面未写入的不能读取

capacity

缓冲区容量,数组的长度

3.3.2 demo测试

    @Test
    public void testByteBuffer() throws IOException {
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        System.out.println(buffer); // [pos=0 lim=1024 cap=1024]
        // 写入5个字节
        buffer.put((byte) 0).put((byte) 1).put((byte)2).put((byte)3).put((byte)4);
        System.out.println(buffer); // [pos=5 lim=1024 cap=1024]
        // 直接获取字节数组array()
        InputStream in=new ByteArrayInputStream(buffer.array(),0,buffer.position());
        int c ;
        while((c = in.read()) >= 0) {
            System.out.println((byte)c);
        }
        // flip 转换 读模式 读取刚刚写入的数据
        buffer.flip();
        System.out.println(buffer);// [pos=0 lim=5 cap=1024]
        // get 读取,并记录读取进度
        while (buffer.hasRemaining()){
            byte b = buffer.get();
            System.out.println(b);
        }
        System.out.println(buffer);//[pos=5 lim=5 cap=1024]
    }

3.3.3 总结

如果读写共用一个buffer,一定在是读后、写后flip切换模式

发布了92 篇原创文章 · 获赞 14 · 访问量 5822

猜你喜欢

转载自blog.csdn.net/sarafina527/article/details/103616294