目录
一、简介
这篇文章总结hystrix链路跟踪器的实现
二、思路
上一篇文章总结了feign的链路跟踪器的实现,同样的,这里也可以用AOP来拦截,除此之外,hystrix还有一些自身的特点
1、需要从http中获取链路信息,由于是新建的线程需要特殊处理,可以参看《springCloud微服务系列——OAuth2+JWT模式下的feign+hystrix处理》
2、处理结束后,需要销毁上下文信息
三、示例代码
@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private static final Logger logger = LoggerFactory.getLogger(RequestAttributeHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;
public RequestAttributeHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
// Welcome to singleton hell...
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
}
catch (Exception e) {
logger.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (logger.isDebugEnabled()) {
logger.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
logger.debug("Registering Sleuth Hystrix Concurrency Strategy.");
}
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
if (callable instanceof HttpRequestWrappedCallable) {
return callable;
}
Callable<T> wrappedCallable = this.delegate != null
? this.delegate.wrapCallable(callable) : callable;
if (wrappedCallable instanceof HttpRequestWrappedCallable) {
return wrappedCallable;
}
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new HttpRequestWrappedCallable<>(callable, requestAttributes);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
static class HttpRequestWrappedCallable<T> implements Callable<T> {
private final Callable<T> target;
private final RequestAttributes requestAttributes;
public HttpRequestWrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return target.call();
}
finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}
@Slf4j
@Aspect
@Component
public class HystrixTracker extends GenericTracker implements Tracker<TraceHolder> {
@Value("${spring.profiles.active:default}")
private String profile;
@Autowired
private TraceClient traceClient;
/**
* <p>Title: </p>
* <p>Description: </p>
* @param traceClient
*/
public HystrixTracker(TraceClient traceClient) {
super(traceClient);
this.traceClient = traceClient;
}
@Around(value = "@annotation(trace)")
public Object proceed(ProceedingJoinPoint joinPoint, Trace trace) throws Throwable {
if(!trace.value().getName().equals(this.getClass().getName())) {
return joinPoint.proceed();
}
TraceHolder traceHolder = new TraceHolder();
Object result = null;
try {
log.info("hystrix tracker");
Gson gson = new Gson();
String targetName = joinPoint.getTarget().getClass().getName();
Signature signature = joinPoint.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method method = methodSignature.getMethod();
String methodName = method.getName();
GetMapping getMapping = method.getAnnotation(GetMapping.class);
PostMapping postMapping = method.getAnnotation(PostMapping.class);
PutMapping putMapping = method.getAnnotation(PutMapping.class);
DeleteMapping deleteMapping = method.getAnnotation(DeleteMapping.class);
RequestMapping requestMapping = method.getAnnotation(RequestMapping.class);
String serverHost = "";
if(getMapping != null) {
serverHost = StringUtils.join(getMapping.value(), "/");
}
else if(postMapping != null) {
serverHost = StringUtils.join(postMapping.value(), "/");
}
else if(putMapping != null) {
serverHost = StringUtils.join(putMapping.value(), "/");
}
else if(deleteMapping != null) {
serverHost = StringUtils.join(deleteMapping.value(), "/");
}
else if(requestMapping != null) {
serverHost = StringUtils.join(requestMapping.value(), "/");
}
Map<String, Object> requestMap = new HashMap<String, Object>();
List<String> requestList = new ArrayList<String>();
Object[] args = joinPoint.getArgs();
for(Object arg : args) {
requestList.add(arg.toString());
}
requestMap.put("params", requestList);
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
traceHolder.setProfile(profile);
traceHolder.setRpcType(RpcTypeEnum.HTTP.name());
traceHolder.setServiceCategory("hystrix");
traceHolder.setServiceName("");
traceHolder.setMethodName(methodName);
traceHolder.setRequestParam(gson.toJson(requestMap));
traceHolder.setServiceHost("");
traceHolder.setClientHost(HostUtil.getIP(request));
Optional.ofNullable(
request.getAttribute(TraceInfo.TRACE_ID_KEY)
).ifPresent(traceId -> {
traceHolder.setTraceId((String) traceId);
});
Optional.ofNullable(
request.getAttribute(TraceInfo.RPC_ID_KEY)
).ifPresent(rpcId -> {
traceHolder.setRpcId(((String) rpcId)+".1");
});
TraceContext.init();
preHandle(traceHolder);
TraceInfo traceInfo = TraceContext.getTraceInfo();
traceInfo.setRootRpcId(traceHolder.getRpcId());
TraceContext.putContext(traceHolder.getTraceId(), traceInfo);
request.setAttribute(TraceInfo.TRACE_ID_KEY, traceHolder.getEntity().getTraceId());
request.setAttribute(TraceInfo.RPC_ID_KEY, traceHolder.getEntity().getRpcId());
result = joinPoint.proceed();
traceHolder.getEntity().setResponseInfo(result.toString());
postHandle(traceHolder);
} catch (Exception e) {
log.error(e.getMessage(), e);
exceptionHandle(traceHolder, e);
} finally {
TraceContext.removeTraceInfo();
MDC.remove(TraceInfo.TRACE_ID_KEY);
MDC.remove(TraceInfo.RPC_ID_KEY);
}
return result;
}
}