涉及消息中转、消息处理的场景,一般都会用到消息中间件;RabbitMQ 的优缺点可以自行搜索了解。
这里提供一个 RabbitMQ 监听服务的示例。
1.首先定义一个公共的常量配置
package com.emp.constants;
/**
 * Shared constants.
 *
 * <p>Author: DuKai. Created: 2018/7/26 16:36. Version: V1.0.
 */
public interface CommonConstants {

    /** Decimal-point precision. */
    int AMOUNT_SPOTSIZE = 3;

    // ---- RabbitMQ exchange names ----

    String TOPIC_EXCHANGE = "topicExchange";
    String FANOUT_EXCHANGE = "fanoutExchange";
    String DIRECT_EXCHANGE = "directExchange";
    String HEADERS_EXCHANGE = "headersExchange";

    // ---- RabbitMQ queue names ----

    /** Recharge information queue. */
    String ACCOUNT_RECHARGE = "emp.account.recharge";

    /** Withdrawal information queue. */
    String ACCOUNT_CASH = "emp.account.cash";

    /** Withdrawal-success notification queue. */
    String ACCOUNT_CASH_NOTICE = "emp.account.cash.notice";

    /** Account change queue. */
    String ACCOUNT_CHANGE = "emp.account.change";

    /** Game match information queue. */
    String GAME_OVER_INFO = "emp.game.over.info";

    /** Game room history queue. */
    String GAME_ROOM_HISTORY = "emp.game.room.history";

    /** User information modification. */
    String GAME_US_USERINFO = "emp.game.us.userInfo";
}
2.消息中间件配置
/**
 * Registers a fresh {@code RabbitmqBean} under the bean name {@code rabbitmqBean}.
 *
 * @return a newly constructed {@code RabbitmqBean}
 */
@Bean(name = "rabbitmqBean")
public RabbitmqBean getRabbitmqBean() {
    RabbitmqBean rabbitmqSettings = new RabbitmqBean();
    return rabbitmqSettings;
}
/**
 * Builds the caching connection factory from the values held by the given {@code RabbitmqBean}.
 *
 * @param bean source of host, port, credentials, virtual host and publisher-confirm flag
 * @return the configured connection factory
 */
@Bean(name = "connectionFactory")
public ConnectionFactory connectionFactory(@Autowired RabbitmqBean bean) {
    CachingConnectionFactory cachingFactory =
            new CachingConnectionFactory(bean.getHost(), bean.getPort());

    // credentials and virtual host
    cachingFactory.setUsername(bean.getUsername());
    cachingFactory.setPassword(bean.getPassword());
    cachingFactory.setVirtualHost(bean.getVirtualHost());

    // publisher confirms are switched on or off by the bean's flag
    cachingFactory.setPublisherConfirms(bean.isPublisherConfirms());

    return cachingFactory;
}
/**
 * Creates a {@code RabbitTemplate} that converts messages with a Jackson JSON converter.
 *
 * <p>Original author's note: because a callback class has to be set on the template, the bean
 * should be prototype-scoped; with singleton scope the callback would be whichever one was set
 * last.
 *
 * @param connectionFactory connection factory the template sends through
 * @return a new template instance
 */
@Bean(name = "rabbitTemplate")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory);
    jsonTemplate.setMessageConverter(jsonConverter);
    return jsonTemplate;
}
/**
 * Declares the topic exchange named by {@code CommonConstants.TOPIC_EXCHANGE}
 * (original note: matches according to rules).
 *
 * @return the topic exchange
 */
@Bean(name = "topicExchange")
public TopicExchange topicExchange() {
    final String exchangeName = CommonConstants.TOPIC_EXCHANGE;
    return new TopicExchange(exchangeName);
}
/**
 * Declares the queue named by {@code CommonConstants.ACCOUNT_RECHARGE}.
 *
 * @return the recharge queue
 */
@Bean(name = "rechargeQueue")
public Queue rechargeQueue() {
    // true means the queue is persisted (durable)
    final boolean durable = true;
    return new Queue(CommonConstants.ACCOUNT_RECHARGE, durable);
}
/**
 * Declares the queue named by {@code CommonConstants.ACCOUNT_CASH}.
 *
 * @return the withdrawal queue
 */
@Bean(name = "cashQueue")
public Queue cashQueue() {
    final String queueName = CommonConstants.ACCOUNT_CASH;
    return new Queue(queueName);
}
/**
 * Declares the queue named by {@code CommonConstants.ACCOUNT_CASH_NOTICE}.
 *
 * @return the withdrawal-notice queue
 */
