IBM MQ 集成 Spring Boot 2:多队列发送及接收消息

1、配置文件(ibm-mq.properties,放在 classpath 下)

#MQ安装服务器地址
project.mq.host=127.0.0.1
#端口
project.mq.port=1419
#传输类型
project.mq.transportType=1
#(队列管理器名称)
project.mq.queue-manager=MQ_MY_SEND
#(通道名称)
project.mq.channel=SYSTEM.DEF.SVRCONN
#创建的MQ用户
project.mq.username=mqm
#创建的MQ用户连接密码
project.mq.password=xxxxxx
#接收消息的超时时间(毫秒)
project.mq.receive-timeout=20000

2、JmsConfig(JMS 连接配置类)

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.PropertySource;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

@Configuration
@PropertySource("classpath:ibm-mq.properties")
public class JmsConfig {
    private static Logger logger = LoggerFactory.getLogger(JmsConfig.class);
    @Value("${project.mq.host}")
    private String host;
    @Value("${project.mq.port}")
    private Integer port;
    @Value("${project.mq.queue-manager}")
    private String queueManager;
    @Value("${project.mq.channel}")
    private String channel;
    @Value("${project.mq.username}")
    private String username;
    @Value("${project.mq.password}")
    private String password;
    @Value("${project.mq.receive-timeout}")
    private long receiveTimeout;
    @Value("${project.mq.transportType}")
    private int transportType;
    @Bean
    public MQQueueConnectionFactory mqQueueConnectionFactory() {
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        mqQueueConnectionFactory.setHostName(host);
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
//            mqQueueConnectionFactory.setCCSID(1383);// 表示是简体中文,
            mqQueueConnectionFactory.setCCSID(1381);// 表示是简体中文,
//            mqQueueConnectionFactory.setChannel(channel);
            mqQueueConnectionFactory.setPort(port);
            mqQueueConnectionFactory.setQueueManager(queueManager);
            mqQueueConnectionFactory.setTransportType(transportType);
        } catch (Exception e) {
            logger.error("", e.getMessage(), e);
            e.printStackTrace();
        }
        return mqQueueConnectionFactory;
    }
    @Bean
    UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory) {
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
 /*       //用户认证需要
        userCredentialsConnectionFactoryAdapter.setUsername(username);
        userCredentialsConnectionFactoryAdapter.setPassword(password);*/
        userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return userCredentialsConnectionFactoryAdapter;
    }
    @Bean
    @Primary
    public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
        cachingConnectionFactory.setSessionCacheSize(500);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }
    @Bean
    public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
        return jmsTransactionManager;
    }
    @Bean
    public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(receiveTimeout);
        return jmsTemplate;
    }

    public int getTransportType() {
        return transportType;
    }

    public void setTransportType(int transportType) {
        this.transportType = transportType;
    }

3、利用监听方式接收消息

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.Reader;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
// The consumer class must be annotated with @Component (or @Service) so that it is delegated to
// the listener infrastructure; the principle is similar to building a message-driven POJO with
// SessionAwareMessageListener and MessageListenerAdapter.
@Component
public class ReceiveMessage extends MessageListenerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ReceiveMessage.class);
    @Autowired
    JmsOperations jmsOperations;
    // Supplies the listened queue name for the log lines below.
    @Autowired
    SystemConfig systemConfig;

    /**
     * Listens on the queue named by the {@code mq.destination} property and logs the body of
     * every message received.
     *
     * <p>Text messages are read as-is; bytes messages are decoded with
     * {@code BankConstant.MQ_REVERT_CHARSET_TYPE}. Any other message type, or a message whose
     * body cannot be read, is logged with a {@code null} body.
     *
     * @param msg the JMS message delivered by the listener container
     */
    @JmsListener(destination = "${mq.destination}")
    public void onMessage(Message msg) {
        String queue = systemConfig.getMqDestination();
        logger.info("成功监听{}消息队列,传来的值为:{}", queue, msg);
        String message = null;
        try {
            logger.info("jmsCorrelationId = {}", msg.getJMSCorrelationID());
            if (msg instanceof TextMessage) {
                message = ((TextMessage) msg).getText();
            } else if (msg instanceof BytesMessage) {
                BytesMessage bytesMessage = (BytesMessage) msg;
                byte[] body = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(body);
                message = new String(body, BankConstant.MQ_REVERT_CHARSET_TYPE);
            }
        } catch (JMSException | UnsupportedEncodingException e) {
            // Best effort: the failure is logged and the message is reported with a null body.
            logger.error("Failed to read message from queue {}", queue, e);
        }
        logger.info("成功监听{}消息队列,传来的值为[{}]", queue, message);
    }

}

