自定义Filter
我们先手动定义一个Filter,用来统计服务端每个接口的执行时间。
- 实现Filter接口
- 在resources/META-INF/dubbo文件夹下新建org.apache.dubbo.rpc.Filter文件
- 在org.apache.dubbo.rpc.Filter文件中写上Filter的路径
@Slf4j
@Activate(group = PROVIDER)
public class CostFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
long start = System.currentTimeMillis();
Result result = invoker.invoke(invocation);
long cost = System.currentTimeMillis() - start;
log.info("request cost " + invocation.getMethodName() + " " + cost);
return result;
}
}
org.apache.dubbo.rpc.Filter文件中的内容
cost=com.javashitang.producer.conf.CostFilter
日志输出
INFO c.j.producer.conf.CostFilter - request cost interface com.javashitang.api.service.UserService hello 5
INFO c.j.producer.conf.CostFilter - request cost interface com.javashitang.api.service.UserService hello 0
Filter是如何工作的?
Web开发的时候我们经常和Servlet Filter(过滤器)和 Spring MVC Interceptor(拦截器)打交道,在一个请求前后做一些增加的操作,过滤器和拦截器是用责任链模式实现的
Dubbo Filter同样是对请求的过程进行增强,不过和Servlet Filter不同的是,Dubbo Filter是基于装饰者模式实现的。Dubbo的开发者是真爱用装饰者模式
Filter在服务提供方是怎样起作用的?
服务导出时需要根据具体的协议导出Invoker,导出的时候。由于Dubbo Aop的作用,导出的过程会执行ProtocolFilterWrapper#export方法,最初的Invoker被不断装饰
// ProtocolFilterWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 暴露远程服务,registry协议不会执行filter链
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
最初的Invoke被不断装饰,服务提供者和调用者都是用这个方法来装饰最初的Invoker
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 获取自动激活的扩展类
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = filter.invoke(next, invocation);
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
return asyncResult;
} else {
return filter.onResponse(result, invoker, invocation);
}
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
从服务端调用的过程可以看到具体的装饰者,一个装饰执行一个Filter
大概就是这样的
Filter在服务调用方是怎样起作用的?
服务调用方也是在ProtocolFilterWrapper中对原始的Invoker进行装饰,用装饰类来执行Filter
实际调用的时候先经过了多个Filter最终才调用到了DubboInvoker#doInvoke方法
Dubbo中常见的过滤器
常用过滤器 | 介绍 | 使用方 |
---|---|---|
ActiveLimitFilter | 限制消费端对服务端最大并行调用数 | 消费端 |
ExecuteLimitFilter | 限制服务端的最大并行调用数 | 服务端 |
AccessLogFilter | 打印请求的访问日志 | 服务端 |
TokenFilter | 服务端提供令牌给消费端,防止消费端绕过注册中心直接调用服务端 | 服务端 |
MonitorFilter | 监控并统计所有接口的调用情况,如成功,失败,耗时,后续把数据发送到Dubbo-Monitor服务器上 | 消费端+服务端 |
分析几个常见的过滤器把,套路都差不多
ActiveLimitFilter
限制消费端对服务端最大并行调用数
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取设置的actives的值
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (!count.beginCount(url, methodName, max)) {
// 超过并发限制,超时时间默认为0
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
synchronized (count) {
while (!count.beginCount(url, methodName, max)) {
try {
// 等待过程中会被notify(),如果等待了remain毫秒,则下面一定会抛出异常
count.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
// 超时了还不能正常调用,抛出异常
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
+ ". max concurrent invoke limit: " + max);
}
}
}
}
// 没有超过并发限制
boolean isSuccess = true;
long begin = System.currentTimeMillis();
try {
return invoker.invoke(invocation);
} catch (RuntimeException t) {
isSuccess = false;
throw t;
} finally {
count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if (max > 0) {
synchronized (count) {
count.notifyAll();
}
}
}
}
}