前言
在xxl-job中存在调度中心远程调用执行器的过程, 在此可以很清晰的学习到远程调用的思想。
常规的远程调用协议,dubbo, thrift , Protocol Buffer, 以及更轻量级的hessian协议。 这些市面上大同小异的
框架,在我们集成这些框架的时候,我们总会在调用端引用服务端暴露出来的接口,然后通过代理工厂创建代理对象
最终通过spring的自动注入的方式,在我们的容器里面生成代理对象,然后当我们需要使用的时候,直接使用代理对象,
就可以调用到另一个系统里面的服务了, 常规的配置我们都知道,但是真正内部是怎么实现的,为何我们调用代理
对象的方法,就能调用到远程系统的服务,这是我们值得思考的。 此处主要以xxl-job为原型,讲解这种模式, 其他专业
的RPC框架肯定比这个更加复杂, 所以这里主要是讲解一个思想。
例:当A系统需要调用B系统的方法时,我们通常的做法是这样的。
B系统 暴露一个interface, 如: exampleInterface.jar
A系统引用exampleInterface.jar , 同时在spring.xml 文件中配置B系统暴露出来的接口类(例:userService)
最终我们的使用方式如下:
@Authwired
UserService userService ;
public
User getUsers(){
return
userService.getUsers()
}
|
上面那一段,非常简短的代码, 还原了一个非常简短的系统之间的调用, 但是为什么在A系统调用userService.getUsers() ,
这个方法最终就能调用到B系统去呢?
代理对象生成
在xxl-job中调度中心需要调用执行器的代码, 下面来仔细分析一下、
public
static
ExecutorBiz getExecutorBiz(String address)
throws
Exception {
// valid
if
(address==
null
|| address.trim().length()==
0
) {
return
null
;
}
// load-cache
address = address.trim();
// 从内存中获取执行器对象
ExecutorBiz executorBiz = executorBizRepository.get(address);
if
(executorBiz !=
null
) {
return
executorBiz;
}
// 通过代理类,创建代理对象,主要看这个创建过程
executorBiz = (ExecutorBiz)
new
NetComClientProxy(ExecutorBiz.
class
, address, accessToken).getObject();
// 放入内存中,以便下次调用
executorBizRepository.put(address, executorBiz);
return
executorBiz;
}
|
从下面的代码中,可以看到NetComClientProxy 这个代理类,是一个factoryBean, 所以主要看他的getObject方法即可
public
class
NetComClientProxy
implements
FactoryBean<Object> {
private
static
final
Logger logger = LoggerFactory.getLogger(NetComClientProxy.
class
);
private
Class<?> iface;
private
String serverAddress;
private
String accessToken;
private
JettyClient client =
new
JettyClient();
public
NetComClientProxy(Class<?> iface, String serverAddress, String accessToken) {
this
.iface = iface;
this
.serverAddress = serverAddress;
this
.accessToken = accessToken;
}
@Override
public
Object getObject()
throws
Exception {
return
Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(),
new
Class[] { iface },
new
InvocationHandler() {
@Override
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable {
// filter method like "Object.toString()"
if
(Object.
class
.getName().equals(method.getDeclaringClass().getName())) {
logger.error(
">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]"
, method.getDeclaringClass().getName(), method.getName());
throw
new
RuntimeException(
"xxl-rpc proxy class-method not support"
);
}
//生成这个代理对象的invoke方法里面,并没有调用目标类的方法,而是将目标的信息发送给远程服务器。
RpcRequest request =
new
RpcRequest();
request.setServerAddress(serverAddress);
// 服务器地址
request.setCreateMillisTime(System.currentTimeMillis());
// 创建时间, 用于判断请求是否超时
request.setAccessToken(accessToken);
// 数据校验
request.setClassName(method.getDeclaringClass().getName());
// 将目标类的class名称传给执行器,让那边来创建对象,并执行逻辑代码
request.setMethodName(method.getName());
// 方法名称为run
request.setParameterTypes(method.getParameterTypes());
// 参数类型
request.setParameters(args);
// 参数
RpcResponse response = client.send(request);
// 发送HTTP请求
// valid response
if
(response ==
null
) {
logger.error(
">>>>>>>>>>> xxl-rpc netty response not found."
);
throw
new
Exception(
">>>>>>>>>>> xxl-rpc netty response not found."
);
}
if
(response.isError()) {
throw
new
RuntimeException(response.getError());
}
else
{
return
response.getResult();
}
}
});
}
@Override
public
Class<?> getObjectType() {
return
iface;
}
@Override
public
boolean
isSingleton() {
return
false
;
}
}
|
从上面可以清晰的看到,生成的代理对象里面的 InvocationHandler 中的invoke方法,并没有真实的执行方法,而是将类名,方法名,参数等信息发送给远程服务器。
仅仅只是做了这些操作而已。 这就是他的设计比较巧妙的地方。
下面可以看一下,远程服务器拿到请求之后如何处理的。
public
static
RpcResponse invokeService(RpcRequest request, Object serviceBean) {
// request中的数据结构
if
(serviceBean==
null
) {
// 这个serviceBean 就是在执行器启动的时候,initExecutorServer () 这个方法中,将一个ExecutorBiz的实例放进去了,此处通过
// classname来获取这个实例
serviceBean = serviceMap.get(request.getClassName());
}
if
(serviceBean ==
null
) {
// TODO
}
RpcResponse response =
new
RpcResponse();
// 判断是否超时
if
(System.currentTimeMillis() - request.getCreateMillisTime() >
180000
) {
response.setResult(
new
ReturnT<String>(ReturnT.FAIL_CODE,
"The timestamp difference between admin and executor exceeds the limit."
));
return
response;
}
// 数据校验,验证token是否匹配,前提是token不为空
if
(accessToken!=
null
&& accessToken.trim().length()>
0
&& !accessToken.trim().equals(request.getAccessToken())) {
response.setResult(
new
ReturnT<String>(ReturnT.FAIL_CODE,
"The access token["
+ request.getAccessToken() +
"] is wrong."
));
return
response;
}
try
{
// 获取class
Class<?> serviceClass = serviceBean.getClass();
// 拿到请求中的方法名字, 此处这个值 是 run 方法
String methodName = request.getMethodName();
//方法类型
Class<?>[] parameterTypes = request.getParameterTypes();
// 方法参数
Object[] parameters = request.getParameters();
// spring的工具类, 创建一个fastClass 实例
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// 拿到方法之后执行方法的invoke ,
Object result = serviceFastMethod.invoke(serviceBean, parameters);
// 返回执行结果
response.setResult(result);
}
catch
(Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return
response;
}
|
由此我们就可以知道,远程调用的大致思想。