文章目录
“Hello World”
下面的列子展示了 HystrixCommand
的一个基本实现:
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
// 在真实世界,run() 方法可能会产生一些网络请求等
return "Hello " + name + "!";
}
}
HystrixObservableCommand
的等价版本
这个等价版本和 HystrixCommand
的不同包括重载构造器等,代码如下:
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
import rx.Subscriber;
public class CommandHelloWorld extends HystrixObservableCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed()) {
// 在真实世界,run() 方法可能会产生一些网络请求等
observer.onNext("Hello");
observer.onNext(name + "!");
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
}
}
译注:HystrixObservableCommand 依赖 RxJava。
同步执行
通过调用 execute()
方法,可以以同步的方式执行一个 HystrixCommand
,示例如下:
String s = new CommandHelloWorld("World").execute();
单元测试代码如下:
@Test
public void testSynchronous() {
assertEquals("Hello World!", new CommandHelloWorld("World").execute());
assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute());
}
HystrixObservableCommand
的等价版本
对 HystrixObservableCommand
来说,没有一种简单方法来实现 execute
,但如果你能确定 HystrixObservableCommand
内部的 Observable
永远只返回单个值(译注:即 onNext()
方法至多只会被调用一次),那么你可以通过对 HystrixObservableCommand
内部的 Observable
调用其 .toBlocking().toFuture().get()
来模拟 execute()
的行为。
异步执行
通过调用 queue()
方法,可以以异步的方式执行 HystrixCommand
,示例如下:
Future<String> fs = new CommandHelloWorld("World").queue();
通过对返回的 Future
调用 get
方法来获取结果:
String s = fs.get();
如下单元测试例子展示了这种异步行为:
@Test
public void testAsynchronous1() throws Exception {
assertEquals("Hello World!", new CommandHelloWorld("World").queue().get());
assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get());
}
@Test
public void testAsynchronous2() throws Exception {
Future<String> fWorld = new CommandHelloWorld("World").queue();
Future<String> fBob = new CommandHelloWorld("Bob").queue();
assertEquals("Hello World!", fWorld.get());
assertEquals("Hello Bob!", fBob.get());
}
如下两种写法是等价的:
String s1 = new CommandHelloWorld("World").execute();
String s2 = new CommandHelloWorld("World").queue().get();
HystrixObservableCommand
的等价版本
对 HystrixObservableCommand
来说,没有一种简单方法来实现 execute
,但如果你能确定 HystrixObservableCommand
内部的 Observable
永远只返回单个值,那么你可以通过对 HystrixObservableCommand
内部的 Observable
调用其 RxJava
提供的 .toBlocking().toFuture()
方法来模拟 queue()
的行为。
Reactive模式执行
你也可以将 HystrixCommand
当作一个 Observable
来使用观察者模式获得结果,调用方式如下:
observe()
—— 返回一个 Hot Observable,这个命令将在调用observe()
方法时被立即执行。你不用担心命令在返回Observable
时被执行而无法观察/订阅到结果,因为这个 Observable 内部在每次有新的 Subscriber 订阅时会重放 Observable 的行为。toObservable()
—— 返回一个 Cold Observable,调用完toObservable()
方法之后命令不会立即被执行,直到有 Subscriber 订阅了这个 Observable。
Observable<String> ho = new CommandHelloWorld("World").observe();
// or Observable<String> co = new CommandHelloWorld("World").toObservable();
为了获取到命令执行的结果,需要对相应发起请求的 Observable 进行订阅:
ho.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// value emitted here
}
});
以下的单元测试展示了这种行为:
@Test
public void testObservable() throws Exception {
Observable<String> fWorld = new CommandHelloWorld("World").observe();
Observable<String> fBob = new CommandHelloWorld("Bob").observe();
// 阻塞模式
assertEquals("Hello World!", fWorld.toBlockingObservable().single());
assertEquals("Hello Bob!", fBob.toBlockingObservable().single());
// 非阻塞模式
// - 匿名内部类形式,本测试不做任何断言
fWorld.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
// nothing needed here
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println("onNext: " + v);
}
});
// 非阻塞模式
// - 同样是匿名内部类形式,忽略“异常”和“完成”回调
fBob.subscribe(new Action1<String>() {
@Override
public void call(String v) {
System.out.println("onNext: " + v);
}
});
}
如果你使用 Java 8 的 Lambda表达式/闭包,代码会更加紧凑,如下所示:
fWorld.subscribe((v) -> {
System.out.println("onNext: " + v);
})
// - 或者带上异常处理
fWorld.subscribe((v) -> {
System.out.println("onNext: " + v);
}, (exception) -> {
exception.printStackTrace();
})
关于 Observable 的信息,请查阅:http://reactivex.io/documentation/observable.html
响应式(Reactive)命令
通过上述命令,可以将一个 HystrixCommand
转换成 Observable
,但使用 HystrixObservableCommand
这个 HystrixCommand
的特殊版本是一个更好的选择。HystrixObservableCommand
可以封装一个能发射(译注:意指 Subscriber 的 onNext() 能被调用多次)多次的 Observable
,而将普通的 HystrixCommand
转成的 Observable
则最多只能发射一次。
如果使用 HystrixObservableCommand
,你应该将你的命令逻辑放置在重载的 construct
方法中而不是 run
方法中(就像使用普通 HystrixCommand
一样),这样 Hystrix 才能将你的命令逻辑包装到 Observable 中。
通过如下两种方式,可以从 HystrixObservableCommand
得到 Observable
:
observe()
—— 返回一个“热” Observable,这个方法内部会立即订阅底层 Observable,但你不用担心在这个方法返回 Observable 再去订阅会不会丢数据,因为这个方法使用了ReplaySubject
去订阅 Observable。即这个方法会让命令逻辑立即执行。toObservable()
—— 返回一个“冷” Observable,这个方法内部不会订阅底层的 Observable,即除非你去订阅返回的 Observable,否则命令逻辑不会执行。
失败回退(Fallback)
通过提供一个失败回退的方法(fallback 方法),Hystrix 在主命令逻辑发生异常时能从这个方法中得到一个默认值或者一些数据作为命令的返回值,从而实现优雅的服务降级。当然,你可能会给所有可能失败的命令增加失败回退方法,但也有如下一些例外:
-
写操作
如果一个 Hystrix 命令用于写操作而不是返回一个值(例如一个返回void
的HystrixCommand
或者一个返回空 Observable 的 HystrixObservableCommand),失败回退逻辑并没有多大的意义。一般如果写失败,应该向上传播至命令的调用者。 -
批处理/离线计算
如果你编写的 Hystrix 命令用于填充缓存,生成报表,或者做一些离线计算,通常将错误向上传播到调用者以便其可以稍后重试会更加合适,向上传递一个降级后的结果对调用者来说没有意义。
无论你的命令是否有失败回退方法,Hystrix 内部维持的状态和熔断器状态/指标都会得到更新。
在普通的 HystrixCommand
中,你可以通过实现 getFallback()
方法来提供失败回退。Hystrix 将会在命令执行过程中发生任何失败时执行此失败回退逻辑,例如 run()
方法失败,超时,线程池/信号量拒绝或者熔断器短路时。下面的例子展示了如何使用失败回退:
public class CommandHelloFailure extends HystrixCommand<String> {
private final String name;
public CommandHelloFailure(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
throw new RuntimeException("this command always fails");
}
@Override
protected String getFallback() {
return "Hello Failure " + name + "!";
}
}
上述例子中 run()
方法在每次命令执行时都会失败,但调用者并不会收到异常,而是会得到 getFallback() 返回的值:
@Test
public void testSynchronous() {
assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute());
assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute());
}
HystrixObservableCommand
的等价版本
在 HystrixObservableCommand
中,你需要重载 resumeWithFallback
方法以返回一个执行失败回退逻辑的 Observable,这样当失败时,Hystrix 会通知所有 Subscriber 这个 Observable 的执行结果。因为 Observable 可能会发射多次,并且可能在发射多次之后发生失败,因此,你的失败回退逻辑并不能保证所有的 Subscriber 只接收到失败回退 Observable 返回的数据。
在发生失败时,Hystrix 在内部使用 RxJava 的 onErrorResumeNext
操作符在主 Observable 和失败回退 Observable 之间进行无缝转化。
错误传播
run()
方法中抛出的所有异常,除开 HystrixBadRequestException
之外,均会被算作命令执行失败而触发 getFallback()
方法的调用和熔断器逻辑。
你可以通过将异常放进 HystrixBadRequestException
并通过 getCause()
获取真实的异常,而不会触发失败回退。HystrixBadRequestException
专门用来处理这种不应该算作命令执行失败,并且不应该触发熔断器的场景,例如向调用者报告参数不合法或者非系统异常等。
HystrixObservableCommand
的等价版本
若使用 HystrixObservableCommand
,不可恢复的错误将由主 Observable 通过 onError
来报告,而失败回退逻辑则是由 Hystrix 从 resumeWithFallback
方法中得到的另一个 Observable 来报告。
命令名称
默认情况下,命令的名称由类名决定:
getClass().getSimpleName()
也可以通过 HystrixCommand
或 HystrixObservableCommand
的构造器显式传入:
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
this.name = name;
}
通过定义 Setter
的缓存,可以避免每次构造新的命令时重新构造 Setter:
private static final Setter cachedSetter =
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"));
public CommandHelloWorld(String name) {
super(cachedSetter);
this.name = name;
}
HystrixCommandKey
是一个接口,并能被实现为枚举类型或者普通类,但其仍然会保留 Factory 内部类以构造并 intern Key 对象:
HystrixCommandKey.Factory.asKey("HelloWorld")
命令组
Hystrix 提供命令组机制,通过使用相同命令组 Key,来统一一组命令的报表、告警、仪表盘或组/库的所有权。
默认情况下,Hystrix 会让一组命令使用统一的线程池,除非手工指定线程池的划分。
HystrixCommandGroupKey
是一个接口,并能被实现为枚举类型或者普通类,但其仍然会保留 Factory 内部类以构造并 intern Key 对象:
HystrixCommandGroupKey.Factory.asKey("ExampleGroup")
命令线程池
Hystrix 使用线程池 Key 来表示一个 HystrixThreadPool
,以实现监控、监控指标上报、缓存和其他用途中线程池的区分。一个 HystrixCommand
与一个通过 HystrixThreadPoolKey
获得的线程池相关联,该线程池会被注入到这个命令中。如果没有指定线程池 Key,命令则默认会和一个通过 HystrixCommandGroupKey
创建的线程池相关联。
也可以通过 HystrixCommand
或 HystrixObservableCommand
的构造器显式传入:
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
HystrixThreadPoolKey
是一个接口,并能被实现为枚举类型或者普通类,但其仍然会保留 Factory 内部类以构造并 intern Key 对象:
HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")
尽量使用 HystrixThreadPoolKey
而不是通过指定另一个 HystrixCommandGroupKey
的原因是:多个命令可能从所有权或逻辑功能上来看属于同一个组,但某些命令可能需要和其他命令隔离开来。
例如:
- 有两个用来访问视频元数据的命令
- 组名为 “VideoMetadata”
- 命令A 访问 资源#1
- 命令B 访问 资源#2
如果 命令A 出现延迟并耗尽其关联的线程池,其不应该影响 命令B 的正常执行,因为它们访问的是不同的后端资源。
因此,我们从逻辑上将这两个命令放在同一个组,但需要将二者隔离开来,通过使用 HystrixThreadPoolKey
,我们可以实现这一需求。
请求缓存
通过实现 HystrixCommand
或 HystrixObservableCommand
的 getCacheKey() 方法来开启请求缓存:
public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
private final int value;
protected CommandUsingRequestCache(int value) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.value = value;
}
@Override
protected Boolean run() {
return value == 0 || value % 2 == 0;
}
@Override
protected String getCacheKey() {
return String.valueOf(value);
}
}
因其依赖于请求上下文,因此在使用请求缓存之前,需要初始化 HystrixRequestContext
。下面的单元测试演示了如何使用请求缓存:
@Test
public void testWithoutCacheHits() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
assertTrue(new CommandUsingRequestCache(2).execute());
assertFalse(new CommandUsingRequestCache(1).execute());
assertTrue(new CommandUsingRequestCache(0).execute());
assertTrue(new CommandUsingRequestCache(58672).execute());
} finally {
context.shutdown();
}
}
一般来说,可以在 ServletFilter
中初始化上下文或者关闭上下文,以包装所有的请求或其他 Servlet 带生命周期的钩子方法。
下面的例子演示了命令如何从缓存中获取结果(以及如何判断结果是否从缓存中获取):
@Test
public void testWithCacheHits() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
assertTrue(command2a.execute());
// 这是第一次执行命令,结果未命中缓存
assertFalse(command2a.isResponseFromCache());
assertTrue(command2b.execute());
// 这是第二次执行命令,结果“2”命中缓存
assertTrue(command2b.isResponseFromCache());
} finally {
context.shutdown();
}
// 创建一个新的上下文(缓存为空)
context = HystrixRequestContext.initializeContext();
try {
CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
assertTrue(command3b.execute());
// 此时缓存为空,结果未命中缓存
assertFalse(command3b.isResponseFromCache());
} finally {
context.shutdown();
}
}
请求合并
通过请求合并,可以将多个请求合并到单个 HystrixCommand
的执行周期中。
合并器(Collapser)可以使用批大小(batch size)和自创建批命令以来耗时(elapsed time)来触发请求合并。
Hystrix 支持两种风格的请求合并:请求范围合并或全局范围合并,默认情况下使用请求范围合并,也可以通过合并器的构造器来配置。
请求范围风格的合并器在单个 HystrixRequestContext
中收集请求,而全局范围风格的合并器则从多个 HystrixRequestContext
中收集请求。因此,如果下游依赖无法在一次命令执行中处理多个 HystrixRequestContext
,选择请求范围风格的合并器比较合适。
下面的例子演示了如何实现请求范围风格的 HystrixCollapser
:
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {
private final Integer key;
public CommandCollapserGetValueForKey(Integer key) {
this.key = key;
}
@Override
public Integer getRequestArgument() {
return key;
}
@Override
protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
return new BatchCommand(requests);
}
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;
for (CollapsedRequest<String, Integer> request : requests) {
request.setResponse(batchResponse.get(count++));
}
}
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
for (CollapsedRequest<String, Integer> request : requests) {
// 手工构造返回结果
response.add("ValueForKey: " + request.getArgument());
}
return response;
}
}
}
下面的单元测试演示了如何使用合并器来自动将四个 CommandCollapserGetValueForKey
合并到单个 HystrixCommand
的执行过程中:
@Test
public void testCollapser() throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
assertEquals("ValueForKey: 1", f1.get());
assertEquals("ValueForKey: 2", f2.get());
assertEquals("ValueForKey: 3", f3.get());
assertEquals("ValueForKey: 4", f4.get());
// 断言合并请求 'GetValueForKey' 只执行了一次
assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
// 断言发出的请求是我们期望的请求
assertEquals("GetValueForKey", command.getCommandKey().name());
// 断言请求是否通过“合并”的方式发出
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
// 并且被成功执行
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
} finally {
context.shutdown();
}
}
建立请求上下文
要使用请求范围的特性(例如请求缓存,请求合并,请求日志等),你必需手工管理 HystrixRequestContext
的生命周期(或者实现 HystrixConcurrencyStrategy
)。
这意味着你必需在发出请求前执行:
HystrixRequestContext context = HystrixRequestContext.initializeContext();
并且在请求结束后执行:
context.shutdown();
在标准 Java Web 应用中,你可以通过 Servlet Filter 来初始化上下文,就像如下 Filter 一样:
public class HystrixRequestContextServletFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
chain.doFilter(request, response);
} finally {
context.shutdown();
}
}
}
并通过如下配置 web.xml 来使上述 Filter 对所有请求生效:
<filter>
<display-name>HystrixRequestContextServletFilter</display-name>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
一些常用模式
接下来的章节将会展示一些 HystrixCommand
或 HystrixObservableCommand
的惯用法和模式。
快速失败
一种最基本的用法是执行一个命令,并且没有定义失败回退逻辑。如果有任何错误发生,这个命令将会抛出异常:
public class CommandThatFailsFast extends HystrixCommand<String> {
private final boolean throwException;
public CommandThatFailsFast(boolean throwException) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.throwException = throwException;
}
@Override
protected String run() {
if (throwException) {
throw new RuntimeException("failure from CommandThatFailsFast");
} else {
return "success";
}
}
}
如下单元测试展示了其行为:
@Test
public void testSuccess() {
assertEquals("success", new CommandThatFailsFast(false).execute());
}
@Test
public void testFailure() {
try {
new CommandThatFailsFast(true).execute();
fail("we should have thrown an exception");
} catch (HystrixRuntimeException e) {
assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage());
e.printStackTrace();
}
}
HystrixObservableCommand 的等价版本
HystrixObservableCommand
快速失败的等价方案是通过重载 resumeWithFallback
方法来实现的:
@Override
protected Observable<String> resumeWithFallback() {
if (throwException) {
return Observable.error(new Throwable("failure from CommandThatFailsFast"));
} else {
return Observable.just("success");
}
}
静默失败
静默失败等价于返回空结果或功能被移除,通过返回 null,空 Map,空 List 或其他类似结果来实现。
你可以实现 getFallback() 方法来实现静默失败:
public class CommandThatFailsSilently extends HystrixCommand<String> {
private final boolean throwException;
public CommandThatFailsSilently(boolean throwException) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.throwException = throwException;
}
@Override
protected String run() {
if (throwException) {
throw new RuntimeException("failure from CommandThatFailsFast");
} else {
return "success";
}
}
@Override
protected String getFallback() {
return null;
}
}
@Test
public void testSuccess() {
assertEquals("success", new CommandThatFailsSilently(false).execute());
}
@Test
public void testFailure() {
try {
assertEquals(null, new CommandThatFailsSilently(true).execute());
} catch (HystrixRuntimeException e) {
fail("we should not get an exception as we fail silently with a fallback");
}
}
另一种实现会返回空 List
:
@Override
protected List<String> getFallback() {
return Collections.emptyList();
}
HystrixObservableCommand
的等价版本
HystrixObservableCommand
静默失败的等价方案是通过重载 resumeWithFallback
方法来实现的:
@Override
protected Observable<String> resumeWithFallback() {
return Observable.empty();
}
失败回退:静态模式
失败回退可以返回嵌入到代码的默认值,这种形式的失败回退和上述静默失败的例子不同,这种方式是为了返回一个默认值以使得逻辑能正常流转。
例如,如果一个命令基于用户身份来返回 true/false
的结果,但执行过程中出错了,那么其可以默认返回 true
:
@Override
protected Boolean getFallback() {
return true;
}
HystrixObservableCommand
的等价版本
HystrixObservableCommand
失败回退(静态模式)的等价方案是通过重载 resumeWithFallback
方法来实现的:
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.just(true);
}
失败回退:打桩式
通常情况下,当你的命令返回一个包含多个字段的复合对象时,你需要使用打桩式的失败回退策略,复合对象中某些字段从请求中获取,而另一些字段则设置成默认值。
如下展示了一些你需要在失败回退结果中放置从请求中获取的字段:
- Cookie
- 请求参数或请求头部
- 当前失败命令之前的请求结果
失败回退结果中的桩字段可以直接从请求范围内获取,但一般而言,推荐使用在命令初始化时进行注入的方式,就像如下例子中的 countryCodeFromGeoLookup
字段一样:
public class CommandWithStubbedFallback extends HystrixCommand<UserAccount> {
private final int customerId;
private final String countryCodeFromGeoLookup;
/**
* @param customerId 用于获取 UserAccount 的消费者Id
* @param countryCodeFromGeoLookup 用于失败回退的默认国家码
*/
protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.customerId = customerId;
this.countryCodeFromGeoLookup = countryCodeFromGeoLookup;
}
@Override
protected UserAccount run() {
// 从远程服务中获取 UserAccount
// return UserAccountClient.getAccount(customerId);
throw new RuntimeException("forcing failure for example");
}
@Override
protected UserAccount getFallback() {
/**
* 返回一个打桩式的失败回退结果,其中包含了一些默认值,占位符和一个
* 注入进来的 'countryCodeFromGeoLookup',以替代从远程服务获取的结果
*/
return new UserAccount(customerId, "Unknown Name",
countryCodeFromGeoLookup, true, true, false);
}
public static class UserAccount {
private final int customerId;
private final String name;
private final String countryCode;
private final boolean isFeatureXPermitted;
private final boolean isFeatureYPermitted;
private final boolean isFeatureZPermitted;
UserAccount(int customerId, String name, String countryCode,
boolean isFeatureXPermitted,
boolean isFeatureYPermitted,
boolean isFeatureZPermitted) {
this.customerId = customerId;
this.name = name;
this.countryCode = countryCode;
this.isFeatureXPermitted = isFeatureXPermitted;
this.isFeatureYPermitted = isFeatureYPermitted;
this.isFeatureZPermitted = isFeatureZPermitted;
}
}
}
以下单元测试演示了这种行为:
@Test
public void test() {
CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "ca");
UserAccount account = command.execute();
assertTrue(command.isFailedExecution());
assertTrue(command.isResponseFromFallback());
assertEquals(1234, account.customerId);
assertEquals("ca", account.countryCode);
assertEquals(true, account.isFeatureXPermitted);
assertEquals(true, account.isFeatureYPermitted);
assertEquals(false, account.isFeatureZPermitted);
}
HystrixObservableCommand
的等价版本
HystrixObservableCommand
失败回退(打桩式)的等价方案是通过重载 resumeWithFallback
方法返回桩结果来实现的:
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.just( new UserAccount(customerId, "Unknown Name",
countryCodeFromGeoLookup, true, true, false) );
}
如果你期望你的命令(Observable 式)发射多个结果,你可以采取生成那些在命令失败之前没有被发射出的结果的方式来实现失败回退。下面的例子演示了如何实现这种效果 —— 通过追踪最后一个发射出来的结果,失败回退 Observable 可以知道从何处继续发射余下的结果:
@Override
protected Observable<Integer> construct() {
return Observable.just(1, 2, 3)
.concatWith(Observable.<Integer> error(new RuntimeException("forced error")))
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer t1) {
lastSeen = t1;
}
})
.subscribeOn(Schedulers.computation());
}
@Override
protected Observable<Integer> resumeWithFallback() {
if (lastSeen < 4) {
return Observable.range(lastSeen + 1, 4 - lastSeen);
} else {
return Observable.empty();
}
}
失败回退:使用网络上的缓存
有时,如果一个后端服务 Down 掉后,我们可以从缓存服务,例如 memcached,中获取一个可能不是最新的结果。
因为在这种情况下,失败回退逻辑需要通过网络来获取数据,因此这又是一个可能失败的点,并且需要包装在 HystrixCommand
或 HystrixObservableCommand
中。
需要注意的是,失败回退里的命令需要在单独的线程池中执行,否则如果二者共享线程池的话,当主命令延迟变高并且占满整个线程池时,将会导致失败回退的命令也无法得到执行。
下面的例子演示了 CommandWithFallbackViaNetwork
如何在其 getFallback()
方法中执行 FallbackViaNetwork
命令。
注意 FallbackViaNetwork
失败时,其也会通过其“静默失败”返回 null
的方式来处理自身的失败。
要使 FallbackViaNetwork
命令在不同的线程池,而不是在 RemoteServiceX
这个命令组的线程池中被执行,需要向其构造器中注入 HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")
。
这意味着 CommandWithFallbackViaNetwork
将会在名为 RemoteServiceX
的线程池中执行,而 FallbackViaNetwork
命令则会在名为 RemoteServiceXFallback
的线程池中执行。
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
private final int id;
protected CommandWithFallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
this.id = id;
}
@Override
protected String run() {
// RemoteServiceXClient.getValue(id);
throw new RuntimeException("force failure for example");
}
@Override
protected String getFallback() {
return new FallbackViaNetwork(id).execute();
}
private static class FallbackViaNetwork extends HystrixCommand<String> {
private final int id;
public FallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
// use a different threadpool for the fallback command
// so saturating the RemoteServiceX pool won't prevent
// fallbacks from executing
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
this.id = id;
}
@Override
protected String run() {
MemCacheClient.getValue(id);
}
@Override
protected String getFallback() {
// the fallback also failed
// so this fallback-of-a-fallback will
// fail silently and return null
return null;
}
}
}
主从系统的失败回退
有时候系统可能会有多个状态 —— 主/从,或主/备。
有时 从状态/备用状态 被当作失败状态并且这种状态就是用于失败回退,在这种场景下,恰好适用于上述 “使用网络上的缓存” 模式。
然而,如果切换到从状态经常发生,假设发布新代码都采取这种措施(即发布新代码后将从原有代码切换到新代码上,有时这是有状态系统处理代码更新的一种手段),每次切换时,主系统就会失效,导致熔断器断开,并且导致告警。
如果我们不希望因为这个原因导致告警太频繁而真正失败到来时告警却被忽略(译注:Cry Wolf,即“狼来了”的故事),那么这并不是我们期望的行为。
因此,我们可以以平常心对待主从切换,并将主从的执行逻辑放置在一个 Facade 之后。
主从 HystrixCommand
的实现需要线程隔离,因为其需要执行网络请求和业务逻辑。它们的性能可能不同(通常,从系统是一个静态缓存),因此将二者封装在不同的命令中的另一好处是,它们可以被单独调优。
不要将这两个命令对外公开,而是要将其隐藏在另一个使用信号量进行隔离的 HystrixCommand
(即上文的 Facade) 中,并在其中实现选用主还是从命令的逻辑。如果主从命令均失败了,那么就执行 Facade 自身的失败回退逻辑。
这个 Facade HystrixCommand
使用信号量进行隔离即可,因为其唯一的作用就是选择并执行主从命令中的一个命令,而这两个命令均使用线程池进行隔离。没有必要为其引入多线程逻辑,因为其 run()
方法并没有网络请求,重试或其他可能等可能导致错误的行为。
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {
private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);
private final int id;
public CommandFacadeWithPrimarySecondary(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
.andCommandPropertiesDefaults(
// we want to default to semaphore-isolation since this wraps
// 2 others commands that are already thread isolated
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
if (usePrimary.get()) {
return new PrimaryCommand(id).execute();
} else {
return new SecondaryCommand(id).execute();
}
}
@Override
protected String getFallback() {
return "static-fallback-" + id;
}
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
private static class PrimaryCommand extends HystrixCommand<String> {
private final int id;
private PrimaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
.andCommandPropertiesDefaults(
// we default to a 600ms timeout for primary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
this.id = id;
}
@Override
protected String run() {
// perform expensive 'primary' service call
return "responseFromPrimary-" + id;
}
}
private static class SecondaryCommand extends HystrixCommand<String> {
private final int id;
private SecondaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
.andCommandPropertiesDefaults(
// we default to a 100ms timeout for secondary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
this.id = id;
}
@Override
protected String run() {
// perform fast 'secondary' service call
return "responseFromSecondary-" + id;
}
}
public static class UnitTest {
@Test
public void testPrimary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
@Test
public void testSecondary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
}
}
客户端无网络访问
如果你的命令不需要访问网络,但需要考虑延迟或线程池占满等情况,你可以将 executionIsolationStrategy
属性设置成 ExecutionIsolationStrategy.SEMAPHORE
,Hystrix 将使用信号量进行服务隔离。
下面的例子演示了如何将“使用信号量隔离”设置为默认值(你也可以通过动态属性的方式设置):
public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {
private final int id;
public CommandUsingSemaphoreIsolation(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
// 我们只做内存内的缓存查询,因此我们将隔离策略设置为 SEMAPHORE
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
// 真实的例子中可能会从内存数据结构中获取数据
return "ValueFromHashMap_" + id;
}
}
带请求缓存失效的“读-写-读”
如果你需要实现一个“读-写-读”模型,其中“读”操作需要请求网络,并且请求缓存被启用,但其“写”操作在另一个命令中被执行,并且期望能使同请求中的缓存失效,你可以通过调用 HystrixRequestCache.clear()
来做到这一点。
下面是一个示例实现:
public class CommandUsingRequestCacheInvalidation {
/* 表示一个远程数据源 */
private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_";
public static class GetterCommand extends HystrixCommand<String> {
private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand");
private final int id;
public GetterCommand(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet"))
.andCommandKey(GETTER_KEY));
this.id = id;
}
@Override
protected String run() {
return prefixStoredOnRemoteDataStore + id;
}
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
/**
* 用于刷新缓存
*
* @param id 命令构造器上的入参
*/
public static void flushCache(int id) {
HystrixRequestCache.getInstance(GETTER_KEY,
HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id));
}
}
public static class SetterCommand extends HystrixCommand<Void> {
private final int id;
private final String prefix;
public SetterCommand(int id, String prefix) {
super(HystrixCommandGroupKey.Factory.asKey("GetSetGet"));
this.id = id;
this.prefix = prefix;
}
@Override
protected Void run() {
// 持久化数据
prefixStoredOnRemoteDataStore = prefix;
// 刷新缓存
GetterCommand.flushCache(id);
// 无返回值
return null;
}
}
}
迁移到Hystrix
当你决定将已有的库迁移到 Hystrix 时,你需要将每一个 Service 方法替换成 HystrixCommand
。
这些服务方法需要将请求直接定向到 HystrixCommand
,并且不要带任何额外的业务逻辑。
因此,服务库迁移之前可能如下图所示:
迁移后,库的使用者将可以直接访问 HystrixCommand
或间接通过服务 Facade 访问。