最近在学习Netty,借此平台记录学习心得,方便今后的复习回顾,若在下对知识有理解错误的地方望各位大佬批评指正,不多B了。
本文目录:
1、 Linux网络I/O
我们先了解一下Linux系统的I/O,Linux系统的I/O分为两个阶段:1)内核准备数据;2)数据从内核拷贝到用户空间;
Linux内核将所有的外部设备都看做文件进行操作,通过调用内核的系统命令对文件进行操作并返回一个文件描述符fd(file descriptor),而对Socket的读写也会有一个相应的描述符,称为socketfd。描述符是一个数字,它会指向内核中的一个结构体(文件路径、数据区等一些列属性)。操作系统会将IO的数据缓存到文件系统的页缓存(cache page)。
网络 I/O 本质上就是对Socket的读取,一次读取操作分两个阶段:
1、等待网络上数据到达,将数据写入到内核的某个缓冲区;
2、将缓冲区中的数据复制到应用进程的缓冲区;
Linux网络 I/O 模型,大致分为五大类:
1、阻塞I/O模型
最常用的I/O模型,默认情况下所有的文件操作都是阻塞。以Socket为例,进程空间调用recvfrom,从系统调用开始到接受的数据包被复制到用户进程缓冲区或者发生错误返回为止,在这一过程中会一直等待,进程在整个时间段内都是被阻塞的。模型图示如下:
- 2、非阻塞I/O模型
和阻塞的I/O模型不同之处在于,系统调用recvfrom,若内核缓冲区中没有数据的话不会等待而是直接返回一个EWOULDBLOCK错误,一般非阻塞的模型都将通过轮询来检查这一状态,判断内核是否有数据到来。模型图示如下:
3、I/O复用模型
Linux提供了select/poll,进程将一个或者多fd传递给select或poll系统调用,操作在select上是阻塞的,select/poll通过顺序扫描fd帮助我们监测多个fd是否处于就绪状态,其存在很多制约。例如:select的单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差,当要支持100万的并发时则至少需要开辟1000个进程,进程间上下文切换等将带来巨大的开销;每次的select调用都将fd集合从用户态拷贝到内核态;每次调用select都要在内核遍历fd;众多的并发连接数中若活跃连接数低带来不必要的资源开销。Linux还提供了epoll系统调用,epoll采用事件驱动代替扫描,克服上述缺点因此性能更高,当有fd就绪时,立即回调函数rollback。模型图示如下:
4、信号驱动I/O模型
为Socket开启信号驱动I/O功能,并通过系统调用sigaction执行信号处理函数,此系统调用立即返回,属于非阻塞的,当数据准备就绪后为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据。模型图示如下:
- 5、异步I/O
通知内核启动某个操作,在实现数据从内核到用户自己的缓冲区的步骤完成后通知我们,该模型和信号驱动的区别在于:前者是内核通知我们I/O操作何时已经完成,后者则是通知我们何时可以开始I/O操作。模型图示如下:
Java NIO的核心类库多路复用器Selector就是基于epoll的多路复用技术实现。epoll相较于select做了很大的改进:
- 一个进程打开socket fd不受限制,仅受限于操作系统的最大文件句柄数;
- I/O效率不会随着FD数目增加而线性下降,epoll只会对活跃的Socket进行操作,只有活跃的Socket才会去主动调用callback函数。所以在所有Socket都活跃的网络环境下epoll并不比select/poll的效率高太多反而有时效率稍微降低;
- select、poll、epoll都需要内核把FD消息通知给用户空间,epoll使用mmap加速内核与用户空间的消息传递;
2、Java的I/O演进
早期的网路开发人员需要话费大量的时间去学习C语言的复杂Socket库去处理不同操作系统上的问题。虽然最早的 Java引入了足够多的面向对象来隐藏一些棘手的细节问题,但是创建一个复杂的客户端/服务器协议仍然需要大量的样板代码(以及相当多的底层研究才能使它整个流畅地运行起来)。
在JDK1.4推出Java NIO之前,基于Java的所有Socket通信都采用的同步阻塞模式(BIO),因此很长一段时间大型应用服务器都是采用C或者C++开发的,因为他们可以直接使用操作系统提供的异步I/O或者事件驱动I/O能力,此期间UNIX网络编程中概念或者接口在I/O类库中都没有体现,例如Pipe、Channel、Buffer和Selector等。
在JDK1.4时新增了java.nio包提供了很多进行异步I/O开发的API和类库,
- 进行异步I/O操作的缓冲区ByteBuffer等;
- 进行异步I/O操作的管道Pipe;
- 进行各种I/O操作(异步或者同步)的Channel,包括ServerSocketChannel 和 SocketChannel;
- 多种字符集的编码能力和解码能力;
- 实现非阻塞I/O操作的多路复用器Selector;
- 基于流行的Perl实现的正则表达式类库;
- 文件通道FileChannel;
NIO类库极大地促进了基于Java的异步非阻塞编程的发展和应用,但仍然有不完善的地方,特别是对文件系统的处理,
- 没有统一的文件属性(例如读写权限);
- API能力比较弱,例如目录的级联创建和递归遍历,需要自己实现;
- 底层存储系统的一些高级API无法使用;
- 所有的文件操作都是同步阻塞调用,不支持异步文件读写操作;
JDK1.7设计了原来的NIO类库提供了三个方面的改进:
- 批量获取文件属性的API,这些API具有平台无关性;
- 提供AIO功能,支持基于文件的异步I/O操作和针对网络Socket的异步操作;
完成JSR-51定义的通道功能,包括对配置和多播数据报的支持等;
3、Java IO编程
3.1、 传统BIO
服务端代码:
package com.linfo.netty.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Created by linfeng
*/
public class BioServer {
public static void main(String[] args) throws Exception{
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(12998);
System.out.println("server start ...");
Socket socket = null;
int i = 0;
while (true) {
socket = serverSocket.accept();
System.out.println("第" + ++i + "个线程...");
System.out.println("server received connect ...");
new Thread(new BioServerHandler(socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
static class BioServerHandler implements Runnable {
private Socket socket;
public BioServerHandler(Socket socket) {
this.socket = socket;
}
public void run(){
BufferedReader bufferedReader = null;
PrintWriter printWriter = null;
try {
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
printWriter = new PrintWriter(socket.getOutputStream(),true);
String line = null;
while ((line = bufferedReader.readLine()) != null && line.length() != 0) {
System.out.println("server thread " + Thread.currentThread().getId() + " received : " + line );
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
bufferedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
printWriter.close();
} catch (Exception e) {
e.printStackTrace();
}
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端代码:
package com.linfo.netty.bio;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
/**
* Created by linfeng
*/
public class BioClient {
public static void main(String[] args) throws Exception{
for (int i = 0; i < 100; i++) {
Thread.sleep(1000);
new Thread(new Runnable() {
public void run() {
Socket socket = null;
PrintWriter printWriter = null;
try {
socket = new Socket("127.0.0.1", 12998);
System.out.println("one client start ...");
printWriter = new PrintWriter(socket.getOutputStream(),true);
String content = "thread send content";
printWriter.println(content);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
socket.close();
printWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
}
3.2、伪异步IO
客户端代码:
package com.linfo.netty.pio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
/**
* Created by linfeng
*/
public class PioServer {
public static void main(String[] args) throws Exception{
ServerSocket server = null;
try {
server = new ServerSocket(8088);
System.out.println("listen port 8088...");
Socket socket = null;
PioServerHandlerExecutor singleExecutor = new PioServerHandlerExecutor(50, 1000);
while (true) {
socket = server.accept();
System.out.println("receive request from client...");
singleExecutor.execute(new PioServerHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
server.close();
}
}
static class PioServerHandlerExecutor {
private ExecutorService executorService;
public PioServerHandlerExecutor(int maxPoolSize, int queueSize) {
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(java.lang.Runnable task) {
executorService.execute(task);
System.out.println("Thread " + Thread.currentThread().getId());
}
}
static class PioServerHandler implements Runnable {
private Socket socket;
public PioServerHandler(Socket socket) {this.socket = socket;}
public void run(){
BufferedReader bufferedReader = null;
try {
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line = "";
while ((line = bufferedReader.readLine()) != null && line.length() != 0) {
System.out.println("content of request is: " + line);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
bufferedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端代码不变。
3.3、NIO
客户端代码:
package com.linfo.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Created by linfeng
*/
public class NioServer {
public static void main(String[] args) {
NioServerHandler handler = new NioServerHandler(8088);
new Thread(handler).start();
}
static class NioServerHandler implements Runnable {
private int port;
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;
public NioServerHandler(int port) {
try {
this.port = port;
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("listen port " + port + " ...");
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
handlerKey(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handlerKey(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = sc.read(buffer);
if (read > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String content = new String(bytes, "UTF-8");
System.out.println("server received a request : " + content);
} else if (read < 0) {
key.cancel();
sc.close();
}
}
}
}
}
}
3.4、AIO
客户端代码:
package com.linfo.netty.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
/**
* Created by linfeng
*/
public class AioServer {
public static void main(String[] args) {
AioServerHandler handler = new AioServerHandler(8088);
new Thread(handler).start();
}
static class AioServerHandler implements Runnable {
private int port;
private AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AioServerHandler(int port) {
this.port = port;
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("listen port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
asynchronousServerSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServerHandler>() {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
@Override
public void completed(AsynchronousSocketChannel result, AioServerHandler attachment) {
System.out.println("reading ...");
try {
buffer.clear();
result.read(buffer).get();
buffer.flip();
System.out.println("received a request : " + Charset.defaultCharset().decode(buffer));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
}
}
@Override
public void failed(Throwable exc, AioServerHandler attachment) {
System.out.println("received failed");
attachment.asynchronousServerSocketChannel.accept(attachment, this);
}
});
}
}
}
学习书籍《netty实战》和《netty权威指南》