zookeeper简介
关于zookeeper之前简单写过两篇博客进行总结,但是比较偏实例,关于zookeeper分布式锁的介绍可以参看这篇简书——zookeeper的分布式锁。
之前的两篇总结地址:curator基本操作,curator的watcher机制。
springboot中集成zookeeper
引入zookeeper的jar包
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
这里包含了zookeeper和curator的jar包信息。
引入zookeeper的配置信息
zookeeper.address.username=localhost:2181
zookeeper.address.namespace=spring_boot_distribute_lock
这里指定了连接信息和命名空间。
将CuratorFramework加入容器
/**
* autor:liman
* createtime:2020/1/29
* comment:
*/
@Configuration
public class MiddlewareConfiguration {
//注入环境配置的实体,用于读取配置信息
@Autowired
private Environment env;
@Bean
public CuratorFramework curatorFramework(){
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder().connectString(env.getProperty("zookeeper.address.username"))
.namespace(env.getProperty("zookeeper.address.namespace"))
.retryPolicy(new RetryNTimes(5,1000))
.build();
//开启zookeeper的连接
curatorFramework.start();
return curatorFramework;
}
}
zookeeper分布式锁初步版本
利用zookeeper实现分布式锁,主要是利用curator高度封装的InterProcessMutex的acquire操作完成,如果acquire成功表示获取到分布式锁,如果失败表示获取锁失败。释放锁通过InterProcessMutex的release操作即可。
暴力失败版本。
/**
* zookeeper实现分布式锁
* @param productLockDto
* @return
*/
public int updateStockWithZookeeper(ProductLockDto productLockDto) throws Exception {
int res = 0;
InterProcessMutex mutex = new InterProcessMutex(curatorClient,pathPrefix);
try{
if(mutex.acquire(10L,TimeUnit.SECONDS)){
//TODO:真正的业务处理
ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId());
int leftStock = productLockEntity.getStock();
if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){
productLockEntity.setStock(productLockDto.getStock());
res = lockMapper.updateStockForNegative(productLockEntity);
if(res>0){
log.info("基于zookeeper的分布式锁更新库存成功,剩余stock:{}",leftStock-1);
}
}
}else{
log.error("基于zookeeper,获取分布式锁失败");
throw new RuntimeException("zookeeper,获取分布式锁失败");
}
}catch (Exception e){
log.error("zookeeper,获取分布式锁失败,{}",e.fillInStackTrace());
throw new RuntimeException("获取分布式锁失败");
}finally {
mutex.release();//释放锁
}
return res;
}
这是一个简单的版本,出现异常,则直接抛出异常,就直接按照失败处理。
/**
* zookeeper实现分布式锁 cas
* @param productLockDto
* @return
*/
public int updateStockWithZookeeperCAS(ProductLockDto productLockDto){
int res = 0;
InterProcessMutex mutex = new InterProcessMutex(curatorClient,pathPrefix);
boolean flag = true;
while(flag){
try{
if(mutex.acquire(10L,TimeUnit.SECONDS)){
//TODO:真正的业务处理
flag=false;
ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId());
int leftStock = productLockEntity.getStock();
if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){
productLockEntity.setStock(productLockDto.getStock());
res = lockMapper.updateStockForNegative(productLockEntity);
if(res>0){
log.info("基于zookeeper的分布式锁更新库存成功,剩余stock:{}",leftStock-1);
}
}
}else{
log.error("基于zookeeper,获取分布式锁失败");
flag = true;
}
}catch (Exception e){
log.error("基于zookeeper,获取分布式锁失败");
flag = true;
}finally {
try{
mutex.release();//释放锁
}catch (Exception e){
log.error("释放锁失败:重新获取锁");
flag=true;
}
}
}
return res;
}
这里提供了一个新版本,如果出现异常,则继续获取锁。
总结
其实没有过多可总结的,测试结果就不贴出了,总体来说按照的简书博客中总结的内容,就是如下轮子,第二个版本使得获取锁失败的时候也能继续加入到锁的竞争中。
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
try
{
// do some work inside of the critical section here
}
finally
{
lock.release();
}
}