NIO、AIO看这个就OK啦!!!

Select选择器

多路复用的概念

是指多个服务器去监听多个端口,如果多路不复用,则每个服务器都需要开启一个线程,在高并发下性能比较差
所以,我们可以把多个服务器注册到一个Select选择器上,只需要开启一个线程就可以处理这些服务器

选择器Selector

Selector称为选择器或多路复用器,可以让很多Channel注册其中,然后监听各个Channel发生的事件
Selector的创建:

Selector selector=Selector.open();

将我们要交给选择器的通道注册其中:

channel.configureBlocking(false); (channel是一个**非阻塞通道**)
SelectionKey key =channel.register(selector,SelectionKey.OP_ACCEPT);

参数1、该通道注册到选择器的对象
参数2、表示选择器对何种事件感兴趣,服务器通道只能写SelectionKey.OP_ACCEPT,表示有客户端连接

public class SelectDemo {
    public static void main(String[] args) throws IOException {
        //多路(多个服务器监听多个端口)【通道默认是阻塞的】
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //【通道必须是非阻塞的,所以应该设置为非阻塞】
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(7777));

        ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
        serverSocketChanne2.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8888));

        ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
        serverSocketChanne3.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //获取选择器对象
        Selector selector = Selector.open();
        //将多个通道对象注册其中
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
    }
}

Selector中常用的方法

获取所有已经成功注册到当前选择器上的通道集合

public Set<SelectionKey> keys();

获取所有已经有客户端连接的通道集合

public Set<SelectionKey> Selectedkeys();

如果目前没有客户端连接,则该方法会阻塞;如果有客户端连接,则会返回本次连接的客户端数量

public int selsct()

Selector实现多路连接(上)

//客户端
public class SocketDemo {
    public static void main(String[] args) {
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("7777客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 7777));
                System.out.println("7777客户端连接成功....");
            } catch (IOException e) {
                System.out.println("7777异常重连");
            }
        }).start();
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("8888客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 8888));
                System.out.println("8888客户端连接成功....");
            } catch (IOException e) {
                System.out.println("8888异常重连");
            }
        }).start();
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("9999客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 9999));
                System.out.println("9999客户端连接成功....");
            } catch (IOException e) {
                System.out.println("9999异常重连");
            }
        }).start();
    }
}
public class SelectDemo01 {
    public static void main(String[] args) throws IOException {
        //多路(多个服务器监听多个端口)【通道默认是阻塞的】
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //【通道必须是非阻塞的,所以应该设置为非阻塞】
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(7777));

        ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
        serverSocketChanne2.configureBlocking(false);
        serverSocketChanne2.bind(new InetSocketAddress(8888));

        ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
        serverSocketChanne3.configureBlocking(false);
        serverSocketChanne3.bind(new InetSocketAddress(9999));
        //获取选择器对象
        Selector selector = Selector.open();
        //将多个通道对象注册其中
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
        //接受客户端连接
        Set<SelectionKey> keys = selector.keys();
        System.out.println("注册通道的数量:" + keys.size());
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        System.out.println("已连接通道的数量:"+selectionKeys.size());
        int select = selector.select();//【此方法会阻塞】
        System.out.println("本次连接数量:"+select);
    }
}

Selector实现多路连接(下)

public class SelectDemo01 {
    public static void main(String[] args) throws IOException, InterruptedException {
        //多路(多个服务器监听多个端口)【通道默认是阻塞的】
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //【通道必须是非阻塞的,所以应该设置为非阻塞】
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(7777));

        ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
        serverSocketChanne2.configureBlocking(false);
        serverSocketChanne2.bind(new InetSocketAddress(8888));

        ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
        serverSocketChanne3.configureBlocking(false);
        serverSocketChanne3.bind(new InetSocketAddress(9999));
        //获取选择器对象
        Selector selector = Selector.open();
        //将多个通道对象注册其中
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
        //接受客户端连接
        while (true) {
            Set<SelectionKey> keys = selector.keys();
            System.out.println("注册通道的数量:" + keys.size());
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            System.out.println("已连接通道的数量:" + selectionKeys.size());
            int select = selector.select();//【此方法会阻塞】
            System.out.println("本次连接数量:" + select);

            //遍历已连接通道的集合
            Iterator<SelectionKey> it = selectionKeys.iterator();
            while (it.hasNext()) {
                //获取当前连接通道的SelectionKey
                SelectionKey key = it.next();
                // 从SelectionKey中获取通道对象
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                // 看一下此通道是监听哪个端口的
                System.out.println("监听端口:" + channel.getLocalAddress());
                //取出连接到该服务器的连接通道
                SocketChannel accept = channel.accept();
                System.out.println(accept);
                //从连接的通道中把已经处理过的服务器通道移除
                it.remove();
            }
            System.out.println("休息1秒......");
            Thread.sleep(1000);
            System.out.println();//打印一个空行 }
        }
    }
}
//客户端
public class SocketDemo {
    public static void main(String[] args) {
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("7777客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 7777));
                System.out.println("7777客户端连接成功....");
            } catch (IOException e) {
                System.out.println("7777异常重连");
            }
        }).start();
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("8888客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 8888));
                System.out.println("8888客户端连接成功....");
            } catch (IOException e) {
                System.out.println("8888异常重连");
            }
        }).start();
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("9999客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 9999));
                System.out.println("9999客户端连接成功....");
            } catch (IOException e) {
                System.out.println("9999异常重连");
            }
        }).start();
    }
}

