NIO是非阻塞IO,其核心组件就是多路复用器Selector和channel,所有的channel都要在Selector上去注册,来实现非阻塞的过程;
Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
主要是看代码注释1.服务器代码NioServer.java
package bigdata.studynio;
public class NioServer {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length < 0){
//port = Integer.valueOf(args[0]);
}
//创建服务器线程
NioServerWork nioServerWork = new NioServerWork(port);
new Thread(nioServerWork, "server").start();
}
}
2.服务器的业务操作代码NioServerWork.java
package bigdata.studynio;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioServerWork implements Runnable {
//多路复用器 Selector会对注册在其上面的channel进行;轮询,当某个channel发生读写操作时,
//就会处于相应的就绪状态,通过SelectionKey的值急性IO 操作
private Selector selector;//多路复用器
private ServerSocketChannel channel;
private volatile boolean stop;
/**
* @param port
* 构造函数
*/
public NioServerWork(int port) {
try {
selector = Selector.open();//打开多路复用器
channel = ServerSocketChannel.open();//打开socketchannel
channel.configureBlocking(false);//配置通道为非阻塞的状态
channel.socket().bind(new InetSocketAddress(port), 1024);//通道socket绑定地址和端口
channel.register(selector, SelectionKey.OP_ACCEPT);//将通道channel在多路复用器selector上注册为接收操作
System.out.println("NIO 服务启动 端口: "+ port);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void stop(){
this.stop=true;
}
@Override
public void run() {//线程的Runnable程序
System.out.println("NIO 服务 run()");
while(!stop){
try {
selector.select(1000);//最大阻塞时间1s
//获取多路复用器的事件值SelectionKey,并存放在迭代器中
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key =null;
//System.out.println("NIO 服务 try");
while(iterator.hasNext()){
System.out.println("NIO 服务 iterator.hasNext()");
key = iterator.next();
iterator.remove();//获取后冲迭代器中删除此值
try {
handleinput(key);//根据SelectionKey的值进行相应的读写操作
} catch (Exception e) {
if(key!=null){
key.cancel();
if(key.channel()!=null)
key.channel().close();
}
}
}
} catch (IOException e) {
System.out.println("NIO 服务 run catch IOException");
e.printStackTrace();
System.exit(1);
}
}
}
/**
* @param key
* @throws IOException
* 根据SelectionKey的值进行相应的读写操作
*/
private void handleinput(SelectionKey key) throws IOException {
System.out.println("NIO 服务 handleinput");
if(key.isValid()){//判断所传的SelectionKey值是否可用
if(key.isAcceptable()){//在构造函数中注册的key值为OP_ACCEPT,,在判断是否为接收操作
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();//获取key值所对应的channel
SocketChannel sc = ssc.accept();//设置为接收非阻塞通道
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);//并把这个通道注册为OP_READ
}
if(key.isReadable()){//判断所传的SelectionKey值是否为OP_READ,通过上面的注册后,经过轮询后就会是此操作
SocketChannel sc = (SocketChannel)key.channel();//获取key对应的channel
ByteBuffer readbuf = ByteBuffer.allocate(1024);
int readbytes = sc.read(readbuf);//从channel中读取byte数据并存放readbuf
if(readbytes > 0){
readbuf.flip();//检测时候为完整的内容,若不是则返回完整的
byte[] bytes = new byte[readbuf.remaining()];
readbuf.get(bytes);
String string = new String(bytes, "UTF-8");//把读取的数据转换成string
System.out.println("服务器接受到命令 :"+ string);
//"查询时间"就是读取的命令,此字符串要与客户端发送的一致,才能获取当前时间,否则就是bad order
String currenttime = "查询时间".equalsIgnoreCase(string) ? new java.util.Date(System.currentTimeMillis()).toString() : "bad order";
dowrite(sc,currenttime);//获取到当前时间后,就需要把当前时间的字符串发送出去
}else if (readbytes < 0){
key.cancel();
sc.close();
}else{}
}
}
}
/**
* @param sc
* @param currenttime
* @throws IOException
* 服务器的业务操作,将当前时间写到通道内
*/
private void dowrite(SocketChannel sc, String currenttime) throws IOException {
System.out.println("服务器 dowrite currenttime"+ currenttime);
if(currenttime !=null && currenttime.trim().length()>0){
byte[] bytes = currenttime.getBytes();//将当前时间序列化
ByteBuffer writebuf = ByteBuffer.allocate(bytes.length);
writebuf.put(bytes);//将序列化的内容写入分配的内存
writebuf.flip();
sc.write(writebuf); //将此内容写入通道
}
}
}
3.客户端代码NioClient.java
package bigdata.studynio;
public class NioClient {
public static void main(String[] args) {
int port = 8080;
if(args !=null && args.length > 0){
try {
//port = Integer.valueOf(args[0]);
} catch (Exception e) {
// TODO: handle exception
}
}
//创建客户端线程
new Thread(new NioClientWork("127.0.0.1",port),"client").start();
}
}
4.客户端业务操作代码NioClientWork.java
package bigdata.studynio;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioClientWork implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
/**
* @param string
* @param port
* 构造函数
*/
public NioClientWork(String string, int port) {
this.host = string == null ? "127.0.0.1":string;
this.port = port;
try {
selector= Selector.open();//打开多路复用器
socketChannel=SocketChannel.open();//打开socketchannel
socketChannel.configureBlocking(false);
System.out.println("NIO 客户端启动 端口: "+ port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
doConnect();//客户端线程需要连接服务器
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while(!stop){
try {
selector.select(1000);//最大阻塞时间1s
//获取多路复用器的事件值SelectionKey,并存放在迭代器中
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key =null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try {
handleinput(key);//获取多路复用器的事件值SelectionKey,并存放在迭代器中
} catch (Exception e) {
if(key == null){
key.cancel();
if(socketChannel ==null)
socketChannel.close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if(selector !=null){
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @throws IOException
* 线程连接服务器,并注册操作
*
*/
private void doConnect() throws IOException {
if(socketChannel.connect(new InetSocketAddress(host, port))){//检测通道是否连接到服务器
System.out.println("NIO 客户端 idoConnect OP_READ ");
socketChannel.register(selector, SelectionKey.OP_READ);//如果已经连接到了服务器,就把通道在selector注册为OP_READ
dowrite(socketChannel);
}else{
System.out.println("NIO 客户端 doConnect OP_CONNECT ");
socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果客户端未连接到服务器,则将通道注册为OP_CONNECT操作
}
}
/**
* @param key
* @throws IOException
* 根据SelectionKey的值进行相应的读写操作
*/
private void handleinput(SelectionKey key) throws IOException {
//System.out.println("NIO 客户端 handleinput ");
if(key.isValid()){//判断所传的SelectionKey值是否可用
//System.out.println("NIO 客户端 isValid() ");
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){//一开始的时候,客户端需要连接服务器操作,所以检测是否为连接状态
System.out.println("NIO 客户端 isConnectable ");
if(sc.finishConnect()){//是否完成连接
System.out.println("NIO 客户端 finishConnect ");
dowrite(sc);//向通道内发送数据,就是“查询时间” 的命令,读写通道与通道注册事件类型无关,注册事件只是当有事件来了,就会去处理相应事件
sc.register(selector, SelectionKey.OP_READ);//如果完成了连接,就把通道注册为 OP_READ操作,用于接收服务器出过来的数据
}else{
System.out.println("NIO 客户端 not finishConnect ");
System.exit(1);
}
}
if(key.isReadable()){//根据上面注册的selector的通道读事件,进行操作
System.out.println("NIO 客户端 isReadable() ");
ByteBuffer readbuf = ByteBuffer.allocate(1024);
int readbytes = sc.read(readbuf);//获取通道从服务器发过来的数据,并反序列化
if(readbytes > 0){
readbuf.flip();
byte[] bytes=new byte[readbuf.remaining()];
readbuf.get(bytes);
String string = new String(bytes, "UTF-8");
System.out.println("时间是: " + string);
this.stop=true; //操作完毕后,关闭所有的操作
}else if (readbytes < 0){
key.cancel();
sc.close();
}else{}
}
}
}
private void dowrite(SocketChannel sc) throws IOException {
byte[] req = "查询时间".getBytes();
ByteBuffer writebuf = ByteBuffer.allocate(req.length);
writebuf.put(req);
writebuf.flip();
sc.write(writebuf);
if(!writebuf.hasRemaining()){
System.out.println("向服务器发送命令成功 ");
}
}
}