一:RPC框架设计
1、socket:
TCP/IP协议是传输层协议,主要解决数据如何在网络中传输;
HTTP 是应用层协议,主要解决如何包装数;socket则是支持TCP/IP协议网络通信的基本操作单元,是通信端点的抽象表示,比如代表客户端,服务端。他包含了通信的五种信息:
连接使用的协议、本地主机的IP地址、本地进程的协议端口、远程主机的IP地址、远程进程的协议端口。
Socket编程主要涉及到客户端和服务端,首先是在服务器端创建一个套接字 (ServerSocket),并把它附加到一个端口上,从这个端口监听连接。端口号的范围是0到
65536 ,但是 0 到 1024 是为特权服务保留的端口号,所以选用其他端口 。客户端请求与服务器进行连接的时候,根据服务器的域名或者 IP 地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行作。
代码示例:
服务端:
package cn.jojo.uc.server.controller;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServerDemo {
public static void main(String[] args) throws Exception {
//1.创建一个线程池,如果有客户端连接就创建一个线程, 与之通信
ExecutorService executorService = Executors.newCachedThreadPool();
//2.创建 ServerSocket 对象
ServerSocket serverSocket = new ServerSocket(9999);
System.out.println("服务器已启动");
while (true) {
//3.监听客户端
Socket socket = serverSocket.accept();
System.out.println("有客户端连接");
//4.开启新的线程处理
executorService.execute(new Runnable() {
@Override
public void run() {
handle(socket);
}
});
}
}
public static void handle(Socket socket) {
try {
System.out.println("线程ID:" + Thread.currentThread().getId() + " 线程名称:" + Thread.currentThread().getName());
//从连接中取出输入流来接收消息
InputStream is = socket.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("客户端:" + new String(b, 0, read));
//连接中取出输出流并回话
OutputStream os = socket.getOutputStream();
os.write("没钱".getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//关闭连接
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
客户端:
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;
public class ClientDemo {
public static void main(String[] args) throws Exception {
while (true) {
//1.创建 Socket 对象
Socket s = new Socket("127.0.0.1", 9999);
//2.从连接中取出输出流并发消息
OutputStream os = s.getOutputStream();
System.out.println("请输入:");
Scanner sc = new Scanner(System.in);
String msg = sc.nextLine();
os.write(msg.getBytes());
//3.从连接中取出输入流并接收回话
InputStream is = s.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("老板说:" + new String(b, 0, read).trim());
//4.关闭
s.close();
}
}
}
2、IO模型:
同步:需要主动去看水有没有烧开同步阻塞:烧水之后,啥也不干,蹲着那里等水烧开。同步非阻塞: 烧水之后,可以去干其他事,这就是非阻塞; 但依旧是同步的,因为他 时不时的要来看看是否烧开,主动获取结果。异步:水开之后发出警报通知,不用自己主动去看。异步阻塞:虽然会水开后会发出警报,不用自己去获取结果。 但是,在收到结果之前,啥也不干。异步非阻塞:烧水后不主动获取结果,水开了会有警报通知,,期间继续干其他事情。
- BIO:
-
一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器 ) 。
-
- NIO:
-
同步非阻塞,服务器实现模式为一个线程处理多个请求 ( 连接 ) ,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理。说白了,这个线程不会一直阻塞在一个连接上,但是会主动获取每个连接的结果。
-
-
- AIO:
-
AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是,先由操作系统完成后才通知服务端程序启动线程去处理。 Proactor 模式是一个消息异步通知的设计模式。
-
接下来重点说一下NIO,因为它使用最多最常用,Netty框架也是基于它来实现的。
NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。 它是面向缓冲区编程的,扩展性强。
NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来 , 根据实际情况,可以分配 50 或者 100 个线程来处理。不像BIO 那样,非得分配 10000 个。BIO是基于字符流或字节流操作,而NIO通过channel读取到buffer中,或从buffer写入到channel。 Selector( 选择器 ) 用于监听多个通道的事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道。
NIO对于Buffer,Channel,Selector三大部分,都有提供相关的API,去操作使用内存,通道和选择器。 但是,通常不会直接去使用它,因为开发难度大,例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。 要很熟悉多线程,网络等才能编写出可实践的代码。
Netty,,一个基于NIO的网络编程框架,简化了NIO的开发过程。Elasticsearch 、Dubbo 框架内部都采用了 Netty。
不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先讲解下 各个线程模 式, 最后看看 Netty 线程模型有什么优越性。
3、线程模型:
- 传统阻塞 I/O 服务模型
- 采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的处理(也就是BIO模式)
- Reactor 模型:
服务器端程序处理传入的多个请求, 并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher 模式。 Reactor 模式使用 IO 复用监听事件 , 收到事件后,分发给某个线程 ( 进程 ), 这点就是网络服务器高并发处理关键 .Reactor有几种方式:
- 单Reactor单线程:
Selector可以实现,应用程序通过一个阻塞对象监听多路连接请求;Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发。如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的业务处理。Handler 会完成 Read→业务处理→Send 的完整业务流程。
缺点: 1、性能问题:只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时, 整个进程无法处理其他连接事件,很容易导致性能瓶颈。 2、可靠性问题: 如果线程意外终止或者进入死循环,会导致整个系统通信模块不可用,造成节点故障。
- 单Reactor多线程:
用多个Handler线程处理业务,并且Handler只负责读请求和返回结果,由worker线程真正处理业务。
缺点:1、多线程就会设计数据共享的问题。 2、对于Reactor主线程来说,还是单线程,处理所有的事件的监听和响应,任务繁重,高并发时容易出现瓶颈。
- 多Reactor多线程:
如上,MainReactor只负责建立连接,后续交给SubReactor。 并且,一个MainReactor 可以对应多个 SubReactor,SubReactor处理后不用返回结果给MainReactor, 职责明确,充分利用系统资源,应对高并发。
- 单Reactor单线程:
4、Netty:
对于Netty来说,主要基于主从 Reactor 多线程模式,并做了一定的改进(不管是主从Reactor,都有多个线程组)。 下面看看Netty设计(从简单到详细):
- 简单版Netty:
就是上面的多Reactor多线程模型
BossGroup 线程维护 Selector来监听请求,ServerSocketChannel 注册到这个 Selector 上,只关注建立连接请求事件(主 Reactor)。
当收到客户端建立连接的请求时,通过 ServerSocketChannel.accept 方法获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程中的
Selector(监听处理请求) ,每个 Selector 运行在一个线程中(从 Reactor )。当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的 IO 事件后,就调用 Handler 进行处理。 - 进阶版Netty:
有两组线程池:BossGroup 和 WorkerGroup,BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。
BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一个 Selector,用于监听注册在其上的 Channel。
每个 BossGroup 中的线程循环执行以下三个步骤:1、 轮训 注册在其上的 ServerSocketChannel 的 accept 事件。2、处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel ,并将其注册到WorkerGroup 中某个线程上的 Selector 上。3、再去以此循环处理任务队列中的下一个事件。每个 WorkerGroup 中的线程循环执行以下三个步骤:1、 轮训注册在其上的 NioSocketChannel 的 read/write 事件。2、在对应的 NioSocketChannel 上处理 read/write 事件。3、再去以此循环处理任务队列中的下一个事件。 - 详细版Netty:
同样,还是有两组线程池:Boss Group 和 Worker Group, 包含很多NioEventLoop线程(红色部分,图中只画了一个,实际有很多个),也就是循环处理NIO事件的。每个 NioEventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络连接(Channel)。
每个 BossNioEventLoop 中循环执行以下三个步骤:
1、select :轮训接收注册在其上的 ServerSocketChannel 的 accept 事件( OP_ACCEPT 事件)2、processSelectedKeys :处理的 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到 某个 WorkerNioEventLoop 上的 Selector 上。3、runAllTasks :再去以此循环处理任务队列中的其他任务。每个 WorkerNioEventLoop 中循环执行以下三个步骤:1、select :轮训注册在其上的 NioSocketChannel 的 read/write 事件( OP_READ/OP_WRITE 事件)。2、processSelectedKeys :在对应的 NioSocketChannel 上处理 read/write 事件。3、runAllTasks :再去以此循环处理任务队列中的其他任务。在以上两个 processSelectedKeys 步骤中,会使用 Pipeline (管道), Pipeline 中引用了Channel ,即通过 Pipeline 可以获取到对应的 Channel , Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
了解了Netty的线程模型,下面来看看Netty几个核心类的api:
ChannelHandler及其实现类:
ChannelHandler 接口是事件处理器,它定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻辑。
ChannelHandlerContext:
这 是 事 件 处 理 器的 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个
ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同时也绑定了对应的 ChannelPipeline 和 Channel 的信息,方便对 ChannelHandler 进行调用。常用方法如下所示:ChannelFuture close() ,关闭通道ChannelOutboundInvoker flush() ,刷新ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前ChannelHandler 的下一个 ChannelHandler , 开始处理(出站)
一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链。
如图,不管客户端还是服务端,写消息都是直接写入蓝色的Handle(out类型的),然后经过红色handle(混合类型),然后另一端通过绿色Handle(in类型的)读取。
在ChannelPipeline中,其实就是一个个的ChannelHandlerContext,每个上下文对象都有事件处理器ChannelHandler(也就是上面的Handle),In类型是按照Pipleline的加载顺序,顺序执行; Out类型是按照Pipeline的加载顺序,逆序执行
ChannelOption:
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:
ChannelOption.SO_BACKLOG:对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。ChannelOption.SO_KEEPALIVE :一直保持连接活动状态。该参数用于设置 TCP 连接,这个选项用于可能长时间没有数据交流的连接,如果在两小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文。
ChannelFuture:
用来获取 Channel 中异步 I/O 操作的结果。 因为在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得果,但是可以通过 ChannelFuture 来获取 I/O 操作的处理状态。
常用方法如下所示:
Channel channel() ,返回当前正在进行 IO 操作的通道ChannelFuture sync() ,等待异步操作执行完毕 , 将异步改为同步
EventLoopGroup和实现类NioEventLoopGroup:
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:
BossEventLoopGroup 和 WorkerEventLoopGroup 。 通常一个服务端口即一个 ServerSocketChannel ,对应一个Selector 和一个 EventLoop 线程。 BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:BossEventLoopGroup 通常是一个单线程的 EventLoop( 为什么是单线程, 而不是 Netty线程模型中说的多线程呢? 因为通常使用Netty时,只会监听一个服务端口 ) , EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例, BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。一般情况下我们都是用实现类 NioEventLoopGroup.常用方法如下所示:public NioEventLoopGroup() ,构造方法 , 创建线程组public Future<?> shutdownGracefully() ,断开连接,关闭线程
Netty 中的服务器端 和 客户端 的启动助手,通过它们可以完成服务端 和 客户端的各种配置。public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)该方法用于服务器端,用来设置两个 EventLooppublic B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLooppublic B channel(Class<? extends C> channelClass) ,该方法用来设置一个服务器端的通道 实现public B option(ChannelOption option, T value) ,用来给 ServerChannel 添加配置public ServerBootstrap childOption(ChannelOption childOption, T value) ,用来给接收到的通道添加配置public ServerBootstrap childHandler(ChannelHandler childHandler) ,该方法用来设置业务 处理类(自定义的 handler)public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器端
Netty用来操作缓冲区的类。常用API:
public static ByteBuf copiedBuffer(CharSequence string, Charset charset),
通过给定的数据 和 字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
了解了常用API,下面看看Netty入门案列:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
Netty服务端编写
服务端实现步骤 :1. 创建 bossGroup 线程组 : 处理网络事件 -- 连接事件2. 创建 workerGroup 线程组 : 处理网络事件 -- 读写事件3. 创建服务端启动助手4. 设置 bossGroup 线程组和 workerGroup 线程组5. 设置服务端通道实现为 NIO6. 参数设置7. 创建一个通道初始化对象8. 向 pipeline 中添加自定义业务处理 handler9. 启动服务端并绑定端口 , 同时将异步改为同步10. 关闭通道和关闭连接池
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args)
throws InterruptedException {
//1.创建bossGroup线程组: 处理网络事件--连接事件
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.创建服务端启动助手
ServerBootstrap bootstrap = new ServerBootstrap();
//4.设置线程组
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//5.设置服务端通道实现;
.option(ChannelOption.SO_BACKLOG, 128)//6.参数设置-设置线程队列中等待 连接个数
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//7.参数设 置-设置活跃状态,child是设置workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//8.创建一 个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//9.向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandle());
}
});
//10.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = bootstrap.bind(9999).sync();
System.out.println("服务器启动成功....");
//11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyServerHandle implements ChannelInboundHandler {
/*** 通道读取事件 ** @param ctx 通道上下文对象 * @param msg 消息 * @throws Exception */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
/*** 读取完毕事件 ** @param ctx * @throws Exception */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端.", CharsetUtil.UTF_8));
}
/*** 异常发生事件 ** @param ctx * @param cause * @throws Exception */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
/*** 通道就绪事件 ** @param ctx * @throws Exception */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
客户端实现步骤 :1. 创建线程组2. 创建客户端启动助手3. 设置线程组4. 设置客户端通道实现为 NIO5. 创建一个通道初始化对象6. 向 pipeline 中添加自定义业务处理 handler7. 启动客户端 , 等待连接服务端 , 同时将异步改为同步8. 关闭通道和关闭连接池
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//1. 创建线程组
EventLoopGroup group = new NioEventLoopGroup();
//2. 创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
//3. 设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)//4. 设置服务端通道实现为NIO
.handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通 道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//6. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyClientHandle());
}
});
//7. 启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
//8. 关闭通道和关闭连接池
future.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyClientHandle implements ChannelInboundHandler {
/*** 通道读取事件 ** @param ctx 通道上下文对象 * @param msg 消息 * @throws Exception */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
/*** 读取完毕事件 ** @param ctx * @throws Exception */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty客户端.", CharsetUtil.UTF_8));
}
/*** 异常发生事件 ** @param ctx * @param cause * @throws Exception */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
/*** 通道就绪事件 ** @param ctx * @throws Exception */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
分别启动服务端和客户端,运行结果如下:
1、服务端启动,会打印
服务器启动成功....然后等待客户端输入;
2、客户端启动,在NettyClientHandle的通道就绪事件channelActive方法中会发送:
你好呀,我是Netty客户端此时,服务端在NettyServerHandle#channelRead中会读取到消息并打印。
在读取完成后,在NettyServerHandle#channelReadComplete中会发送:
你好,我是Netty服务端.3、客户端在NettyClientHandle#channelRead 会读取服务端的消息并打印,读取完成后,又会在NettyClientHandle#channelReadComplete继续发送消息
你好,我是Netty客户端.然后,服务端再接收打印,再发送消息,客户端也是一样,一直循环下去…
上面就是简单的例子,使用的是Future同步等待。
Future 常用方法有:sync 方法 , 阻塞等待程序结果反回isDone 方法来判断当前操作是否完成;isSuccess 方法来判断已完成的当前操作是否成功;getCause 方法来获取已完成的当前操作失败的原因;isCancelled 方法来判断已完成的当前操作是否被取消;
如果不调用sync方法,则默认是异步,可以注册listener来查看异步结果。 比如:服务端启动时不调用sync()则为异步的方式,可以给Future注册监听来通知。
// ChannelFuture future = bootstrap.bind(9999).sync();
//使用异步
ChannelFuture future = bootstrap.bind(9999);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
System.out.println("服务器启动成功....");
上面代码使用异步后,会先打印服务器启动成功,, 然后异步的结果在端口绑定成功后才会通知出来。
客户端发送消息时候,也是异步,,可以在通道就绪事件中添加listener获取异步结果:
/*** 通道就绪事件 ** @param ctx * @throws Exception */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});
}