SPI ,全称为 Service Provider Interface,是一种服务发现机制。它通过在ClassPath路径下的META-INF/services文件夹查找文件,自动加载文件里所定义的类。
Dubbo 改进了 JDK 标准的 SPI 的一些问题:
- JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
- 如果扩展点加载失败,连扩展点的名称都拿不到了。
- 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。
ExtensionLoader拓展加载器:
/**
* 拓展加载器集合
*
* key:拓展接口
*/
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
/**
* 拓展实现类集合
*
* key:拓展实现类
* value:拓展对象。
*
* 例如,key 为 Class<AccessLogFilter>
* value 为 AccessLogFilter 对象
*/
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
// ============================== 对象属性 ==============================
/**
* 拓展接口。
* 例如,Protocol
*/
private final Class<?> type;
/**
* 对象工厂
*
* 用于调用 {@link #injectExtension(Object)} 方法,向拓展对象注入依赖属性。
*
* 例如,StubProxyFactoryWrapper 中有 `Protocol protocol` 属性。
*/
private final ExtensionFactory objectFactory;
/**
* 缓存的拓展名与拓展类的映射。
*
* 和 {@link #cachedClasses} 的 KV 对调。
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
/**
* 缓存的拓展实现类集合。
*
* 不包含如下两种类型:
* 1. 自适应拓展实现类。例如 AdaptiveExtensionFactory
* 2. 带唯一参数为拓展接口的构造方法的实现类,或者说拓展 Wrapper 实现类。例如,ProtocolFilterWrapper 。
* 拓展 Wrapper 实现类,会添加到 {@link #cachedWrapperClasses} 中
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
/**
* 拓展名与 @Activate 的映射
*
* 例如,AccessLogFilter。
*
* 用于 {@link #getActivateExtension(URL, String)}
*/
private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
/**
* 缓存的拓展对象集合
*
* key:拓展名
* value:拓展对象
*
* 例如,Protocol 拓展
* key:dubbo value:DubboProtocol
* key:injvm value:InjvmProtocol
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
/**
* 缓存的自适应( Adaptive )拓展对象
*/
private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
/**
* 缓存的自适应拓展对象的类
*
* {@link #getAdaptiveExtensionClass()}
*/
private volatile Class<?> cachedAdaptiveClass = null;
/**
* 缓存的默认拓展名
*
* 通过 {@link SPI} 注解获得
*/
private String cachedDefaultName;
/**
* 创建 {@link #cachedAdaptiveInstance} 时发生的异常。
*
* 发生异常后,不再创建,参见 {@link #createAdaptiveExtension()}
*/
private volatile Throwable createAdaptiveInstanceError;
/**
* 拓展 Wrapper 实现类集合
*
* 带唯一参数为拓展接口的构造方法的实现类
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private Set<Class<?>> cachedWrapperClasses;
/**
* 拓展名 与 加载对应拓展类发生的异常 的 映射
*
* key:拓展名
* value:异常
*
* 在 {@link #loadFile(Map, String)} 时,记录
*/
private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();
getExtensionClasses() 方法,获得拓展实现类数组。
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
private volatile Class<?> cachedAdaptiveClass = null;
private Set<Class<?>> cachedWrapperClasses;
/**
* 获得拓展实现类数组
*
* @return 拓展实现类数组
*/
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中,获得拓展实现类数组
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 从配置文件中,加载拓展实现类数组
classes = loadExtensionClasses();
// 设置到缓存中
cachedClasses.set(classes);
}
}
}
return classes;
}
loadExtensionClasses() 方法,从多个配置文件中,加载拓展实现类数组:
/**
* 加载拓展实现类数组
*
* 无需声明 synchronized ,因为唯一调用该方法的 {@link #getExtensionClasses()} 已经声明。
* // synchronized in getExtensionClasses
*
* @return 拓展实现类数组
*/
private Map<String, Class<?>> loadExtensionClasses() {
// 通过 @SPI 注解,获得默认的拓展实现类名
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}
// 从配置文件中,加载拓展实现类数组
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
loadFile(extensionClasses, dir) 方法,从一个配置文件中,加载拓展实现类数组:
/**
* 缓存的自适应拓展对象的类
*
* {@link #getAdaptiveExtensionClass()}
*/
private volatile Class<?> cachedAdaptiveClass = null;
/**
* 拓展 Wrapper 实现类集合
*
* 带唯一参数为拓展接口的构造方法的实现类
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private Set<Class<?>> cachedWrapperClasses;
/**
* 拓展名与 @Activate 的映射
*
* 例如,AccessLogFilter。
*
* 用于 {@link #getActivateExtension(URL, String)}
*/
private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
/**
* 缓存的拓展名与拓展类的映射。
*
* 和 {@link #cachedClasses} 的 KV 对调。
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
/**
* 拓展名 与 加载对应拓展类发生的异常 的 映射
*
* key:拓展名
* value:异常
*
* 在 {@link #loadFile(Map, String)} 时,记录
*/
private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();
/**
* 从一个配置文件中,加载拓展实现类数组。
*
* @param extensionClasses 拓展类名数组
* @param dir 文件名
*/
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
// 获得完整的文件名
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
// 获得文件名对应的所有文件URL数组
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
// 遍历文件数组
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
try {
String line;
while ((line = reader.readLine()) != null) {
// 跳过当前被注释掉的情况,例如 #spring=xxxxxxxxx
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
// 拆分,key=value 的配置格式,按照 key=value 的配置拆分。其中 name 为拓展名,line 为拓展实现类名。注意,上文我们提到过 Dubbo SPI 会兼容 Java SPI 的配置格式,那么按照此处的解析方式,name 会为空。这种情况下,拓展名会自动生成,
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 判断拓展实现,是否实现拓展接口
Class<?> clazz = Class.forName(line, true, classLoader);
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
// 缓存自适应拓展对象的类到 `cachedAdaptiveClass`
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else {
// 缓存拓展 Wrapper 实现类到 `cachedWrapperClasses`
try {
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
// 缓存拓展实现类到 `extensionClasses`
} catch (NoSuchMethodException e) {
clazz.getConstructor();
// 未配置拓展名,自动生成。例如,DemoFilter 为 demo 。主要用于兼容 Java SPI 的配置。
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
}
}
}
// 获得拓展名,可以是数组,有多个拓展名。
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
// 缓存 @Activate 到 `cachedActivates` 。
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
// 缓存到 `cachedNames`
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
// 缓存拓展实现类到 `extensionClasses`
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
// 发生异常,记录到异常集合
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + url + ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}
(获得拓展加载器)getExtensionLoader(type) 静态方法,根据拓展点的接口,获得拓展加载器:
/**
* 拓展加载器集合
*
* key:拓展接口
*/
// 【静态属性】
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
/**
* 根据拓展点的接口,获得拓展加载器
*
* @param type 接口
* @param <T> 泛型
* @return 加载器
*/
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
// 必须是接口
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
// 必须包含 @SPI 注解
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
// 获得接口对应的拓展点加载器
// 从 EXTENSION_LOADERS 静态中获取拓展接口对应的 ExtensionLoader 对象。
// 若不存在,则创建 ExtensionLoader 对象,并添加到 EXTENSION_LOADERS
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
}
构造方法:
/**
* 拓展接口。
* 例如,Protocol
*/
private final Class<?> type;
/**
* 对象工厂
*
* 用于调用 {@link #injectExtension(Object)} 方法,向拓展对象注入依赖属性。
*
* 例如,StubProxyFactoryWrapper 中有 `Protocol protocol` 属性。
*/
private final ExtensionFactory objectFactory;
private ExtensionLoader(Class<?> type) {
this.type = type;
//当拓展接口非 ExtensionFactory 时( 如果不加这个判断,会是一个死循环 ),调用 ExtensionLoader的getAdaptiveExtension() 方法,获得 ExtensionFactory 拓展接口的自适应拓展实现对象。
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
getExtension() 方法,返回指定名字的扩展对象。如果指定名字的扩展不存在,则抛异常 IllegalStateException (有就去粗来,没有就创造一个塞进去):
/**
* 缓存的拓展对象集合
*
* key:拓展名
* value:拓展对象
*
* 例如,Protocol 拓展
* key:dubbo value:DubboProtocol
* key:injvm value:InjvmProtocol
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
/**
* Find the extension with the given name. If the specified name is not found, then {@link IllegalStateException}
* will be thrown.
*/
/**
* 返回指定名字的扩展对象。如果指定名字的扩展不存在,则抛异常 {@link IllegalStateException}.
*
* @param name 拓展名
* @return 拓展对象
*/
@SuppressWarnings("unchecked")
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
// 查找 默认的 拓展对象
if ("true".equals(name)) {
return getDefaultExtension();
}
// 从 缓存中 获得对应的拓展对象
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
// 从 缓存中 未获取到,进行创建缓存对象。
if (instance == null) {
instance = createExtension(name);
// 设置创建对象到缓存中
holder.set(instance);
}
}
}
return (T) instance;
}
createExtension(name) 方法,创建拓展名的拓展对象,并缓存(找拓展名对应的拓展实现类,如果没有就抛异常):
/**
* 拓展实现类集合
*
* key:拓展实现类
* value:拓展对象。
*
* 例如,key 为 Class<AccessLogFilter>
* value 为 AccessLogFilter 对象
*/
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
/**
* 创建拓展名的拓展对象,并缓存。
*
* @param name 拓展名
* @return 拓展对象
*/
@SuppressWarnings("unchecked")
private T createExtension(String name) {
// 获得拓展名对应的拓展实现类
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name); // 抛出异常
}
try {
// 从缓存中,获得拓展对象。
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 当缓存不存在时,创建拓展对象,并添加到缓存中。
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 注入依赖的属性
injectExtension(instance);
// 创建 Wrapper 拓展对象
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
injectExtension(instance) 方法,注入依赖的属性:
/**
* 注入依赖的属性
*
* @param instance 拓展对象
* @return 拓展对象
*/
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) { // setting && public 方法
// 获得属性的类型
Class<?> pt = method.getParameterTypes()[0];
try {
// 获得属性
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
// 获得属性值
Object object = objectFactory.getExtension(pt, property);
// 设置属性值
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
} }
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}