一、RedissonLock 基础机制 1-1、获得锁 getLock 1-2、加锁 tryLock 1-3、解锁 unlock 1-4、判断锁是否存在,判断锁是否当前线程持有 二、RedissonLock 扩展 2-1、RedissonFairLock 2-2、RedissonReadLock 2-3、RedissonWriteLock
在Java分布式锁最佳实践 已经知道了分布式锁的使用方式,这篇文章就来更进一步的了解在 Redisson中,到底如何去实现分布式锁。
在开始之前大家肯定也已经知道了,Redis是单线程模型,虽然是多路IO复用(可以同时接受多个请求),但每个命令本质上还是单线程去执行的,而在分布式锁的使用过程中肯定会涉及到多个命令一致性的问题,大家也肯定知道了使用的是Lua脚本。
一、RedissonLock 基础机制
在使用Redisson做分布式锁的时候,会用到 加锁、解锁、判断锁是否存在、以及锁是否被当前线程持有,下面来看看这几个常见操作是如何实现的。
1-1、获得锁 getLock
RLock lock = redissonClient.getLock(lockKey);@Overridepublic RLock getLock(String name){returnnew RedissonLock(commandExecutor, name);}从代码上可知获取的是 RedissonLock。其实 RLock的实现有很多,但最常用的还是 RedissonLock

