RabbitMQ中间件方案:BPM-SAP的应用解耦和用户并发量控制(基于SpringBoot)

一、高耦合架构

原始架构图:
在这里插入图片描述
存在问题:
当大量用户同时访问BPM并查询SAP数据时,BPM直接通过RFC(Remote Function Call)方式调用SAP接口,与SAP建立了过多的JCo(Java Connection),如果这些JCo没有及时清除,会导致后续登录的用户没有可用的JCo,从而无法获取SAP数据。

二、解决方案

调整后的架构图:
在这里插入图片描述
调整步骤如下:
(1)将BPM的RFC调用功能抽取并拆分为一个独立的项目,置于BPM和SAP之间;回收BPM的RFC调用功能,也就回收了BPM用户建立JCo的功能,这样就不会出现一个用户(Session)对应一个JCo的情况。
(2)将RabbitMQ服务器置于BPM和RFC项目之间,剥夺了BPM用户建立JCo的功能,改为根据消息队列建立JCo,这样就不会出现一个用户(Session)对应一个JCo的情况,而是一条MQ对应一个JCo。
(3)将创建请求队列、携带请求JSON数据、监听响应JSON数据的功能抽取并交给独立的API项目,将API项目置于BPM和RabbitMQ服务器之间。
(4)将创建响应队列、携带响应JSON数据、监听请求JSON数据的功能交给RFC项目。

RabbitMQ的用户并发控制功能:
将一个用户(Session)对应一个JCo变为一条MQ对应一个JCo。即使很多BPM用户并发调用SAP接口,在RabbitMQ的控制下,用户的并发量=JCo的数量 变为 MQ的数量=JCo数量,大大减少了并发建立的JCo。

三、RabbitMQ架构存在的问题

结合架构图思考以下场景:
假设现在只创建了两条消息队列:一条为请求队列,一条为响应队列。
用户发起请求,请求到达BPM项目,请求体中的JSON数据先后经过BPM项目的StringEntity和HttpPost对象封装,由HttpClient执行访问API项目。
在API项目中,API在启动时创建一条请求队列;在控制层接收到来自BPM的请求JSON数据后,将该JSON数据发送到请求队列中;与此同时,API也开始监听响应队列。
在RFC项目中,RFC在启动时创建一条响应队列;在监听到请求队列后,获取其中的请求JSON数据,解析这些JSON数据,并使用RFC调用SAP接口,查询得到响应JSON数据;随后,将响应JSON数据发送到响应队列中。
回到API项目,在实时监听响应队列的过程中,只要发现响应JSON数据不为NULL,就在控制层返回响应JSON数据给BPM,BPM再返回给客户端。
当只有一个用户发起一次请求,上述过程没有任何问题,但是当有多条请求时,就容易出现错误消费的情况。
比如,用户A和用户B同时发起请求,几乎同时从SAP中查到响应JSON数据(RESPONSE_JSON_A和RESPONSE_JSON_B),但由于只有一条响应队列,所以发送到响应队列的JSON数据只能是二者之一,假设是RESPONSE_JSON_A,所以API监听到的也是RESPONSE_JSON_A,那么用户A和B在客户端接收到的响应都是RESPONSE_JSON_A,B用户就懵了,因为他明明想获取RESPONSE_JSON_B啊。
我的解决方案
为了避免上述出现的消费错误的情况,可以在请求JSON数据中加入标记值,并在查询出SAP响应数据后,在响应JSON中也加入该标记值,在API项目中,在监听到响应队列携带的响应JSON数据后,进行请求和响应数据的标记校对,当两者标记值能够对应上,再返回JSON数据给BPM。
关键代码
(1)rabbit-api项目
控制层代码:

package com.jake.rabbitapi.controller;

import com.jake.rabbitapi.constants.Queues;
import com.jake.rabbitapi.consumer.ResponseMapConsumer;
import com.jake.rabbitapi.producer.RequestMapProducer;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;
import java.util.Random;
import java.util.UUID;

@RestController
@RequestMapping(value = "/rfc")
public class BpmToRfcController {

    @Autowired
    private RequestMapProducer requestMapProducer;

    @Autowired
    private ResponseMapConsumer responseMapConsumer;

