之前的文章说了一下JWT模式下feign调用其他服务的时候,怎么通过一个方法使得http请求头的信息传递到被调服务中。当然也可以用@RequestHeader注解,但是这样并不是太好,因为我们希望有一个全局的处理。这些问题可以参看之前的文章。
《springCloud微服务系列——OAuth2+JWT模式下的swagger+feign处理》
这篇文章将总结的问题是,同样基于feign模式,但是开启了hystrix,那么之前的方式传递http请求头就已经失效了。这里将介绍如何解决这个问题。
一、hystrix源码剖析-追根溯源
为什么开启了hystrix就不能获取到http请求头信息了呢,做了一定的源码分析。最后总结为下图:
该图展示了开启hystrix后一些比较重要的执行点,hystrix通过命令模式,通过hystrixCommond.execute()进行调用服务。关键是通过future模式和rxJava开启了一个新线程处理请求,最后调用feign的ribbon负载均衡。我们知道,spring mvc为了我们在controller层以外也能获得到http请求,使用了基于localThread的requestContextHolder,在这里面我们可以拿到所有的http请求信息。但是这里新启了一个线程后,当然在之前实现的拦截器中,无法通过requestContextHolder访问到以前的http请求信息了。
这里回顾一下上篇文章中的拦截器,我们看一下获取http请求的方法
private HttpServletRequest getHttpServletRequest() {
try {
return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
} catch (Exception e) {
return null;
}
}
再来总结一下整个执行过程hystrixCommond.execute()->return queue().get()[新线程]->hystixCommond.run()->feign.ribbon.LoadBalancerFeignClient执行http请求->我们实现的拦截器->getHttpServletRequest->((ServletRequestAttributes)) RequestContextHolder.getRequestAttributes()).getRequest()
整个原因一目了然
二、解决方案
怎么解决问题呢?其实spring cloud自身也有很多地方需要这种跨线程的操作,所以它自己有一个类名为HystrixConcurrencyStrategy。我们可以看到它为spring security提供了一个实现SecurityContextConcurrencyStrategy,为sleuth提供了一个实现SleuthHystrixConcurrencyStrategy。我们要做的就是模仿这两个实现类,实现一个我们自己的HystrixConcurrencyStrategy类。这里就直接上代码了
@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private static final Logger logger = LoggerFactory.getLogger(RequestAttributeHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;
public RequestAttributeHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
// Welcome to singleton hell...
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
}
catch (Exception e) {
logger.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (logger.isDebugEnabled()) {
logger.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
logger.debug("Registering Sleuth Hystrix Concurrency Strategy.");
}
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
if (callable instanceof HttpRequestWrappedCallable) {
return callable;
}
Callable<T> wrappedCallable = this.delegate != null
? this.delegate.wrapCallable(callable) : callable;
if (wrappedCallable instanceof HttpRequestWrappedCallable) {
return wrappedCallable;
}
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new HttpRequestWrappedCallable<>(callable, requestAttributes);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
static class HttpRequestWrappedCallable<T> implements Callable<T> {
private final Callable<T> target;
private final RequestAttributes requestAttributes;
public HttpRequestWrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return target.call();
}
finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}