1、什么是Netty
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框 架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。那么传统的NIO有什么缺点呢
NIO的缺点:
-
1、NIO类库和API复杂。光一个Buffer缓冲区就有position,limit,capacity等很多参数,而且读写切换又要切换position和limit的值,而且你还需要熟练掌握Selector,ServerScoketChannel,SocketChannel等
-
2、要熟悉JAVA多线程编程,因为NIO设计到Reactor模式,你必须对多线程非常熟悉,才能写出性能优越的程序
-
3、开发工作量和难度都非常大,例如客户端的断线重连,网络闪断,半包读写等等
-
4、JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决
2、先从一个简单的聊天室DEMO开始
1、创建一个空的Maven项目,引入Netty依赖,版本只要是4.xxx版本的都行,不要使用5.xx版本,5.xx版本有问题
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>
复制代码
2、创建服务端和服务端处理器NettyChatServerHandler.class
public class NettyChatServer {
private int port;
public NettyChatServer(int port) {
this.port = port;
}
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try{
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//添加String类型的编码器
pipeline.addLast(new StringDecoder());
//添加String类型的解码器
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyChatServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()) {
System.out.println("端口绑定成功");
}else {
System.out.println("端口绑定失败");
}
}
});
System.out.println("服务端启动成功");
channelFuture.channel().closeFuture().sync();
}catch (Exception e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
NettyChatServer nettyChatServer = new NettyChatServer(9998);
nettyChatServer.start();
}
}
复制代码
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {
public static List<Channel> channelList = new CopyOnWriteArrayList<>();
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
Channel channel = channelHandlerContext.channel();
for(Channel channelToSend : channelList) {
//排除自身通道
if(channel != channelToSend) {
channelToSend.writeAndFlush(channel.remoteAddress().toString().substring(1) +
"发送的消息:" + msg);
}
}
}
/**
* 通道就绪事件,当有新的客户端连接的时候将通道放入集合
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelList.add(channel);
System.out.println("客户端【" + channel.remoteAddress().toString().substring(1)+"】上线");
}
/**
* 客户端断开连接
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelList.remove(channel);
System.out.println("客户端【" + channel.remoteAddress().toString().substring(1)+"】下线");
}
/**
* 异常处理事件
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
复制代码
3、创建客户端和客户端处理器NettyChatClientHandler.class
public class NettyChatClient {
public void start() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try{
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9998).sync();
Channel channel = channelFuture.channel();
System.out.println("客户端连接成功");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
//给服务端发送消息
String sendMsg = scanner.nextLine();
channel.writeAndFlush(sendMsg);
}
channelFuture.channel().closeFuture().sync();
}catch (Exception e) {
e.printStackTrace();
}finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyChatClient nettyChatClient = new NettyChatClient();
nettyChatClient.start();;
}
}
复制代码
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
}
复制代码
4、测试 先启动服务端,然后再启动多个客户端,在一个客户端发送消息,在另外一个客户端接收消息,这是一个简单的群聊,可以开启多个客户端进行测试
3、核心组件之-EventLoop
可以理解成EventLoopGroup中的工作线程,但是实际上它并不是一个线程,它里面包含了一个线程,控制着该线程的生命周期
4、核心组件之-EventLoopGroup
无论是在客户端还是服务端我们首先做的第一步都是创建一个EventLoopGroup,只不过服务端需要创建二个,客户端只需要一个。你可以把它理解成是一个线程池,也就是EventLoop的一个集合,EventLoop 可以理解成是线程。那么服务端为什么要创建二个EventLoopGroup所谓的线程池呢??
在NIO编程中,有一个Selector选择器,它有一个线程会轮询注册到Selector的通道,只要有IO事件,就会将IO事件交给对应的线程去处理。所以,在上面的DEMO中
// 它就类似于Selector的作用,监控是否有事件产生,如果有事件产生就交给其它线程去做
EventLoopGroup bossGroup = new NioEventLoopGroup();
//这个线程池就是专门用来干活的,如果有事件产生,bossGroup就会将事件交给这个线程池里面的线程去干活
EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码
举个生活中的例子,你老板就是bossGroup,员工就是workerGroup。老板负责在外面接需求,有需求了就把需求交给员工来做。所以说bossGroup是不用真正做事的,它只需要将IO事件交给workerGroup中的线程去做就行
5、核心组件之-ServerBootStrap/BootStrap
他们是服务端(ServerBootStrap)和客户端(BootStrap)的引导类,负责启动整个Netty服务的。比如说一辆车子会有很多的零件,而他们就是把所有零件(Netty的各种组件)组装在一起之后,汽车才能正常跑。
6、核心组建之-ByteBuf
NIO已经提供了Buffer缓冲区,那Netty为什么还要自己搞一个ByteBuf出来呢
NIO的Buffer操作:
- 1、包含三个属性:position limit capacity
- 2、写模式,position从0开始,表示下一个可写的位置,limit等于capacity
- 3、读模式,position重置为o,表示下一个可读的位置,limit等于切换前的position的位置,capacity不变
- 4、通过flip()方法切换为读模式
- 5、通过clear()方法或者compact()方法清除(部分)数据
- 6、通过rewind()方法重新读取或者重新写入数据
- 7、通过buf.put()或者channel.read(buf)方法写入数据
- 8、通过buf.read()或者channel.write(buf)方法读取数据
是不是很复杂,总之要是我肯定是不想用这么复杂的API去编程,而且越复杂写出来的程序问题就相对越多,而且要写出性能优越的NIO程序,你必须对NIO非常熟悉
Netty的ByteBuf:
-
1、只提供了一个读指针(readIndex),写指针(writeIndex),让操作变得简单了
-
2、读写切换只需要从对应的指针开始就行,NIO中还要进行切换,还要重新设置position,limit等参数的值
-
3、Netty提供了多种缓冲区类型的Buffer
- 1、Pooled和UnPooled:池化和非池化,所谓池化就是在初始化时候就分配好一块内存,每次创建ByteBuf的时候直接从这个内存池中分配一块连续的内存给ByteBuf用,用完了再还回去。非池化就是完全利用JVM本身的内 存管理能力来管理对象的生命周期,即对象的内存分配完全交给JVM来管理,我们不管内存的分配和回收
- 2、Heap和Direct:堆内存和直接内存,堆内存就是JVM本身的堆内存。直接内存独立于JVM内存之外的内存空间,直接向操作系统申请一块内存空间
- 3、Safe和UnSafe:安全和非安全,UnSafe底层是使用Java的UnSafe类直接操作底层的数据结构,直接利用对象在内存中的指针来操作对象,所以比较危险
7、核心组建之-ChannelHandler
客户端发送消息那服务端接受到消息就要进行业务处理,而Netty中对消息的业务处理都在ChannelHandler中,就跟DEMO中的服务端处理器(NettyChatServerHandler.class)和客户端处理器(NettyChatClientHandler.class),我们可以自定义一个ChannelHandler,然后在该处理器中实现对消息的处理。总之就是一句话:接收消息,对消息进行业务处理都是在该组件中实现的
8、核心组建之-ChannelPipeline
在Netty中我们可以定义多个事件处理器Handler,消息会经过一系列的Handler处理器之后才会到达服务器,整个过程就类似于责任链模式一样,如下图,而服务端回写数据也会经过一系列的处理器处理,只不过与接收数据的顺序反向而已
ChannelPipeline可以理解成条管道,里面有多个Handler,事件会流经该管道里面的每一个Handler进行处理,只有当消息经过所有的Handler处理之后服务器才能接收到客户端发送过来的消息
9、核心组建之-Channle
Netty中的Channel就是将NIO的Channel进一步封装
10、核心组建之-ChannelHandlerContext
ChannelHandlerContext保存着Channel的上下文,同时关联着一个ChannelHandler,通过ChannelHadlerContext,ChannelHandler才能与ChannelPipeline或者其他的ChannelHandlker进行交互
11、核心组建之-ChannelFuture
Netty中的IO都是异步的,那么异步的话就会返回将来用来获取返回值的对象,也就是Future,通过ChannelFuture我们可以知道IO是否操作成功,是否已经完成,是否已取消等等,比如服务端是否创建成功
ChannelFuture channelFuture = bootstrap.bind(port);
//通过回调查看是否成功
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()) {
System.out.println("端口绑定成功");
}else {
System.out.println("端口绑定失败");
}
}
});
System.out.println("服务端启动成功");
复制代码
12、核心组建之-ChannleOption
ChannleOption保存了很多参数,可以让我们更好的配置Channel,更多的配置在ChannelOption.class类中配置