Nio通讯框架学习: http://guojuanjun.blog.51cto.com/277646/841342/
Netty是什么: http://lippeng.iteye.com/blog/1907279
Java NIO框架Netty教程: http://blog.csdn.net/kobejayandy/article/details/11495509
netty初步,与各个版本的比较 : http://blog.csdn.net/u010154380/article/details/46988269
Netty版本升级血泪史之线程篇: http://www.infoq.com/cn/articles/netty-version-upgrade-history-thread-part/
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。具体的我们就不在详讲,参看上面的引用连接。
从今天起,我们从netty的几个实例来看,如何构建网络通信,本文中的所用的netty版本为4.1.12,相关的源码github地址为 https://github.com/Donaldhan/netty_demo:
实例1,简单的消息通信应用:
服务端:
package netty.main.echo; import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import netty.handler.echo.EchoServerHandler; /** * Discards any incoming data. * @author donald * 2017年6月16日 * 上午9:39:53 */ public class EchoServer { private static final Logger log = LoggerFactory.getLogger(EchoServer.class); static final boolean SSL = System.getProperty("ssl") != null; private static final String ip = "192.168.31.153"; private static final int port = 10010; public static void main(String[] args) throws Exception { run(); } public static void run() throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } /* * EventLoopGroup(多线程事件loop),处理IO操作,这里我们用了两个事件loop * 第一个用于处理器监听连接请求,第二个用于数据的传输; * 具体线程是多少依赖于事件loop的具体实现 * */ EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //ServerBootstrap,用于配置服务端,一般为ServerSocket通道 ServerBootstrap serverBoot = new ServerBootstrap(); serverBoot.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //添加通道处理器到通道关联的管道,准确的中文翻译为管道线, 此管道线与Mina中过滤链十分相似, //ChannelInitializer用于配置通道的管道线,ChannelPipeline ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new EchoServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、 .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道 InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port); // 绑定地址,开始监听 ChannelFuture f = serverBoot.bind(inetSocketAddress).sync(); log.info("=========Server is start========="); //等待,直到ServerSocket关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
服务端处理器:
package netty.handler.echo; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; /** * Handles a server-side channel. * @author donald * 2017年6月16日 * 上午9:36:53 */ public class EchoServerHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(EchoServerHandler.class); /** * 读client通道数据,通道处理器上下文ChannelHandlerContext与Mina的会话很像 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf)msg; byte[] bytes = new byte[in.writerIndex()]; in.readBytes(bytes); //针对堆buf,direct buf不支持 // byte[] bytes = in.array(); String message = null; try { message = new String(bytes,"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } try{ log.info("===Server reciever message:" +message); } finally{ //如果msg为引用计数对象,在使用后注意释放,一般在通道handler中释放 // ReferenceCountUtil.release(msg); } String ackMessage = "hello client ..."; in.clear(); try { in.writeBytes(ackMessage.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //异常发生时,关闭连接 cause.printStackTrace(); ctx.close(); } }
客户端:
package netty.main.echo; import java.net.InetSocketAddress; import javax.net.ssl.SSLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import netty.handler.echo.EchoClientHandler; /** * * @author donald * 2017年6月20日 * 下午12:44:58 */ public final class EchoClient { private static final Logger log = LoggerFactory.getLogger(EchoClient.class); private static final boolean SSL = System.getProperty("ssl") != null; private static final String ip = System.getProperty("host", "192.168.31.153"); private static final int port = Integer.parseInt(System.getProperty("port", "10010")); public static void main(String[] args) throws Exception { run(); } private static void run() throws SSLException, InterruptedException{ //配置安全套接字上下文 final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Bootstrap,用于配置客户端,一般为Socket通道 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加安全套接字处理器和通道处理器到 ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port)); } pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new EchoClientHandler()); } }); InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port); //连接socket地址 ChannelFuture f = bootstrap.connect(inetSocketAddress).sync(); log.info("=========Client is start========="); //等待,直到连接关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
客户端处理器:
package netty.handler.echo; import java.io.UnsupportedEncodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * * @author donald * 2017年6月20日 * 下午12:45:04 */ public class EchoClientHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(EchoClientHandler.class); private final ByteBuf firstMessage; public EchoClientHandler() { String message = "Hello Server..."; firstMessage = Unpooled.buffer(1024);//堆buffer try { firstMessage.writeBytes(message.getBytes("UTF-8")); firstMessage.retainedDuplicate(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.write(firstMessage); ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; byte[] bytes = new byte[in.writerIndex()]; in.readBytes(bytes); //针对堆buf,direct buf不支持 // byte[] bytes = in.array(); String message = new String(bytes,"UTF-8"); log.info("===Client reciever ack message from Server:" +message); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
分别启动服务端与客户端,控制台输出:
服务端:
[INFO ] 2017-07-05 21:23:51 netty.main.echo.EchoServer =========Server is start=========
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xa4c1b700, L:/192.168.31.153:10010 - R:/192.168.31.153:28426] REGISTERED
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xa4c1b700, L:/192.168.31.153:10010 - R:/192.168.31.153:28426] ACTIVE
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xa4c1b700, L:/192.168.31.153:10010 - R:/192.168.31.153:28426] READ: 15B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 53 65 72 76 65 72 2e 2e 2e |Hello Server... |
+--------+-------------------------------------------------+----------------+
[INFO ] 2017-07-05 21:23:59 netty.handler.echo.EchoServerHandler ===Server reciever message:Hello Server...
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xa4c1b700, L:/192.168.31.153:10010 - R:/192.168.31.153:28426] WRITE: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 63 6c 69 65 6e 74 20 2e 2e 2e |hello client ...|
+--------+-------------------------------------------------+----------------+
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xa4c1b700, L:/192.168.31.153:10010 - R:/192.168.31.153:28426] READ COMPLETE
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xa4c1b700, L:/192.168.31.153:10010 - R:/192.168.31.153:28426] FLUSH
客户端:
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xbd3abb79] REGISTERED
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xbd3abb79] CONNECT: /192.168.31.153:10010
[INFO ] 2017-07-05 21:23:59 netty.main.echo.EchoClient =========Client is start=========
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xbd3abb79, L:/192.168.31.153:28426 - R:/192.168.31.153:10010] ACTIVE
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xbd3abb79, L:/192.168.31.153:28426 - R:/192.168.31.153:10010] WRITE: 15B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 53 65 72 76 65 72 2e 2e 2e |Hello Server... |
+--------+-------------------------------------------------+----------------+
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xbd3abb79, L:/192.168.31.153:28426 - R:/192.168.31.153:10010] FLUSH
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xbd3abb79, L:/192.168.31.153:28426 - R:/192.168.31.153:10010] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 63 6c 69 65 6e 74 20 2e 2e 2e |hello client ...|
+--------+-------------------------------------------------+----------------+
[INFO ] 2017-07-05 21:23:59 netty.handler.echo.EchoClientHandler ===Client reciever ack message from Server:hello client ...
[INFO ] 2017-07-05 21:23:59 io.netty.handler.logging.LoggingHandler [id: 0xbd3abb79, L:/192.168.31.153:28426 - R:/192.168.31.153:10010] READ COMPLETE