MicroServices – 第4部分:使用Netflix Hystrix的Spring Cloud断路器

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/cpongo3/article/details/89327700

在微服务世界中,为了满足客户端请求,一个微服务可能需要与其他微服务进行通信。我们应该尽量减少对其他微服务的这种直接依赖,但在某些情况下它是不可避免的。如果微服务中断或无法正常运行,则问题可能会级联到上游服务。Netflix创建了Hystrix库,实现了Circuit Breaker模式以解决这些问题。我们可以使用Spring Cloud Netflix Hystrix断路器来保护微服务免受级联故障的影响。

使用Spring Boot和Spring Cloud的MicroServices

  • 第1部分:MicroServices:Spring Boot和Spring Cloud概述
  • 第2部分:MicroServices:使用Spring Cloud Config和Vault进行配置管理
  • 第3部分:MicroServices:Spring Cloud Service Registry and Discovery
  • 第4部分:MicroServices:使用Netflix Hystrix的Spring Cloud断路器
  • 第5部分:MicroServices:Spring Cloud Zuul Proxy作为API网关
  • 第6部分:MicroServices:使用Spring Cloud Sleuth和Zipkin进行分布式跟踪

在这篇文章中,我们将学习:

  • 使用@HystrixCommand实现断路器模式
  • 如何传播ThreadLocal变量
  • 使用Hystrix仪表板监控断路器

实施Netflix Hystrix断路器模式

目录服务,我们在库存服务上调用REST端点以获得产品的库存水平。如果库存服务下降怎么办?如果库存服务需要很长时间才能响应,从而根据它减慢所有服务的速度怎么办?我们希望有一些超时并实现一些回退机制。

Hystrix启动程序添加到目录服务。

1
2
3
4
< dependency >
     < groupId >org.springframework.cloud</ groupId >
     < artifactId >spring-cloud-starter-netflix-hystrix</ artifactId >
</ dependency >

要启用Circuit Breaker,在catalog-service入口点类上添加@EnableCircuitBreaker注释。

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
@EnableCircuitBreaker
@SpringBootApplication
public class CatalogServiceApplication {
     @Bean
     @LoadBalanced
     public RestTemplate restTemplate() {
         return new RestTemplate();
     }
     public static void main(String[] args) {
         SpringApplication.run(CatalogServiceApplication. class , args);
     }
}

现在我们可以在任何我们想要应用超时和回退方法的方法上使用@HystrixCommand注释。

让我们创建InventoryServiceClient.java,它将调用库存服务REST端点并将@HystrixCommand应用于回退实现。

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.sivalabs.catalogservice.web.models.ProductInventoryResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.Optional;
@Service
@Slf4j
public class InventoryServiceClient {
     private final RestTemplate restTemplate;
     @Autowired
     public InventoryServiceClient(RestTemplate restTemplate) {
         this .restTemplate = restTemplate;
     }
     @HystrixCommand (fallbackMethod = "getDefaultProductInventoryByCode" )
     public Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
     {
         ResponseEntity<ProductInventoryResponse> itemResponseEntity =
                 restTemplate.getForEntity( "http://inventory-service/api/inventory/{code}" ,
                         ProductInventoryResponse. class ,
                         productCode);
         if (itemResponseEntity.getStatusCode() == HttpStatus.OK) {
             return Optional.ofNullable(itemResponseEntity.getBody());
         } else {
             log.error( "Unable to get inventory level for product_code: " + productCode + ", StatusCode: " + itemResponseEntity.getStatusCode());
             return Optional.empty();
         }
     }
     @SuppressWarnings ( "unused" )
     Optional<ProductInventoryResponse> getDefaultProductInventoryByCode(String productCode) {
         log.info( "Returning default ProductInventoryByCode for productCode: " +productCode);
         ProductInventoryResponse response = new ProductInventoryResponse();
         response.setProductCode(productCode);
         response.setAvailableQuantity( 50 );
         return Optional.ofNullable(response);
     }
}
1
2
3
4
6
7
import lombok.Data;
@Data
public class ProductInventoryResponse {
     private String productCode;
     private int availableQuantity;
}

