一、Netty引言
基于NIO一款异步通讯框架,因为在使用上相比较Mina
较为简单,开发门槛低导致了Netty在互联网开发中受到绝大多数商用项目成功验证,导致了Netty成为NIO开发的首选框架。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括 FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
在设计上: 针对多种传输类型的统一接口 - 阻塞和非阻塞; 简单但更强大的线程模型;真正的无连接的数据报套接字支持;链接逻辑支持复用;
在性能上:比核心 Java API 更好的吞吐量,较低的延时;资源消耗更少,这个得益于共享池和重用;减少内存拷贝
在健壮性上:消除由于慢,快,或重载连接产生的 OutOfMemoryError;消除经常发现在 NIO 在高速网络中的应用中的不公平的读/写比
在安全上:完整的 SSL / TLS 和 StartTLS 的支持且已得到大量商业应用的真实验证, 如: Hadoop 项目的 Avro (RPC 框架)、 Dubbo、 Dubbox等RPC框架
二、实例
- 导入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
- Netty的服务端开发
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//①、启动服务引导 框架启动引导类,屏蔽网络通讯配置信息
ServerBootstrap sbt= new ServerBootstrap();
//②、创建请求转发,响应线程池
NioEventLoopGroup master = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
//③、关联线程池组
sbt.group(master,worker);
//④、设置服务端实现类
sbt.channel(NioServerSocketChannel.class);
//⑤、配置服务端RPC通讯管道 【关注点】
sbt.childHandler(new ServerChannelInitializer());
//⑥、设置服务器的端口并且启动服务
System.out.println("我在9999监听...");
ChannelFuture f = sbt.bind(9999).sync();
//⑦、关闭通信管道
f.channel().closeFuture().sync();
//⑧、释放资源
master.shutdownGracefully();
worker.shutdownGracefully();
}
}
- Netty的客户端开发(Server端改动下)
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//①、启动服务引导 框架启动引导类,屏蔽网络通讯配置信息
Bootstrap bt= new Bootstrap();
//②、创建请求转发,响应线程池
NioEventLoopGroup worker = new NioEventLoopGroup();
//③、关联线程池组
bt.group(worker);
//④、设置服务端实现类
bt.channel(NioSocketChannel.class);
//⑤、配置客户端RPC通讯管道 【关注点】
bt.handler(new ClientChannelInitializer());
//⑥、设置服务器的端口并且启动服务
ChannelFuture f = bt.connect("127.0.0.1",9999).sync();
//⑦、关闭通信管道
f.channel().closeFuture().sync();
//⑧、释放资源
worker.shutdownGracefully();
}
}
- 总结下API记忆技巧:
第一步、创建服务引导(服务端ServerBootstrap与客户端Bootstrap);
第二步、确定网络编程线程(服务端两个,客户端一个);
第三步、关联起来;
第四步、实现类(服务端NioServerSocketChannel和客户端NioSocketChannel);
第五步、配置自定义的RPC通讯管道;
第六步、服务端启动服务,客户端多设置ip启动服务;
第七步、关闭通信管道
第八步、释放资源
三、Netty的管道浅析 —— “学Netty,就是学管道”
- 解释:
管道:消息交换的介质
环:加功能、不同环只处理固定方向的stream
红色:输入upstream、编码器、将对象转成字节流、先编码,再编帧
橙色:输出downstream、解码器、将字节流转成对象、先解帧,再解码
编码 - 将对象变成字节、编帧 - 切分对象,方便解帧、解帧 - 固定规则拆分、解码 - 字节变成对象
管道末端:挂载最终处理者,作用收发对象
管道前端:挂载编解码器
所以最终关注的,需要书写的是,最终处理者和编解码器
四、开发管道
- 初始化管道末端
xxxChannelInitializer
- ServerChannelInitializer
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//通信管道
ChannelPipeline pipeline = ch.pipeline();
//挂载最终处理者
pipeline.addLast(new ServerChannelHandlerAdapter());
}
}
- ClientChannelInitializer
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//通信管道
ChannelPipeline pipeline = ch.pipeline();
//挂载最终处理者
pipeline.addLast(new ClientChannelHandlerAdapter());
}
}
- 挂载最终处理者
xxxChannelHandlerAdapter
Server:
channelRead:被动接收发消息
exceptionCaught:异常捕获
Client:
channelActive:发送消息
channelRead:接收消息
exceptionCaught:异常捕获
- ServerChannelHandlerAdapter
创建服务端最终处理者
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
/**
* 收发消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
//关闭通道
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
/**
* 异常捕获
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println(cause.getMessage());
}
}
- ClientChannelHandlerAdapter
创建客户端最终处理者
public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
/**
* 主动发送消息
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String msg="你好,我服务器!!!!";
ByteBuf buf=ctx.alloc().buffer();
buf.writeBytes(msg.getBytes());
ctx.writeAndFlush(buf);
}
/**
* 被动收发消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf= (ByteBuf) msg;
System.out.println("收到:"+buf.toString(CharsetUtil.UTF_8));
}
/**
* 异常捕获
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println(cause.getMessage());
}
}
五、字节流ByteBuf基本使用
- ByteBuf
ByteBuf
区别于ByteBuffer
不同。因为ByteBuf是一个可边长的字节数组是Netty提供专门用于操作字节工具类。
该对象通过readIndex
/writeIndex
控制ByteBuf。在初始情况下readIndex
和writeIndex
都等于0;当用户写数据的时候writeIndex
自动增长字节数,可读的区域就等于writeIndex-readIndex
。
/**
* 或者其他创建方式 new PooledByteBufAllocator().buffer();
* new UnpooledByteBufAllocator(true).buffer();
*/
ByteBuf buf= Unpooled.buffer(5);// r=0,w=0,c=5(可以自动扩容)
buf.writeByte((byte)'a');
buf.writeByte((byte)'b');
buf.writeByte((byte)'c'); // r=0,w=3,c=5
byte v1 = buf.readByte();
byte v2 = buf.readByte(); // r=2,w=3,c=5
//readableBytes() 剩余可读字节
int i = buf.readableBytes();// w-r=1
//discardReadBytes() 清除已读过数据,即修正readIndex和writeIndex
buf.discardReadBytes(); // w-=r,r=0 ,c=5
相比于 ByteBuffer操作简单些。
注意ByteBuf
是Netty提供缓冲区,不能够给NIO使用,仅仅简化对缓冲区的操作。省略flip/clear
方法。Netty管道默认只支持发送ByteBuf
和FileRegion
都代表字节数据,一般在RPC系统使用ByteBuf
传送基础数据(java对象);如果用户使用Netty开发文件传输系统。用户可以直接将一个文件拆分成多个FileRegion
对象.
- FileRegion
将一个文件切分成多个区间,实现对文件随机读取。(可以实现断点续传,并行拷贝)
FileChannel fi=new FileInputStream("xxx文件A").getChannel();
FileChannel fo=new FileOutputStream("xxx文件B",true).getChannel();//SocketChannel
//读取xxx文件A从0位置开始读,读取100个字节
FileRegion region= new DefaultFileRegion(fi,0,100);
//从当前Region的0位置,拷贝100个字节数据到Xxx文件B中
region.transferTo(fo,0);
FileRegion
在网络间传递文件系统。综上所述Netty的通道只支持发送以上两种对象类型,如果用户希望发送自定义类型就必须尝试将发送的数据转换为ByteBuf
或者FileRegion
.
六、如何捕获Netty在传输过程中出现的异常信息?【面试题】
//发送消息
ChannelFuture channelFuture = ctx.writeAndFlush(cmd);
//添加序列化异常捕获
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
//出错切断连接
channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
对象编码器,作用于输出,使得Netty支持传输任意Java对象。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import org.apache.commons.lang3.SerializationUtils; import java.io.Serializable; import java.util.List; public class ObjectEncoder extends MessageToMessageEncoder<Object> { /** * * @param ctx * @param msg 需要编码的对象 * @param out 编码数据帧 * @throws Exception */ @Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { ByteBuf buf=ctx.alloc().buffer(); byte[] bytes= SerializationUtils.serialize((Serializable) msg); buf.writeBytes(bytes); out.add(buf);//生成数据帧 } }
对象解码,将任意的收到ByteBuf转换为Object
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import org.apache.commons.lang3.SerializationUtils; import java.util.List; public class ObjectDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { byte[] bytes=new byte[msg.readableBytes()]; msg.readBytes(bytes); Object obj = SerializationUtils.deserialize(bytes); out.add(obj); } }
七、完整版本Netty服务端【重点】
- HostAndPort
封装ip和端口
public class HostAndPort {
private String host;
private int port;
public HostAndPort(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
- MethodInvokeMeta
测试用方法对象
public class MethodInvokeMeta implements Serializable {
private String method;//方法名
private Class<?>[] parameterTypes;//参数类型
private Object[] agrs;//参数
private String targetClass;//目标类信息
public MethodInvokeMeta(String method, Class<?>[] parameterTypes, Object[] agrs, String targetClass) {
this.method = method;
this.parameterTypes = parameterTypes;
this.agrs = agrs;
this.targetClass = targetClass;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getAgrs() {
return agrs;
}
public void setAgrs(Object[] agrs) {
this.agrs = agrs;
}
public String getTargetClass() {
return targetClass;
}
public void setTargetClass(String targetClass) {
this.targetClass = targetClass;
}
@Override
public String toString() {
return "MethodInvokeMeta{" +
"method='" + method + '\'' +
", parameterTypes=" + Arrays.toString(parameterTypes) +
", agrs=" + Arrays.toString(agrs) +
", targetClass='" + targetClass + '\'' +
'}';
}
}
- NettyServer
简洁方式NettyServer
帧头图图
public class NettyServer {
//1.启动服务引导 框架启动引导类,屏蔽网络通讯配置信息
private ServerBootstrap sbt = new ServerBootstrap();
//2.创建请求转发、响应线程池
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup worker = new NioEventLoopGroup();
private int port;
public NettyServer(int port) {
this.port = port;
//3.关联线程池组
sbt.group(boss, worker);
//4.设置服务端实现类
sbt.channel(NioServerSocketChannel.class);
}
public static void main(String[] args) throws InterruptedException {
NettyServer nettyServer = new NettyServer(9999);
try {
nettyServer.start();
} finally {
nettyServer.close();
}
}
public void start() throws InterruptedException {
//5.配置服务端RPC通讯管道 - 需要关注的东西
sbt.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加数据帧解码
/**
* @param maxFrameLength 帧的最大长度 '65535'
* @param lengthFieldOffset length字段偏移的地址 '0'
* @param lengthFieldLength length字段所占的字节长 '2'
* @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段 '0'
* @param initialBytesToStrip 解析时候跳过多少个长度 '2'
* @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异 '有不写这个参数的构造'
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
//添加对象解码器
pipeline.addLast(new ObjectDecoder());
//添加数据帧编码器
/**
* 两个字节长度的帧头协议
*/
pipeline.addLast(new LengthFieldPrepender(2));
//添加对象编码器
pipeline.addLast(new ObjectEncoder());
//添加最终处理者
pipeline.addLast(new ChannelHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println(cause.getMessage());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器收到:" + msg);
//收消息 ChannelFuture异步操作的结果
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
//关闭通道,ps测试多对象时关闭
//channelFuture.addListener(ChannelFutureListener.CLOSE);
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
});
//6.设置服务器的端口并且启动服务
System.out.println("我在9999监听...");
ChannelFuture f = sbt.bind(port).sync();
//7.关闭通信管道
f.channel().closeFuture().sync();
}
public void close() {
//8.释放资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
- NettyClient
简洁方式NettyClient
public class NettyClient {
//1.启动服务引导 框架启动引导类,屏蔽网络通讯配置信息
private Bootstrap bt=new Bootstrap();
//2.创建请求转发、响应线程池
private EventLoopGroup worker=new NioEventLoopGroup();
public NettyClient() {
//3.关联线程池组
bt.group(worker);
//4.设置客户端实现类
bt.channel(NioSocketChannel.class);
}
public static void main(String[] args) throws InterruptedException {
NettyClient nettyClient = new NettyClient();
try {
HostAndPort hostAndPort = new HostAndPort("127.0.0.1", 9999);
MethodInvokeMeta invokeMeta = new MethodInvokeMeta("sum",
new Class[]{Integer.class, Integer.class},
new Object[]{1, 2}, "com.netty.service.DemoService");
Object res = nettyClient.call(hostAndPort, invokeMeta);
System.out.println(res);
}finally {
nettyClient.close();
}
}
public Object call(HostAndPort hostAndPort, final Object cmd) throws InterruptedException {
final List<Object> res=new ArrayList<Object>();
//5.配置客户端RPC通讯管道 - 需要关注的东西
bt.handler(new ChannelInitializer(){
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加数据帧解码
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
//添加对象解码器
pipeline.addLast(new ObjectDecoder());
//添加数据帧编码器
pipeline.addLast(new LengthFieldPrepender(2));
//添加对象编码器
pipeline.addLast(new ObjectEncoder());
//添加最终处理者
pipeline.addLast(new ChannelHandlerAdapter(){
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送消息
ChannelFuture channelFuture = ctx.writeAndFlush(cmd);
//多发两个,共发三个对象
channelFuture = ctx.writeAndFlush(cmd);
channelFuture = ctx.writeAndFlush(cmd); channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//收消息
res.add(msg);
}
});
}
});
//6.设置服务器的端口并且启动服务
ChannelFuture f = bt.connect(hostAndPort.getHost(),hostAndPort.getPort()).sync();
//7.关闭通信管道
f.channel().closeFuture().sync();
return res.get(0);
}
public void close(){
//8.释放资源
worker.shutdownGracefully();
}
}
- 自定义的 ObjectEncoder
自定义对象编码器
public class ObjectEncoder extends MessageToMessageEncoder<Object> {
/**
*
* @param ctx
* @param msg 需要编码的对象
* @param out 编码数据帧
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
ByteBuf buf=ctx.alloc().buffer();
byte[] bytes= SerializationUtils.serialize((Serializable) msg);
buf.writeBytes(bytes);
out.add(buf);//生成数据帧
}
}
- 自定义的 ObjectDecoder
自定义对象解码器
public class ObjectDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
byte[] bytes=new byte[msg.readableBytes()];
msg.readBytes(bytes);
Object obj = SerializationUtils.deserialize(bytes);
out.add(obj);
}
}
- 测试结果,多对象可正常取到