Hystrix基本使用

添加依赖

<!-- Core Hystrix library; the examples in this file import their command classes from com.netflix.hystrix. -->
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.8</version>
</dependency>
<!-- Metrics event-stream module, kept at the same version as hystrix-core; none of the examples in this file reference it directly. -->
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>1.5.8</version>
</dependency>

HelloWorld

import com.netflix.hystrix.*;
import rx.Observable;

import java.util.concurrent.Future;

/**
 * Created by gmou on 2016/12/15.
 */
public classA_CommandHelloWorldextendsHystrixCommand<String>{

    private final String name;

    public A_CommandHelloWorld(String name) {
        super(
                Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("CommandHelloWorld"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroupThreadPool"))
                        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                                .withCoreSize(20)
                                .withMaxQueueSize(40))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                // 设置隔离方式,默认线程池隔离,可以设置为信号量隔离
//                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                                // 添加熔断策略,服务不可用到一定的阈值直接使用降级方案,以免造成雪崩。
                                .withCircuitBreakerEnabled(true)
                                .withCircuitBreakerErrorThresholdPercentage(50)
                                .withCircuitBreakerSleepWindowInMilliseconds(5)
                                .withExecutionTimeoutInMilliseconds(40))
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        Thread.sleep(50);
        return String.format("Hello %s !", name);
    }

    @Override
    protected String getFallback() {
        return "fallback:" + name;
    }

    public static void main(String[] args) throws Exception {
        A_CommandHelloWorld command1 = new A_CommandHelloWorld("Jack1");
        // 同步
        command1.execute();

        // 异步
        Future<String> future = new A_CommandHelloWorld("Jack2").queue();
        future.get();

        //响应式
        // 1. 立即执行execute
        Observable observable1 = new A_CommandHelloWorld("Jack3").observe();
        // 2. subscribe后才会执行execute
        Observable observable2 = new A_CommandHelloWorld("Jack4").toObservable();

        observable1.subscribe(obj -> {
            System.out.println("next:" + obj);
        }, obj -> {
            System.out.println("error:" + obj);
        }, () -> {
            System.out.println("complete");
        });
    }

}

响应式,可以返回多次结果

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;

/**
 * 响应式
 * 可以返回多个结果,执行多次onNext
 */