4、多队列:动态增加队列,利用定时轮询方式获取各队列消息

Spring Boot 需要在启动类上添加 @EnableScheduling 以开启定时任务

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Spring Boot entry point of the application. {@code @EnableScheduling} switches on Spring's
 * scheduled-task support, which {@code @Scheduled} methods need in order to run.
 */
@SpringBootApplication
@EnableScheduling
public class BankApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(BankApplication.class);
        application.run(args);
    }

}

定时任务类

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Scheduled job that polls every configured receive queue for pending messages.
 */
@Component
public class BankTask {
    private static final Logger logger = LoggerFactory.getLogger(BankTask.class);
    /** Worker pool on which the individual queues are polled in parallel. */
    private final ExecutorService threadPool = Executors.newFixedThreadPool(20);
    @Autowired
    CommonConfig commonConfig;
    @Autowired
    JmsOperations jmsOperations;
    // Provides the queue list and performs the actual receive for one queue.
    @Autowired
    IMqInfoBusiness mqInfoBusiness;


    // How often messages are fetched (when this value is set, the two send/receive queue
    // settings below it do not need to be configured). Example:
    // task.message.cron: 0 */2 * * * ?
    /**
     * Loads the queue list and submits one polling task per entry that has a receive queue.
     * Entries with a blank receive queue are skipped.
     */
    @Scheduled(cron = "${task.message.cron}")
    public void receiveMessage() {
        List<MqInfo> queues = mqInfoBusiness.getCacheAllList(); // queue list read from the database (cached)
        if (queues == null || queues.isEmpty()) {
            return;
        }
        for (MqInfo mqInfo : queues) {
            try {
                if (StringUtils.isNotBlank(mqInfo.getReceiveQueue())) {
                    threadPool.execute(() -> pollQueue(mqInfo));
                }
            } catch (Exception e) {
                // Covers a bad list entry or a rejected submission; the remaining queues are still polled.
                logger.error("Failed to schedule polling for queue entry {}", mqInfo, e);
            }
        }
    }

    /**
     * Runs on a pool thread: receives the pending messages of one queue.
     *
     * <p>Failures are caught here because an exception thrown inside a pooled task never reaches
     * the try/catch of the submitting thread.
     */
    private void pollQueue(MqInfo mqInfo) {
        try {
            mqInfoBusiness.receive(mqInfo);
        } catch (Exception e) {
            logger.error("Failed to receive messages from queue {}", mqInfo.getReceiveQueue(), e);
        }
    }

}

业务处理类

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.Reader;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.util.List;

@Service
public class MqInfoBusinessImpl implements IMqInfoBusiness {
    private static Logger logger = LoggerFactory.getLogger(MqInfoBusinessImpl.class);
    @Autowired
    JmsOperations jmsOperations;
    @Autowired
    SystemConfig systemConfig;


    @Override
    public void receiveMessage(MqInfo mqInfo, Message msg) {
        String jmsCorrelationId = null;
        String jmsMessageId = null;
        logger.info(mqInfo.toString());
        logger.info("成功监听"+mqInfo.getReceiveQueue()+"消息队列,传来的值为:" + msg.toString());
        String message = null;
        try {
            jmsCorrelationId = msg.getJMSCorrelationID();
            jmsMessageId = msg.getJMSMessageID();
            logger.info("jmsMessageId = ["+jmsMessageId+"] jmsCorrelationId = ["+ jmsCorrelationId+"]");
            if(msg instanceof TextMessage){//接收text消息
                message = ((TextMessage) msg).getText();
            }else if(msg instanceof BytesMessage){   //接收字节消息
                BytesMessage bytesMessage = (BytesMessage)msg;
                byte []bt = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(bt);
                message =  new String(bt, BankConstant.MQ_REVERT_CHARSET_TYPE);
            }
        } catch (JMSException e) {
            logger.error("", e.getMessage(), e);
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            logger.error("", e.getMessage(), e);
            e.printStackTrace();
        }
        logger.info("成功监听"+mqInfo.getReceiveQueue()+"消息队列,传来的值为[" + message+"]");
        if (StringUtils.isNotBlank(message)){
            MqMessageLog mqMessageLog = new MqMessageLog();
            mqMessageLog.setJmsCorrelationId(jmsCorrelationId);
            mqMessageLog.setJmsMessageId(jmsMessageId);
            mqMessageLog.setContent(message);
            mqMessageLog.setQueue(mqInfo.getReceiveQueue());
            mqMessageLog.setTheSender(CommonConstant.MESSAGE_TYPE_SEND_FISCAL);
            try {
                Reader reader = new StringReader(message.trim());
                Unmarshaller unmarshaller = JAXBContext.newInstance(BaseTransInfoResponse.class).createUnmarshaller();
                BaseTransInfoResponse baseTransInfoResponse = (BaseTransInfoResponse) unmarshaller.unmarshal(reader);
                BeanUtils.copyProperties(baseTransInfoResponse, mqMessageLog);
                if (baseTransInfoResponse != null && (StringUtils.isBlank(baseTransInfoResponse.getTransVerify()) || !baseTransInfoResponse.getTransVerify().equals("应答")) ){
                    if (TransTypeEnum.ZJJB.getType().equals(baseTransInfoResponse.getTransType())){
                        mqMessageLog.setMessageType(CommonConstant.MESSAGE_TYPE_RECEIVE);
                        Reader reader1 = new StringReader(message);
                        Unmarshaller unmarshaller1 = JAXBContext.newInstance(BaseTransInfoResponse.class, ZJJBTransInfoResponse.class).createUnmarshaller();
                        ZJJBTransInfoResponse jbTransInfoResponse = (ZJJBTransInfoResponse) unmarshaller1.unmarshal(reader1);
                        fiscalSendService.capitalReport(jbTransInfoResponse, jmsCorrelationId);
                    }
                }

            } catch (JAXBException e) {
                logger.error("", e.getMessage(), e);
                e.printStackTrace();
            } catch (Exception e) {
                logger.error("", e.getMessage(), e);
                e.printStackTrace();
            }
            int curtDate = DateHelper.getCurrentNumericDate();
            int curtTime = DateHelper.getCurrentNumericTime();
            mqMessageLog.setCreateDate(curtDate);
            mqMessageLog.setCreateTime(curtTime);
            mqMessageLogService.insert(mqMessageLog);
        }
    }

