基于 Redis 的 Redisson 分布式联锁 RedissonMultiLock 对象可以将多个 RLock 对象关联为一个联锁,每个 RLock 对象实例可以来自于不同的 Redisson 实例。
当然,这是官网的介绍,具体是什么?一起看看联锁 MultiLock 使用以及源码吧!
按照官方文档的说法,这里 Redisson 客户端可以不是同一个。当然,一般工作中也不会说不用一个客户端吧。
源码入口:org.redisson.RedissonMultiLock#lock()
,默认超时时间 leaseTime 没有设置,所以为 -1。
public void lock(long leaseTime, TimeUnit unit) { try { lockInterruptibly(leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
这块方法太长,咱们拆分进行阅读。
- 基础等待时间 baseWaitTime = 锁数量 * 1500,在这里就是 4500 毫秒;
- leaseTime == -1 所以 waitTime = baseWaitTime,也就是 4500;
- while (true) 调用 tryLock 加锁,直到成功。
调用 tryLock 方法,其中参数 waitTime = 4500,leaseTime = -1,unit = MILLISECONDS
。
下面看一下 tryLock 里面有什么逻辑?
leaseTime != -1 不满足,这部分直接跳过。
waitTime != -1 条件满足,remainTime = 4500,lockWaitTime = 4500。
所以,failedLocksLimit() 这个方法直接返回 0,就是必须全部加锁成功。
这里才是重点:遍历所有的锁,依次加锁。
加锁逻辑就和可重入锁加锁并无区别了。所以 Lua 脚本就不进行分析了。
上面就是 tryLock 加锁之后的结果。
加锁成功,则将成功的锁放进 acquiredLocks 集合中;加锁失败,需要判断 failedLocksLimit
,因为这里是 0,所以会直接对成功加锁集合 acquiredLocks 中的所有锁执行锁释放,同时清空成功集合,恢复迭代器。
每次加锁之后,会更新锁剩余时间 remainTime,如果 remainTime 小于等于 0 了,则说明加锁超时,直接返回 false。这样就会执行外部的 while (true) 逻辑,然后重新再走一遍 RedissonMultiLock#tryLock
。
就是将 key1、key2、key3 …… keyN 放到一个 List 集合中,然后迭代循环加锁,直到所有的都成功。
- tryLock() 它表示用来尝试获取锁,
如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false
- tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,
只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。
如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true- tryLock(long waitTime, long leaseTime, TimeUnit unit)
在2的基础上,如果获取到锁,锁的最长持有时间为leaseTime
看完加锁逻辑,锁释放就更容易理解了。
直接遍历释放锁即可,lock.unlockAsync() 是调用的 RedissonBaseLock#unlockAsync() 方法。
建立一个三主三从的redis集群,参考文章
创建springboot项目
redissonCluster.yml
clusterServersConfig: # 连接空闲超时,单位:毫秒 默认10000 idleConnectionTimeout: 10000 pingConnectionInterval: 1000 # 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000 connectTimeout: 10000 # 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000 timeout: 3000 # 命令失败重试次数 retryAttempts: 3 # 命令重试发送时间间隔,单位:毫秒 retryInterval: 1500 # 重新连接时间间隔,单位:毫秒 failedSlaveReconnectionInterval: 3000 # 执行失败最大次数 #failedAttempts: 3 # 密码 password: # 单个连接最大订阅数量 subscriptionsPerConnection: 5 clientName: null # loadBalancer 负载均衡算法类的选择 loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {} # 主节点最小空闲连接数 默认32 masterConnectionMinimumIdleSize: 32 # 主节点连接池大小 默认64 masterConnectionPoolSize: 64 # 订阅操作的负载均衡模式 subscriptionMode: SLAVE # 只在从服务器读取 readMode: SLAVE # 集群地址 nodeAddresses: - "redis://xxx.xxx.xxx.xxx:9001" - "redis://xxx.xxx.xxx.xxx:9002" - "redis://xxx.xxx.xxx.xxx:9003" # 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000 scanInterval: 1000 #这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2 threads: 0 #这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2 nettyThreads: 0 # 编码方式 默认org.redisson.codec.JsonJacksonCodec codec: !<org.redisson.codec.JsonJacksonCodec> {} #传输模式 transportMode: NIO # 分布式锁自动过期时间,防止死锁,单位毫秒,默认30s,每1/3的lockWatchdogTimeout时间,如果没执行玩业务,会自动给锁续约 lockWatchdogTimeout: 30000 # 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true keepPubSubOrder: true # 用来指定高性能引擎的行为。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试。 # #该参数仅限于Redisson PRO版本。 #performanceMode: HIGHER_THROUGHPUT
@Configuration public class RedissonHttpSessionConfig { //服务停用后调用shutdown方法 @Bean(destroyMethod="shutdown") public RedissonClient getRedissonClient() throws IOException { ResourceLoader loader = new DefaultResourceLoader(); Resource resource = loader.getResource("redissonCluster.yml"); Config config = Config.fromYAML(resource.getInputStream()); return Redisson.create(config); } }
@Component public class RedissonMultiLockInit { private final ArrayList<RLock> rLockList=new ArrayList<>(); @Autowired RedissonClient redissonClient; public RedissonMultiLock initLock(String... locksName){ for (String lockName : locksName) { rLockList.add(redissonClient.getLock(lockName)); } RLock[] rLocks = rLockList.toArray(new RLock[0]); return new RedissonMultiLock(rLocks); } public List<RLock> getRLocks(){ return rLockList; } }
@Controller @RequestMapping("/lock") public class LockController { @Autowired RedissonMultiLockInit redissonMultiLockInit; @Autowired UserMapper userMapper; @Autowired PlatformTransactionManager transactionManager; @GetMapping("/get/{waitTime}/{leaseTime}") @ResponseBody public String getLock(@PathVariable long waitTime, @PathVariable long leaseTime) throws InterruptedException { String[] strings={"test1","test2","test3"}; RedissonMultiLock lock = redissonMultiLockInit.initLock(strings); //手动开启事务管理,@Transitional无法控制redis的分布式锁 //创建事务定义对象 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); //设置是否只读,false支持事务 def.setReadOnly(false); //设置事务隔离级别,可以重复读mysql默认级别 def.setIsolationLevel(TransactionDefinition.ISOLATION_REPEATABLE_READ); //设置事务传播行为 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); //配置事务管理器 TransactionStatus status = transactionManager.getTransaction(def); if (lock.tryLock(waitTime,leaseTime, TimeUnit.SECONDS)){ System.out.println(Thread.currentThread().getName()+" waiting time is "+waitTime+"s " + "leaseTime is "+leaseTime+"s "+ "execute time is "+(leaseTime+10)+" s" ); try { userMapper.updateById(new User(1L,23,"beijing","myname2")); //模拟执行超时释放锁 Thread.sleep((leaseTime+10)*1000); List<RLock> rLocks = redissonMultiLockInit.getRLocks(); //判断是否仍然持有所有锁,防止锁过期 if(rLocks.stream().allMatch(RLock::isLocked)){ //提交业务 transactionManager.commit(status); //提交业务后再释放分布式锁 lock.unlock(); return "unlock success,transition success"; } else { //回滚业务 transactionManager.rollback(status); return "lock is expired,transition fail"; } } catch (Exception e) { e.printStackTrace(); return "transition error"; } } else { return Thread.currentThread().getName()+" can't get the lock,because the waiting time isn't enough. Waiting time is "+waitTime+"s, " + "leaseTime is "+leaseTime+"s "; } } }
http://localhost:8090/lock/get/6/-1
,表示最多有6s的等待获取锁的时间,并且业务的执行时间可以无续约(启用看门狗机制),那么这次业务是一定会成功的
http://localhost:8090/lock/get/6/9
,表示最多有6s的等待获取锁的时间,并且最多有9s的业务执行时间,超时就会释放分布式锁,业务失败,由于我在controller中写死了业务超过了9s,所以这次业务肯定失败。
http://localhost:8090/lock/get/6/-1
和http://localhost:8090/lock/get/2/-1
由于第一次业务要花费9s的业务执行时间,那么第二次业务无法在2s的时间内获取到分布式锁
,会退出此次业务。
参考文章