Dubbo基础知识(三)

RpcContext

Dubbo有一个特殊的类RpcContext,它是一个ThreadLocal的临时状态记录器,每个线程都独有一个RpcContext实例,我们来看下它的源码:

public class RpcContext {

	//......省略部分代码

	/**ThreadLocal变量*/
    private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
    
    /**隐式参数集合attachments*/
    private final Map<String, String> attachments = new HashMap<String, String>();
    
    /**用于异步调用*/
    private Future<?> future;

	/**本地地址*/
    private InetSocketAddress localAddress;
	/**远程地址*/
    private InetSocketAddress remoteAddress;

	/**请求对象*/
    private Object request;
    /**结果对象*/
    private Object response;

	/**获取线程本地RpcContext实例*/
    public static RpcContext getContext() {
        return LOCAL.get();
    }
}

RpcContext的主要作用有三个:

  • 状态记录
  • 传递隐式参数
  • 异步调用

状态记录

由于RpcContext使用ThreadLocal变量来管理,所以它记录的是线程范围内的状态。在发起RPC请求或者接收RPC请求时,Dubbo都会使用RpcContext来记录状态信息,比如 A 调 B,B 再调 C,那么 A 机器上就是记录了A 调 B 的信息,对于 B 机器上,在 B 调 C 之前,RpcContext 记录的是 A 调 B 的信息,在 B 调 C 之后,RpcContext 记录的是 B 调 C 的信息。

例如对于消费者服务来说,可以在执行远程调用之后,通过RpcContext获取到远程调用的目标地址:

//远程调用
List<UserAddress> userAddressList = userService.getUserAddressList(i+"");
//输出目标提供者服务地址
String serverIP = RpcContext.getContext().getRemoteHost();
System.out.println("serverIP:"+serverIP);

输出结果:

RESPONSE FROM 【192.168.0.149】:address:北京天安门广场,userId:1
serverIP:192.168.0.149
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:2
serverIP:192.168.0.205
RESPONSE FROM 【192.168.0.149】:address:北京天安门广场,userId:3
serverIP:192.168.0.149
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:4
serverIP:192.168.0.205

传递隐式参数

可以通过调用 RpcContext 上的 setAttachmentgetAttachment 方法,在服务消费方和提供方之间进行参数的隐式传递。隐式参数可以传递多个,存储在 RpcContextattachments 集合。
在这里插入图片描述
从上面的流程图我们可以看到,由于发送方和接收方不处于一个线程范围(甚至都不在同一个进程范围),所以他们之间对于RpcContext是不可见的,RpcContext只是负责将隐式参数存储到attachments集合,在发送数据时Dubbo会将attachments集合取出来通过Netty框架发送到接收方,接收方再将隐式参数集合取出赋值到本地的RpcContextattachments,这样接收方就可以在服务接口实现方法中获取到隐式参数。
其实attachments集合变量和RpcContext类里面的其他变量本质上并没有什么区别,只不过attachments被写进了网络发送数据里面,而其他变量没有写进网络发送数据罢了。

消费者服务:

//设置隐式参数
RpcContext.getContext().setAttachment("index",i+"");
//远程调用
List<UserAddress> userAddressList = userService.getUserAddressList(i+"");

提供者服务:

@Override
public List<UserAddress> getUserAddressList(String userId) {
	//......省略部分代码
	String index = RpcContext.getContext().getAttachment("index");
	System.out.println("index:"+index);
}
index:2
index:4
index:6
index:8

异步调用

当提供者服务处理某些远程接口耗时过长时,消费者服务可以采用异步调用的方式,异步获取响应结果,不用长时间阻塞消费者服务线程。Dubbo的异步调用是借助RpcContext实现的:
在这里插入图片描述
步骤1:消费者发起远程调用(用户线程)。

步骤2:将请求发送给提供者服务(IO线程),返回Future(用户线程),同时Future和请求ID关联起来。

步骤3:将步骤2返回的Future赋值给RpcContextfuture变量(用户线程)。

步骤4:消费者服务获取到Future,可以通过Future#get()方法获取结果,但此时结果为null(用户线程)。

步骤5:消费者等待Future#get()返回结果(用户线程)。

步骤6:提供者服务处理完成返回结果(IO线程)。