我们使用@HystrixCommand( fallbackMethod =“getDefaultProductInventoryByCode”)对我们进行REST调用的方法进行了注释,这样如果它在特定时间限制内没有收到响应,则调用将超时并调用配置的回退方法。回退方法应该在同一个类中定义,并且应该具有相同的签名。在fallback方法getDefaultProductInventoryByCode()中,我们将availableQuantity设置为50显然,这种行为取决于业务需求

我们可以通过使用@HystrixProperty注释配置属性来自定义@HystrixCommand默认行为。

1
2
3
4
6
7
8
9
10
@HystrixCommand (fallbackMethod = "getDefaultProductInventoryByCode" ,
     commandProperties = {
        @HystrixProperty (name = "execution.isolation.thread.timeoutInMilliseconds" , value = "3000" ),
        @HystrixProperty (name = "circuitBreaker.errorThresholdPercentage" , value= "60" )
     }
)
public Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
     ....
}

我们可以在bootstrap.properties/yml文件中配置它们,而不是在代码中配置这些参数值,如下所示。

扫描二维码关注公众号,回复: 5906519 查看本文章
1
2
hystrix.command.getProductInventoryByCode.execution.isolation.thread.timeoutInMilliseconds=2000
hystrix.command.getProductInventoryByCode.circuitBreaker.errorThresholdPercentage=60

请注意,我们使用方法名称作为  commandKey,这是默认行为。我们可以自定义commandKey名称,如下所示:

1
2
3
4
@HystrixCommand (commandKey = "inventory-by-productcode" , fallbackMethod = "getDefaultProductInventoryByCode" )
public Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
     ...
}
1
2
hystrix.command.inventory-by-productcode.execution.isolation.thread.timeoutInMilliseconds=2000
hystrix.command.inventory-by-productcode.circuitBreaker.errorThresholdPercentage=60

您可以在此处找到所有可用的配置选项https://github.com/Netflix/Hystrix/wiki/Configuration

如何传播ThreadLocal变量

默认情况下,带有@HystrixCommand的方法将在不同的线程上执行,因为默认的execution.isolation.strategyExecutionIsolationStrategy.THREAD。因此,我们在调用之前设置的ThreadLocal变量@HystrixCommand方法不会中可用@HystrixCommand方法。

使ThreadLocal变量可用的一个选项是使用execution.isolation.strategy = SEMAPHORE

1
2
3
4
6
7
8
9
@HystrixCommand (fallbackMethod = "getDefaultProductInventoryByCode" ,
     commandProperties = {
         @HystrixProperty (name= "execution.isolation.strategy" , value= "SEMAPHORE" )
     }
)
public Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
     ...
}

如果将属性execution.isolation.strategy设置为SEMAPHORE,则Hystrix将使用信号量而不是线程来限制调用该命令的并发父线程数。您可以在此处阅读有关隔离如何工作的更多信息https://github.com/Netflix/Hystrix/wiki/How-it-Works#isolation

在Hystrix命令方法中使ThreadLocal变量可用的另一个选项是实现我们自己的HystrixConcurrencyStrategy

假设您要将一些CorrelationId集传播为ThreadLocal变量。

1
2
3
4
6
7
8
9
10
11
public class MyThreadLocalsHolder {
     private static final ThreadLocal<String> CORRELATION_ID = new ThreadLocal();
     public static void setCorrelationId(String correlationId) {
         CORRELATION_ID.set(correlationId);
     }
     public static String getCorrelationId() {
         return CORRELATION_ID.get();
     }
}

