熔断小构件
我们的工程如果通过http接口或rpc获取数据,实际就是依赖对方接口的稳定性。如果被依赖的接口挂掉了,我们的系统要做到基本不受影响,并且能在依赖接口恢复后自动恢复接口的正常调用。
实现了一个小构件,功能原理如下:
1、支持注解的方式或者原生调用
2、设置接口的最大调用时间maxTime,接口失败的最大次数maxCnt,以及失败后开始重新尝试恢复的次数tryReconvertCnt
3、每次调用记录时间,如果超过maxTime的次数超过maxCnt,则熔断。
4、熔断后,每次都不会实际调用接口,只会记录一个Intege.MAX_VALUE
5、如果Intege.MAX_VALUE的次数超过尝试恢复的次数tryReconvertCnt,则实际调用一次接口,如果接口的调用时间在maxTime以内,则恢复正常服务。
源代码及使用方法如下:
@Service
public class StopLogicService {
private ConcurrentHashMap < String,StopLogicEntity> map= new ConcurrentHashMap<>();//熔断器key+熔断器对象
/*
*初始化一个熔断器
* @param uniqKey 熔断器的唯一标识,不可重复
* @param maxTime 接口理论上正常花费时间的最大值
* @param maxCnt 允许失败的最大值,超过此值后熔断
* @param tryRecoverCnt 熔断后,超过此值开始尝试恢复
*/
public void initOne(String uniqKey, int maxTime, int maxCnt, int tryRecoverCnt){
map.put(uniqKey,new StopLogicEntity(maxTime, maxCnt, tryRecoverCnt));
}
//删除一个熔断器
public void deleteOne(String primaryKey){
map.remove(primaryKey);
}
public StopLogicEntity getStopLogicEntity(String primaryKey){
return map.get(primaryKey);
}
//是否已包含该熔断器
public boolean containsEntry(String uniqKey){
return map.containsKey(uniqKey);
}
/*
*熔断器定义
*/
public static class StopLogicEntity{
private int maxTime; //接口允许的最大时间 毫秒
private int maxCnt; //熔断触发的最大次数
private int tryRecoverCnt; //熔断后,考虑重新尝试的次数 tryRecoverCnt>maxCnt
private int dataSize;
private ConcurrentLinkedDeque< Integer> dataList;
public StopLogicEntity(int maxTime,int maxCnt,int tryRecoverCnt){
if(tryRecoverCnt<=maxCnt){
throw new RuntimeException("tryRecoverCnt must greater than maxCnt!");
}
this.maxTime=maxTime;
this.maxCnt=maxCnt;
this.tryRecoverCnt=tryRecoverCnt;
this.dataList=new ConcurrentLinkedDeque();
this.dataSize=Math.max(maxCnt,tryRecoverCnt);
}
/*
*记录一次调用所花费的时间
*/
public void recordOne(int timeInt){
if(dataList.size()>=dataSize){
dataList.removeFirst();
}
dataList.addLast(timeInt);
}
/*
*判断服务是否可用
* @return true 服务可以调用。 false,熔断中,跳过服务调用。
*/
public boolean isAvailable(){
int overtimeCnt=0; //超时次数
int cnt=0;
Iterator<Integer> iterator= dataList.descendingIterator();
while (iterator.hasNext()){
cnt++;
if(iterator.next()>=maxTime){
overtimeCnt++;
}else{
return true;//有一个恢复成功的记录,则取消熔断
}
if(overtimeCnt>=maxCnt){
recordOne(Integer.MAX_VALUE); //Integer.MAX_VALUE 代表熔断中的一次未发送的请求
if(isNeedRecover()){
return true;//需要从熔断中恢复
}
return false;
}
if(cnt>maxCnt){
break;
}
}
return true;
}
//是否需要在熔断中尝试恢复一下?
private boolean isNeedRecover(){
int cnt=0;
Iterator<Integer> iterator= dataList.descendingIterator();
while (iterator.hasNext()){
if(Integer.MAX_VALUE==iterator.next()){
cnt++;
if(cnt>=tryRecoverCnt){
return true;
}
}
}
return false;
}
}
//测试代码
public static void main(String[] args) throws InterruptedException {
final long start=System.currentTimeMillis();
final String primaryKey="xx_key";
final StopLogicService stopLogicService=new StopLogicService();
stopLogicService.initOne(primaryKey,500,100,200);
ExecutorService threadPool= Executors.newFixedThreadPool(30);
for(int i=0;i<30;i++) {
threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
for (int j = 0; j < 20; j++) {
TimeUnit.SECONDS.sleep(1);
int baseInt = 300;
if (System.currentTimeMillis() - start > 10 * 1000) {//开始熔断
baseInt = 500;
}
if (stopLogicService.getStopLogicEntity(primaryKey).isAvailable()) {
stopLogicService.getStopLogicEntity(primaryKey).recordOne(baseInt + RandomUtils.nextInt(100)); //发送请求
System.out.println(Thread.currentThread().getName() + "发送请求");
} else {
System.out.println(Thread.currentThread().getName() + "熔断");
}
}
return null;
}
});
}
TimeUnit.MINUTES.sleep(1);
threadPool.shutdownNow();
}
}
原生使用方式:
if(stopLogicService.getStopLogicEntity(STOP_LOGIC_KEY).isAvailable()){//判断是否需要熔断
long start=System.currentTimeMillis();
String url=String.format(urlPattern,cityCode);
result = HttpClientUtils.getContentByHttpGet(url, "UTF-8", 1);
stopLogicService.getStopLogicEntity(STOP_LOGIC_KEY).recordOne(Integer.parseInt(String.valueOf(System.currentTimeMillis()-start)));//向熔断器记录本次花费时间
}else {
logger.error(STOP_LOGIC_KEY+"熔断!");
}
自定义注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface StopLogicAnn {
int maxTime(); //正常情况下接口调用时间的最大值,超过后会被记录一笔超时
int maxCnt();//超时的最大可容忍次数
int tryRecoverCnt();//熔断使得接口被跳过,达到一定次数后开始尝试一次调用
String uniqKey();
}
定义一个切面:
@Aspect
@Component
public class StopLogicAspect {
private static Logger logger= LoggerFactory.getLogger(StopLogicAspect.class);
@Resource
private StopLogicService stopLogicService;
//切入点
@Pointcut(value = "@annotation(com.corp.StopLogicAnn)")
private void pointcut() {
}
@Around(value = "pointcut() && @annotation(stopLogicAnn)")
public Object around(ProceedingJoinPoint proceedingJoinPoint,StopLogicAnn stopLogicAnn){
Object result=null;
try {
String uniqKey=stopLogicAnn.uniqKey();
int maxCnt=stopLogicAnn.maxCnt();
int maxTime=stopLogicAnn.maxTime();
int tryReconverCnt=stopLogicAnn.tryRecoverCnt();
if(!stopLogicService.containsEntry(uniqKey)){//如果没有注册熔断器先注册一个
stopLogicService.initOne(uniqKey,maxTime,maxCnt,tryReconverCnt);
}
if(stopLogicService.getStopLogicEntity(uniqKey).isAvailable()){//判断是否需要熔断
long start=System.currentTimeMillis();
result= proceedingJoinPoint.proceed();
stopLogicService.getStopLogicEntity(uniqKey).recordOne(Integer.parseInt(String.valueOf(System.currentTimeMillis()-start)));//向熔断器记录本次花费时间
}else {
logger.error(uniqKey+"熔断!");
}
return result;
} catch (Throwable throwable) {
logger.error("around:",throwable);
}
return null;
}
}
注解的使用方式:
@StopLogicAnn(maxTime = 500,maxCnt = 50,tryRecoverCnt = 200,uniqKey = "city_suggest_stoplogic")
@RequestMapping(value = "/suggestV3", method = {RequestMethod.GET, RequestMethod.HEAD})
@ResponseBody
public Map< String, Object> suggestV3();
//spring的相关配置:
< aop:aspectj-autoproxy proxy-target-class="true"/>
< context:component-scan base-package="************************"/>