接口熔断小构件

熔断小构件

我们的工程如果通过http接口或rpc获取数据,实际就是依赖对方接口的稳定性。如果被依赖的接口挂掉了,我们的系统要做到基本不受影响,并且能在依赖接口恢复后自动恢复接口的正常调用。

实现了一个小构件,功能原理如下:

1、支持注解的方式或者原生调用

2、设置接口的最大调用时间maxTime,接口失败的最大次数maxCnt,以及失败后开始重新尝试恢复的次数tryReconvertCnt

3、每次调用记录时间,如果超过maxTime的次数超过maxCnt,则熔断。

4、熔断后,每次都不会实际调用接口,只会记录一个Intege.MAX_VALUE

5、如果Intege.MAX_VALUE的次数超过尝试恢复的次数tryReconvertCnt,则实际调用一次接口,如果接口的调用时间在maxTime以内,则恢复正常服务。

源代码及使用方法如下:

@Service
public class StopLogicService {
    private ConcurrentHashMap < String,StopLogicEntity> map= new ConcurrentHashMap<>();//熔断器key+熔断器对象
 /*
    *初始化一个熔断器
    * @param uniqKey 熔断器的唯一标识,不可重复
    * @param maxTime 接口理论上正常花费时间的最大值
    * @param maxCnt 允许失败的最大值,超过此值后熔断
    * @param tryRecoverCnt 熔断后,超过此值开始尝试恢复
     */
    public void initOne(String uniqKey, int maxTime, int maxCnt, int tryRecoverCnt){
        map.put(uniqKey,new StopLogicEntity(maxTime, maxCnt, tryRecoverCnt));
    }
    //删除一个熔断器
    public void deleteOne(String primaryKey){
        map.remove(primaryKey);
    }
    public StopLogicEntity getStopLogicEntity(String primaryKey){
        return map.get(primaryKey);
    }
    //是否已包含该熔断器
    public boolean containsEntry(String uniqKey){
        return map.containsKey(uniqKey);
    }

    /*
    *熔断器定义
     */
    public static class StopLogicEntity{
        private int maxTime;    //接口允许的最大时间 毫秒
        private int maxCnt;     //熔断触发的最大次数
        private int tryRecoverCnt;  //熔断后,考虑重新尝试的次数 tryRecoverCnt>maxCnt
        private int dataSize;
        private ConcurrentLinkedDeque< Integer> dataList;
        public StopLogicEntity(int maxTime,int maxCnt,int tryRecoverCnt){
            if(tryRecoverCnt<=maxCnt){
                throw new RuntimeException("tryRecoverCnt must greater than maxCnt!");
            }
            this.maxTime=maxTime;
            this.maxCnt=maxCnt;
            this.tryRecoverCnt=tryRecoverCnt;
            this.dataList=new ConcurrentLinkedDeque();
            this.dataSize=Math.max(maxCnt,tryRecoverCnt);
        }
        /*
        *记录一次调用所花费的时间
         */
        public void recordOne(int timeInt){
            if(dataList.size()>=dataSize){
                dataList.removeFirst();
            }
            dataList.addLast(timeInt);
        }

        /*
        *判断服务是否可用
        * @return true 服务可以调用。 false,熔断中,跳过服务调用。
         */
        public boolean isAvailable(){  
            int overtimeCnt=0;  //超时次数
            int cnt=0;
            Iterator<Integer> iterator= dataList.descendingIterator();
            while (iterator.hasNext()){
                cnt++;
                if(iterator.next()>=maxTime){
                    overtimeCnt++;
                }else{
                    return true;//有一个恢复成功的记录,则取消熔断
                }
                if(overtimeCnt>=maxCnt){
                    recordOne(Integer.MAX_VALUE);   //Integer.MAX_VALUE 代表熔断中的一次未发送的请求
                    if(isNeedRecover()){
                        return true;//需要从熔断中恢复
                    }
                    return false;
                }
                if(cnt>maxCnt){
                    break;
                }
            }
            return true;
        }
        //是否需要在熔断中尝试恢复一下?
        private boolean isNeedRecover(){
            int cnt=0;
            Iterator<Integer> iterator= dataList.descendingIterator();
            while (iterator.hasNext()){
                if(Integer.MAX_VALUE==iterator.next()){
                    cnt++;
                    if(cnt>=tryRecoverCnt){
                        return true;
                    }
                }
            }
            return false;
        }


    }

