Netty有什么用?
随着移动互联网的爆发性增长,小明公司的电子商务系统访问量越来越大,由于现有系统是个单体的巨型应用,已经无法满足海量的并发请求,拆分势在必行。
在微服务的大潮之中, 架构师小明把系统拆分成了多个服务,根据需要部署在多个机器上,这些服务非常灵活,可以随着访问量弹性扩展。
世界上没有免费的午餐, 拆分成多个“微服务”以后虽然增加了弹性,但也带来了一个巨大的挑战:服务之间互相调用的开销。
比如说:原来用户下一个订单需要登录,浏览产品详情,加入购物车,支付,扣库存等一系列操作,在单体应用的时候它们都在一台机器的同一个进程中,说白了就是模块之间的函数调用,效率超级高。
现在好了,服务被安置到了不同的服务器上,一个订单流程,几乎每个操作都要越网络,都是远程过程调用(RPC), 那执行时间、执行效率可远远比不上以前了。
远程过程调用的第一版实现使用了HTTP协议,也就是说各个服务对外提供HTTP接口。 小明发现,HTTP协议虽然简单明了,但是废话太多,仅仅是给服务器发个简单的消息都会附带一大堆无用信息:
GET /orders/1 HTTP/1.1
Host: order.myshop.com
User-Agent: Mozilla/5.0 (Windows NT 6.1; )
Accept: text/html;
Accept-Language: en-US,en;
Accept-Encoding: gzip
Connection: keep-alive
......
看看那User-Agent,Accept-Language ,这个协议明显是为浏览器而生的!但是我这里是程序之间的调用,用这个HTTP有点亏。
能不能自定义一个精简的协议? 在这个协议中我只需要把要调用方法名和参数发给服务器即可,根本不用这么多乱七八糟的额外信息。
但是自定义协议客户端和服务器端就得直接使用“低级”的Socket了,尤其是服务器端,得能够处理高并发的访问请求才行。
小明复习了一下服务器端的socket编程,最早的Java是所谓的阻塞IO(Blocking IO), 想处理多个socket的连接的话需要创建多个线程, 一个线程对应一个。
这种方式写起来倒是挺简单的,但是连接(socket)多了就受不了了,如果真的有成千上万个线程同时处理成千上万个socket,占用大量的空间不说,光是线程之间的切换就是一个巨大的开销。
更重要的是,虽然有大量的socket,但是真正需要处理的(可以读写数据的socket)却不多,大量的线程处于等待数据状态(这也是为什么叫做阻塞的原因),资源浪费得让人心疼。
后来Java为了解决这个问题,又搞了一个非阻塞IO(NIO:Non-Blocking IO,有人也叫做New IO), 改变了一下思路:通过多路复用的方式让一个线程去处理多个Socket。
这样一来,只需要使用少量的线程就可以搞定多个socket了,线程只需要通过Selector去查一下它所管理的socket集合,哪个Socket的数据准备好了,就去处理哪个Socket,一点儿都不浪费。
好了,就是Java NIO了!
小明先定义了一套精简的RPC的协议,里边规定了如何去调用一个服务,方法名和参数该如何传递,返回值用什么格式......等等。然后雄心勃勃地要把这个协议用Java NIO给实现了。
可是美好的理想很快被无情的现实给击碎, 小明努力了一周就意识到自己陷入了一个大坑之中,Java NIO虽然看起来简单,但是API还是太“低级”了,有太多的复杂性,没有强悍的、一流的编程能力根本无法驾驭,根本做不到高并发情况下的可靠和高效。
小明不死心,继续向领导要人要资源,一定要把这个坑给填上,挣扎了6个月以后,终于实现了一个自己的NIO框架,可以执行高并发的RPC调用了。
然后又是长达6个月的修修补补,小明经常半夜被叫醒:生产环境的RPC调用无法返回了! 这样的Bug不知道改了多少个。
在那些不眠之夜中,小明经常仰天长叹:我用NIO做个高并发的RPC框架怎么这么难呐!
一年之后,自研的框架终于稳定,可是小明也从张大胖那里听到了一个让他崩溃的消息: 小明你知道吗?有个叫Netty的开源框架,可以快速地开发高性能的面向协议的服务器和客户端。 易用、健壮、安全、高效,你可以在Netty上轻松实现各种自定义的协议!咱们也试试?
小明赶紧研究,看完后不由得“泪流满面”:这东西怎么不早点出来啊!
好了,这个故事我快编不下去了,要烂尾了。
说说Netty到底是何方神圣, 要解决什么问题吧。
像上面小明的例子,想使用Java NIO来实现一个高性能的RPC框架,调用协议,数据的格式和次序都是自己定义的,现有的HTTP根本玩不转,那使用Netty就是绝佳的选择。
其实游戏领域是个更好的例子,长连接,自定义协议,高并发,Netty就是绝配。
因为Netty本身就是一个基于NIO的网络框架, 封装了Java NIO那些复杂的底层细节,给你提供简单好用的抽象概念来编程。
注意几个关键词,首先它是个框架,是个“半成品”,不能开箱即用,你必须得拿过来做点定制,利用它开发出自己的应用程序,然后才能运行(就像使用Spring那样)。
一个更加知名的例子就是阿里巴巴的Dubbo了,这个RPC框架的底层用的就是Netty。
另外一个关键词是高性能,如果你的应用根本没有高并发的压力,那就不一定要用Netty了。
netty是什么
Netty是一个基于Java NIO的client-server网络服务框架,人们可以利用netty快速地开发网络应用。同时netty相对于其他网络框架更加简单并且扩展性更强,这主要得益于其提供的简单易用的api将业务逻辑和网络处理代码解耦开来。能够使你更加专注于业务的实现而不需要太多关心网络底层实现。
异步设计
netty所有的api都是异步的。异步处理已经不是什么新鲜事了,众所周知,IO已经变为一个应用的瓶颈,而异步处理正是为了解决这个问题出现的。
CallBacks机制
CallBacks机制经常应用于异步处理,人们可以指定方法执行完后的回调函数,在JavaScript中,回调机制是其语言的核心。下面代码展示了如何利用回调机制处理接受数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
interface Fetcher {
void fetchData(FetchCallback callback);
}
public
interface FetchCallback {
void onData(Data data);
void onError(Throwable cause);
}
public
class Worker {
public void doWork() {
Fetcher fetcher = ...
fetcher.fetchData(
new FetchCallback() {
public void onData(Data data) { #1
System.out.println(
"Data received: " + data);
}
public void onError(Throwable cause) { #2
System.err.println(
"An error accour: " + cause.getMessage());
}
});
... }
}
|
#1 没有出现错误,调用onData
#2 出现错误信息,调用onError
你可以将回调函数从当前线程移植到其他线程,但是并不能保证回调函数被执行。当你将多个异步回调函数串起来的时候会形成spaghetti code(管式代码),有些人认为这样的代码很难读,但JavaScript以及Node.js都是这种风格。
Futures机制
异步处理使用的第二个机制是Future机制。一个Future对象只有在特定情况下才会有值,Future对象要么是调用者的返回结果,要么是一个异常。Java在java.util.concurrent包中提供了供其线程池机制使用的Future接口,例如当你使用ExecutorService.submit()提交一个Runable任务时,就可以返回一个Future对象,利用Future对象可以判断该任务是否完成。如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
ExecutorService executor = Executors.newCachedThreadPool();
Runnable task1 =
new Runnable() {
public void run() {
doSomeHeavyWork();
}
... }
Callable<Interger> task2 =
new Callable() {
public Integer call() {
return doSomeHeavyWorkWithResul();
}
... }
Future<?> future1 = executor.submit(task1);
Future<Integer> future2 = executor.submit(task2);
while (!future1.isDone() || !future2.isDone()) {
...
// do something else
...
}
|
CallBacks和Future是异步处理中最常用的两种机制,实际上无法判断两种机制的优劣,而Netty则会两种都提供,你可以自由选择使用哪种机制。
JVM中的阻塞与非阻塞比较
随着web应用的持续增长,如何提升网络应用的效率变得尤为重要。幸运的是从1.4版本开始,java提供了NIO API来供我们编写更有效率的网络应用。Java 7中又引入的NIO.2不仅仅是之前api的升级,同时也允许我们更加高效方便地编写异步代码。
New or non-blocking?
The N in NIO is typically thought to mean non-blocking rather than new.NIO has beenaround for so long now that nobody calls it new IO anymore. Most people refer to it as non-blocking IO
上图所示为典型的阻塞IO模式,一个线程处理一个网络连接,因此应用能够处理连接的个数是由JVM上允许建立的线程个数决定的。
再来看下非阻塞IO模式,上图运用selector机制来处理多个连接。下面通过一个回显服务器示例来讲解非阻塞及阻塞IO的区别。
阻塞IO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public
class PlainEchoServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); #1
try {
while (
true) {
final Socket clientSocket = socket.accept(); #2
System.out.println(
"Accepted connection from " +
clientSocket);
new Thread(new Runnable() { #3
public void run() {
try {
BufferedReader reader =
new BufferedReader(
new
InputStreamReader(clientSocket.getInputStream()));
PrintWriter writer =
new PrintWriter(clientSocket
.getOutputStream(),
true);
while (true) { #4
writer.println(reader.readLine());
writer.flush();
}
}
catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
}
catch (IOException ex) {
// ignore on close
}
}
}
}).start(); #5
}
}
catch (IOException e) {
e.printStackTrace();
}
}
}
|
# 1 绑定监听端口
# 2 阻塞至有新连接进来
# 3 新建线程用来处理客户端连接
# 4 从客户端读取数据并回写
# 5 启动线程
上述服务器代码要求每次连接进来一个请求就需要创建一个新的线程,即使使用线程池也仅能解决一时问题,不能再根本上解决问题:客户端的连接数取决于后台处理线程的个数。当连接数多时则会带来大问题。
非阻塞IO
在介绍NIO之前,我们先了解一些NIO的基本知识
BYTEBUFFER
ByteBuffer在Netty中即为重要,其主要是用来缓存数据的。ByteBuffer既可以分配到堆内存中也可以分配到堆外内存。一般来说,堆外内存能够更加快速地传递给channel,但分配和释放会更耗时。新旧的NIO API对ByteBuffer提供了统一的管理。ByteBuffer能够实现无拷贝地在各个实例之间共享,同时允许对可见数据进行切片和其他操作处理。
Slicing
Slicing a ByteBuffer allows to create a new ByteBuffer that share the same data as the intialByteBuffer but only expose a sub-region of it. This is useful to minimize memory copies whilestill only allow access to a part of the data
ByteBuffer有以下几个重要的操作
- 将数据写进ByteBuffer
- 调用ByteBuffer.flip()切换到读模式
- 从ByteBuffer中读取数据
- 调用ByteBuffer.clear()或者ByteBuffer.compact()来整理ByteBuffer内存
当往ByteBuffer中写数据时,ByteBuffer会通过更新buffer中write index的位置来跟踪buffer中的数据(也可以手动更新)。当需要从ByteBuffer中读取数据时,需要调用flip()来切换到读模式,flip()会将buffer的读起始位置设置为0,这样就可以读取buffer中所有数据了。
为了能够再次向ByteBuffer中写数据,可以将buffer模式切换到写模式并调用任意下列两个方法。
- ByteBuffer.clear():清除ByteBuffer
- ByteBuffer.compact():通过内存拷贝清除已经读过的数据
ByteBuffer.compact()会将所有未读的数据拷贝到buffer的起始位置。如下所示为ByteBuffer的使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
Channel inChannel = ....;
ByteBuffer buf = ByteBuffer.allocate(
48);
int bytesRead = -
1;
do {
bytesRead = inChannel.read(buf); #1
if (bytesRead != -
1) {
buf.flip(); #2
while(buf.hasRemaining()){
System.out.print((char) buf.get()); #3
}
buf.clear(); #4
}
}
while (bytesRead != -
1);
inChannel.close();
|
#1 从channel中读取数据到ByteBuffer
#2 切换模式至读模式
#3 读取buffer中的数据,每次调用一个get()会将buffer当前位置更新+1
#4 切换buffer至写模式,使其可以重新写
使用Selector模式
Selector可以监听多个IO是否可以读/写,这样一个Selector就可以用来处理多个连接,相比于阻塞IO每个连接占用一个线程,Selector模式更加高效。
通过以下几个操作就可以轻松运用Selector
- 在channels上创建一个或多个Selector
- 在channel上注册需要监听的事件,目前支持四种事件
- OP_ACCEPT:socket-accept事件
- OP_CONNECT:socket-connect事件
- OP_READ:可读事件
- OP_WRITE:可写事件
- channel注册后,调用Selector.select()方法阻塞直到上述注册的一个事件发生
- 当Selector.select()返回时,可以通过SelectionKey实例获取所有可操作的事件
下面EchoServer是基于非阻塞Selector的服务器代码,运用这个版本的Server可以运用一个线程处理上千个连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public
class PlainNioEchoServer {
public void serve(int port) throws IOException {
System.out.println(
"Listening for connections on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address =
new InetSocketAddress(port);
ss.bind(address); #1
serverChannel.configureBlocking(
false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT); #2
while (
true) {
try {
selector.select(); #3
}
catch (IOException ex) {
ex.printStackTrace();
// handle in a proper way
break;
}
Set readyKeys = selector.selectedKeys(); #4
Iterator iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove(); #5
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept(); #6
System.out.println(
"Accepted connection from" + client);
client.configureBlocking(
false);
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100)); #7
}
if (key.isReadable()) { #8
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output); #9
}
if (key.isWritable()) { #10
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output); #11
output.compact();
}
}
catch (IOException ex) {
key.cancel();
try {
key.channel().close();
}
catch (IOException cex) {
}
}
}
}
}
}
|
#1 绑定Server的port
#2 注册channel的OP_ACCEPT Selector事件,监听新连接
#3 阻塞直到有新的连接事件到来
#4 获取所有可操作的SelectedKey实例
#5 遍历SelectedKey实例,将遍历过的去除
#6 获取新的连接
#7 将新的连接注册到Selector中,并监听读/写事件
#8 检查SelectKey是否可读
#9 读数据
#10 检测是否可写
#11 写数据
上述代码实现起来比较繁琐,新的NIO API去掉了大部分繁琐的过程,使实现起来更加简单明了
基于NIO.2的EchoServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
public
class PlainNio2EchoServer {
public void serve(int port) throws IOException {
System.out.println(
"Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(port);
serverChannel.bind(address); #1
final CountDownLatch latch =
new CountDownLatch(
1);
serverChannel.accept(
null,
new
CompletionHandler<AsynchronousSocketChannel, Object>() { #2
public void completed(final AsynchronousSocketChannel channel, Object attachment) {
serverChannel.accept(null, this); #3
ByteBuffer buffer = ByteBuffer.allocate(
100);
channel.read(buffer, buffer,
new EchoCompletionHandler(channel)); #4
public void failed (Throwable throwable, Object attachment){
try {
serverChannel.close(); #5
}
catch (IOException e) {
// ingnore on close
}
finally {
latch.countDown();
}
}
});
try
{
latch.await();
}
catch(
InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
private
final
class EchoCompletionHandler implements
CompletionHandler<Integer, ByteBuffer> {
private
final AsynchronousSocketChannel channel;
EchoCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
channel.write(buffer, buffer, new CompletionHandler<Integer, #6
ByteBuffer>() {
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this); #7
}
else {
buffer.compact();
channel.read(buffer, buffer,
EchoCompletionHandler.this); #8
}
}
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
}
catch (IOException e) {
// ingnore on close
}
}
});
}
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
}
catch (IOException e) {
// ingnore on close
}
}
}
}
|
#1 绑定Server的port
#2 监听新连接到来,一旦有新的连接接入则会调用CompletionHandler
#3 重新监听连接接入事件
#4 在channel上触发读操作,一单有数据可读EchoCompletionHandler将会被触发
#5 出现错误时关闭channel
#6 注册写回调事件,通#4
#7 当buffer中还有数据时再次注册写事件
#8 同#4,注册CompletionHandler回调读事件
上述代码看起来要比之前的更加复杂,按NIO2.0自己实现了loop事件,我们在使用的时候只需要简单地注册自己感兴趣的事件即可。
非阻塞应用存在的问题以及Netty是如何解决的
跨平台及兼容性问题
非阻塞应用一般都会有跨平台问题,一个NIO应用在Linux上可以运行但在Window无法运行,同时还需要对低版本的兼容。NIO2.0只能在java7之后的版本运行,但它提供了一套统一的管理api,使其也能够在更低完本的jdk上运行,只不过有些功能受到了限制。
修复e-poll bug
在Linux系统上,Java的NIO的Selector运用的是较为高效的e-poll机制,但是当连接较少时会存在一个很严重的bug导致cpu占用率很高。
1
2
3
4
5
6
7
8
9
10
11
|
...
while (
true) {
int selected = selector.select(); #1
Set<SelectedKeys> readyKeys = selector.selectedKeys();
Iterator iterator = readyKeys.iterator(); #2
while (iterator.hasNext()) { #3
... #4
}
}
...
...
|
在Linux系统中,#1处并没有阻塞,而是返回0,这样while循环会到时cpu上升到100%,即使到现在,这个问题仍然存在,不过幸运的是Netty避免了这个问题。
Netty In Action(二)
第一个Netty程序
本章将运用netty建立一个Echo Server和Client来熟悉Netty的特性
Echo Server
一个Netty服务器包含以下两个主要部分:
- Bootstrapping:用来配置服务器熟悉,包括线程及端口。
- Server Handler:Server组件,包括各种如何处理新的链接等各种业务逻辑的实现
启动Server
我们通过创建一个ServerBootStrap实例来启动一个Server。如下面代码所示,通过配置实例的端口号,线程(事件)模型以及处理各种业务逻辑的handler来实现一个Netty Server。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public
class EchoServer {
private
final
int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group =
new NioEventLoopGroup();
try {
ServerBootstrap b =
new ServerBootstrap();
//1
b.group(group)
//2
.channel(NioServerSocketChannel.class)
//2
.localAddress(
new InetSocketAddress(port))
//2
.childHandler(
new ChannelInitializer<SocketChannel>() {
//3
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new EchoServerHandler());
//4
}
});
ChannelFuture f = b.bind().sync();
//5
System.out.println(EchoServer.class.getName() +
//6
" started and listen on " + f.channel()
.localAddress());
//7
f.channel().closeFuture().sync();
//8
}
finally {
//9
group.shutdownGracefully().sync();
//10
}
}
public static void main(String[] args) throws Exception {
if (args.length !=
1) {
System.err.println(
"Usage: "+EchoServer.class.getSimpleName() +
"<port >");
}
int port = Integer.parseInt(args[
0]);
new EchoServer(port).start();
}
}
|
- 创建Server实例
- 指定NIO(异步IO)协议及网络地址
- 向channel pipline中添加handler
- 绑定服务器,等待服务器关闭并释放资源
为了运行一个服务器,首先要创建一个ServerBootstrao实例(1),因为我们用的是NIO传输协议,所以需要指定NioEventLoopGroup来接受并处理新的连接,指定NioServerSocketChannel作为channel的类型,同时需要设置Server绑定的InetSocketAddress才能够接受新的连接(2)
下一步,通过创建一个子channel(child channel, 3)来指定当一个新的连接到来时执行的动作,在这里运用了ChannelInitializer类型。由于ChannelPipeline中包含多个handler,所以我们将新建的EchoServerHandler添加到最后(4)。
在(5)处,通过调用sync()方法来阻塞绑定我们的Server直到成功,同样在(8),我们阻塞调用Server的close接口,直到Server关闭,在(10),我们可以关闭EventLoopGroup并释放包括创建的线程在内的所有资源。
简化一下以上的步骤:
- 创建一个用于启动Server的ServerBootstrap示例,并在后边绑定它
- 创建一个NIOEventLoopGroup实例来处理各种事件,例如接受新连接,接受新数据,写数据等等。
- 指定Server需要绑定的本地的InetSocketAddress
- 创建一个childHandler来指定每一个新连接到来时需要处理的业务逻辑
- 当上述所有步骤都完成后,调用ServerBootstrap.bind()来绑定Server。
实现业务逻辑
我们通过继承ChannelInboundHandlerAdapter并复写messageReceived方法来实现我们的接收数据并回写数据的业务逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public
class EchoServerHandler extends ChannelInboundHandlerAdapter {
//1
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(
"Server received: "+msg);
ctx.write(msg);
//2
}
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
//3
}
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStracktrace();
//4
ctx.close();
//5
}
}
|
- Shareable注解说明该handler可以在多个channel之间共享
- 接受新的数据并将其回写,注意在这里还未将数据“flush”到客户端
- 将之前所有的数据flush到客户端,并关闭channel
- 打印异常日志
- 出现异常时关闭channel
Netty的handler提供各种各样的接口“钩子”,我们可以通过复写不同的“钩子”来实现不同的业务逻辑,但这些钩子中只有channelRead是必须的。
拦截异常
处理复写channelRead方法来实现业务逻辑外,我们可以通过复写exceptionCaught来处理Exception或者Throwable等异常。
实现echo client
一个echo client需要包括以下几个功能:
- 连接到服务器
- 写数据
- 接受服务器传回的数据
- 关闭连接
启动client
启动一个client与启动一个Server较为相似,如下代码所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public
class EchoClient {
private
final String host;
private
final
int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group =
new NioEventLoopGroup();
try {
Bootstrap b =
new Bootstrap();
//1
b.group(group)
//2
.channel(NioSocketChannel.class)
//3
.remoteAddress(
new InetSocketAddress(host, port))
//4
.handler(
new ChannelInitializer<SocketChannel>() {
//5
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new EchoClientHandler());
//6
} });
ChannelFuture f = b.connect().sync();
//7
f.channel().closeFuture().sync();
//8
}
finally {
group.shutdownGracefully().sync();
//9
}
}
public static void main(String[] args) throws Exception {
if (args.length !=
2) {
System.err.println(
"Usage: " + EchoClient.class.getSimpleName() +
" <host> <port>");
return; }
// Parse options.
final String host = args[
0];
final
int port = Integer.parseInt(args[
1]);
new EchoClient(host, port).start();
}
}
|
- 创建一个bootstrap
- 指定EventLoopGroup来处理客户端事件,运用NioEventLoopGroup
- 指定socket channle
- 设置需要连接的网络地址
- 利用ChannelInitializer指定channelHandler
- 将EchoClientHandler添加到ChannelPipeline中
- 调用sync()方法连接到远程服务器
- 阻塞等到连接关闭
- 关闭bootstrap和线程池,并释放所有资源
几个重要的步骤:
- 创建Bootstrap实例
- 创建NioEventLoopGroup并用来处理各种事件:创建新连接,读写数据等
- 指定需要连接的服务器地址
- 指定连接创建后调用的handler
- 以上步骤完成后,调用Bootstrap的connect方法连接到远程服务器。
实现客户端业务逻辑
我们通过继承SimpleChannelInboundHandlerAdapter并复写其方法来实现客户端的业务逻辑。目前,我们只需要以下三个方法即可:
- channelActive():当客户端与服务端连接建立时调用
- channelRead0():从服务端接收到数据时调用
- exceptionCaught():发生异常时调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
//1
public
class EchoClientHandler extends
SimpleChannelInboundHandlerAdapter<ByteBuf> {
public void channelActive(ChannelHandlerContext ctx) {
ctx.write(Unpooled.copiedBuffer(
"Netty rocks!", CharsetUtil.UTF_8);
//2
}
public void channelRead0(ChannelHandlerContext ctx,
ByteBuf in) {
System.out.println(“Client received: “ + ByteBufUtil
//3
.hexDump(in.readBytes(in.readableBytes())));
}
public void exceptionCaught(ChannelHandlerContext ctx, //4
Throwable cause) {
cause.printStracktrace();
ctx.close();
}
}
|
- 能够在多个channel共享handler
- 当连接建立时,向服务端发送数据
- 调用hexdump方法打印接收到的数据
- 打印异常
当连接建立时,channelActive方法将会被调用,并向服务端发送一条数据。当接到新的数据时channelRead0方法将会被调用,但需要注意的时客户端接收到的数据可能是不完整的,一个五字节的数据可能会分两次被传输。第一次传输连个字节,第二次传输三个字节。但在TCP协议或者其他面向流的协议来说,这种传输是可以保障顺序的。exceptionCaught被用来捕捉异常,并关闭连接。
或许你会疑问我们在EchoClientHandler为什么继承SimpleChannelInboundHandlerAdapter而不是像EchoServerHandler中继承ChannelInboundHandlerAdapter。最主要的原因是当你使用ChannelInboundHandlerAdapter时你需要自己释放资源,例如当使用ByteBuf时你需要调用ByteBuf.release()方法释放资源,而使用SimpleChannelInboundHandlerAdapter你不需要关心资源的释放,因为当channelRead0执行完毕时系统会自动释放资源。在Netty中,所有实现了ReferenceCounted接口的messages都会自动释放。
概述
上篇给大家介绍了 Netty 的基本用法,这次主要给大家讲一下更加高级的用法,主要分成两个方面客户端以及服务端实现,我们将实现一个简单的 Echo 程序。
服务端
服务端的实现,主要从这几个方面考虑:
最佳线程模型,实现高并发,高稳定性
容错机制
业务处理
心跳监测
那么用 Netty 怎么一一实现这些呢,废话不多说,直接上代码。
public class NormalNettyServer {
private int serverPort = 9000;
private String serverIp = "192.168.2.102";
public NormalNettyServer(int port){
serverPort = port;
}
public void start() throws Exception {
// 创建Accpet线程池 (1)
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
// 创建Work线程池
EventLoopGroup workGroup = new NioEventLoopGroup(10);
try{
// 创建ServerBootstrap (2)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup). // (3)
channel(NioServerSocketChannel.class). //(4)
childHandler(new ChannelInitializer<SocketChannel>() { // 初始化处理handler (5)
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 加入用户心跳监测机制 读时间超时 60s 写时间超时 10s 读写都没有超时 10s
ch.pipeline().addLast("timeout", new IdleStateHandler(60, 10, 10, TimeUnit.SECONDS));
// 加入业务处理handler
ch.pipeline().addLast("echo", new EchoHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128) // (6)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(serverIp, serverPort).sync(); // (7)
f.channel().closeFuture().sync();
} finally {
// 释放资源
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}注:示例代码中的阿拉伯数字与文字说明中的阿拉伯数字是一一对应解释。
EventLoopGroup 是用来处理 I/O 操作多线程事件循环池, 我们知道 Netty是基于 Reactor 模型的,其中一种是双线程池,这里就创建了两个线程循环池,管理10个线程,bossGroup 主要用于处理 Accpet 请求,建立连接后的处理主要是由 WorkGroup 负责。
ServerBootstrap 是一个启动 NIO 服务的辅助启动类
注册两个线程循环池。
NioServerSocketChannel 这里直接引用官方说明更合适
A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections.
SocketChannel 是 TCP 连接的网络通道,在下面两种情况会创建
打开一个 SocketChannel 连接某台服务器。
一个新连接到达 ServerSocketChannel时,会创建一个SocketChannel。
当通道建立,会调用初始化操作,将业务处理的 handler 加入到 pipeline 中。这里主要加入应用层心跳监测以及应用层业务处理。通道支持参数配置,这里配置了两个参数 SO_BACKLOG, SO_KEEPALIVE,具体作用大家可以看访问链接(http://netty.io/5.0/api/io/netty/channel/ChannelOption.html)
绑定 host 以及监听端口,返回的 ChannelFuture, 由于这个过程是异步的,所有执行状态可以通过 ChannelFuture 中的获取,这将在客户端的实现中重点介绍。
通过以上步骤,我们就可以建立一个高效的服务,对于一个写C++的我,只能说真的很爽。言归正传,下面我们介绍一下两个handler的处理,看代码。
public class EchoHandler extends SimpleChannelInboundHandler<Object> { // (1)
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf buf = (ByteBuf)o; // (2)
byte[] packet = new byte[buf.readableBytes()];
buf.readBytes(packet);
// pb 协议 (3)
HelloTest.Hello hello = HelloTest.Hello.parseFrom(packet);
System.out.println(hello.getContent().toString());
channelHandlerContext.channel().writeAndFlush(buf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// (4)
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 新连接active
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接关闭
super.channelInactive(ctx);
}注:示例代码中的阿拉伯数字与文字说明中的阿拉伯数字是一一对应解释。
Netty的接受数据处理Handler都是继承 SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter。
Netty数据的收发,都采用ByteBuf。
这里采用了PB协议,具体用法,这里不详细讲,大家先看看,之后会有文章介绍。因为是Echo, 所有收到的数据直接原包返回。
Channel异常处理,这里可以做一些容错操作。
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private ConnectionClient client;
public HeartBeatHandler(ConnectionClient client) {
this.client = client;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
//TODO(1) 可以做监测处理
} else if (event.state() == IdleState.READER_IDLE) {
// TODO(2)
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}注:示例代码中的阿拉伯数字与文字说明中的阿拉伯数字是一一对应解释。
写数据超时,服务端已经有一段时间没有对该连接通道发送数据。
读数据超时,服务端已经有一段时间没有接受该连接通道的数据。
通过这两个,我们可以对该通道做检查,比如发送心跳指令,监测客户端是否还存在。
这就是最简单服务端实现,麻雀虽小,五脏俱全,实现了我们最开始说的几个方面的内容。按照这个顺序,我们接下来介绍客户端实现。
客户端
客户端的实现往往都会有这样几个要求:
断线重连
心跳维持
业务处理
接下来,我们将会用Netty来实现这些功能,由于代码比较多,下面主要截取重要函数做说明。
public boolean connect() {
Bootstrap b = new Bootstrap(); // (1)
final HeartBeatHandler hearthandler = new HeartBeatHandler(this);
final ClientHandler handler = new ClientHandler(this);
EventLoopGroup loop = new NioEventLoopGroup(); // (2)
b.group(loop).channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// idle状态处理,主要是用于与服务端发送心跳 (3)
pipeline.addLast(new IdleStateHandler(60, 20, 0, TimeUnit.SECONDS));
pipeline.addLast("hearthandler", hearthandler);
// 业务处理
pipeline.addLast("handler", handler);
}
});
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
ChannelFuture future = b.connect(host, port);
future.addListener(new ConnectionListener(this)); // (4)
return true;
}注:示例代码中的阿拉伯数字与文字说明中的阿拉伯数字是一一对应解释。
Bootstrap客户端启动辅助类,与ServerBootstrap相对。
客户端不需要处理连接请求,所有只需定义一个多线程事件循环池来处理channel事件就可以。
客户端的业务处理handler,包含心跳处理,以及业务处理。
ChannelFuture添加连接情况监听,用于实现重连。
其他的含义与服务端相同,请参考服务端说明,这里重点介绍重连机制的实现,重连主要是由两个点来触发的,一个是ChannelFuture, 一个Handler的InActive事件。
public class ConnectionListener implements ChannelFutureListener {
// 此处省略多行代码 .......
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 连接失败
System.out.println("connect reconnect");
this.client.reconnect(future.channel());
} else {
// 连接成功
System.out.println("connect success");
this.client.setChannel(future.channel());
}
}
}
public class ClientHandler extends ChannelInboundHandlerAdapter {
// 此处省略多行代码 .......
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 断开连接
System.out.println("SuperServer is disconnect " + ctx.channel().remoteAddress().toString());
client.reconnect(ctx.channel());
super.channelInactive(ctx);
}
}
public void reconnect(final Channel ch) {
final EventLoop eventLoop = ch.eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
connect();
System.out.println("reconnect server:" + host + ", Port:" + port);
}
}, 10L, TimeUnit.SECONDS);
}
以上就是具体的重连的实现过程,两个触发点,大家记住即可。另外心跳的实现与服务端类似,只不过在业务的处理有可能不同。比如 服务要考虑是否断开客户端,回收资源。而客户端要考虑的是是否要重连。
总结
本文主要侧重于实现,并没有去解释Netty的每个函数的实现细节,说的话估计要三天三夜,而且枯燥不堪,我还是喜欢直接使用,不过网上这方面有很多资料,大家自己可以去看,希望对大家有所帮助。文章涉及的源代码已经上传到GitHub(https://github.com/cosysun/NettyDemo.git)。