一:单机式令牌桶的缺陷
在上述的记账中我们讨论使用了Bucket来做限流机制。
/**
* 创建一个令牌桶
* @return
*/
private Bucket createNewBucket() {
Refill refill = Refill.of(1, Duration.ofSeconds(1000));// 1:每次补充令牌的数量 ; 2 补充token的时间间隔
Bandwidth limit = Bandwidth.classic(10, refill); //10,通最多能装10个令牌
return Bucket4j.builder().addLimit(limit).build();
}
我们来进一步查看其中的代码实现
public LocalBucket build() {
BucketConfiguration configuration = buildConfiguration();
switch (synchronizationStrategy) {
case LOCK_FREE: return new LockFreeBucket(configuration, timeMeter);
case SYNCHRONIZED: return new SynchronizedBucket(configuration, timeMeter);
case NONE: return new SynchronizedBucket(configuration, timeMeter, FakeLock.INSTANCE);
default: throw new IllegalStateException();
}
}
这是bucket4j最后使用Build()方法实现的本地的localBucket。事实上,只能在本机上实现,一旦分布则数据不同意。
二:分布式限领令牌桶的可行性讨论
我们还是来说一下限流的机制: 当访问开始消耗桶中的令牌达到最大值,则桶为空报错,期间桶中令牌还按照一定的时间规律
补充。
那么,令牌桶的核心就是3个概念:桶中最大的令牌数,桶补充令牌的时间间隔,桶每次补充令牌的个数。
我们来思考:
(1)令牌桶核心 业务就是:消耗令牌数和新增令牌数。如果我们使用定时任务来添加令牌。比如根据ip限流。
其中实线为定时添加令牌,虚线为访问消耗的令牌。但是这样做的消费太高,一个ip就要创建一个定时任务。
(2)简化版令牌桶:如每10s只允许一个ip访问20次。
实现思路:使用redis的失效时间。即:a拿着ip去访问redis中该ip对应的次数,当次数不为0,则可以访问。当该ip的缓存
达到失效时间,该ip去redis中发现为null,则重新初始化redis。
这样操作,我们不需要管,每一次补充多少,即在一定时间间隔内,访问次数是固定的。缺点没有动态补充。
(3)令牌桶的实现:此方法思路是在(2)的基础上实现,只不过添加了动态补充。
如果你理解了这个图的话,我想分布式令牌桶思路就没问题了。注意没问的令牌数不能大于最大数(即图中的100)
关于其中涉及的变量:
最大令牌数:访问最大的限制(即图中的100)bucketnum;
均匀产生令牌数的个数:这里用总量除以时间。即补充的总量(即图中的20个)bucketnew
补充的时间((即图中的10s)buckettime
令牌桶更新时间:即图中的每10令牌桶更新,将桶中的令牌重置为最大数量100。beginTime
上一次访问时间:在一个周期中的访问(即图中的10) 上一次访问的时间 lastTime。即 结果1中的0s和结果3中的3s
本次访问时间: 即在一个周期内的本次访问时间nowTime。即结果1中的3s和结果3中的5s。
具体代码示例:
(1)实体类:为了后续的方便,我把实体类分为2个
@Getter
@Setter
public class BucketPo {
private Integer id;
private Integer code; // 匹配对应的限流方式如ip,url等
private Long bucketnew; // 桶每次补充令牌数
private Long buckettime; // 桶每次补充的间隔
private Long bucketnum; // 通最大的数目
}
@Getter
@Setter
public class DyBucketPo extends BucketPo{
//每一个周期的开始时间
private Date beginTime;
//每一个周期结束时间
private Date endTime;
//上一次访问时间
private Date lastTime;
//本次访问时间
private Date nowTime;
//剩余次数
private long syTimes;
}
(2)实现令牌桶的代码
/**
*
* createNewBucket:创建一个新桶. <br/>
* TODO(这里描述这个方法适用条件 – 可选).<br/>
* @param po
* @param date 开始时间
* @return
* @author monxz
* @time:2019年6月16日下午3:47:37
* @since JDK 1.8
*/
private DyBucketPo createNewBucket(BucketPo po,Date date) {
//创建一个新的redis缓存
DyBucketPo dyPo=new DyBucketPo();
dyPo.setBucketnew(po.getBucketnew());
dyPo.setBucketnum(po.getBucketnum());
dyPo.setBuckettime(po.getBuckettime());
dyPo.setBeginTime(date);
Date newDate=new Date(date.getTime());
dyPo.setEndTime(TimeUtil.dateAddTime(newDate, po.getBuckettime()));
dyPo.setLastTime(date);
dyPo.setNowTime(date);
dyPo.setSyTimes(po.getBucketnum());
return dyPo;
}
/**
*
* createNextBucket:在某个时间后面创建下一个桶. <br/>
* TODO(这里描述这个方法适用条件 – 可选).<br/>
* @param po
* @param date
* @return
* @author monxz
* @time:2019年6月16日下午3:08:09
* @since JDK 1.8
*/
private DyBucketPo createNextBucket(DyBucketPo po,Date date){
if(TimeUtil.timeSencond(date, po.getEndTime()) >=0){
//新的结束开始/时间
po.setBeginTime(date);
Date newDate=new Date(date.getTime());
po.setEndTime(TimeUtil.dateAddTime(newDate, po.getBuckettime()));
po.setSyTimes(po.getBucketnum());
}else{
po.setLastTime(date);
po.setSyTimes(po.getSyTimes());
}
return po;
}
/**
*
* myFilter:更新本次桶. <br/>
* TODO.<br/>
* @param po
* @param date,这里在方法的一开始就用一个类似于全局变量的东西,使得后面的时间统一
* @return
* @author monxz
* @time:2019年6月16日下午3:10:40
* @since JDK 1.8
*/
private DyBucketPo useBucket(DyBucketPo po,Date date){
//1.计算出上一次剩余的次数
long syLastTimes=po.getSyTimes();
//2.计算出本次新产生的次数
long nowTimes=0;
if(TimeUtil.timeSencond(date, po.getEndTime()) >=0){
nowTimes=po.getBucketnum();
}else{
long timeJg=TimeUtil.timeSencond(date, po.getLastTime());
nowTimes= (long)(Float.parseFloat(po.getBucketnew()+"")
/ Float.parseFloat(po.getBuckettime()+"")
* Float.parseFloat(timeJg+""));
}
//3.计算出本次剩余的次数
long syTimes=0;
if((syLastTimes + nowTimes) >= po.getBucketnum() ){
syTimes=po.getBucketnum()-1;
}else{
syTimes=syLastTimes + nowTimes -1;
}
po.setSyTimes(syTimes);
return po;
}
public DyBucketPo getBucket4Redis(String key,BucketPo po,Date currentDate){
DyBucketPo dyPo=new DyBucketPo();
//获取到一个桶
if(redisTemp.opsForValue().get(key) !=null ){
dyPo=(DyBucketPo) redisTemp.opsForValue().get(key);
}else{
dyPo=new BucketMethod().createNewBucket(po,currentDate);
}
//消耗令牌
dyPo=new BucketMethod().useBucket(dyPo, currentDate);
//创建下一个桶
dyPo=new BucketMethod().createNextBucket(dyPo, currentDate);
if(dyPo.getSyTimes() <= 0){
dyPo.setSyTimes(0L);
}else{
dyPo.setLastTime(currentDate);
}
//存放到redis中
redisTemp.opsForValue().set(key, dyPo);
return dyPo;
}
(3)动态从数据库中获取到限流方式
package com.springcloud.gateway.filters;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import com.springcloud.gateway.common.enums.AllowedMethod;
import com.springcloud.gateway.common.enums.LimitType;
import com.springcloud.gateway.common.single.LimitConfigSingle;
import com.springcloud.gateway.enetity.po.BucketPo;
import com.springcloud.gateway.enetity.po.DyBucketPo;
import com.springcloud.gateway.enetity.po.Limitconfig;
import com.springcloud.gateway.service.LimitService;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.Refill;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
/**
* 全局过滤器: 单机过滤器
*
*
* @author monxz
*
* @date 2019年6月6日
*/
@Component
@Slf4j
public class RequestGlobalFilter implements GlobalFilter,Ordered {
@Autowired
private BucketMethod method;
@Autowired
private LimitService limitService;
private static LimitConfigSingle configSingle=LimitConfigSingle.getInstance();
//执行顺序
@Override
public int getOrder() {
return 1;
}
/**
* 创建一个令牌桶
* @return
*/
private Bucket createNewBucket() {
Refill refill = Refill.of(1, Duration.ofSeconds(1000));// 1:每次补充令牌的数量 ; 2 补充token的时间间隔
Bandwidth limit = Bandwidth.classic(10, refill); //10,通最多能装10个令牌
return Bucket4j.builder().addLimit(limit).build();
}
/**
* 限流
*/
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String uri = request.getURI().toString();
String ip=request.getRemoteAddress().getAddress().getHostAddress();
if (getConfig(request)) {
return chain.filter(exchange);
} else {
//当可用的令牌书为0是,进行限流返回429状态码
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}
}
/**
* 配置加载
* get:(这里用一句话描述这个方法的作用). <br/>
* TODO(这里描述这个方法适用条件 – 可选).<br/>
* @param request
* @return
* @author monxz
* @time:2019年6月16日下午6:56:49
* @since JDK 1.8
*/
public Boolean getConfig(ServerHttpRequest request){
String uri = request.getURI().toString();
//过滤掉不可用的限流方式
List<Limitconfig> list = configSingle.getList().stream().filter(o -> o.getDisplay() == 1).collect(Collectors.toList());
//按照顺序排序
list.stream().sorted(Comparator.comparing(Limitconfig::getSort)).collect(Collectors.toList());
//提出掉无效数据
List<BucketPo> bucketList=limitService.bucketList(null);
List<Limitconfig> res=new ArrayList<>();
for(Limitconfig config:list){
for(BucketPo po:bucketList){
if(po.getCode() == config.getCode()){
res.add(config);
}
}
}
list=res;
for(Limitconfig config:list){
if(config.getCode() == LimitType.IP.getCode() ){
log.info("开启ip限流");
String ip=request.getRemoteAddress().getAddress().getHostAddress();
ip=ip.equals("0:0:0:0:0:0:0:1")?"localhost":ip;
if(!allowedMethod(ip, config.getLimitmethod(), config.getLimittext())){
log.info("该ip已经被封锁");
return false;
}
log.info("该ip进入限流");
DyBucketPo po=method.getBucket4Redis(LimitType.IP.getMsg()+ip,
bucketList.stream().filter(o -> o.getCode() == LimitType.IP.getCode()).collect(Collectors.toList()).get(0),
new Date());
log.info("ip限流剩余:"+po.getSyTimes());
if(po == null || po.getSyTimes() <= 0){
log.info("该ip已无访问次数");
return false;
}
}else if(config.getCode() == LimitType.URL.getCode()){
log.info("开启url限流");
if(!allowedMethod(uri, config.getLimitmethod(), config.getLimittext())){
log.info("该url已经被封锁");
return false;
}
log.info("该url进入限流");
DyBucketPo po=method.getBucket4Redis(LimitType.URL.getMsg()+uri,
bucketList.stream().filter(o -> o.getCode() == LimitType.URL.getCode()).collect(Collectors.toList()).get(0)
, new Date());
log.info("url限流剩余:"+po.getSyTimes());
if(po == null || po.getSyTimes() <= 0){
log.info("该url已无访问次数");
return false;
}
}else if(config.getCode() == LimitType.PARAM.getCode()){
log.info("开启PARAM限流");
if(!allowedMethod(uri, config.getLimitmethod(), config.getLimittext())){
log.info("该PARAM已经被封锁");
return false;
}
log.info("该PARAM进入限流");
DyBucketPo po=method.getBucket4Redis(LimitType.PARAM.getMsg()+uri,
bucketList.stream().filter(o -> o.getCode() == LimitType.PARAM.getCode()).collect(Collectors.toList()).get(0)
, new Date());
log.info("param限流剩余:"+po.getSyTimes());
if(po == null || po.getSyTimes() <= 0){
log.info("该param已无访问次数");
return false;
}
}
}
return true;
}
/**
*
* allowedMethod:拦截方式. <br/>
* TODO(这里描述这个方法适用条件 – 可选).<br/>
* @param key
* @return
* @author monxz
* @time:2019年6月16日下午6:57:44
* @since JDK 1.8
*/
public boolean allowedMethod(String key,int method,String text){
for(AllowedMethod e:AllowedMethod.values()){
if(method == e.getCode() && method == 1){
log.info("所有请求都被拦截");
return false;
}else if(method == e.getCode() && method == 2 ){
log.info("====白名单拦截");
if(!text.contains(key)){
log.info("该请求不再白名单内");
return false;
}
}else if(method == e.getCode() && method == 4 ){
log.info("=====黑名单拦截");
if(text.contains(key)){
log.info("该请求在黑名单内");
return false;
}
}else if(method == e.getCode() && method == 3){
log.info("所有请求都通过");
return true;
}
}
return true;
}
}
(4)实体表结构
三:总结
其实,redis实现令牌桶的机制,只要弄清楚令牌桶机制就ok了,整个实现的步骤就是:(1)从数据库中获取限流方式。(2)验证限流方式。(3)令牌桶实现限流。