「Offer 驾到,掘友接招!我正在参与2022春招系列活动-经验复盘,点击查看 活动详情
在之前的文章当中,我介绍了关于升级SpringCloudGateway3.1.1遇到的一些bug,很高兴得到了一些同学们的反馈,同时也给我提出了一些问题,我经过分析,确实是自己在使用层面存在了问题,下文针对这个问题做一下阐述。前面的文章地址:SpringCloudgateWay升级到3.1.1版本你遇到这些坑了吗?
问题描述
前面我介绍了使用WebClient的方式,替代SpringCloudGateway3.1.1当中FeignClient的调用方式,有的同学按照我提供的方式进行了尝试。
发现了下面的问题:当请求量很小的时候,没有问题,但是当请求量增加后,会出现整个服务阻塞的情况,后面的请求不能发起。
问题复现
我使用一个登录接口做问题的复现,代码如下所示:
/**
* 登录
*
* @param userDTO
* @return com.wjbgn.bsolver.gateway.util.dto.Result
* @author weirx
* @date: 2022/3/14
*/
@PostMapping("/login")
public Result login(@RequestBody UserDTO userDTO) {
// 密码md5加密
userDTO.setPassword(MD5.create().digestHex(userDTO.getPassword()));
// 1、Webclient调用接口
Mono<Boolean> monoInfo = webClientBuilder
.build().post().uri(USER_VALIDATE_PATH)
.body(BodyInserters.fromValue(userDTO)).header(HttpHeaders.CONTENT_TYPE, "application/json")
.retrieve().bodyToMono(Boolean.class);
// 2、异步调用block方法,否则会报错,因为block的内部方法blockingGet是同步方法。
CompletableFuture<Boolean> voidCompletableFuture = CompletableFuture.supplyAsync(() ->
monoInfo.block());
try {
// 3、获取结果
Boolean result = voidCompletableFuture.get();
if (result) {
// 用户存在,直接生成token,登录成功n
String token = JwtUtil.generateToken(userDTO.getUsername());
// 将token放入redis
redisUtil.setObjectExpire(JwtUtil.REDIS_TOKEN_PREFIX + userDTO.getUsername(), token, Duration.ofMinutes(JwtUtil.REDIS_TOKEN_EXPIRE_MINUTE));
return Result.success("登陆成功", new UserDTO(userDTO.getUsername(), token));
} else {
return Result.failed("用户名不存在或密码错误");
}
} catch (Exception e) {
log.info("登录失败,msg = {}" ,e.getMessage());
}
return Result.failed("登录失败");
}
复制代码
如上所示,有三个关键点值得我们注意:
- WebClient接口调用,返回值
Mono
- 调用获取结果的方法
monoInfo.block()
,必须使用异步调用的方式 - 获取结果,定义CompletableFuture的结果为接口返回值
Bealoon
。
以上就是我的测试代码,经过我不断地通过这个接口进行登录,发现最后却是接口被阻塞了,一直不会响应:
经过我的验证,确实出现了这个问题,下面我们来分析下问题出现的原因。
问题分析
需要注意的一点,我们之所以使用异步的方式去获取Mono的结果,因为其block
方法从名字就能看出是一个阻塞的方法,而只有当我们调用了Block方法
,整个请求才会被发送。
试想一下,如果我们使用同步方式,那么多个请求来的时候,所有的请求都会变成同步的方式,那么整体服务的并发度将变成串行的,且只要接口阻塞,那么所有的请求都被阻塞了。
所以我们要使用异步的方式去执行这个block方法
。
虽然使用了异步调用的方方式,我们看到前面在请求量增大的时候,仍然出现阻塞的状况了。为什么呢?
我们的登陆接口调用了用户服务的验证用户接口
,假设我们的用户验证接口
只能接受10个并发,当我们大量用户登陆的时候,必然导致用户服务的验证接口
阻塞,不能处理过多的请求,那么block
方法就会阻塞等待其返回,最终导致大量的请求堆积,没有响应。
所以你即异步调用,也无法应付下游服务
阻塞的情况,这个需要我们来进行异常情况的处理。
源码分析
block源码
@Nullable
public T block() {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber();
this.subscribe((Subscriber)subscriber);
return subscriber.blockingGet();
}
复制代码
我们重点管制最后一行代码blockingGet
方法,从名称就能知道这是一个阻塞的get方法
@Nullable
final T blockingGet() {
if (Schedulers.isInNonBlockingThread()) {
throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
} else {
if (this.getCount() != 0L) {
try {
this.await();
} catch (InterruptedException var3) {
this.dispose();
throw Exceptions.propagate(var3);
}
}
Throwable e = this.error;
if (e != null) {
RuntimeException re = Exceptions.propagate(e);
re.addSuppressed(new Exception("#block terminated with an error"));
throw re;
} else {
return this.value;
}
}
}
复制代码
看下isInNonBlockingThread
是什么,如下所示,当前线程是否属于NonBlocking
public static boolean isInNonBlockingThread() {
return Thread.currentThread() instanceof NonBlocking;
}
复制代码
NonBlocking
到底指的是什么?是一个接口,分别有下面两个实现:
- netty的EventLoop
- reactor的非阻塞线程
那么我们的当前线程如果属于这两种就会返回true,根据前面的判断,如果此方法返回true,则会抛出异常:
2022-03-20 13:29:34 ERROR reactor-http-nio-2 org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler [84630d14-1] 500 Server Error for HTTP POST "/user/login?username=weirx1"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP POST "/user/login?username=weirx1" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.wjbgn.bsolver.gateway.controller.LoginController.login(LoginController.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
复制代码
为什么呢?我们需要知道的是,gateway是基于netty
作为运行容器的,所以我们发现接口请求的线程就是如上异常输出中的reactor-http-nio-2
,那么它是否是NonBlocking接口
的实现的子类呢?结果是肯定的。
当我们直接在接口调用block
,而不使用异步的方式,我们看看下面的debugger过程:
能看到,类型是EventLoop
,名称是reactor-http-nio-3
,所以它一定是符合判断前面接口的判断条件,即不能直接在接口使用Mono.block()方法,否则就会报出如上异常的原因。
通过以上分析,相信我们知道了,gateway已经通过逻辑强制要求我们,必须使用异步调用。
下面我们接着分析源码,如果是其他的线程,就会走else判断的内容:
if (this.getCount() != 0L) {
try {
this.await();
} catch (InterruptedException var3) {
this.dispose();
throw Exceptions.propagate(var3);
}
}
复制代码
这个getCount
又是什么?经过跟踪发现是CountDownLatch
的方法,因为此类继承了CountDownLatch
,我们知道CountDownLatch的await
方法是阻塞,知道CounmtDownLatch的count值为0才会继续执行,否则就阻塞。
上面的代码中,如果getCount不等于0,则这个线程就会被阻塞在这。像我们前面的分析,如果用户服务
一直不返回,则此线程就会一直阻塞在这,且请求越多,被阻塞的线程就会越多。
它并没有一个处理阻塞或者说自己释放这个锁的能力。
通常我们解决接口请求阻塞的方式是什么?
很简单,就是指定超时时间
。
解决方案
前面我们知道了,造成阻塞的原因,现在我们来介绍解决方案。
使用 block(Duration timeout)
看下其源码:
@Nullable
final T blockingGet(long timeout, TimeUnit unit) {
if (Schedulers.isInNonBlockingThread()) {
throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
} else {
RuntimeException re;
if (this.getCount() != 0L) {
try {
if (!this.await(timeout, unit)) {
this.dispose();
throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
}
} catch (InterruptedException var6) {
this.dispose();
re = Exceptions.propagate(var6);
re.addSuppressed(new Exception("#block has been interrupted"));
throw re;
}
}
Throwable e = this.error;
if (e != null) {
re = Exceptions.propagate(e);
re.addSuppressed(new Exception("#block terminated with an error"));
throw re;
} else {
return this.value;
}
}
}
复制代码
如上所示,同样的需要使用异步线程方式,其中的this.await(timeout, unit)
指定了超时时间,如果到了超时时间,还在阻塞,则会对线程进行中断
,同时手动抛出异常:
throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
复制代码
那么我们可以有如下的改造,超时时间1s:
monoInfo.block(Duration.ofSeconds(1))
复制代码
当请求再次阻塞的时候,到达1s的超时间,会通过中断的方式,解除线程的阻塞状态,而不会一直阻塞:
2022-03-20 14:15:21 INFO reactor-http-nio-4 com.wjbgn.bsolver.gateway.controller.LoginController 登录失败,msg = java.lang.IllegalStateException: Timeout on blocking read for 1000000000 NANOSECONDS
复制代码
使用自定义线程池
当我们使用异步时,通过CompletableFuture
的方式,其默认的线程池是ForkJoinPool.commonPool()
,这是一个根据cpu核心数的默认线程池,不推荐使用,如果IO密集,会导致其他使用次线程池的任务阻塞。推荐使用自定义线程池
。
CompletableFuture<Boolean> voidCompletableFuture = CompletableFuture.supplyAsync(() ->
monoInfo.block(Duration.ofSeconds(1)), GlobalThreadPool.getExecutor());
复制代码
总结
这个总结其实更多的反思自己,在代码升级过程中的不严谨,同时考虑问题不够全面,导致最终存在的一些问题。这个问题如果等到上线后,那将会造成非常严重的问题。
同时也感谢同学们积极的回复,帮我发现这个问题,以后我会更加严谨的发布文章,减少给大家造成的困扰。
相关引用
CountDownLatch可以参考java并发编程(十九)Semaphore、CountdownLatch和CyclicBarrier你都玩过吗?
异步调用可以参考异步调用如何使用是最好的方式?