@Bean(name = "cashNoticeQueue")
public Queue cashNoticeQueue() {
    final String queueName = CommonConstants.ACCOUNT_CASH_NOTICE;
    return new Queue(queueName);
}
/**
 * Declares the queue named by {@code CommonConstants.ACCOUNT_CHANGE}.
 *
 * @return the account-change queue
 */
@Bean(name = "accountChangeQueue")
public Queue accountChangeQueue() {
    final String queueName = CommonConstants.ACCOUNT_CHANGE;
    return new Queue(queueName);
}
/**
 * Declares the queue named by {@code CommonConstants.GAME_OVER_INFO}.
 *
 * @return the game-over information queue
 */
@Bean(name = "gameOverInfoQueue")
public Queue gameOverInfoQueue() {
    final String queueName = CommonConstants.GAME_OVER_INFO;
    return new Queue(queueName);
}
/**
 * Declares the queue named by {@code CommonConstants.GAME_ROOM_HISTORY}.
 *
 * @return the game-room history queue
 */
@Bean(name = "gameRoomHistoryQueue")
public Queue gameRoomHistoryQueue() {
    final String queueName = CommonConstants.GAME_ROOM_HISTORY;
    return new Queue(queueName);
}
/**
 * Declares the queue named by {@code CommonConstants.GAME_US_USERINFO}.
 *
 * @return the user-information queue
 */
@Bean(name = "gameUsUserInfoQueue")
public Queue gameUsUserInfoQueue() {
    final String queueName = CommonConstants.GAME_US_USERINFO;
    return new Queue(queueName);
}
/**
 * Binds the recharge queue to the topic exchange, using
 * {@code CommonConstants.ACCOUNT_RECHARGE} as the routing key.
 *
 * @param rechargeQueue the bean qualified as {@code rechargeQueue}
 * @param topicExchange exchange to bind to
 * @return the resulting binding
 */
@Bean
public Binding bindAccountQueue(@Qualifier("rechargeQueue") Queue rechargeQueue, @Autowired TopicExchange topicExchange) {
    final String routingKey = CommonConstants.ACCOUNT_RECHARGE;
    return BindingBuilder
            .bind(rechargeQueue)
            .to(topicExchange)
            .with(routingKey);
}
/**
 * Binds the withdrawal queue to the topic exchange, using
 * {@code CommonConstants.ACCOUNT_CASH} as the routing key.
 *
 * @param cashQueue the bean qualified as {@code cashQueue}
 * @param topicExchange exchange to bind to
 * @return the resulting binding
 */
@Bean
public Binding bindCashQueue(@Qualifier("cashQueue") Queue cashQueue, @Autowired TopicExchange topicExchange) {
    final String routingKey = CommonConstants.ACCOUNT_CASH;
    return BindingBuilder
            .bind(cashQueue)
            .to(topicExchange)
            .with(routingKey);
}
/**
 * Binds the withdrawal-notice queue to the topic exchange, using
 * {@code CommonConstants.ACCOUNT_CASH_NOTICE} as the routing key.
 *
 * @param cashNoticeQueue the bean qualified as {@code cashNoticeQueue}
 * @param topicExchange exchange to bind to
 * @return the resulting binding
 */
@Bean
public Binding bindCashNoticeQueue(@Qualifier("cashNoticeQueue") Queue cashNoticeQueue, @Autowired TopicExchange topicExchange) {
    final String routingKey = CommonConstants.ACCOUNT_CASH_NOTICE;
    return BindingBuilder
            .bind(cashNoticeQueue)
            .to(topicExchange)
            .with(routingKey);
}
/**
 * Binds the account-change queue to the topic exchange, using
 * {@code CommonConstants.ACCOUNT_CHANGE} as the routing key.
 *
 * @param accountChangeQueue the bean qualified as {@code accountChangeQueue}
 * @param topicExchange exchange to bind to
 * @return the resulting binding
 */
@Bean
public Binding bindAccountChangeQueue(@Qualifier("accountChangeQueue") Queue accountChangeQueue, @Autowired TopicExchange topicExchange) {
    final String routingKey = CommonConstants.ACCOUNT_CHANGE;
    return BindingBuilder
            .bind(accountChangeQueue)
            .to(topicExchange)
            .with(routingKey);
}
/**
 * Binds the game-over information queue to the topic exchange, using
 * {@code CommonConstants.GAME_OVER_INFO} as the routing key.
 *
 * @param gameOverInfoQueue the bean qualified as {@code gameOverInfoQueue}
 * @param topicExchange exchange to bind to
 * @return the resulting binding
 */
