<!-- Message bus: Spring Cloud Bus starter using AMQP. -->
<!-- NOTE(review): no version element is declared here; presumably it is managed by a parent POM/BOM. Confirm. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
package com.boot.merchant.restapi.mqchannel;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * Binding interface that declares the outbound message channel used to
 * publish order messages.
 */
public interface OutputSource {

    /** Name of the outbound channel carrying order messages. */
    String ORDERSCHANNEL = "orderschannel";

    /**
     * Returns the output channel bound under {@link #ORDERSCHANNEL}.
     *
     * <p>The annotation references the constant rather than repeating the
     * literal so the declared name and the bound name cannot drift apart.
     *
     * @return the channel on which order messages are sent
     */
    @Output(ORDERSCHANNEL)
    MessageChannel ordersOutput();
}
package com.boot.merchant.restapi.controller;
import com.boot.merchant.domain.entity.Order;
import com.boot.merchant.domain.entity.OrderDetail;
import com.boot.merchant.domain.service.OrderService;
import com.boot.merchant.domain.util.CommonUtils;
import com.boot.merchant.domain.util.CopyUtil;
import com.boot.merchant.object.OrderQo;
import com.boot.merchant.restapi.mqchannel.OutputSource;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.data.domain.Page;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
 * REST controller mapped under {@code /order}.
 *
 * <p>Binds {@link OutputSource} so that, after an order is saved, the incoming
 * {@link OrderQo} is published on the {@code orderschannel} output channel.
 */
@RestController
@RequestMapping("/order")
@EnableBinding(OutputSource.class)
public class OrderRestController {
    private static final Logger logger = LoggerFactory.getLogger(OrderRestController.class);

    @Autowired
    private OrderService orderService;

    /** Output channel selected by the {@link OutputSource#ORDERSCHANNEL} name. */
    @Autowired
    @Output(OutputSource.ORDERSCHANNEL)
    private MessageChannel ordersChannel;

    /**
     * Handles {@code PUT /order}: copies the request object into an
     * {@link Order}, stamps the modification time, saves it, and then
     * publishes the original request object as a message.
     *
     * <p>The work runs inside {@link CompletableFuture#supplyAsync}, which uses
     * the common fork-join pool when no executor is supplied.
     *
     * @param orderQo the order data taken from the request body
     * @return a future completing with the saved order's id as a string
     * @throws Exception declared for compatibility with existing callers; the
     *         visible body throws no checked exception itself
     */
    @RequestMapping(method = RequestMethod.PUT)
    public CompletableFuture<String> update(@RequestBody OrderQo orderQo) throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            Order order = CopyUtil.copy(orderQo, Order.class);
            order.setModify(new Date());
            List<OrderDetail> detailList = CopyUtil.copyList(orderQo.getOrderDetails(), OrderDetail.class);
            order.setOrderDetails(detailList);
            orderService.save(order);

            // Send an MQ message to announce the order modification. send()
            // reports failure through its boolean result, so surface a failed
            // send in the log instead of dropping it silently.
            boolean sent = ordersChannel.send(MessageBuilder.withPayload(orderQo).build());
            if (!sent) {
                logger.warn("Order update message was not sent, ID={}", order.getId());
            }

            logger.info("修改->ID={}", order.getId());
            return order.getId().toString();
        });
    }
}
package com.demo.goods.restapi.mqchannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
 * Binding interface that declares the inbound channel for order messages and
 * the outbound channel for replies.
 */
public interface InputSource {

    /** Name of the inbound channel carrying order messages. */
    String ORDERSCHANNEL = "orderschannel";

    /** Name of the outbound channel carrying replies. */
    String REPLYCHANNEL = "replychannel";

    /**
     * Returns the input channel bound under {@link #ORDERSCHANNEL}.
     *
     * @return the subscribable channel on which order messages arrive
     */
    @Input(ORDERSCHANNEL)
    SubscribableChannel ordersInput();

    /**
     * Returns the output channel bound under {@link #REPLYCHANNEL}.
     *
     * @return the channel on which replies are sent
     */
    @Output(REPLYCHANNEL)
    MessageChannel replyOutput();
}
package com.demo.goods.restapi.service;
import com.demo.goods.domain.entity.Goods;
import com.demo.goods.domain.service.GoodsService;
import com.demo.goods.restapi.mqchannel.InputSource;
import com.demo.order.object.OrderDetailQo;
import com.demo.order.object.OrderQo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.ServiceActivator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
 * Consumer of order-update messages arriving on
 * {@link InputSource#ORDERSCHANNEL}.
 *
 * <p>For an order whose status is negative, decrements the purchase count of
 * every goods item referenced by the order details.
 */
@EnableBinding(InputSource.class)
public class OrdersReceive {
    private static final Logger logger = LoggerFactory.getLogger(OrdersReceive.class);

    @Autowired
    private GoodsService goodsService;

    /**
     * Processes one order-update message asynchronously.
     *
     * <p>A {@code null} payload, a {@code null} status, or a non-negative
     * status results in no goods being modified.
     *
     * <p>NOTE(review): the handler returns a {@link CompletableFuture} while
     * declaring an {@code outputChannel}. Whether the completed value or the
     * future object itself reaches the reply channel presumably depends on the
     * framework version and the handler's async setting. Confirm.
     *
     * @param orderQo the received order payload; may be {@code null}
     * @return a future that completes with the constant {@code "1"}
     */
    @ServiceActivator(inputChannel = InputSource.ORDERSCHANNEL, outputChannel = InputSource.REPLYCHANNEL)
    public CompletableFuture<String> accept(OrderQo orderQo) {
        return CompletableFuture.supplyAsync(() -> {
            if (orderQo != null) {
                logger.info("接收到订单更新消息,订单编号={}", orderQo.getOrderNo());
                // Negative status presumably marks a cancelled or invalid order; TODO confirm.
                if (orderQo.getStatus() != null && orderQo.getStatus() < 0) {
                    decrementBuyCounts(orderQo.getOrderDetails());
                }
            }
            return "1";
        });
    }

    /**
     * Lowers the purchase count by one, never below zero, for each goods item
     * referenced by the given order details.
     *
     * @param details the order's detail lines; a {@code null} list or
     *                {@code null} elements are skipped rather than raising a
     *                NullPointerException inside the async task
     */
    private void decrementBuyCounts(List<OrderDetailQo> details) {
        if (details == null) {
            return;
        }
        for (OrderDetailQo detail : details) {
            if (detail == null) {
                continue;
            }
            Goods goods = goodsService.findOne(detail.getGoodsid());
            if (goods == null) {
                continue;
            }
            // A missing or non-positive count is reset to zero.
            Integer num = goods.getBuynum() != null && goods.getBuynum() > 0
                    ? goods.getBuynum() - 1 : 0;
            goods.setBuynum(num);
            goodsService.save(goods);
            logger.info("更新了商品购买数量,商品名称={}", goods.getName());
        }
    }
}