本地配置 RabbitMQ 监听

凡是涉及消息中转、消息处理的场景,一般都会用到消息中间件;RabbitMQ 的优缺点可以自行搜索了解。

下面提供一个 RabbitMQ 监听服务的示例。

1. 首先定义一个公共的常量配置

package com.emp.constants;


/**
 * Shared constants: a numeric precision setting plus the RabbitMQ exchange
 * names and queue names used by the messaging configuration.
 *
 * <p>Created: 2018/7/26 16:36
 *
 * @author DuKai
 * @version V1.0
 */
public interface CommonConstants {

    /** Decimal-point precision. */
    int AMOUNT_SPOTSIZE = 3;

    // ---- RabbitMQ exchange names ----

    /** Name of the topic exchange. */
    String TOPIC_EXCHANGE = "topicExchange";
    /** Name of the fanout exchange. */
    String FANOUT_EXCHANGE = "fanoutExchange";
    /** Name of the direct exchange. */
    String DIRECT_EXCHANGE = "directExchange";
    /** Name of the headers exchange. */
    String HEADERS_EXCHANGE = "headersExchange";

    // ---- RabbitMQ queue names ----

    /** Recharge information queue. */
    String ACCOUNT_RECHARGE = "emp.account.recharge";
    /** Withdrawal information queue. */
    String ACCOUNT_CASH = "emp.account.cash";
    /** Withdrawal-success notification queue. */
    String ACCOUNT_CASH_NOTICE = "emp.account.cash.notice";
    /** Account change queue. */
    String ACCOUNT_CHANGE = "emp.account.change";
    /** Game match information queue. */
    String GAME_OVER_INFO = "emp.game.over.info";
    /** Game room history queue. */
    String GAME_ROOM_HISTORY = "emp.game.room.history";
    /** User information modification queue. */
    String GAME_US_USERINFO = "emp.game.us.userInfo";
}

2.消息中间件配置

  @Bean(name = "rabbitmqBean")
    public RabbitmqBean getRabbitmqBean() {
        // Registers a freshly constructed RabbitmqBean under the name "rabbitmqBean";
        // the other bean methods read the connection and consumer settings from it.
        final RabbitmqBean settings = new RabbitmqBean();
        return settings;
    }

    /**
     * Builds the caching connection factory from the values held by the
     * given {@link RabbitmqBean}: host, port, virtual host, credentials and
     * the publisher-confirms flag.
     *
     * @param bean source of the connection settings
     * @return the configured connection factory
     */
    @Bean(name = "connectionFactory")
    public ConnectionFactory connectionFactory(@Autowired RabbitmqBean bean) {
        CachingConnectionFactory cachingFactory =
                new CachingConnectionFactory(bean.getHost(), bean.getPort());

        // Where to connect.
        cachingFactory.setVirtualHost(bean.getVirtualHost());

        // Who connects.
        cachingFactory.setUsername(bean.getUsername());
        cachingFactory.setPassword(bean.getPassword());

        // Publisher confirms are switched on or off by configuration.
        cachingFactory.setPublisherConfirms(bean.isPublisherConfirms());

        return cachingFactory;
    }

    /**
     * Creates a {@link RabbitTemplate} that serialises messages as JSON.
     *
     * <p>Author's note: because a callback class has to be set on the template,
     * the bean should be prototype-scoped; with singleton scope the callback
     * would be whichever one was set last.
     *
     * @param connectionFactory the connection factory the template sends through
     * @return a new template instance
     */
    @Bean(name = "rabbitTemplate")
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory);
        jsonTemplate.setMessageConverter(jsonConverter);
        return jsonTemplate;
    }

    /**
     * Declares the topic exchange (matches by rule), named by
     * {@link CommonConstants#TOPIC_EXCHANGE}.
     */
    @Bean(name = "topicExchange")
    public TopicExchange topicExchange() {
        final String exchangeName = CommonConstants.TOPIC_EXCHANGE;
        return new TopicExchange(exchangeName);
    }

    /**
     * Declares the recharge queue, named by {@link CommonConstants#ACCOUNT_RECHARGE}.
     */
    @Bean(name = "rechargeQueue")
    public Queue rechargeQueue() {
        // Passing true marks the queue as durable.
        final boolean durable = true;
        return new Queue(CommonConstants.ACCOUNT_RECHARGE, durable);
    }

    /** Declares the withdrawal queue, named by {@link CommonConstants#ACCOUNT_CASH}. */
    @Bean(name = "cashQueue")
    public Queue cashQueue() {
        final String queueName = CommonConstants.ACCOUNT_CASH;
        return new Queue(queueName);
    }

    /** Declares the withdrawal-notification queue, named by {@link CommonConstants#ACCOUNT_CASH_NOTICE}. */
    @Bean(name = "cashNoticeQueue")
    public Queue cashNoticeQueue() {
        final String queueName = CommonConstants.ACCOUNT_CASH_NOTICE;
        return new Queue(queueName);
    }

    /** Declares the account-change queue, named by {@link CommonConstants#ACCOUNT_CHANGE}. */
    @Bean(name = "accountChangeQueue")
    public Queue accountChangeQueue() {
        final String queueName = CommonConstants.ACCOUNT_CHANGE;
        return new Queue(queueName);
    }

    /** Declares the game match information queue, named by {@link CommonConstants#GAME_OVER_INFO}. */
    @Bean(name = "gameOverInfoQueue")
    public Queue gameOverInfoQueue() {
        final String queueName = CommonConstants.GAME_OVER_INFO;
        return new Queue(queueName);
    }

    /** Declares the game room history queue, named by {@link CommonConstants#GAME_ROOM_HISTORY}. */
    @Bean(name = "gameRoomHistoryQueue")
    public Queue gameRoomHistoryQueue() {
        final String queueName = CommonConstants.GAME_ROOM_HISTORY;
        return new Queue(queueName);
    }

    /** Declares the user-information modification queue, named by {@link CommonConstants#GAME_US_USERINFO}. */
    @Bean(name = "gameUsUserInfoQueue")
    public Queue gameUsUserInfoQueue() {
        final String queueName = CommonConstants.GAME_US_USERINFO;
        return new Queue(queueName);
    }