让我们实现自己的HystrixConcurrencyStrategy

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
@Component
@Slf4j
public class ContextCopyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
     public ContextCopyHystrixConcurrencyStrategy() {
         HystrixPlugins.getInstance().registerConcurrencyStrategy( this );
     }
     @Override
     public <T> Callable<T> wrapCallable(Callable<T> callable) {
         return new MyCallable(callable, MyThreadLocalsHolder.getCorrelationId());
     }
     public static class MyCallable<T> implements Callable<T> {
         private final Callable<T> actual;
         private final String correlationId;
         public MyCallable(Callable<T> callable, String correlationId) {
             this .actual = callable;
             this .correlationId = correlationId;
         }
         @Override
         public T call() throws Exception {
             MyThreadLocalsHolder.setCorrelationId(correlationId);
             try {
                 return actual.call();
             } finally {
                 MyThreadLocalsHolder.setCorrelationId( null );
             }
         }
     }
}

现在,您可以在调用Hystrix命令之前设置CorrelationId,并在Hystrix命令中访问CorrelationId。

ProductService.java

1
2
3
4
6
7
8
9
10
11
12
public Optional<Product> findProductByCode(String code)
{
     ....
     String correlationId = UUID.randomUUID().toString();
     MyThreadLocalsHolder.setCorrelationId(correlationId);
     log.info( "Before CorrelationID: " + MyThreadLocalsHolder.getCorrelationId());
     Optional<ProductInventoryResponse> responseEntity = inventoryServiceClient.getProductInventoryByCode(code);
     ...
     log.info( "After CorrelationID: " + MyThreadLocalsHolder.getCorrelationId());
     ....
    
}

InventoryServiceClient.java

1
2
3
4
6
@HystrixCommand (fallbackMethod = "getDefaultProductInventoryByCode" )
public Optional<ProductInventoryResponse> getProductInventoryByCode(String productCode)
{
     ...
     log.info( "CorrelationID: " + MyThreadLocalsHolder.getCorrelationId());
}

这只是如何将数据传播到Hystrix命令的一个示例。类似地,我们可以传递当前HTTP请求中可用的任何数据,比如说使用Spring组件,如RequestContextHolder等。

Jakub Narloch撰写了一篇很好的文章,介绍了如何传播Request Context,甚至还创建了一个Spring Boot启动器。请查看他的博客https://jmnarloch.wordpress.com/2016/07/06/spring-boot-hystrix-and-threadlocals/和GitHub Repo https://github.com/jmnarloch/hystrix-context-spring-启动启动器

使用Hystrix仪表板监控断路器

一旦我们将Hystrix启动器添加到目录服务,我们就可以使用Actuator端点http:// localhost:8181 / actuator / hystrix.stream将电路状态作为事件流获取,假设目录服务在8181端口上运行。

Spring Cloud还提供了一个很好的仪表板来监控Hystrix命令的状态。
使用Hystrix Dashboard starter 创建Spring Boot应用程序,并使用@EnableHystrixDashboard注释主入口点类。

1
2
3
4
< dependency >
     < groupId >org.springframework.cloud</ groupId >
     < artifactId >spring-cloud-starter-netflix-hystrix-dashboard</ artifactId >
</ dependency >

假设我们在8788端口上运行Hystrix Dashboard,然后转到http:// localhost:8788 / hystrix查看仪表板。

Hystrix仪表板

现在,在Hystrix Dashboard主页中输入http:// localhost:8181 / actuator / hystrix.stream作为流URL,并将Catalog Service作为Title并单击Monitor Stream按钮。

现在调用内部调用库存服务REST端点的目录服务REST端点,您可以看到电路状态以及调用成功的次数和发生的故障数等。

Netflix Hystrix断路器

我们可以使用Turbine在单个仪表板中提供所有服务的统一视图,而不是为每个服务提供单独的仪表板。有关详细信息,请参阅http://cloud.spring.io/spring-cloud-static/Finchley.M7/single/spring-cloud.html#_turbine

您可以在https://github.com/sivaprasadreddy/spring-boot-microservices-series找到本文的源代码。

Categories: MicroServices

发表评论 取消回复

placeholder.jpg

电子邮件地址不会被公开。

Name
Email
Website
What's on your mind?

猜你喜欢

转载自blog.csdn.net/cpongo3/article/details/89327700