1. Netty Startup Source Code Analysis
Startup class:
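import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class NettyNioServer {
    public static void main(String[] args) throws Exception {
        /**
         * Create two thread groups, bossGroup and workGroup.
         * bossGroup only accepts incoming connections; workGroup handles the data of accepted connections.
         * Both run infinite event loops.
         * With the no-arg constructor, NioEventLoopGroup defaults to (CPU cores * 2) threads.
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // Create the bootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup) // set the two thread groups
                    .channel(NioServerSocketChannel.class) // use NioServerSocketChannel as the server channel implementation
                    .option(ChannelOption.SO_BACKLOG, 128) // maximum length of the queue of pending connections
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // keep accepted connections alive
                    .handler(new LoggingHandler(LogLevel.INFO)) // add logging to the server channel
                    .childHandler(new ChannelInitializer<SocketChannel>() { // create a channel initializer
                        // Add handlers to the pipeline of each accepted channel
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyHandelServer()); // the custom handler
                        }
                    });
            // Start the server: bind the port, wait for the bind to complete, and obtain a ChannelFuture
            ChannelFuture channelFuture = bootstrap.bind(7777).sync();
            // Add a listener
            channelFuture.addListener((future) -> {
                if (channelFuture.isSuccess()) {
                    System.out.println("Server started successfully");
                } else {
                    System.out.println("Server failed to start");
                }
            });
            // Wait until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down both groups
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}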
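When the no-arg constructor is called, the default number of threads in a NioEventLoopGroup is the number of CPU cores * 2.
Looking at the source under the debugger: the no-arg constructor passes no value of its own, so nThreads defaults to 0.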
/**
 * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
 * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup() {
    this(0);
}
Stepping further, we reach the code below: when nThreads == 0, DEFAULT_EVENT_LOOP_THREADS is used instead. So let's look at the value of DEFAULT_EVENT_LOOP_THREADS.
/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
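DEFAULT_EVENT_LOOP_THREADS is the larger of 1 and the value of the system property io.netty.eventLoopThreads. If that property is not set, the fallback is NettyRuntime.availableProcessors() * 2, where NettyRuntime.availableProcessors() returns the number of cores available on the system.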
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
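So on my machine workGroup has 8 threads (that is, availableProcessors() reported 4 and the system property was not set).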
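The two rules above (0 means "use the default", and the default is at least 1) can be tried out without Netty on the classpath. The following is only a sketch using plain JDK calls: the class EventLoopThreadCount and its methods are hypothetical names, and Integer.getInteger stands in for Netty's SystemPropertyUtil.getInt, which may parse edge cases differently.

```java
// Sketch only: mirrors the default-thread-count logic quoted above with plain JDK calls.
// EventLoopThreadCount, defaultThreads and resolve are hypothetical names, not Netty API.
final class EventLoopThreadCount {

    // Same system property name that the quoted Netty static block reads.
    static final String PROPERTY = "io.netty.eventLoopThreads";

    /** Default = max(1, property value if set, otherwise cores * 2). */
    static int defaultThreads(int cores) {
        // Assumption: Integer.getInteger is used here in place of SystemPropertyUtil.getInt.
        return Math.max(1, Integer.getInteger(PROPERTY, cores * 2));
    }

    /** nThreads == 0 means "use the default"; any other value is passed through unchanged. */
    static int resolve(int nThreads, int cores) {
        return nThreads == 0 ? defaultThreads(cores) : nThreads;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors     = " + cores);
        System.out.println("new NioEventLoopGroup()  -> " + resolve(0, cores) + " threads");
        System.out.println("new NioEventLoopGroup(1) -> " + resolve(1, cores) + " thread(s)");
    }
}
```

Running it with -Dio.netty.eventLoopThreads=3 should change the first result to 3, which is one way to cap the worker threads without touching the code.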
Continuing to debug, we reach the MultithreadEventExecutorGroup constructor, which creates the NioEventLoop objects (the children) of the group.
/**
 * Create a new instance.
 *
 * @param nThreads          the number of threads that will be used by this instance
 *                          (by default CPU cores * 2)
 * @param executor          the Executor to use, or {@code null} if the default should be used
 *                          (if null, Netty uses its default thread factory wrapped in a ThreadPerTaskExecutor)
 * @param chooserFactory    the {@link EventExecutorChooserFactory} to use
 *                          (by default the singleton DefaultEventExecutorChooserFactory.INSTANCE)
 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
 *                          (fixed arguments handed to every child when it is created)
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // private final EventExecutor[] children;
    // Allocate the executor array; for workGroup nThreads is 8 here
    children = new EventExecutor[nThreads];

    // Create the children one by one
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // The executor passed in is a ThreadPerTaskExecutor by default.
            // newChild returns a NioEventLoop (not a NioEventLoopGroup).
            // One child executor is created per requested thread.
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // Creation failed: shut down the children created so far
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // Create the chooser, which picks the next child executor to use
    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            // terminatedChildren is a java.util.concurrent atomic, so the increment is thread-safe
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // Attach the termination listener to every child executor
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // Expose the children as a read-only, insertion-ordered set (LinkedHashSet)
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
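The constructor above hands the children array to chooserFactory.newChooser(children) without showing what the chooser does. As far as I know the default factory picks children round-robin and uses a bit mask instead of a modulo when the array length is a power of two. The sketch below illustrates that idea with JDK classes only; RoundRobinChooser is a hypothetical name and this is not Netty's actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a round-robin chooser in the spirit of what chooserFactory.newChooser(children)
// returns. RoundRobinChooser is a hypothetical class, not part of Netty.
final class RoundRobinChooser<T> {

    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();
    private final boolean powerOfTwo;

    RoundRobinChooser(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
        // For n > 0, (n & -n) == n holds exactly when n is a power of two.
        this.powerOfTwo = (children.length & -children.length) == children.length;
    }

    /** Returns the next child; safe to call from several threads. */
    T next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            // Bit mask is equivalent to a modulo here and also handles a negative (overflowed) counter.
            return children[i & (children.length - 1)];
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        return children[Math.floorMod(i, children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 5; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

The practical consequence for the server in this article is that each newly accepted connection is assigned to the next worker event loop in turn, so connections are spread evenly over the 8 workGroup threads.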
children[i] = newChild(executor, args); returns a NioEventLoop object here:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
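Next, the bootstrap class, ServerBootstrap. Its Javadoc reads "{@link Bootstrap} sub-class which allows easy bootstrap of {@link ServerChannel}", i.e. it is used to start a ServerChannel.
The bootstrap.group() method takes the two NioEventLoopGroups. The parent group is passed up to the superclass (AbstractBootstrap), while the child group is stored in ServerBootstrap itself.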
/**
 * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
 * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
 * {@link Channel}'s.
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    // The parent group is handled by the superclass
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    // childGroup is an EventLoopGroup field declared volatile
    this.childGroup = childGroup;
    return this;
}
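The channel() method: internally it creates a ReflectiveChannelFactory that holds the Class object class io.netty.channel.socket.nio.NioServerSocketChannel. No channel has been created at this point.
ReflectiveChannelFactory has a method that creates the channel through Java reflection by calling newInstance() on that Class object. So when is the NioServerSocketChannel object actually created?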
@Override
public T newChannel() {
    try {
        return clazz.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
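The point that channel() only records a Class and that the instance appears later can be reproduced with a few lines of plain Java. This is a sketch under assumptions: ReflectiveFactoryDemo, ReflectiveFactory and FakeChannel are hypothetical names standing in for ReflectiveChannelFactory and NioServerSocketChannel, and it uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated since Java 9.

```java
import java.lang.reflect.Constructor;

// Sketch only: shows that storing a Class creates nothing until the factory method is called.
// All names here are hypothetical stand-ins, not Netty classes.
final class ReflectiveFactoryDemo {

    // Counts how many FakeChannel instances have been constructed.
    static int created = 0;

    /** Stand-in for NioServerSocketChannel: just records that it was constructed. */
    static final class FakeChannel {
        FakeChannel() {
            created++;
        }
    }

    /** Stand-in for ReflectiveChannelFactory: keeps the constructor, instantiates on demand. */
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                // Only the no-arg constructor is looked up here; no instance is created yet.
                this.constructor = clazz.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        "Class " + clazz.getName() + " has no no-arg constructor", e);
            }
        }

        T newInstance() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Unable to create instance of " + constructor.getDeclaringClass(), e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<FakeChannel> factory = new ReflectiveFactory<>(FakeChannel.class);
        System.out.println("after creating the factory: created = " + created);
        factory.newInstance();
        System.out.println("after newInstance():        created = " + created);
    }
}
```

Deferring instantiation this way lets the bootstrap be fully configured first and only touch operating system resources (the server socket) when bind is actually requested.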
As later debugging shows, it is created when the port is bound, in bootstrap.bind(7777).sync(), which ends up in initAndRegister():
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create the channel object and initialize it
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
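The closing comment in initAndRegister() relies on one property: tasks handed to a single thread run in submission order, so a bind submitted after a register can never overtake it. The sketch below demonstrates that property with a JDK single-thread executor standing in for the event loop; RegisterThenBind and the task labels are hypothetical and nothing here is Netty code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: a single-thread executor plays the role of one event loop.
// RegisterThenBind is a hypothetical class used for illustration.
final class RegisterThenBind {

    /** Submits "register" and then "bind" from the calling thread; returns the execution order. */
    static List<String> run() {
        List<String> order = new CopyOnWriteArrayList<>();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            // Both tasks go into the same FIFO task queue of the same thread.
            eventLoop.execute(() -> order.add("register"));
            eventLoop.execute(() -> order.add("bind"));
        } finally {
            // shutdown() still runs the tasks that were already queued.
            eventLoop.shutdown();
        }
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the event loop", e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This is why the caller can safely attempt bind() right after register() returns, even if the registration itself has only been queued.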
The option() and childOption() methods:
public <T> B option(ChannelOption<T> option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {
            options.put(option, value);
        }
    }
    return (B) this;
}
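option() is defined in the superclass AbstractBootstrap, and the group seen there in the debugger is actually the bossGroup object. In other words, option() configures the boss side (the NioServerSocketChannel). childOption() should then configure the workGroup side, i.e. the accepted channels. Let's debug to check: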
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    if (childOption == null) {
        throw new NullPointerException("childOption");
    }
    if (value == null) {
        synchronized (childOptions) {
            childOptions.remove(childOption);
        }
    } else {
        synchronized (childOptions) {
            childOptions.put(childOption, value);
        }
    }
    return this;
}

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;
In the debugger, the group here is indeed the workGroup object.
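The option() and childOption() code above follows one pattern: two separate maps, a null value meaning "remove this option", and a return of this so that calls can be chained. A stripped-down sketch of that pattern is shown below. MiniBootstrap is a hypothetical class, and plain String keys replace ChannelOption to keep the example free of Netty dependencies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: the two-map option pattern from the quoted code, reduced to JDK types.
// MiniBootstrap is a hypothetical class; String keys stand in for ChannelOption.
final class MiniBootstrap {

    // Options for the server (parent) channel.
    private final Map<String, Object> options = new LinkedHashMap<>();
    // Options applied to every accepted (child) channel.
    private final Map<String, Object> childOptions = new LinkedHashMap<>();

    MiniBootstrap option(String key, Object value) {
        put(options, key, value);
        return this;
    }

    MiniBootstrap childOption(String key, Object value) {
        put(childOptions, key, value);
        return this;
    }

    /** A null value removes the option, any other value stores it. */
    private static void put(Map<String, Object> target, String key, Object value) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        synchronized (target) {
            if (value == null) {
                target.remove(key);
            } else {
                target.put(key, value);
            }
        }
    }

    /** Returns a snapshot copy so callers cannot modify the internal map. */
    Map<String, Object> options() {
        synchronized (options) {
            return new LinkedHashMap<>(options);
        }
    }

    /** Returns a snapshot copy so callers cannot modify the internal map. */
    Map<String, Object> childOptions() {
        synchronized (childOptions) {
            return new LinkedHashMap<>(childOptions);
        }
    }

    public static void main(String[] args) {
        MiniBootstrap b = new MiniBootstrap()
                .option("SO_BACKLOG", 128)
                .childOption("SO_KEEPALIVE", true);
        System.out.println("parent options: " + b.options());
        System.out.println("child options:  " + b.childOptions());
    }
}
```

Keeping the two maps apart is what allows SO_BACKLOG to affect only the listening socket while SO_KEEPALIVE is applied to each accepted connection.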
Contents of the ServerBootstrapConfig object shown above:
The handler() and childHandler() methods: judging from option() and childOption() above, one can guess that these two apply to bossGroup and workGroup respectively.
handler():
childHandler:
config:
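ServerBootstrap key points:
1) Chained calls: the group method takes boss and worker. boss is stored as the parent group (the group field of AbstractBootstrap) and worker is assigned to the childGroup field.
2) The channel method takes the NioServerSocketChannel Class object. The channel object is later created from this class.
3) The option method takes TCP parameters and stores them in a LinkedHashMap.
4) The handler method takes a handler that belongs only to the ServerSocketChannel, not to the SocketChannel.
5) The childHandler method takes a handler that is invoked for every client connection. It is used by the SocketChannel.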