Selector(选择器)
- 选择器 Selector 是 NIO 中的重要技术之一。它与 SelectableChannel 联合使用实现了非阻塞的多路复用。使用它可以节省 CPU 资源,提高程序的运行效率;
1. 多路复用的概念
- "多路":服务器端同时监听多个“端口”的情况。每个端口都要监听多个客户端的连接。
- 服务器端的非多路复用效果:
- 如果不使用“多路复用”,服务器端需要开很多线程处理每个端口的请求。如果在高并发环境下,造成系统性能下降;
- 服务器端的多路复用效果:
- 使用了多路复用,只需要一个线程就可以处理多个通道,降低内存占用率,减少CPU切换时间,在高并发、高频段业务环境下有非常重要的优势;
2. 选择器 Selector
- Selector 被称为选择器,也被称为多路复用器,它可以注册到很多个 Channel 上,监听各个 Channel 上发生的事件,并且能够根据事件情况决定 Channel 读写。这样,通过一个线程管理多个 Channel,就可以处理大量网络连接了;
- 有了 Selector,我们就可以利用一个线程来处理所有的 Channels。线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好;
a. 创建一个 Selector
- Selector 就是您注册对各种 I/O 事件兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件;
Selector selector = Selector.open();
b. 注册 Channel 到 Selector
- 为了能让 Channel 和 Selector 配合使用,我们需要把 Channel 注册到 Selector 上。通过调用 channel.register() 方法来实现注册:
channel.configureBlocking(false);
SelectionKey key =channel.register(selector,SelectionKey.OP_READ);
- 注意,注册的 Channel 必须设置成异步模式才可以,否则异步 IO 就无法工作,这就意味着我们不能把一个 FileChannel 注册到 Selector,因为 FileChannel 没有异步模式,但是网络编程中的 SocketChannel 是可以的;
- register() 方法的第二个参数:是一个 int 值,意思是在通过 Selector 监听 Channel 时对什么事件感兴趣。可以监听四种不同类型的事件,而且可以使用 SelectionKey 的四个常量表示:
- 连接就绪–常量:SelectionKey.OP_CONNECT
- 接收就绪–常量:SelectionKey.OP_ACCEPT(ServerSocketChannel 在注册时只能使用此项)
- 读就绪–常量:SelectionKey.OP_READ
- 写就绪–常量:SelectionKey.OP_WRITE
- 注意:对于 ServerSocketChannel 在注册时,只能使用 OP_ACCEPT,否则抛出异常;
c. 示例:服务器创建 3 个通道和 3 个端口
- 下面的例子,服务器创建 3 个通道,同时监听 3 个端口,并将 3 个通道注册到一个选择器中:
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel channelA = ServerSocketChannel.open();
channelA.configureBlocking(false);
channelA.bind(new InetSocketAddress(7777));
ServerSocketChannel channelB = ServerSocketChannel.open();
channelB.configureBlocking(false);
channelB.bind(new InetSocketAddress(8888));
ServerSocketChannel channelC = ServerSocketChannel.open();
channelC.configureBlocking(false);
channelC.bind(new InetSocketAddress(9999));
Selector selector = Selector.open();
channelA.register(selector, SelectionKey.OP_ACCEPT);
channelB.register(selector, SelectionKey.OP_ACCEPT);
channelC.register(selector, SelectionKey.OP_ACCEPT);
}
}
- 接下来,就可以通过选择器 selector 操作三个通道了;
3. 多路连接
- Selector 的 keys() 方法:返回一个
Set<SelectionKey>
集合,表示:已注册通道的集合。每个已注册通道封装为一个 SelectionKey 对象;
- Selector 的 selectedKeys() 方法:返回一个
Set<SelectionKey>
集合,表示:当前已连接的通道的集合。每个已连接通道同一封装为一个 SelectionKey 对象;
- Selector 的 select() 方法:此方法会阻塞,直到有至少 1 个客户端连接。此方法会返回一个 int 值,表示有几个客户端连接了服务器;
a. 示例:使服务器端更好地接收客户端信息
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel channelA = ServerSocketChannel.open();
channelA.configureBlocking(false);
channelA.bind(new InetSocketAddress(7777));
ServerSocketChannel channelB = ServerSocketChannel.open();
channelB.configureBlocking(false);
channelB.bind(new InetSocketAddress(8888));
ServerSocketChannel channelC = ServerSocketChannel.open();
channelC.configureBlocking(false);
channelC.bind(new InetSocketAddress(9999));
Selector selector = Selector.open();
channelA.register(selector, SelectionKey.OP_ACCEPT);
channelB.register(selector, SelectionKey.OP_ACCEPT);
channelC.register(selector, SelectionKey.OP_ACCEPT);
Set<SelectionKey> keys = selector.keys();
System.out.println("注册通道数量:" + keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("已连接的通道数量:" + selectionKeys.size());
System.out.println("----------------------------------------------");
System.out.println("【服务器】等待连接......");
int selectedCount = selector.select();
System.out.println("连接数量:" + selectedCount);
System.out.println("----------------------------------------------");
Set<SelectionKey> keys1 = selector.keys();
System.out.println("注册通道数量:" + keys1.size());
Set<SelectionKey> selectionKeys1 = selector.selectedKeys();
System.out.println("已连接的通道数量:" + selectionKeys1.size());
}
}
- 客户端:启动两个线程,模拟两个客户端,同时连接服务器的 7777 和 8888 端口;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public class Client {
public static void main(String[] args) {
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("7777客户端连接服务器......");
socket.connect(new InetSocketAddress("localhost", 7777));
System.out.println("7777客户端连接成功....");
} catch (IOException e) {
System.out.println("7777异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("8888客户端连接服务器......");
socket.connect(new InetSocketAddress("localhost", 8888));
System.out.println("8888客户端连接成功....");
} catch (IOException e) {
System.out.println("8888异常重连");
}
}).start();
}
}
- 服务器端输出结果:
- 客户端输出结果:
- 在服务器端加入循环,确保接收到每个通道的连接;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel channelA = ServerSocketChannel.open();
channelA.configureBlocking(false);
channelA.bind(new InetSocketAddress(7777));
ServerSocketChannel channelB = ServerSocketChannel.open();
channelB.configureBlocking(false);
channelB.bind(new InetSocketAddress(8888));
ServerSocketChannel channelC = ServerSocketChannel.open();
channelC.configureBlocking(false);
channelC.bind(new InetSocketAddress(9999));
Selector selector = Selector.open();
channelA.register(selector, SelectionKey.OP_ACCEPT);
channelB.register(selector, SelectionKey.OP_ACCEPT);
channelC.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("等待连接......");
int selectedCount = selector.select();
System.out.println("连接数量:" + selectedCount);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("集合大小:" + selectionKeys.size());
System.out.println("休息1秒......");
Thread.sleep(1000);
System.out.println();
}
}
}
- 服务器端输出结果:
- 注意:此例会有一个问题——服务器端第一次 select() 会阻塞,获取到一次连接后再次循环时,select() 将不会再阻塞,从而造成死循环,所以这里加了一个 sleep(),这个在后边解决;
- 接下来,我们获取"已连接通道"的集合,并遍历:
- 服务器端:
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel channelA = ServerSocketChannel.open();
channelA.configureBlocking(false);
channelA.bind(new InetSocketAddress(7777));
ServerSocketChannel channelB = ServerSocketChannel.open();
channelB.configureBlocking(false);
channelB.bind(new InetSocketAddress(8888));
ServerSocketChannel channelC = ServerSocketChannel.open();
channelC.configureBlocking(false);
channelC.bind(new InetSocketAddress(9999));
Selector selector = Selector.open();
channelA.register(selector, SelectionKey.OP_ACCEPT);
channelB.register(selector, SelectionKey.OP_ACCEPT);
channelC.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
System.out.println("等待连接......");
int selectedCount = selector.select();
System.out.println("连接数量:" + selectedCount);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("集合大小:" + selectionKeys.size());
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
System.out.println("监听端口:" + channel.getLocalAddress());
}
System.out.println("休息1秒......");
Thread.sleep(1000);
System.out.println();
}
}
}
- 服务器端输出结果:
- 关于 SelectionKey:
- 当一个"通道"注册到选择器 Selector 后,选择器 Selector 内部就创建一个 SelectionKey 对象,里面封装了这个通道和这个选择器的映射关系;
- 通过 SelectionKey 的 channel() 方法,可以获取它内部的通道对象;
- 解决 select() 不阻塞,导致服务器端死循环的问题:
- 原因:在将"通道"注册到"选择器 Selector"时,指定了关注的事件 SelectionKey.OP_ACCEPT,而获取到管道对象后,并没有处理这个事件,所以导致 select() 方法一直循环;
- 解决:处理 SelectionKey.OP_ACCEPT 事件;
- 更改服务器端代码:
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel channelA = ServerSocketChannel.open();
channelA.configureBlocking(false);
channelA.bind(new InetSocketAddress(7777));
ServerSocketChannel channelB = ServerSocketChannel.open();
channelB.configureBlocking(false);
channelB.bind(new InetSocketAddress(8888));
ServerSocketChannel channelC = ServerSocketChannel.open();
channelC.configureBlocking(false);
channelC.bind(new InetSocketAddress(9999));
Selector selector = Selector.open();
channelA.register(selector, SelectionKey.OP_ACCEPT);
channelB.register(selector, SelectionKey.OP_ACCEPT);
channelC.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("等待连接......");
int selectedCount = selector.select();
System.out.println("连接数量:" + selectedCount);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("集合大小:" + selectionKeys.size());
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
System.out.println("监听端口:" + channel.getLocalAddress());
SocketChannel accept = channel.accept();
}
}
}
}
- 现在的服务器端可以很好的接收客户端连接了,但还有一个小问题,在接下来的互发信息的例子中我们可以看到这个问题并解决它;
4. 多路信息接收
a. 服务器端
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverChannel1 = ServerSocketChannel.open();
serverChannel1.bind(new InetSocketAddress(7777));
serverChannel1.configureBlocking(false);
ServerSocketChannel serverChannel2 = ServerSocketChannel.open();
serverChannel2.bind(new InetSocketAddress(8888));
serverChannel2.configureBlocking(false);
ServerSocketChannel serverChannel3 = ServerSocketChannel.open();
serverChannel3.bind(new InetSocketAddress(9999));
serverChannel3.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey key1 = serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key2 = serverChannel2.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key3 = serverChannel3.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("等待客户端连接...");
int keyCount = selector.select();
System.out.println("连接数量:" + keyCount);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey nextKey = it.next();
System.out.println("获取通道...");
ServerSocketChannel channel = (ServerSocketChannel) nextKey.channel();
System.out.println("等待【" + channel.getLocalAddress() + "】通道数据...");
SocketChannel socketChannel = channel.accept();
ByteBuffer inBuf = ByteBuffer.allocate(100);
socketChannel.read(inBuf);
inBuf.flip();
String msg = new String(inBuf.array(), 0, inBuf.limit());
System.out.println("【服务器】接收到通道【" + channel.getLocalAddress() + "】的信息:" + msg);
}
}
}
}
b. 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class Client {
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("7777客户端连接服务器......");
socket.connect(new InetSocketAddress("localhost", 7777));
System.out.println("7777客户端连接成功....");
ByteBuffer outBuf = ByteBuffer.allocate(100);
outBuf.put("我是客户端,连接7777端口".getBytes());
outBuf.flip();
socket.write(outBuf);
} catch (IOException e) {
System.out.println("7777异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("8888客户端连接服务器......");
socket.connect(new InetSocketAddress("localhost", 8888));
System.out.println("8888客户端连接成功....");
ByteBuffer outBuf = ByteBuffer.allocate(100);
outBuf.put("我是客户端,连接8888端口".getBytes());
outBuf.flip();
socket.write(outBuf);
} catch (IOException e) {
System.out.println("8888异常重连");
}
}).start();
}
}
c. 结果
- 服务器端输出:
- 客户端输出:
d. Debug
- 可以看到,出现了异常,为什么会这样?
- 问题就出现在获取 selectedKeys() 的集合;
- 第一次的 7777 连接,selectedKeys() 获取的集合中只有一个 SelectionKey 对象;
- 第二次的 8888 连接,selectedKeys() 获取的集合中有 2 个 SelectionKey 对象,一个是连接 7777 客户端的,另一个是连接 8888 客户端的。而此时应该只处理连接 8888 客户端的,所以在上一次处理完 7777 的数据后,应该将其 SelectionKey 对象移除;
- 更改服务器端代码:
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverChannel1 = ServerSocketChannel.open();
serverChannel1.bind(new InetSocketAddress(7777));
serverChannel1.configureBlocking(false);
ServerSocketChannel serverChannel2 = ServerSocketChannel.open();
serverChannel2.bind(new InetSocketAddress(8888));
serverChannel2.configureBlocking(false);
ServerSocketChannel serverChannel3 = ServerSocketChannel.open();
serverChannel3.bind(new InetSocketAddress(9999));
serverChannel3.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey key1 = serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key2 = serverChannel2.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key3 = serverChannel3.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("等待客户端连接...");
int keyCount = selector.select();
System.out.println("连接数量:" + keyCount);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey nextKey = it.next();
System.out.println("获取通道...");
ServerSocketChannel channel = (ServerSocketChannel) nextKey.channel();
System.out.println("等待【" + channel.getLocalAddress() + "】通道数据...");
SocketChannel socketChannel = channel.accept();
ByteBuffer inBuf = ByteBuffer.allocate(100);
socketChannel.read(inBuf);
inBuf.flip();
String msg = new String(inBuf.array(), 0, inBuf.limit());
System.out.println("【服务器】接收到通道【" + channel.getLocalAddress() + "】的信息:" + msg);
it.remove();
}
}
}
}
- 测试:先启动服务器,再启动客户端,可以正常接收客户端数据了(客户端可以再添加一个线程连接 9999 端口):