1-2、加锁 tryLock
/** * 尝试在指定的 <code>leaseTime</code> 内获取锁。 * 如有必要,将等待指定的 <code>waitTime</code> 直到锁可用。 * * 锁将在定义好的 <code>leaseTime</code> 时间间隔后自动释放。 * * @param waitTime 获取锁的最大等待时间 * @param leaseTime 锁的持有时间 * @param unit 时间单位 * @return 如果成功获取锁,则返回 <code>true</code>; * 如果锁已被占用,则返回 <code>false</code>。 * @throws InterruptedException 如果线程被中断 */booleantryLock(long waitTime, long leaseTime, TimeUnit unit)throws InterruptedException;1-2-1、加锁的Lua脚本是什么
真正加锁的方法是 tryAcquire,如果加锁成功 返回 null,否则返回以存在锁的存活时间 (从下面的Lua脚本可以看到加锁成功返回 null)
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId){return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId){ RFuture<Long> ttlRemainingFuture;// leaseTime != -1 表示用户设置了锁存活时间if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else {// internalLockLeaseTime 默认是30s,所以如果没有设置锁默认是30s ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return; }// lock acquiredif (ttlRemaining == null) {if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else {// 如果用户没有设置锁时间,则开启看门狗模式 scheduleExpirationRenewal(threadId); } } });return ttlRemainingFuture;}tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command){return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}解析Lua脚本
// 判断 KEYS[1] 是否存在;==0 表示不存在,进入"首次创建窗口"分支if (redis.call('exists', KEYS[1]) == 0) then// Hash 字段 ARGV[2] 计数 +1(不存在则从 0 变 1) redis.call('hincrby', KEYS[1], ARGV[2], 1)// 给整个 key 设置毫秒级过期时间 ARGV[1] redis.call('pexpire', KEYS[1], ARGV[1])// 返回 nil(通常表示本次处理成功/放行,具体看调用方约定)return nilend// key 已存在时,判断字段 ARGV[2] 是否已存在;==1 表示存在if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then// 同一字段继续累计 +1 redis.call('hincrby', KEYS[1], ARGV[2], 1)// 刷新/重设整个 key 的毫秒 TTL redis.call('pexpire', KEYS[1], ARGV[1])// 返回 nil(通常表示成功/放行)return nilend// 走到这里:key 存在但该 field 不存在;返回 key 剩余 TTL(毫秒)return redis.call('pttl', KEYS[1])tip:从 evalWriteAsync 方法的定义可知
KEYS[1] = getRawName() (也就是当钱锁的名称,通过 redissonClient.getLock(lockKey) 设置) ARGV[1] = unit.toMillis(leaseTime) 锁的存活时间 ARGV[2] = getLockName(threadId) 当前线程的名字
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params){// ...}从Lua脚本可以看出Redisson的分布式锁,其实是用Redis的 hash结构
1-2-2、看门狗🐶
所谓的看门狗就是在没有设置锁存活时间的一种保护机制,默认锁是30s,当存活时间只剩下 1/3的时候就会对锁进行一个续期
// lock acquiredif (ttlRemaining == null) {if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else {// 如果用户没有设置锁时间,则开启看门狗模式 scheduleExpirationRenewal(threadId); }}本质上是启动了一个定时任务, internalLockLeaseTime / 3 执行一次
renewExpiration
privatevoidrenewExpiration(){ ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublicvoidrun(Timeout timeout)throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return; } Long threadId = ent.getFirstThreadId();if (threadId == null) {return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> {if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName());return; }if (res) {// reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task);}执行的脚本
renewExpirationAsync
protected RFuture<Boolean> renewExpirationAsync(long threadId){return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));}1-2-3、锁的等待
在加锁的时候,传入了 waitTime。其实就是在有一个循环,在循环里面不断的去加锁,每次加锁都判断一下是否超时了
tryLock
@OverridepublicbooleantryLock(long waitTime, long leaseTime, TimeUnit unit)throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {returntrue; } time -= System.currentTimeMillis() - current;if (time <= 0) { acquireFailed(waitTime, unit, threadId);returnfalse; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> {if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId);returnfalse; }try { time -= System.currentTimeMillis() - current;if (time <= 0) { acquireFailed(waitTime, unit, threadId);returnfalse; }while (true) {long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {returntrue; } time -= System.currentTimeMillis() - currentTime;if (time <= 0) { acquireFailed(waitTime, unit, threadId);returnfalse; }// waiting for message currentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime;if (time <= 0) { acquireFailed(waitTime, unit, threadId);returnfalse; } } } finally { unsubscribe(subscribeFuture, threadId); }// return get(tryLockAsync(waitTime, leaseTime, unit));}1-3、解锁 unlock
1-3-1、解锁的流程
@Overridepublicvoidunlock(){try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause(); } else {throw e; } }}解锁成功后的回调里面会去取消看门狗
unlockAsync
@Overridepublic RFuture<Void> unlockAsync(long threadId){ RPromise<Void> result = new RedissonPromise<>();// 解锁 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> {// 取消看门狗 cancelExpirationRenewal(threadId);if (e != null) { result.tryFailure(e);return; }if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause);return; } result.trySuccess(null); });return result;}1-3-2、解锁的Lua
unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId){return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}Lua 脚本解析
// 若 KEYS[1] 这个 Hash 里不存在字段 ARGV[3],直接结束并返回 nil(常见含义:没有可扣减的计数/资源,或本次无需处理)if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) thenreturn nil;end;// 对字段 ARGV[3] 执行 HINCRBY -1(整型自减 1),并把自减后的新值赋给局部变量 counterlocal counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);// 如果自减后 counter 仍大于 0:说明"还剩余量/次数"if (counter > 0) then// 给整个 KEYS[1] 设置/刷新毫秒过期时间为 ARGV[2](注意:这里 TTL 用的是 ARGV[2],不是 ARGV[1]) redis.call('pexpire', KEYS[1], ARGV[2]);// 返回 0(通常表示:扣减成功但还没耗尽;具体语义以你们 Java 侧判断为准)return0;else// counter <= 0:认为计数/资源已耗尽(或减到 0 及以下) redis.call('del', KEYS[1]);// 向频道 KEYS[2] 发布一条消息,消息内容为 ARGV[1](常用于通知"用尽/释放/完成"等事件) redis.call('publish', KEYS[2], ARGV[1]);// 返回 1(通常表示:已删除 key 且已通知;或"耗尽分支")return1;end;// 理论上很难走到这里:前面的分支要么 return nil,要么 return 0/1;若脚本后续被改动导致遗漏分支,才可能落到这句return nil;tip:可重入锁:在加锁的时候可以看到也会有一个计数器,可以多次加锁,同样解锁的时候也有一个计数器
1-4、判断锁是否存在,判断锁是否当前线程持有
除了加锁和解锁,还有两个常用的方法是判断锁是否存在 isLocked,和是否当前线程持有锁 isHeldByCurrentThread
isLocked 使用 exists 命令判断key是否存在 isHeldByCurrentThread 使用 hexists 命令判断这个key里面是否有这个线程 (看看加锁的 Lua脚本是把这 线程放进去了)
二、RedissonLock 扩展
Redisson 还提供了很多特色的锁,这里我们再来看几个锁。 RedissonFairLock、RedissonReadLock、RedissonWriteLock ,这三个锁继承了 RedissonLock 所以只需要看不同的点就好了。着重看看 加锁/解锁的 Lua脚本差异。

