一. 项目需求描述
最近公司不断在与客户外部系统对接,订单是其中的一个对接任务。公司现有一个专门负责处理订单的中心系统,但订单来源都是通过定时任务去其他系统拉取的。前不久完善的一版旧系统,加入了商城的业务,也同步了外部系统的订单,所以成为了订单的源头。现在就需要对订单系统进行改变,从原来的拉取订单变更为主动的接收订单,所以我们使用的了阿里云的消息服务。
二. 项目业务设计
我们有三个系统涉及到订单:A. java商城系统,B. ocs订单中心系统;C. php商城系统。现在A系统可以拿到所有的订单来源,所以在各个订单出发点发送消息服务,也就是A系统是消息生产者。在B系统,我们订阅A系统发送的消息,从而消费消息,将订单入库,即B系统是消息消费者。
三. 项目MQ服务配置
在第一篇博客中,我们了解到消息队列的模式分为队列和主题两种,我们采用的是主题模式。阿里的消息服务用起来也很方便,直接在管理控制台去配置一下即可。
1. 新建topic
2. 申请发布,即消息生产者
3. 申请订阅,即消息消费者
四. 项目MQ Java SDK
将mq加入到项目中,也比较简单,官网上也有对应的demo。
1. 消息生产者工具类:读取配置文件中在消息服务控制台配置好的topic,producer及一些密钥,创建消息生产者。
public OrderProducer getPlusOrderProducer() {
if (orderProducer != null) {
return orderProducer
}
Properties producerProperties = new Properties()
producerProperties.setProperty(PropertyKeyConst.ProducerId, Global.getConfig("PID_UQI_ORDER_UPLUS"))
producerProperties.setProperty(PropertyKeyConst.AccessKey, Global.getConfig("ACCESS_KEY"))
producerProperties.setProperty(PropertyKeyConst.SecretKey, Global.getConfig("SECRET_KEY"))
producerProperties.setProperty(PropertyKeyConst.ONSAddr, Global.getConfig("ONSADDR"))
orderProducer = ONSFactory.createOrderProducer(producerProperties)
orderProducer.start()
logger.debug("uplus订单生产者启动...")
return orderProducer
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
2. 推送消息业务类:获取消息生产者实例,发送消息。
public void pushMsg(WmsOrder orderInfo) {
byte[] bytes = null;
Message message = null;
SendResult sendResult = null;
OrderProducer producer = MqUtils.getInstance().getPlusOrderProducer();
bytes = new Gson().toJson(orderInfo).getBytes();
message = new Message(Global.getConfig("TID_UQI_ORDER_UPLUS"), Global.getConfig("TAG_UQI_ORDER_UPLUS"), bytes);
try {
sendResult = producer.send(message, Global.getConfig("TAG_UQI_ORDER_UPLUS"));
if (sendResult != null) {
logger.debug(new Date() + " 发送消息成功! Topic 是:" + message.getTopic() + " msgId 是: " + message.getMsgID() + " msgTag 是:" + message.getTag());
logger.info("消息内容是" + message.getBody());
}
} catch (Exception e) {
logger.debug(new Date() + " Send mq message failed. Topic is:" + message.getTopic());
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
3. 消息消费者工具类:读取配置文件中在消息服务控制台配置好的topic,consumer及一些密钥,创建消息消费者。
public OrderConsumer getPlusOrderConsumer() {
String topic = Global.getConfig("TID_UQI_ORDER_UPLUS")
String subExpression = Global.getConfig("TAG_UQI_ORDER_UPLUS")
Properties consumerProperties = new Properties()
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, Global.getConfig("CID_UQI_ORDER_UPLUS"))
consumerProperties.setProperty(PropertyKeyConst.AccessKey, Global.getConfig("ACCESS_KEY"))
consumerProperties.setProperty(PropertyKeyConst.SecretKey, Global.getConfig("SECRET_KEY"))
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, Global.getConfig("ONSADDR"))
// 广播订阅方式设置
consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING)
OrderConsumer orderConsumer = ONSFactory.createOrderedConsumer(consumerProperties)
orderConsumer.subscribe(topic, subExpression, new MessageOrderListener() {
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
try {
logger.info(new String(message.getBody(), "UTF-8"))
Gson gson = new Gson()
WmsOrder wmsOrder = gson.fromJson(new String(message.getBody(), "UTF-8"), WmsOrder.class)
final WmsOrderService wmsOrderService = FrameSpringBeanUtil.getBean(WmsOrderService.class)
wmsOrderService.synUplusOrder(wmsOrder)
return OrderAction.Success
} catch (Exception e) {
e.printStackTrace()
return OrderAction.Suspend
}
}
})
orderConsumer.start()
logger.info("订单消费者启动...")
return orderConsumer
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
4. 添加监听器,在项目启动时,将消费者启动,在web.xml中配置监听器
public class ConfigListener implements ServletContextListener {
@Override
public void contextDestroyed(ServletContextEvent context) {
}
@Override
public void contextInitialized(ServletContextEvent context) {
MqConsumerUtil.getInstance().getPlusOrderConsumer();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
<listener>
<listener-class>com.uqiauto.ocs.listener.ConfigListener</listener-class>
</listener>
五. 项目MQ 使用总结
使用消息服务,这过程中也是比较顺利的,在这次负责订单中心mq的接入过程中,也学习到了很多东西,所以,做完后花个时间做个总结整理还是很有必要的。