/**
     * Binds the recharge queue to the topic exchange, using
     * {@link CommonConstants#ACCOUNT_RECHARGE} as the routing key.
     *
     * @param rechargeQueue the queue bean named "rechargeQueue"
     * @param topicExchange the topic exchange to bind to
     * @return the binding between the two
     */
    @Bean
    public Binding bindAccountQueue(@Qualifier("rechargeQueue") Queue rechargeQueue, @Autowired TopicExchange topicExchange) {
        final String routingKey = CommonConstants.ACCOUNT_RECHARGE;
        return BindingBuilder
                .bind(rechargeQueue)
                .to(topicExchange)
                .with(routingKey);
    }

    /**
     * Binds the withdrawal queue to the topic exchange, using
     * {@link CommonConstants#ACCOUNT_CASH} as the routing key.
     */
    @Bean
    public Binding bindCashQueue(@Qualifier("cashQueue") Queue cashQueue, @Autowired TopicExchange topicExchange) {
        final String routingKey = CommonConstants.ACCOUNT_CASH;
        return BindingBuilder
                .bind(cashQueue)
                .to(topicExchange)
                .with(routingKey);
    }

    /**
     * Binds the withdrawal-notification queue to the topic exchange, using
     * {@link CommonConstants#ACCOUNT_CASH_NOTICE} as the routing key.
     */
    @Bean
    public Binding bindCashNoticeQueue(@Qualifier("cashNoticeQueue") Queue cashNoticeQueue, @Autowired TopicExchange topicExchange) {
        final String routingKey = CommonConstants.ACCOUNT_CASH_NOTICE;
        return BindingBuilder
                .bind(cashNoticeQueue)
                .to(topicExchange)
                .with(routingKey);
    }

    /**
     * Binds the account-change queue to the topic exchange, using
     * {@link CommonConstants#ACCOUNT_CHANGE} as the routing key.
     */
    @Bean
    public Binding bindAccountChangeQueue(@Qualifier("accountChangeQueue") Queue accountChangeQueue, @Autowired TopicExchange topicExchange) {
        final String routingKey = CommonConstants.ACCOUNT_CHANGE;
        return BindingBuilder
                .bind(accountChangeQueue)
                .to(topicExchange)
                .with(routingKey);
    }

    /**
     * Binds the game match information queue to the topic exchange, using
     * {@link CommonConstants#GAME_OVER_INFO} as the routing key.
     */
    @Bean
    public Binding bindGameOverInfoQueue(@Qualifier("gameOverInfoQueue") Queue gameOverInfoQueue, @Autowired TopicExchange topicExchange) {
        final String routingKey = CommonConstants.GAME_OVER_INFO;
        return BindingBuilder
                .bind(gameOverInfoQueue)
                .to(topicExchange)
                .with(routingKey);
    }

    /**
     * Binds the game room history queue to the topic exchange, using
     * {@link CommonConstants#GAME_ROOM_HISTORY} as the routing key.
     */
    @Bean
    public Binding bindGameRoomHistoryQueue(@Qualifier("gameRoomHistoryQueue") Queue gameRoomHistoryQueue, @Autowired TopicExchange topicExchange) {
        final String routingKey = CommonConstants.GAME_ROOM_HISTORY;
        return BindingBuilder
                .bind(gameRoomHistoryQueue)
                .to(topicExchange)
                .with(routingKey);
    }

    /**
     * Binds the user-information modification queue to the topic exchange, using
     * {@link CommonConstants#GAME_US_USERINFO} as the routing key.
     */
    @Bean
    public Binding bindUsUserInfoQueue(@Qualifier("gameUsUserInfoQueue") Queue gameUsUserInfoQueue, @Autowired TopicExchange topicExchange) {
        final String routingKey = CommonConstants.GAME_US_USERINFO;
        return BindingBuilder
                .bind(gameUsUserInfoQueue)
                .to(topicExchange)
                .with(routingKey);
    }


    /**
     * Builds the listener container factory: JSON message conversion, consumer
     * concurrency taken from the {@link RabbitmqBean}, and manual acknowledgement.
     *
     * @param connectionFactory the connection factory listeners consume through
     * @param bean              source of the concurrent-consumer settings
     * @return the configured container factory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
            @Autowired ConnectionFactory connectionFactory, @Autowired RabbitmqBean bean){
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();

        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(jsonConverter);

        // Enable manual ack: listener methods acknowledge deliveries themselves.
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // Consumer concurrency bounds come from configuration.
        containerFactory.setConcurrentConsumers(bean.getConcurrentConsumers());
        containerFactory.setMaxConcurrentConsumers(bean.getMaxConcurrentConsumers());

        return containerFactory;
    }

3. 编写拿到消息数据之后的业务处理逻辑

/**
     * Consumes recharge messages from the {@code emp.account.recharge} queue.
     *
     * <p>The payload is decoded as UTF-8 JSON into a list of {@link DigitalOrder}s.
     * When the received routing key equals {@code emp.account.recharge}, each order
     * is processed according to its reported status (see
     * {@link #processRechargeOrder(DigitalOrder)}). The delivery is then
     * acknowledged individually.
     *
     * <p>Author's note: EMP digital accounts cannot be recharged or withdrawn
     * directly; funds can only be exchanged in from USDT or other currencies.
     *
     * <p>NOTE(review): {@code basicAck} is only reached when processing completes
     * without throwing; this method issues no nack/reject itself. Confirm that the
     * container's behaviour for a failed delivery is what is wanted.
     *
     * @param message raw message body
     * @param channel channel used to acknowledge the delivery
     * @param map     message headers; the delivery tag and received routing key are read from it
     * @throws Exception if decoding, order processing or the acknowledgement fails
     */
    @RabbitHandler
    @RabbitListener(queues = "emp.account.recharge")
    public void consumersMqRechargeMsg(@Payload byte[] message, Channel channel,
                                       @Headers Map<String,Object> map) throws Exception{
        long deliveryTag = (long)map.get(AmqpHeaders.DELIVERY_TAG);
        String msgKey = map.get(AmqpHeaders.RECEIVED_ROUTING_KEY).toString();
        // Decode with an explicit charset instead of the platform default.
        // Presumably the producer emits UTF-8 JSON — TODO confirm against the sender.
        String json = new String(message, java.nio.charset.StandardCharsets.UTF_8);
        // Convert the JSON text into a list of orders.
        List<DigitalOrder> digitalOrders = JsonUtil.strToList(json, DigitalOrder.class);
        logger.info(digitalOrders.toString());
        logger.info("消费队列【"+msgKey+"】中的消息");
        String queueKey = "emp.account.recharge";
        if (queueKey.equals(msgKey)) {
            for (DigitalOrder digitalOrder : digitalOrders) {
                processRechargeOrder(digitalOrder);
            }
        }
        // Confirm successful consumption of this single delivery.
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Processes one order from a recharge message.
     *
     * <p>Per the log messages below, incoming status {@code "1"} is a successful
     * recharge (the account is credited) and {@code "2"} is a failed recharge; any
     * other status only records or updates the order.
     *
     * <p>NOTE(review): for incoming status {@code "1"} the stored order is only
     * skipped when its status is {@code "2"}. A stored order already in status
     * {@code "1"} is credited again unless {@code rechargeAccountAmount}
     * de-duplicates by order id — verify.
     */
    private void processRechargeOrder(DigitalOrder digitalOrder) {
        String orderId = digitalOrder.getOrderId();
        // Look up any previously stored copy of this order.
        DigitalOrder storedOrder = digitalOrderService.getDigitalOrderByIdCon(orderId);
        // Transaction type "1" means recharge.
        final String transType = "1";
        String addr = digitalOrder.getAddr();
        String currencyType = digitalOrder.getCurrencyType();

        switch (digitalOrder.getOrderStatus()) {
            case "1":
                if (storedOrder == null) {
                    saveNewRechargeOrder(digitalOrder, addr, currencyType, transType, "12");
                } else if (storedOrder.getOrderStatus().equals("2")) {
                    // Stored order is already marked failed: nothing to do.
                    break;
                } else {
                    DigitalAccount digitalAccount = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
                    digitalOrder.setTransType(transType);
                    storedOrder.setOrderStatus("1");
                    storedOrder.setDigitalAccountId(digitalAccount.getDigitalAccountId());
                    storedOrder.setConfirmations(digitalOrder.getConfirmations());
                    saveOrderBestEffort(storedOrder, "11");
                }
                creditRecharge(digitalOrder, addr, currencyType, transType, orderId);
                break;

            case "2":
                if (storedOrder == null) {
                    saveNewRechargeOrder(digitalOrder, addr, currencyType, transType, "22");
                } else {
                    DigitalAccount digitalAccount = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
                    storedOrder.setDigitalAccountId(digitalAccount.getDigitalAccountId());
                    storedOrder.setOrderStatus("2");
                    storedOrder.setConfirmations(digitalOrder.getConfirmations());
                    saveOrderBestEffort(storedOrder, "21");
                }
                logger.info("充值失败");
                break;

            default:
                if (storedOrder == null) {
                    saveNewRechargeOrder(digitalOrder, addr, currencyType, transType, "32");
                    logger.info("默认");
                    break;
                }
                String storedStatus = storedOrder.getOrderStatus();
                if (storedStatus.equals("1") || storedStatus.equals("2")) {
                    // Stored order already reached status "1" or "2": leave it untouched.
                    break;
                }
                DigitalAccount digitalAccount = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
                storedOrder.setDigitalAccountId(digitalAccount.getDigitalAccountId());
                storedOrder.setTransType(transType);
                storedOrder.setConfirmations(digitalOrder.getConfirmations());
                storedOrder.setOrderStatus(digitalOrder.getOrderStatus());
                saveOrderBestEffort(storedOrder, "31");
                break;
        }
    }

    /** Fills in the transaction type and account id on a not-yet-stored order and persists it. */
    private void saveNewRechargeOrder(DigitalOrder digitalOrder, String addr, String currencyType,
                                      String transType, String failureTag) {
        DigitalAccount digitalAccount = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
        digitalOrder.setTransType(transType);
        digitalOrder.setDigitalAccountId(digitalAccount.getDigitalAccountId());
        saveOrderBestEffort(digitalOrder, failureTag);
    }

    /**
     * Persists the order; a failure is logged with its cause and processing continues.
     * The tag identifies which branch attempted the save.
     */
    private void saveOrderBestEffort(DigitalOrder order, String failureTag) {
        try {
            digitalOrderService.addOrModDigitalOrder(order);
        } catch (Exception ee) {
            // Log at error level with the throwable so the failure reason is not lost.
            logger.error(failureTag + order.toString(), ee);
        }
    }

    /** Credits the order amount to the account and logs the service's result. */
    private void creditRecharge(DigitalOrder digitalOrder, String addr, String currencyType,
                                String transType, String orderId) {
        BigDecimal transAmount = digitalOrder.getAmount();
        int a = accountService.rechargeAccountAmount(addr, currencyType, transAmount, transType, orderId);
        logger.info("充值成功"+a);
    }
发布了237 篇原创文章 · 获赞 235 · 访问量 15万+

猜你喜欢

转载自blog.csdn.net/LRXmrlirixing/article/details/97373768