设置消息的长度:buffer.writeInt(classByte.length);
写入消息内容:buffer.writeBytes(classByte);
传输内容:channel.write(buffer);
package hotswap.client; import hotswap.JavacTool; import hotswap.NamedThreadFactory; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; public class HotSwapClient { // 因ChannelFactory的关闭有DirectMemory泄露,采用静态化规避 // https://issues.jboss.org/browse/NETTY-424 private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory( Executors.newSingleThreadExecutor(new NamedThreadFactory("HotSwapClient_Boss", true)), Executors.newSingleThreadExecutor(new NamedThreadFactory("HotSwapClient_Worker", true)), 1); public static void main(String[] args) { send(connect()); } public static void send(ChannelFuture future) { if (future.isCancelled()) { // Connection attempt cancelled by user System.out.println("isCanncelled"); } else if (!future.isSuccess()) { future.getCause().printStackTrace(); System.out.println("isNotSuccess"); } else { // Connection established successfully Channel channel = future.getChannel(); channel.setInterestOps(Channel.OP_READ_WRITE); // 编译参数 List<String> otherArgs = Arrays.asList("-classpath", HotSwapClient.class.getProtectionDomain().getCodeSource().getLocation().toString()); // 编译 byte[] classByte = JavacTool.callJavac(otherArgs, "test.HotSwap"); ChannelBuffer buffer = ChannelBuffers.buffer(classByte.length + 4); buffer.writeInt(classByte.length); buffer.writeBytes(classByte); channel.write(buffer); } } public static ChannelFuture connect() { ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new SimpleChannelHandler() { @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelConnected"); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); System.out.println(buffer.toString(Charset.forName("UTF-8"))); System.out.flush(); e.getChannel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { System.out.println("client exceptionCaught"); e.getCause().printStackTrace(); e.getChannel().close(); } }); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 20001)); future.awaitUninterruptibly(); return future; } }
服务端
读取消息长度:int msgBodyLen = buffer.readInt();
判断是否相等:if (buffer.readableBytes() >= msgBodyLen )
package hotswap.server; import hotswap.JavaClassExecuter; import hotswap.NamedThreadFactory; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.Bootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * The HotSwapServer represents * @version $Id$ * @author fengjiachun */ public class HotSwapServer extends AbstractNettyServer { private static final Logger LOG = LoggerFactory.getLogger(HotSwapServer.class); static { try { new HotSwapServer().startServer(); } catch (Exception e) { LOG.error("start HotSwapServer error {}", e); } } @Override public Bootstrap createServerBootstrap() { ExecutorService boss = Executors.newSingleThreadExecutor(new NamedThreadFactory("HotSwapServer_Boss")); ExecutorService worker = Executors.newSingleThreadExecutor(new NamedThreadFactory("HotSwapServer_Worker")); this.serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(boss, worker, 1)); return this.serverBootstrap; } @Override public TRANSMISSION_PROTOCOL getTransmissionProtocol() { return TRANSMISSION_PROTOCOL.TCP; } @Override public void startServer() throws Exception { startServer(20001); } @Override public void startServer(int port) throws Exception { InetSocketAddress socketAddress = new InetSocketAddress(port); startServer(socketAddress); } @Override public void startServer(InetSocketAddress socketAddress) throws Exception { setPipelineFactory(new HotSwapPipelineFactory()); String[] optionsList = new String[2]; optionsList[0] = "child.tcpNoDelay"; // 关闭Nagle算法 optionsList[1] = "child.keepAlive"; // TCP定期发送心跳包,应用层不应该依赖这个选项,应用层有自己的心跳机制 configureServerBootStrap(optionsList); try { ((ServerBootstrap) this.serverBootstrap).bind(socketAddress); if (LOG.isInfoEnabled()) { LOG.info("HotSwapServer start, port=" + socketAddress.getPort()); } } catch (ChannelException e) { LOG.error("Unable to start HotSwapServer due to error {}", e); throw e; } } class HotSwapPipelineFactory implements ChannelPipelineFactory { private SimpleChannelHandler messageReceivedHandler = new SimpleChannelHandler() { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { byte[] classByte = (byte[]) e.getMessage(); // 执行传过来的字节 String resultMsg = JavaClassExecuter.execute(classByte); byte[] resultByte = resultMsg.getBytes(Charset.forName("UTF-8")); ChannelBuffer buffer = ChannelBuffers.buffer(resultByte.length); buffer.writeBytes(resultByte); e.getChannel().write(buffer); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { LOG.error("HotSwap Exception Caught: {}. Going to close channel.", e.getCause()); e.getChannel().close(); } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { if (LOG.isInfoEnabled()) { LOG.info("HotSwap Connected Channel with ip: {}.", e.getChannel().getRemoteAddress()); } } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { if (LOG.isInfoEnabled()) { LOG.info("HotSwap Disconnected Channel with ip: {}.", e.getChannel().getRemoteAddress()); } } }; @Override public ChannelPipeline getPipeline() throws Exception { return addHandlers(Channels.pipeline()); } public ChannelPipeline addHandlers(ChannelPipeline pipeline) { if (null == pipeline) { return null; } pipeline.addLast("hotSwapDecoder", new FrameDecoder() { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if (buffer.readableBytes() >= 4) { buffer.markReaderIndex(); // 标记ReaderIndex int msgBodyLen = buffer.readInt(); // 前四个字节存放消息的长度 if (buffer.readableBytes() >= msgBodyLen) { ChannelBuffer dst = ChannelBuffers.buffer(msgBodyLen); buffer.readBytes(dst, msgBodyLen); return dst.array(); } else { buffer.resetReaderIndex(); return null; } } return null; } }); pipeline.addLast("hotSwapHandler", messageReceivedHandler); return pipeline; } } }
转自: http://budairenqin.iteye.com/blog/1788839