在微服务世界中,为了满足客户端请求,一个微服务可能需要与其他微服务进行通信。我们应该尽量减少对其他微服务的这种直接依赖,但在某些情况下它是不可避免的。如果微服务中断或无法正常运行,则问题可能会级联到上游服务。Netflix创建了Hystrix库,实现了Circuit Breaker模式以解决这些问题。我们可以使用Spring Cloud Netflix Hystrix断路器来保护微服务免受级联故障的影响。
使用Spring Boot和Spring Cloud的MicroServices
- 第1部分:MicroServices:Spring Boot和Spring Cloud概述
- 第2部分:MicroServices:使用Spring Cloud Config和Vault进行配置管理
- 第3部分:MicroServices:Spring Cloud Service Registry and Discovery
- 第4部分:MicroServices:使用Netflix Hystrix的Spring Cloud断路器
- 第5部分:MicroServices:Spring Cloud Zuul Proxy作为API网关
- 第6部分:MicroServices:使用Spring Cloud Sleuth和Zipkin进行分布式跟踪
在这篇文章中,我们将学习:
- 使用@HystrixCommand实现断路器模式
- 如何传播ThreadLocal变量
- 使用Hystrix仪表板监控断路器
实施Netflix Hystrix断路器模式
从目录服务,我们在库存服务上调用REST端点以获得产品的库存水平。如果库存服务下降怎么办?如果库存服务需要很长时间才能响应,从而根据它减慢所有服务的速度怎么办?我们希望有一些超时并实现一些回退机制。
将Hystrix启动程序添加到目录服务。
1
2
3
4
|
<
dependency
>
<
groupId
>org.springframework.cloud</
groupId
>
<
artifactId
>spring-cloud-starter-netflix-hystrix</
artifactId
>
</
dependency
>
|
要启用Circuit Breaker,请在catalog-service入口点类上添加@EnableCircuitBreaker注释。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import
org.springframework.boot.SpringApplication;
import
org.springframework.boot.autoconfigure.SpringBootApplication;
import
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import
org.springframework.cloud.client.loadbalancer.LoadBalanced;
import
org.springframework.context.annotation.Bean;
import
org.springframework.web.client.RestTemplate;
@EnableCircuitBreaker
@SpringBootApplication
public
class
CatalogServiceApplication {
@Bean
@LoadBalanced
public
RestTemplate restTemplate() {
return
new
RestTemplate();
}
public
static
void
main(String[] args) {
SpringApplication.run(CatalogServiceApplication.
class
, args);
}
}
|
现在我们可以在任何我们想要应用超时和回退方法的方法上使用@HystrixCommand注释。
让我们创建InventoryServiceClient.java,它将调用库存服务REST端点并将@HystrixCommand应用于回退实现。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import
com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import
com.sivalabs.catalogservice.web.models.ProductInventoryResponse;
import
lombok.extern.slf4j.Slf4j;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.http.HttpStatus;
import
org.springframework.http.ResponseEntity;
import
org.springframework.stereotype.Service;
import
org.springframework.web.client.RestTemplate;
import
java.util.Optional;
@Service
@Slf4j
public
class
InventoryServiceClient {
private
final
RestTemplate restTemplate;
@Autowired
public
InventoryServiceClient(RestTemplate restTemplate) {
this
.restTemplate = restTemplate;
}
@HystrixCommand
(fallbackMethod =
"getDefaultProductInventoryByCode"
)
public
Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
ResponseEntity<ProductInventoryResponse> itemResponseEntity =
ProductInventoryResponse.
class
,
productCode);
if
(itemResponseEntity.getStatusCode() == HttpStatus.OK) {
return
Optional.ofNullable(itemResponseEntity.getBody());
}
else
{
log.error(
"Unable to get inventory level for product_code: "
+ productCode +
", StatusCode: "
+ itemResponseEntity.getStatusCode());
return
Optional.empty();
}
}
@SuppressWarnings
(
"unused"
)
Optional<ProductInventoryResponse> getDefaultProductInventoryByCode(String productCode) {
log.info(
"Returning default ProductInventoryByCode for productCode: "
+productCode);
ProductInventoryResponse response =
new
ProductInventoryResponse();
response.setProductCode(productCode);
response.setAvailableQuantity(
50
);
return
Optional.ofNullable(response);
}
}
|
1
2
3
4
五
6
7
|
import
lombok.Data;
@Data
public
class
ProductInventoryResponse {
private
String productCode;
private
int
availableQuantity;
}
|
我们使用@HystrixCommand( fallbackMethod =“getDefaultProductInventoryByCode”)对我们进行REST调用的方法进行了注释,这样如果它在特定时间限制内没有收到响应,则调用将超时并调用配置的回退方法。回退方法应该在同一个类中定义,并且应该具有相同的签名。在fallback方法getDefaultProductInventoryByCode()中,我们将availableQuantity设置为50,显然,这种行为取决于业务需求。
我们可以通过使用@HystrixProperty注释配置属性来自定义@HystrixCommand默认行为。
1
2
3
4
五
6
7
8
9
10
|
@HystrixCommand
(fallbackMethod =
"getDefaultProductInventoryByCode"
,
commandProperties = {
@HystrixProperty
(name =
"execution.isolation.thread.timeoutInMilliseconds"
, value =
"3000"
),
@HystrixProperty
(name =
"circuitBreaker.errorThresholdPercentage"
, value=
"60"
)
}
)
public
Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
....
}
|
我们可以在bootstrap.properties/yml文件中配置它们,而不是在代码中配置这些参数值,如下所示。
1
2
|
hystrix.command.getProductInventoryByCode.execution.isolation.thread.timeoutInMilliseconds=2000
hystrix.command.getProductInventoryByCode.circuitBreaker.errorThresholdPercentage=60
|
请注意,我们使用方法名称作为 commandKey,这是默认行为。我们可以自定义commandKey名称,如下所示:
1
2
3
4
五
|
@HystrixCommand
(commandKey =
"inventory-by-productcode"
, fallbackMethod =
"getDefaultProductInventoryByCode"
)
public
Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
...
}
|
1
2
|
hystrix.command.inventory-by-productcode.execution.isolation.thread.timeoutInMilliseconds=2000
hystrix.command.inventory-by-productcode.circuitBreaker.errorThresholdPercentage=60
|
您可以在此处找到所有可用的配置选项https://github.com/Netflix/Hystrix/wiki/Configuration。
如何传播ThreadLocal变量
默认情况下,带有@HystrixCommand的方法将在不同的线程上执行,因为默认的execution.isolation.strategy是ExecutionIsolationStrategy.THREAD。因此,我们在调用之前设置的ThreadLocal变量@HystrixCommand方法不会中可用@HystrixCommand方法。
使ThreadLocal变量可用的一个选项是使用execution.isolation.strategy = SEMAPHORE。
1
2
3
4
五
6
7
8
9
|
@HystrixCommand
(fallbackMethod =
"getDefaultProductInventoryByCode"
,
commandProperties = {
@HystrixProperty
(name=
"execution.isolation.strategy"
, value=
"SEMAPHORE"
)
}
)
public
Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
...
}
|
如果将属性execution.isolation.strategy设置为SEMAPHORE,则Hystrix将使用信号量而不是线程来限制调用该命令的并发父线程数。您可以在此处阅读有关隔离如何工作的更多信息https://github.com/Netflix/Hystrix/wiki/How-it-Works#isolation。
在Hystrix命令方法中使ThreadLocal变量可用的另一个选项是实现我们自己的HystrixConcurrencyStrategy。
假设您要将一些CorrelationId集传播为ThreadLocal变量。
1
2
3
4
五
6
7
8
9
10
11
|
public
class
MyThreadLocalsHolder {
private
static
final
ThreadLocal<String> CORRELATION_ID =
new
ThreadLocal();
public
static
void
setCorrelationId(String correlationId) {
CORRELATION_ID.set(correlationId);
}
public
static
String getCorrelationId() {
return
CORRELATION_ID.get();
}
}
|
让我们实现自己的HystrixConcurrencyStrategy。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
|
@Component
@Slf4j
public
class
ContextCopyHystrixConcurrencyStrategy
extends
HystrixConcurrencyStrategy {
public
ContextCopyHystrixConcurrencyStrategy() {
HystrixPlugins.getInstance().registerConcurrencyStrategy(
this
);
}
@Override
public
<T> Callable<T> wrapCallable(Callable<T> callable) {
return
new
MyCallable(callable, MyThreadLocalsHolder.getCorrelationId());
}
public
static
class
MyCallable<T>
implements
Callable<T> {
private
final
Callable<T> actual;
private
final
String correlationId;
public
MyCallable(Callable<T> callable, String correlationId) {
this
.actual = callable;
this
.correlationId = correlationId;
}
@Override
public
T call()
throws
Exception {
MyThreadLocalsHolder.setCorrelationId(correlationId);
try
{
return
actual.call();
}
finally
{
MyThreadLocalsHolder.setCorrelationId(
null
);
}
}
}
}
|
现在,您可以在调用Hystrix命令之前设置CorrelationId,并在Hystrix命令中访问CorrelationId。
ProductService.java
1
2
3
4
五
6
7
8
9
10
11
12
|
public
Optional<Product> findProductByCode(String code)
{
....
String correlationId = UUID.randomUUID().toString();
MyThreadLocalsHolder.setCorrelationId(correlationId);
log.info(
"Before CorrelationID: "
+ MyThreadLocalsHolder.getCorrelationId());
Optional<ProductInventoryResponse> responseEntity = inventoryServiceClient.getProductInventoryByCode(code);
...
log.info(
"After CorrelationID: "
+ MyThreadLocalsHolder.getCorrelationId());
....
}
|
InventoryServiceClient.java
1
2
3
4
五
6
|
@HystrixCommand
(fallbackMethod =
"getDefaultProductInventoryByCode"
)
public
Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
...
log.info(
"CorrelationID: "
+ MyThreadLocalsHolder.getCorrelationId());
}
|
这只是如何将数据传播到Hystrix命令的一个示例。类似地,我们可以传递当前HTTP请求中可用的任何数据,比如说使用Spring组件,如RequestContextHolder等。
Jakub Narloch撰写了一篇很好的文章,介绍了如何传播Request Context,甚至还创建了一个Spring Boot启动器。请查看他的博客https://jmnarloch.wordpress.com/2016/07/06/spring-boot-hystrix-and-threadlocals/和GitHub Repo https://github.com/jmnarloch/hystrix-context-spring-启动启动器。
使用Hystrix仪表板监控断路器
一旦我们将Hystrix启动器添加到目录服务,我们就可以使用Actuator端点http:// localhost:8181 / actuator / hystrix.stream将电路状态作为事件流获取,假设目录服务在8181端口上运行。
Spring Cloud还提供了一个很好的仪表板来监控Hystrix命令的状态。
使用Hystrix Dashboard starter 创建Spring Boot应用程序,并使用@EnableHystrixDashboard注释主入口点类。
1
2
3
4
|
<
dependency
>
<
groupId
>org.springframework.cloud</
groupId
>
<
artifactId
>spring-cloud-starter-netflix-hystrix-dashboard</
artifactId
>
</
dependency
>
|
假设我们在8788端口上运行Hystrix Dashboard,然后转到http:// localhost:8788 / hystrix查看仪表板。
现在,在Hystrix Dashboard主页中输入http:// localhost:8181 / actuator / hystrix.stream作为流URL,并将Catalog Service作为Title并单击Monitor Stream按钮。
现在调用内部调用库存服务REST端点的目录服务REST端点,您可以看到电路状态以及调用成功的次数和发生的故障数等。
我们可以使用Turbine在单个仪表板中提供所有服务的统一视图,而不是为每个服务提供单独的仪表板。有关详细信息,请参阅http://cloud.spring.io/spring-cloud-static/Finchley.M7/single/spring-cloud.html#_turbine。
您可以在https://github.com/sivaprasadreddy/spring-boot-microservices-series找到本文的源代码。