2-1、RedissonFairLock
2-1-1、公平锁加锁
公平锁-Lua加锁脚本解析
// ========== 一段:清理等待队列队头里"已过期"的等待者(循环直到队列为空或队头仍有效)==========whiletruedo// 取等待队列 KEYS[2] 的队首元素(下标 0),通常是"下一个该轮到谁"的线程/锁持有者标识 local firstThreadId2 = redis.call('lindex', KEYS[2], 0);// 队列为空:没有等待者,跳出清理循环if firstThreadId2 == false thenbreak; end;// 在超时有序集合 KEYS[3] 里查队首线程对应的 score,并转成数字(score 里编码了它的超时/截止时间语义) local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));// 若队首线程的 timeout 已经"到期"(<= 当前时间 ARGV[4]):认为该等待已失效,需要剔除if timeout <= tonumber(ARGV[4]) then// 从超时集合移除该线程;并从队列左侧弹出队首(两者保持一致)// 注意:只处理队首过期项,不动其它线程的 timeout(注释里强调的点) redis.call('zrem', KEYS[3], firstThreadId2); redis.call('lpop', KEYS[2]);else// 队首仍有效:停止继续清理break; end;end;// ========== 二段:判断是否"现在可以获取锁"(公平性:通常要求锁空且自己排在队首或队列空)==========// KEYS[1] 是锁本体(常见实现为 Hash:field=线程标识,value=重入计数)// 条件 A:锁 key 不存在(没人持有)// 条件 B:等待队列不存在/为空,或者队列队首就是当前线程 ARGV[2](轮到我了)if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then// 获取成功前先把自己从队列与超时集合移除(我已不再"等待",而是"持有") redis.call('lpop', KEYS[2]); redis.call('zrem', KEYS[3], ARGV[2]);// 对所有仍在 KEYS[3] 里等待的线程,把它们的 score 统一减去 ARGV[3]// 语义上通常是"队列整体时间轴平移/扣掉一段等待粒度",避免后续计算漂移 local keys = redis.call('zrange', KEYS[3], 0, -1);for i = 1, #keys, 1do redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]); end;// 真正加锁:在锁 Hash 里记录当前线程 ARGV[2} 的持有计数为 1,并给锁 key 设置租约过期(毫秒 ARGV[1]) redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]);// 返回 nil:表示加锁成功(具体含义由调用方约定)return nil;end;// ========== 三段:锁已被持有,但属于"可重入"——当前线程本来就已在锁 Hash 里 ==========if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then// 重入计数 +1,并刷新锁租约 TTL redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]);return nil;end;// ========== 四段:拿不到锁;若我已经在队列/超时集合里登记过,则返回一个"还要等多久"的近似值 ==========// 注释含义:更精确的值应基于"前一个等待线程"的真实 timeout 推导,但这里用近似避免遍历队列local timeout = redis.call('zscore', KEYS[3], ARGV[2]);if timeout ~= false thenreturn timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);end;// ========== 五段:不在队列里则入队:把当前线程挂到队列尾,并在 KEYS[3] 写入它的 timeout score ==========// 取队列尾部线程 lastThreadId;用它(或锁剩余 TTL)推导我应当排队到的 timeoutlocal lastThreadId = redis.call('lindex', KEYS[2], -1);local ttl;// 若队列尾存在且不是自己:我的 ttl 近似为"队尾线程的 score - 当前时间 ARGV[4]"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);else// 否则(队列为空或尾部就是自己这类边界):用锁 KEYS[1] 的剩余 PTTL(毫秒)作为基准 ttl = redis.call('pttl', KEYS[1]);end;// 计算我在有序集合 KEYS[3] 里应写入的 timeout(把等待步长 ARGV[3] 与当前时间 ARGV[4] 叠进去)local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);// zadd 返回 1 表示新插入成功:才把线程 ARGV[2] 推到队列右侧,保证队列与 zset 同步if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then redis.call('rpush', KEYS[2], ARGV[2]);end;// 返回 ttl:给调用方一个"建议等待/轮询间隔相关"的基准值(与 Redisson tryLock 返回剩余等待时间的风格一致)return ttl;在上面的 tryLock 已经讲过了,如果没有获取到锁,同时设置了等待时间,就会有个循环继续去尝试获取锁,这个是一样的 Redis里面会维护 每个线程获取锁的顺序集合,还会维护每个线程超时等待时间
2-1-2、公平锁解锁
公平锁-Lua解锁脚本解析
// ========== 一段:清理等待队列队头里已过期的等待者(与之前脚本同构)==========whiletruedo// 取等待队列 KEYS[2] 队首线程 id local firstThreadId2 = redis.call('lindex', KEYS[2], 0);// 队列为空则结束清理if firstThreadId2 == false thenbreak; end;// 取队首在 KEYS[3](超时 ZSet)里的 score,并转成数字 local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));// score 已"到期"(<= ARGV[4] 表示的当前时间):从 ZSet 与 List 同步移除队首if timeout <= tonumber(ARGV[4]) then redis.call('zrem', KEYS[3], firstThreadId2); redis.call('lpop', KEYS[2]);else// 队首仍有效:停止清理break; end;end;// ========== 二段:锁 key 已不存在时的早退 + 唤醒队首 ==========// 若 KEYS[1](锁)不存在:不再做 hincrby,但仍尝试唤醒队列里下一个等待者(若有)if (redis.call('exists', KEYS[1]) == 0) then// 读队首 nextThreadId(可能为 false) local nextThreadId = redis.call('lindex', KEYS[2], 0);// 若有下一个等待者:向频道 KEYS[4] .. ':' .. nextThreadId 发布消息 ARGV[1](通知其可继续竞争锁)if nextThreadId ~= false then redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); end;// 返回 1:常见语义为"本次释放路径结束/锁本就不存在但仍做了唤醒"(具体以客户端封装为准)return1;end;// ========== 三段:当前线程并未持有锁(Hash 里没有 ARGV[3] 这个 field)==========// 典型含义:不是持锁者调用 unlock / 线程不匹配 → 不做破坏性操作if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) thenreturn nil;end;// ========== 四段:重入计数减一;若仍大于 0,则仍视为"继续持有锁"==========// 对 ARGV[3] 对应的重入计数执行 HINCRBY -1,得到 counterlocal counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);// 还有嵌套加锁未释放完:刷新锁租约(PEXPIRE 使用 ARGV[2] 毫秒),返回 0if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]);return0;end;// ========== 五段:counter 已降到 0 —— 真正完全释放锁 ==========// 删除整个锁 key(所有 field 一并消失)redis.call('del', KEYS[1]);// 再次读取等待队列队首,准备唤醒"下一个该抢锁的人"local nextThreadId = redis.call('lindex', KEYS[2], 0);if nextThreadId ~= false then// 与第二段相同:按线程专属频道发布 ARGV[1],驱动客户端从阻塞/订阅中醒来继续 tryLock redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]);end;// 返回 1:常见语义为"锁已彻底释放并已尝试通知后继等待者"return1;2-2、RedissonReadLock
2-2-1、读锁加锁
读锁-加锁 Lua脚本解析
// 读出 KEYS[1] 中字段 'mode':表示当前锁处于 'read' / 'write' / 尚未初始化(不存在时为 false)local mode = redis.call('hget', KEYS[1], 'mode');// ========== 分支 A:还没有 mode(锁从未被初始化)——本线程成为第一个读锁持有者 ==========if (mode == false) then// 将锁模式设为只读聚合态 'read' redis.call('hset', KEYS[1], 'mode', 'read');// 当前线程 ARGV[2] 的读重入/读次数记为 1(用 Hash field 存每个线程的读计数) redis.call('hset', KEYS[1], ARGV[2], 1);// satellite key:KEYS[2] .. ':1' 存 1,表示"第 1 层读租约对象",并单独设过期 redis.call('set', KEYS[2] .. ':1', 1); redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]);// 主 Hash KEYS[1] 也设租约,整锁元数据到期可自动清理 redis.call('pexpire', KEYS[1], ARGV[1]);// 返回 nil:读锁获取成功return nil;end;// ========== 分支 B:允许继续加读锁的条件 ==========// 1) 已是 read 模式:多个读者并存是读锁语义// 2) 或是 write 模式,但 KEYS[1] 里已存在字段 ARGV[3]:典型用于"写锁线程上的重入/升级路径的一部分"(实现细节依框架;核心是 hexists 为真才放行)if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then// 将当前线程 ARGV[2] 的读计数 +1,返回自增后的新值 ind(作为第几层读租约的序号) local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1);// 为这一层读锁单独建一个字符串 key:KEYS[2] .. ':' .. ind,值为 1,并设 PEXPIRE// 作用:每层 unlock 可精准删一个 satellite key,配合计数减一 local key = KEYS[2] .. ':' .. ind; redis.call('set', key, 1); redis.call('pexpire', key, ARGV[1]);// 读取主 Hash 当前剩余 TTL local remainTime = redis.call('pttl', KEYS[1]);// 用 max(remainTime, ARGV[1]) 刷新主 Hash 过期:避免"主元数据比某条 satellite 更早过期"导致状态不一致 redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1]));return nil;end;// ========== 分支 C:写锁被其它线程占用等情况 —— 读锁获取失败 ==========// 返回主 Hash 剩余 PTTL(毫秒),客户端常据此阻塞重试或转 tryLock 失败语义return redis.call('pttl', KEYS[1]);2-2-2、读锁解锁
读锁-解锁 Lua脚本解析
// 读出主 Hash 的锁模式:'read' / 'write';不存在则为 false(key 或 field 不存在)local mode = redis.call('hget', KEYS[1], 'mode');// ========== 分支 A:mode 都不存在 —— 通常表示"锁元数据已空/从未正确初始化"等边界 ==========// 仍向 KEYS[2] 发布 ARGV[1],并返回 1(常见:幂等 unlock / 通知等待方继续;语义以客户端为准)if (mode == false) then redis.call('publish', KEYS[2], ARGV[1]);return1;end;// ========== 分支 B:当前线程 ARGV[2] 并不在主 Hash 里 —— 不是持锁者/没有可读计数 ==========// 返回 nil:典型表示"本次 unlock 无效/不匹配"(不做 hincrby、不删 key)local lockExists = redis.call('hexists', KEYS[1], ARGV[2]);if (lockExists == 0) thenreturn nil;end;// ========== 分支 C:把当前线程的读计数减 1,并删除对应那一层的 satellite key ==========// HINCRBY -1 后,counter 为"剩余读层数"(自减后的新值)local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1);// 若已减到 0:从 Hash 删除该线程 field(不再占用读者槽位)if (counter == 0) then redis.call('hdel', KEYS[1], ARGV[2]);end;// 删除刚释放的那一层读锁对应的字符串 key:KEYS[3] .. ':' .. (counter+1)// 含义:自减前最后一层序号是 counter+1;与加锁时 hincrby 返回 ind 再 set KEYS[3]:ind 的结构配对redis.call('del', KEYS[3] .. ':' .. (counter+1));// ========== 分支 D:主 Hash 里还剩多个 field(包含 mode 与其它线程/计数信息)——锁"整体"可能仍需保留 ==========// hlen(KEYS[1]) > 1:至少还有 mode + 其它内容(其它读者或写者相关字段)if (redis.call('hlen', KEYS[1]) > 1) then// maxRemainTime 初始化为 -3:用于在多个 pttl 中取最大值(-2/-1 等 Redis PTTL 特殊值也会参与 max) local maxRemainTime = -3;// 遍历 Hash 的所有 field 名 local keys = redis.call('hkeys', KEYS[1]);for n, key in ipairs(keys)do// 读出该 field 的值并转数字:正常线程 field 的值是"读重入层数"这类数字;'mode' 等非数字会被 type 检查跳过 counter = tonumber(redis.call('hget', KEYS[1], key));iftype(counter)== 'number' then// 对该线程的每一层 i,查询其 timeout key 的剩余 TTL,聚合出全局最大剩余时间for i = counter, 1, -1do local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); maxRemainTime = math.max(remainTime, maxRemainTime); end; end; end;// 若还能算出正的剩余 TTL:把主 Hash KEYS[1] 的过期对齐到"最晚到期的那层",避免元数据先过期if maxRemainTime > 0 then redis.call('pexpire', KEYS[1], maxRemainTime);return0; end;// 若没有正的 maxRemainTime,但当前仍是写模式:直接 return 0(保持主 Hash,不做 del;具体业务以框架为准)if mode == 'write' thenreturn0; end;end;// ========== 分支 E:已无人需要保留主元数据 —— 彻底删除锁并广播唤醒 ==========redis.call('del', KEYS[1]);redis.call('publish', KEYS[2], ARGV[1]);return1;2-3、RedissonWriteLock
2-3-1、写锁加锁
写锁-加锁 Lua脚本解析
// 读取 KEYS[1] 中 'mode':'read' / 'write' / 未初始化(不存在时为 false)local mode = redis.call('hget', KEYS[1], 'mode');// ========== 分支 A:锁从未初始化 —— 本线程直接获得写锁 ==========if (mode == false) then// 标记为写模式(独占) redis.call('hset', KEYS[1], 'mode', 'write');// 当前线程 ARGV[2] 的写重入计数置为 1 redis.call('hset', KEYS[1], ARGV[2], 1);// 给整个元数据 Hash 设置租约过期(毫秒 ARGV[1]) redis.call('pexpire', KEYS[1], ARGV[1]);// 返回 nil:写锁获取成功return nil;end;// ========== 分支 B:已是写模式 —— 仅允许"同线程"重入 ==========if (mode == 'write') then// 若 Hash 里已有该线程 ARGV[2] 的 field,说明是同一线程重复加写锁if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then// 写重入计数 +1 redis.call('hincrby', KEYS[1], ARGV[2], 1);// 读出主 Hash 当前剩余 PTTL(毫秒) local currentExpire = redis.call('pttl', KEYS[1]);// 把过期时间延长为 currentExpire + ARGV[1](在剩余时间上再续租 ARGV[1]) redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]);return nil; end;end;// ========== 分支 C:拿不到写锁 ==========// 典型情况:mode 为 'read'(有读者),或写锁已被其它线程占用(同 mode=='write' 但 hexists 为 0)// 返回 KEYS[1] 的 PTTL:调用方常用来等待/重试/tryLock 失败提示return redis.call('pttl', KEYS[1]);2-3-2、写锁 解锁
写锁-解锁 Lua脚本解析
// 读出当前锁模式:'read' / 'write' / 未初始化(false)local mode = redis.call('hget', KEYS[1], 'mode');// ========== 分支 A:元数据里根本没有 mode(锁状态已空或未初始化)==========// 仍向 KEYS[2] 发布 ARGV[1]:用于唤醒其它等待方;返回 1 常见表示"幂等释放/已无锁但完成通知"if (mode == false) then redis.call('publish', KEYS[2], ARGV[1]);return1;end;// ========== 分支 B:当前是写模式,只处理"写锁释放"逻辑 ==========if (mode == 'write') then// 判断 KEYS[1] 里是否存在 ARGV[3] 对应 field(非持锁者不应能解) local lockExists = redis.call('hexists', KEYS[1], ARGV[3]);// 不存在:不是该线程持有的写锁 → 不做破坏性修改if (lockExists == 0) thenreturn nil;else// 写重入计数 -1,得到 counter local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);// 写锁仍有多层重入未释放完:刷新整 Hash 租约为 ARGV[2] 毫秒,返回 0if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]);return0;else// counter 已降到 0:删除该线程在 Hash 中的 field(不再持有写锁这一维) redis.call('hdel', KEYS[1], ARGV[3]);// 若 Hash 里只剩 1 个 field:通常只剩 'mode',没有其它线程计数 → 锁完全空if (redis.call('hlen', KEYS[1]) == 1) then redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]);else// 还剩多个 field:注释含义是仍存在"读侧相关字段/已解锁读锁后的残留结构"等// 典型语义:写锁完全退出,但 Hash 里还有读者信息 → 把模式切回 'read' 聚合态 redis.call('hset', KEYS[1], 'mode', 'read'); end;// 返回 1:常见表示"本次写锁释放已走到终点(删 key 或降级为 read)"return1; end; end;end;// ========== 分支 C:mode 不是 false 也不是 'write'(例如为 'read')==========// 本脚本按设计不处理读模式下的写 unlock → 直接返回 nilreturn nil;
夜雨聆风