Select选择器
多路复用的概念
是指多个服务器去监听多个端口,如果多路不复用,则每个服务器都需要开启一个线程,在高并发下性能比较差
所以,我们可以把多个服务器注册到一个Select选择器上,只需要开启一个线程就可以处理这些服务器
选择器Selector
Selector称为选择器或多路复用器,可以让很多Channel注册其中,然后监听各个Channel发生的事件
Selector的创建:
Selector selector=Selector.open();
将我们要交给选择器的通道注册其中:
channel.configureBlocking(false); (channel是一个**非阻塞通道**)
SelectionKey key =channel.register(selector,SelectionKey.OP_ACCEPT);
参数1、该通道注册到选择器的对象
参数2、表示选择器对何种事件感兴趣,服务器通道只能写SelectionKey.OP_ACCEPT,表示有客户端连接
public class SelectDemo {
public static void main(String[] args) throws IOException {
//多路(多个服务器监听多个端口)【通道默认是阻塞的】
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//【通道必须是非阻塞的,所以应该设置为非阻塞】
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(7777));
ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
serverSocketChanne2.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8888));
ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
serverSocketChanne3.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9999));
//获取选择器对象
Selector selector = Selector.open();
//将多个通道对象注册其中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
}
}
Selector中常用的方法
获取所有已经成功注册到当前选择器上的通道集合
public Set<SelectionKey> keys();
获取所有已经有客户端连接的通道集合
public Set<SelectionKey> Selectedkeys();
如果目前没有客户端连接,则该方法会阻塞;如果有客户端连接,则会返回本次连接的客户端数量
public int selsct()
Selector实现多路连接(上)
//客户端
public class SocketDemo {
public static void main(String[] args) {
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("7777客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 7777));
System.out.println("7777客户端连接成功....");
} catch (IOException e) {
System.out.println("7777异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("8888客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 8888));
System.out.println("8888客户端连接成功....");
} catch (IOException e) {
System.out.println("8888异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("9999客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 9999));
System.out.println("9999客户端连接成功....");
} catch (IOException e) {
System.out.println("9999异常重连");
}
}).start();
}
}
public class SelectDemo01 {
public static void main(String[] args) throws IOException {
//多路(多个服务器监听多个端口)【通道默认是阻塞的】
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//【通道必须是非阻塞的,所以应该设置为非阻塞】
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(7777));
ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
serverSocketChanne2.configureBlocking(false);
serverSocketChanne2.bind(new InetSocketAddress(8888));
ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
serverSocketChanne3.configureBlocking(false);
serverSocketChanne3.bind(new InetSocketAddress(9999));
//获取选择器对象
Selector selector = Selector.open();
//将多个通道对象注册其中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
//接受客户端连接
Set<SelectionKey> keys = selector.keys();
System.out.println("注册通道的数量:" + keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("已连接通道的数量:"+selectionKeys.size());
int select = selector.select();//【此方法会阻塞】
System.out.println("本次连接数量:"+select);
}
}
Selector实现多路连接(下)
public class SelectDemo01 {
public static void main(String[] args) throws IOException, InterruptedException {
//多路(多个服务器监听多个端口)【通道默认是阻塞的】
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//【通道必须是非阻塞的,所以应该设置为非阻塞】
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(7777));
ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
serverSocketChanne2.configureBlocking(false);
serverSocketChanne2.bind(new InetSocketAddress(8888));
ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
serverSocketChanne3.configureBlocking(false);
serverSocketChanne3.bind(new InetSocketAddress(9999));
//获取选择器对象
Selector selector = Selector.open();
//将多个通道对象注册其中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
//接受客户端连接
while (true) {
Set<SelectionKey> keys = selector.keys();
System.out.println("注册通道的数量:" + keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("已连接通道的数量:" + selectionKeys.size());
int select = selector.select();//【此方法会阻塞】
System.out.println("本次连接数量:" + select);
//遍历已连接通道的集合
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
//获取当前连接通道的SelectionKey
SelectionKey key = it.next();
// 从SelectionKey中获取通道对象
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 看一下此通道是监听哪个端口的
System.out.println("监听端口:" + channel.getLocalAddress());
//取出连接到该服务器的连接通道
SocketChannel accept = channel.accept();
System.out.println(accept);
//从连接的通道中把已经处理过的服务器通道移除
it.remove();
}
System.out.println("休息1秒......");
Thread.sleep(1000);
System.out.println();//打印一个空行 }
}
}
}
//客户端
public class SocketDemo {
public static void main(String[] args) {
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("7777客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 7777));
System.out.println("7777客户端连接成功....");
} catch (IOException e) {
System.out.println("7777异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("8888客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 8888));
System.out.println("8888客户端连接成功....");
} catch (IOException e) {
System.out.println("8888异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("9999客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 9999));
System.out.println("9999客户端连接成功....");
} catch (IOException e) {
System.out.println("9999异常重连");
}
}).start();
}
}
Selector实现多路信息接收测试
public class SelectDemo01 {
public static void main(String[] args) throws IOException, InterruptedException {
//多路(多个服务器监听多个端口)【通道默认是阻塞的】
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//【通道必须是非阻塞的,所以应该设置为非阻塞】
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(7777));
ServerSocketChannel serverSocketChanne2 = ServerSocketChannel.open();
serverSocketChanne2.configureBlocking(false);
serverSocketChanne2.bind(new InetSocketAddress(8888));
ServerSocketChannel serverSocketChanne3 = ServerSocketChannel.open();
serverSocketChanne3.configureBlocking(false);
serverSocketChanne3.bind(new InetSocketAddress(9999));
//获取选择器对象
Selector selector = Selector.open();
//将多个通道对象注册其中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne2.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChanne3.register(selector, SelectionKey.OP_ACCEPT);
//接受客户端连接
while (true) {
Set<SelectionKey> keys = selector.keys();
System.out.println("注册通道的数量:" + keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("已连接通道的数量:" + selectionKeys.size());
int select = selector.select();//【此方法会阻塞】
System.out.println("本次连接数量:" + select);
/* 处理被连接的服务器通道*/
//遍历已连接通道的集合
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
//获取当前连接通道的SelectionKey
SelectionKey key = it.next();
// 从SelectionKey中获取通道对象
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 看一下此通道是监听哪个端口的
System.out.println("监听端口:" + channel.getLocalAddress());
//取出连接到该服务器的连接通道
SocketChannel accept = channel.accept();
//System.out.println(accept);
System.out.println("写与客户端交互代码");
/**接受客户端发送过来的信息*/
ByteBuffer inBuf = ByteBuffer.allocate(100);
accept.read(inBuf);
inBuf.flip();
String msg = new String(inBuf.array(), 0, inBuf.limit());
System.out.println("【服务器】接收到通道【" + channel.getLocalAddress() + "】的信息:" + msg);
//从连接的通道中把已经处理过的服务器通道移除
it.remove();
}
System.out.println("休息1秒......");
Thread.sleep(1000);
System.out.println();//打印一个空行 }
}
}
}
//客户端
public class SocketDemo {
public static void main(String[] args) {
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("7777客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 7777));
System.out.println("7777客户端连接成功....");
/*发送数据*/
ByteBuffer outBuf = ByteBuffer.allocate(100);
outBuf.put("我是客户端,连接7777端口".getBytes());
outBuf.flip();
socket.write(outBuf);
} catch (IOException e) {
System.out.println("7777异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("8888客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 8888));
System.out.println("8888客户端连接成功....");
/*发送数据*/
ByteBuffer outBuf = ByteBuffer.allocate(100);
outBuf.put("我是客户端,连接8888端口".getBytes());
outBuf.flip();
socket.write(outBuf);
} catch (IOException e) {
System.out.println("8888异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("9999客户端连接服务器......");
socket.connect(new InetSocketAddress("192.168.0.104", 9999));
System.out.println("9999客户端连接成功....");
/*发送数据*/
ByteBuffer outBuf = ByteBuffer.allocate(100);
outBuf.put("我是客户端,连接9999端口".getBytes());
outBuf.flip();
socket.write(outBuf);
} catch (IOException e) {
System.out.println("9999异常重连");
}
}).start();
}
}
AIO(异步非阻塞)
AIO概述和分类
四个异步通道:
AsynchronousSocketChannel //异步的客户端通道
AsynchronousServerSocketChannel //异步的服务器端通道
AsynchronousFileChannel //异步的文件通道
AsynchronousDatagramChannel //异步的UDP通道
表现在两个方面:
1、连接时,可以使用异步,在调用连接的方法时,非阻塞,连接成功之后会以方法回调的机制通知我们。
2、读取数据时,可以使用异步,在使用read方法时,非阻塞,等数据接收到之后以方法回调的机制通知我们。
AIO异步非阻塞连接的建立
异步的服务器端通道
//AIO下的异步的服务器端通道
public class AIOServerSocketChanneldemo {
public static void main(String[] args) throws IOException {
//创建AIO下的异步的服务器端通道
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//绑定端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
//接受异步客户端,采用异步非阻塞式
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println("客户端连接成功");
try {
System.out.println(result.getLocalAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接失败");
}
});
System.out.println("继续执行");
while (true) {
}
}
}
//AIO下的异步的客户端通道
public class AIOSocketChannel {
public static void main(String[] args) throws IOException {
//创建异步的客户端通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//连接服务器,采用异步非阻塞方式
//connect(服务器ip和端口号、附件(null)、接口)
socketChannel.connect(new InetSocketAddress("192.168.0.104", 8888), null, new CompletionHandler<Void, Object>() {
//连接成功时的回调函数
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接服务器成功");
}
//连接失败时的回调函数
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接服务器失败");
}
});
System.out.println("程序继续执行");
while (true) {
}
}
}
AIO同步读写数据
//AIO下的异步的客户端通道
public class AIOSocketChannel {
public static void main(String[] args) throws IOException {
//创建异步的客户端通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//连接服务器,采用异步非阻塞方式
//connect(服务器ip和端口号、附件(null)、接口)
socketChannel.connect(new InetSocketAddress("192.168.0.104", 8888), null, new CompletionHandler<Void, Object>() {
//连接成功时的回调函数
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接服务器成功");
//给服务器发送数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("你好,我是客户端".getBytes());
//切换读写模式
buffer.flip();
socketChannel.write(buffer);
//释放资源
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//连接失败时的回调函数
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接服务器失败");
}
});
System.out.println("程序继续执行");
while (true) {
}
}
}
//AIO下的异步的服务器端通道
public class AIOServerSocketChanneldemo {
public static void main(String[] args) throws IOException {
//创建AIO下的异步的服务器端通道
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//绑定端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
//接受异步客户端,采用异步非阻塞式
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
System.out.println("客户端连接成功");
//读客户端发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> future = socketChannel.read(buffer);
//buffer.flip();//如果采用异步连接,读取数据后不需要调用flip
try {
System.out.println(new String(buffer.array(), 0, future.get()));
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接失败");
}
});
System.out.println("继续执行");
while (true) {
}
}
}
AIO异步读写数据
//AIO下的异步非阻塞的服务器端通道
public class AIOServerSocketChanneldemo {
public static void main(String[] args) throws IOException, InterruptedException {
//创建AIO下的异步的服务器端通道
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//绑定端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
//接收异步客户端,采用异步非阻塞式
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
System.out.println("客户端连接成功");
//从客户端中读取数据
//异步的read(缓冲区,超时时间,时间单位,附件null,回调接口)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//new CompletionHandler是匿名内部类【重写匿名内部类中的两个方法即可】
socketChannel.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println("接收到的数据是" + new String(buffer.array(), 0, result));
//释放资源
try {
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//数据接收失败
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("数据接收失败");
}
});
}
//客户端连接失败
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接失败");
}
});
System.out.println("程序继续执行");
while (true) {
Thread.sleep(500);
}
}
}
//AIO下的异步非阻塞的客户端通道
public class AIOSocketChannel {
public static void main(String[] args) throws IOException, InterruptedException {
//创建异步的客户端通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//连接服务器,采用异步非阻塞方式
//connect(服务器ip和端口号、附件(null)、接口)
socketChannel.connect(new InetSocketAddress("192.168.0.104", 8888), null, new CompletionHandler<Void, Object>() {
//连接成功时的回调函数
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接服务器成功");
//给服务器发送数据【创建字节缓冲区】
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("你好,我是客户端".getBytes());
//切换读写模式
buffer.flip();
//异步的write(缓冲区,超时时间,时间单位,附件null,回调接口)
socketChannel.write(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println("数据发送成功");
//释放资源
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("数据发送失败");
}
});
}
//连接失败时的回调函数
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接服务器失败");
}
});
System.out.println("程序继续执行");
while (true) {
Thread.sleep(500);
}
}
}
小结
世上无难事,只要肯登攀。