Redis系列1:深刻理解高性能Redis的本质
Redis系列2:数据持久化提高可用性
Redis系列3:高可用之主从架构
Redis系列4:高可用之Sentinel(哨兵模式)
Redis系列5:深入分析Cluster 集群模式
追求性能极致:Redis6.0的多线程模型
追求性能极致:客户端缓存带来的革命
Redis系列8:Bitmap实现亿万级数据计算
Redis系列9:Geo 类型赋能亿级地图位置计算
Redis系列10:HyperLogLog实现海量数据基数统计
Redis系列11:内存淘汰策略
Redis系列12:Redis 的事务机制
分布式锁,即分布式系统中的锁,我们通过锁解决 控制共享资源访问 的问题,来保证只有一个线程可以访问被保护的资源。
等等,本篇基于Redis角度进行讨论
SETNX 是 set if not exists 的缩写,当且仅当 key 不存在时,则设置 value 给这个key。若给定的 key 已经存在,则 SETNX 不做任何动作。
命令的返回值说明:
举例说明:setnx lock.key lock.value
> SETNX lock.user_063105015 1 (integer) 1 # 获取编号为 063105015 用户成功
如果已经被获取过了,则获取失败
> SETNX lock.user_063105015 1 (integer) 0 # 获取编号为 063105015 用户失败
获取key的值,如果存在,则返回;如果不存在,则返回nil
# 获取成功 > GET lock.user_063105015 "1" # 获取失败 > GET lock.user_123456789 (nil)
原子的设置值的办法,对key设置newValue这个值,并且返回key原来的旧值。
# 重置用户信息 > getset lock.user_063105015 0 "1" # 原值为1 # 再次重置 > getset lock.user_063105015 1 "0" # 原值为0
> DEL lock.user_063105015 (integer) 1
具体执行流程如下:
可能会因为一些场景,造成锁无法释放,如下:
超时释放其实就是重置,目的是避免因为各种原因导致的锁长时间无法释放。
做法就是我们给锁加个过期时间(EXPIRE Time):
# 给用户 063105015 加锁 > SETNX lock.user_063105015 1 (integer) 1 # 设置过期时间,到时间没删除则自动释放 > EXPIRE lock.user_063105015 120 # 120秒之后自动释放 (integer) 1
为了保证执行时的原子性,Redis 官方扩展了 SET 命令,既能满足获取对象,又能保证设置超时的时间语义。
避免出现了获取锁完成之后,执行超时设置失败微软无法释放锁的情况。保证要么都成功,要么都不执行。
# 示例如下: SET lock.user_063105015 1 NX PX 60000
经常会出现一种情况,就是你获取到锁之后,因为各种原因(比如你的服务线程故障、网络抖动 等等),没有执行完成,或者没有释放锁,
这时候锁也过了 EXPIRE TIME,就自动释放了。当另外一个线程开锁成功,你的线程响应过来了,把人家的锁给释放了,这样就有问题了。
为了避免这种操作,我们要对同一个的锁做唯一识别码,在释放锁之前,先判断下是不是自己设置的那个锁,如下:
# 设置10086专用值 > SET lock.user_063105015 10086 NX PX 60000 OK # 设置成功,获取检查确实是10086 > get lock.user_063105015 "10086" # 伪代码:删除前进项确认是不是自己加的那个锁 if ( redis.get("lock.user_063105015").equals("10086")) { redis.del("lock.user_063105015"); // 只有对比成功才进行删除,释放锁 }
可重入锁可以理解为重新进入,由多于一个任务并发使用,而不必担心数据错误。
这边说说可重入锁,比如你执行线程的方案a获取锁之后,你的a方法后,线程继续执行b方法也需要获取锁,如果这时候不可重入,
线程就需要等待锁的释放,进入争抢。
这边的解法就是对线程加锁的锁值进行增减,同一个线程的方法遇到加锁则锁值+1,遇到退锁则锁值-1,当前仅当锁值=0的时候,说明这个锁真正的被释放了。
Java中的Redisson 类库就是通过 Redis Hash 来实现可重入锁。
加锁的逻辑
我们可以使用 Redis hash 结构实现,key 表示被锁的共享资源, hash 结构的 fieldKey 的 value 则保存加锁的次数。
实现如下( KEYS1 = "lock.user_063105015", ARGV [10000,uuid):
KEYS[1] = key的值
ARGV[1]) = 持有锁的时间
ARGV[2] = getLockName(threadId) 下面id就算系统在启动的时候会全局生成的uuid 来作为当前进程的id,加上线程id就是getLockName(threadId)了,可以理解为:进程ID+系统ID = ARGV[2]
# 1 为 true # 0 为 false if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
参数说明
程序说明