public classB_ObservableCommandHelloWorldextendsHystrixObservableCommand<String>{

    private final String name;

    public B_ObservableCommandHelloWorld(String name) {
        super(
                HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                .withCircuitBreakerEnabled(true)
                                .withExecutionTimeoutInMilliseconds(40))
        );
        this.name = name;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(observer -> {
            try {
                if (!observer.isUnsubscribed()) {
                    // 执行onNext返回结果,可执行多次
                    observer.onNext("Hello");
                    observer.onNext(name + "!");
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
//                throw new RuntimeException(e.getMessage());
            }
        });
    }

    // 服务降级
    @Override
    protected Observable<String> resumeWithFallback() {
        // 忽略异常
        Observable.empty();
        // 直接抛出异常
        Observable.error(new Throwable("failure from Command fallback"));
        // 使用常量
        Observable.just(name);
        // 服务降级,使用备选方案(从缓存获取数据,调用数据库...)
        return Observable.create(observer -> {
            try {
                if (!observer.isUnsubscribed()) {
                    observer.onNext("Fallback.Hello");
                    observer.onNext("Fallback." + name + "!");
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        // 监听
        B_ObservableCommandHelloWorld observable1 = new B_ObservableCommandHelloWorld("Jack");
        observable1.toObservable().subscribe(obj -> {
            System.out.println("next:" + obj);
        }, obj -> {
            System.out.println("error:" + obj);
        }, () -> {
            System.out.println("complete");
        });

        // 同步阻塞
        B_ObservableCommandHelloWorld observable2 = new B_ObservableCommandHelloWorld("Jack2");
        // 当observable2只返回一个结果,即内部只调用一次onNext,throw Exception:Sequence contains too many elements
//        System.out.println(observable2.observe().toBlocking().toFuture().get());
        // 当observable2返回多个结果,只取第一个。没有结果则报错:Sequence contains no elements
        System.out.println(observable2.observe().toBlocking().first());

    }
}

设置缓存

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public classC_RequestCacheCommandextendsHystrixCommand<Integer> {

    private final Integer value;

    publicC_RequestCacheCommand(int value){
        super(
                HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CacheCommand"))
        );
        this.value = value;
    }

    @Override
    protected Integer run()throws Exception {
        System.out.println("run:" + value);
        return value;
    }

    // 通过控制key来动态缓存或者设置缓存时间
    @Override
    protected String getCacheKey(){
        return value + "";
    }

    publicstaticvoidmain(String[] args)throws Exception {

        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            C_RequestCacheCommand command1 = new C_RequestCacheCommand(1);
            // 首次执行,execute获取结果
            System.out.println(command1.execute());
            System.out.println(command1.isResponseFromCache());
            C_RequestCacheCommand command2 = new C_RequestCacheCommand(1);
            // 再次执行,从缓存获取(同一个context中)
            System.out.println(command2.execute());
            System.out.println(command2.isResponseFromCache());
        } finally {
            context.shutdown();
        }

        context = HystrixRequestContext.initializeContext();
        try {
            // 新的context后,所有数据从新执行execute获取
            C_RequestCacheCommand command3 = new C_RequestCacheCommand(1);
            System.out.println(command3.execute());
            System.out.println(command3.isResponseFromCache());
        } finally {
            context.shutdown();
        }
    }
}

请求合并

单位时间内的多次请求会在内部合并成一个请求,具体的合并逻辑由Command完成。
import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
 * Request-collapsing example: several single-key requests are collapsed into one
 * {@link BatchCommand} execution, and the batch result is then distributed back
 * to the individual requests.
 *
 * <p>Created by gmou on 2016/12/14.
 */
public class D_CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    /** Argument of this single request. */
    private final Integer key;

    /**
     * @param key the key whose value is requested
     */
    public D_CommandCollapserGetValueForKey(Integer key) {
        this.key = key;
    }

    /**
     * @return the argument this request contributes to the batch
     */
    @Override
    public Integer getRequestArgument() {
        return key;
    }

    /**
     * Creates the batch command that serves all collapsed requests at once.
     *
     * @param collapsedRequests the requests gathered into this batch
     * @return a {@link BatchCommand} over those requests
     */
    @Override
    protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, Integer>> collapsedRequests) {
        System.out.println("createCommand:" + joinArguments(collapsedRequests));
        return new BatchCommand(collapsedRequests);
    }

    /**
     * Hands the batch results back to the individual requests by position: the
     * i-th response goes to the i-th request in iteration order.
     *
     * <p>A {@code null} or empty batch response (e.g. the batch fallback) leaves all
     * requests untouched, as before. When the response list is shorter than the
     * request collection, the surplus requests receive an
     * {@link IllegalStateException} rather than the loop running off the end of the list.
     *
     * @param batchResponse     results produced by the batch command
     * @param collapsedRequests the requests that were collapsed into the batch
     */
    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> collapsedRequests) {
        System.out.println("mapResponseToRequests:" + StringUtils.join(batchResponse, ","));
        if (batchResponse == null || batchResponse.isEmpty()) {
            // NOTE(review): nothing is set on the requests here; presumably Hystrix itself
            // reports requests left without a response - confirm against the Hystrix version in use.
            return;
        }
        int index = 0;
        for (CollapsedRequest<String, Integer> request : collapsedRequests) {
            if (index < batchResponse.size()) {
                request.setResponse(batchResponse.get(index++));
            } else {
                request.setException(new IllegalStateException(
                        "no batch response for argument " + request.getArgument()));
            }
        }
    }

    /** Joins the arguments of the given requests with commas, for trace output. */
    private static String joinArguments(Collection<CollapsedRequest<String, Integer>> requests) {
        return StringUtils.join(
                requests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()), ",");
    }

    /**
     * The command that actually runs for a whole batch of collapsed requests.
     */
    private static final class BatchCommand extends HystrixCommand<List<String>> {

        /** Requests served by this batch. */
        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            System.out.println("BatchCommand:" + joinArguments(requests));
            this.requests = requests;
        }

        /**
         * Builds one {@code "ret-<argument>"} entry per request, in request order.
         *
         * <p>To see the fallback path, throw from here for some argument, e.g.
         * {@code throw new RuntimeException("argument can not be 1")}.
         *
         * @return the responses, positionally aligned with the requests
         */
        @Override
        protected List<String> run() throws Exception {
            List<String> response = new ArrayList<>(requests.size());
            for (CollapsedRequest<String, Integer> request : requests) {
                response.add("ret-" + request.getArgument());
            }
            return response;
        }

        /**
         * @return an empty list, which {@code mapResponseToRequests} treats as "no results"
         */
        @Override
        protected List<String> getFallback() {
            return new ArrayList<>();
        }
    }

    /**
     * Queues four requests inside one request context, prints their results and
     * then inspects the request log to show what was executed.
     *
     * @param args unused
     * @throws Exception declared for parity with the other examples; failures are caught and printed
     */
    public static void main(String[] args) throws Exception {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();

        try {
            Future<String> f1 = new D_CommandCollapserGetValueForKey(1).queue();
            Future<String> f2 = new D_CommandCollapserGetValueForKey(2).queue();
            Future<String> f3 = new D_CommandCollapserGetValueForKey(3).queue();
            Future<String> f4 = new D_CommandCollapserGetValueForKey(4).queue();
            System.out.println("1 : " + f1.get());
            System.out.println("2 : " + f2.get());
            System.out.println("3 : " + f3.get());
            System.out.println("4 : " + f4.get());

            // Larger experiment (disabled): queue 100 requests and print every result.
//            List<Future<String>> rets = new ArrayList<>();
//            for (int i = 0; i < 100; i++) {
//                rets.add(new D_CommandCollapserGetValueForKey(i).queue());
//            }
//            for (Future<String> ret : rets) {
//                System.out.println(ret.get());
//            }

            System.out.println("ExecutedCommands.size:" + HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
            HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
            System.out.println("GetValueForKey:" + command.getCommandKey().name());
            System.out.println(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
            System.out.println(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            context.shutdown();
        }
    }

}

管理HystrixRequestContext

通过配置拦截器统一配置管理HystrixRequestContext生命周期

 通过拦截器统一设置和管理HystrixRequestContext
 public class HystrixRequestContextServletFilter implements Filter {

     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
     throws IOException, ServletException {
         HystrixRequestContext context = HystrixRequestContext.initializeContext();
         try {
            chain.doFilter(request, response);
         } finally {
            context.shutdown();
         }
     }
 }

 <!-- Declares the filter under the name HystrixRequestContextServletFilter, backed by the class given in filter-class. -->
 <filter>
     <display-name>HystrixRequestContextServletFilter</display-name>
     <filter-name>HystrixRequestContextServletFilter</filter-name>
     <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
 </filter>
 <!-- Applies that filter to every URL (/*). -->
 <filter-mapping>
     <filter-name>HystrixRequestContextServletFilter</filter-name>
     <url-pattern>/*</url-pattern>
 </filter-mapping>

强制熔断器开启

import com.netflix.config.ConfigurationManager;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;

/**
 * 强制开启熔断,服务降级,取消优先级低的服务依赖,提高主服务性能
 */
public classE_ForceCircuitBreakerCommandextendsHystrixCommand<Boolean>{

    public E_ForceCircuitBreakerCommand() {
        super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("ForceCommandKey")));
    }

    @Override
    protected Boolean run() throws Exception {
        return true;
    }

    @Override
    protected Boolean getFallback() {
        return false;
    }


    public static void main(String[] args) throws Exception {
        System.out.println(new E_ForceCircuitBreakerCommand().execute());
        System.out.println("before : " + new E_ForceCircuitBreakerCommand().isCircuitBreakerOpen());
        /**
         *  hystrix.command.HystrixCommandKey.circuitBreaker.forceOpen 强制设置熔断器开启。
         *  http://stackoverflow.com/questions/29165654/how-to-force-a-hystrix-circuit-breaker-open
         */
        ConfigurationManager.getConfigInstance().setProperty("hystrix.command.ForceCommandKey.circuitBreaker.forceOpen", true);
        System.out.println("after : " + new E_ForceCircuitBreakerCommand().isCircuitBreakerOpen());
        System.out.println(new E_ForceCircuitBreakerCommand().execute());
    }
}

https://my.oschina.net/gmouou/blog/807439

猜你喜欢

转载自m635674608.iteye.com/blog/2384916