一、限流场景
限流场景一般基于硬件资源的使用负载,包括CPU,内存,IO。例如某个报表服务需要消耗大量内存,如果并发数增加就会拖慢整个应用,甚至内存溢出导致应用挂掉。
限流适用于会动态增加的资源,已经池化的资源不一定需要限流,例如数据库连接池,它是已经确定的资源,池的大小固定(即使可以动态伸缩池大小),这种场景下并不需要通过限流来实现,只要能做到如果池内链接已经使用完,则无法再获取新的连接则可。
因此,使用限流的前提是:
1.防止资源使用过载产生不良影响。
2.使用的资源会动态增加,例如一个站点的请求。
二、Spring中实现限流
I、限流需求
1.只针对Controller限流
2.根据url请求路径限流
3.可根据正则表达式匹配url来限流 4.可定义多个限流规则,每个规则的最大流量不同
II、相关类结构
1.CurrentLimiteAspect是一个拦截器,在controller执行前后执行后拦截
2.CurrentLimiter是限流器,可以添加限流规则,根据限流规则获取流量通行证,释放流量通行证;如果获取通行证失败则抛出异常。
3.LimiteRule是限流规则,限流规则可设置匹配url的正则表达式和最大流量值,同时获取该规则的流量通信证和释放流量通信证。
4.AcquireResult是获取流量通信证的结果,结果有3种:获取成功,获取失败,不需要获取。
5.Application是Spring的启动类,简单起见,在启动类种添加限流规则。
III、Show me code
1.AcquireResult.java
public class AcquireResult {
/** 获取通行证成功 */
public static final int ACQUIRE_SUCCESS = 0;
/** 获取通行证失败 */
public static final int ACQUIRE_FAILED = 1;
/** 不需要获取通行证 */
public static final int ACQUIRE_NONEED = 2;
/** 获取通行证结果 */
private int result;
/** 可用通行证数量 */
private int availablePermits;
public int getResult() {
return result;
}
public void setResult(int result) {
this.result = result;
}
public int getAvailablePermits() {
return availablePermits;
}
public void setAvailablePermits(int availablePermits) {
this.availablePermits = availablePermits;
}
}
复制代码
2.LimiteRule.java
/**
* @ClassName LimiteRule
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/4 20:18
* @Version 1.0
* javashizhan.com
**/
public class LimiteRule {
/** 信号量 */
private final Semaphore sema;
/** 请求URL匹配规则 */
private final String pattern;
/** 最大并发数 */
private final int maxConcurrent;
public LimiteRule(String pattern, int maxConcurrent) {
this.sema = new Semaphore(maxConcurrent);
this.pattern = pattern;
this.maxConcurrent = maxConcurrent;
}
/**
* 获取通行证
* @param urlPath 请求Url
* @return 0-获取成功,1-没有获取到通行证,2-不需要获取通行证
*/
public synchronized AcquireResult tryAcquire(String urlPath) {
AcquireResult acquireResult = new AcquireResult();
acquireResult.setAvailablePermits(this.sema.availablePermits());
try {
//Url请求匹配规则则获取通行证
if (Pattern.matches(pattern, urlPath)) {
boolean acquire = this.sema.tryAcquire(50, TimeUnit.MILLISECONDS);
if (acquire) {
acquireResult.setResult(AcquireResult.ACQUIRE_SUCCESS);
print(urlPath);
} else {
acquireResult.setResult(AcquireResult.ACQUIRE_FAILED);
}
} else {
acquireResult.setResult(AcquireResult.ACQUIRE_NONEED);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return acquireResult;
}
/**
* 释放通行证
*/
public synchronized void release() {
this.sema.release();
print(null);
}
/**
* 得到最大并发数
* @return
*/
public int getMaxConcurrent() {
return this.maxConcurrent;
}
/**
* 得到匹配表达式
* @return
*/
public String getPattern() {
return this.pattern;
}
/**
* 打印日志
* @param urlPath
*/
private void print(String urlPath) {
StringBuffer buffer = new StringBuffer();
buffer.append("Pattern: ").append(pattern).append(", ");
if (null != urlPath) {
buffer.append("urlPath: ").append(urlPath).append(", ");
}
buffer.append("Available Permits:").append(this.sema.availablePermits());
System.out.println(buffer.toString());
}
}
复制代码
3.CurrentLimiter.java
/**
* @ClassName CurrentLimiter
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/4 20:18
* @Version 1.0
* javashizhan.com
**/
public class CurrentLimiter {
/** 本地线程变量,存储一次请求获取到的通行证,和其他并发请求隔离开,在controller执行完后释放本次请求获得的通行证 */
private static ThreadLocal<Vector<LimiteRule>> localAcquiredLimiteRules = new ThreadLocal<Vector<LimiteRule>>();
/** 所有限流规则 */
private static Vector<LimiteRule> allLimiteRules = new Vector<LimiteRule>();
/** 私有构造器,避免实例化 */
private CurrentLimiter() {}
/**
* 添加限流规则,在spring启动时添加,不需要加锁,如果在运行中动态添加,需要加锁
* @param rule
*/
public static void addRule(LimiteRule rule) {
printRule(rule);
allLimiteRules.add(rule);
}
/**
* 获取流量通信证,所有流量规则都要获取后才能通过,如果一个不能获取则抛出异常
* 多线程并发,需要加锁
* @param urlPath
*/
public synchronized static void tryAcquire(String urlPath) throws Exception {
//有限流规则则处理
if (allLimiteRules.size() > 0) {
//能获取到通行证的流量规则要保存下来,在Controller执行完后要释放
Vector<LimiteRule> acquiredLimitRules = new Vector<LimiteRule>();
for(LimiteRule rule:allLimiteRules) {
//获取通行证
AcquireResult acquireResult = rule.tryAcquire(urlPath);
if (acquireResult.getResult() == AcquireResult.ACQUIRE_SUCCESS) {
acquiredLimitRules.add(rule);
//获取到通行证的流量规则添加到本地线程变量
localAcquiredLimiteRules.set(acquiredLimitRules);
} else if (acquireResult.getResult() == AcquireResult.ACQUIRE_FAILED) {
//如果获取不到通行证则抛出异常
StringBuffer buffer = new StringBuffer();
buffer.append("The request [").append(urlPath).append("] exceeds maximum traffic limit, the limit is ").append(rule.getMaxConcurrent())
.append(", available permit is").append(acquireResult.getAvailablePermits()).append(".");
System.out.println(buffer);
throw new Exception(buffer.toString());
} else {
StringBuffer buffer = new StringBuffer();
buffer.append("This path does not match the limit rule, path is [").append(urlPath)
.append("], pattern is [").append(rule.getPattern()).append("].");
System.out.println(buffer.toString());
}
}
}
}
/**
* 释放获取到的通行证。在controller执行完后掉调用(抛出异常也需要调用)
*/
public synchronized static void release() {
Vector<LimiteRule> acquiredLimitRules = localAcquiredLimiteRules.get();
if (null != acquiredLimitRules && acquiredLimitRules.size() > 0) {
acquiredLimitRules.forEach(rule->{
rule.release();
});
}
//destory本地线程变量,避免内存泄漏
localAcquiredLimiteRules.remove();
}
/**
* 打印限流规则信息
* @param rule
*/
private static void printRule(LimiteRule rule) {
StringBuffer buffer = new StringBuffer();
buffer.append("Add Limit Rule, Max Concurrent: ").append(rule.getMaxConcurrent())
.append(", Pattern: ").append(rule.getPattern());
System.out.println(buffer.toString());
}
}
复制代码
4.CurrentLimiteAspect.java
/**
* @ClassName CurrentLimiteAspect
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/4 20:15
* @Version 1.0
* javashizhan.com
**/
@Aspect
@Component
public class CurrentLimiteAspect {
/**
* 拦截controller,自行修改路径
*/
@Pointcut("execution(* com.javashizhan.controller..*(..))")
public void controller() { }
@Before("controller()")
public void controller(JoinPoint point) throws Exception {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
//获取通行证,urlPath的格式如:/limit
CurrentLimiter.tryAcquire(request.getRequestURI());
}
/**
* controller执行完后调用,即使controller抛出异常这个拦截方法也会被调用
* @param joinPoint
*/
@After("controller()")
public void after(JoinPoint joinPoint) {
//释放获取到的通行证
CurrentLimiter.release();
}
}
复制代码
5.Application.java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).run(args);
//添加限流规则
LimiteRule rule = new LimiteRule("/limit", 4);
CurrentLimiter.addRule(rule);
}
}
复制代码
IV、验证
测试验证碰到的两个坑:
1.人工通过浏览器刷新请求发现controller是串行的
2.通过postman设置了并发测试也还是串行的,即便设置了并发数,如下图:
百度无果,只能自行写代码验证了,代码如下:
/**
* @ClassName TestClient
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/5 0:51
* @Version 1.0
* javashizhan.com
**/
public class CurrentLimiteTest {
public static void main(String[] args) {
final String limitUrlPath = "http://localhost:8080/limit";
final String noLimitUrlPath = "http://localhost:8080/nolimit";
//限流测试
test(limitUrlPath);
//休眠一会,等上一批线程执行完,方便查看日志
sleep(5000);
//不限流测试
test(noLimitUrlPath);
}
private static void test(String urlPath) {
Thread[] requesters = new Thread[10];
for (int i = 0; i < requesters.length; i++) {
requesters[i] = new Thread(new Requester(urlPath));
requesters[i].start();
}
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Requester implements Runnable {
private final String urlPath;
private final RestTemplate restTemplate = new RestTemplate();
public Requester(String urlPath) {
this.urlPath = urlPath;
}
@Override
public void run() {
String response = restTemplate.getForEntity(urlPath, String.class).getBody();
System.out.println("response: " + response);
}
}
复制代码
输出日志如下:
Pattern: /limit, urlPath: /limit, Available Permits:3
Pattern: /limit, urlPath: /limit, Available Permits:2
Pattern: /limit, urlPath: /limit, Available Permits:1
Pattern: /limit, urlPath: /limit, Available Permits:0
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
Pattern: /limit, Available Permits:1
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:3
Pattern: /limit, Available Permits:4
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
复制代码
可以看到日志输出信息为:
1.第1个测试url最大并发为4,一次10个并发请求,有4个获取通行证后,剩余6个获取通行证失败。
2.获取到通行证的4个请求在controller执行完后释放了通行证。
3.第2个测试url没有限制并发,10个请求均执行成功。
至此,限流器验证成功。
end.
相关阅读:
Java并发编程(一)知识地图
Java并发编程(二)原子性
Java并发编程(三)可见性
Java并发编程(四)有序性
Java并发编程(五)创建线程方式概览
Java并发编程入门(六)synchronized用法
Java并发编程入门(七)轻松理解wait和notify以及使用场景
Java并发编程入门(八)线程生命周期
Java并发编程入门(九)死锁和死锁定位
Java并发编程入门(十)锁优化
Java并发编程入门(十二)生产者和消费者模式-代码模板
Java并发编程入门(十三)读写锁和缓存模板
Java并发编程入门(十四)CountDownLatch应用场景
<---此贴不易,左边点赞!
站点: javashizhan.com/
微信公众号: