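Overview
Once Netty has established a connection with a client (that is, once the SocketChannel has been created), the channel is interested in OP_READ, so a buffer has to be allocated to read the incoming data (for example a ByteBuffer, which for heap buffers is essentially a byte array). From the earlier article "Java NIO: Buffer Source Code Explained and 'Zero Copy'" we already know that NIO has both on-heap and off-heap buffers. How does Netty allocate buffers internally, and how does it decide how large a buffer to allocate for reading from and writing to a Channel?
Netty provides a "predictive" allocation approach: it uses a feedback-based strategy to estimate how large the next buffer should be. This strategy addresses two problems:
- A buffer that is too large for the actual data wastes space and puts extra pressure on the GC
- A buffer that is too small for the actual data cannot hold everything, so it has to be "expanded", which involves copying data and costs performance
Source Code Walkthrough
Let's start with a simple Netty server: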
package com.leolee.netty.secondExample;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @ClassName MySocketServer
* @Description: socket server
* @Author LeoLee
* @Date 2020/8/23
* @Version V1.0
**/
public class MySocketServer {
    public static void main(String[] args) throws InterruptedException {
        // Define the thread groups; each event loop in an EventLoopGroup runs in an endless loop
        // The boss group keeps accepting connection requests from clients but does not process them;
        // it hands every accepted request over to the worker group
        // A single group could both accept and process client requests, but that is not recommended
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Define the bootstrap class
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Child handler (custom handler). A server can use childHandler or handler:
                    // handler applies to the accepting group (bossGroup), childHandler to the processing group (workerGroup)
                    .childHandler(new MySocketServerInitializer());
            // Bind the listening port
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            // Wait for the channel to be closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown provided by Netty
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
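In this example, everything before serverBootstrap.bind(8899).sync() only configures the server; nothing has actually started yet. We will not go into that configuration here (it will be summarized later) and focus only on serverBootstrap.bind(8899), because this is the step where Netty really starts the server.
Let's read the source step by step, starting from bind().
The AbstractBootstrap class: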
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
This brings us to the key method, private ChannelFuture doBind(final SocketAddress localAddress).
Its first line calls initAndRegister(), which creates a channel through channelFactory and registers it:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
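The ordering guarantee described in the comments above (registration and bind are bound to the same thread, so bind runs after the queued registration) can be illustrated with plain JDK classes. This is only a sketch under stated assumptions: the single-threaded ExecutorService stands in for a Netty event loop, and RegisterThenBindSketch and its task names are hypothetical, not Netty code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class RegisterThenBindSketch {

    /** Queues "register" and then "bind" on one thread and returns the order in which they ran. */
    static List<String> run() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        // Stand-in for an event loop: one thread, tasks run in submission order.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        eventLoop.execute(() -> order.add("register"));
        eventLoop.execute(() -> order.add("bind"));
        eventLoop.shutdown(); // tasks that are already queued still run
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [register, bind]
    }
}
```

Because both tasks go through the same single-threaded queue, the caller does not need to wait for registration to finish before submitting the bind.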
How does channelFactory create a channel?
It is created by ReflectiveChannelFactory, an implementation of the ChannelFactory interface. As the name suggests, ReflectiveChannelFactory's newChannel() uses reflection to instantiate the channel object.
The ReflectiveChannelFactory class:
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
import java.lang.reflect.Constructor;
/**
* A {@link ChannelFactory} that instantiates a new {@link Channel} by invoking its default constructor reflectively.
*/
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
'(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
}
}
Where does the constructor object come from? It comes from the NioServerSocketChannel class that was configured through ServerBootstrap in the example above:
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// Child handler (custom handler). A server can use childHandler or handler: handler applies to the accepting group (bossGroup), childHandler to the processing group (workerGroup)
.childHandler(new MySocketServerInitializer());
Under the hood, this configuration call invokes the ReflectiveChannelFactory(Class<? extends T> clazz) constructor, which fixes the type of channel that will be returned later:
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
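To make the reflective creation concrete, here is a minimal JDK-only sketch of the same idea: look up the public no-arg constructor once, then call newInstance() on it. FakeChannel and NeedsArgument are hypothetical stand-ins invented for this illustration; they are not Netty types.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactorySketch {

    /** Hypothetical stand-in for a channel class with a public no-arg constructor. */
    public static final class FakeChannel {
        public FakeChannel() { }
    }

    /** Hypothetical class that only offers a constructor with an argument. */
    public static final class NeedsArgument {
        public NeedsArgument(int port) { }
    }

    /** Creates an instance through the public no-arg constructor, wrapping any failure. */
    static <T> T newInstance(Class<T> clazz) {
        try {
            Constructor<T> constructor = clazz.getConstructor();
            return constructor.newInstance();
        } catch (Exception e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    /** Reports whether the class has a public no-arg constructor. */
    static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(newInstance(FakeChannel.class) != null);         // prints true
        System.out.println(hasPublicNoArgConstructor(NeedsArgument.class)); // prints false
    }
}
```

A class like NeedsArgument is the case the Javadoc above refers to: without a no-arg constructor, a custom channelFactory has to be supplied instead of channel(Class).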
Because the factory works through reflection, the channel object is created by calling the no-arg constructor of NioServerSocketChannel:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
The JDK's native SelectorProvider object is then passed to newSocket(SelectorProvider provider), which returns a ServerSocketChannel; that ServerSocketChannel is in turn passed to the constructor that takes a parameter:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Following the super calls up level by level, we eventually reach the AbstractNioChannel class:
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
From this we can see that the NioServerSocketChannel object sets up a ServerSocketChannel that is interested in OP_ACCEPT and puts it into non-blocking mode.
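The same two steps can be reproduced with the JDK alone. The sketch below opens a ServerSocketChannel through the SelectorProvider, switches it to non-blocking mode, and registers interest in OP_ACCEPT. It is a simplification: it registers OP_ACCEPT immediately, which is not necessarily the exact moment Netty sets that interest, and AcceptChannelSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class AcceptChannelSketch {

    /** Returns { isBlocking, isInterestedInAccept } for a freshly configured server channel. */
    static boolean[] openAndInspect() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel ch = provider.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a Selector.
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            boolean interestedInAccept = (key.interestOps() & SelectionKey.OP_ACCEPT) != 0;
            return new boolean[] { ch.isBlocking(), interestedInAccept };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] state = openAndInspect();
        System.out.println("blocking=" + state[0] + ", acceptInterest=" + state[1]);
        // prints blocking=false, acceptInterest=true
    }
}
```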
Back in the constructor NioServerSocketChannel(ServerSocketChannel channel):
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
javaChannel() returns the ServerSocketChannel that was assigned earlier in AbstractNioChannel.
Following the code further with this ServerSocketChannel leads to DefaultChannelConfig:
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
setRecvByteBufAllocator(allocator, channel.metadata());
this.channel = channel;
}
Now for the key part: AdaptiveRecvByteBufAllocator
This class is the key to Netty's adaptive buffer allocation:
package io.netty.channel;
import java.util.ArrayList;
import java.util.List;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
* The {@link RecvByteBufAllocator} that automatically increases and
* decreases the predicted buffer size on feed back.
* <p>
* It gradually increases the expected number of readable bytes if the previous
* read fully filled the allocated buffer. It gradually decreases the expected
* number of readable bytes if the read operation was not able to fill a certain
* amount of the allocated buffer two times consecutively. Otherwise, it keeps
* returning the same prediction.
*/
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
//...
}
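The Javadoc of this class says:
- This RecvByteBufAllocator automatically increases and decreases the predicted buffer size based on feedback
- If the previous read completely filled the allocated buffer, it gradually increases the expected number of readable bytes (the buffer size)
- If a read fails to fill a certain amount of the allocated buffer two times in a row, it gradually decreases the expected number of readable bytes
- Otherwise, it keeps returning the same prediction
Default constructor: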
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
/**
* Creates a new predictor with the default parameters. With the default
* parameters, the expected buffer size starts from {@code 1024}, does not
* go down below {@code 64}, and does not go up above {@code 65536}.
*/
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
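First, the static initializer of this class populates SIZE_TABLE.
The first loop in that block starts at 16 and adds 16 each time, up to 496, adding each value to the List. The second loop starts at 512 and shifts left by one bit each time (equivalent to i * 2), adding each value to the List until the value overflows and becomes negative. The List's elements are then copied into SIZE_TABLE.
So SIZE_TABLE contains [16, 32, 48, ..., 496, 512, 1024, 2048, ...]
SIZE_TABLE: when the size is adapted, the new buffer size is always picked from the entries of SIZE_TABLE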
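To check the shape of the table, the static initializer can be replayed outside Netty. The sketch below repeats the two loops from the source above in a hypothetical class named SizeTableSketch and prints a few entries.

```java
import java.util.ArrayList;
import java.util.List;

final class SizeTableSketch {

    /** Rebuilds the size table with the same two loops as the static initializer. */
    static int[] buildSizeTable() {
        List<Integer> sizes = new ArrayList<>();
        // Fine-grained steps: 16, 32, 48, ..., 496
        for (int i = 16; i < 512; i += 16) {
            sizes.add(i);
        }
        // Powers of two from 512 upward; the loop ends when the shift overflows to a negative int
        for (int i = 512; i > 0; i <<= 1) {
            sizes.add(i);
        }
        int[] table = new int[sizes.size()];
        for (int i = 0; i < table.length; i++) {
            table[i] = sizes.get(i);
        }
        return table;
    }

    public static void main(String[] args) {
        int[] table = buildSizeTable();
        System.out.println(table.length);            // prints 53
        System.out.println(table[2]);                // prints 48
        System.out.println(table[30]);               // prints 496
        System.out.println(table[31]);               // prints 512
        System.out.println(table[table.length - 1]); // prints 1073741824
    }
}
```

The table therefore has 31 multiples of 16 followed by 22 powers of two, the last one being 2^30.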
Next, private void record(int actualReadBytes) and private static int getSizeTableIndex(final int size) decide which SIZE_TABLE entry is used:
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
// inner class
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private int index;
private int nextReceiveBufferSize;
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}
@Override
public void lastBytesRead(int bytes) {
// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
// This helps adjust more quickly when large amounts of data is pending and can avoid going back to
// the selector to check for more data. Going back to the selector can add significant latency for large
// data transfers.
if (bytes == attemptedBytesRead()) {
record(bytes);
}
super.lastBytesRead(bytes);
}
@Override
public int guess() {
return nextReceiveBufferSize;
}
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
@Override
public void readComplete() {
record(totalBytesRead());
}
}
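The excerpt above does not show the INDEX_INCREMENT and INDEX_DECREMENT constants. The simulation below assumes they are 4 and 1 (my recollection of the Netty 4.1 sources, so treat it as an assumption) and replays the record() logic with the default bounds 64, 1024 and 65536. AdaptivePredictorSketch is a hypothetical class; its indexOf is a plain linear scan standing in for getSizeTableIndex, which is only meant to be used here with sizes that are present in the table.

```java
import java.util.Arrays;

final class AdaptivePredictorSketch {
    // Assumed step sizes: grow by 4 table slots at once, shrink by 1.
    static final int INDEX_INCREMENT = 4;
    static final int INDEX_DECREMENT = 1;
    static final int[] SIZE_TABLE = buildSizeTable();

    private final int minIndex;
    private final int maxIndex;
    private int index;
    private int nextReceiveBufferSize;
    private boolean decreaseNow;

    AdaptivePredictorSketch(int minimum, int initial, int maximum) {
        minIndex = indexOf(minimum);
        maxIndex = indexOf(maximum);
        index = indexOf(initial);
        nextReceiveBufferSize = SIZE_TABLE[index];
    }

    static int[] buildSizeTable() {
        int[] table = new int[53]; // 31 multiples of 16 plus 22 powers of two
        int n = 0;
        for (int i = 16; i < 512; i += 16) table[n++] = i;
        for (int i = 512; i > 0; i <<= 1) table[n++] = i;
        return table;
    }

    /** Simplified lookup: index of the first entry >= size (linear scan, not Netty's binary search). */
    static int indexOf(int size) {
        for (int i = 0; i < SIZE_TABLE.length; i++) {
            if (SIZE_TABLE[i] >= size) return i;
        }
        return SIZE_TABLE.length - 1;
    }

    /** Same branching as record() in the source above. */
    void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT)]) {
            if (decreaseNow) {
                // Second small read in a row: step down one slot.
                index = Math.max(index - INDEX_DECREMENT, minIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                // First small read: only remember it.
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {
            // Buffer was filled: step up immediately.
            index = Math.min(index + INDEX_INCREMENT, maxIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }

    int guess() {
        return nextReceiveBufferSize;
    }

    /** Feeds the given read sizes to a default predictor and returns the guess after each one. */
    static int[] simulate(int... reads) {
        AdaptivePredictorSketch p = new AdaptivePredictorSketch(64, 1024, 65536);
        int[] guesses = new int[reads.length];
        for (int i = 0; i < reads.length; i++) {
            p.record(reads[i]);
            guesses[i] = p.guess();
        }
        return guesses;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate(1024, 100, 100))); // prints [16384, 16384, 8192]
    }
}
```

Under these assumptions one full read jumps the prediction from 1024 to 16384, while shrinking takes two small reads in a row and moves only one slot, so the predictor grows quickly and shrinks cautiously.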
Next, look at MaxMessageHandle, the parent class of the inner class HandleImpl.
It contains the following method:
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
The guess() here is the guess() of HandleImpl, the inner class of AdaptiveRecvByteBufAllocator shown earlier; it returns the predicted size of the next buffer to allocate.
After alloc.ioBuffer(guess()) is called:
The AbstractByteBufAllocator class:
@Override
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}
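Here we can see how Netty decides between heap memory and off-heap (direct) memory!
For heap buffers versus direct buffers, see my other article: "Java NIO: Buffer Source Code Explained and 'Zero Copy'"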
private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause0();
/**
* Return {@code true} if {@code sun.misc.Unsafe} was found on the classpath and can be used for accelerated
* direct memory access.
*/
public static boolean hasUnsafe() {
return UNSAFE_UNAVAILABILITY_CAUSE == null;
}
Javadoc: returns true if sun.misc.Unsafe was found on the classpath and can be used for accelerated direct memory access
private static Throwable unsafeUnavailabilityCause0() {
if (isAndroid()) {
logger.debug("sun.misc.Unsafe: unavailable (Android)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (Android)");
}
if (isIkvmDotNet()) {
logger.debug("sun.misc.Unsafe: unavailable (IKVM.NET)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (IKVM.NET)");
}
Throwable cause = PlatformDependent0.getUnsafeUnavailabilityCause();
if (cause != null) {
return cause;
}
try {
boolean hasUnsafe = PlatformDependent0.hasUnsafe();
logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause();
} catch (Throwable t) {
logger.trace("Could not determine if Unsafe is available", t);
// Probably failed to initialize PlatformDependent0.
return new UnsupportedOperationException("Could not determine if Unsafe is available", t);
}
}
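The decision logic is as follows:
- Not supported on Android
- Not supported on IKVM.NET (a Java virtual machine that runs on Mono and the .NET Framework)
- Whether sun.misc.Unsafe can be found on the classpath: if it can, null is returned (meaning there is no cause of unavailability); otherwise the reason it is unavailable is returned
- Separately from the Unsafe check, isDirectBufferPooled() checks whether an initialized PoolArena exists (explained in detail in a later chapter)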
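The "null means available" convention can be sketched with a small reflective probe. This is not Netty's actual check, which likely does considerably more; it only tries to load sun.misc.Unsafe and read its theUnsafe field, and returns the failure as the unavailability cause. UnsafeProbeSketch is a hypothetical name.

```java
import java.lang.reflect.Field;

final class UnsafeProbeSketch {

    /** Returns null when sun.misc.Unsafe looks usable, otherwise the reason it is not. */
    static Throwable unsafeUnavailabilityCause() {
        try {
            Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
            Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            Object instance = theUnsafe.get(null);
            return instance != null
                    ? null
                    : new UnsupportedOperationException("sun.misc.Unsafe: theUnsafe is null");
        } catch (Throwable t) {
            // Class missing, field missing, or access denied: report it as the cause.
            return t;
        }
    }

    /** Mirrors the shape of hasUnsafe(): available exactly when there is no cause. */
    static boolean hasUnsafe() {
        return unsafeUnavailabilityCause() == null;
    }

    public static void main(String[] args) {
        System.out.println("sun.misc.Unsafe: " + (hasUnsafe() ? "available" : "unavailable"));
    }
}
```

The result depends on the JVM the probe runs on, so no particular output is claimed here.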
When a direct (off-heap) buffer is returned:
Following the code further:
The UnpooledByteBufAllocator class:
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
Three ByteBuf implementations can be instantiated here:
- InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
- InstrumentedUnpooledUnsafeDirectByteBuf
- InstrumentedUnpooledDirectByteBuf
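Whichever one is chosen, following the code down level by level eventually leads to the creation of a JDK DirectByteBuffer, that is, a buffer backed by direct (off-heap) memory.
When a heap buffer is returned:
Following the code further:
The UnpooledByteBufAllocator class: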
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
Two ByteBuf implementations can be instantiated here:
- InstrumentedUnpooledUnsafeHeapByteBuf
- InstrumentedUnpooledHeapByteBuf
Whichever one is chosen, following the code down level by level eventually leads to new byte[initialCapacity].
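At the JDK level, the two outcomes correspond to ByteBuffer.allocateDirect and ByteBuffer.allocate. The sketch below mimics the shape of the ioBuffer decision with a hypothetical preferDirect flag standing in for the hasUnsafe() || isDirectBufferPooled() condition; IoBufferSketch is not a Netty class.

```java
import java.nio.ByteBuffer;

final class IoBufferSketch {

    /** Picks a direct buffer when preferDirect is true, otherwise a heap buffer. */
    static ByteBuffer ioBuffer(int capacity, boolean preferDirect) {
        return preferDirect
                ? ByteBuffer.allocateDirect(capacity) // off-heap memory
                : ByteBuffer.allocate(capacity);      // backed by a byte[] on the heap
    }

    public static void main(String[] args) {
        ByteBuffer direct = ioBuffer(1024, true);
        ByteBuffer heap = ioBuffer(1024, false);
        System.out.println(direct.isDirect()); // prints true
        System.out.println(heap.isDirect());   // prints false
        System.out.println(heap.hasArray());   // prints true
        System.out.println(heap.array().length); // prints 1024
    }
}
```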