目录
1.简介
java NIO(Non-blocking IO,非阻塞IO),本质是多路复用,以此提高服务端的并发和吞吐量。 多路复用是一个选择器Selector可以监听多个通道Channel的多种事件,如下图所示。
核心概念包括 Selector(选择器)、 Channel(通道)、Buffer(缓冲区),如读、写、Accept事件等。下面我们通过一个最简单的TCP服务端实现的demo讲解下NIO的这些组件一般用法。
Channels通道 |
与Socket等支持非阻塞IO的连接 |
Buffers缓冲 |
可由Channel通道直接读取或写入的类数组对象 |
Selectors |
指示Channel通道有IO事件 |
SelectionKeys |
维护IO事件状态和绑定 |
2.最简demo使用
用nio写个最简单的TCPServer,客户端可以连接服务端,以echo打头的指令可以响应回响,否则响应BAD_REQUEST,效果如图所示
2.1 网络线程模型图
为实现上面的服务效果,设计demo使用主线程接收客户端连接,用ThreadPoolExecutor线程池做异步IO处理。
|
2.2 demo代码
public class EchoServer {
private static ExecutorService executor; // 用于异步读写的线程池
static {
executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
}
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();// 静态工厂创建服务通道
ssc.socket().bind(new InetSocketAddress(8888));
ssc.configureBlocking(false); // !!一定要
Selector selector = Selector.open(); // 静态工厂模式
ssc.register(selector, ssc.validOps()); // 注册该服务通道的Accept操作
while (true) { // 在选择器上轮询
int readyCount = selector.select(1000);
if(readyCount==0){
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 就绪的事件
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while(keyIterator.hasNext()){
SelectionKey selectionKey = keyIterator.next();
if(selectionKey.isValid()){
// ServerSocketChannel有连接事件
if(selectionKey.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); // key与通道关联
SocketChannel socketChannel = server.accept(); // 接收连接,获得传输通道
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE); // 注册通道的读写事件
}
// SocketChannel数据通道可读
if(selectionKey.isReadable()){
executor.submit(new EchoServerTask(selectionKey)); // 异步处理读事件
}
keyIterator.remove();
}
}
}
}
}
public class EchoServerTask implements Runnable{
private SelectionKey selectionKey;
public EchoServerTask(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void run() {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
try {
int count =0;
while ((count = channel.read(byteBuffer)) > 0) {
byteBuffer.flip();
byte[] request=new byte[byteBuffer.remaining()];
byteBuffer.get(request);
String requestStr=new String(request);
byteBuffer.clear();
// 处理客户端请求
if (requestStr.startsWith("echo")) {
// 放入缓冲区
byteBuffer.put(("response:" + requestStr).getBytes());
byteBuffer.flip(); //此处需要 做byteBuffer模式切换,否则会采用读模式
channel.write(byteBuffer);
} else {
byteBuffer.put("BAD_REQUEST".getBytes());
byteBuffer.flip();
channel.write(byteBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
selectionKey.cancel();
}
}
}
3 源码分析
3.1 Selector选择器
一般使用方法是在选择器上注册多个通道的多个事件,选择器可以监听多个事件,其select()是个阻塞方法,但当其任意一个事件发生都将唤醒返回。
使用模式 | ||
注册 | ssc.register(selector, ssc.validOps()); | 注册该服务通道的Accept操作 |
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE); | 注册通道的读写事件 | |
选择 |
while (true) { // 在选择器上轮询 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); .....do something with channel ...... |
轮询选择 select(timeout)阻塞等待 注册的事件任意一个发生,唤醒返回 处理事件 |
3.1.1 open()创建实例
selector实例一般使用DefaultSelectorProvider.create()根据底层操作系统版本选择不同的选择器实现,因为不同操作系统支持不同的多路复用,如linux 2.6以上才支持Epoll模型,一般使用poll模型。
//Selector
public static Selector open() throws IOException { //静态工厂方法
return SelectorProvider.provider().openSelector();
}
//SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();//
return provider;
}
});
}
}
//DefaultSelectorProvider
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
// PollSelectorProvider
public AbstractSelector openSelector() throws IOException {
return new PollSelectorImpl(this);
}
3.1.2 select()遍历fd选择就绪
最终调用Native方法,对JNI有兴趣的可以参考JNI讲解,最终调用到PollArrayWrapper#poll0(long pollAddress, int numfds, long timeout),启动调用到jdk中动态连接库中的PollArrayWrapper.c的Java_sun_nio_ch_PollArrayWrapper_poll0方法,方法实现如下:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout)
{
struct pollfd *a;
int err = 0;
a = (struct pollfd *) jlong_to_ptr(address);
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE (poll(a, numfds, timeout), err);
} else { /* Bounded wait; bounded restarts */
err = ipoll(a, numfds, timeout);
}
if (err < 0) {
JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
}
return (jint)err;
}
其中<poll.h>的poll()方法其实是调用操作系统的接口,对描述符进行遍历,查看是否有描述符就绪,如果有就返回就绪文件描述符的个数。
3.2 Channel
如果是读写事件的话,选择器触发唤醒线程去处理事件,一般的处理就是从通道channel中读数据到缓冲Buffer去,或者从缓冲区写入到通道中。
使用模式 | 图解 |
|
|
channel.write(byteBuffer); |
3.2.1 read读取到buffer
IOUtil读取和src/solaris/native/sun/nio/ch/FileDispatcherImpl.c实现如下,映射文件描述符,从文件描述符去读取内容到buf所在内存地址
// IOUtil
static int read(FileDescriptor fd, ByteBuffer dst, long position,
NativeDispatcher nd)
throws IOException
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd);
// Substitute a native buffer
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
int n = readIntoNativeBuffer(fd, bb, position, nd); //从buf的potition处开始存放
bb.flip();
if (n > 0)
dst.put(bb);
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, NativeDispatcher nd)
throws IOException
{
int pos = bb.position();
int lim = bb.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
if (rem == 0)
return 0;
int n = 0;
if (position != -1) {
n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
rem, position);
} else {
n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem); // 计算内存地址,加上了position
}
if (n > 0)
bb.position(pos + n);
return n;
}
// C实现
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
jlong address, jint len, jlong offset)
{
jint fd = fdval(env, fdo);
void *buf = (void *)jlong_to_ptr(address);
return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}
write写入
写入也是类似,都是调用IOUtil
3.3 Buffer缓冲区
程序读写数据时,需要有个缓冲区取放 于通道交互的数据。
3.3.1 核心字段
字段\模式 | 0 <= mark <= position <= limit <= capacity |
|
写模式 | 读模式 | |
position |
下一个可写的位置 | 下一个可读的位置 |
limit | 可写的最大位置limit==capacity | 可读的最大位置,后面未写入的不能读取 |
capacity |
缓冲区容量,数组的长度 |
3.3.2 demo测试
@Test
public void testByteBuffer() throws IOException {
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.order(ByteOrder.LITTLE_ENDIAN);
System.out.println(buffer); // [pos=0 lim=1024 cap=1024]
// 写入5个字节
buffer.put((byte) 0).put((byte) 1).put((byte)2).put((byte)3).put((byte)4);
System.out.println(buffer); // [pos=5 lim=1024 cap=1024]
// 直接获取字节数组array()
InputStream in=new ByteArrayInputStream(buffer.array(),0,buffer.position());
int c ;
while((c = in.read()) >= 0) {
System.out.println((byte)c);
}
// flip 转换 读模式 读取刚刚写入的数据
buffer.flip();
System.out.println(buffer);// [pos=0 lim=5 cap=1024]
// get 读取,并记录读取进度
while (buffer.hasRemaining()){
byte b = buffer.get();
System.out.println(b);
}
System.out.println(buffer);//[pos=5 lim=5 cap=1024]
}
3.3.3 总结
如果读写共用一个buffer,一定在是读后、写后flip切换模式