我们对于交易进行一下200个线程压测,看下应用服务器的资源:
再看下压测结果:
接下来用1000个线程看下数据库资源:
并且耗时也加大了:
那么对于下单我们一般会有几个步骤:1.校验商品是否存在,用户是否合法,购买数量是否正确。2.落单减库存。3.订单入库,加商品销量。4.返回前端。
通过上面步骤,其实我们对于数据库至少有6次操作,而且在减数据库的时候是根据id操作,那还有个行锁,所以性能亟待优化。
交易验证的优化,可以分为两部分:
用户风控策略优化:策略缓存模型化
活动校验策略优化:引入活动发布流程,模型缓存化,紧急下线的能力
例如我们把产品信息放到redis中(用户信息一样):
@Override
public ItemModel getItemByIdInCache(Integer id) {
ItemModel itemModel = (ItemModel) redisTemplate.opsForValue().get("item_validate_"+id);
if (itemModel == null){
itemModel = this.getItemById(id);
redisTemplate.opsForValue().set("item_validate_"+id , itemModel);
redisTemplate.expire("item_validate_"+id , 10 , TimeUnit.MINUTES);
}
return itemModel;
}
压测一下
区别不太大的原因是服务器带宽问题,但是还是有提升。
对于紧急下架功能,我们可以开放一个接口,删除redis即可。
对于库存行锁的优化:
扣减库存缓存化
异步同步数据库
库存数据库最终一致性保证
首先如何做到缓存化?一般活动商品都会有一个上架操作,那么我们可以写一个接口来进行数据同步:
@Override
public void publicPromo(Integer promoId) {
//通过活动id获取活动
PromoDO promoDo = promoDOMapper.selectByPrimaryKey(promoId);
if (promoDo.getItemId() == null || promoDo.getItemId().intValue() == 0){
return;
}
ItemModel itemModel = itemService.getItemById(promoDo.getItemId());
//将库存同步到redis内
redisTemplate.opsForValue().set("promo_item_stock_" + itemModel.getId() , itemModel.getStock());
}
然后controller层调用来同步到redis中。
接下来要做减库存的操作,思路就是直接减redis中的库存:
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
//int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount);
long result = redisTemplate.opsForValue().increment("promo_item_stock_" + itemId , amount * -1);
if(result >= 0){
//更新库存成功
return true;
}else{
//更新库存失败
return false;
}
}
当然这种情况下在生产环境中不能用,因为数据库数据时不一致的,所以需要优化一下:
用异步消息队列的方式把扣减的消息发送给消息consumer端,由consumer端完成数据库扣减操作。这里我们用rocketmq,他是高性能高并发分布式消息中间件,典型的应用场景是分布式事务、异步解耦。
安装rocketmq非常简单,wget一下解压一下就好,启动和测试在官网可以看到http://rocketmq.apache.org/docs/quick-start/,
启动server:nohup sh bin/mqnamesrv &
broker:nohup sh bin/mqbroker -n localhost:9876 &
这里就不说了,有几个坑需要注意:
刚下载好的mq启动需要很大的内存空间,所以需要改一下:
bin/runserver.sh:
bin/runbroker.sh:
具体大小配置根据电脑实际情况来。
rocketmq还给我们提供了很多命令,可以看下mqadmin:
新建一个topic试一下:./mqadmin updateTopic -n localhost:9876 -t stock -c DefaultCluster
这里可能会报错:
需要更改一下tools.sh的内容,可以用命令找一下find / -name '*ext*' |grep jdk:
还有一个坑需要注意,如果配置在服务器上,broker上面的ip可能是内网ip,可以用sh ./mqbroker -m看一下:
需要修改conf/broker.conf:
启动命令要加上conf:
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
接下来代码实战下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
mq.nameserver.addr=47.107.*.*:9876
mq.topicname=stock
@Component
public class MqProducer {
private DefaultMQProducer producer;
@Value("${mq.nameserver.addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
@PostConstruct
public void init() throws MQClientException {
//mq producer初始化
producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr(nameAddr);
producer.start();
}
//同步库存扣减消息
public boolean asyncReduceStock(Integer itemId , Integer amount){
Map<String , Object> map = new HashMap<>();
map.put("itemId" , itemId);
map.put("amount" , amount);
Message message = new Message(topicName , "increas" ,
JSON.toJSON(map).toString().getBytes(Charset.forName("UTF-8")));
try {
producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
return false;
} catch (RemotingException e) {
e.printStackTrace();
return false;
} catch (MQBrokerException e) {
e.printStackTrace();
return false;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
}
@Component
public class MqConsumer {
private DefaultMQPushConsumer consumer;
@Value("${mq.nameserver.addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
@Autowired
private ItemStockDOMapper itemStockDOMapper;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer("stock_consumer_group");
consumer.setNamesrvAddr(nameAddr);
consumer.subscribe(topicName , "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//实现库存真正在数据库中扣减
Message msg = msgs.get(0);
String jsonString = new String(msg.getBody());
Map<String , Object> map = JSON.parseObject(jsonString , Map.class);
Integer itemId = (Integer) map.get("itemId");
Integer amount = (Integer) map.get("amount");
itemStockDOMapper.decreaseStock(itemId , amount);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
业务层:
@Autowired
private MqProducer mqProducer;
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
long result = redisTemplate.opsForValue().increment("promo_item_stock_" + itemId , amount * -1);
if(result >= 0){
//更新库存成功
boolean mqResult = mqProducer.asyncReduceStock(itemId , amount);
if (!mqResult){
//失败回滚redis
redisTemplate.opsForValue().increment("promo_item_stock_" + itemId , amount);
return false;
}
return true;
}else{
//更新库存失败
redisTemplate.opsForValue().increment("promo_item_stock_" + itemId , amount);
return false;
}
}
这样就可以实现同步了,但是这样仍然会存在几个问题:
1.异步消息发送失败怎么办
2.扣减操作执行失败怎么办
3.下单失败无法正确回补库存怎么办
这些问题后面解决