①、注入redissonClient客户端
②、通过redissonClient客户端获取锁对象rLock
③、通过rLock尝试获取锁
// ①、注入redissonClient客户端 @Autowired private RedissonClient redissonClient; public boolean getLock() { // ②、通过redissonClient客户端获取锁对象rLock(RedissonLock实现默认是可重入锁) rLock = redissonClient.getLock(lockInfo.getName()); try { // ③、通过rLock尝试获取锁 return rLock.tryLock(lockInfo.getWaitTime(), lockInfo.getLeaseTime(), TimeUnit.SECONDS); } catch (InterruptedException e) { log.info("获取可重入锁时线程被意外中断,锁名称:{},异常信息:{}", lockInfo.getName(), e); } return false; }
RedissonLock
。RedissonLock
类提供了可重入锁的实现。RedissonFairLock
)、读锁(RedissonReadLock
)、写锁(RedissonWriteLock
)、可重入锁(RedissonLock
)、联锁(RedissonMultiLock
)、红锁(RedissonRedLock
)上面说到redisson实现分布式锁最核心的类是RedissonLock(很多锁都是基于该类实现的)
(红锁和联锁后面介绍),那么下面看看RedissonLock
的继承关系和子类以及类的结构,以便于更好的理解源码。
redissonLock类作为一个通用的模板类(默认提供可重入锁实现),其他很多类型的锁都基于该类去实现不同的加锁方式。
可以看到RedissonLock
类有四个子类实现,这些子类分别覆写了加锁释放锁的核心方法。
RedissonLock
锁的继承以及实现关系,下面我们就找一个类型的锁实现来分析加锁解锁源码!这里以RedissonLock
默认实现的可重入锁为例进行源码分析。RedissonLock
实现了多种加锁的接口,比如:带等待时间和释放时间的加锁接口(tryLock(long waitTime, long leaseTime, TimeUnit unit)
)、只带等待时间的加锁接口(tryLock(long waitTime, TimeUnit unit)
)、既不带等待时间也不带锁释放时间的加锁接口(tryLock()
)、只带释放时间没有等待时间的接口(lock(long leaseTime, TimeUnit unit)
)等,这里我们挑选一个参数最多的tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法来分析。下面所有的源码都来自于RedssionLock
类!!!
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 1、将等待时间转换为毫秒、、获取当前时间、获取当前线程ID long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); // 2、尝试申请锁,返回还剩余的锁过期时间【加锁核心方法,下面(②、加锁核心方法tryAcquire分析)】 Long ttl = tryAcquire(leaseTime, unit, threadId); // 3、ttl==null 表示获取锁成功则直接返回true // lock acquired if (ttl == null) { return true; } // 4、获取还需要等待的时间,且根据还需等待的时间(time)判断是否获取锁失败 time -= (System.currentTimeMillis() - current); // 如果还需要等待的时间为0,则说明获取锁已经失败了 // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败 if (time <= 0) { acquireFailed(threadId); return false; } // 重新获取当前时间 current = System.currentTimeMillis(); // 5、上面第一次尝试获取锁失败,且还没有超出最大等待时间的基础上,基于Redis的发布订阅机制,订阅锁释放事件 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); /* * 6、基于Redis的发布订阅机制,订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题: * 基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争 * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败 * 当 this.await返回true,进入下面的循环再次尝试获取锁 * * await是通过CountDownLatch + 监听器机制来实现的,具体看方法内部注释,见下面【③、await加锁最大等待时间方法分析】 */ if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { // 在等待时间耗完的情况下,取消对该锁的订阅 if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } // 获取锁失败 acquireFailed(threadId); return false; } // 7、如果在等待时间内订阅的锁已经被释放了,则会执行这里 try { // 获取还需要等待的时间 time -= (System.currentTimeMillis() - current); // 如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败 if (time <= 0) { acquireFailed(threadId); return false; } // 8、能运行到这里则说明: // 1、当前还在最大等待时间内 // 2、并且等待的锁已经被释放(即对该锁的订阅事件已经被吊起过),在这里可以再次尝试获取锁 // 这是一个死循环,循环退出条件有两个: // ①、在最大等待时间内成功获取锁,返回true // ②、超出了最大等待时间,但仍然没有成功获取到锁,返回false while (true) { // 获取当前时间 long currentTime = System.currentTimeMillis(); // 8.1、再次尝试申请锁,返回还剩余的锁过期时间 ttl = tryAcquire(leaseTime, unit, threadId); // 8.2、ttl==null 表示获取锁成功则直接返回true // lock acquired if (ttl == null) { return true; } // 再次计算还需要等待多时时间 time -= (System.currentTimeMillis() - currentTime); // 8.3、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败 if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message // 更新一下当前时间,因为上面的操作可能会耗时,进而导致下面根据currentTime计算的time不准确 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { // 8.4、如果剩余时间(ttl)小于waittime ,就在 ttl 时间内,从Entry的信号量(Semaphore)获取一个许可(除非被中断或者一直没有可用的许可)。 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { // 8.5、如果该锁剩余过期时间(ttl)大于waittime,则就在waittime 时间范围内等待可以通过信号量(Semaphore) getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间) time -= (System.currentTimeMillis() - currentTime); // 8.6、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败 if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 9、无论是否获得锁,都要取消订阅解锁消息 unsubscribe(subscribeFuture, threadId); } }
/** * 尝试获取锁,如果没有获取到锁则返回该锁还剩余多少毫秒过期,如果获取到了锁,则返回空 */ // 【1、尝试获取锁】 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { // tryAcquireAsync方法返回一个RFuture<Long>类型,get方法主要就是取得RFuture中的数值 // 该数值就是该锁还剩余的过期时间(如果为空,则表示已经获取到锁了,反之则表示该锁还剩多久过期) // 见下面【2、异步尝试获取锁】 return get(tryAcquireAsync(leaseTime, unit, threadId)); } // 【2、异步尝试获取锁】 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { // 1、如果有设置锁过期时间 if (leaseTime != -1) { // 调用tryLockInnerAsync,【通过lua脚本去加锁】,见下面【3、通过调用lua脚本去真正开始加锁】 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁) RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法 ttlRemainingFuture.addListener(new FutureListener<Long>() { // 当获取锁的操作执行结束时,该方法被吊起 @Override public void operationComplete(Future<Long> future) throws Exception { // 3.1、如果获取锁失败,则直接返回 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired // ttlRemaining == null 则说明获取锁成功 if (ttlRemaining == null) { // 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗) scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 【3、通过调用lua脚本去真正开始加锁】 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 将锁过期时间转换为毫秒 internalLockLeaseTime = unit.toMillis(leaseTime); // 通过lua脚本去获取锁(可重入锁) // pttl命令和ttl命令类似,只是他是以毫秒为单位返回剩余过期时间,ttl是以秒为单位 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()) , internalLockLeaseTime, getLockName(threadId)); }
protected boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException { // 具体实现见下面 return commandExecutor.await(future, timeout, timeoutUnit); } public boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException { // 创建一个门栓 final CountDownLatch l = new CountDownLatch(1); // 当订阅的锁被释放后会吊起这个监听方法,在监听方法内部将门栓数量减一 future.addListener(new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { // 监听方法被吊起,门栓数量减一 l.countDown(); } }); // 在这里等待门栓数量为0,或超时时间到了再继续运行 // 在等待时间内如果订阅的锁已经释放,监听方法会被吊起门栓数量为0,则这里返回true // 如果等待时间已经耗完了,订阅的锁还没被释放的话,则这里返回false return l.await(timeout, timeoutUnit); }
释放锁的代码逻辑比较简单,这里只描述一下释放锁的lua脚本的大体流程即可。
public void unlock() { try { // 释放锁的核心方法unlockAsync get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException)e.getCause(); } else { throw e; } } }
RFuture可以理解为对Future的一个增强,netty中的实现。JDK中的future只能阻塞获取子线程的返回,在netty中对future进行了增强,可以添加监听并且异步获取。
public RFuture<Void> unlockAsync(final long threadId) { final RPromise<Void> result = new RedissonPromise<Void>(); // 1、具体的释放锁的lua脚本(释放锁的动作在这里完成) RFuture<Boolean> future = unlockInnerAsync(threadId); // 2、添加一个监听器,一旦释放锁的操作完成(无论失败或成功),都会吊起监听器的operationComplete方法 future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { // 3、如果释放锁失败了 if (!future.isSuccess()) { cancelExpirationRenewal(threadId); result.tryFailure(future.cause()); return; } Boolean opStatus = future.getNow(); if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } if (opStatus) { cancelExpirationRenewal(null); } // 4、释放锁成功 result.trySuccess(null); } }); return result; }
// 释放锁的lua脚本,这个lua脚本很简单不过多解锁 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
关于这个问题,从源代码中可以看到如果自己设置了锁过期时间那么会直接调用
tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)
方法去获取锁,获取到了以后就直接返回了,并没有看到在哪里会去开启一个看门狗。如果是这样的话,那么是不是在我们调用
tryLock
方法的时候,如果自己指定了锁的过期时间,是不是就意味着没有定时线程(看门狗)去定期的延长锁的过期时间了???那这样是不是就不能保证分布式锁的安全性了????
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { // 1、如果有设置锁过期时间 if (leaseTime != -1) { // 调用tryLockInnerAsync,通过lua脚本去加锁【这里加锁成功后就直接返回了,并没有添加看门狗延长过期时间????】 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁) RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法 ttlRemainingFuture.addListener(new FutureListener<Long>() { // 当获取锁的操作执行结束时,该方法被吊起 @Override public void operationComplete(Future<Long> future) throws Exception { // 3.1、如果获取锁失败,则直接返回 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // ttlRemaining == null 则说明获取锁成功 if (ttlRemaining == null) { // 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗) scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
1、慢谈 Redis 实现分布式锁 以及 Redisson 源码解析
2、RedissonMultiLock + RedissonLock部分源码
3、redisson-2.10.4源代码分析
4、redis客户端redisson实战