Mina 过滤链默认构建器: http://donald-draper.iteye.com/blog/2375985
Mina 过滤器定义: http://donald-draper.iteye.com/blog/2376161
引言:
前面一篇文章我们看了一下过滤器定义,先来回顾一下:
IoFilter添加到过滤链时,一般以ReferenceCountingIoFilter包装,添加到过滤链,init方法在添加到过滤链时,由ReferenceCountingIoFilter调用,所以可以在init方法初始化一些共享资源。如果过滤器没有包装成ReferenceCountingIoFilter,init方法将不会调用。然后调用onPreAdd通知过滤器将会添加到过滤连上,当过滤器添加到过滤链上时,所有IoHandler事件和IO请求,将会被过滤器拦截当过滤器添加到过滤链上后,将会调用onPostAdd,如果方法有异常,过滤器在过滤链的链尾,ReferenceCountingIoFilter将调用#destroy方法释放共享资源。过滤器IoFilter,主要是监听会话IoSession相关事件(创建,打开,空闲,异常,关闭,接受数据,发送数据) 及IoSesssion的Write与close事件;过滤器后继NextFilter关注的事件与过滤器IoFilter相同,主要是转发相关事件。WriteRequest是会话IoSession写操作write的包装,内部有一个消息对象用于存放write的内容,一个socket地址,即会话写请求的目的socket地址,一个写请求结果返回值WriteFuture,用于获取会话write消息的操作结果。当一个过滤器从过滤链上移除时,#onPreRemove被调用,用于通知过滤器将从过滤链上移除;如果过滤器从过滤链上移除,所有IoHandler事件和IO请求,过滤器不再拦截;#onPostRemove调用通知过滤器已经从过滤链上移除;移除后,如果过滤器在过滤链的链尾,ReferenceCountingIoFilter将调用#destroy方法释放共享资源。
IoFilter生命周期如下:
inti->onPreAdd->onPostAdd->(拦截IoHandler相关事件:sessionCreated,Opened,Idle,
exceptionCaught,Closed,messageSent,messageReceived;会话相关事件:filterWrite,filterClose)->onPreRemove
->onPostRemove->destroy。
在TCP简单通信实例这篇文章中我们用到的Mina的日志过滤器LoggingFilter如下:
//配置过滤器
DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = acceptor.getFilterChain(); LoggingFilter loggingFilter = new LoggingFilter(); defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter);
今天我们来看一下LoggingFilter。
package org.apache.mina.filter; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.util.SessionLog; import org.slf4j.Logger; /** * Logs all MINA protocol events to {@link Logger}. * * @author The Apache Directory Project ([email protected]) * @version $Rev$, $Date$ * * @see SessionLog */ public class LoggingFilter extends IoFilterAdapter { /** * Session attribute key: prefix string */ public static final String PREFIX = SessionLog.PREFIX; /** * Session attribute key: {@link Logger} */ public static final String LOGGER = SessionLog.LOGGER; /** * Creates a new instance. */ public LoggingFilter() { } //拦截IoHandler的sessionCreated事件,将日志输出委托给SessionLog, //然后将相关事件传递给过滤器后继,拦截sessionOpened,sessionClosed思路基本相同 public void sessionCreated(NextFilter nextFilter, IoSession session) { SessionLog.info(session, "CREATED"); nextFilter.sessionCreated(session); } public void sessionOpened(NextFilter nextFilter, IoSession session) { SessionLog.info(session, "OPENED"); nextFilter.sessionOpened(session); } public void sessionClosed(NextFilter nextFilter, IoSession session) { SessionLog.info(session, "CLOSED"); nextFilter.sessionClosed(session); } public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) { //先判断会话日志Info级别是否开启 if (SessionLog.isInfoEnabled(session)) { SessionLog.info(session, "IDLE: " + status); } nextFilter.sessionIdle(session, status); } public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) { if (SessionLog.isWarnEnabled(session)) { SessionLog.warn(session, "EXCEPTION:", cause); } nextFilter.exceptionCaught(session, cause); } public void messageReceived(NextFilter nextFilter, IoSession session, Object message) { if (SessionLog.isInfoEnabled(session)) { SessionLog.info(session, "RECEIVED: " + message); } nextFilter.messageReceived(session, message); } public void messageSent(NextFilter nextFilter, IoSession session, Object message) { if (SessionLog.isInfoEnabled(session)) { SessionLog.info(session, "SENT: " + message); } nextFilter.messageSent(session, message); } public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) { if (SessionLog.isInfoEnabled(session)) { SessionLog.info(session, "WRITE: " + writeRequest); } nextFilter.filterWrite(session, writeRequest); } public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { SessionLog.info(session, "CLOSE"); nextFilter.filterClose(session); } }
LoggingFilter拦截IoHandler的sessionCreated事件,将日志输出委托给SessionLog,
然后将相关事件传递给过滤器后继,拦截sessionOpened,sessionClosed,filterClose事件思路基本相同。对于sessionIdle,messageSent,messageReceived,filterWrite先判断会话log的info级别是否开启,开启则输出相应事件日志。上面这些事件的日志级别都是Info;exceptionCaught则先判断会话log的warn级别是否开启,开启则输出相应事件日志。
再来看SessionLog:
package org.apache.mina.util; import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; //从包引入来看MINA默认用slf4j日志组件 /** * Provides utility methods to log protocol-specific messages. * <p> * Set {@link #PREFIX} and {@link #LOGGER} session attributes * to override prefix string and logger. * * @author The Apache Directory Project ([email protected]) * @version $Rev$, $Date$ * */ public class SessionLog { /** * Session attribute key: prefix string */ public static final String PREFIX = SessionLog.class.getName() + ".prefix"; /** * Session attribute key: {@link Logger} */ public static final String LOGGER = SessionLog.class.getName() + ".logger"; //获取会话IoHandler类型 private static Class getClass(IoSession session) { return session.getHandler().getClass(); } //Debug输出,如果会话日志开启debug级别,则输出debug日志 public static void debug(IoSession session, String message) { Logger log = getLogger(session); if (log.isDebugEnabled()) { log.debug(String.valueOf(session.getAttribute(PREFIX)) + message); } } public static void debug(IoSession session, String message, Throwable cause) { Logger log = getLogger(session); if (log.isDebugEnabled()) { log.debug(String.valueOf(session.getAttribute(PREFIX)) + message, cause); } } //获取会话日志 private static Logger getLogger(IoSession session) { Logger log = (Logger) session.getAttribute(LOGGER); if (log == null) { //如果会话日志为空,则从LoggerFactory获取Logger log = LoggerFactory.getLogger(getClass(session)); String prefix = (String) session.getAttribute(PREFIX); if (prefix == null) { prefix = "[" + session.getRemoteAddress() + "] "; session.setAttribute(PREFIX, prefix); } //将日志log,添加到会话中 session.setAttribute(LOGGER, log); } return log; } //判断log的debug级别是否开启 public static boolean isDebugEnabled(IoSession session) { return getLogger(session).isDebugEnabled(); } /*下面的info,warn,error与debug的思想是一致的这里不再说 */ public static void info(IoSession session, String message) { Logger log = getLogger(session); if (log.isInfoEnabled()) { log.info(String.valueOf(session.getAttribute(PREFIX)) + message); } } public static void info(IoSession session, String message, Throwable cause) { Logger log = getLogger(session); if (log.isInfoEnabled()) { log.info(String.valueOf(session.getAttribute(PREFIX)) + message, cause); } } public static void warn(IoSession session, String message) { Logger log = getLogger(session); if (log.isWarnEnabled()) { log.warn(String.valueOf(session.getAttribute(PREFIX)) + message); } } public static void warn(IoSession session, String message, Throwable cause) { Logger log = getLogger(session); if (log.isWarnEnabled()) { log.warn(String.valueOf(session.getAttribute(PREFIX)) + message, cause); } } public static void error(IoSession session, String message) { Logger log = getLogger(session); if (log.isErrorEnabled()) { log.error(String.valueOf(session.getAttribute(PREFIX)) + message); } } public static void error(IoSession session, String message, Throwable cause) { Logger log = getLogger(session); if (log.isErrorEnabled()) { log.error(String.valueOf(session.getAttribute(PREFIX)) + message, cause); } } public static boolean isInfoEnabled(IoSession session) { return getLogger(session).isInfoEnabled(); } public static boolean isWarnEnabled(IoSession session) { return getLogger(session).isWarnEnabled(); } public static boolean isErrorEnabled(IoSession session) { return getLogger(session).isErrorEnabled(); } }
在TCP简单通信实例这篇文章中我们创建过一个测试的过滤器TestFilter,通过ReferenceCountingFilter包装添加
过滤链上:
TestFilter testFilter = new TestFilter(); ReferenceCountingFilter referenceCountingFilter = new ReferenceCountingFilter(testFilter); defaultIoFilterChainBuilder.addLast("testFilter",referenceCountingFilter);
在前面过滤器的文章我们也提到过ReferenceCountingFilter,下面我们再来看一下ReferenceCountingFilter
package org.apache.mina.filter; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoSession; /** * An {@link IoFilter}s wrapper that keeps track of the number of usages of this filter and will call init/destroy * when the filter is not in use. *ReferenceCountingIoFilter用于包装过滤器,用于记录过滤器的使用量, 当过滤器添加到过滤链上时,调用过滤器的init方法,从过滤链上移除时,调用过滤器的destroy方法。 * @author The Apache Directory Project ([email protected]) * @version $Rev$, $Date$ */ public class ReferenceCountingIoFilter implements IoFilter { private final IoFilter filter;//包装的过滤器 private int count = 0;//过滤器使用计数器 public ReferenceCountingIoFilter(IoFilter filter) { this.filter = filter; } public void init() throws Exception { // no-op, will init on-demand in pre-add if count == 0 } public void destroy() throws Exception { //no-op, will destroy on-demand in post-remove if count == 0 } public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { filter.onPostAdd(parent, name, nextFilter); } public synchronized void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { if (0 == count) { //当过滤器首次添加到过滤链上时,调用过滤器的init方法。 filter.init(); ++count; } filter.onPreAdd(parent, name, nextFilter); } public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { filter.onPreRemove(parent, name, nextFilter); } public synchronized void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { filter.onPostRemove(parent, name, nextFilter); --count; //当过滤器从过滤链上完全移除时,调用过滤器的destroy方法。 if (0 == count) { filter.destroy(); } } public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { filter.exceptionCaught(nextFilter, session, cause); } public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { filter.filterClose(nextFilter, session); } public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { filter.filterWrite(nextFilter, session, writeRequest); } public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { filter.messageReceived(nextFilter, session, message); } public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception { filter.messageSent(nextFilter, session, message); } public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { filter.sessionClosed(nextFilter, session); } public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { filter.sessionCreated(nextFilter, session); } public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { filter.sessionIdle(nextFilter, session, status); } public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { filter.sessionOpened(nextFilter, session); } }
一个过滤器可以多次添加到过滤链上,如何保证过滤器第一次添加到过滤链上时,初始化过滤器,完全从过滤链上移除时,销毁过滤器,释放资源?这就是ReferenceCountingIoFilter的作用,ReferenceCountingIoFilter同时也是一个过滤器,内部一个count用于记录过滤器filter添加到过滤链上的次数,即过滤链上存在filter的个数,一个filter,即ReferenceCountingIoFilter包装的过滤器,触发IoHandler和IoSession的相关事件,直接交给包装的内部filter处理。
总结:
LoggingFilter拦截IoHandler的sessionCreated事件,将日志输出委托给SessionLog,然后将相关事件传递给过滤器后继,拦截sessionOpened,sessionClosed,
filterClose事件思路基本相同。对于sessionIdle,messageSent,messageReceived,
filterWrite先判断会话log的info级别是否开启,开启则输出相应事件日志。上面这些事件的日志级别都是Info;exceptionCaught则先判断会话log的warn级别是否开启,开启则输出相应
事件日志。
一个过滤器可以多次添加到过滤链上,ReferenceCountingIoFilter用于保证过滤器第一次添加到过滤链上时,初始化过滤器,完全从过滤链上移除时,销毁过滤器,释放资源;ReferenceCountingIoFilter内部一个count用于记录包装过滤器filter添加到过滤链上的次数,即过滤链上存在filter的个数,一个filter,即ReferenceCountingIoFilter包装的过滤器,ReferenceCountingIoFilter触发IoHandler和IoSession的相关事件,直接交给包装的内部filter处理。