RpcContext
Dubbo有一个特殊的类RpcContext
,它是一个ThreadLocal
的临时状态记录器,每个线程都独有一个RpcContext
实例,我们来看下它的源码:
public class RpcContext {
//......省略部分代码
/**ThreadLocal变量*/
private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
/**隐式参数集合attachments*/
private final Map<String, String> attachments = new HashMap<String, String>();
/**用于异步调用*/
private Future<?> future;
/**本地地址*/
private InetSocketAddress localAddress;
/**远程地址*/
private InetSocketAddress remoteAddress;
/**请求对象*/
private Object request;
/**结果对象*/
private Object response;
/**获取线程本地RpcContext实例*/
public static RpcContext getContext() {
return LOCAL.get();
}
}
RpcContext的主要作用有三个:
- 状态记录
- 传递隐式参数
- 异步调用
状态记录
由于RpcContext
使用ThreadLocal
变量来管理,所以它记录的是线程范围内的状态。在发起RPC请求或者接收RPC请求时,Dubbo
都会使用RpcContext
来记录状态信息,比如 A 调 B,B 再调 C,那么 A 机器上就是记录了A 调 B 的信息,对于 B 机器上,在 B 调 C 之前,RpcContext
记录的是 A 调 B 的信息,在 B 调 C 之后,RpcContext
记录的是 B 调 C 的信息。
例如对于消费者服务来说,可以在执行远程调用之后,通过RpcContext
获取到远程调用的目标地址:
//远程调用
List<UserAddress> userAddressList = userService.getUserAddressList(i+"");
//输出目标提供者服务地址
String serverIP = RpcContext.getContext().getRemoteHost();
System.out.println("serverIP:"+serverIP);
输出结果:
RESPONSE FROM 【192.168.0.149】:address:北京天安门广场,userId:1
serverIP:192.168.0.149
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:2
serverIP:192.168.0.205
RESPONSE FROM 【192.168.0.149】:address:北京天安门广场,userId:3
serverIP:192.168.0.149
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:4
serverIP:192.168.0.205
传递隐式参数
可以通过调用 RpcContext
上的 setAttachment
和 getAttachment
方法,在服务消费方和提供方之间进行参数的隐式传递。隐式参数可以传递多个,存储在 RpcContext
的attachments
集合。
从上面的流程图我们可以看到,由于发送方和接收方不处于一个线程范围(甚至都不在同一个进程范围),所以他们之间对于RpcContext
是不可见的,RpcContext
只是负责将隐式参数存储到attachments
集合,在发送数据时Dubbo
会将attachments
集合取出来通过Netty
框架发送到接收方,接收方再将隐式参数集合取出赋值到本地的RpcContext
的attachments
,这样接收方就可以在服务接口实现方法中获取到隐式参数。
其实attachments
集合变量和RpcContext
类里面的其他变量本质上并没有什么区别,只不过attachments
被写进了网络发送数据里面,而其他变量没有写进网络发送数据罢了。
消费者服务:
//设置隐式参数
RpcContext.getContext().setAttachment("index",i+"");
//远程调用
List<UserAddress> userAddressList = userService.getUserAddressList(i+"");
提供者服务:
@Override
public List<UserAddress> getUserAddressList(String userId) {
//......省略部分代码
String index = RpcContext.getContext().getAttachment("index");
System.out.println("index:"+index);
}
index:2
index:4
index:6
index:8
异步调用
当提供者服务处理某些远程接口耗时过长时,消费者服务可以采用异步调用的方式,异步获取响应结果,不用长时间阻塞消费者服务线程。Dubbo
的异步调用是借助RpcContext
实现的:
步骤1:消费者发起远程调用(用户线程)。
步骤2:将请求发送给提供者服务(IO线程),返回Future
(用户线程),同时Future和请求ID关联起来。
步骤3:将步骤2返回的Future
赋值给RpcContext
的future
变量(用户线程)。
步骤4:消费者服务获取到Future
,可以通过Future#get()
方法获取结果,但此时结果为null
(用户线程)。
步骤5:消费者等待Future#get()
返回结果(用户线程)。
步骤6:提供者服务处理完成返回结果(IO线程)。
步骤7:获取返回结果之后会从步骤2的映射存储集合中取出Future
,设置返回结果,这样步骤5就可以获取到返回结果了。
使用异步调用:
步骤一:使用async="true"
配置在接口或者方法标签上。
<dubbo:reference id="xxx" interface="xxx.xxx.Xxx" async="true">
</dubbo:reference>
步骤二:通过RpcContext
获取future
,异步等待结果返回。
List<UserAddress> userAddressList = userService.getUserAddressList(i+"");//远程调用
System.out.println(userAddressList);//结果为null
Future<Object> future = RpcContext.getContext().getFuture();
while (future.get() != null){
userAddressList = (List<UserAddress>)future.get(); //获取异步调用结果
if(userAddressList != null){
userAddressList.forEach((userAddress -> {
System.out.println("RESPONSE FROM 【"+userAddress.getIp()+"】:address:" +
""+userAddress.getUserAddress()+",userId:"+userAddress.getUserId());
}));
break;
}
Thread.sleep(1_000);
}
本地存根
消费者服务发起远程调用之后就只能等待结果的返回,所有处理逻辑都在提供者服务,如果消费者想在发起远程调用之前和之后做一些其他操作,比如参数验证、调用失败处理等等,该怎么办呢?比较戳的一种方式是消费者服务在每次发起远程调用前后都加上逻辑控制代码,这种方式也能实现,但是显然很麻烦也不优雅。
Dubbo
提供了本地存根功能,可以实现对服务接口类似AOP
控制的功能,下图是本地存根的实现逻辑图:
创建一个本地存根类XxxServiceStub
,它实现了服务接口XxxService
,并且构造方法注入了XxxService
接口实例(代理类XxxServiceProxy
),用户可以在XxxServiceStub
类实现对XxxService
的AOP
控制。
public class UserServiceStub implements UserService {
private final UserService userService;
// 构造函数传入真正的远程代理对象
public UserServiceStub(UserService userService){
this.userService = userService;
}
@Override
public List<UserAddress> getUserAddressList(String userId) {
try {
System.out.println("----UserServiceStub-----");
//参数校验或者ThreadLocal本地缓存等
return userService.getUserAddressList(userId);
} catch (Exception e) {
//容错处理
return new ArrayList<>();
}
}
}
当发起远程调用时,实际上是交给代理对象XxxServiceProxy
来处理,调用过程发生异常之后Dubbo
会通过XxxServiceMock
将异常向外抛出,我们可以在XxxServiceStub
捕获异常信息,进行容错处理。
定义了本地存根类之后,我们还需要在接口标签<dubbo:service/>
使用stub
属性配置之后才生效。
<dubbo:reference id="xxx" interface="xxx.xxx.Xxx"
stub="com.luke.dubbo.order.stub.UserServiceStub">
测试结果:
----UserServiceStub-----
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:548
serverIP:192.168.0.205
----UserServiceStub-----
RESPONSE FROM 【127.0.0.1】:address:北京天安门广场,userId:549
serverIP:192.168.0.149
----UserServiceStub-----
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:550
serverIP:192.168.0.205
延迟连接
延迟连接用于减少长连接数。当有调用发起时,再创建长连接。
<dubbo:protocol name="dubbo" lazy="true" />
粘滞连接
当提供者服务有多个实例时,默认情况下消费者服务在启动时底层会通过Netty网络框架依次连接每一个依赖的提供者服务:
这种连接是一种TCP长连接,依赖多少个提供者服务,消费者服务就需要保持多少个这样的TCP长连接,这是需要消耗不少资源的。Dubbo提供了粘滞连接功能,尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。同时粘带连接会自动开启延迟连接,以减少长连接数。
开启粘带连接:
<dubbo:reference id="xxx" interface="com.xxx.XxxService" sticky="true" />
或者
<dubbo:reference id="xxx" interface="com.xxx.XxxService">
<dubbo:mothod name="sayHello" sticky="true" />
</dubbo:reference>