package com.nio.real;
import java.io.IOException;
public class TimeServer {
public static void main(String[] args)throws IOException {
// 设置要监听的端口
int port = 8785;
if (args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
// 创建一个被称为MultiplexerTimeServer的多路复用类
// 它是一个单独的线程,负责轮询多路复用器seletor,可以处理多个客户端的并发接入
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
package com.nio.real;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandler implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandler(String host, int port) {
this.port = port;
this.host = host == null ? "localhost" : host;
try {
// 初始化多路复用器和SocketChannel
selector = Selector.open();
socketChannel = SocketChannel.open();
// 设置为异步非阻塞模式
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop){
try {
// 设置休眠时间为1s,无论是否发生读写等事件,selector都会每隔1s倍唤醒一次
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
// 轮询多路复用器selector
while (it.hasNext()){
key = it.next();
it.remove();
try {
// 当有就绪的channel时,执行handleInput方法
handleInput(key);
} catch (Exception e) {
if (key != null){
key.cancel();
if (key.channel() != null){
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
// 多路复用关闭后所有注册在上面的channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key)throws IOException {
if (key.isValid()){
// 判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
// 判断连接状态,处于连接状态说明服务端已经返回ack应答消息
if (key.isConnectable()){
// 对连接结果判断,返回TRUE说明客户端连接成功
if (sc.finishConnect()){
// 将sc注册到多路复用器上,注册SelectionKey.OP_READ,监听网络ducaoz,然后将消息发送给服务端
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else {
// 连接失败,进程退出
System.exit(1);
}
}
if (key.isReadable()){
// 预分配1MB接收缓冲区的应答消息
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 调用read()方法进行异步读取操作
int readBytes = sc.read(readBuffer);
if (readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
// 对读取到的消息进行解码并且打印
String body = new String(bytes, "utf-8");
System.out.println("现在是:" + body);
// 将stop设置为TRUE,线程退出循环
this.stop = true;
}else if (readBytes < 0){
// 对端链路关闭
key.cancel();
sc.close();
}else
; // 读到0字节,忽略
}
}
}
private void doConnect() throws IOException{
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else {
// 没有直接连接成功,说明服务端没有返回TCP握手应答消息,但是不代表连接失败
// 注册到多路复用器上,注册electionKey.OP_CONNECT,当服务端返回TCPsyn-ack消息后,
// selector就能够轮训到这个socketChannel处于连接就绪状态
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc)throws IOException {
// 构造请求消息体
String str = "QUERY TIME ORDER";
// 将字符串编码成字节数组
byte[] req = str.getBytes();
// 根据字节数组的容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
// 将字节数组复制到缓冲区中
writeBuffer.put(req);
// 将缓冲区当前的limit设置为position,position = 0;用于后续对缓冲区的读取操作
writeBuffer.flip();
// 发送
sc.write(writeBuffer);
// 对发送结果进行判断,如果缓冲区中的消息全部发送完毕,将输出打印结果
if (!writeBuffer.hasRemaining()){
System.out.println("客户端成功发送: " + str);
}
}
}