服务器端的代码(同样我也会在很多地方写上备注的)[其实如果认真看代码注释基本都可以理解了]
publicclass TimeServer{
public void bind(int port) throws Exception {
//EventLoopGroup 是循环事件组,其中有两个循环事件组
//一个循环事件组主要是处理接入连接的事件
//另外一个循环事件组主要是处理应用的业务,
//就是说做向已经已经建立连接的channel读取数据和发出数据
EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//这个主要是引导Netty的配置的,我把它当成了一个configration了,不知道合不合适~
ServerBootstrap bootstrap = new ServerBootstrap();
//group就是刚刚创建两个循环事件组应用到Netty中,Channel非常重要:我们使用的是NioServerSocketChannel,就是说使用Nio的服务器Socket
bootstrap.group(bossLoopGroup, workerLoopGroup).channel(NioServerSocketChannel.class)
//其他选择这里我们使用了SO_BACKLOG,
.option(ChannelOption.SO_BACKLOG, 1024)
//childHandler是我们最需要花时间的地方,主要用作处理所有业务的地方(文章会详细说)注意:ChildChannelHandler是自己实现的下文有详细代码
.childHandler(new ChildChannelHandler());
//ChannelFuture后面的文章会详细介绍,现在主要知道他是一个Future回调就行,bind绑定端口号,并指定为同步执行
ChannelFuture future =bootstrap.bind(port).sync();
//关闭channel,但是这里如果没有什么异常按照我的代码逻辑基本上不会执行
future.channel().closeFuture().sync();
} finally {
//如果结束就把事件循环组关闭
bossLoopGroup.shutdownGracefully();
workerLoopGroup.shutdownGracefully();
}
}
//注意这个ChildChannelHandler 继承与 ChannelInitializer
//里面会有个一个channel,channel当中可以设置一个pipeline
public static class ChildChannelHandler extends ChannelInitializer {
@Override
protected void initChannel(Channel channel) throws Exception {
// TODO Auto-generated method stub
System.out.println(">>>>>initChannel by ChildChannelHandler");
//第一层设置行的解码最大为1024
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
//将二进制流转成string
channel.pipeline().addLast(new StringDecoder());
//这个是我们自己实现的最后一个Handler 主要是做接收到消息的业务的
channel.pipeline().addLast(new TimeServerHandler());
//这里可以看到pipleline里面的东西就像是一个filter 一层层将消息进行过滤处理
}
}
//我们处理消息的类
public static class TimeServerHandler extends ChannelHandlerAdapter {
int counter = 0;
@Override
public void channelRead(ChannelHandlerContextctx, Object msg) throws Exception {
// TODO Auto-generated method stub
//由于我们上面pipeline中添加了StringDecoder进行解码所以我们可以直接转型为string
String request = (String) msg;
System.out.println("The time server receive order :" +request);
System.out.println(">>>> counter : " + (++counter));
String response = null;
if (request.equalsIgnoreCase("QUERY TIMEORDER")){
response = String.valueOf(System.currentTimeMillis());
} else {
response = "BAD ORDER";
}
response += "\n";
//使用Unpooled工具类(由Netty提供)将byte数组转成ByteBuf
ByteBuf responseBuf = Unpooled.copiedBuffer(response.getBytes());
//将有内容的ByteBuf通过ChannelHandlerContext发送结果给客户端
ctx.write(responseBuf);
}
//消息读取完成的回调
@Override
public void channelReadComplete(ChannelHandlerContextctx) throwsException{
// TODO Auto-generated method stub
ctx.flush();
}
//出现异常的回调
@Override
public void exceptionCaught(ChannelHandlerContextctx, Throwable arg1) throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, arg1);
ctx.close();
}
//可以看到这个类就是一个事件回调,比我们之前写的NIO的循环selector那个鬼东西好多了。
}
}
最后使用Main方法将其进行调用即可:
publicstatic void main(String[] args) {
System.out.println("Starting timeserver ... ");
try {
new TimeServer().bind(1890);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
开始要说点概念深入理解上面所使用的类了
Bootstrap 和 ServerBootstrap :上面的代码也看到这个基本上是一个配置的类,Bootstrap 是最要是针对客户端的,ServerBootstrap服务器端的,这个基本不用多解释了。
EventLoopGroup:上面代码的注释已经有写的比较清楚了,就是一个事件循环的功能,需要注意的是,Bootstrap 只使用一个EventLoopGroup 因为他不需要处理大量客户端的连接请求,但是ServerBootstrap 为了避免性能瓶颈,所以讲连接和处理已经连接的channel分开成两个EventLoopGroup。在Netty in action 一书中有一张图,我这里就引入一下吧
当然我们也可以只使用一个EventLoopGroup这样接收连接和处理已经连接的消息也变成同一个EventLoopGroup 情况就会变成这样:
所以最好就是分开两个EventLoopGroup,如果想进一步了解EventLoopGroup 后面有机会我也会出一遍博客详细说面EventLoopGroup的工作原理,顺便我也尝试一下看看它的源代码究竟里面做了些什么,以后有机会~~~ 先继续这个课题
ChannelHandler:这个真的是非常重点,刚刚我们上面的代码已经看见了,非常想一个filter过滤器,一层层的将消息进行处理,消息经过不同的Handler之后到了我们最后自己实现的TimeServerHandler,获得就是一个字符串的指令了,我们也不需要进行任何的转码操作,事实上在使用Netty的时候会用到非常多不一样的Handler去解决非常多的需求。上面代码中我们在ServerBootstrap中设置了一个childHandler,而这个childHandler是我们自己继承自ChanneIinitializer的,里面有个非常重要的实现initChannel方法,我们就在这个方法中添加若干个handler的,这些handler都会添加到channel中的pipeline当中。
这里需要说明的是,这些handler都继承与ChannelHandler,有两个最为重要的派生类channelInboundHandler和channelOutboundHandler。正如字面理解的就是说,一个是处理入站消息和出站消息的,这些handler会按照pipeline的顺序进行一个个传递,继续偷图:
正如上图所示,Inbound 是从第一个InboundHandler开始跑的,一直跑到最后一个InboundHandler,而OutboundHandler是刚刚相反,从最后一个OutboundHandler开始跑,一直跑到最前面的OutboundHandler。而且pipeline非常智能的跳过不合适的Handler。
最后有一个比较需要注意的事情,在Netty当中发送消息有两种方法:第一种方法是直接在channel里面调用write方法,另外一种是通过ChannelHandlerContext里面调用write方法,两者有如下区别:
通过channel调用:消息会从ChannelPipeline的尾部开始。
通过ChannelHandlerContext调用:消息会从当前的Handler开始往上传。
Decoder & Encoder & Adapter:Netty提供非常多的Adapter类,让Handler变得非常简单,如果细心的你应该可以发现我们最后一个处理业务的Handler并不是继承与什么ChannelHandler的,而是继承与一个ChannelHandlerAdapter的,如果刚刚所说每个ChannelHandler都负责转发事件到下一个ChannelHandler中,在ChannelHandlerAdapter这些操作得到了更好的封装,我们只需要利用ChannelHandlerAdapter类或者是它的子类就可以帮我们轻松实现编码和解码、业务等功能,但是一般情况下解码和编码工作我们不会去继承ChannelHandlerAdapter类而是使用MessageToMessageDecoder等这些类而这些类是继承与我们的ChannelHandlerAdapter的。
解码器和编码器(Decoders&&Encoders),Netty提供非常多不同的编码器例如:ByteToMessageDecoder和MessageToByteEncoder、ProtobufEncoder、ProtobufDecoder、MessageToMessageDecoder等等。其中我们上面的代码就使用了一个StringDecoder。在后面我的关于Netty会用到不同的Encoder和Decoder,会更加深入的理解。
Domainlogic 业务逻辑,Netty也提供了一个类去处理我们业务,上面我们的代码直接使用了ChannelHandlerAdapter进行现实业务的,但是Netty也提供了一个SimpleChannelInBoundHandler<I>进行业务处理,其中I是消息的类型,当然我们刚刚使用StringDecoder所以消息转回来就肯定是String类型了,所以我们可以把我们的TimeServerHandler写成这样:
public static class TimeServerHandler2 extends SimpleChannelInboundHandler<String>{
int counter = 0;
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("The time server receive order : " + msg);
System.out.println(">>>> counter : " + (++counter));
String response = null;
if (msg.equalsIgnoreCase("QUERY TIME ORDER")) {
response = String.valueOf(System.currentTimeMillis());
} else {
response = "BAD ORDER";
}
response += "\n";
//使用Unpooled工具类(由Netty提供)将byte数组转成ByteBuf
ByteBuf responseBuf = Unpooled.copiedBuffer(response.getBytes());
//将有内容的ByteBuf通过ChannelHandlerContext发送结果给客户端
ctx.writeAndFlush(responseBuf);
}
}
现在基本对Netty有比较大概的了解的,之后后续的文章会继续扩展知识内容,会利用Netty实现IM服务器、HTTP服务器、文件服务器等,现在我们先继续我们的TimeServer的客户端:
public class TimeClient {
//连接是事件服务器
public void connect(String host, int port) throws Exception {
//客户端不需要两个事件循环组。
EventLoopGroup group = new NioEventLoopGroup();
try {
//这里使用的是Bootstrap而不是ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
//option 使用TCP不延迟
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
//使用匿名类的方式初始化channelPipeline
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// TODO Auto-generated method stub
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
//同样使用StringDecoder转成String类型
channel.pipeline().addLast(new StringDecoder());
//我们实现的Handler主要处理接收到数据的业务
channel.pipeline().addLast(new TimeClientHandler());
}
});
//请求连接服务器,这里会阻塞
ChannelFuture future = bootstrap.connect(host, port).sync();
//关闭channel
future.channel().closeFuture().sync();
} finally {
//如果结束就把事件循环组关闭
group.shutdownGracefully();
}
}
public static class TimeClientHandler extends ChannelHandlerAdapter {
private ByteBuf requestMsg = null;
private int counter = 0;
public TimeClientHandler() {
// TODO Auto-generated constructor stub
}
//channel已经有效,可以发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
//发100次请求
for (int i = 0; i < 100; i++) {
byte[] data = ("QUERY TIME ORDER\n").getBytes();
//使用Unpooled工具类(由Netty提供)将byte数组转成ByteBuf
requestMsg = Unpooled.copiedBuffer(data);
//将请求时间指令的ByteBuf通过ChannelHandlerContext的Channel对象发送给服务器端
ctx.writeAndFlush(requestMsg);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// TODO Auto-generated method stub
//获得服务器请求
String response = (String) msg;
System.out.println("Time Server response : " + response);
System.out.println(">>counter : " + (++counter));
//全部返回后关闭连接
if(counter >= 99){
ctx.close();
}
}
//出现异常的回调
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// TODO Auto-generated method stub
ctx.close();
}
}
}
调用方式也是一样的:
public static void main(String[] args) throws Exception {
System.out.println("Connecting time server ...");
new TimeClient().connect("127.0.0.1", 1890);
}
---------------------
作者:TONY Yan
来源:CSDN
原文:https://blog.csdn.net/tony308001970/article/details/70387779
版权声明:本文为博主原创文章,转载请附上博文链接!