一、前沿
在 dubbo启动Bean生成 中我们了解了dubbo是如何借助 Spring 扩展功能来加载 bean 的,下面我们讲一下 dubbo 的 provider(提供者)的启动过程流程以及启动过程都做了些什么操作
二、服务导出原理
Dubbo 服务导出时机有如下两个:
1)、Spring 容器调用 ServiceBean 的 afterPropertiesSet 方法时导出服务
2)、 Spring 容器发布刷新事件,ServiceBean 接收到事件后,调用 onApplicationEvent 方法立即执行导出服务
服务导出大致分为三个部分:
1)、前置工作,主要用于检查参数,组装 URL
2)、导出服务,包含导出服务到本地 (JVM),和导出服务到远程
3)、向注册中心注册服务,用于服务发现
三、provider 服务导出过程
provider 启动过程中会暴露服务,将自己提供的服务导出,流程图如下:
暴露服务时序图:
四、provider启动过程源码
Spring 容器解析到 <dubbo:service> 标签时,在 Spring 容器中会生成对应的 ServiceBean,如下图:
provider 的服务发布都是在 ServiceBean 的 export 方法中完成的,下面我们先看看 ServiceBean,代码如下:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware
ServiceBean 继承了 ServiceConfig,实现了一系列的接口,即 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware,关于这些接口的用途,请参照以下文章:
InitializingBean 和 DisposableBean :InitializingBean&DisposableBean
****Aware:Spring Aware 接口
ApplicationListener:应用事件监听器,处理监听事件的
完整的 ServiceBean 源代码如下:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware {
private static final long serialVersionUID = 213195494150089726L;
private final transient Service service;
private transient ApplicationContext applicationContext;
private transient String beanName;
private transient boolean supportedApplicationListener;
private ApplicationEventPublisher applicationEventPublisher;
public ServiceBean() {
super();
this.service = null;
}
public ServiceBean(Service service) {
super(service);
this.service = service;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
// ApplicationContextAware 接口实现
this.applicationContext = applicationContext;
// 设置应用上下文
SpringExtensionFactory.addApplicationContext(applicationContext);
// 添加应用监听器
supportedApplicationListener = addApplicationListener(applicationContext, this);
}
@Override
public void setBeanName(String name) {
// 获取Spring容器中的 beanId,即 BeanNameAware 接口实现
this.beanName = name;
}
/**
* Gets associated {@link Service}
*
* @return associated {@link Service}
*/
public Service getService() {
return service;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// ApplicationListener 接口实现
// 当Spring 容器初始化完成之后,AbstractApplicationContext 的 finishRefresh 方法发布 ContextRefreshedEvent 事件,
// 最终调用 ApplicationListener (ServiceBean)的 onApplicationEvent 方法暴露服务
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
// 暴露服务
export();
}
}
@Override
@SuppressWarnings({"unchecked", "deprecation"})
public void afterPropertiesSet() throws Exception {
// InitializingBean 接口实现
// ServiceBean 属性初始化之后的操作
if (getProvider() == null) {
// ProviderConfig 提供者配置设置
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
if (providerConfigMap != null && providerConfigMap.size() > 0) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (CollectionUtils.isEmptyMap(protocolConfigMap)
&& providerConfigMap.size() > 1) { // backward compatibility
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() != null && config.isDefault()) {
providerConfigs.add(config);
}
}
if (!providerConfigs.isEmpty()) {
setProviders(providerConfigs);
}
} else {
ProviderConfig providerConfig = null;
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (providerConfig != null) {
throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
}
providerConfig = config;
}
}
if (providerConfig != null) {
setProvider(providerConfig);
}
}
}
}
if (getApplication() == null
&& (getProvider() == null || getProvider().getApplication() == null)) {
// ApplicationConfig 设置
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
if (getModule() == null
&& (getProvider() == null || getProvider().getModule() == null)) {
// ModuleConfig 设置
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
if (StringUtils.isEmpty(getRegistryIds())) {
if (getApplication() != null && StringUtils.isNotEmpty(getApplication().getRegistryIds())) {
setRegistryIds(getApplication().getRegistryIds());
}
if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getRegistryIds())) {
setRegistryIds(getProvider().getRegistryIds());
}
}
if ((CollectionUtils.isEmpty(getRegistries()))
&& (getProvider() == null || CollectionUtils.isEmpty(getProvider().getRegistries()))
&& (getApplication() == null || CollectionUtils.isEmpty(getApplication().getRegistries()))) {
// RegistryConfig 注册配置设置
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (CollectionUtils.isNotEmptyMap(registryConfigMap)) {
List<RegistryConfig> registryConfigs = new ArrayList<>();
if (StringUtils.isNotEmpty(registryIds)) {
Arrays.stream(COMMA_SPLIT_PATTERN.split(registryIds)).forEach(id -> {
if (registryConfigMap.containsKey(id)) {
registryConfigs.add(registryConfigMap.get(id));
}
});
}
if (registryConfigs.isEmpty()) {
for (RegistryConfig config : registryConfigMap.values()) {
if (StringUtils.isEmpty(registryIds)) {
registryConfigs.add(config);
}
}
}
if (!registryConfigs.isEmpty()) {
super.setRegistries(registryConfigs);
}
}
}
if (getMetadataReportConfig() == null) {
// MetadataReportConfig 设置
Map<String, MetadataReportConfig> metadataReportConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class, false, false);
if (metadataReportConfigMap != null && metadataReportConfigMap.size() == 1) {
super.setMetadataReportConfig(metadataReportConfigMap.values().iterator().next());
} else if (metadataReportConfigMap != null && metadataReportConfigMap.size() > 1) {
throw new IllegalStateException("Multiple MetadataReport configs: " + metadataReportConfigMap);
}
}
if (getConfigCenter() == null) {
// ConfigCenterConfig 配置中心配置设置
Map<String, ConfigCenterConfig> configenterMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConfigCenterConfig.class, false, false);
if (configenterMap != null && configenterMap.size() == 1) {
super.setConfigCenter(configenterMap.values().iterator().next());
} else if (configenterMap != null && configenterMap.size() > 1) {
throw new IllegalStateException("Multiple ConfigCenter found:" + configenterMap);
}
}
if (getMonitor() == null
&& (getProvider() == null || getProvider().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
// MonitorConfig 监控配置设置
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
if (getMetrics() == null) {
// MetricsConfig 设置
Map<String, MetricsConfig> metricsConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class, false, false);
if (metricsConfigMap != null && metricsConfigMap.size() > 0) {
MetricsConfig metricsConfig = null;
for (MetricsConfig config : metricsConfigMap.values()) {
if (metricsConfig != null) {
throw new IllegalStateException("Duplicate metrics configs: " + metricsConfig + " and " + config);
}
metricsConfig = config;
}
if (metricsConfig != null) {
setMetrics(metricsConfig);
}
}
}
if (StringUtils.isEmpty(getProtocolIds())) {
if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getProtocolIds())) {
setProtocolIds(getProvider().getProtocolIds());
}
}
if (CollectionUtils.isEmpty(getProtocols())
&& (getProvider() == null || CollectionUtils.isEmpty(getProvider().getProtocols()))) {
// ProtocolConfig 协议配置设置
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
if (StringUtils.isNotEmpty(getProtocolIds())) {
Arrays.stream(COMMA_SPLIT_PATTERN.split(getProtocolIds()))
.forEach(id -> {
if (protocolConfigMap.containsKey(id)) {
protocolConfigs.add(protocolConfigMap.get(id));
}
});
}
if (protocolConfigs.isEmpty()) {
for (ProtocolConfig config : protocolConfigMap.values()) {
if (StringUtils.isEmpty(protocolIds)) {
protocolConfigs.add(config);
}
}
}
if (!protocolConfigs.isEmpty()) {
super.setProtocols(protocolConfigs);
}
}
}
if (StringUtils.isEmpty(getPath())) {
if (StringUtils.isNotEmpty(beanName)
&& StringUtils.isNotEmpty(getInterface())
&& beanName.startsWith(getInterface())) {
setPath(beanName);
}
}
if (!supportedApplicationListener) {
// 如果不支持应用监听器,属性初始化之后provider就直接暴露服务
export();
}
}
/**
* Get the name of {@link ServiceBean}
*
* @return {@link ServiceBean}'s name
* @since 2.6.5
*/
public String getBeanName() {
return this.beanName;
}
/**
* @since 2.6.5
*/
@Override
public void export() {
// 暴露服务
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
/**
* @since 2.6.5
*/
private void publishExportEvent() {
ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
applicationEventPublisher.publishEvent(exportEvent);
}
@Override
public void destroy() throws Exception {
// DisposableBean 接口实现
// no need to call unexport() here, see
// org.apache.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener
}
// merged from dubbox
@Override
protected Class getServiceClass(T ref) {
if (AopUtils.isAopProxy(ref)) {
return AopUtils.getTargetClass(ref);
}
return super.getServiceClass(ref);
}
/**
* @param applicationEventPublisher
* @since 2.6.5
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
// ApplicationEventPublisherAware 接口实现
this.applicationEventPublisher = applicationEventPublisher;
}
}
五、provider 服务导出源码
服务导出的入口是在 ServiceBean 中,下面我们就阅读相关源码对比一下是否符合暴露服务流程图和暴露服务时序图。
ServiceBean 中暴露服务的时机有两个:
1、afterPropertiesSet 方法中,Spring 初始化 ServiceBean 属性设置完成之后如果不支持应用监听器,则直接暴露服务
如下图所示:
2、onApplicationEvent 方法中,Spring 容器初始化完成之后,AbstractApplicationContext 的 finishRefresh 方法发布 ContextRefreshedEvent 事件,最终调用 ApplicationListener (ServiceBean)的 onApplicationEvent 方法暴露服务
具体过程如下图所示:
知道了暴露服务的时机,那具体的暴露过程是怎么样的呢?下面我们跟着源码分析一下
ServiceBean 中的 export 方法调用了 ServiceConfig 的 export 方法,代码如下:
public synchronized void export() {
checkAndUpdateSubConfigs();
if (!shouldExport()) {
// export 属性没有设置时默认是暴露
return;
}
if (shouldDelay()) {
// 延迟时间大于0,则延迟暴露服务
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 立即暴露服务
doExport();
}
}
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
// 如果已暴露直接返回
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// 暴露URLs
doExportUrls();
}
其实现暴露服务url的真正逻辑在 ServiceConfig 的 doExportUrls 方法中,该方法主要实现了多协议多注册中心导出服务功能
代码如下:
private void doExportUrls() {
// 1、加载注册中心链接
List<URL> registryURLs = loadRegistries(true);
// 2、不同协议导出服务
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
// 根据不同的 Protocol 暴露服务,即生成对应的URL,例如 dubbo://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService?
// anyhost=true&application=service-class&bean.name=org.apache.dubbo.config.spring.api.DemoService&bind.ip=192.168.1.247&
// bind.port=20887&class=org.apache.dubbo.config.spring.impl.DemoServiceImpl&deprecated=false&dubbo=2.0.2&dynamic=true&
// generic=false&interface=org.apache.dubbo.config.spring.api.DemoService&methods=sayName,getBox&owner=world&pid=18196&
// register=true&release=&side=provider×tamp=1572313318352
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
这里大致分为了两步,第一步是 加载注册中心链接,第二是不同协议导出服务
5.1、加载注册中心链接
loadRegistries 方法实现了加载注册中心链接的功能,代码如下:
/**
*
* Load the registry and conversion it to {@link URL}, the priority order is: system property > dubbo registry config
*
* @param provider whether it is the provider side
* @return
*/
protected List<URL> loadRegistries(boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
if (CollectionUtils.isNotEmpty(registries)) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
// 若 address 为空,则将其设为 0.0.0.0
address = ANYHOST_VALUE;
}
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
// 若 address 非 N/A,才做处理
Map<String, String> map = new HashMap<String, String>();
// 添加 ApplicationConfig 中的字段信息到 map 中
appendParameters(map, application);
// 添加 RegistryConfig 字段信息到 map 中
appendParameters(map, config);
// 添加 path、pid,protocol 等信息到 map 中
map.put(PATH_KEY, RegistryService.class.getName());
appendRuntimeParameters(map);
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
// 解析得到 URL 列表,address 可能包含多个注册中心 ip,
// 因此解析得到的是一个 URL 列表
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
// 将 URL 协议头设置为 registry
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())
.setProtocol(REGISTRY_PROTOCOL)
.build();
// 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
// (服务提供者 && register = true 或 null)
// || (非服务提供者 && subscribe = true 或 null)
if ((provider && url.getParameter(REGISTER_KEY, true))
|| (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
loadRegistries 主要实现了以下逻辑:
1)、构建参数map映射表
2)、构建注册中心链接url
3)、遍历注册中心链接列表,根据条件判断是否加入 registryList 列表中
5.2、导出服务
导出服务中有分了两步,即 一是组装URL,二是导出服务
5.2.1、组装URL
URL 是 Dubbo 配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递。URL 对于 Dubbo,犹如水对于鱼,非常重要。大家在阅读 Dubbo 服务导出相关源码的过程中,要注意 URL 内容的变化。既然 URL 如此重要,那么下面我们来了解一下 URL 组装的过程。
doExportUrlsFor1Protocol 方法实现了组装URL的功能,这里我们将 doExportUrlsFor1Protocol 分成组装URL和导出服务两部分讲解,组装URL部分代码如下:
/**
* 转换成url
* 例如:dubbo://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService?anyhost=true&
* application=service-class&bean.name=org.apache.dubbo.config.spring.api.DemoService&
* bind.ip=192.168.1.247&bind.port=20887&class=org.apache.dubbo.config.spring.impl.DemoServiceImpl&
* deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.config.spring.api.DemoService&
* methods=sayName,getBox&owner=world&pid=18196®ister=true&release=&side=provider×tamp=1572313318352
*
* @param protocolConfig
* @param registryURLs
*/
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
// 协议类型为nullh或者空字符串,默认是 dubbo 协议,作为url开头
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// url中的key-value保存在map中
Map<String, String> map = new HashMap<String, String>();
// side 可取值 provider 和 consumer,这里是暴露服务,所以是 provider,例如:side=provider
map.put(SIDE_KEY, PROVIDER_SIDE);
// url 中的 dubbo version、release、timestamp、pid 处理,例如:dubbo=2.0.2, release=,timestamp=1572313318352,pid=18196
appendRuntimeParameters(map);
// 通过反射方式将对象字段信息存储到map中
appendParameters(map, metrics);
// url 中的 application 处理,例如:application=service-class
appendParameters(map, application);
appendParameters(map, module);
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, provider);
appendParameters(map, protocolConfig);
// url 中的 service部分处理,例如 bean.name=org.apache.dubbo.config.spring.api.DemoService,
// class=org.apache.dubbo.config.spring.impl.DemoServiceImpl,
// interface=org.apache.dubbo.config.spring.api.DemoService,
// deprecated=false,dynamic=true, generic=false, owner=world, register=true
appendParameters(map, this);
// methods 为 MethodConfig 的集合,MethodConfig 存储了 <dubbo:method> 标签中配置的内容
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
// 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
// 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
// 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
// url 中的 methods 处理,多个method以“,”拼接起来,例如 methods=sayName,getBox
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
if (Boolean.FALSE.toString().equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// 获取 method 的 ArgumentConfig(参数) 列表
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
// 比对方法名,查找目标方法
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
// 检测 ArgumentConfig 中的 type 属性与方法参数列表
// 中的参数名称是否一致,不一致则抛出异常
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
// 添加 ArgumentConfig 字段信息到 map 中,
// 键前缀 = 方法名.index,比如: map = {"sayHello.3": true}
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
// 从参数类型列表中查找类型名称为 argument.type 的参数
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
// 用户未配置 type 属性,但配置了 index 属性,且 index != -1
// 添加 ArgumentConfig 字段信息到 map 中
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
// url 中的 revision 处理,例如:revision=0.0.1-SNAPSHOT
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
// 添加方法名到 map 中,如果包含多个方法名,则用","隔开,比如 methods=sayName,getBox
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// 所有方法名以","拼接起来
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// url 中的 token令牌 处理
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// export service
// url 中的 bind.ip、 anyhost 处理,例如:bind.ip=192.168.1.247,anyhost=true
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
// url 中的 bind.port 处理,例如:bind.port=20887
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 组装url
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// .....此处省略了导出服务代码
this.urls.add(url);
}
组装URL过程中,先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中,map 中的内容将作为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息。最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。需要注意的是,这里出现的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL。
其中 appendParameters 这个方法出现的次数比较多,该方法用于将对象字段信息添加到 map 中。实现上则是通过反射获取目标对象的 getter 方法,并调用该方法获取属性值。然后再通过 getter 方法名解析出属性名,比如从方法名 getName 中可解析出属性 name。如果用户传入了属性名前缀,此时需要将属性名加入前缀内容。最后将 <属性名,属性值> 键值对存入到 map 中
5.2.2 导出服务
doExportUrlsFor1Protocol 方法一部分是组装URL,组装好URL之后就是导出服务了,接下来我们看一下 doExportUrlsFor1Protocol 的导出服务部分代码:
/**
* 转换成url
* 例如:dubbo://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService?anyhost=true&
* application=service-class&bean.name=org.apache.dubbo.config.spring.api.DemoService&
* bind.ip=192.168.1.247&bind.port=20887&class=org.apache.dubbo.config.spring.impl.DemoServiceImpl&
* deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.config.spring.api.DemoService&
* methods=sayName,getBox&owner=world&pid=18196®ister=true&release=&side=provider×tamp=1572313318352
*
* @param protocolConfig
* @param registryURLs
*/
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// .....此处省略了组装URL部分代码
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// 如果url使用的协议存在扩展,调用对应的扩展类来修改url
// 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
// 如果 scope 配置为 none 时,不暴露服务
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 如果 scope 配置为 非 remote 时,暴露本地服务
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 如果 scope 配置为 非 local 时,暴露远程服务,本地协议名称为 injvm
if (!isOnlyInJvm() && logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
// 如果 protocol 仅仅是 injvm,则不需要注册
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 通过代理工厂将 ref 对象转换为 invoker 对象
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 包装 invoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter,不同的协议实现了不同的暴露服务逻辑
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 一个服务可能有多个提供者,合并在一起
exporters.add(exporter);
}
} else {
// 不存在注册中心,仅导出服务
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
导出 dubbo 服务中,分为两种方式,即 导出到本地 (JVM) 和 导出到远程。
上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下:
scope = none,不导出服务
scope != remote,导出到本地
scope != local,导出到远程
不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker,这是一个很重要的步骤。因此下面先来分析 Invoker 的创建过程
5.2.2.1、invoker 创建过程
Dubbo 中,Invoker 是一个非常重要的模型。在 provider 和 consumer 均使用 Invoker 作为可执行体。Dubbo 官方文档中对 Invoker 进行了如下说明:
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
既然 Invoker 如此重要,那么我们很有必要搞清楚 Invoker 的用途。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。下面我们到 JavassistProxyFactory 代码中,探索 Invoker 的创建过程。代码如下:
/**
* JavaassistRpcProxyFactory
*/
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 为目标类创建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
// 创建匿名 Invoker 对象,并实现了 doInvoke 方法
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 方法中会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
JavassistProxyFactory 的 getInvoker 方法创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。doInvoke 仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。Wrapper 用于“包裹”目标类,Wrapper 是一个abstract抽象类,仅可通过 getWrapper(Class) 方法创建子类。在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 getWrapper 方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再通过反射创建 Wrapper 实例。相关的代码如下:
/**
* get wrapper.
*
* @param c Class instance.
* @return Wrapper instance(not null).
*/
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
{
c = c.getSuperclass();
}
if (c == Object.class) {
return OBJECT_WRAPPER;
}
// 缓存中获取 Wrapper
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
// 缓存中不存在 Wrapper时,创建 Wrapper
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
}
return ret;
}
private static Wrapper makeWrapper(Class<?> c) {
if (c.isPrimitive()) {
// 如果 c 是否为基本类型,若是则抛出异常
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
}
String name = c.getName();
ClassLoader cl = ClassUtils.getClassLoader(c);
// c1 用于存储 setPropertyValue 方法代码
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
// c2 用于存储 getPropertyValue 方法代码
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
// c3 用于存储 invokeMethod 方法代码
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
// 生成代码异常,并捕获异常抛出 IllegalArgumentException
// 例如:DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
// pts 用于存储成员变量名和类型
Map<String, Class<?>> pts = new HashMap<>(); // <property name, property types>
// ms 用于存储方法描述(方法签名)和方法实例
Map<String, Method> ms = new LinkedHashMap<>(); // <method desc, Method instance>
// mns 用于存储方法名列表
List<String> mns = new ArrayList<>(); // method names.
// dmns 用于存储声明在当前类中的方法名列表
List<String> dmns = new ArrayList<>(); // declaring method names.
// get all public field.
// 获取 public 类型的字段,并为所有字段生成条件判断语句
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
// 字段被 static 或者 transient 修饰的不做处理
continue;
}
// 生成条件判断及赋值语句,比如:
// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
// 生成条件判断及返回语句,比如:
// if( $2.equals("name") ) { return ($w)w.name; }
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
// pts 中存储字段名(key)+字段类型(value)
pts.put(fn, ft);
}
Method[] methods = c.getMethods();
// get all public method.
// 查找c中是否有声明的方法
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
for (Method m : methods) {
//ignore Object's method.
if (m.getDeclaringClass() == Object.class) {
// 忽略 Object 中定义的方法
continue;
}
String mn = m.getName();
// 生成方法名判断语句,比如:
// if ( "sayHello".equals( $2 )
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
// 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
// && $3.length == 2
c3.append(" && ").append(" $3.length == ").append(len);
boolean override = false;
for (Method m2 : methods) {
if (m != m2 && m.getName().equals(m2.getName())) {
// 如果方法对象不同但是方法名相同,则存在方法重载
override = true;
break;
}
}
// 对重载方法进行处理,考虑下面的方法:
// 1. void sayHello(Integer, String)
// 2. void sayHello(Integer, Integer)
// 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。
// 需要进一步判断方法的参数类型
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
// 生成参数类型进行检测代码,比如:
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
// 添加 ) {,完成方法判断语句,此时生成的代码可能如下(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
c3.append(" ) { ");
// 根据返回值类型生成目标方法调用语句
if (m.getReturnType() == Void.TYPE) {
// 返回值为空,比如:w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
} else {
// 返回值不为空,比如:return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
}
// 添加 }, 生成的代码形如(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
//
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
// return null;
// }
c3.append(" }");
// 方法名添加到 mns 集合中
mns.add(mn);
if (m.getDeclaringClass() == c) {
// 如果当前方法是在 c 中被声明的,则添加到 dmns 集合中
dmns.add(mn);
}
// 方法描述信息(key)和方法实例(value)添加到 ms 集合中
ms.put(ReflectUtils.getDesc(m), m);
}
// 添加异常捕捉语句
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
}
// 添加 NoSuchMethodException 异常抛出代码
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
// deal with get/set method.
// 处理 get 或者 set 方法
Matcher matcher;
for (Map.Entry<String, Method> entry : ms.entrySet()) {
String md = entry.getKey();
Method method = entry.getValue();
if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
// 匹配以 get 开头的方法
// 获取属性名,例如 getName 对应的属性名就是 name
String pn = propertyName(matcher.group(1));
// 生成属性判断以及返回语句,示例如下:
// if( $2.equals("name") ) { return ($w).w.getName(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
// 匹配以 is|has|can 开头的方法
String pn = propertyName(matcher.group(1));
// 生成属性判断以及返回语句,示例如下:
// if( $2.equals("dream") ) { return ($w).w.hasDream(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
// 匹配以 set 开头的方法
Class<?> pt = method.getParameterTypes()[0];
String pn = propertyName(matcher.group(1));
// 生成属性判断以及 setter 调用语句,示例如下:
// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
pts.put(pn, pt);
}
}
// 添加 NoSuchPropertyException 异常抛出代码
c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
// make class
// WRAPPER 类计数器
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
// 创建类生成器
ClassGenerator cc = ClassGenerator.newInstance(cl);
// 设置类名
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
// 设置超类
cc.setSuperClass(Wrapper.class);
// 添加默认构造方法
cc.addDefaultConstructor();
// 添加字段
cc.addField("public static String[] pns;"); // property name array.
cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
cc.addField("public static String[] mns;"); // all method name array.
cc.addField("public static String[] dmns;"); // declared method name array.
for (int i = 0, len = ms.size(); i < len; i++) {
cc.addField("public static Class[] mts" + i + ";");
}
// 添加方法代码
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try {
// 生成类
Class<?> wc = cc.toClass();
// setup static field.
// 设置字段值
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values()) {
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
}
// 创建 Wrapper 实例并返回
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// 清除资源
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
上述代码比较长,阅读起来比较晦涩,要想读懂 Wrapper 类代码需要对 javassist 框架有所了解,对 javassist 的了解这里不做分析了,自己查阅资料即可,上述代码实现的功能在这里给大家总结一下:
1)、获取 public 类型的字段,并为所有字段生成条件判断语句、赋值语句和返回语句代码
2)、类中所有声明的方法生成方法名判断语句、重载方法生成方法参数类型检测、根据返回值类型生成目标方法调用语句、异常捕获代码
3)、匹配以 set、get、is、has、can 方法开头的所有方法,生成这些方法的属性名判断及返回语句代码
4)、设置类名、超类、默认构造方法、字段代码和方法代码后使用 ClassGenerator 类生成器将代码生成类对象
5)、 类设置字段值,创建 Wrapper 实例并返回
6)、清除资源
5.2.2.2 导出服务
invoker 创建好之后,就会导出服务,导出服务有两种:本地导出和远程导出
5.2.2.2.1 本地服务导出
本地服务导出调用的是 exportLocal 方法,代码如下:
/**
* always export injvm
*/
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
// 设置协议头为 injvm
.setProtocol(LOCAL_PROTOCOL)
// 设置主机为本机ip,即 127.0.0.1
.setHost(LOCALHOST_VALUE)
// 设置端口为0
.setPort(0)
.build();
// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 本地服务导出仅仅是创建了 InjvmExporter
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
本地服务导出代码逻辑很简单,共包含了三个步骤:
1)、创建新的URL,在原URL基础上重置协议、主机ip 和端口
2)、创建invoker
3)、调用 InjvmProtocol 的 export 方法创建 InjvmExporter
5.2.2.2.2 远程服务导出
远程服务导出相对复杂一些,这里以默认dubbo协议导出为例讲解。如果存在注册中心url,即会调用 RegistryProtocol 的 export 方法导出服务,不存在时则直接调用 DubboProtocol 的 export 方法导出服务。在 RegistryProtocol 的 export 方法中最终也会调用 DubboProtocol 的 export 方法实现真正的导出服务。我们这里就从RegistryProtocol 的 export 方法介绍,代码如下:
/**
* 注册协议导出服务,这里使用 Zookeeper作为注册中心
*
* @param originInvoker
* @param <T>
* @return
* @throws RpcException
*/
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心url,
// 例如:zookeeper://192.168.1.235:2181/org.apache.dubbo.registry.RegistryService?application=service-class&
// dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.1.247%3A20887%2Forg.apache.dubbo.config.spring.api.DemoService%3F
// anyhost%3Dtrue%26application%3Dservice-class%26bean.name%3Dorg.apache.dubbo.config.spring.api.DemoService%26
// bind.ip%3D192.168.1.247%26bind.port%3D20887%26class%3Dorg.apache.dubbo.config.spring.impl.DemoServiceImpl%26
// deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3D
// org.apache.dubbo.config.spring.api.DemoService%26methods%3DsayName%2CgetBox%26owner%3Dworld%26pid%3D24316%26
// register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1572405725011&owner=world&pid=24316×tamp=1572405725005
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
// 获取提供者url
// 例如:dubbo://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService?anyhost=true&application=service-class&
// bean.name=org.apache.dubbo.config.spring.api.DemoService&bind.ip=192.168.1.247&bind.port=20887&
// class=org.apache.dubbo.config.spring.impl.DemoServiceImpl&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&
// interface=org.apache.dubbo.config.spring.api.DemoService&methods=sayName,getBox&owner=world&pid=24316®ister=true&
// release=&side=provider×tamp=1572405725011
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
// 获取订阅url
// 例如:provider://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService?anyhost=true&application=service-class&
// bean.name=org.apache.dubbo.config.spring.api.DemoService&bind.ip=192.168.1.247&bind.port=20887&category=configurators&
// check=false&class=org.apache.dubbo.config.spring.impl.DemoServiceImpl&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&
// interface=org.apache.dubbo.config.spring.api.DemoService&methods=sayName,getBox&owner=world&pid=24316®ister=true&
// release=&side=provider×tamp=1572405725011
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 创建订阅监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
// 导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 根据 URL 加载 Registry 实现类,这里使用的是Zookeeper注册中心, 获取到的自然是 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL,并过滤url中的参数,例如 bind.ip、bind.port 等
// 例如:dubbo://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService?anyhost=true&application=service-class&
// bean.name=org.apache.dubbo.config.spring.api.DemoService&class=org.apache.dubbo.config.spring.impl.DemoServiceImpl&
// deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.config.spring.api.DemoService&
// methods=sayName,getBox&owner=world&pid=10428®ister=true&release=&side=provider×tamp=1572413677641
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 向服务提供者与消费者注册表中注册服务提供者,即provider
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
// 获取register参数值,默认是注册服务
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 向注册中心注册服务
register(registryUrl, registeredProviderUrl);
// 设置已注册标识
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
// 向注册中心订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
// 创建 DestroyableExporter 并返回
return new DestroyableExporter<>(exporter);
}
上述方法共实现了以下逻辑:
1)、doLocalExport 方法中调用 DubboProtocol 的 export方法导出服务
2)、向注册中心注册服务,即URL
3)、向注册中心订阅 override 数据
4)、创建并返回 DestroyableExporter
上述逻辑中,我们着重讲一下 doLocalExport 方法导出服务 和 注册中心注册服务,
5.2.2.2.2.1 导出服务
首先从 doLocalExport 方法开始,代码如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
// 获取 originInvoker 中的 url 中的 export 参数值
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
// 创建 Invoker 委托类对象 InvokerDelegate
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 调用 protocol 的 export方法导出服务,providerUrl 的协议是 dubbo, 故这里调用的是 DubboProtocol 的 export 方法
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
protocol 的 export 调用逻辑:doLocalExport(RegistryProtocol) -> export(ProtocolFilterWrapper) -> export(ProtocolListenerWrapper) -> export(DubboProtocol)
如下图所示:
最终调用的就是 DubboProtocol 的 export 方法,该方法代码如下:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 获取invoker 的 url
// 例如:dubbo://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService?anyhost=true&application=service-class&
// bean.name=org.apache.dubbo.config.spring.api.DemoService&bind.ip=192.168.1.247&bind.port=20887&
// class=org.apache.dubbo.config.spring.impl.DemoServiceImpl&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&
// interface=org.apache.dubbo.config.spring.api.DemoService&methods=sayName,getBox&owner=world&pid=20628®ister=true&
// release=&side=provider×tamp=1572416746813
URL url = invoker.getUrl();
// export service.
// 获取服务标识,由服务组名,服务名,服务版本号以及端口组成
// 例如:testGroup/org.apache.dubbo.config.spring.api.DemoService:1.0.0:20887
String key = serviceKey(url);
// 构建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 放入缓存
exporterMap.put(key, exporter);
//export an stub service for dispatching event
// 本地存根相关代码
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 启动服务器
openServer(url);
// 优化序列化
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
// 获取服务器地址 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例,例如:192.168.1.247:20887
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 缓存中不存在server时,调用createServer方法创建server
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
// 服务器已创建,则根据 url 中的配置进行重置服务器
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
// 增加channel.readonly.sent参数配置到url中
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
// 增加心跳检测配置到url中
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 增加 DubboCodec 编码解码器参数配置到url中
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 获取 server 参数,默认是 netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 获取 client 参数,可指定 netty,mina
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
// 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
private void optimizeSerialization(URL url) throws RpcException {
// 获取url中配置的optimizer参数值
String className = url.getParameter(OPTIMIZER_KEY, "");
if (StringUtils.isEmpty(className) || optimizers.contains(className)) {
// 不存在配置序列化或者缓存中已存在,直接返回
return;
}
logger.info("Optimizing the serialization process for Kryo, FST, etc...");
try {
Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
if (!SerializationOptimizer.class.isAssignableFrom(clazz)) {
throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName());
}
// 实例化序列化对象
SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance();
if (optimizer.getSerializableClasses() == null) {
return;
}
for (Class c : optimizer.getSerializableClasses()) {
// 序列化类注册器注册该序列化对象
SerializableClassRegistry.registerClass(c);
}
// 加入序列化缓存
optimizers.add(className);
} catch (ClassNotFoundException e) {
throw new RpcException("Cannot find the serialization optimizer class: " + className, e);
} catch (InstantiationException e) {
throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
} catch (IllegalAccessException e) {
throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
}
}
上述过程相对来说还是比较简单易懂的,createServer 中包含了三个核心逻辑:
1)、通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,URL中添加心跳检测、编码解码等配置
2)、创建 ExchangeServer
3)、检测当前 Dubbo 所支持的 Transporter 是否包含 client 所表示的 Transporter
1 和 3 的过程已经很清楚了,只有 2 创建 ExchangeServer 不够明确,下面分析一下过程,代码如下:
// 1、 Exchangers 的 bind 方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// url中不存在编码解码器时,这里设置 ExchangeCodec 编码解码器
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger,默认为 HeaderExchanger,调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
return getExchanger(url).bind(url, handler);
}
// 2、HeaderExchanger 的 bind 方法
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 核心逻辑就是 Transporters 的 bind 方法调用,其他就是创建对象,这里就不讲述了
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
// 3、Transporters 的 bind 方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 ChannelHandler 数量大于1,则创建 ChannelHandler 派发器
handler = new ChannelHandlerDispatcher(handlers);
}
// getTransporter() 获取的 Transporter 默认是 NettyTransporter(根据SPI注解参数配置决定)
return getTransporter().bind(url, handler);
}
// 4、NettyTransporter 的 bind 方法
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 创建 NettyServer
return new NettyServer(url, listener);
}
// 5、NettyServer 的构造方法
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
// 调用父类的构造方法
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
// 6、AbstractServer 的构造方法
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 获取地址,例如:/192.168.1.247:20887
localAddress = getUrl().toInetSocketAddress();
// 获取ip和端口,例如:ip:192.168.1.247,port:20887
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 如果url中的anyhost为true 或者 ip是本机地址,则 ip 赋值为 0.0.0.0
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 获取最大可接受连接数,默认是0
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
// 获取空闲超时时间,默认是600000,即600s
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 调用模板方法启动服务
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
/**
* 初始化和启动netty服务
*
* Init and start netty server
*
* @throws Throwable
*/
// 7、NettyServer 的 doOpen 方法
@Override
protected void doOpen() throws Throwable {
// 创建 ServerBootstrap
bootstrap = new ServerBootstrap();
// 创建 boss 和 worker 线程池
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
// 绑定到指定的ip和端口上
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
上述就是创建 ExchangeServer 的整个流程,其中最重要的就是创建 NettyServer,dubbo 默认使用的 NettyServer 是基于 netty 4.x 版本的
到这里我们已经将服务导出的过程分析完了,逻辑还是比较复杂的,多使用测试demo进行源码调试,以便更好的理解整个过程
5.2.2.2.2.2 注册中心注册服务
服务注册对于dubbo来说不是必须的,但是我们一般为了服务统一治理,都会使用注册中心来治理,因此这块内容也很重要,是必须掌握的
我们以 Zookeeper 注册中心为示例,来讲解服务注册模块。服务注册的入口调用是在 RegistryProtocol 的 export 方法上,代码如下:
/**
* 注册协议导出服务,这里使用 Zookeeper作为注册中心
*
* @param originInvoker
* @param <T>
* @return
* @throws RpcException
*/
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// ....省略了其他部分代码
//to judge if we need to delay publish
// 获取register参数值,默认是注册服务
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 向注册中心注册服务
register(registryUrl, registeredProviderUrl);
// 设置已注册标识
providerInvokerWrapper.setReg(true);
}
//Ensure that a new exporter instance is returned every time export
// 创建 DestroyableExporter 并返回
return new DestroyableExporter<>(exporter);
}
调用了本类中的 register 方法,代码如下:
public void register(URL registryUrl, URL registeredProviderUrl) {
// 1、获取 Registry 注册中心实例
Registry registry = registryFactory.getRegistry(registryUrl);
// 2、向注册中心注册服务
registry.register(registeredProviderUrl);
}
register 方法有两步操作:
1)、获取注册中心实例 Registry
2)、向注册中心注册服务
5.2.2.2.2.2.1 获取注册中心实例
创建注册中心实例,调用的是 AbstractRegistryFactory 的 getRegistry 方法,代码如下:
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
// 设置 path 变量值为 RegistryService 的 name
.setPath(RegistryService.class.getName())
// 添加 interface 到URL中
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
// URL中移除 export 参数
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 缓存中没有获取到注册中心实例,则通过调用模板方法 createRegistry 创建注册中心实例,
// 这里由具体的注册中心实现,我们使用的是 Zookeeper,那这里使用是 ZookeeperRegistry 注册中心
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
接下来我们来看 ZookeeperRegistry 中的具体创建注册中心实例的逻辑,代码如下:
// 1、ZookeeperRegistryFactory 的 createRegistry 方法
@Override
public Registry createRegistry(URL url) {
// 创建 ZookeeperRegistry 实例
return new ZookeeperRegistry(url, zookeeperTransporter);
}
// 2、ZookeeperRegistry 的构造方法
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认是dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
// group = "/" + group
group = PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,zookeeperTransporter 默认为 CuratorZookeeperTransporter
// zkClient 类型是 CuratorZookeeperClient
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
// zk 重连逻辑
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
/**
* share connnect for registry, metadata, etc..
* <p>
* Make sure the connection is connected.
*
* @param url
* @return
*/
// 3、AbstractZookeeperTransporter 的 connect 方法
@Override
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
// 获取 Zookeeper 集群url列表
List<String> addressList = getURLBackupAddress(url);
// The field define the zookeeper server , including protocol, host, port, username, password
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// avoid creating too many connections, so add lock
synchronized (zookeeperClientMap) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// 调用 CuratorZookeeperTransporter 的 createZookeeperClient 方法创建 zookeeperClient
zookeeperClient = createZookeeperClient(toClientURL(url));
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}
// 4、CuratorZookeeperTransporter 的 createZookeeperClient 方法
@Override
public ZookeeperClient createZookeeperClient(URL url) {
// 构造 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}
// 5、CuratorZookeeperClient 的构造方法,
public CuratorZookeeperClient(URL url) {
super(url);
try {
int timeout = url.getParameter(TIMEOUT_KEY, 5000);
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(timeout);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 构造zk客户端实例 CuratorFramework
client = builder.build();
// 添加状态监听器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 启动客户端
client.start();
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
if (!connected) {
throw new IllegalStateException("zookeeper not connected");
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
创建注册中心实例其实就是创建 Zookeeper 客户端的过程,代码中注释已经讲得很清楚了,这里就不在赘述了
5.2.2.2.2.2.1 注册中心注册服务
向注册中心注册服务,说白了就是将服务配置URL数据写入 Zookeeper 的某个路径的节点下,也就是创建节点,测试demo运行起来,写入Zookeeper数据示例如下图:
服务注册的接口为 register(URL),这个方法定义在 FailbackRegistry 抽象类中,整个过程的代码如下:
// 1、FailbackRegistry 的 register方法
@Override
public void register(URL url) {
// 设置url到已注册的缓存中
super.register(url);
// 移除注册失败的url
removeFailedRegistered(url);
// 移除失败的没有注册的url
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
// 模板方法,由具体的注册中心实现,这里调用的是 ZookeeperRegistry 的 doRegister
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 记录注册失败的url
addFailedRegistered(url);
}
}
// 2、ZookeeperRegistry 的 doRegister 方法
@Override
public void doRegister(URL url) {
try {
// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下: /${group}/${serviceInterface}/providers/${url}
// 例如:/dubbo/org.apache.dubbo.config.spring.api.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 3、AbstractZookeeperClient 的 create 方法
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果非临时节点,则检查该路径是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
// 根据 ephemeral 的值创建临时或持久节点
if (ephemeral) {
// 创建临时节点
createEphemeral(path);
} else {
// 创建持久节点
createPersistent(path);
}
}
到这里向注册中心注册服务的流程就已经讲完了,还是很简单的
总结
本文详细分析了 Dubbo provider 启动过程中服务导出全过程,包括配置检测,加载注册中心链接、URL 组装,Invoker 创建、导出服务和注册服务等等。文章内容很多,需要足够的耐心阅读,有不正确之处请大家指正
参考:
https://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html