完整下单逻辑图
下单以及用户验证优化
优化方案
进行我们这一步骤是因为我们每次下单都需要查询数据库中的用户信息以及商品信息,我们把下过单的用户以及商品进行缓存,减少对数据库的访问。
下面是代码:
首先是商品信息:
我们在ItemService 中添加
//item及promo model 缓存模型
ItemModel getItemByIdInCache(Integer id);
实现方法:
@Override
public ItemModel getItemByIdInCache(Integer id) {
ItemModel itemModel = (ItemModel) redisTemplate.opsForValue().get("item_validate_"+id);
if(itemModel == null){
itemModel = this.getItemById(id);
redisTemplate.opsForValue().set("item_validate_"+id, itemModel);
redisTemplate.expire("item_validate_"+id, 10, TimeUnit.MINUTES);
}
return itemModel;
}
然后在下单的service 层中把从数据库拿变成去缓存取:
//1.校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确
//ItemModel itemModel = itemService.getItemById(itemId);
ItemModel itemModel = itemService.getItemByIdInCache(itemId);
下面是用户缓存(同样的逻辑):
UserModel getUserByIdInCache(Integer id);
下面是实现:
@Override
public UserModel getUserByIdInCache(Integer id) {
UserModel userModel = (UserModel) redisTemplate.opsForValue().get("user_validate_"+id);
if(userModel == null){
userModel = this.getUserById(id);
redisTemplate.opsForValue().set("user_validate_"+id, userModel);
redisTemplate.expire("user_validate_"+id, 10, TimeUnit.MINUTES);
}
return userModel;
}
orderservice 中变成对缓存的访问:
//UserModel userModel = userService.getUserById(userId);
UserModel userModel = userService.getUserByIdInCache(userId);
优化效果–jemter 压测
- 没有做优化:
- 做了优化的:
实际上没有达到压力的极值
RocketMQ 安装
我们可以看下官网的步骤:https://rocketmq.apache.org/docs/quick-start/
-
Start Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log -
Start Broker
nohup sh bin/mqbroker -n localhost:9876 &
nohup sh mqbroker -n localhost:9876 -c …/conf/broker.conf autoCreateTopicEnable=true &tail -f ~/logs/rocketmqlogs/broker.log
这一步一般会出现安装失败的情况,因为我们的的mqbroker的启动内存太大了,我们需要修改mqbroker.xml
文件
修改runbroker.sh
内存配置再改下JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m"
库存行锁优化
首先给库存表加上一个对于item_id的索引
方案
- 活动发布同步库存到缓存
首先写一个把库存缓存到redis 的服务:
在PromoService 中
//根据活动ID 把数据库库存缓存到redis
void publishPromo(Integer promoId);
实现一下这个函数:
@Override
public void publishPromo(Integer promoId) {
PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId);
if(promoDO.getItemId() == null || promoDO.getItemId().intValue() == 0){
return ;
}
//默认这段时间库存是不会变化的,变化的话,需要进行商品下架,然后活动开始再上架开始卖
ItemModel itemModel = itemService.getItemById(promoDO.getItemId());
//将库存同步到redis 中
redisTemplate.opsForValue().set("promo_item_stock"+itemModel.getId(), itemModel.getStock());
}
那么我们在哪里使用这个服务呢,我们在itemcontroller 里面写一个接口调一下
@RequestMapping(value = "/publishpromo",method = {RequestMethod.GET})
@ResponseBody
public CommonReturnType publishpromo(@RequestParam(name = "id")Integer id){
promoService.publishPromo(id);
return CommonReturnType.create(null);
}
然后启动项目,在浏览器上面输入http://localhost:8080/item/publishpromo?id=1
结果返货i的是正确的。我们看下redis:确实存到了redis 中。
- 下单交易减缓存库存
直接在redis 里面操作 但是数据库还是没有更新库存信息
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
//int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount);
long result = redisTemplate.opsForValue().increment("promo_item_stock"+itemId, amount.intValue()*-1);
if(result >= 0){
//更新库存成功
return true;
}else{
//更新库存失败
return false;
}
}
- 异步消息扣减数据库内库存
@Component
public class MqConsumer {
private DefaultMQPushConsumer consumer;
@Value("${mq.nameserver.addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
@Autowired
private ItemStockDOMapper itemStockDOMapper;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer("stock_consumer_group");
consumer.setNamesrvAddr(nameAddr);
consumer.setVipChannelEnabled(false);
consumer.subscribe(topicName,"*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//实现库存真正到数据库内扣减的逻辑
Message msg = msgs.get(0);
String jsonString = new String(msg.getBody());
Map<String,Object>map = JSON.parseObject(jsonString, Map.class);
Integer itemId = (Integer) map.get("itemId");
Integer amount = (Integer) map.get("amount");
itemStockDOMapper.decreaseStock(itemId,amount);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
@Component
public class MqProducer {
private DefaultMQProducer producer;
@Value("${mq.nameserver.addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
@PostConstruct
public void init() throws MQClientException {
//做mq的初始化工作
producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr(nameAddr);
producer.setVipChannelEnabled(false);
producer.start();
}
//同步库存扣减的消息
public boolean asyncReduceStock(Integer itemId, Integer amount) {
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("itemId", itemId);
bodyMap.put("amount", amount);
Message message = new Message(topicName, "increase",
JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
try {
producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
return false;
} catch (RemotingException e) {
e.printStackTrace();
return false;
} catch (MQBrokerException e) {
e.printStackTrace();
return false;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
}
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
//int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount);
long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue() * -1);
if(result >= 0){
boolean mqResult = producer.asyncReduceStock(itemId, amount);
if(!mqResult){
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId, amount.intValue());
return false;
}
//更新库存成功
return true;
}else{
//更新库存失败
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId, amount.intValue());
return false;
}
}