1、接收ActiveMq类
package com.roncoo.pay.app.queue;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import com.roncoo.pay.app.queue.bankmessage.BankMessageFixedThreadPool;
import com.roncoo.pay.app.queue.bankmessage.BankMessageTask;
import com.roncoo.pay.app.queue.bankmessage.biz.BankMessageBiz;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.SessionAwareMessageListener;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.fastjson.JSONObject;
import com.roncoo.pay.common.core.exception.BizException;
public class BankMessageListener implements SessionAwareMessageListener<Message> {
private static final Log LOG = LogFactory.getLog(BankMessageListener.class);
@Autowired
private JmsTemplate notifyJmsTemplate;
@Autowired
private BankMessageBiz bankMessageBiz;
public synchronized void onMessage(Message message, Session session) {
//接收 activemq消息
Map<String,String> param = null;
String strMessage = null;
try {
ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message;
strMessage = objectMessage.getText();
LOG.info("strMessage1 bank:" + strMessage);
param = JSONObject.parseObject(strMessage, Map.class);
//封装成任务
BankMessageTask bankMessageTask = new BankMessageTask(param);
bankMessageTask.setBankMessageBiz(bankMessageBiz);
// 把任务加到线程池
BankMessageFixedThreadPool.addTask(bankMessageTask);
} catch (RpcException e) {
LOG.error("==>RpcException", e);
} catch (BizException e) {
// 业务异常,不再写会队列
LOG.error("==>BizException", e);
} catch (Exception e) {
// 不明异常不再写会队列
LOG.error("==>Exception", e);
}
}
public JmsTemplate getNotifyJmsTemplate() {
return notifyJmsTemplate;
}
public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) {
this.notifyJmsTemplate = notifyJmsTemplate;
}
public BankMessageBiz getBankMessageBiz() {
return bankMessageBiz;
}
public void setBankMessageBiz(BankMessageBiz bankMessageBiz) {
this.bankMessageBiz = bankMessageBiz;
}
}
2、BankMessageFixedThreadPool 线程池类
package com.roncoo.pay.app.queue.bankmessage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
*
* @author Peter
*
*/
public class BankMessageFixedThreadPool {
private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
public static void addTask(BankMessageTask bankMessageTask){
fixedThreadPool.execute(bankMessageTask);
}
}
3、BankMessageTask 类
package com.roncoo.pay.app.queue.bankmessage;
import com.roncoo.pay.app.queue.bankmessage.biz.BankMessageBiz;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Map;
/**
*
*
*
*/
public class BankMessageTask implements Runnable {
private static final Log LOG = LogFactory.getLog(BankMessageTask.class);
@Autowired
private BankMessageBiz bankMessageBiz;
private Map<String , String> notifyMessageMap;
public BankMessageTask(Map<String , String> notifyMessageMap){
this.notifyMessageMap = notifyMessageMap;
}
/**
*执行业务逻辑
*/
@Override
public void run() {
bankMessageBiz.completePay(notifyMessageMap);
}
public Map<String, String> getNotifyMessageMap() {
return notifyMessageMap;
}
public void setNotifyMessageMap(Map<String, String> notifyMessageMap) {
this.notifyMessageMap = notifyMessageMap;
}
public BankMessageBiz getBankMessageBiz() {
return bankMessageBiz;
}
public void setBankMessageBiz(BankMessageBiz bankMessageBiz) {
this.bankMessageBiz = bankMessageBiz;
}
}
4、具体业务逻辑
package com.roncoo.pay.app.queue.bankmessage.biz;
import com.roncoo.pay.service.message.api.RpTransactionMessageService;
import com.roncoo.pay.service.trade.api.RpTradePaymentManagerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Map;
/**
*
*/
public class BankMessageBiz {
private static final Log LOG = LogFactory.getLog(BankMessageBiz.class);
@Autowired
private RpTradePaymentManagerService rpTradePaymentManagerService;
@Autowired
private RpTransactionMessageService rpTransactionMessageService;
public void completePay(Map<String , String > notifyMessageMap){
String messageId = notifyMessageMap.get("messageId");
String payWayCode = notifyMessageMap.get("payWayCode");
//调用业务方法,完成交易
try{
rpTradePaymentManagerService.completeScanPay(payWayCode, notifyMessageMap);
//删除消息
rpTransactionMessageService.deleteMessageByMessageId(messageId);
}catch (Exception e){
LOG.error("完成支付结果异常:",e);
}
}
public RpTradePaymentManagerService getRpTradePaymentManagerService() {
return rpTradePaymentManagerService;
}
public void setRpTradePaymentManagerService(RpTradePaymentManagerService rpTradePaymentManagerService) {
this.rpTradePaymentManagerService = rpTradePaymentManagerService;
}
public RpTransactionMessageService getRpTransactionMessageService() {
return rpTransactionMessageService;
}
public void setRpTransactionMessageService(RpTransactionMessageService rpTransactionMessageService) {
this.rpTransactionMessageService = rpTransactionMessageService;
}
}
扫描二维码关注公众号,回复:
1918702 查看本文章