Selector实现多路信息接收测试

public class SelectDemo01 {
    public static void main(String[] args) throws IOException, InterruptedException {
        //多路(多个服务器监听多个端口)【通道默认是阻塞的】
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //【通道必须是非阻塞的,所以应该设置为非阻塞】
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(7777));

        ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
        serverSocketChanne2.configureBlocking(false);
        serverSocketChanne2.bind(new InetSocketAddress(8888));

        ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
        serverSocketChanne3.configureBlocking(false);
        serverSocketChanne3.bind(new InetSocketAddress(9999));
        //获取选择器对象
        Selector selector = Selector.open();
        //将多个通道对象注册其中
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
        //接受客户端连接
        while (true) {
            Set<SelectionKey> keys = selector.keys();
            System.out.println("注册通道的数量:" + keys.size());
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            System.out.println("已连接通道的数量:" + selectionKeys.size());
            int select = selector.select();//【此方法会阻塞】
            System.out.println("本次连接数量:" + select);
            /* 处理被连接的服务器通道*/
            //遍历已连接通道的集合
            Iterator<SelectionKey> it = selectionKeys.iterator();
            while (it.hasNext()) {
                //获取当前连接通道的SelectionKey
                SelectionKey key = it.next();
                // 从SelectionKey中获取通道对象
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                // 看一下此通道是监听哪个端口的
                System.out.println("监听端口:" + channel.getLocalAddress());
                //取出连接到该服务器的连接通道
                SocketChannel accept = channel.accept();
                //System.out.println(accept);
                System.out.println("写与客户端交互代码");
                /**接受客户端发送过来的信息*/
                ByteBuffer inBuf = ByteBuffer.allocate(100);
                accept.read(inBuf);
                inBuf.flip();
                String msg = new String(inBuf.array(), 0, inBuf.limit());
                System.out.println("【服务器】接收到通道【" + channel.getLocalAddress() + "】的信息:" + msg);
                //从连接的通道中把已经处理过的服务器通道移除
                it.remove();
            }
            System.out.println("休息1秒......");
            Thread.sleep(1000);
            System.out.println();//打印一个空行 }
        }
    }
}
//客户端
public class SocketDemo {
    public static void main(String[] args) {
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("7777客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 7777));
                System.out.println("7777客户端连接成功....");
                /*发送数据*/
                ByteBuffer outBuf = ByteBuffer.allocate(100);
                outBuf.put("我是客户端,连接7777端口".getBytes());
                outBuf.flip();
                socket.write(outBuf);
            } catch (IOException e) {
                System.out.println("7777异常重连");
            }
        }).start();
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("8888客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 8888));
                System.out.println("8888客户端连接成功....");
                /*发送数据*/
                ByteBuffer outBuf = ByteBuffer.allocate(100);
                outBuf.put("我是客户端,连接8888端口".getBytes());
                outBuf.flip();
                socket.write(outBuf);
            } catch (IOException e) {
                System.out.println("8888异常重连");
            }
        }).start();
        new Thread(() -> {
            try (SocketChannel socket = SocketChannel.open()) {
                System.out.println("9999客户端连接服务器......");
                socket.connect(new InetSocketAddress("192.168.0.104", 9999));
                System.out.println("9999客户端连接成功....");
                /*发送数据*/
                ByteBuffer outBuf = ByteBuffer.allocate(100);
                outBuf.put("我是客户端,连接9999端口".getBytes());
                outBuf.flip();
                socket.write(outBuf);
            } catch (IOException e) {
                System.out.println("9999异常重连");
            }
        }).start();
    }
}

AIO(异步非阻塞)

AIO概述和分类

四个异步通道:
AsynchronousSocketChannel //异步的客户端通道
AsynchronousServerSocketChannel //异步的服务器端通道
AsynchronousFileChannel //异步的文件通道
AsynchronousDatagramChannel //异步的UDP通道
表现在两个方面:
1、连接时,可以使用异步,在调用连接的方法时,非阻塞,连接成功之后会以方法回调的机制通知我们。
2、读取数据时,可以使用异步,在使用read方法时,非阻塞,等数据接收到之后以方法回调的机制通知我们。

AIO异步非阻塞连接的建立

异步的服务器端通道

