netty-future和promise的演示以及速率控制的案例
一、future和promise的演示
1、Future
Future 模式 是处理异步执行时的一个处理模式,它有二种编程模式。
第一种:将来式- 让执行线程,可以拿到异步调用的一个标记对象,如果执行线程调用future.get(),那么就能够等待异步执行的结果。
第二种:回调式- 通过给Future添加回调的方式,通过回调拿到异步执行的结果
案例1:
/**
* 案例1:使用JDk的Future模式 -- 将来式
*/
@Test
public void test1() throws Exception {
long begin = System.currentTimeMillis();
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("执行耗时操作...");
excuteAsync();
return 100;
}
});
System.out.println("计算结果:" + future.get());// get会阻塞主线程 - 直至结果完成
System.out.println("主线程运算耗时:" + (System.currentTimeMillis() - begin) + "ms");
}
// 从上面的结果可以看出,执行线程的异步操作交给了另外一个线程执行,并且拿到了Future标记,如果.get可以阻塞至结果完成!
案例2:
/**
* 案例2:使用Netty的Future模式 -- 回调式 (因为jdk的Futrue不能注册监听)
* 什么是回调Future?即异步执行的结果,是通过我在Future上注册的监听拿到。
*/
@Test
public void test2() throws Exception {
EventExecutorGroup group = new DefaultEventExecutorGroup(4); // Netty线程池
long begin = System.currentTimeMillis();
io.netty.util.concurrent.Future<Integer> f = group.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("执行耗时操作...");
excuteAsync();
return 100;
}
});
// 注册的监听可以拿到异步执行的结果
f.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Object> objectFuture) throws Exception {
System.out.println("计算结果::" + objectFuture.get());
System.out.println("真实运行结果时间:" + ( System.currentTimeMillis() - begin ) + "ms");
}
});
System.out.println("主线程运算耗时:" + (System.currentTimeMillis() - begin) + "ms");
// 阻塞主线程
new CountDownLatch(1).await();
}
// 上面的结果可以看出,执行线程对异步结果的接收就是通过注册一个监听回调方式获取, 主线程只执行了很短的时间。
看了回调的Furue的模式,我们可以联想到js的异步操作方式,js异步操作,原生就是无限回调模式,而ES6的Promise的出现,将异步操作包装成了一个Promise对象,调用回调方法resolve(),就可以 在then方法回调处理结果,这样虽然执行起来还是异步的,但是代码却看起来是同步的了。
下面看看Jdk的CompletableFuture对Future添加监听器
案例3:
* 案例3: 下面我们用jdk的CompletableFuture演示类似Promise效果
*/
@Test
public void test3() throws Exception {
long l = System.currentTimeMillis();
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("执行耗时操作...");
excuteAsync();
return 100;
});
completableFuture.whenComplete((result, e) -> {
System.out.println("结果:" + result);
});
System.out.println("主线程运算耗时:" + (System.currentTimeMillis() - l) + "ms");
new CountDownLatch(1).await();
}
这个和js的promise使用起来的不同点,js的异步完成是通过resolve函数接收,这个是通过异步执行函数的返回。netty对Future做了一个升级版,就是Promise,promise除了能够添加监听外,还可以手动控制异步执行的完成。
2、promise
案例4:
/**
* 案例4:netty的promise的基本使用演示
* netty的promise对于future除了可以添加监听器外,最主要的是,promise对象可以设置完成状态等操作
*/
@Test
public void test4() throws InterruptedException {
// 构建promise --- > js new Promise()
Promise<String> promise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
// 给promise添加监听 --- > js Promise.then(ele -> {} )
promise.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<String>>(){
@Override
public void operationComplete(io.netty.util.concurrent.Future<String> future) throws Exception {
if (future.isSuccess()) {
System.out.println("消息成功 success:" + future.get());
} else {
System.out.println("消息失败 failure:" + future.cause());
}
}
});
// 给promise设置完成状态 --- > js resolve() 调用
new Thread(() -> {
excuteAsync();
promise.setSuccess("结果"); // 设置完成状态 , 会触发监听
}).start();
new CountDownLatch(1).await();
}
二、速率控制的案例
1、demo功能介绍
速率控制主要是通过ChannelTrafficShapingHandler(客户端)和GlobalTrafficShapingHandler(服务端)的编码解码器,设置参数为10,默认单位是byte/s。
比如:
ch.pipeline().addLast(new ChannelTrafficShapingHandler(10, 10)); // 客户端
ch.pipeline().addLast(new GlobalTrafficShapingHandler(ch.eventLoop().parent(), 10, 10)); // 服务端
客户端就是常规接收服务端消息后,发送一个uuid消息给服务端,服务端channelActive后,就需要发送一个消息给客户端,并且发送完成后再次发送一个消息给客户端,这个就需要Promise模式监控完成后才发送,并且服务端我们需要做一个速率监控,验证上面使用的速率控制编码解码器是否如实有效。
2、代码
客户端:
public class NettyClient {
public static void main(String[] args) throws Exception {
new NettyClient().bind("127.0.0.1", 7000);
}
public void bind(String address, int port) throws Exception {
EventLoopGroup loopGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(loopGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new ChannelTrafficShapingHandler(10, 10)); // 速率控制
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new MyClientHandler());
}
});
ChannelFuture future = b.connect(address, port).sync();
future.channel().closeFuture().sync();
} finally {
loopGroup.shutdownGracefully();
}
}
}
public class MyClientHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("客户端接收消息:" + msg + ",长度:" + msg.length());
ctx.write(UUID.randomUUID().toString() + "\r\n", ctx.voidPromise());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}
}
服务端:
public class NettyServer {
public static void main(String[] args) {
NettyServer server = new NettyServer();
server.bind(7000);
}
public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 流量控制 速率 10 bytes/s
ch.pipeline().addLast(new GlobalTrafficShapingHandler(ch.eventLoop().parent(), 10, 10));
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new MyServerHandler());
}
});
ChannelFuture future = b.bind(port).sync();
System.out.println("server start now");
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
private Runnable task; // 监控任务
private AtomicLong msgLeng = new AtomicLong(); // 消费消息长度
private AtomicLong firstTime = new AtomicLong(System.currentTimeMillis()); // 记录上次时间
/**handler添加时触发**/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
task = () -> {
while (true) {
try {
// 获取消息
long length = msgLeng.getAndSet(0);
if (0 == length)
continue;
// 计算时间
long currentTime = System.currentTimeMillis();
long time = currentTime - firstTime.get();
firstTime.set(currentTime);
// 计算速率
System.out.println("数据发送速率(KB/S):" + length * 1000 / time);
Thread.sleep(50);
} catch (InterruptedException ignored) {
}
}
};
super.handlerAdded(ctx);
}
/**channel连接后发送数据,发送数据完成,promise触发监听,实际上调用是我们传入的回调函数,再次发送数据**/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
sendData(ctx);
new Thread(task).start();
}
public void sendData(ChannelHandlerContext ctx) {
// 写数据时第二个参数是Promise,这样写完数据后,可由传入的promise设置setSuccess() or setFailfua
ctx.write("111111111122222222223333333333\r\n",
getChannelProgressivePromise(ctx, new Consumer<ChannelProgressiveFuture>() {
@Override
public void accept(ChannelProgressiveFuture channelProgressiveFuture) {
if (ctx.channel().isWritable()) {
sendData(ctx);
}
}
}));
}
public ChannelProgressivePromise getChannelProgressivePromise(ChannelHandlerContext ctx,
Consumer<ChannelProgressiveFuture> completedAction) {
// 创建Promise
ChannelProgressivePromise channelProgressivePromise = ctx.newProgressivePromise();
// Promise创建监听
channelProgressivePromise.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
throws Exception {
System.out.println("服务端处理数量:" + progress);
msgLeng.getAndSet(progress);
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("服务端提醒:消息发送成功!");
// action的accept方法会递归调用发送数据方法
Optional.ofNullable(completedAction).ifPresent(action -> action.accept(future));
}
}
});
return channelProgressivePromise;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("NettyServer接收到消息:" + msg);
}
}
最后执行效果,打印是10 byte/s的速率,而我们客户端没有监控,但是发送的字节是uuid32个,最后服务端读取客户端数据的速度和服务端写数据Promise完成打印节奏是一致的,说明对速率解码器的验证是正确的。
end!