一.BIO
BIO是java1.4之前唯一的IO逻辑,在客户端通过socket向服务端传输数据,服务端监听端口,由于传统io读数据的时候,如果数据没有传达,IO会一直等待输入传入,所以当有请求过来的时候,新起一条线程对数据进行等待,处理.导致每一个链接都对应着服务器的一个线程.
可以看出,BIO是同步阻塞的
BIO实现代码如下:
public class BIO {
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocket serverSocket = new ServerSocket(1111);
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
Socket socket = serverSocket.accept();//BIO:服务端接收到链接,新起线程等待读数据并处理数据
new Thread(new Runnable() {
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
System.out.println("开始等待...."+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
int count = 0;
while (count == 0) {
count = inputStream.available();
}
byte[] a = new byte[count];
while (inputStream.read(a)>0){
System.out.println("receive:"+new String(a)+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
}
inputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
},"接收到消息新起线程-").start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
},"服务器接收线程").start();
Socket socketClient = new Socket("127.0.0.1",1111);
Socket socketClient2 = new Socket("127.0.0.1",1111);
Thread.sleep(5000);
new Thread(new Runnable() {//新起线程标识客户端:一秒之后客户端发送数据
@Override
public void run() {
try {
String a = "message";
OutputStream outputStream = socketClient.getOutputStream();
System.out.println("send:"+a+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
outputStream.write(a.getBytes());
outputStream.flush();
outputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
},"客户端发送线程1-").start();
new Thread(new Runnable() {//新起线程标识客户端:一秒之后客户端发送数据
@Override
public void run() {
try {
String a = "message";
OutputStream outputStream = socketClient2.getOutputStream();
System.out.println("send:"+a+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
outputStream.write(a.getBytes());
outputStream.flush();
outputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
},"客户端发送线程2-").start();
}
}
用两个新线程模拟两个客户端,5秒之后发送数据
返回结果:
开始等待....当前线程:接收到消息新起线程-15
开始等待....当前线程:接收到消息新起线程-16
send:message当前线程:客户端发送线程1-17
receive:message当前线程:接收到消息新起线程-15
send:message当前线程:客户端发送线程2-18
receive:message当前线程:接收到消息新起线程-16
可见多个客户端(客户端发送线程1-17,客户端发送线程2-18)同时发送两个请求时,BIO会启用两个线程来处理(接收到消息新起线程-15,接收到消息新起线程-16)
二.NIO
先说下Reactor模式
Reactor模式:多个消息到达后由消息接收器将消息放入缓存,select监听器监听缓存队列,消息到达后消息分发器根据消息类型调用对应的消息处理器处理方法,实现消息处理.
reactor模式可以解决多请求的问题,当请求过多的时候不会因为缓存或线程池不足的时候拒绝接受请求或者堵塞;缺点就是编程稍麻烦,并且处理异常情况比较麻烦.
而NIO就是基于reactor模式:服务端将服务通道注册到selector中,selector.select()方法监听通道情况,监听到消息后通过selectionKey找到客户端通道,从通道中获取数据
所以NIO为同步非阻塞
NIO实现代码如下
public class NIO {
public static void main(String[] args) throws IOException,InterruptedException{
Selector selector = Selector.open();;
ServerSocketChannel serverChannel = ServerSocketChannel.open();;
serverChannel.configureBlocking(false);//设置为非阻塞模式
serverChannel.socket().bind(new InetSocketAddress(1111), 1024);//绑定监听的端口地址
serverChannel.register(selector, SelectionKey.OP_ACCEPT);//将ServerSocketChannel注册到Selector,交给Selector监听
System.out.println("start listen thread!");
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
System.out.println(new Date().getTime()+" start select!"+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
selector.select();
System.out.println(new Date().getTime()+"select complete!"+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
try {
if(selectionKey.isValid()){//判断key是否有效
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel client = server.accept();//获取到客户端的通道
client.configureBlocking(true);//设置成阻塞
ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
//读取客户端请求消息到缓冲区
int count = client.read(receivebuffer); //非阻塞
if (count > 0) {
receivebuffer.flip();
byte[] bytes = new byte[receivebuffer.remaining()];
receivebuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println(new Date().getTime()+"receive message:"+body+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
}else{
client.close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
if(selectionKey!=null){
selectionKey.cancel();
if(selectionKey.channel()!=null){
selectionKey.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"服务器线程-").start();
Thread.sleep(5000);
SocketChannel socketChannel = SocketChannel.open();
new Thread(new Runnable() {//新起线程代表客户端发送数据
@Override
public void run() {
try {
boolean connect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 1111));
if(connect){
Thread.sleep(5000);//隔5秒钟在写
ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
sendbuffer.put("hello!".getBytes());
sendbuffer.flip();
socketChannel.write(sendbuffer);
if(!sendbuffer.hasRemaining()){
System.out.println(new Date().getTime()+"Send hello to server succeed."+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
}else{
System.out.println("has remaining!");
}
}else{
System.out.println("connect error!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
},"客户端线程1-").start();
SocketChannel socketChannel2 = SocketChannel.open();
new Thread(new Runnable() {//新起线程代表客户端发送数据
@Override
public void run() {
try {
boolean connect = socketChannel2.connect(new InetSocketAddress("127.0.0.1", 1111));
if(connect){
Thread.sleep(5000);//隔5秒钟在写
ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
sendbuffer.put("hello!".getBytes());
sendbuffer.flip();
socketChannel2.write(sendbuffer);
if(!sendbuffer.hasRemaining()){
System.out.println(new Date().getTime()+"Send hello to server succeed."+"当前线程:"+Thread.currentThread().getName()+Thread.currentThread().getId());
}else{
System.out.println("has remaining!");
}
}else{
System.out.println("connect error!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
},"客户端线程2-").start();
}
}
用两个新线程模拟两个客户端,链接完成后,5秒之后发送数据(为方便结果展示,代码中client.configureBlocking(true)服务器接收部分设置成了阻塞,如果业务需要非阻塞的从通道获取消息,true改成false即可,这样如果客户端延迟写数据到通道,读取到的count可能为0,需要后续轮询读取),
返回结果:
start listen thread!
1551858174162 start select!当前线程:服务器线程-14
1551858179176select complete!当前线程:服务器线程-14
1551858184176Send hello to server succeed.当前线程:客户端线程1-15
1551858184176receive message:hello!当前线程:服务器线程-14
1551858184176 start select!当前线程:服务器线程-14
1551858184176select complete!当前线程:服务器线程-14
1551858184186receive message:hello!当前线程:服务器线程-14
1551858184186 start select!当前线程:服务器线程-14
1551858184186Send hello to server succeed.当前线程:客户端线程2-16
可见多个客户端(客户端线程2-16,客户端线程1-15)同时发送两个请求时,NIO只用一个线程来处理(服务器线程-14)
三.AIO(也叫NIO.2)
AIO是以通知的形式来告知服务端:客户端已经将数据传输完成并且有效,服务端可以开始处理数据了,为异步非阻塞
即实现一个CompletionHandler,并实现其
completed,failed方法.当客户端数据写完,发送一个通知,CompletionHandler对象的 completed或者failed方法将会被调用.
代码实现:
public class AIO {
public static void main(String[] args) throws InterruptedException{
new Thread(new Runnable() {
@Override
public void run() {
try {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(
new InetSocketAddress(1111));
server.accept(null,new CompletionHandler<AsynchronousSocketChannel, Object>() {//获取客户端连接方法
@Override
public void completed(AsynchronousSocketChannel acceptChannel, Object attachment) {
System.out.println(new Date().getTime()+"accept");
ByteBuffer buffer = ByteBuffer.allocate(6);
acceptChannel.read(buffer, 10L, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {//读取数据方法
@Override
public void completed(Integer result, Object attachment) {
if (result != -1) {
buffer.flip();
System.out.println(new Date().getTime()+"received message:" + Charset.forName("UTF-8").decode(buffer));
buffer.clear();
}else{
try {
acceptChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
},"服务器线程-").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
System.out.println(new Date().getTime()+"connect");
client.connect(new InetSocketAddress("127.0.0.1", 1111)).get();
Thread.sleep(5000);
System.out.println(new Date().getTime()+"send message:hello");
client.write(ByteBuffer.wrap("hello".getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
},"客户端线程1-").start();
Thread.sleep(50000);
}
}
这里读缓存大小如果不足,则只会传部分数据.
返回结果:
1551861491202connect
1551861491212accept
1551861496213send message:hello
1551861496213received message:hello