public class ActiveMQTxMessageProducer extends AbsActiveMQTxMessageProducer { private QMessageService qMessageService; public QMessageService getqMessageService() { return qMessageService; } public void setqMessageService(QMessageService qMessageService) { this.qMessageService = qMessageService; } /** * 事务消息处理成功后的处理 * 事务消息处理成功,从消息表中删除对应的消息 * * @param messageId 消息id */ public void onSuccess(String messageId) { log.info("tx message--{}--commit success", messageId); //事务消息处理成功后,删除数据库中对应的消息 qMessageService.deleteQMessage(messageId); } /** * 事务消息处理失败后的处理 * 事务消息处理失败户,进行日志相关记录 * 或者其他相关数据库操作 * * @param e 消息发送异常 * @param messageId 消息id */ public void onFail(Exception e, String messageId) { log.error("send tx message:{} ,error:{}", messageId, e.getMessage()); } public ActiveMQTxMessageProducer() { } public ActiveMQTxMessageProducer(Builder builder) { this.brokerUrl = builder.brokerUrl; this.userName = builder.userName; this.password = builder.password; this.destName = builder.destName; this.n2 = builder.n2; this.qMessageService = builder.qMessageService; } public static Builder builder() { return new Builder(); } public static class Builder { private String brokerUrl; private String userName; private String password; private String destName; private boolean n2; private QMessageService qMessageService; public Builder() { } public Builder brokerUrl(String brokerUrl) { this.brokerUrl = brokerUrl; return this; } public Builder userName(String userName) { this.userName = userName; return this; } public Builder password(String password) { this.password = password; return this; } public Builder destName(String destName) { this.destName = destName; return this; } public Builder n2(boolean n2) { this.n2 = n2; return this; } public Builder qMessageService(QMessageService qMessageService) { this.qMessageService = qMessageService; return this; } public ActiveMQTxMessageProducer build() throws Exception { ActiveMQTxMessageProducer producer = new ActiveMQTxMessageProducer(this); producer.afterPropertiesSet(); return producer; } } }
调用:
//启动时初始化生产者 ActiveMQTxMessageProducer producer = producerCache.get(destName); if (producer == null) { producer = ActiveMQTxMessageProducer.builder() .brokerUrl(brokerConfig.getBrokerUrl()) .userName(brokerConfig.getUserName()) .password(brokerConfig.getPassword()) .destName(destName) .n2(n2) .qMessageService(qMessageService) .build(); //加入缓存中 producerCache.set(destName, producer); }