简介:
BIO:同步阻塞式IO,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
NIO:同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
BIO
网络编程的基本模型是C/S模型,即两个进程间的通信。
服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。
传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。
简单的描述一下BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型。
【传统BIO通信模型图】
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,Java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就挂了。这种通信模型在高并发的场景是没法应用的。
同步阻塞式I/O创建的Server源码:
package com.cherry.socket.bio; import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author [email protected] * @create 2018/4/13 9:24 * @desc **/ public class BIOServerBootStrap { public static void main(String[] args) throws IOException { //1.Create ServerSocket ServerSocket serverSocket = new ServerSocket(); //2.Set up the service listener port serverSocket.bind(new InetSocketAddress(9999)); //Create a thread pool ExecutorService threadPool = Executors.newFixedThreadPool(150); while (true){ //3.Waitting for request System.out.println("----waitting for request----"); final Socket socket = serverSocket.accept(); threadPool.submit(new Runnable() { public void run() { try { //4.Obtain user intent InputStream is = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr); StringBuilder sb = new StringBuilder(); String line = null; while ((line=br.readLine())!=null){ sb.append(line); } System.out.println("----the message get by server---- : "+sb); //5.client response OutputStream os = socket.getOutputStream(); PrintWriter pw = new PrintWriter(os); pw.println("server time"+new Date().toLocaleString()); pw.flush(); socket.shutdownOutput();//Tell the client to write the cut-off //6.release the resource socket.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } }
同步阻塞式I/O创建的Client源码:
package com.cherry.socket.bio; import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; /** * @author [email protected] * @create 2018/4/13 11:07 * @desc **/ public class BIOClientBootStrap { public static void main(String[] args) throws IOException { //1.Create client socket Socket socket = new Socket(); //2.connect to the server socket.connect(new InetSocketAddress("127.0.0.1",9999)); //3.makes requests to the server OutputStream os = socket.getOutputStream(); PrintWriter pw = new PrintWriter(os); pw.println("Hello,This is Client 2"); pw.flush(); socket.shutdownOutput();//Tell the Server to write the cut-off //4.get the response InputStream is = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr); StringBuilder sb = new StringBuilder(); String line =null; while ((line=br.readLine())!=null){ sb.append(line); } System.out.println("----The message get by client---- : "+sb); //5.release the resource socket.close(); } }
NIO
non-blocking IO,
NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
新增的着两种通道都支持阻塞和非阻塞两种模式。
阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。
对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。
NIO编程是面向通道的(BIO是面向流的),流分为写入/写出,是单向的,意味着通道是可以进行双向读写的。NIO所有基于channel的API对数据的操作都是间接通过操作缓冲区ByteBuffer
In : 磁盘 --通道--> ByteBuffer(内存)-->数据(内存)
Out: 数据(内存)-->ByteBuffer(内存) --> 通道 --> 磁盘
NIO编程涉及ServerSocketChannel
、SocketChannel
、ByteBuffer
、Selector
(通道选择器)
服务端源码:
package com.cherry.socket.nio; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * @author [email protected] * @create 2018/4/13 11:34 * @desc **/ public class NIOServerBootStrap { public static void main(String[] args) throws IOException { //1.Create ServerSocketChannel ServerSocketChannel ssc = ServerSocketChannel.open(); //2.Set up the service listener port ssc.socket().bind(new InetSocketAddress(9999)); //3.Set up the channel to be non-blocking ssc.configureBlocking(false); //4.Create channel selector Selector selector =Selector.open(); //5.注册事件类型 ssc.register(selector, SelectionKey.OP_ACCEPT); //6.遍历事件处理列表 while (true){ int num = selector.select(); if (num>0){ //获取所有待处理的keys Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()){ SelectionKey key = keys.next(); //处理事件 if(key.isAcceptable()){ System.out.println("----key isAcceptable----"); ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = channel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_ACCEPT); }else if (key.isReadable()){ System.out.println("----key isReadable(read)----"); SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); while (true){ buffer.clear(); int n = channel.read(buffer); if (n==1)break; baos.write(buffer.array(),0,n); } channel.register(selector,SelectionKey.OP_WRITE,baos); } else if (key.isWritable()){ System.out.println("----key isWritable(write)----"); SocketChannel channel = (SocketChannel) key.channel(); ByteArrayOutputStream baos = (ByteArrayOutputStream) key.attachment(); baos.write("_服务器追加".getBytes()); ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray()); channel.write(buffer); channel.socket().shutdownOutput(); channel.close(); } //移除时间列表 keys.remove(); } } else { continue; } } } }
Netty
Netty的通讯管道:
通过AOP的编程思想(责任链设计模式),实现消息的编解码。
服务端编程:
1)创建ServerBootstrap sbt=new ServerBootstrap();
2)创建EventLoopGroup boss、worker
3)关联boss和worker> sbt.group(boss,worker);
4)设置ServerSocketChannel实现类sbt.channel(NioServerSocketChannel);
5)初始化通道sbt.childHandler(初始化通道);
6)绑定端口并启动服务ChannelFuture future=sbt.bind(port).sync();
7)等待服务关闭 future.channel().closeFuture().sync();
8) 释放线程资源boss、worker # shutdownGraceFully();
服务端源码:
package com.cherry.socket.netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author [email protected] * @create 2018/4/16 9:51 * @desc **/ public class ServerBootStrap { public static void main(String[] args) throws InterruptedException { //1.Create ServerBootstrap ServerBootstrap sbt = new ServerBootstrap(); //2.Create thread pool(boss & worker) EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); //3.关联线程池(boss负责请求转发,worker负责事件处理) sbt.group(boss,worker); //4.Set up the server sbt.channel(NioServerSocketChannel.class); //5.Initialize the communication channel (key point) sbt.childHandler(new ServerChannelInitizlizer()); //6.Bind listening port System.out.println("我在@9999端口监听..."); ChannelFuture channelFuture = sbt.bind(9999).sync(); //7.Wait for the server to close channelFuture.channel().closeFuture().sync(); //l boss.shutdownGracefully(); worker.shutdownGracefully(); } }
package com.cherry.socket.netty.server; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * @author [email protected] * @create 2018/4/16 10:26 * @desc **/ public class ServerChannelHander extends ChannelHandlerAdapter { /*捕获数据在通道传输过程中的异常*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("错误:"+cause.getMessage()); } /*接收数据,并响应*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture channelFuture = ctx.writeAndFlush(msg); //关闭通道 channelFuture.addListener(ChannelFutureListener.CLOSE); } }
package com.cherry.socket.netty.server; import com.cherry.socket.netty.server.ServerChannelHander; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @author [email protected] * @create 2018/4/16 10:02 * @desc **/ public class ServerChannelInitizlizer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //communication channel ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ServerChannelHander()); } }
客户端编程:
1)创建Bootstrap sbt=new Bootstrap();
2)创建EventLoopGroup worker
3)关联boss和worker> sbt.group(worker);
4)设置SocketChannel实现类sbt.channel(NioSocketChannel);
5)初始化通道sbt.handler(初始化通道);
6)绑定端口并启动服务ChannelFuture future=sbt.connect(host,port).sync();
7)等待服务关闭 future.channel().closeFuture().sync();
8)释放线程资源worker # shutdownGraceFully();
客户端源码:
package com.cherry.socket.netty.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** * @author [email protected] * @create 2018/4/16 11:15 * @desc **/ public class ClientBootStrap { public static void main(String[] args) throws InterruptedException { //1.Create Bootstrap Bootstrap bt = new Bootstrap(); //2.Create thread pool(worker) EventLoopGroup worker = new NioEventLoopGroup(); //3.关联线程池 bt.group(worker); //4.Set up the client bt.channel(NioSocketChannel.class); //5.Initialize the communication channel (key point) bt.handler(new ClientChannelInitializer()); //6.连接 ChannelFuture channelFuture = bt.connect("127.0.0.1",9999).sync(); //7.Wait for the server to close channelFuture.channel().closeFuture().sync(); //8.release the resource worker.shutdownGracefully(); } }
package com.cherry.socket.netty.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import javax.xml.transform.Source; /** * @author [email protected] * @create 2018/4/16 11:22 * @desc **/ public class ClientChannelHander extends ChannelHandlerAdapter { /*捕获数据在通道传输过程的异常*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); System.out.println("错误:"+cause.getMessage()); } /*接收数据*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //super.channelRead(ctx, msg); ByteBuf byteBuf = (ByteBuf) msg; System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8)); } /*链接服务器时发送数据*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //super.channelActive(ctx); ByteBuf buf = Unpooled.buffer(); buf.writeBytes("你好,我是客户端!".getBytes()); ctx.writeAndFlush(buf); } }
package com.cherry.socket.netty.client; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @author [email protected] * @create 2018/4/16 11:20 * @desc **/ public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //communication channel ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ClientChannelHander()); } }
上述BIO NIO Netty所用的pom.xml
<!-- socket 相关 注意版本,不然有些方法不能用--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency>
通过对Netty的分析,我们将他的优点总结如下:
1.API使用简单,开发门槛低;
2.功能强大,预设了很多编码功能,支持多种主流协议;
3.定制能力强,可以通过ChannelHandler对通信框架进行灵活扩展;
4.性能高,通过与其他的主流NIO框架对比,Netty的综合性能最优;
5.成熟,稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG烦恼;
6.社区活跃,版本迭代周期短;
7.经过大规模的商业应用考验,质量得到验证。Netty在互联网、大数据、网络游戏、企业应用、电信软件等众多领域已经得到了成功商用,也 证明他已经完全能满足不同行业的商应用了;
基于上述这些,我将在下一篇文章中写一些关于如何实现高性能可扩展RPC,希望对大家有所帮助!