服务端Mina线程关系和数据流动分析



一  线程关系

NioSocketAcceptor类

线程数量:1

         线程名格式:NioSocketAcceptor-1

创建Runnable类:AbstractPollingIoAcceptor. Acceptor

         用途:

         负责处理Selector选择器获得的新建连接,为信道包装IoSession、为IoSession指派唯一的NioProcessor线程、为IoSession设置初始Attributes、初始化IoSession域变量、将IoSession加入到NioProcessor的newSessions列队,最后激活NioProcessor类的处理线程。

         注意事项:创建NioSocketAcceptor对象时可以传递int参数作为NioProcessor的线程数量。NioSocketAcceptor用SimpleIoProcessorPool包装NioProcessor来控制线程数量。SimpleIoProcessorPool传递ThreadPoolExecutor给NioProcessor的构造方法,用于NioProcessor申请线程,并创建NioProcessor对象将Pool装满。



NioProcessor类

默认线程数量:cpu核心数+1

线程名格式NioProcessor-85

创建Runnable类:AbstractPollingIoProcessor.Processor

    用途:

1 将Selector选择到的可读状态的IoSession对应的SocketChannel读取缓冲区内容读取到一个新的IoBuffer中,并激活IoFilterChain. fireMessageReceived(buf)事件,将IoBuffer传递给过滤器链。

2 将Selector选择到可写状态的IoSession加入到flushingSessions列队,等待另外一个线程负责写入操作。

3 将NioSocketAcceptor类添加到newSessions列队中的IoSession注册到自己的Selector的OP_READ状态、建立IoSession上的过滤器链、调用IoServiceListenerSupport.fireSessionCreated(IoSession)以激活过滤器链的sessionCreated(IoSession)方法。

注意事项:

由于这里为每个IoSession指定了唯一的NioProcessor线程,所以单个IoSession上的读取数据解码、过滤器链的消息接受事件触发一定是在同一个线程中。也就是说一个多个IoSession被绑定在一个NioProcessor的线程上。分配策略为简单循环分配。



ExecutorFilter类(可选,可大幅提高性能)

默认线程数量:16

创建Runnable类:用指定线程工厂包装参数

    用途:

    在FilterChain最后,负责将事件从NioProcessor线程分发至业务线程,再触发事件,可避免业务逻辑占用IO线程。

    建议:

    为避免过长时间占用IO线程、避免IO线程参与业务逻辑,应将IoBuffer->Message的转换工作放在ExecutorFilter触发之后。

    注意事项:

IoHandler的sessionCreated(IoSession)应该尽量简短、保证尽快完成,因为NioSocketAcceptor是先将IoSession注册到Selector的OP_READ状态,由于业务线程和IO线程分开,很可能在sessionCreated链完成操作之前被其他线程触发接sessionOpened或messageReceived,导致数据未初始化完成。







二  数据流动分析(业务消息)

接收过程的三次线程切换

1 客户端->服务器底层缓冲区->等待Selector选中OP_READ。

2 NioProcessor线程处理可读状态IoSession并触发。IoFilterChain.fireMessageReceived(Object);ExecutorFilter调用OrderedThreadPoolExecutor.execute(Runnable)用IoFilterEvent包装消息,并加入IoSession.getAttribute(TASKS_QUEUE)列队,等其他线程处理TASKS_QUEUE列队。

3 业务逻辑线程读取TASKS_QUEUE列队并执行IoFilterEvent.fire()方法,继续执行后续Filter和IoHandler事件。


发送过程的三次线程切换

1 业务逻辑调用IoSession.write(Object)。IoSession将参数包装为DefaultWriteRequest,并触发IoFilterChain.fireFilterWrite(WriteRequest),org.apache.mina.core.filterchain.DefaultIoFilterChain.HeadFilter.filterWrite(NextFilter, IoSession, WriteRequest)方法将消息加入IoSession. writeRequestQueue列队,等待其他线程处理消息发送。

2 NioProcessor的Selector取出OP_WRITE状态的IoSession;再从中取出有消息要写入的IoSession,清除其OP_WRITE状态,如果IoSession.currentWriteRequest不存在,从IoSession. writeRequestQueue取出一条消息,设置为IoSession.currentWriteRequest准备发送;但由于缓冲区余量等原因,不保证每次调用都能发出消息,更不保证能清空IoSession. writeRequestQueue(输出缓冲区为输入缓冲区的1.5倍);根据结果设置IoSession的OP_WTIRE。

3 服务器底层缓冲区->客户端

猜你喜欢

转载自duzc2.iteye.com/blog/1522633