本文主要实现mina的自定义协议,并且实现服务器和客户端的简单数据交互。
"mina协议的自定义"可参考本博Mina相关文章。
正题,所需要的基础类:
抽象协议类
请求协议
响应协议
(需要定制自己的协议格式)
协议编码解码工厂
协议编码
协议解码
客户端
客户端Handler
服务器
服务器Handler
/**
 * Common base type of the messages exchanged over the custom protocol.
 *
 * @author Simple
 */
public abstract class JAbsMessageProtocal {

    /**
     * Returns the length of the message body data.
     *
     * @return the body length
     */
    public abstract int getLength();

    /**
     * Returns the message type tag, which distinguishes a request from a response.
     *
     * @return the message type tag
     */
    public abstract byte getTag();
}

/*
 * Frame layout.
 *
 * Header:
 *   tag     - request / response marker (a single byte, as returned by getTag())
 *   length  - int, length of the body data
 * Body:
 *   function code - short, identifies the requested function
 *   result code   - byte, outcome of the call
 *   content       - String, the payload
 */
/** * 消息协议-请求 * * @author Simple * */ public class JMessageProtocalReq extends JAbsMessageProtocal { private short functionCode;// 功能代码 private String content;// 请求内容 @Override public int getLength() { return 2 + (content == null ? 0 : content.getBytes().length); } @Override public byte getTag() { return JConstant.REQ; } public void setFunctionCode(short functionCode) { this.functionCode=functionCode; } public short getFunctionCode() { return functionCode; } public void setContent(String content) { this.content=content; } public String getContent() { return content; } @Override public String toString() { return "JMessageProtocalReq [content=" + content + ", functionCode=" + functionCode + ", getLength()=" + getLength() + ", getTag()=" + getTag() + "]"; } }
/** * 消息协议-响应 * * @author Simple * */ public class JMessageProtocalRes extends JAbsMessageProtocal { private byte resultCode;// 结果码 private String content;// 响应内容 @Override public int getLength() { return 1 + (getContent() == null ? 0 : getContent().getBytes().length); } @Override public byte getTag() { return JConstant.RES; } public void setResultCode(byte resultCode) { this.resultCode=resultCode; } public byte getResultCode() { return resultCode; } public void setContent(String content) { this.content=content; } public String getContent() { return content; } @Override public String toString() { return "JMessageProtocalRes [content=" + content + ", resultCode=" + resultCode + ", getLength()=" + getLength() + ", getTag()=" + getTag() + "]"; } }
/**
 * Codec factory for the JMessageProtocal messages.
 *
 * <p>One decoder and one encoder are created in the constructor and the same two
 * instances are returned for every session.
 *
 * @author Simple
 */
public class JMessageProtocalCodecFactory implements ProtocolCodecFactory {

    private final JMessageProtocalDecoder sharedDecoder;
    private final JMessageProtocalEncoder sharedEncoder;

    /**
     * Builds the decoder and the encoder with the given charset.
     *
     * @param charset charset handed to both the decoder and the encoder
     */
    public JMessageProtocalCodecFactory(Charset charset) {
        sharedDecoder = new JMessageProtocalDecoder(charset);
        sharedEncoder = new JMessageProtocalEncoder(charset);
    }

    /**
     * @param paramIoSession the session asking for a decoder (not used)
     * @return the decoder created by the constructor
     */
    public ProtocolDecoder getDecoder(IoSession paramIoSession) throws Exception {
        return sharedDecoder;
    }

    /**
     * @param paramIoSession the session asking for an encoder (not used)
     * @return the encoder created by the constructor
     */
    public ProtocolEncoder getEncoder(IoSession paramIoSession) throws Exception {
        return sharedEncoder;
    }
}
/** * JMessageProtocal解码 * @author Simple * */ public class JMessageProtocalDecoder extends ProtocolDecoderAdapter { private Logger log=Logger.getLogger(JMessageProtocalDecoder.class); private Charset charset; public JMessageProtocalDecoder(Charset charset) { this.charset=charset; } /** * 解码 */ public void decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception { JAbsMessageProtocal absMP=null; // 获取协议tag byte tag=buf.get(); // 获取协议体长度 int length=buf.getInt(); // 取出协议体 byte[] bodyData=new byte[length]; buf.get(bodyData); // 为解析数据做准备 // 检测协议 IoBuffer tempBuf=IoBuffer.allocate(100).setAutoExpand(true); tempBuf.put(tag); tempBuf.putInt(length); tempBuf.put(bodyData); tempBuf.flip(); if(!canDecode(tempBuf)) { return; } // 协议体buf IoBuffer bodyBuf=IoBuffer.allocate(100).setAutoExpand(true); bodyBuf.put(bodyData); bodyBuf.flip(); // 整个协议buf IoBuffer allBuf=IoBuffer.allocate(100).setAutoExpand(true); allBuf.put(tag); allBuf.putInt(length); allBuf.put(bodyData); allBuf.flip(); // if(tag == JConstant.REQ) { JMessageProtocalReq req=new JMessageProtocalReq(); short functionCode=bodyBuf.getShort(); String content=bodyBuf.getString(charset.newDecoder()); req.setFunctionCode(functionCode); req.setContent(content); absMP=req; } else if(tag == JConstant.RES) { JMessageProtocalRes res=new JMessageProtocalRes(); byte resultCode=bodyBuf.get(); String content=bodyBuf.getString(charset.newDecoder()); res.setResultCode(resultCode); res.setContent(content); absMP=res; } else { log.error("未定义的Tag"); } out.write(absMP); } // 是否可以解码 private boolean canDecode(IoBuffer buf) { int protocalHeadLength=5;// 协议头长度 int remaining=buf.remaining(); if(remaining < protocalHeadLength) { log.error("错误,协议不完整,协议头长度小于" + protocalHeadLength); return false; } else { log.debug("协议完整"); // 获取协议tag byte tag=buf.get(); if(tag == JConstant.REQ || tag == JConstant.RES) { log.debug("Tag=" + tag); } else { log.error("错误,未定义的Tag类型"); return false; } // 获取协议体长度 int length=buf.getInt(); 
if(buf.remaining() < length) { log.error("错误,真实协议体长度小于消息头中取得的值"); return false; } else { log.debug("真实协议体长度:" + buf.remaining() + " = 消息头中取得的值:" + length); } } return true; } }
/** * JMessageProtocal编码 * @author Simple * */ public class JMessageProtocalEncoder extends ProtocolEncoderAdapter { private Charset charset; public JMessageProtocalEncoder(Charset charset) { this.charset=charset; } /** * 编码 */ public void encode(IoSession session, Object object, ProtocolEncoderOutput out) throws Exception { // new buf IoBuffer buf=IoBuffer.allocate(2048).setAutoExpand(true); // object --> AbsMP JAbsMessageProtocal absMp=(JAbsMessageProtocal)object; buf.put(absMp.getTag()); buf.putInt(absMp.getLength()); if(object instanceof JMessageProtocalReq) {// 请求协议 JMessageProtocalReq mpReq=(JMessageProtocalReq)object; buf.putShort(mpReq.getFunctionCode()); buf.putString(mpReq.getContent(), charset.newEncoder()); } else if(object instanceof JMessageProtocalRes) {// 响应协议 JMessageProtocalRes mpRes=(JMessageProtocalRes)object; buf.put(mpRes.getResultCode()); buf.putString(mpRes.getContent(), charset.newEncoder()); } buf.flip(); out.write(buf); } }
/** * MINA 客户端 * * @author Simple * */ public class MainClient { @SuppressWarnings("unused") private static Logger log=Logger.getLogger(MainClient.class); private static final int PORT=9999; public static void main(String[] args) { NioSocketConnector connector=new NioSocketConnector(); DefaultIoFilterChainBuilder chain=connector.getFilterChain(); chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8")))); connector.setHandler(new MinaClientHandler()); connector.setConnectTimeoutMillis(3000); ConnectFuture cf=connector.connect(new InetSocketAddress("localhost", PORT)); cf.awaitUninterruptibly();// 等待连接创建完成 JMessageProtocalReq req=new JMessageProtocalReq(); req.setFunctionCode((short)1); req.setContent("hello world!!!"); cf.getSession().write(req); cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); } }
/** * MINA 客户端消息处理 * * @author Simple * */ public class MinaClientHandler extends IoHandlerAdapter { private Logger log=Logger.getLogger(MinaClientHandler.class); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { log.error(String.format("Client产生异常!")); } @Override public void messageReceived(IoSession session, Object message) throws Exception { log.debug(String.format("来自Server[%s]的消息:%s", session.getRemoteAddress(), message.toString())); } @Override public void messageSent(IoSession session, Object message) throws Exception { log.debug(String.format("向Server[%s]发送消息:%s", session.getRemoteAddress(), message.toString())); } @Override public void sessionClosed(IoSession session) throws Exception { log.debug(String.format("与Server[%s]断开连接!", session.getRemoteAddress())); } @Override public void sessionCreated(IoSession session) throws Exception { log.debug(String.format("与Server[%s]建立连接!", session.getRemoteAddress())); } @Override public void sessionOpened(IoSession session) throws Exception { log.debug(String.format("与Server[%s]打开连接!", session.getRemoteAddress())); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { log.debug(String.format("Client进入空闲状态!")); } }
/** * MINA 服务器 * * @author Simple * */ public class MainServer { private static Logger log=Logger.getLogger(MainServer.class); private static final int PORT=9999; public static void main(String[] args) throws Exception { SocketAcceptor acceptor=new NioSocketAcceptor();// tcp/ip 接收者 DefaultIoFilterChainBuilder chain=acceptor.getFilterChain();// 过滤管道 chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8")))); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 读写通道10s内无操作进入空闲状态 acceptor.setHandler(new MinaServerHandler());// 设置handler acceptor.bind(new InetSocketAddress(PORT));// 设置端口 log.debug(String.format("Server Listing on %s", PORT)); } }
/** * MINA 服务器消息处理 * * @author Simple * */ public class MinaServerHandler extends IoHandlerAdapter { private Logger log=Logger.getLogger(MinaServerHandler.class); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { log.error(String.format("Server产生异常!")); } @Override public void messageReceived(IoSession session, Object message) throws Exception { log.debug(String.format("来自Client[%s]的消息:%s", session.getRemoteAddress(), message.toString())); } @Override public void messageSent(IoSession session, Object message) throws Exception { log.debug(String.format("向Client[%s]发送消息:%s", session.getRemoteAddress(), message.toString())); } @Override public void sessionClosed(IoSession session) throws Exception { log.debug(String.format("Client[%s]与Server断开连接!", session.getRemoteAddress())); } @Override public void sessionCreated(IoSession session) throws Exception { log.debug(String.format("Client[%s]与Server建立连接!", session.getRemoteAddress())); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { log.debug(String.format("Server进入空闲状态!")); } @Override public void sessionOpened(IoSession session) throws Exception { log.debug(String.format("Client[%s]与Server打开连接!", session.getRemoteAddress())); } }
如果想深入了解mina,我感觉还得去读一下mina的源码,会有很大的收获。