Redis教程

分布式锁:Redisson源码解析-FairLock

本文主要是介绍分布式锁:Redisson源码解析-FairLock,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

分布式锁:Redisson源码解析-FairLock

  • 一、FairLock是什么
    • 核心概念
    • 代码实现
    • 提出问题
      • 加锁
      • 释放锁
  • 二、源码解析
    • 初始化
    • 加锁
      • Lua脚本剖析
      • 加锁的流程图
      • 加锁的场景图
      • 公平加锁中的特性
        • 排队加锁
        • 获取锁超时自动释放锁
        • 自动刷新超时时间
    • 锁释放

一、FairLock是什么

前面的篇章中,我们输出过Redisson分布式锁核心代码的类图,可以观察到FairLock是基于RedissonLock的子类,也就是基于RedissonLock来实现了一些其他的特性

核心概念

相比与之前的ReentrantLock和现在的FairLock,顾名思义,就很明显的可以发现是表示的公平锁,从之前的案例中可以发现并没有对加锁失败后竞争锁的线程进行顺序上的控制,而是任由线程竞争,也就是非公平的,而FairLock会通过一系列的机制来控制,获取到锁的顺序会和请求获取锁的顺序是一致的

代码实现

// 获取一个FairLock
RLock fairLock = redissonClient.getFairLock("lockName");
// 加锁
fairLock.lock();
// 释放锁
fairLock.unlock();
  • 实现的代码很简单的,几乎就是和ReentrantLock是一样的,只是在初始化锁对象的时候,是初始化了一个RedissonFairLock,而不是以前的RedissonLock

提出问题

加锁

既然FairLock是可以实现加锁的顺序的,那么整个加锁的流程就会让人好奇了,到底是如何加锁的,才能保证加锁的一致性

  • redis可能存在一个队列,利用队列的先进先出规则,记录下来加锁的线程信息
  • 获取锁如果超时了怎么办,大家都知道我们在获取锁的时候可能是会存在一个waitTime的,如果已经达到了waitTime那么如何完成这个工作的呢
  • 如果设置了leaseTime,也就是超时锁自动释放,那么又会有什么样的不同呢?
  • 可重入锁,在这个队列里又是如何完成的呢,可以假想一下,既然是可重入锁,那么就肯定是加锁成功了,只需要用持有锁的元素来和当前要加锁的线程来比较,如果是一样的,就证明是同线程加锁,可以实现可重入锁
  • 还有一个问题,就是在没有获取锁的时候,会有一个循环来不停的获取锁,这个时候应该要保证队列中元素的唯一性
  • watchdog所完成的续约的工作,应该还和之前的是一样的

释放锁

释放锁有两种情况,一个是主动释放,只有锁已被持有任务完成,会主动执行unlock指令来释放锁;还有一种就是被动释放,超时锁自动释放,也就是设置了leaseTime,倒是watchdog没有再次启动的情况

二、源码解析

初始化

// 命令执行器
this.commandExecutor = commandExecutor;
// threadWaitTime:60000*5 是直接在代码中初始化的这个时间 这里都5分钟了
this.threadWaitTime = threadWaitTime;
// 队列名称:redisson_lock_queue:{lockName}
threadsQueueName = prefixName("redisson_lock_queue", name);
// set集合名称:redisson_lock_timeout:{lockName}
timeoutSetName = prefixName("redisson_lock_timeout", name);

路啊
可以观察到其实整个初始化的过程,就是初始化了一些成员变量,比较重点关注的有下面的:

  • threadWaitTime:60000*5
  • 队列名称:redisson_lock_queue:{lockName}
  • set集合名称:redisson_lock_timeout:{lockName}

同样,我们也可以猜测threadWaitTime是获取锁的等待时间,然后还在redis中维护了一个队列和一个set集合


通过阅读源码,可以发现FairLock在整个lock的过程中,几乎都是走的RedissonLock,发现RedissonFairLock只是重载了RedissonLock的方法

  • tryLockInnerAsync 加锁
  • acquireFailedAsync 取消获取锁
  • unlockInnerAsync 释放锁
  • forceUnlockAsync 强制释放锁

下面的时间,我们也就是把这几个核心的源码流程给看一下,引入了什么不一样的机制来完成排队加锁

加锁

Lua脚本剖析

加锁的核心代码是 tryLockInnerAsync ,主要核心是一段lua脚本

下面是针对自己的理解,给这段lua脚本增加了一些注释

  • lockName
  • redisson_lock_queue:{lockName}
  • redisson_lock_timeout:{lockName}
  • uuid:threadId 加锁的线程
  • waitTime 普通不设置waitTime加锁的情况下,waitTime = threadWaitTime = 60000*5
  • currentTime 当前时间