@Bean
public Binding bindGameOverInfoQueue(@Qualifier("gameOverInfoQueue") Queue gameOverInfoQueue, @Autowired TopicExchange topicExchange) {
    final String routingKey = CommonConstants.GAME_OVER_INFO;
    return BindingBuilder
            .bind(gameOverInfoQueue)
            .to(topicExchange)
            .with(routingKey);
}
/**
 * Binds the game-room history queue to the topic exchange, using
 * {@code CommonConstants.GAME_ROOM_HISTORY} as the routing key.
 *
 * @param gameRoomHistoryQueue the bean qualified as {@code gameRoomHistoryQueue}
 * @param topicExchange exchange to bind to
 * @return the resulting binding
 */
@Bean
public Binding bindGameRoomHistoryQueue(@Qualifier("gameRoomHistoryQueue") Queue gameRoomHistoryQueue, @Autowired TopicExchange topicExchange) {
    final String routingKey = CommonConstants.GAME_ROOM_HISTORY;
    return BindingBuilder
            .bind(gameRoomHistoryQueue)
            .to(topicExchange)
            .with(routingKey);
}
/**
 * Binds the user-information queue to the topic exchange, using
 * {@code CommonConstants.GAME_US_USERINFO} as the routing key.
 *
 * @param gameUsUserInfoQueue the bean qualified as {@code gameUsUserInfoQueue}
 * @param topicExchange exchange to bind to
 * @return the resulting binding
 */
@Bean
public Binding bindUsUserInfoQueue(@Qualifier("gameUsUserInfoQueue") Queue gameUsUserInfoQueue, @Autowired TopicExchange topicExchange) {
    final String routingKey = CommonConstants.GAME_US_USERINFO;
    return BindingBuilder
            .bind(gameUsUserInfoQueue)
            .to(topicExchange)
            .with(routingKey);
}
/**
 * Builds the listener container factory: JSON message conversion, consumer concurrency taken
 * from the given {@code RabbitmqBean}, and manual acknowledgement.
 *
 * @param connectionFactory connection factory the listener containers use
 * @param bean source of the concurrent / max-concurrent consumer settings
 * @return the configured container factory
 */
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
        @Autowired ConnectionFactory connectionFactory, @Autowired RabbitmqBean bean){
    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();

    containerFactory.setConnectionFactory(connectionFactory);

    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    containerFactory.setMessageConverter(jsonConverter);

    // consumer concurrency bounds come from the settings bean
    containerFactory.setConcurrentConsumers(bean.getConcurrentConsumers());
    containerFactory.setMaxConcurrentConsumers(bean.getMaxConcurrentConsumers());

    // enable manual ack: listeners must acknowledge deliveries themselves
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    return containerFactory;
}
3.编写拿到消息数据之后的业务处理逻辑
/**
 * Consumes messages from the recharge queue.
 *
 * <p>Original author's note: the EMP digital account cannot be recharged or withdrawn directly;
 * funds only arrive by exchanging USDT or another currency.
 *
 * <p>The payload is decoded as UTF-8 text and handed to {@code JsonUtil.strToList} to obtain a
 * list of {@code DigitalOrder}s. The orders are processed only when the received routing key
 * equals {@code "emp.account.recharge"}; in either case the delivery is then acknowledged.
 *
 * <p>NOTE(review): if processing throws, the exception propagates and the delivery is neither
 * acked nor rejected by this method. Decide whether such messages should be rejected,
 * dead-lettered or retried.
 *
 * @param message raw message body
 * @param channel channel the delivery arrived on; used for the manual ack
 * @param map message headers; must contain {@code AmqpHeaders.DELIVERY_TAG}
 * @throws Exception if decoding, processing or acknowledging fails
 */
@RabbitHandler
@RabbitListener(queues = "emp.account.recharge")
public void consumersMqRechargeMsg(@Payload byte[] message, Channel channel,
        @Headers Map<String,Object> map) throws Exception{
    long deliveryTag = (long)map.get(AmqpHeaders.DELIVERY_TAG);

    // Null-safe read of the routing key; a missing header simply fails the match below.
    Object routingKeyHeader = map.get(AmqpHeaders.RECEIVED_ROUTING_KEY);
    String msgKey = routingKeyHeader == null ? null : routingKeyHeader.toString();

    // Decode with an explicit charset instead of the platform default.
    String body = new String(message, java.nio.charset.StandardCharsets.UTF_8);
    List<DigitalOrder> digitalOrders = JsonUtil.strToList(body, DigitalOrder.class);
    logger.info(digitalOrders.toString());
    logger.info("消费队列【"+msgKey+"】中的消息");

    String queueKey = "emp.account.recharge";
    if (queueKey.equals(msgKey)) {
        for (DigitalOrder digitalOrder : digitalOrders) {
            handleRechargeOrder(digitalOrder);
        }
    }

    // confirm successful consumption
    channel.basicAck(deliveryTag,false);
}

