高并发RPC解决方案
-
基于TCP的RPC实现
- Dubbo:
阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC实现服务的输出和输入功能,可以和Spring框架无缝集成。
- Dubbo:
-
Provider: 暴露服务的服务提供方。
-
Consumer: 调用远程服务的服务消费方。
-
Registry: 服务注册与发现的注册中心。
-
Monitor: 统计服务的调用次调和调用时间的监控中心。
-
Container: 服务运行容器。
-
思考:TCP(Dubbo)和HTTP(SpringCloud)哪个更好?
- 通用性
- http协议更通用,无论哪种语言都支持
- 性能
- tcp的性能更好,高两倍左右,并发越高,性能提升就很可观了
- 服务的全面性
- 最初的dubbo主要只是用于服务治理,更轻量话,但是随着微服务的发展,阿里针对dubbo就进一步退出了spring cloud alibaba,服务全面性也提高了
- 热度?来查查百度指数
- dubbo高于spring cloud
- 那究竟选谁?
- 更客观的,应该根据业务来做选择,如果业务是对外开放的,选spring cloud更好,如果是内部调用,选用性能更好的dubbo或许更好,并不是越新的技术越好,因为新技术采坑的可能性越大,到时没有足够的技术来解决,往往得不偿失。
原生JDK网络编程- NIO
-
什么是NIO?
- jdk1.4引入
- new io、no-blocking io
- jdk开发小组在bio的基础上新引入的io
-
nio和bio的区别?
区别一
-
bio是面向流的,读取数据是直接读直接写
-
nio是面向缓冲的,读写数据先保存到缓冲区,速度可以快一些
区别二
-
bio是阻塞的,当从网络读写数据时,如果读不进或者写不进时,这个线程就会被阻塞了,对线程的性能消耗很大
-
读数据或写数据无法进行时,线程可以解放出来去执行别的任务
一次网络读写,哪个快?
-
bio更快,只有一次系统调用,nio需要两次系统调用
nio的优势
-
多余并发很大的时候,bio需要启动大量的线程来应对网络请求,但是nio具有多路复用的特性,可以一个线程应对很多个网络请求
-
-
三大核心组件及关系
-
Selector—选择器
通过slector来选择通道
-
Channel—通讯通道
-
Buffer------缓存区
三大组件是怎么协同工作的?
-
-
如果一个客户端同时发送连接请求,另一个客户端发送读数据请求,有两个事件连接事件和读事件
此时新会新生成一个socketChannel用于读事件
-
当有channel感兴趣的事件到了selector里面时,selector需要通知特定的channel上
-
-
重要概念
-
操作类型SelectionKey----jdk包里的一个类
-
serverSocketChannel告诉selector自己关注什么事件时就要用到selectionKey选择键,就将channel和selector建立了某种关系
1.OP_READ ------- socketChannel的读事件,
客户端的socketChannel除了关注这个事件,还可以关注另外两种,所以至少要用到读写、连接这3种事件
服务端的socketChannel只关注读写事件,不需要连接
2.OP_WRITE
3.OP_CONNECT
4.OP_ACCEPT ----- 流程图中步骤1.关注客户端的连接 ServerSocketChannel只关注accept事件
-
-
多路复用IO
-
多路复用IO 应用程序 分割线 操作系统 选择器 | SelectableChannel ---register--->| | SelectableChannel ---register--->| | 系统内核 SelectableChannel ---register--->| | | 这里可以是阻塞的也可以是非阻塞的 -----询问事件/注册事件------> | | | | | | | 将特定事件交给对<-----反馈事件------- | --------------------它感兴趣的通道 | | | | | |<-客户端连接 | | | |<-数据传输 | | | |<-数据传输 | | | |<-客户端连接 V | | |<-数据传输 某一个SelectableChannel--清空事件->| | |<-数据传输 | | |<-客户端连接 | | |<-数据传输 | | |<-数据传输 | | | | | | | | | | | |
-
原生JDK网络编程- NIO之Buffer
-
可以读写数据的内存区域,实质就是数组
-
重要属性
-
capacity–容量,不可能无限大
-
position–位置,
写入模式,由于是一个数组,所以位置是0~最大容量-1
读模式,表示下一个能读的位置是position
-
limit–限制
写入模式,limit表示最多能写入多少数据,因为不能超过容量capicity
读入模式,limit最多不能超过已经写入的长度,后面的没有数据,都是未知数据
-
-
Buffer的分配、读写和常用操作
-
ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer,但是ByteBuffer为主
- MappedByteBuffer
buffer分配内存
-
package cn.enjoyedu.nio.buffer; import com.sun.management.OperatingSystemMXBean; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; /** * @author Mark老师 * 类说明:Buffer的分配 堆的最大值和最小值都是100M -Xmx100M -Xms100M */ public class AllocateBuffer { public static void main(String[] args) { // jdk提供的在运行时,获取虚拟机信息的类ManagementFactory // 线程信息、堆栈信息、虚拟机信息 // 用在监控系统里面 OperatingSystemMXBean osmxb = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); System.out.println("----------Test allocate--------"); // 空闲物理内存大小 System.out.println("before allocate:" + osmxb.getFreePhysicalMemorySize()); /*堆上分配*/ // 从已经创建的堆上取20万字节 ByteBuffer buffer = ByteBuffer.allocate(200000); System.out.println("buffer = " + buffer); // 使用堆上内存后,仍然跟空闲物理内存一样 // 实际也有可能有些微差别,因为堆上分配内存时间段,所以很短的时间内很可能是一样的 System.out.println("after allocate:" + osmxb.getFreePhysicalMemorySize()); /* 这部分用的直接内存*/ // 从堆外内存上分配20万字节 ByteBuffer directBuffer = ByteBuffer.allocateDirect(200000); System.out.println("directBuffer = " + directBuffer); // 使用堆外内存后,空闲物理内存发生了变化 // 因为系统物理内存一直在变化,有可能打,也有可能小 // 堆外分配内存比较耗费时间,时间差的更多,所以很大可能是不一样的 System.out.println("after direct allocate:" + osmxb.getFreePhysicalMemorySize()); System.out.println("----------Test wrap--------"); byte[] bytes = new byte[32]; // 因为buffer实际是字节数组 // 所以可以通过wrap方法把字节数组转换成buffer buffer = ByteBuffer.wrap(bytes); System.out.println(buffer); buffer = ByteBuffer.wrap(bytes, 10, 10); System.out.println(buffer); } }
-
Runtime.getRuntime().freeMemory()获取的是jvm的堆上还有多少空余内存没有用,不是上面的物理内存
-
直接内存比堆内存快,为什么?
-
往buffer写数据
- 应用程序往buffer写
- channel往buffer写
往buffer写数据
-
package cn.enjoyedu.nio.buffer; import java.nio.ByteBuffer; /** * @author Mark老师 * 类说明:Buffer方法演示 */ public class BufferMethod { public static void main(String[] args) { System.out.println("------Test get and put-------------"); ByteBuffer buffer = ByteBuffer.allocate(32); buffer.put((byte) 'a')//0 .put((byte) 'b')//1 .put((byte) 'c')//2 .put((byte) 'd')//3 .put((byte) 'e')//4 .put((byte) 'f');//5 // put方法往buffer中推入6个字符 // 属性发生了哪些变化? // position发生了变化,从0变成6 // limit和capacity没有变化,还是32 System.out.println("before flip()" + buffer); /* 转换为读取模式*/ // 把buffer从写模式转换成读模式 // 做了一个很简单的赋值操作 buffer.flip(); System.out.println("before get():" + buffer); // 读取第一个值后,position向后移动1位 System.out.println((char)buffer.get()); System.out.println("after get():" + buffer); /* position移动两位*/ byte[] dst = new byte[10]; // get方法也可以移动多位 buffer.get(dst, 0, 2); System.out.println("after get(dst, 0, 2):" + buffer); System.out.println("dst:" + new String(dst)); /*绝对读写*/ System.out.println("--------Test 绝对读写-------"); ByteBuffer bb = ByteBuffer.allocate(32); System.out.println("before put(byte):" + bb); System.out.println("after put(byte):" + bb.put((byte) 'z')); /* put(2,(byte) 'c')不改变position的位置*/ // put方法指定索引的位置 bb.put(2, (byte) 'c'); System.out.println("after put(2,(byte) 'c'):" + bb); System.out.println(new String(bb.array())); /* get(index)不影响position的值*/ // get方法指定索引的位置 System.out.println((char) buffer.get(2)); System.out.println("after get(index):" + buffer); System.out.println("--------Test Clear And Compact--------"); ByteBuffer buffer2 = ByteBuffer.allocate(32); buffer2.put((byte) 'x'); System.out.println("before clear:" + buffer2); // clear可以用来清除数据 // 看名字是一把清,实际清除只是改变了position、limit的位置 // buffer中之前存储的数据并没有清除 buffer2.clear(); System.out.println("after clear:" + buffer2); System.out.println(new String(buffer2.array())); /*放入4个字节,position移动到下个可写入的位置,也就是4*/ buffer2.put("abcd".getBytes()); System.out.println("before compact:" + buffer2); buffer2.flip();//将position设回0,并将limit设置成之前position的值*/ System.out.println("after flip:" + buffer2); /*compact()方法将所有未读的数据拷贝到Buffer起始处。*/ /* 然后将position设到最后一个未读元素正后面。*/ // 可以获取当前还有多少数据没有读 System.out.println("还有数据未读,个数:" + buffer2.remaining()); // compact也可以清除数据 // 会把未读取的数组从数组的开始位置开始排列,同时把position放到这些元素的末尾 buffer2.compact(); System.out.println("after compact:" + buffer2); System.out.println(new String(buffer2.array())); System.out.println("--------Test rewind--------"); buffer.clear(); buffer.position(10);/*移动position到10*/ buffer.limit(15);/*限定最大可写入的位置为15*/ System.out.println("before rewind:" + buffer); buffer.rewind();/*将position设回0*/ System.out.println("before rewind:" + buffer); System.out.println("--------Test mark AND reset----------"); buffer = ByteBuffer.allocate(20); System.out.println("buffer = " + buffer); buffer.clear(); buffer.position(5);/*移动position到5*/ // 打标记 buffer.mark();/*记录当前position的位置*/ buffer.position(10);/*移动position到10*/ System.out.println("before reset:" + buffer); // 恢复到打标记的位置 buffer.reset();/*复位position到记录的地址*/ System.out.println("after reset:" + buffer); } }
总结buffer的使用
-
1.写入数据到buffer
-
2.调用flip方法切换模式
-
3.从buffer里面读数据
-
怎么通过channel往buffer里面写数据呢?
通过channel往buufer里面写数据–NIO编程
服务器端
NIO服务器端NioServer----主类
-
package cn.enjoyedu.nio.nio; import static cn.enjoyedu.nio.Const.DEFAULT_PORT; /** * @author Mark老师 * 类说明:nio通信服务端 */ public class NioServer { private static NioServerHandle nioServerHandle; public static void main(String[] args){ nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } }
nio处理类NioServerHandle
-
package cn.enjoyedu.nio.nio; import cn.enjoyedu.nio.Const; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import static cn.enjoyedu.nio.Const.response; /** * @author Mark老师 * 类说明:nio通信服务端处理器 */ public class NioServerHandle implements Runnable{ private volatile boolean started; // 服务端需要ServerSocketChannel 用于accept private ServerSocketChannel serverSocketChannel; // 用来在客户端和SercerSocketChannel之间起桥梁作用 private Selector selector; /** * 构造方法 * @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try { /*创建选择器的实例*/ // 注意不能new selector = Selector.open(); /*创建ServerSocketChannel的实例*/ // 注意不能new serverSocketChannel = ServerSocketChannel.open(); /*设置通道为非阻塞模式*/ // 由于是nio,非阻塞的,设置成非阻塞的 serverSocketChannel.configureBlocking(false); /*绑定端口*/ // 跟socket一定有bind()方法,绑定端口 serverSocketChannel.socket().bind(new InetSocketAddress(port)); /*注册事件,表示关心客户端连接*/ // serverSocketChannel往selector注册 // 之前分析的三大主件关系时流转的第一步,关联ACCEPT事件 serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); // 标识服务器已经启动了 started = true; System.out.println("服务器已启动,端口号:"+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { // 只要服务器是在启动的,就永不停歇的进行下去 while(started){ try { /*获取当前有哪些事件*/ // 不断从Selector上去拿已经有哪些事件产生了 // 一定要当前有事件发生了,才会返回 // 现在是阻塞的,带有超时的,最长只会超时阻塞1秒 // 还有一个非阻塞的方法selector.selectNow(); selector.select(1000); /*获取事件的集合*/ // 获取产生事件的集合 // 用于进一步的处理 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活 的键出现,这会导致我们尝试再次处理它。*/ iterator.remove(); handleInput(key); } } catch (IOException e) { e.printStackTrace(); } } } /*处理事件的发生*/ private void handleInput(SelectionKey key) throws IOException { // 首先判定key是不是有效的 if(key.isValid()){ /*处理新接入的客户端的请求*/ // 是不是accept事件 if(key.isAcceptable()){ /*获取关心当前事件的Channel*/ // 把获取的channel做强制转型,转成ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); /*接受连接*/ // 类似bio中的socket,而在nio中是socketChannel // 通过ssc.accept()获取socketChannel SocketChannel sc = ssc.accept(); System.out.println("==========建立连接========="); // 上面serverSocketChannel已经设置成非阻塞 // 把socketChannel也设置成非阻塞的 sc.configureBlocking(false); /*关注读事件*/ // socketChannel只关注读事件 // 所以把socketChannel注册到selector上 sc.register(selector,SelectionKey.OP_READ); } /*处理对端的发送的数据*/ // 是不是read事件 if(key.isReadable()){ // 把获取的channel做强制转型,转成SocketChannels SocketChannel sc = (SocketChannel) key.channel(); /*创建ByteBuffer,开辟一个缓冲区*/ // 创建一个buffer,为后面读入数据做准备 ByteBuffer buffer = ByteBuffer.allocate(1024); /*从通道里读取数据,然后写入buffer*/ // 所以这里buffer是写入模式 int readBytes = sc.read(buffer); // 如果读入数据的长度大于0才做处理,否则关闭通道 if(readBytes>0){ // 如果自己做数据的拼接合并,是相当的麻烦 // tcp的粘包半包问题解决起来非常麻烦 // 把读到一半的数据用缓存保存起来,放到本地内存或者缓存里面 // 下一次读的时候取出来,再合并 // 好在netty里面已经针对粘包半包问题做了相应处理,直接使用nio编程是很麻烦很困难的 /*将缓冲区当前的limit设置为position,position=0, 用于后续对缓冲区的读取操作*/ // 下面要对buffer进行读取,所以切换读写模式 buffer.flip(); /*根据缓冲区可读字节数创建字节数组*/ // 当前buffer中有多少数据没读取,就创建多大的byte数组 byte[] bytes = new byte[buffer.remaining()]; /*将缓冲区可读字节数组复制到新建的数组中*/ // 把buffer中保存的数据,取出来,放入byte数组中 buffer.get(bytes); // 做简单处理,直接转成字符串 String message = new String(bytes,"UTF-8"); // 打印转化后的结果 System.out.println("服务器收到消息:"+message); /*处理数据*/ // 拼接响应结果,拼成新的字符串 // Hello, + msg + now is 2021-02-10 String result = Const.response(message); /*发送应答消息*/ doWrite(sc,result); } // readBytes=-1,意味着通道读取过程中发生了错误 // 这时候的读取已经是没有意义的了,所以关闭了通道 else if(readBytes<0){ /*取消特定的注册关系*/ // 执行完key.cancel()后并不会马上移除,而是下一次select的时候才移除 key.cancel(); /*关闭通道*/ sc.close(); } } } } /*发送应答消息*/ private void doWrite(SocketChannel sc,String response) throws IOException { // 把消息从string转成byte数组 byte[] bytes = response.getBytes(); // 新创建一个byteBuffer ByteBuffer buffer = ByteBuffer.allocate(bytes.length); // 把字节数组的值放到byteBuffer中 buffer.put(bytes); // 之前是写数据,但是下面要读buffer中的数据,再写往对端 // 所以nio实现起来是非常麻烦的 buffer.flip(); sc.write(buffer); } public void stop(){ started = false; } }
response的简单方法
-
package cn.enjoyedu.nio; /** * @author Mark老师 * 类说明: */ public class Const { public static int DEFAULT_PORT = 12345; public static String DEFAULT_SERVER_IP = "127.0.0.1"; /*根据输入信息拼接出一个应答信息*/ public static String response(String msg){ return "Hello,"+msg+",Now is "+new java.util.Date( System.currentTimeMillis()).toString() ; } }