    //测试代码
    public static void main(String[] args) throws InterruptedException {
        final long start=System.currentTimeMillis();


        final String primaryKey="xx_key";
        final StopLogicService stopLogicService=new StopLogicService();
        stopLogicService.initOne(primaryKey,500,100,200);
        ExecutorService threadPool= Executors.newFixedThreadPool(30);
        for(int i=0;i<30;i++) {
            threadPool.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    for (int j = 0; j < 20; j++) {
                        TimeUnit.SECONDS.sleep(1);
                        int baseInt = 300;
                        if (System.currentTimeMillis() - start > 10 * 1000) {//开始熔断
                            baseInt = 500;
                        }
                        if (stopLogicService.getStopLogicEntity(primaryKey).isAvailable()) {
                            stopLogicService.getStopLogicEntity(primaryKey).recordOne(baseInt + RandomUtils.nextInt(100));   //发送请求
                            System.out.println(Thread.currentThread().getName() + "发送请求");
                        } else {
                            System.out.println(Thread.currentThread().getName() + "熔断");
                        }
                    }
                    return null;
                }
            });
        }
        TimeUnit.MINUTES.sleep(1);
        threadPool.shutdownNow();
    }
}

原生使用方式:

if(stopLogicService.getStopLogicEntity(STOP_LOGIC_KEY).isAvailable()){//判断是否需要熔断
    long start=System.currentTimeMillis();
    String url=String.format(urlPattern,cityCode);
    result = HttpClientUtils.getContentByHttpGet(url, "UTF-8", 1);
    stopLogicService.getStopLogicEntity(STOP_LOGIC_KEY).recordOne(Integer.parseInt(String.valueOf(System.currentTimeMillis()-start)));//向熔断器记录本次花费时间
}else {
    logger.error(STOP_LOGIC_KEY+"熔断!");
}

自定义注解:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface StopLogicAnn {
    int maxTime();  //正常情况下接口调用时间的最大值,超过后会被记录一笔超时
    int maxCnt();//超时的最大可容忍次数
    int tryRecoverCnt();//熔断使得接口被跳过,达到一定次数后开始尝试一次调用
    String uniqKey();
}

定义一个切面:

@Aspect
@Component
public class StopLogicAspect {
    private static Logger logger= LoggerFactory.getLogger(StopLogicAspect.class);

    @Resource
    private StopLogicService stopLogicService;

    //切入点
    @Pointcut(value = "@annotation(com.corp.StopLogicAnn)")
    private void pointcut() {

    }

    @Around(value = "pointcut() && @annotation(stopLogicAnn)")
    public Object around(ProceedingJoinPoint proceedingJoinPoint,StopLogicAnn stopLogicAnn){
        Object result=null;
        try {
            String uniqKey=stopLogicAnn.uniqKey();
            int maxCnt=stopLogicAnn.maxCnt();
            int maxTime=stopLogicAnn.maxTime();
            int tryReconverCnt=stopLogicAnn.tryRecoverCnt();
            if(!stopLogicService.containsEntry(uniqKey)){//如果没有注册熔断器先注册一个
                stopLogicService.initOne(uniqKey,maxTime,maxCnt,tryReconverCnt);
            }
            if(stopLogicService.getStopLogicEntity(uniqKey).isAvailable()){//判断是否需要熔断
                long start=System.currentTimeMillis();
                result= proceedingJoinPoint.proceed();
                stopLogicService.getStopLogicEntity(uniqKey).recordOne(Integer.parseInt(String.valueOf(System.currentTimeMillis()-start)));//向熔断器记录本次花费时间
            }else {
                logger.error(uniqKey+"熔断!");
            }
            return result;
        } catch (Throwable throwable) {
            logger.error("around:",throwable);
        }
        return null;
    }
}

注解的使用方式:

@StopLogicAnn(maxTime = 500,maxCnt = 50,tryRecoverCnt = 200,uniqKey = "city_suggest_stoplogic")
@RequestMapping(value = "/suggestV3", method = {RequestMethod.GET, RequestMethod.HEAD})
@ResponseBody
public Map< String, Object> suggestV3();


//spring的相关配置:

< aop:aspectj-autoproxy proxy-target-class="true"/>
< context:component-scan base-package="************************"/>

猜你喜欢

转载自blog.csdn.net/fantasyagain/article/details/80695031