// KEY[1] lockName
// KEY[2] threadsQueueName -> redisson_lock_queue:{lockName}
// KEY[3] timeoutSetName -> redisson_lock_timeout:{lockName}
// ARGV[1] unit.toMillis(leaseTime)
// ARGV[2] uuid:threadId 
// ARGV[3] waitTime
// ARGV[4] currentTime当前时间

-----------------------------------循环--------------------------------------------

//  (1)要不就是没有元素,不处理了
//  (2)要不就是把那些过期掉的元素给remove掉
while true do 
  // 从队列中获取第一个元素
  // lindex redisson_lock_queue:{lockName} 0
  local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
  // 如果没有获取到就直接跳出去
  if firstThreadId2 == false then
    break;
  end;
  // 去set里面获取这个队头的元素的score,也就是timeout
  local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
  // 如果这个timeout比当前时间还要小就从队列和set集合中给他挪出来
  if timeout <= tonumber(ARGV[4]) then 
    redis.call('zrem', KEYS[3], firstThreadId2);
    redis.call('lpop', KEYS[2]);
  else
    break;
  end;
end;

-------------------------------------加锁------------------------------------------

// 当前的锁lockName没有人获取,且lockName队列也不存在或者从队列中对头元素是同一个线程加锁-->代表能加锁
// 能加锁的话,就这一段代码
// (exists lockName == 0) and 
// ((exists redisson_lock_queue:{lockName} == 0) 
//  or (lindex redisson_lock_queue:{lockName} 0 ==  uuid:threadId))
if (redis.call('exists', KEYS[1]) == 0) 
  and ((redis.call('exists', KEYS[2]) == 0) 
    or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then 
  // 把当前加锁的线程给从队列和set集合中remove掉
  // lpop redisson_lock_queue:{lockName}
  redis.call('lpop', KEYS[2]);
  // zrem redisson_lock_timeout:{lockName} uuid:threadId 
  redis.call('zrem', KEYS[3], ARGV[2]);
  // 就获取set集合中的所有元素,赋值给keys
  // zrange redisson_lock_timeout:{lockName} 0 -1
  local keys = redis.call('zrange', KEYS[3], 0, -1);
  // 遍历keys
  for i = 1, #keys, 1 do 
    // 而zscore的设置是: 上一个锁的score+waitTime+currentTime
    // 让整个set集合中的元素都减掉waitTime
    // zincrby redisson_lock_timeout:{lockName} -waittime keys[i]
    redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
  end;
  // 加锁的代码
  // hset lockName uuid:threadId 1
  redis.call('hset', KEYS[1], ARGV[2], 1);
  // 设置过期时间的代码
  // pexpire lockName leaseTime
  redis.call('pexpire', KEYS[1], ARGV[1]);
  // 返回nil,加锁成功,会启动一个watchdog调度任务
  return nil;
end;

-------------------------------------可重入锁---------------------------------------

// 当前持有锁的线程是自己,可重入加锁就会走到这里来加锁了
// hexists lockName uuid:threadId 
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then 
  redis.call('hincrby', KEYS[1], ARGV[2],1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;

------------------------已经有线程阻塞在队列里的,while获取锁--------------------------

// 获取set集合中,uuid:threadId key的分数,然后再给他 -waitTime-currentTime
// zscore的设置是: 上一个锁的score+waitTime+currentTime
// 实际返回的就是ttl
// zscore redisson_lock_timeout:{lockName} uuid:threadId
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
if timeout ~= false then
  return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;


-----------------------------------第一次过来的互斥锁--------------------------------
// 计算并返回队列中最后一个线程的ttl,并添加到队列和set集合中

// 获取队列中的最后一个元素
// lindex threadsQueueName -1
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
// 判断队列中最后一个元素不为空,且不等于uuid:threadId
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
  // zscore redisson_lock_timeout:{lockName} lastThreadId - 当前时间
  ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else 
  // 只有队列中的元素为空的
  // pttl lockName
  ttl = redis.call('pttl', KEYS[1]);
end;

// timeout = ttl + waitTime + currentTime
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
// 设置set集合中的元素,会带上一个timeout作为score
// zadd redisson_lock_timeout:{lockName} timeout uuid:threadId 
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
  // 将等待的线程设置到队列中
  // rpush redisson_lock_queue:{lockName} uuid:threadId 
  redis.call('rpush', KEYS[2], ARGV[2]);
end;

// 返回ttl
return ttl;

加锁的流程图

从整个大致的流程图中我们大概可以枚举出几种情况:

  • 第一次加锁:直接进入持有lockName的hash key
  • 锁已被持有,可重入锁:直接进入将hash key中的counter+1的,重置过期时间
  • 锁已被持有,第一次出现竞争锁:获取lockName的hash过期时间 + waitTime + currentTime1,设置timeout到队列和set集合中

假设中间过去了10ms,也就是currentTime2 - currentTime1 = 10ms

  • 锁已被持有,第二次出现竞争锁:先获取队列元素,判断每个元素的timeout时间是不是比currentTime2小,小的就移出去;获取上次出现竞争锁的timeout1 - currentTime2,此时最大ttl就等于上次设置的timeout1减掉10ms,这一次的timeout2=ttl + waitTime + currentTime2,保证了返回的ttl是随着时间递减的,也保证了timeout2一定比timeout1大
  • 锁已被持有,第一次出现的竞争锁又来了:先获取队列元素,判断每个元素的timeout时间是不是比currentTime2小,小的就移出去;直接返回这个锁的timeout - waitTime - currentTime,说明了什么,说明了第二次来获取锁的时候就是waitTime的时间,相应的就是ttl了
  • 锁已被长时间持有,第一次出现竞争锁又来了:先获取队列元素,判断每个元素的timeout时间是不是比currentTime2小,小的就移出去,这里想一下他最开始的timeout=lockName的ttl+waitTime+currentTime1,那么现在第二次来就是ttl - 时间差,也就是在持有锁的ttl时间内没有获取到锁,就会把这个元素从队列和set集合中remove掉,并且其他的Set集合元素的timeout都要减掉释放的这个时间waitTime
  • 锁被释放,第二次加锁的线程来了:此时,发现出现和队列中存在元素,再一次来竞争锁是一样的锁已被持有,第一次出现的竞争锁又来了
  • 锁被释放,队头元素来了:删除队列和set集合中的元素,进入加锁逻辑,将set集合中的元素都减掉一个waitTime一个

加锁的场景图

简单画了一下,加锁的几个场景时序图——哈哈哈 伪时序图,最开始没想好咋画

公平加锁中的特性

排队加锁

排队加锁也是公平锁与RLock的主要区别,可以根据请求获取锁的顺序来获取锁

核心是引入了一个队列,来存储获取锁的顺序,同时通过维护一个zset有序集合来控制锁的超时

  • 如果出现了锁竞争,就会将这个锁入队,同时,通过设置一个timeout时间,需要关注的是,这个timemout时间会相对较大的,会加上一个waitTime=60000 × 5的时间,5分钟
  • 同时,队列中前面一个数据加锁成功了,那么他后面的没一个元素的timeout都需要减掉timeout,其实我觉的可以理解为,他认为一个锁最多被5分钟+ttl没有获取到锁就代表着失败了
  • 其他的就和RLock是一样的了,如果加锁成功,都会通过一个watchdog来续约租期
  • 当加锁失败后,就会进入一个循环,来尝试获取锁

获取锁超时自动释放锁

  • 每一次有获取锁的请求进来,都会从队列头开始找是不是有超时的存在,如果超时了就会从队列和zset集合中移除
  • 超时的只是从redis队列中和zset中移除了,此时如果后台while循环再来获取锁的时候,就会重新入队

自动刷新超时时间

  • 如果有锁加锁成功了,那么就需要对zset中的集合都减掉waitTime,刷新一下分数

锁释放

从代码结构中可以看到实际上的释放锁是重构了org.redisson.RedissonFairLock#unlockInnerAsync方法

核心也是一段lua脚本

-----------------------------------循环--------------------------------------------

//  (1)要不就是没有元素,不处理了
//  (2)要不就是把那些过期掉的元素给remove掉
while true do 
  local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
  if firstThreadId2 == false then 
    break;
  end; 
  local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
  if timeout <= tonumber(ARGV[4]) then 
    redis.call('zrem', KEYS[3], firstThreadId2); 
    redis.call('lpop', KEYS[2]); 
  else 
    break;
  end; 
end;


----------------------------要释放的所没有被持有--------------------------------------

if (redis.call('exists', KEYS[1]) == 0) then 
  local nextThreadId = redis.call('lindex', KEYS[2], 0); 
  if nextThreadId ~= false then 
    redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); 
  end; 
  return 1; 
end;


----------------------------如果持有锁的人不是自己----------------------------------
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
end; 

---------------------------如果是可重入的锁,就count-1---------------------------------
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
  redis.call('pexpire', KEYS[1], ARGV[2]); 
  return 0;
end; 

---------------------------如果持有的锁被释放掉了,就发一个广播消息----------------------
redis.call('del', KEYS[1]); 
local nextThreadId = redis.call('lindex', KEYS[2], 0); 
if nextThreadId ~= false then 
  redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); 
  end; 
return 1; 
    
  • 如果持有锁的人不是自己,会返回null,代码中会直接返回释放锁失败
  • 成功释放锁或者释放的锁没有被持有,也就是需要其他线程来加锁,就会发一个消息,返回1
  • 如果是成功释放锁,但是是重入锁,就count-1,返回0

主要不返回null就代表释放锁成功了

核心公平锁知识大概就这些了,主要是详细的看了加锁和释放锁的lua脚本,以及释放锁的流程

这篇关于分布式锁:Redisson源码解析-FairLock的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!