1、配置文件
#MQ安装服务器地址
project.mq.host=127.0.0.1
#端口
project.mq.port=1419
#传输类型
project.mq.transportType=1
#(队列管理器名称)
project.mq.queue-manager=MQ_MY_SEND
#(通道名称)
project.mq.channel=SYSTEM.DEF.SVRCONN
#创建的MQ用户
project.mq.username=mqm
#创建的MQ用户连接密码
project.mq.password=xxxxxx
project.mq.receive-timeout= 20000
2 、JmsConfig
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.PropertySource;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
@Configuration
@PropertySource("classpath:ibm-mq.properties")
public class JmsConfig {
private static Logger logger = LoggerFactory.getLogger(JmsConfig.class);
@Value("${project.mq.host}")
private String host;
@Value("${project.mq.port}")
private Integer port;
@Value("${project.mq.queue-manager}")
private String queueManager;
@Value("${project.mq.channel}")
private String channel;
@Value("${project.mq.username}")
private String username;
@Value("${project.mq.password}")
private String password;
@Value("${project.mq.receive-timeout}")
private long receiveTimeout;
@Value("${project.mq.transportType}")
private int transportType;
@Bean
public MQQueueConnectionFactory mqQueueConnectionFactory() {
MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
mqQueueConnectionFactory.setHostName(host);
try {
mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
// mqQueueConnectionFactory.setCCSID(1383);// 表示是简体中文,
mqQueueConnectionFactory.setCCSID(1381);// 表示是简体中文,
// mqQueueConnectionFactory.setChannel(channel);
mqQueueConnectionFactory.setPort(port);
mqQueueConnectionFactory.setQueueManager(queueManager);
mqQueueConnectionFactory.setTransportType(transportType);
} catch (Exception e) {
logger.error("", e.getMessage(), e);
e.printStackTrace();
}
return mqQueueConnectionFactory;
}
@Bean
UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory) {
UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
/* //用户认证需要
userCredentialsConnectionFactoryAdapter.setUsername(username);
userCredentialsConnectionFactoryAdapter.setPassword(password);*/
userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
return userCredentialsConnectionFactoryAdapter;
}
@Bean
@Primary
public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
cachingConnectionFactory.setSessionCacheSize(500);
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
@Bean
public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
return jmsTransactionManager;
}
@Bean
public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
jmsTemplate.setReceiveTimeout(receiveTimeout);
return jmsTemplate;
}
public int getTransportType() {
return transportType;
}
public void setTransportType(int transportType) {
this.transportType = transportType;
}
3 、利用监听方式获得消息
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.Reader;
import java.io.StringReader;
//消息消费者的类上必须加上@Component,或者是@Service,这样的话,消息消费者类就会被委派给Listener类,原理类似于使用SessionAwareMessageListener以及MessageListenerAdapter来实现消息驱动POJO
@Component
public class ReceiveMessage extends MessageListenerAdapter {
private static Logger logger = LoggerFactory.getLogger(ReceiveMessage.class);
@Autowired
JmsOperations jmsOperations;
/**
* 自动监听队列名mq.destination
* @param msg
*/
@JmsListener(destination = "${mq.destination}")
public void onMessage(Message msg) {
String jmsCorrelationId = null;
String jmsMessageId = null;
logger.info("成功监听"+systemConfig.getMqDestination()+"消息队列,传来的值为:" + msg.toString());
String message = null;
try {
jmsCorrelationId = msg.getJMSCorrelationID();
jmsMessageId = msg.getJMSMessageID();
logger.info("jmsCorrelationId = "+ jmsCorrelationId);
if(msg instanceof TextMessage){//接收text消息
message = ((TextMessage) msg).getText();
}else if(msg instanceof BytesMessage){ //接收字节消息
BytesMessage bytesMessage = (BytesMessage)msg;
byte []bt = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bt);
message = new String(bt, BankConstant.MQ_REVERT_CHARSET_TYPE);
}
} catch (JMSException e) {
logger.error("", e.getMessage(), e);
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
logger.error("", e.getMessage(), e);
e.printStackTrace();
}
logger.info("成功监听"+systemConfig.getMqDestination()+"消息队列,传来的值为[" + message+"]");
}
}
4 、多队列动态增加队列利用定时轮询方式获得队列消息
Spring Boot需要开启定时
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class BankApplication {
public static void main(String[] args) {
SpringApplication.run(BankApplication.class, args);
}
}
定时任务类
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class BankTask {
private static Logger logger = LoggerFactory.getLogger(BankTask.class);
private ExecutorService threadPool =Executors.newFixedThreadPool(20);
@Autowired
CommonConfig commonConfig;
@Autowired
JmsOperations jmsOperations;
/**
* 多队列定时获得队列消息
*/
//# 定时多久获得一次消息(设置这个值下面俩个发送和接收队列就不用设置了)
//task.message.cron: 0 */2 * * * ?
@Scheduled(cron = "${task.message.cron}")
public void receiveMessage(){
List<MqInfo> list = mqInfoBusiness.getCacheAllList(); // 读取数据库队列列表
if (list != null && list.size() > 0){
for (MqInfo mqInfo : list) {
try {
if (StringUtils.isNotBlank(mqInfo.getReceiveQueue())) {
threadPool.execute(new Runnable() {
@Override
public void run() {
mqInfoBusiness.receive(mqInfo);
}
});
}
}catch (Exception e){
logger.error("", e.getMessage(), e);
e.printStackTrace();
}
}
}
}
}
业务处理类
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.Reader;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.util.List;
@Service
public class MqInfoBusinessImpl implements IMqInfoBusiness {
private static Logger logger = LoggerFactory.getLogger(MqInfoBusinessImpl.class);
@Autowired
JmsOperations jmsOperations;
@Autowired
SystemConfig systemConfig;
@Override
public void receiveMessage(MqInfo mqInfo, Message msg) {
String jmsCorrelationId = null;
String jmsMessageId = null;
logger.info(mqInfo.toString());
logger.info("成功监听"+mqInfo.getReceiveQueue()+"消息队列,传来的值为:" + msg.toString());
String message = null;
try {
jmsCorrelationId = msg.getJMSCorrelationID();
jmsMessageId = msg.getJMSMessageID();
logger.info("jmsMessageId = ["+jmsMessageId+"] jmsCorrelationId = ["+ jmsCorrelationId+"]");
if(msg instanceof TextMessage){//接收text消息
message = ((TextMessage) msg).getText();
}else if(msg instanceof BytesMessage){ //接收字节消息
BytesMessage bytesMessage = (BytesMessage)msg;
byte []bt = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bt);
message = new String(bt, BankConstant.MQ_REVERT_CHARSET_TYPE);
}
} catch (JMSException e) {
logger.error("", e.getMessage(), e);
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
logger.error("", e.getMessage(), e);
e.printStackTrace();
}
logger.info("成功监听"+mqInfo.getReceiveQueue()+"消息队列,传来的值为[" + message+"]");
if (StringUtils.isNotBlank(message)){
MqMessageLog mqMessageLog = new MqMessageLog();
mqMessageLog.setJmsCorrelationId(jmsCorrelationId);
mqMessageLog.setJmsMessageId(jmsMessageId);
mqMessageLog.setContent(message);
mqMessageLog.setQueue(mqInfo.getReceiveQueue());
mqMessageLog.setTheSender(CommonConstant.MESSAGE_TYPE_SEND_FISCAL);
try {
Reader reader = new StringReader(message.trim());
Unmarshaller unmarshaller = JAXBContext.newInstance(BaseTransInfoResponse.class).createUnmarshaller();
BaseTransInfoResponse baseTransInfoResponse = (BaseTransInfoResponse) unmarshaller.unmarshal(reader);
BeanUtils.copyProperties(baseTransInfoResponse, mqMessageLog);
if (baseTransInfoResponse != null && (StringUtils.isBlank(baseTransInfoResponse.getTransVerify()) || !baseTransInfoResponse.getTransVerify().equals("应答")) ){
if (TransTypeEnum.ZJJB.getType().equals(baseTransInfoResponse.getTransType())){
mqMessageLog.setMessageType(CommonConstant.MESSAGE_TYPE_RECEIVE);
Reader reader1 = new StringReader(message);
Unmarshaller unmarshaller1 = JAXBContext.newInstance(BaseTransInfoResponse.class, ZJJBTransInfoResponse.class).createUnmarshaller();
ZJJBTransInfoResponse jbTransInfoResponse = (ZJJBTransInfoResponse) unmarshaller1.unmarshal(reader1);
fiscalSendService.capitalReport(jbTransInfoResponse, jmsCorrelationId);
}
}
} catch (JAXBException e) {
logger.error("", e.getMessage(), e);
e.printStackTrace();
} catch (Exception e) {
logger.error("", e.getMessage(), e);
e.printStackTrace();
}
int curtDate = DateHelper.getCurrentNumericDate();
int curtTime = DateHelper.getCurrentNumericTime();
mqMessageLog.setCreateDate(curtDate);
mqMessageLog.setCreateTime(curtTime);
mqMessageLogService.insert(mqMessageLog);
}
}
@Override
public Message receive(MqInfo mqInfo) {
Message message = jmsOperations.receive(mqInfo.getReceiveQueue());
if (message != null) {
try {
this.receiveMessage(mqInfo, message);
return this.receive(mqInfo);
} catch (Exception e) {
e.printStackTrace();
logger.error("", e.getMessage(), e);
}
}
return message;
}
}
队列列表实体
@TableName("mq_info")
public class MqInfo extends Model<MqInfo> {
private static final long serialVersionUID = 1L;
/**
* .
*/
@TableField("id")
private Integer id;
/**
* 队列管理器.
*/
@TableField("qmgr")
private String qmgr;
/**
* MQ服务器IP.
*/
@TableField("mq_host")
private String mqHost;
/**
* MQ服务器端口.
*/
@TableField("mq_port")
private Integer mqPort;
/**
* 发送队列.
*/
@TableField("send_queue")
private String sendQueue;
/**
* 接受队列.
*/
@TableField("receive_queue")
private String receiveQueue;
/**
* 通道.
*/
@TableField("channel")
private String channel;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
@Override
protected Serializable pkVal() {
return this.id;
}
public String getQmgr() {
return qmgr;
}
public void setQmgr(String qmgr) {
this.qmgr = qmgr;
}
public String getMqHost() {
return mqHost;
}
public void setMqHost(String mqHost) {
this.mqHost = mqHost;
}
public Integer getMqPort() {
return mqPort;
}
public void setMqPort(Integer mqPort) {
this.mqPort = mqPort;
}
public String getSendQueue() {
return sendQueue;
}
public void setSendQueue(String sendQueue) {
this.sendQueue = sendQueue;
}
public String getReceiveQueue() {
return receiveQueue;
}
public void setReceiveQueue(String receiveQueue) {
this.receiveQueue = receiveQueue;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
}