一:并发操作出现的原因
原因:多个动作在一瞬间同时操作同一数据
现象:
- 多人在某一瞬间通过相同的方式操作同一条数据
- 多人在某一瞬间通过不同的方式操作同一条数据
- 在某一瞬间,同一动作,多次执行
二:并发举例及解决办法
针对上述的三种的情况,分别以实际情况进行举例。
【多人在某一瞬间通过相同的方式操作同一条数据】
1.某仓库系统有一品牌商品A,商品A在数据库中只允许存在一条记录,库存的数量是这条数据的一个字段,现有库存100件,在某一天到货了1000件。由于数量比较大,现在需要10名操作员去处理这1000件商品进行入库,操作的途径都是使用PDA扫描完成后进行入库。我们假设至少存在1名以上的操作员同时进行入库操作。这样就可以满足上述条件【多人在某一瞬间通过相同的方式操作同一条数据】。在这种情况下,如果不进行处理,就会导致数据错乱,错乱的原因简单说就是在双方写数据时没有获取到最新的数据库数据。
解决方法:
方法一: 加锁。加锁是比较常用的方法。从系统的架构上来说,锁被分为单机锁和分布式锁。如果系统只是部署在单一机器上,可以简单通过java提供的各种锁来进行操作。如果系统被部署在多台机器上,可以使用redis来实现分布式加锁。这两种加锁方式从某种意义上来说是悲观锁。上述的问题,我们可以使用商品的唯一属性,比如id或者商品的唯一条码来进行加锁。
方法二:数据库乐观锁。数据库乐观锁几乎适用于所有的并发场景。使用方法:在数据库表中增加一个版本号字段,每一次更新和删除时把当前持有的对象版本号和数据库中最新的版本号进行比对,如果相同则验证通过,不然则操作失败。
方法三:使用消息队列。这种方式在消息过多时,对库存的处理可能不会特别及时。由于库存一般是需要比较及时的可见,所以这种方式并不建议。
【多人在某一瞬间通过不同的方式操作同一条数据】
2. 还是按照上述的背景来说。在这10名操作员进行入库的同时,还有至少1名操作员对A商品进行出库操作。我们假设入库时没有并发问题,但是其中一个入库和一个出库同时操作了A商品的库存,通过两种不同的方式对库存进行操作。如果不进行处理,库存也会出现数据错乱的问题。
解决方法:
方法一: 加锁。这个时候使用普通的单机锁已经没有意义了,可以使用分布式锁,依旧使用唯一属性来进行加锁,尽管方法不同,但关键的key是一样的,这样就可以锁住操作。
方法二:数据库乐观锁。
对于上述的问题,我扩展一下,如果是一批商品,你总不能一个一个进行加锁处理吧,那样效率也太低了。所以这种情况下,简单的加锁已经不能满足现在的需要了。所以数据库乐观锁又重新出现了。在批量更新时,发现其中任何一个商品的版本号不一致,立即报错回滚。
本篇文章就是对加锁方案,互斥锁(mutex key)的落地:
创建redis锁工具类:
@Slf4j
@Component
public class RedisLock {
private final long lockTime = 10;
@Autowired
private StringRedisTemplate redisTemplate;
public boolean tryGetLock(String key, String value) {
return tryGetLock(key, value, 1000);
}
public boolean tryGetLock(String key, String value, long waitMillis) {
long start = System.currentTimeMillis();
Preconditions.checkArgument(waitMillis <= lockTime * 1000);
checkKeyPrefix(key);
Boolean getLock = redisTemplate.opsForValue().setIfAbsent(key, value, lockTime, TimeUnit.SECONDS);
if (getLock != null && getLock) {
return true;
} else {
return tryGetLock(key, value, waitMillis, start);
}
}
private boolean tryGetLock(String key, String value, long waitMillis, long start) {
while (true) {
long end = System.currentTimeMillis();
if (end - start > waitMillis) {
return false;
}
Boolean getLock = redisTemplate.opsForValue().setIfAbsent(key, value, lockTime, TimeUnit.SECONDS);
if (getLock != null && getLock) {
return true;
} else {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
log.error("getRedisLock InterruptedException, ", e);
}
}
}
}
public void releaseLock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList(key), value);
}
private void checkKeyPrefix(String key) {
Objects.requireNonNull(key);
Arrays.stream(RedisLockKeyPrefix.values()).filter(prefix -> key.startsWith(prefix.name())).findFirst().orElseThrow(() ->
new IllegalArgumentException("请定义锁前缀"));
}
}
消耗额度,同时只能有一个人操作。
@Override
@Transactional
public ResultVO<Object> createSitesTask(CreateSiteTaskArg createSiteTaskArg, User user) {
String lockKey = SITE_CHOOSE_LOCK_PREFIX + LOCK_KEY_CONCAT + user.getUserName() + LOCK_KEY_CONCAT + createSiteTaskArg.getBrandId()
+ LOCK_KEY_CONCAT + createSiteTaskArg.getSiteChooseWay();
String lockValue = UUID.randomUUID().toString();
try {
//有其他相同账户在操作,则先暂时不让当前用户操作,等5秒钟
if (!redisLock.tryGetLock(lockKey, lockValue, 2000)) {
throw new IndustryException(ErrorCode.SYSTEM_BUSY);
}
// 校验用户选址剩余次数
List<IndustryAuthorityPO> authorityPOList = checkAndGetRestSite(createSiteTaskArg, user.getUserName());
// 扣除用户选址次数
Map<String, RecordPO> records = reduceRestSites(authorityPOList, createSiteTaskArg.getSites().size(),
createSiteTaskArg.getSiteChooseWay(), user.getUserName());
// 调用openapi发起选址预测
SiteSelectionDTO siteSelectionDTO = buildSiteSelection(createSiteTaskArg);
SiteSelectAsynVO siteSelectAsynVO = SdkUtils.getData(openapiClient.execute(siteSelectionDTO)).getData();
// 保存扣除记录
SiteChooseRecordPO siteChooseRecordPO = buildSiteChooseRecord(createSiteTaskArg, siteSelectAsynVO.getTaskId(),
user.getUserName(), records);
industrySitesChooseRecordDAO.save(siteChooseRecordPO);
// 保存选址预测任务信息
List<SiteChoosePO> siteChoosePOList = buildSiteChooseList(createSiteTaskArg, siteSelectAsynVO, user.getUserName());
industrySitesChooseDAO.insertAll(siteChoosePOList);
} finally {
redisLock.releaseLock(lockKey, lockValue);
}
return ResultVO.success(null, "创建分析任务成功");
}
操作完及时释放锁!
方法三:使用消息队列。这种方式在消息过多时,对库存的处理可能不会特别及时。由于库存一般是需要比较及时的可见,所以这种方式并不建议。
【在某一瞬间,同一动作,多次执行】
3.这一种情况属于请求重复提交,同样,如果没有进行处理,数据也会出现问题。
一个用户在入库时重复提交了两次,这样在不考虑其它并发的影响下,库存中的数据会多增加一次,但在入库历史中却只能看见一次记录,这样肯定是不可接受的。
解决方法:
方法一:前台可以在按钮或链接第一次点击后立刻禁用。这样可以有效的解决绝大部分的问题。但是由于操作端千变万化,这种方式并不能够完全解决问题。
方法二:后台生成一个随机数放在前台,前台在访问后台时,将随机数传输到后台进行验证,第一次验证通过即刻销毁, 随机数可以存在redis或session中,一般用于表单提交。但是这种方式还是存在缺陷,如果同一个页面有多个请求,一个随机数就完全不够用了。
方法三:nginx可以控制ip在同一时间内对服务的访问频率。比如入库时,如果进行了多次点击,发送了多次请求,在这1秒中,系统只接收第一次请求。
三:总结
处理并发的最终原理其实就是:将用户的并行操作变成串行操作。
在解决并发问题时,从操作端到服务端,再到数据库,都需要进行处理,层层过滤。
前端:防止多次点击。
服务端:对相同数据的操作写在同一个服务中。
数据库:乐观锁一定要使用。有需要的话,数据库的联合唯一索引也要准备。
四:扩展
到此为止,有相关项目开发经验(仓库系统)的读者可能会发现有些问题。问题在于:库存的设计不够优雅,才导致了很多并发情况的产生。比如,10个操作员在入库时,为什么需要操作库存中的A商品去增加库存呢?其实,所有的入库和出库,包括盘点等,都是在为了用户能够及时的看见库存而已。既然知道库存的计算方式,我们完全可以计算出对应的库存,并且还能减少大量的并发操作。