原文链接:https://blog.csdn.net/u013066244/article/details/109766759
long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock(leaseTime, TimeUnit.SECONDS);
/**
* leaseTime租约时间,也就是键key的过期时间
*/
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 以毫秒来算的话,1500==1.5s * 3 = 4.5s
// 默认等待时间为4.5s
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
// 当没有设置租约时间时,waitTime 等于默认4.5s
waitTime = baseWaitTime;
} else {
// 设置了租约时间的情况
// 将前端传入的租约时间转为毫秒
leaseTime = unit.toMillis(leaseTime);
// 租约时间赋值给waitTime
waitTime = leaseTime;
if (waitTime <= 2000) {
// 小于 2s情况
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
// ThreadLocalRandom.current() 多线程下生成随机数
// 假设传入3s,那么就会在(3/2=1s, 3s)之间产生随机数赋值给waitTime
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
// 假设传入6s,那么就会在(4.5s, 6s)之间产生随机数赋值给waitTime
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
// 假设leaseTime传入的是3s, 根据随机策略(1s, 3s),假设生成随机数是2s
while (true) {
// waitTime = 2s, leaseTime = 3s
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
// 可以看出lock方法只能成功,才会生成退出。否则就得报异常退出
return;
}
}
}
// waitTime = 2s, leaseTime = 3s, 时间单位毫秒
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
// 设置了租约时间的情况
// 这段逻辑 就是对有租约时间的情况下,设置newLeaseTime,实际获取分布式锁时用到
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
// 设置了等待时间的场景
// 根据我假设的情况,这里2 * 2 = 4s;
// 这里乘以2,个人认为,从申请获取锁到真正获取到锁是有时间消耗的,
// 为了防止获取到的锁不至于立马过期,所以乘以2,其实我觉得leaseTime*2也可以;
// 因为从代码最后释放锁的逻辑来看,这里的租约时间多长,并不会影响最后锁的统一释放
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
// 当前时间 毫秒
long time = System.currentTimeMillis();
// 总的等待时间
long remainTime = -1;
if (waitTime != -1) {
// 因为waitTime=2s,所以remainTime=2s
remainTime = unit.toMillis(waitTime);
}
// 这里红锁代码重写了calcLockWaitTime()方法
// 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s;
// 计算每个锁的等待时间 在联锁的场景下,就等于remainTime
long lockWaitTime = calcLockWaitTime(remainTime);
// 允许获取锁失败的次数 在联锁的场景下,固定为0
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 循环每个redis客户端,去获取锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
// 红锁的情况中,根据计算规则肯定是取lockWaitTime
// lockWaitTime = 1s, remainTime=2s 取最小值 1s
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=1s, newLeaseTime=4s 开始尝试获取锁
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
// 如果获取到了,就记录起来
acquiredLocks.add(lock);
} else {
// 失败的话,比较下是否到了失败次数
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
// 3 - 2 也就是说 只有在成功两个,失败一个情况下,才会执行这里
// 换句话说,红锁的场景下走这里,联锁场景下一定不执行这里
break;
}
// 重试机制
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
// 重置失败次数、锁列表、遍历游标,这说明要进行重试了
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// 红锁失败场景执行,因为联锁的场景中failedLocksLimit=0
failedLocksLimit--;
}
}
// 超时控制代码块
if (remainTime != -1) {
// System.currentTimeMillis() - time 单个Redis实例获取锁花费的时间
// remainTime = remainTime - System.currentTimeMillis() - time;
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
// 从这段逻辑可以看出 remainTime 就是总的等待时间,如果超过了,还没有走出循环,说明获取锁失败
// 对已经获取到的锁进行释放
unlockInner(acquiredLocks);
return false;
}
}
}
// leaseTime = 3s
// 下面逻辑是,key到了过期时间后,Redis利用异步线程进行删除,释放锁;
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
// 设置好过期时间
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
// 同步可中断的方式来释放到期锁
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
// 红锁重写
// 根据我的假设,此时remainTime=2s,locks.size()=3
@Override
protected long calcLockWaitTime(long remainTime) {
// Math.max(2 / 3, 1) = (0, 1) = 1
// 说明:按照下面算法,应该是每个实例等待时间和1进行比较,取最大值
return Math.max(remainTime / locks.size(), 1);
}
// 锁可以失败的次数,锁的数量-锁成功客户端最小的数量
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
// 假设waitTime=1s, leaseTime=3s,单位毫秒
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 设置了租约时间的情况
if (leaseTime != -1) {
// tryLockInnerAsync 真正加锁的方法,其会调用脚本执行redis命令插入key
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 没有设置租约时间的情况
// 程序(看门狗)会设置一个默认值为30s的租约时间
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired 获取锁成功
if (ttlRemaining == null) {
// 对该设置时间轮询器,也可以理解为监听器
// 该监听器会以租约时间的三分之一的频率,不断延迟租约时间
// 目的是为了防止,业务程序还没有跑完,锁就被释放掉了。
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 说明该线程已经获取到锁了,
// 内部变量值会加1,即:可重入锁
oldEntry.addThreadId(threadId);
} else {
// 该线程第一次获取到该锁
entry.addThreadId(threadId);
// 启动时间轮询器,开始每隔leaseTime/3的时间,不断去延长租约时间
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// newTimeout 底层使用的是netty
//
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
// 有异常,结束
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
// 递归调用
renewExpiration();
}
});
}
// 定时器的时间为租约时间的三分之一
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),
// 并通过pexpire设置失效时间(也是锁的租约时间)
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"local remainTime = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
// 获取分布式锁的KEY的失效时间毫秒数
"return redis.call('pttl', KEYS[1]);",
// 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 释放锁时需要在redis实例上执行的lua命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式锁KEY不存在,那么向channel发布一条消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是当前线程占有分布式锁,那么将重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock();
// 租约时间为 -1 相当于没有设置
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
waitTime=4.5s leaseTime = -1
...
long remainTime = -1;
if (waitTime != -1) {
// remainTime = waitTime = 4.5s
remainTime = unit.toMillis(waitTime);
}
// 这里红锁代码重写了calcLockWaitTime()方法
// 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s;
// 计算每个锁的等待时间 (4.5/3 = 1, 1)取最大,=1
long lockWaitTime = calcLockWaitTime(remainTime);
// 允许获取锁的失败次数=1
int failedLocksLimit = failedLocksLimit();
...
...
// min(1, 4.5) = 1
// 在redLock场景中lockWaitTime永远都会比remainTime值小
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=1, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
...
// 无参的情况下leaseTime=-1,也就说无参的情况下,必须手动释放锁;redis不会自动释放
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
...
calcLockWaitTime()
failedLocksLimit()
long leaseTime = 3L;
RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock(leaseTime, TimeUnit.SECONDS);
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// leaseTime=3s
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 默认等待时间
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
// waitTime = 3 * 1.5 = 4.5s
waitTime = baseWaitTime;
} else {
leaseTime = unit.toMillis(leaseTime);
// 3s
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
// 3 <= 4.5
// 3/2=1, 3 假设随机数为2
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
while (true) {
// waitTime 2s,leaseTime=3s
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return;
}
}
}
long remainTime = -1;
if (waitTime != -1) {
// 2s
remainTime = unit.toMillis(waitTime);
}
// 联锁的场景下,就是remainTime=2s
long lockWaitTime = calcLockWaitTime(remainTime);
// 联锁的场景下,固定为0
int failedLocksLimit = failedLocksLimit();
...
// 联锁的场景下,lockWaitTime和remainTime是相等的;
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 2s,newLeaseTime=4s
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
if (failedLocksLimit == 0) {
// 失败次数用完了
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
// 重置失败次数、锁列表、遍历游标,这说明要进行重试了
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
}
// 时间控制 可以看出上面的逻辑存在重试机制,所以才有下面的超时判断逻辑
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock();
waitTime=4.5s leaseTime=-1
long remainTime = -1;
if (waitTime != -1) {
// remainTime = waitTime=4.5s
remainTime = unit.toMillis(waitTime);
}
// lockWaitTime = remainTime = 4.5s
long lockWaitTime = calcLockWaitTime(remainTime);
// 固定为0
int failedLocksLimit = failedLocksLimit();
...
// lockWaitTime = remainTime = 4.5s
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=4.5s newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
RedissonRedLock redLock = new RedissonRedLock(lock);
redLock.tryLock();
public boolean tryLock() {
try {
return tryLock(-1, -1, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
waitTime=-1s leaseTime=-1
...
// 红锁 Max(-1/3, 1) 为1
long lockWaitTime = calcLockWaitTime(remainTime);
// 红锁 3 - 3/2+1 = 1
int failedLocksLimit = failedLocksLimit();
...
...
if (waitTime == -1 && leaseTime == -1) {
// 逻辑会走这里 尝试获取锁
lockAcquired = lock.tryLock();
}
...
tryLock(waitTime, -1, unit);
...
long remainTime = -1;
if (waitTime != -1) {
// remainTime = 7s
remainTime = unit.toMillis(waitTime);
}
//max(7/3=2, 1)=2
long lockWaitTime = calcLockWaitTime(remainTime);
// 1
int failedLocksLimit = failedLocksLimit();
...
...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = 2s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
// lockWaitTime = remainTime = waitTime =7s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();
...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 7s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
tryLock(-1, -1, null);
// lockWaitTime = remainTime = waitTime =-1s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();
...
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
}
...