如果你害怕失败,你就会失败。
–> 返回专栏总目录 <–
代码下载地址:https://github.com/f641385712/netflix-learning
目录
前言
前面用了几篇文章内容分析了Hystrix执行fallback的逻辑以及导致降级的各种情况,但是作为正常执行的逻辑均还没涉及。比如需要知道:在线程池隔离下如何执行?在信号量隔离下如何执行呢?
介绍过了异常情况的处理,本文将介绍Hystrix的正常执行流程以及源码解析。
正文
对于方法的执行,Hystrix面向使用者一共提供了四种方法:execute/queue/observe/toObservable
,分别应用于不同的场景。而最终要执行目标方法的话,都会归并到一出,这边是本文入口:executeCommandAndObserve()
方法。
executeCommandAndObserve()源码解读
该方法是AbstractCommand
的一个私有方法,语义是:执行Command命令并且Observe
返回一个可被观察的实例Observable<R>
。很明显,它将围绕着目标方法的执行而展开~
AbstractCommand:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 执行上下文。保证线程池内亦能获取到主线程里的参数
HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
... // 暂时忽略一些内置的function们
Observable<R> execution;
// 二者的唯一却别是:若开启了超时支持的话,就只可观察对象的结果处
// lift一个HystrixObservableTimeoutOperator实例,以监控超时情况
// 关于Hystrix是如何实现超时的,这在后面专文讲解~~是个较大的,也较难的话题
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 得到execution后,开始注册一些基本的事件、观察者
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
目标方法的执行在executeCommandWithSpecifiedIsolation()
方法里。但在此之前下介绍下其它函数的作用:
doOnNext(markEmits)
观察者被回调之前的调用(此时其实数据已经发送,也就是目标方法已经执行了)。
markEmits函数内容:
markEmits = r -> {
// 是否应该在onNext这步报告数据
// HystrixCommand -> false
// HystrixObservableCommand -> true
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
// 命令是否是标量
// HystrixCommand -> true
// HystrixObservableCommand -> false
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
// 这几句代码是重点:
// 记录结果为SUCCESS成功
// 并且,并且,并且circuitBreaker.markSuccess();(若断路器是打开的,此处就关闭了)
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
此步骤最重要的两件事:
- 记录result结果事件为:SUCCESS
- 闭合circuitBreaker断路器(若已经是闭合的就忽略呗)
doOnCompleted(markOnCompleted)
正常完成(发射、监听全部正常时)
markOnCompleted函数内容:
markOnCompleted = () -> {
// 注意这个!符号
if (!commandIsScalar()) {
... // 逻辑完全同上面的Scalar部分
}
}
该函数主要是确保非Scala类型结果也能够正常关闭断路器以及标记Success。
onErrorResumeNext(handleFallback)
重要。当目标方法执行过程中发生错误会执行此函数,用于Resume恢复而不是立马停止线程:这边是触发fallback逻辑的入口。
handleFallback函数内容:
handleFallback = (Throwable t) -> {
// 把Throwable t强转为Exception e(若不是Exception类型就包装为Exception类型)
// 比如若t是NPE异常,那么t和e是完全一样的。
// 只有当t是error类时,t才和e不相等
Exception e = getExceptionFromThrowable(t);
// 既然发生错误了,那就记录执行时候的异常e
executionResult = executionResult.setExecutionException(e);
// 若异常类型是RejectedExecutionException:线程池拒绝
// 若异常类型是HystrixTimeoutException:目标方法执行超时
// 若异常类型是HystrixBadRequestException:下文详细分解
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
// 什么时候会进入到这里?只有当子类复写了getExceptionFromThrowable()方法的时候才有可能进入到这里
// 这里算是一种兜底:保证不管咋样HystrixBadRequestException都不会触发熔断
// 其实我倒觉得,不让getExceptionFromThrowable这个方法被复写也行的
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
当目标方法执行过程中抛出异常(可能是程序问题、可能是超时等等)时候,会进入到这里来处理,处理case可分为两大类:
doOnEach(setRequestContext)
该步骤每次都会执行:为子线程设置请求上下文,保证数据打通。
setRequestContext = (Notification<? super R> rNotification) -> {
if (!HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestContext.setContextOnCurrentThread(currentRequestContext);
}
}
关于Hystrix是如何实现通过HystrixRequestContext
完成跨线程通信的,可参考这篇文章。
executeCommandWithSpecifiedIsolation()
它也是AbstractCommand
的一个私有方法,只有被executeCommandAndObserve
调用。它俩的区别你可简单理解为:
executeCommandWithSpecifiedIsolation()
用于对目标方法的真正执行executeCommandAndObserve
在其基础上封装,加上了执行结果处理、超时处理、出现异常后的fallback处理等额外的逻辑。
该方法字面意思:在规定的隔离方式里执行Command命令,这里规定的隔离策略有且仅有两种:
- THREAD:线程池隔离(默认)
- SEMAPHORE:信号量隔离
THREAD线程池隔离下执行
线程池隔离逻辑描述:
AbstractCommand:
private Observable<R> executeCommandWithSpecifiedIsolation(AbstractCommand<R> _cmd) {
// 标记我们正在一个线程中执行(即使我们最终被拒绝
// 我们仍然是一个线程执行,而不是信号量)
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// 默认是一个defer延迟执行的可观察对象
// 注意:进到这个回调里面来后,就是使用的线程池的资源去执行了(获取到了线程池资源)
// 比如此处线程号就是:hystrix-fallbackDemoGroup-1
return Observable.defer(() -> {
// 记录:目标方法已经执行(不管出异常与否,反正就是执行了)
// 应为如果被熔断了,或者线程池拒绝了它是不会被执行的
executionResult = executionResult.setExecutionOccurred();
// 线程状态必须是OBSERVABLE_CHAIN_CREATED时才让执行
// 而此状态是由toObservable()方法设置过来的
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// 收集指标信息:开始执行
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// 这个判断非常的有意思:如果在run方法还没执行之前
// 也就是在线程切换之间就超时了,那就直接返回一个错误
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// 把执行的线程状态标记为STARTED:启动
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
// 全局计数器+1
// 此处是唯一调用处。信号量里是木有此调用的哦
HystrixCounters.incrementGlobalConcurrentThreads();
// 标记线程池已经开始准备执行了
threadPool.markThreadExecution();
// store the command that is being run
// 这个保存使用的ThreadLocal<ConcurrentStack<HystrixCommandKey>>和当前线程绑定
// 这样确保了命令在执行时的线程安全~~~~~~~
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
// 执行钩子程序,以及执行目标run方法程序
// getUserExecutionObservable:getExecutionObservable()抽象方法获取到目标方法
// 本处也是该抽象方法的唯一调用处哦
// 若这里面任何一个方法抛出异常(哪怕是hook方法),就原样抛出
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else { // 说明已经unsubscribed了,就抛错
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
});
}
}
待执行的目标Observable
实例由getUserExecutionObservable()
提供,内部便是调用了抽象方法getExecutionObservable()
由子类来提供的。除此之外还需关心注册在Observable
上的其它操作符:
doOnTerminate
:当线程停止时(不管正常停or异常停)。作用是改变线程状态为STARTED
等doOnUnsubscribe
:当取消订阅时会执行。作用基本同上subscribeOn
:重要。它决定了数据发射在哪个线程里执行,是线程池调度的入口:
AbstractCommand:
// 核心是threadPool.getScheduler()获取到一个Scheduler
.subscribeOn(threadPool.getScheduler(() -> {
properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT
}));
关于Hystrix
是如何去调用线程池资源执行目标方法的,具体详情可参见这篇文章Hystrix执行目标方法时,如何调用线程池资源?
SEMAPHORE信号量隔离下执行
当隔离模式选择信号量隔离时,那就执行如下逻辑:
AbstractCommand:
return Observable.defer(() -> {
// 检测线程状态。记录线程执行(不管成功与否)
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// 一样的,标记线程开始执行了。隔离模式是:SEMAPHORE哦
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// 不解释
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
});
信号量下隔离执行的逻辑几乎完全同线程池方式。不同的是它更加的轻量:不需要线程调度,因此也就不需要使用subscribeOn()
调度线程,也不需要使用doOnTerminate/doOnUnsubscribe
等去还原线程状态了。
总结
关于Hystrix目标方法执行逻辑源码解读方面就介绍到这了。
此处有个小知识点:在Command目标开始执行的时候,调用了HystrixCommandMetrics#markCommandStart()
方法,当结束的时候会自动调用其HystrixCommandMetrics#markCommandDone()
方法,只是结束方法的调用时机在入口方法toObservable()
处管理着,这将在后文会再次提起,敬请关注。
声明
原创不易,码字不易,多谢你的点赞、收藏、关注。把本文分享到你的朋友圈是被允许的,但拒绝抄袭
。你也可【左边扫码/或加wx:fsx641385712】邀请你加入我的 Java高工、架构师 系列群大家庭学习和交流。
- [享学Netflix] 一、Apache Commons Configuration:你身边的配置管理专家
- [享学Netflix] 二、Apache Commons Configuration事件监听机制及使用ReloadingStrategy实现热更新
- [享学Netflix] 三、Apache Commons Configuration2.x全新的事件-监听机制
- [享学Netflix] 四、Apache Commons Configuration2.x文件定位系统FileLocator和FileHandler
- [享学Netflix] 五、Apache Commons Configuration2.x别样的Builder模式:ConfigurationBuilder
- [享学Netflix] 六、Apache Commons Configuration2.x快速构建工具Parameters和Configurations
- [享学Netflix] 七、Apache Commons Configuration2.x如何实现文件热加载/热更新?
- [享学Netflix] 八、Apache Commons Configuration2.x相较于1.x使用上带来哪些差异?
- [享学Netflix] 九、Archaius配置管理库:初体验及基础API详解
- [享学Netflix] 十、Archaius对Commons Configuration核心API Configuration的扩展实现
- [享学Netflix] 十一、Archaius配置管理器ConfigurationManager和动态属性支持DynamicPropertySupport
- [享学Netflix] 十二、Archaius动态属性DynamicProperty原理详解(重要)
- [享学Netflix] 十三、Archaius属性抽象Property和PropertyWrapper详解
- [享学Netflix] 十四、Archaius如何对多环境、多区域、多云部署提供配置支持?
- [享学Netflix] 十五、Archaius和Spring Cloud的集成:spring-cloud-starter-netflix-archaius
- [享学Netflix] 十六、Hystrix断路器:初体验及RxJava简介
- [享学Netflix] 十七、Hystrix属性抽象以及和Archaius整合实现配置外部化、动态化
- [享学Netflix] 十八、Hystrix配置之:全局配置和实例配置
- [享学Netflix] 十九、Hystrix插件机制:SPI接口介绍和HystrixPlugins详解
- [享学Netflix] 二十、Hystrix跨线程传递数据解决方案:HystrixRequestContext
- [享学Netflix] 二十一、Hystrix指标数据收集(预热):滑动窗口算法(附代码示例)
- [享学Netflix] 二十二、Hystrix事件源与事件流:HystrixEvent和HystrixEventStream
- [享学Netflix] 二十三、Hystrix桶计数器:BucketedCounterStream
- [享学Netflix] 二十四、Hystrix在滑动窗口内统计:BucketedRollingCounterStream、HealthCountsStream
- [享学Netflix] 二十五、Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)
- [享学Netflix] 二十六、Hystrix指标数据收集器:HystrixMetrics(HystrixDashboard的数据来源)
- [享学Netflix] 二十七、Hystrix何为断路器的半开状态?HystrixCircuitBreaker详解
- [享学Netflix] 二十八、Hystrix事件计数器EventCounts和执行结果ExecutionResult
- [享学Netflix] 二十九、Hystrix执行过程核心接口:HystrixExecutable、HystrixObservable和HystrixInvokableInfo
- [享学Netflix] 三十、Hystrix的fallback回退/降级逻辑源码解读:getFallbackOrThrowException
- [享学Netflix] 三十一、Hystrix触发fallback降级逻辑的5种情况及代码示例
- [享学Netflix] 三十二、Hystrix抛出HystrixBadRequestException异常为何不会触发熔断?
- [享学Netflix] 三十三、Hystrix执行目标方法时,如何调用线程池资源?