步骤7:获取返回结果之后会从步骤2的映射存储集合中取出Future,设置返回结果,这样步骤5就可以获取到返回结果了。

使用异步调用:

步骤一:使用async="true"配置在接口或者方法标签上。

<dubbo:reference id="xxx" interface="xxx.xxx.Xxx" async="true">
</dubbo:reference>

步骤二:通过RpcContext获取future,异步等待结果返回。

List<UserAddress> userAddressList = userService.getUserAddressList(i+"");//远程调用
System.out.println(userAddressList);//结果为null

Future<Object> future = RpcContext.getContext().getFuture();
while (future.get() != null){
  userAddressList = (List<UserAddress>)future.get(); //获取异步调用结果
  if(userAddressList != null){
    userAddressList.forEach((userAddress -> {
      System.out.println("RESPONSE FROM 【"+userAddress.getIp()+"】:address:" +
              ""+userAddress.getUserAddress()+",userId:"+userAddress.getUserId());
    }));
    break;
  }

  Thread.sleep(1_000);
}

本地存根

消费者服务发起远程调用之后就只能等待结果的返回,所有处理逻辑都在提供者服务,如果消费者想在发起远程调用之前和之后做一些其他操作,比如参数验证、调用失败处理等等,该怎么办呢?比较戳的一种方式是消费者服务在每次发起远程调用前后都加上逻辑控制代码,这种方式也能实现,但是显然很麻烦也不优雅。
Dubbo提供了本地存根功能,可以实现对服务接口类似AOP控制的功能,下图是本地存根的实现逻辑图:
在这里插入图片描述
创建一个本地存根类XxxServiceStub,它实现了服务接口XxxService,并且构造方法注入了XxxService接口实例(代理类XxxServiceProxy),用户可以在XxxServiceStub类实现对XxxServiceAOP控制。

public class UserServiceStub implements UserService {

    private final UserService userService;

    // 构造函数传入真正的远程代理对象
    public UserServiceStub(UserService userService){
        this.userService = userService;
    }

    @Override
    public List<UserAddress> getUserAddressList(String userId) {
        try {
            System.out.println("----UserServiceStub-----");
            //参数校验或者ThreadLocal本地缓存等
            return userService.getUserAddressList(userId);
        } catch (Exception e) {
            //容错处理
            return new ArrayList<>();
        }
    }

}

当发起远程调用时,实际上是交给代理对象XxxServiceProxy来处理,调用过程发生异常之后Dubbo会通过XxxServiceMock将异常向外抛出,我们可以在XxxServiceStub捕获异常信息,进行容错处理。
定义了本地存根类之后,我们还需要在接口标签<dubbo:service/>使用stub属性配置之后才生效。

<dubbo:reference id="xxx" interface="xxx.xxx.Xxx"
 		stub="com.luke.dubbo.order.stub.UserServiceStub">

测试结果:

----UserServiceStub-----
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:548
serverIP:192.168.0.205
----UserServiceStub-----
RESPONSE FROM 【127.0.0.1】:address:北京天安门广场,userId:549
serverIP:192.168.0.149
----UserServiceStub-----
RESPONSE FROM 【192.168.0.205】:address:北京天安门广场,userId:550
serverIP:192.168.0.205

延迟连接

延迟连接用于减少长连接数。当有调用发起时,再创建长连接。

<dubbo:protocol name="dubbo" lazy="true" />

粘滞连接

当提供者服务有多个实例时,默认情况下消费者服务在启动时底层会通过Netty网络框架依次连接每一个依赖的提供者服务:
在这里插入图片描述
这种连接是一种TCP长连接,依赖多少个提供者服务,消费者服务就需要保持多少个这样的TCP长连接,这是需要消耗不少资源的。Dubbo提供了粘滞连接功能,尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。同时粘带连接会自动开启延迟连接,以减少长连接数。
在这里插入图片描述
开启粘带连接:

<dubbo:reference id="xxx" interface="com.xxx.XxxService" sticky="true" />

或者

<dubbo:reference id="xxx" interface="com.xxx.XxxService">
    <dubbo:mothod name="sayHello" sticky="true" />
</dubbo:reference>

关注公众号了解更多原创博文

Alt

发布了122 篇原创文章 · 获赞 127 · 访问量 93万+

猜你喜欢

转载自blog.csdn.net/u010739551/article/details/105091839