//AIO下的异步的服务器端通道
public class AIOServerSocketChanneldemo {
    public static void main(String[] args) throws IOException {
        //创建AIO下的异步的服务器端通道
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        //绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //接受异步客户端,采用异步非阻塞式
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                System.out.println("客户端连接成功");
                try {
                    System.out.println(result.getLocalAddress());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("客户端连接失败");

            }
        });
        System.out.println("继续执行");
        while (true) {
        }
    }
}
//AIO下的异步的客户端通道
public class AIOSocketChannel {
    public static void main(String[] args) throws IOException {
        //创建异步的客户端通道
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        //连接服务器,采用异步非阻塞方式
        //connect(服务器ip和端口号、附件(null)、接口)
        socketChannel.connect(new InetSocketAddress("192.168.0.104", 8888), null, new CompletionHandler<Void, Object>() {
            //连接成功时的回调函数
            @Override
            public void completed(Void result, Object attachment) {
                System.out.println("连接服务器成功");
            }

            //连接失败时的回调函数
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("连接服务器失败");
            }
        });
        System.out.println("程序继续执行");
        while (true) {
        }
    }
}

AIO同步读写数据

//AIO下的异步的客户端通道
public class AIOSocketChannel {
    public static void main(String[] args) throws IOException {
        //创建异步的客户端通道
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        //连接服务器,采用异步非阻塞方式
        //connect(服务器ip和端口号、附件(null)、接口)
        socketChannel.connect(new InetSocketAddress("192.168.0.104", 8888), null, new CompletionHandler<Void, Object>() {
            //连接成功时的回调函数
            @Override
            public void completed(Void result, Object attachment) {
                System.out.println("连接服务器成功");
                //给服务器发送数据
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put("你好,我是客户端".getBytes());
                //切换读写模式
                buffer.flip();
                socketChannel.write(buffer);
                //释放资源
                try {
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }

            //连接失败时的回调函数
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("连接服务器失败");
            }
        });
        System.out.println("程序继续执行");
        while (true) {
        }
    }
}
//AIO下的异步的服务器端通道
public class AIOServerSocketChanneldemo {
    public static void main(String[] args) throws IOException {
        //创建AIO下的异步的服务器端通道
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        //绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //接受异步客户端,采用异步非阻塞式
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                System.out.println("客户端连接成功");
                //读客户端发送的数据
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                Future<Integer> future = socketChannel.read(buffer);
                //buffer.flip();//如果采用异步连接,读取数据后不需要调用flip
                try {
                    System.out.println(new String(buffer.array(), 0, future.get()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("客户端连接失败");

            }
        });
        System.out.println("继续执行");
        while (true) {
        }
    }
}

AIO异步读写数据

//AIO下的异步非阻塞的服务器端通道
public class AIOServerSocketChanneldemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        //创建AIO下的异步的服务器端通道
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        //绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //接收异步客户端,采用异步非阻塞式
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                System.out.println("客户端连接成功");
                //从客户端中读取数据
                //异步的read(缓冲区,超时时间,时间单位,附件null,回调接口)
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //new CompletionHandler是匿名内部类【重写匿名内部类中的两个方法即可】
                socketChannel.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("接收到的数据是" + new String(buffer.array(), 0, result));
                        //释放资源
                        try {
                            serverSocketChannel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    //数据接收失败
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("数据接收失败");
                    }
                });
            }

            //客户端连接失败
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("客户端连接失败");

            }
        });
        System.out.println("程序继续执行");
        while (true) {
            Thread.sleep(500);
        }
    }
}
//AIO下的异步非阻塞的客户端通道
public class AIOSocketChannel {
    public static void main(String[] args) throws IOException, InterruptedException {
        //创建异步的客户端通道
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        //连接服务器,采用异步非阻塞方式
        //connect(服务器ip和端口号、附件(null)、接口)
        socketChannel.connect(new InetSocketAddress("192.168.0.104", 8888), null, new CompletionHandler<Void, Object>() {
            //连接成功时的回调函数
            @Override
            public void completed(Void result, Object attachment) {
                System.out.println("连接服务器成功");
                //给服务器发送数据【创建字节缓冲区】
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put("你好,我是客户端".getBytes());
                //切换读写模式
                buffer.flip();
                //异步的write(缓冲区,超时时间,时间单位,附件null,回调接口)
                socketChannel.write(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("数据发送成功");
                        //释放资源
                        try {
                            socketChannel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {

                        System.out.println("数据发送失败");
                    }
                });

            }

            //连接失败时的回调函数
            @Override
            public void failed(Throwable exc, Object attachment) {

                System.out.println("连接服务器失败");
            }
        });
        System.out.println("程序继续执行");
        while (true) {
            Thread.sleep(500);
        }
    }
}

小结

世上无难事,只要肯登攀。

发布了34 篇原创文章 · 获赞 9 · 访问量 1279

猜你喜欢

转载自blog.csdn.net/weixin_43616750/article/details/105133730