/**
 * Applies one incoming recharge order according to its reported status:
 * {@code "1"} persists the order and credits the account, {@code "2"} persists it and logs the
 * failure, anything else only persists the order while the stored one is not yet "1" or "2".
 *
 * @param digitalOrder the order taken from the message
 */
private void handleRechargeOrder(DigitalOrder digitalOrder) {
    String orderId = digitalOrder.getOrderId();
    // look up any previously stored order with the same id
    DigitalOrder storedOrder = digitalOrderService.getDigitalOrderByIdCon(orderId);
    // transaction type: 1 means recharge
    String transType = "1";
    String addr = digitalOrder.getAddr();
    String currencyType = digitalOrder.getCurrencyType();

    switch (digitalOrder.getOrderStatus()) {
        case "1":
            if (storedOrder == null) {
                attachAccount(digitalOrder, transType, addr, currencyType);
                saveOrder(digitalOrder, "12");
            } else if ("2".equals(storedOrder.getOrderStatus())) {
                // stored order is already in status "2": nothing to do
                break;
            } else {
                // NOTE(review): a stored order already in status "1" also lands here and is
                // credited again; confirm rechargeAccountAmount is idempotent per orderId.
                DigitalAccount account = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
                digitalOrder.setTransType(transType);
                storedOrder.setOrderStatus("1");
                storedOrder.setDigitalAccountId(account.getDigitalAccountId());
                storedOrder.setConfirmations(digitalOrder.getConfirmations());
                saveOrder(storedOrder, "11");
            }
            creditAccount(digitalOrder, addr, currencyType, transType, orderId);
            break;

        case "2":
            if (storedOrder == null) {
                attachAccount(digitalOrder, transType, addr, currencyType);
                saveOrder(digitalOrder, "22");
            } else {
                DigitalAccount account = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
                storedOrder.setDigitalAccountId(account.getDigitalAccountId());
                storedOrder.setOrderStatus("2");
                storedOrder.setConfirmations(digitalOrder.getConfirmations());
                saveOrder(storedOrder, "21");
            }
            logger.info("充值失败");
            break;

        default:
            if (storedOrder == null) {
                attachAccount(digitalOrder, transType, addr, currencyType);
                saveOrder(digitalOrder, "32");
                logger.info("默认");
                break;
            }
            String storedStatus = storedOrder.getOrderStatus();
            if ("1".equals(storedStatus) || "2".equals(storedStatus)) {
                // stored order already reached status "1" or "2": leave it untouched
                break;
            }
            DigitalAccount account = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
            storedOrder.setDigitalAccountId(account.getDigitalAccountId());
            storedOrder.setTransType(transType);
            storedOrder.setConfirmations(digitalOrder.getConfirmations());
            storedOrder.setOrderStatus(digitalOrder.getOrderStatus());
            saveOrder(storedOrder, "31");
            break;
    }
}

/** Stamps a not-yet-stored order with the transaction type and the id of the account found for addr/currency. */
private void attachAccount(DigitalOrder order, String transType, String addr, String currencyType) {
    DigitalAccount account = digitalAccountService.getDigitalAccountByAddr(addr, currencyType);
    order.setTransType(transType);
    order.setDigitalAccountId(account.getDigitalAccountId());
}

/**
 * Persists the order; a failure is logged (with its cause) under the given branch tag and
 * processing continues, as in the original best-effort behaviour.
 */
private void saveOrder(DigitalOrder order, String branchTag) {
    try {
        digitalOrderService.addOrModDigitalOrder(order);
    } catch (Exception ee) {
        // NOTE(review): callers still go on (e.g. credit the account) after a failed save.
        logger.error(branchTag + order.toString(), ee);
    }
}

/** Credits the account with the order amount and logs the value returned by the account service. */
private void creditAccount(DigitalOrder order, String addr, String currencyType,
        String transType, String orderId) {
    BigDecimal transAmount = order.getAmount();
    int a = accountService.rechargeAccountAmount(addr, currencyType, transAmount, transType, orderId);
    logger.info("充值成功"+a);
}