温馨提示
个人觉得,看源码最好的方式就是看源码中写的测试代码
测试源码分析
org.apache.dubbo.rpc.cluster.directory.StaticDirectoryTest#testStaticDirectory
/**
 * Routes three mock invokers through a condition router, wraps the routed
 * result in a {@code StaticDirectory}, and checks availability, listing and
 * destruction of that directory.
 */
public void testStaticDirectory() {
    // Step 1: obtain a condition router for the rule " => host = <local host>".
    ConditionRouterFactory routerFactory = new ConditionRouterFactory();
    String rule = " => " + " host = " + NetUtils.getLocalHost();
    Router router = routerFactory.getRouter(getRouteUrl(rule));
    List<Router> routers = new ArrayList<>();
    routers.add(router);

    // Step 2: register one invoker at 10.20.3.3 and two at the local host.
    List<Invoker<String>> invokers = new ArrayList<>();
    invokers.add(new MockInvoker<String>(URL.valueOf("dubbo://10.20.3.3:20880/com.foo.BarService")));
    invokers.add(new MockInvoker<String>(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService")));
    invokers.add(new MockInvoker<String>(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService")));

    // Step 3: let the router filter the invokers for a consumer on the local host.
    URL consumerUrl = URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService");
    List<Invoker<String>> filteredInvokers = router.route(invokers, consumerUrl, new RpcInvocation());

    // Step 4: the directory built from the routed invokers is expected to report "not available".
    StaticDirectory<String> staticDirectory = new StaticDirectory<>(filteredInvokers);
    Assertions.assertFalse(staticDirectory.isAvailable());

    // Step 5: listing must still return at least one invoker.
    List<Invoker<String>> newInvokers = staticDirectory.list(new MockDirInvocation());
    Assertions.assertFalse(newInvokers.isEmpty());

    // Step 6: after destroy() the previously returned list is expected to be empty.
    staticDirectory.destroy();
    Assertions.assertEquals(0, newInvokers.size());
}
分析Router 的设计
// The router is obtained through a factory; getRouteUrl(...) is a test helper not shown here
// (presumably it wraps the rule " => host = <local host>" in a URL - confirm in the test class).
Router router = new ConditionRouterFactory().getRouter(getRouteUrl(" => " + " host = " + NetUtils.getLocalHost()));
- 它采用了工厂设计模式,通过参数来适配具体的工厂。在我们实际工作中也常用这种设计方式来设计自己的框架代码。
接下来我们分析工厂层的源码
- 定义工厂接口
/**
 * Factory contract for creating {@link Router} instances from a {@link URL}.
 * The interface is annotated with {@code @SPI}, and {@code getRouter} with
 * {@code @Adaptive("protocol")}; NOTE(review): this presumably means the concrete
 * factory is selected by the URL's protocol - confirm against the Dubbo SPI documentation.
 */
@SPI
public interface RouterFactory {
/**
 * Create router.
 * Since 2.7.0, most of the time, we will not use @Adaptive feature, so it's kept only for compatibility.
 *
 * @param url url the router is created from
 * @return router instance
 */
@Adaptive("protocol")
Router getRouter(URL url);
}
工厂接口下面有很多实现,我们目前分析ConditionRouterFactory工厂,根据条件路由工厂
/**
 * {@link RouterFactory} implementation whose {@code getRouter} always returns a
 * new {@link ConditionRouter}.
 */
public class ConditionRouterFactory implements RouterFactory {
/** Name constant of this factory ("condition"). */
public static final String NAME = "condition";
/**
 * Creates a {@code ConditionRouter} by passing the URL straight to its constructor.
 *
 * @param url url handed to {@code new ConditionRouter(url)}
 * @return a new {@code ConditionRouter} instance
 */
@Override
public Router getRouter(URL url) {
return new ConditionRouter(url);
}
}
/**
 * Builds a condition router from the given URL: stores the URL, reads the
 * priority / force / enabled parameters (with defaults), then parses the rule.
 *
 * @param url url carrying the router parameters and the routing rule
 */
public ConditionRouter(URL url) {
this.url = url; // @1 keep the URL
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0); //@2 priority, defaults to 0
this.force = url.getParameter(Constants.FORCE_KEY, false); // @3 force flag, defaults to false
this.enabled = url.getParameter(Constants.ENABLED_KEY, true); // @4 enabled flag, defaults to true
init(url.getParameterAndDecoded(Constants.RULE_KEY)); // @5 parse the decoded rule parameter
}
- @1 保存url地址
- @2 @3 @4 从url中获取参数,如果没有就直接使用默认值。
- @5 接下来我们重点分析 init 方法:它从参数中获取到路由规则之后,是如何解析路由规则的
/**
 * Parses a routing rule of the form {@code <when> => <then>} and stores the
 * result in {@code whenCondition} and {@code thenCondition}.
 *
 * <p>Every occurrence of {@code "consumer."} and {@code "provider."} is removed
 * first. The rule is then split at the first {@code "=>"}; without an arrow the
 * whole rule is treated as the "then" part. A blank or {@code "true"} "when"
 * part yields an empty map; a blank or {@code "false"} "then" part yields
 * {@code null}.
 *
 * @param rule the raw rule, e.g. {@code " => host = 10.8.80.53"}
 * @throws IllegalArgumentException if the rule is {@code null} or blank
 * @throws IllegalStateException    if either part cannot be parsed
 */
public void init(String rule) {
    if (rule == null || rule.trim().length() == 0) {
        throw new IllegalArgumentException("Illegal route rule!");
    }
    try {
        String normalized = rule.replace("consumer.", "").replace("provider.", "");

        // Split around the first "=>"; for " => host = 10.8.80.53" the arrow is at index 1.
        int arrow = normalized.indexOf("=>");
        String whenRule;
        String thenRule;
        if (arrow < 0) {
            whenRule = null;
            thenRule = normalized.trim();
        } else {
            whenRule = normalized.substring(0, arrow).trim();
            thenRule = normalized.substring(arrow + 2).trim();
        }

        // Left-hand side: blank or "true" becomes an empty condition map.
        Map<String, MatchPair> when;
        if (StringUtils.isBlank(whenRule) || "true".equals(whenRule)) {
            when = new HashMap<String, MatchPair>();
        } else {
            when = parseRule(whenRule);
        }

        // Right-hand side: blank or "false" becomes null; otherwise e.g. key "host" -> the listed values.
        Map<String, MatchPair> then;
        if (StringUtils.isBlank(thenRule) || "false".equals(thenRule)) {
            then = null;
        } else {
            then = parseRule(thenRule);
        }

        // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
        this.whenCondition = when;
        this.thenCondition = then;
    } catch (ParseException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
/**
 * Tokenizes one side of a routing rule with {@code ROUTE_PATTERN} and collects
 * the result as key -> {@code MatchPair}.
 *
 * <p>Each token consists of a separator (group 1) and a content string (group 2):
 * an empty separator or {@code "&"} starts (or re-selects) a key, {@code "="}
 * and {@code "!="} add the content to the current pair's {@code matches} /
 * {@code mismatches}, and {@code ","} appends another value to the set that was
 * selected last.
 *
 * @param rule one side of the rule; blank input yields an empty map
 * @return the parsed conditions, never {@code null}
 * @throws ParseException if a separator appears where it is not allowed or is unknown
 */
private static Map<String, MatchPair> parseRule(String rule) throws ParseException {
    Map<String, MatchPair> parsed = new HashMap<String, MatchPair>();
    if (StringUtils.isBlank(rule)) {
        return parsed;
    }

    MatchPair currentPair = null;      // pair belonging to the key seen last
    Set<String> currentValues = null;  // value set selected by the last "=" or "!="
    final Matcher tokens = ROUTE_PATTERN.matcher(rule);
    while (tokens.find()) {
        String separator = tokens.group(1);
        String content = tokens.group(2);

        // No separator: this token opens the condition expression with a new key.
        if (StringUtils.isEmpty(separator)) {
            currentPair = new MatchPair();
            parsed.put(content, currentPair);
            continue;
        }

        switch (separator) {
            case "&":
                // Next key: reuse its pair if the key was seen before.
                currentPair = parsed.get(content);
                if (currentPair == null) {
                    currentPair = new MatchPair();
                    parsed.put(content, currentPair);
                }
                break;
            case "=":
                if (currentPair == null) {
                    throw illegalSeparator(rule, separator, content, tokens.start());
                }
                currentValues = currentPair.matches;
                currentValues.add(content);
                break;
            case "!=":
                if (currentPair == null) {
                    throw illegalSeparator(rule, separator, content, tokens.start());
                }
                currentValues = currentPair.mismatches;
                currentValues.add(content);
                break;
            case ",":
                // A further value is only legal after at least one value was added.
                if (currentValues == null || currentValues.isEmpty()) {
                    throw illegalSeparator(rule, separator, content, tokens.start());
                }
                currentValues.add(content);
                break;
            default:
                throw illegalSeparator(rule, separator, content, tokens.start());
        }
    }
    return parsed;
}

/** Builds the ParseException reported for a misplaced or unknown separator. */
private static ParseException illegalSeparator(String rule, String separator, String content, int index) {
    return new ParseException("Illegal route rule \""
            + rule + "\", The error char '" + separator
            + "' at index " + index + " before \""
            + content + "\".", index);
}
- 解析到key和value
key = host
key = host , value = value
添加
// Candidate invokers: one at 10.20.3.3 and two at the local host.
List<Invoker<String>> invokers = new ArrayList<>();
invokers.add(new MockInvoker<String>(URL.valueOf("dubbo://10.20.3.3:20880/com.foo.BarService")));
invokers.add(new MockInvoker<String>(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService")));
invokers.add(new MockInvoker<String>(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService")));
// Route them for a consumer located on the local host.
URL consumerUrl = URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService");
List<Invoker<String>> filteredInvokers = router.route(invokers, consumerUrl, new RpcInvocation());
- 添加 filter,表示只保留符合条件的 Invoker
StaticDirectory<String> staticDirectory = new StaticDirectory<>(filteredInvokers); // @1 build a static directory from the routed invokers
- @1 标识本地目录
Directory
AbstractDirectory
StaticDirectory 静态目录
RegistryDirectory 动态目录
implements NotifyListener 事件
StaticDirectory
- getInterface() 获取接口
- isAvailable() 是否可用
- destroy() 销毁
- doList() // 获取服务对应的地址
代码分析
- step 1 判断
/**
 * Reports whether this directory can currently serve calls.
 *
 * @return {@code false} if the directory is destroyed or no invoker reports
 *         itself available; {@code true} as soon as one invoker is available
 */
public boolean isAvailable() {
    boolean anyAvailable = false;
    if (!isDestroyed()) {
        for (Invoker<T> candidate : invokers) {
            // Original author's note: this availability information is carried along when the list is pulled.
            if (candidate.isAvailable()) {
                anyAvailable = true;
                break;
            }
        }
    }
    return anyAvailable;
}
- 获取列表
// Ask the directory for its invokers, passing a mock invocation.
List<Invoker<String>> newInvokers = staticDirectory.list(new MockDirInvocation());
- 获取列表
/**
 * Publicly exposed entry point for listing invokers.
 *
 * @param invocation the invocation the invokers are listed for
 * @return whatever {@code doList} returns for this invocation
 * @throws RpcException if the directory has already been destroyed
 */
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (!destroyed) {
        // Template method: the actual lookup is supplied by the subclass.
        return doList(invocation);
    }
    throw new RpcException("Directory already destroyed .url: " + getUrl());
}
- 获取列表
/**
 * Returns the invokers of this directory, passed through the router chain
 * when one is set.
 *
 * <p>A failure inside the router chain is logged and not rethrown; in that
 * case the unrouted {@code invokers} are returned.
 *
 * @param invocation the invocation handed to the router chain
 * @return the resulting invokers, or an empty list instead of {@code null}
 */
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    List<Invoker<T>> routed = invokers;
    if (routerChain != null) {
        try {
            // Let the router chain decide which invokers remain for the consumer URL and this invocation.
            routed = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }
    }
    if (routed == null) {
        return Collections.emptyList();
    }
    return routed;
}
// Destroy the directory.
staticDirectory.destroy();
-
模板设计模式 + 责任链设计模式 在Dubbo中是如何玩的
// finalInvokers = routerChain.route(getConsumerUrl(), invocation);
/**
 * Excerpt of the directory base class: {@code list} is the fixed public entry
 * point and {@code doList} is the step each concrete directory implements.
 */
public abstract class AbstractDirectory<T> implements Directory<T> {

    /**
     * Publicly exposed method for listing invokers.
     *
     * @param invocation the invocation the invokers are listed for
     * @return the result of {@code doList}
     * @throws RpcException if the directory has already been destroyed
     */
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException { // @1
        if (!destroyed) {
            // Template method: delegate to the subclass implementation.
            return doList(invocation);
        }
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }

    /**
     * Subclass hook called by {@code list} once the destroyed check has passed.
     *
     * @param invocation the invocation the invokers are listed for
     * @return the invokers for this invocation
     * @throws RpcException if listing fails
     */
    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException; //@2
}
- @1 提供标准的模板方法
- @2 由子类去实现
责任链设计模式
/** Router chain consulted by doList; may be null, in which case no routing takes place. */
protected RouterChain<T> routerChain;

/**
 * Lists the invokers, letting the router chain filter them when a chain is present.
 *
 * <p>If the chain throws, the error is logged and the unrouted {@code invokers}
 * are used instead.
 *
 * @param invocation the invocation handed to the router chain
 * @return the resulting invokers; an empty list takes the place of {@code null}
 */
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    List<Invoker<T>> routed = invokers;
    if (routerChain != null) {
        try {
            // Hand the decision to the router chain for this consumer URL and invocation.
            routed = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }
    }
    if (routed == null) {
        return Collections.emptyList();
    }
    return routed;
}
/** Routers applied one after another by route; starts out as an empty list. */
private volatile List<Router> routers = Collections.emptyList();

/**
 * Passes the invokers through every router in order, each router receiving
 * the output of the previous one.
 *
 * @param url        url forwarded to every router
 * @param invocation invocation forwarded to every router
 * @return the invokers left after the last router (the original {@code invokers}
 *         when there are no routers)
 */
public List<Invoker<T>> route(URL url, Invocation invocation) {
    List<Invoker<T>> survivors = invokers;
    for (Router nextRouter : routers) {
        // Chain of responsibility: the output of one router is the input of the next.
        survivors = nextRouter.route(survivors, url, invocation);
    }
    return survivors;
}
下一节介绍动态路由