Netty 源码分析之 二 事件流在ChannlePipeline中的流转

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Java_Jsp_Ssh/article/details/82807528

channelPipeline是一个接口,其实现类为DefaultChannelPipeline;通常像pipeline中添加channelHandler时最终都会调用以下方法:

@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
        synchronized (this) {
            checkDuplicateName(name);

            DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
            addLast0(name, newCtx);
        }

        return this;
    }

将handler与其关联的name,如果name为空,会生成一个handler classNmae + "#0” 形式;最终封装成一个双链表的DefaultChannelHandlerContext节点,添加到pipeline中。在DefaultChannelHandlerContext有 inbound 和 outbound 两个 boolean 变量,private final boolean inbound; private final boolean outbound;代表着不同的事件,而不同的事件对应的事件流向是不相同的;

从netty api中可以看出outBound对应的是stock的写入事件,其事件的流转是从上完下进行的;即首先由Channel发送某种事件数据流,经过一系列的Handler后,通过最后的handler 将事件写出去。这里比如客户端的连接事件或者服务端的bind事件;而inbound事件对应的stock的读取事件,其事件流转的途径是从下往上;即首先从TCP缓存区中读取数据,然后经过不同的Handler进行处理;这里对应IO流监听的读取事件;

Inbound 事件传播方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Oubound 事件传输方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)

注意, 如果我们捕获了一个事件, 并且想让这个事件继续传递下去, 那么需要调用 Context 相应的传播方法.
例如:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        ctx.close(promise);
    }
}

1 Outbound 事件传播操作;

以客户端connect事件为例:看代码,客户端在发起连接时,会经过一段以下代码:

首先是tail节点,也就是尾节点调用connect方法,在findContextOutbound方法中,查找outbound属性为真的节点,也就是channelOutboundHandler类型的;也就是说outbound事件是有tail开始传播,执行的过程是从上往下执行的;

@Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
    }

public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        validatePromise(promise, false);

        final DefaultChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }

        return promise;
    }

 private DefaultChannelHandlerContext findContextOutbound() {
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

如果没有handler重写connect方法时,最终将会由headHandler去执行connect,操作,因为headHandler节点是outbound为真的节点;

 @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

最终还是交给底层工具类unsafe去执行连接操作;

2 inbound 事件传播操作

以上面的连接操作继续讲解,outbound与inbound事件如何联系起来,形成一个环形事件的;当unsafe执行connect方法时,其实是调用其实现类,AbstractNioUnsafe.connect方法;在doConnect中,如果连接成功会执行fulfillConnectPromise方法;接着pipeline().fireChannelActive();而fireChannelActive方法是pileline激活下一个handler的入口方法;

@Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!ensureOpen(promise)) {
                return;
            }

            try {
                if (connectPromise != null) {
                    throw new IllegalStateException("connection attempt already made");
                }

                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                if (t instanceof ConnectException) {
                    Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
                    newT.setStackTrace(t.getStackTrace());
                    t = newT;
                }
                promise.tryFailure(t);
                closeIfClosed();
            }
        }
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && isActive()) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }
public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();

        if (channel.config().isAutoRead()) {
            channel.read();
        }

        return this;
    }
public ChannelHandlerContext fireChannelActive() {
        final DefaultChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
        return this;
    }

private DefaultChannelHandlerContext findContextInbound() {
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

private void invokeChannelActive() {
    try {
        ((ChannelInboundHandler) handler()).channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

而fireChannelActive,首先调用的是headHandler 节点的方法,fireChannelActive方法;而该方法中的findContextInbound是从头节点开始查找第一个inbound属性为true的节点;然后调用该headler的channelActive方法;因此inbound事件是有head开始传播,事件流从上往下执行,逐个调用其channelActive方法,通过可以在该方法中,通过调用fireChannelActive方法来触发下一个handler;如果没有自定义类headler,那么inbound事件最终流转到tailHandler,而其channelActive就是个空方法,那么该事件最终无法处理就结束了。

public void channelActive(ChannelHandlerContext ctx) throws Exception { }

总结

对于 Outbound事件:

  • Outbound 事件是请求事件(由 Connect 发起一个请求, 并最终由 unsafe 处理这个请求)

  • Outbound 事件的发起者是 Channel

  • Outbound 事件的处理者是 unsafe

  • Outbound 事件在 Pipeline 中的传输方向是 tail -> head.

  • 在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.xxx (例如 ctx.connect) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.

  • Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT

对于 Inbound 事件:

  • Inbound 事件是通知事件, 当某件事情已经就绪后, 通知上层.

  • Inbound 事件发起者是 unsafe

  • Inbound 事件的处理者是 Channel, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现.

  • Inbound 事件在 Pipeline 中传输方向是 head -> tail

  • 在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.

  • Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT

猜你喜欢

转载自blog.csdn.net/Java_Jsp_Ssh/article/details/82807528