服务导出之注册到zookeeper
- ServiceConfig#export导出, 层层调用到最后, 就是调用RegistryProtocol#export来进行服务导出,上一篇解释了,也解释了容器的启动;
- 由于我们使用的zookeeper注册中心, 因此dubbo会将服务信息注册到注册中心中;
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker); //
URL providerUrl = getProviderUrl(originInvoker); // dubbo://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=27656&release=2.7.0&side=provider&timeout=3000×tamp=1590735956489
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export invoker
// 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 得到注册中心-ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 将当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
//是否需要注册到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册服务,把简化后的服务提供者url注册到registryUrl中去
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
//监听内容
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
Registry#getRegistry(originInvoker)
目的:获取一个Registry实例对象;
工作:
- 获取registryUrl;
- 通过工厂模式,创建一个Registry对象;
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
private URL getRegistryUrl(Invoker<?> originInvoker) {
// 将registry://xxx?xx=xx®istry=zookeeper 转为 zookeeper://xxx?xx=xx
URL registryUrl = originInvoker.getUrl();
if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
return registryUrl;
}
RegistryFactory#getRegistry(registryUrl)
是一个接口;
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({
"protocol"})
Registry getRegistry(URL url);
}
- ZookeeperRegistryFactory类实现了AbstractRegistryFactory抽象类, 而AbstractRegistryFactory实现了RegistryFactory接口;
- 调用getRegistry方法调用的是ZookeeperRegistryFactory父类AbstractRegistryFactory的getRegistry方法
- AbstractRegistryFactory#getRegistry定义了创建注册中心的一些公共逻辑,而由于注册中心有好几种,因此将注册中心的创建createRegistry逻辑交由子类去实现;这是工厂模式的通常做法;、
- 使用ReentrantLock可重入锁, 保证线程安全, 避免重复创建Registry实例;
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);
ZookeeperRegistryFactory#createRegistry
创建了一个ZookeeperRegistry实例;
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
/**
* Invisible injection of zookeeper client via IOC/SPI
* @param zookeeperTransporter
*/
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
创建ZookeeperRegistry实例;
- 调用super(url)去加载一些公共的配置;
- 获取group组名 ,默认值为“dubbo”
- 判断组名是否以“/”开头, 没有则拼接"/"
- Zookeeper注册中心的根节点设置为组名 group;
- 连接注册中心;
- 监听是否连接成功,连接失败,则进行重连;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
创建完ZookeeperRegistry实例后,层层返回, 返回到获取Registry的地方;
RegistryProtocol#getRegisteredProviderUrl
目的: 简化删除URL中的一些无用数据;
// 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
工作:
- 判断RegistryProtocol是否配置简化简述Simplified, 默认时false;
- 过滤URL中以“.”开头的key, 以及monitor, bind.ip, bind.port, qos.enable, qos_host, qos_port, qos.accept.foreign.ip, validation,interfaces的key ; 过滤后,剩下的key就是待删除的key;
- 删除URL中 待删除的key;
- 返回
/**
* Return the url that is registered to the registry and filter the url parameter once
* 得到能存入到注册中心去的providerUrl
* @param providerUrl
* @return url to registry.
*/
private URL getRegisteredProviderUrl(final URL providerUrl, final URL registryUrl) {
//The address you see at the registry
// 默认都是走这里
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
// 移除key以"."开始的参数,以及其他跟服务本身没有关系的参数,比如监控,绑定ip,qosd等等
return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
INTERFACES);
} else {
String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
// if path is not the same as interface name then we should keep INTERFACE_KEY,
// otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
// but what we expect is '/dubbo/interface/providers'
// 如果path不等于interface的值,那么则把INTERFACE_KEY添加到extraKeys中
if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
if (StringUtils.isNotEmpty(extraKeys)) {
extraKeys += ",";
}
extraKeys += INTERFACE_KEY;
}
// paramsToRegistry包括了DEFAULT_REGISTER_PROVIDER_KEYS和extraKeys
String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
, COMMA_SPLIT_PATTERN.split(extraKeys));
// 生成只含有paramsToRegistry的对应的参数,并且该参数不能为空
return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
}
}
//过滤掉以"."开头的key, 返回一个数组;
private static String[] getFilteredKeys(URL url) {
Map<String, String> params = url.getParameters();
// 过滤url的参数,找到startsWith(HIDE_KEY_PREFIX)的key
if (CollectionUtils.isNotEmptyMap(params)) {
return params.keySet().stream()
.filter(k -> k.startsWith(HIDE_KEY_PREFIX))
.toArray(String[]::new);
} else {
return new String[0];
}
}
ProviderConsumerRegTable.registerProvider(originInvoker,registryUrl, registeredProviderUrl)
目的: 当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
工作:
- 创建一个ProviderInvokerWrapper实例,包装了注册中URL,服务执行器, 以及简化后的URL;
- 获取providerUrl中的serviceKey;
- 从缓存providerInvokers根据key获取map;
- 如果为空,则放入一个空的ConcurrentHashMap, 再根据key将这个ConcurrentHashMap拿出来;
- 以invoker为key, wrapperInvoker包装实例为value, 放入invokers,即刚创建的ConcurrenthashMap;
public static <T> ProviderInvokerWrapper<T> registerProvider(Invoker<T> invoker, URL registryUrl, URL providerUrl) {
ProviderInvokerWrapper<T> wrapperInvoker = new ProviderInvokerWrapper<>(invoker, registryUrl, providerUrl);
String serviceUniqueName = providerUrl.getServiceKey();
// 根据
ConcurrentMap<Invoker, ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
if (invokers == null) {
providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashMap<>());
invokers = providerInvokers.get(serviceUniqueName);
}
invokers.put(invoker, wrapperInvoker);
return wrapperInvoker;
}
register(registryUrl, registeredProviderUrl)
注册到Zookeeper注册中心;
- 获取REGISTER_KEY的值,判断是否要注册;
- 需要注册则将简化后的服务URL放入注册中心;
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册服务,把简化后的服务提供者url注册到registryUrl中去
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
- 首先调用registryFactory.getRegistry获取注册中心实例, 由于前面已经创建一个注册中心实例,并且放入了缓存,因此,这个步骤中,直接从map中获取就可以了;
- 调用的ZookeeperRegistry#register方法;
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
// 调用ZookeeperRegistry的register方法
registry.register(registeredProviderUrl);
}
ZookeeperRegistry#register(registeredProviderUrl)
- ZookeeperRegistry本身并没有register方法,但继承了FailbackRegistry父类, 父类中已经实现了register方法
- 因此,调用ZookeeperRegistry实际上调用了父类FailbackRegistry的register方法;
- 调用了doRegister(url)方法, 由于注册中心有好几种,不可能是通用的操作,因此实现逻辑由子类去实现;即调用了ZookeeperRegistry#doRegister(url)方法
- 个人看法:普通工厂模式也是这种设计理念;
//FailbackRegistry#register
@Override
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
//省略部分代码;
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
ZookeeperRegistry#doRegister(url)
目的:给Zookeeper发送请求,创建一个值toUrlPath(url)为动态的配置结点;
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
toUrlPath(url)
获取结点的值;
扫描二维码关注公众号,回复:
16014419 查看本文章
//"/dubbo/com.demo.DemoService/providers/#{url}"
// toCategoryPath(url) ⇒ "/dubbo/com.demo.DemoService/providers"
// PATH_SEPARATOR ==> "/"
//URL.encode(url.toFullString())==>对url进行中文编码
private String toUrlPath(URL url) {
return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}
//返回的结果:"/dubbo/com.demo.DemoService/providers"
// toServicePath(url) ⇒ "/dubbo/com.demo.DemoService"
// PATH_SEPARATOR ==> "/"
// 获取URL的catogory的值, 默认为providers;
private String toCategoryPath(URL url) {
return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
}
//结果: "/dubbo/com.demo.DemoService"
//toRootDir() ---> "/dubbo/"
//URL.encode(name) --> 对URL的name属性进行URL编码; name的值为 com.demo.DemoService
private String toServicePath(URL url) {
String name = url.getServiceInterface();
if (ANY_VALUE.equals(name)) {
return toRootPath();
}
return toRootDir() + URL.encode(name);
}
//结果为:根节点 的值 + “/”, 这里为 "/dubbo/"
private String toRootDir() {
if (root.equals(PATH_SEPARATOR)) {
return root;
}
return root + PATH_SEPARATOR;
}
服务导出源码流程
- ServiceBean.export()方法是导出的入口方法,会执行ServiceConfig.export()方法完成服务导出,导出完了之后会发布一个Spring事件ServiceBeanExportedEvent
- 在ServiceConfig.export()方法中会先调用checkAndUpdateSubConfigs(),这个方法主要完成AbstractConfig的参数刷新(从配置中心获取参数等等),AbstractConfig是指ApplicationConfig、ProtocolConfig、ServiceConfig等等,刷新完后会检查stub、local、mock等参数是否配置正确
- 参数刷新和检查完成了之后,就会开始导出服务,如果配置了延迟导出,那么则按指定的时间利用ScheduledExecutorService来进行延迟导出
- 否则调用doExport()进行服务导出
- 继续调用doExportUrls()进行服务导出
- 首先通过loadRegistries()方法获得所配置的注册中心的URL,可能配了多个配置中心,那么当前所导出的服务需要注册到每个配置中心去,这里,注册中心的是以URL的方式来表示的,使用的是什么注册中心、注册中心的地址和端口,给注册中心所配置的参数等等,都会存在在URL上,此URL以registry://开始
- 获得到注册中心的registryURLs之后,就会遍历当前服务所有的ProtocolConfig,调用doExportUrlsFor1Protocol(protocolConfig, registryURLs);方法把当前服务按每个协议每个注册中心分别进行导出
- 在doExportUrlsFor1Protocol()方法中,会先构造一个服务URL,包括
a. 服务的协议dubbo://,
b. 服务的IP和PORT,如果指定了就取指定的,没有指定IP就获取服务器上网卡的IP,
c. 以及服务的PATH,如果没有指定PATH参数,则取接口名
d. 以及服务的参数,参数包括服务的参数,服务中某个方法的参数
e. 最终得到的URL类似: dubbo://192.168.1.110:20880/com.tuling.DemoService?timeout=3000&&sayHello.loadbalance=random - 得到服务的URL之后,会把服务URL作为一个参数添加到registryURL中去,然后把registryURL、服务的接口、当前服务实现类ref生成一个Invoker代理对象,再把这个代理对象和当前ServiceConfig对象包装成一个DelegateProviderMetaDataInvoker对象,DelegateProviderMetaDataInvoker就表示了完整的一个服务
- 接下来就会使用Protocol去export导出服务了,导出之后将得到一个Exporter对象(该Exporter对象,可以理解为主要可以用来卸载(unexport)服务,什么时候会卸载服务?在优雅关闭Dubbo应用的时候)
- 接下来我们来详细看看Protocol是怎么导出服务的?
- 但调用protocol.export(wrapperInvoker)方法时,因为protocol是Protocol接口的一个Adaptive对象,所以此时会根据wrapperInvoker的genUrl方法得到一个url,根据此url的协议找到对应的扩展点,此时扩展点就是RegistryProtocol,但是,因为Protocol接口有两个包装类,一个是ProtocolFilterWrapper、ProtocolListenerWrapper,所以实际上在调用export方法时,会经过这两个包装类的export方法,但是在这两个包装类的export方法中都会Registry协议进行了判断,不会做过多处理,所以最终会直接调用到RegistryProtocol的export(Invoker originInvoker)方法
- 在RegistryProtocol的export(Invoker originInvoker)方法中,主要完成了以下几件事情:
a. 生成监听器,监听动态配置中心此服务的参数数据的变化,一旦监听到变化,则重写服务URL,并且在服务导出时先重写一次服务URL
b. 拿到重写之后的URL之后,调用doLocalExport()进行服务导出,在这个方法中就会调用DubboProtocol的export方法去导出服务了,导出成功后将得到一个ExporterChangeableWrapper
ⅰ. 在DubboProtocol的export方法中主要要做的事情就是启动NettyServer,并且设置一系列的RequestHandler,以便在接收到请求时能依次被这些RequestHandler所处理
ⅱ. 这些RequestHandler在上文已经整理过了
c. 从originInvoker中获取注册中心的实现类,比如ZookeeperRegistry
d. 将重写后的服务URL进行简化,把不用存到注册中心去的参数去除
e. 把简化后的服务URL调用ZookeeperRegistry.registry()方法注册到注册中心去
f. 最后将ExporterChangeableWrapper封装为DestroyableExporter对象返回,完成服务导出
Exporter架构
一个服务导出成功后,会生成对应的Exporter:
- DestroyableExporter:Exporter的最外层包装类,这个类的主要作用是可以用来unexporter对应的服务
- ExporterChangeableWrapper:这个类主要负责在unexport对应服务之前,把服务URL从注册中心中移除,把该服务对应的动态配置监听器移除
- ListenerExporterWrapper:这个类主要负责在unexport对应服务之后,把服务导出监听器移除
- DubboExporter:这个类中保存了对应服务的Invoker对象,和当前服务的唯一标志,当NettyServer接收到请求后,会根据请求中的服务信息,找到服务对应的DubboExporter对象,然后从对象中得到Invoker对象
总结
- 服务导出中使用构造器模式,反射, 工厂模式,代码通用性也很强;
- 大概了解服务导出的过程, 至于Dubbo配置的动态变化,这涉及到监听机制,Zookeeper的一大特性吧。 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener) 订阅注册中心配置, 同时有一个监听器,如果配置发生变动,则触发overrideSubscribeListener监听器,重新导入配置;