    @Override
    public Message receive(MqInfo mqInfo) {
        Message message = jmsOperations.receive(mqInfo.getReceiveQueue());
        if (message != null) {
            try {
                this.receiveMessage(mqInfo, message);
                return this.receive(mqInfo);
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("", e.getMessage(), e);
            }
        }
        return message;
    }

}

队列列表实体(对应数据表 mq_info)

/**
 * Entity mapped to the {@code mq_info} table: one row describes one MQ endpoint together with
 * its send and receive queues.
 */
@TableName("mq_info")
public class MqInfo extends Model<MqInfo>   {
    private static final long serialVersionUID = 1L;

    /** Row identifier; also returned by {@link #pkVal()}. */
    @TableField("id")
    private Integer id;

    /** Queue manager. */
    @TableField("qmgr")
    private String qmgr;

    /** MQ server IP. */
    @TableField("mq_host")
    private String mqHost;

    /** MQ server port. */
    @TableField("mq_port")
    private Integer mqPort;

    /** Send queue. */
    @TableField("send_queue")
    private String sendQueue;

    /** Receive queue. */
    @TableField("receive_queue")
    private String receiveQueue;

    /** Channel. */
    @TableField("channel")
    private String channel;

    /** @return the row identifier */
    public Integer getId() {
        return id;
    }

    /** @param id the row identifier */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the queue manager */
    public String getQmgr() {
        return qmgr;
    }

    /** @param qmgr the queue manager */
    public void setQmgr(String qmgr) {
        this.qmgr = qmgr;
    }

    /** @return the MQ server IP */
    public String getMqHost() {
        return mqHost;
    }

    /** @param mqHost the MQ server IP */
    public void setMqHost(String mqHost) {
        this.mqHost = mqHost;
    }

    /** @return the MQ server port */
    public Integer getMqPort() {
        return mqPort;
    }

    /** @param mqPort the MQ server port */
    public void setMqPort(Integer mqPort) {
        this.mqPort = mqPort;
    }

    /** @return the send queue */
    public String getSendQueue() {
        return sendQueue;
    }

    /** @param sendQueue the send queue */
    public void setSendQueue(String sendQueue) {
        this.sendQueue = sendQueue;
    }

    /** @return the receive queue */
    public String getReceiveQueue() {
        return receiveQueue;
    }

    /** @param receiveQueue the receive queue */
    public void setReceiveQueue(String receiveQueue) {
        this.receiveQueue = receiveQueue;
    }

    /** @return the channel */
    public String getChannel() {
        return channel;
    }

    /** @param channel the channel */
    public void setChannel(String channel) {
        this.channel = channel;
    }

    /** @return the primary-key value of this row, i.e. {@code id} */
    @Override
    protected Serializable pkVal() {
        return this.id;
    }

}

猜你喜欢

转载自blog.csdn.net/u012613251/article/details/87255205