    @RequestMapping(value = "/json")
    public Map<String, Object> getResponseMap(@RequestBody Map<String, Object> requestMap) throws InterruptedException {
        String requestID = UUID.randomUUID().toString();
        int reqQNo = new Random().nextInt(4) + 1;
        requestMap.put("requestID", requestID);
        requestMap.put("reqQNo", reqQNo);
        requestMapProducer.sendRequestMap(Queues.REQUEST_QUEUE + reqQNo, requestMap);
        Map<String, Object> responseMap = responseMapConsumer.getResponseMap();
        // 不断获取@RabbitListener监听到的响应数据,响应数据不为NULL时退出循环。
        while (responseMap == null) {
            Thread.sleep(100);
            responseMap = responseMapConsumer.getResponseMap();
        }
        String responseID = (String) responseMap.get("responseID");
        int resQNo = (int) responseMap.get("resQNo");
        // 不断获取@RabbitListener监听到的响应数据,当标记值相同时退出循环。
        while (!StringUtils.equals(requestID, responseID) && reqQNo != resQNo) {
            Thread.sleep(100);
            responseMap = responseMapConsumer.getResponseMap();
            responseID = (String) responseMap.get("responseID");
            resQNo = (int) responseMap.get("resQNo");
        }
        return responseMap;
    }

}

生产者代码:

package com.jake.rabbitapi.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class RequestMapProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendRequestMap(String queueName, Map<String, Object> requestMap) {
        rabbitTemplate.convertAndSend(queueName, requestMap);
    }

}

消费者代码:

package com.jake.rabbitapi.consumer;

import com.jake.rabbitapi.constants.Queues;
import lombok.Getter;
import lombok.Setter;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
@RabbitListener(queues = {Queues.RESPONSE_QUEUE + 1, Queues.RESPONSE_QUEUE + 2,
                Queues.RESPONSE_QUEUE + 3, Queues.RESPONSE_QUEUE + 4, Queues.RESPONSE_QUEUE + 5})
public class ResponseMapConsumer {

    @Getter
    @Setter
    private Map<String, Object> responseMap;

    @RabbitHandler
    public void listenResponseMap(Map<String, Object> responseMap) {
        setResponseMap(responseMap);
    }

}

(2)rabbit-rfc项目
生产者代码:

package com.jake.rabbitrfc.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class RequestMapProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendResponseMap(String queueName, Map<String, Object> responseSapJson) {
        rabbitTemplate.convertAndSend(queueName, responseSapJson);
    }

}

消费者代码:

package com.jake.rabbitrfc.consumer;

import com.jake.rabbitrfc.constant.Queues;
import com.jake.rabbitrfc.producer.RequestMapProducer;
import com.jake.rabbitrfc.service.BpmToSapService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
@RabbitListener(queues = {Queues.REQUEST_QUEUE + 1, Queues.REQUEST_QUEUE + 2,
                Queues.REQUEST_QUEUE + 3, Queues.REQUEST_QUEUE + 4, Queues.REQUEST_QUEUE + 5})
public class RequestMapConsumer {

    @Autowired
    private RequestMapProducer requestMapProducer;

    @Autowired
    private BpmToSapService bpmToSapService;

    @RabbitHandler
    public void listenRequestSapJson(Map<String, Object> requestMap) {
        Map<String, Object> responseMap = bpmToSapService.getSapJson(requestMap);
        String requestID = (String) requestMap.get("requestID");
        int reqQNo= (int) requestMap.get("reqQNo");
        responseMap.put("responseID", requestID);
        responseMap.put("resQNo", reqQNo);
        requestMapProducer.sendResponseMap(Queues.RESPONSE_QUEUE + reqQNo, responseMap);
    }

}

业务部分相关代码:

getSapJson(Map<String, Object> requestMap)

可以参考博客《SAP统一连接接口(RESTFUL风格)》

四、springboot默认数据源Hikari的连接池存在的问题

参考自己的博客《进阶之路:Java Web开发DEBUG实录》
BUG2:Hikari数据库连接池报错
HikariPool Connection is not available, request timed out after 60003ms

Debug
通过追踪日志,发现提示:cannot acquire JDBC connection
猜想可能是因为数据源连接池的最大连接数不足,连接池甚至来不及通过connection.close()方法回收已建立的连接,客户端便发起新的Web请求查询数据库,建立了新的数据库连接。于是连接池中的连接很快被耗尽,无法再响应客户端新发起的查询数据库的请求。
在springboot的application.properties中添加如下配置:

spring.datasource.hikari.maximum-pool-size=150

扩充最大连接数,问题解决。

猜你喜欢

转载自blog.csdn.net/qq_15329947/article/details/85298978