前面讲了Redis在实际业务场景中的应用,那么下面再来了解一下Redisson功能性场景的应用,也就是大家经常使用的分布式锁的实现场景。
引入redisson依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.0</version> </dependency>
编写简单的测试代码
public class RedissonTest { private static RedissonClient redissonClient; static { Config config=new Config(); config.useSingleServer().setAddress("redis://192.168.221.128:6379"); redissonClient=Redisson.create(config); } public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateOrder"); //最多等待100秒、上锁10s以后自动解锁 if(rLock.tryLock(100,10,TimeUnit.SECONDS)){ System.out.println("获取锁成功"); } Thread.sleep(2000); rLock.unlock(); redissonClient.shutdown(); } }
你们会发现,通过redisson,非常简单就可以实现我们所需要的功能,当然这只是redisson的冰山一角,redisson最强大的地方就是提供了分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的并发程序的工具包获得了协调分布式多级多线程并发系统的能力,降低了程序员在分布式环境下解决分布式问题的难度,下面分析一下RedissonLock的实现原理
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); //通过tryAcquire方法尝试获取锁 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { //表示成功获取到锁,直接返回 return true; } //省略部分代码.... }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; //leaseTime就是租约时间,就是redis key的过期时间。 if (leaseTime != -1) { //如果设置过期时间 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else {//如果没设置了过期时间,则从配置中获取key超时时间,默认是30s过期 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //当tryLockInnerAsync执行结束后,触发下面回调 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { //说明出现异常,直接返回 return; } // lock acquired if (ttlRemaining == null) { //表示第一次设置锁键 if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,并返回 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //leaseTime=-1,启动Watch Dog scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
通过lua脚本来实现加锁的操作
判断lock键是否存在,不存在直接调用hset存储当前线程信息并且设置过期时间,返回nil,告诉客户端直接获取到锁。
判断lock键是否存在,存在则将重入次数加1,并重新设置过期时间,返回nil,告诉客户端直接获取到锁。
被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
关于Lua脚本,我们稍后再解释。
释放锁的流程,脚本看起来会稍微复杂一点
如果lock键不存在,通过publish
指令发送一个消息表示锁已经可用。
如果锁不是被当前线程锁定,则返回nil
由于支持可重入,在解锁时将重入次数需要减1
如果计算后的重入次数>0,则重新设置过期时间
如果计算后的重入次数<=0,则发消息说锁已经可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中
this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
this.await返回true,进入循环尝试获取锁。
继续看RedissonLock.tryLock后半部分代码如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { //省略部分代码 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); // 订阅监听redis消息,并且创建RedissonLockEntry RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { //取消订阅 subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); //表示抢占锁失败 return false; //返回false } try { //判断是否超时,如果等待超时,返回获的锁失败 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } //通过while循环再次尝试竞争锁 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //竞争锁,返回锁超时时间 // lock acquired if (ttl == null) { //如果超时时间为null,说明获得锁成功 return true; } //判断是否超时,如果超时,表示获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 通过信号量(共享锁)阻塞,等待解锁消息. (减少申请锁调用的频率) // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 // 否则就在wait time 时间范围内等待可以通过信号量 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新等待时间(最大等待时间-已经消耗的阻塞时间) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { //获取锁失败 acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); //取消订阅 } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
一般来说,我们去获得分布式锁时,为了避免死锁的情况,我们会对锁设置一个超时时间,但是有一种情况是,如果在指定时间内当前线程没有执行完,由于锁超时导致锁被释放,那么其他线程就会拿到这把锁,从而导致一些故障。
为了避免这种情况,Redisson引入了一个Watch Dog机制,这个机制是针对分布式锁来实现锁的自动续约,简单来说,如果当前获得锁的线程没有执行完,那么Redisson会自动给Redis中目标key延长超时时间。
默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。
@Override public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1, unit); //leaseTime=-1 }
实际上,当我们通过tryLock方法没有传递超时时间时,默认会设置一个30s的超时时间,避免出现死锁的问题。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //当leaseTime为-1时,leaseTime=internalLockLeaseTime,默认是30s,表示当前锁的过期时间。 //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { //说明出现异常,直接返回 return; } // lock acquired if (ttlRemaining == null) { //表示第一次设置锁键 if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,并返回 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //leaseTime=-1,启动Watch Dog scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
由于默认设置了一个30s的过期时间,为了防止过期之后当前线程还未执行完,所以通过定时任务对过期时间进行续约。
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else {// 第一次加锁的时候会调用,内部会启动WatchDog entry.addThreadId(threadId); renewExpiration(); } }
定义一个定时任务,该任务中调用renewExpirationAsync
方法进行续约。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //用到了时间轮机制 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // renewExpirationAsync续约租期 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次间隔租期的1/3时间执行 ee.setTimeout(task); }
执行Lua脚本,对指定的key进行续约。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
Lua是一个高效的轻量级脚本语言(和JavaScript类似),用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua在葡萄牙语中是“月亮”的意思,它的logo形式卫星,寓意是Lua是一个“卫星语言”,能够方便地嵌入到其他语言中使用;其实在很多常见的框架中,都有嵌入Lua脚本的功能,比如OpenResty、Redis等。
使用Lua脚本的好处:
减少网络开销,在Lua脚本中可以把多个命令放在同一个脚本中运行
原子操作,redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。换句话说,编写脚本的过程中无需担心会出现竞态条件
复用性,客户端发送的脚本会永远存储在redis中,这意味着其他客户端可以复用这一脚本来完成同样的逻辑
Lua是一个独立的脚本语言,所以它有专门的编译执行工具,下面简单带大家安装一下。
下载Lua源码包: https://www.lua.org/download.html
https://www.lua.org/ftp/lua-5.4.3.tar.gz
安装步骤如下
tar -zxvf lua-5.4.3.tar.gz cd lua-5.4.3 make linux make install
如果报错,说找不到readline/readline.h, 可以通过yum命令安装
yum -y install readline-devel ncurses-devel
最后,直接输入lua
命令即可进入lua的控制台。Lua脚本有自己的语法、变量、逻辑运算符、函数等,这块我就不在这里做过多的说明,用过JavaScript的同学,应该只需要花几个小时就可以全部学完,简单演示两个案例如下。
array = {"Lua", "mic"} for i= 0, 2 do print(array[i]) end
array = {"mic", "redis"} for key,value in ipairs(array) do print(key, value) end
Redis中集成了Lua的编译和执行器,所以我们可以在Redis中定义Lua脚本去执行。同时,在Lua脚本中,可以直接调用Redis的命令,来操作Redis中的数据。
redis.call(‘set’,'hello','world') local value=redis.call(‘get’,’hello’)
redis.call 函数的返回值就是redis命令的执行结果,前面我们介绍过redis的5中类型的数据返回的值的类型也都不一样,redis.call函数会将这5种类型的返回值转化对应的Lua的数据类型
在很多情况下我们都需要脚本可以有返回值,毕竟这个脚本也是一个我们所编写的命令集,我们可以像调用其他redis内置命令一样调用我们自己写的脚本,所以同样redis会自动将脚本返回值的Lua数据类型转化为Redis的返回值类型。 在脚本中可以使用return 语句将值返回给redis客户端,通过return语句来执行,如果没有执行return,默认返回为nil。
编写完脚本后最重要的就是在程序中执行脚本。Redis提供了EVAL命令可以使开发者像调用其他Redis内置命令一样调用脚本。
[EVAL] [脚本内容] [key参数的数量] [key …] [arg …]
可以通过key和arg这两个参数向脚本中传递数据,他们的值可以在脚本中分别使用KEYS和ARGV 这两个类型的全局变量访问。
比如我们通过脚本实现一个set命令,通过在redis客户端中调用,那么执行的语句是:
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
上述脚本相当于使用Lua脚本调用了Redis的set
命令,存储了一个key=lua,value=hello到Redis中。
考虑到我们通过eval执行lua脚本,脚本比较长的情况下,每次调用脚本都需要把整个脚本传给redis,比较占用带宽。为了解决这个问题,redis提供了EVALSHA命令允许开发者通过脚本内容的SHA1摘要来执行脚本。该命令的用法和EVAL一样,只不过是将脚本内容替换成脚本内容的SHA1摘要
Redis在执行EVAL命令时会计算脚本的SHA1摘要并记录在脚本缓存中
执行EVALSHA命令时Redis会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了就执行脚本,否则返回“NOSCRIPT No matching script,Please use EVAL”
# 将脚本加入缓存并生成sha1命令 script load "return redis.call('get','lua')" # ["13bd040587b891aedc00a72458cbf8588a27df90"] # 传递sha1的值来执行该命令 evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0
通过lua脚本来实现一个访问频率限制功能。
思路,定义一个key,key中包含ip地址。 value为指定时间内的访问次数,比如说是10秒内只能访问3次。
定义Lua脚本。
local times=redis.call('incr',KEYS[1]) -- 如果是第一次进来,设置一个过期时间 if times == 1 then redis.call('expire',KEYS[1],ARGV[1]) end -- 如果在指定时间内访问次数大于指定次数,则返回0,表示访问被限制 if times > tonumber(ARGV[2]) then return 0 end -- 返回1,允许被访问 return 1
定义controller,提供访问测试方法
@RestController public class RedissonController { @Autowired RedissonClient redissonClient; private final String LIMIT_LUA= "local times=redis.call('incr',KEYS[1])\n" + "if times == 1 then\n" + " redis.call('expire',KEYS[1],ARGV[1])\n" + "end\n" + "if times > tonumber(ARGV[2]) then\n" + " return 0\n" + "end\n" + "return 1"; @GetMapping("/lua/{id}") public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException { List<Object> keys= Arrays.asList("LIMIT:"+id); RFuture<Object> future=redissonClient.getScript(). evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3); return future.get().toString(); } }
需要注意,上述脚本执行的时候会有问题,因为redis默认的序列化方式导致value的值在传递到脚本中时,转成了对象类型,需要修改redisson.yml
文件,增加codec的序列化方式。
application.yml
spring: redis: redisson: file: classpath:redisson.yml
redisson.yml
singleServerConfig: address: redis://192.168.221.128:6379 codec: !<org.redisson.codec.JsonJacksonCodec> {}
redis的脚本执行是原子的,即脚本执行期间Redis不会执行其他命令。所有的命令必须等待脚本执行完以后才能执行。为了防止某个脚本执行时间过程导致Redis无法提供服务。Redis提供了lua-time-limit参数限制脚本的最长运行时间。默认是5秒钟。
当脚本运行时间超过这个限制后,Redis将开始接受其他命令但不会执行(以确保脚本的原子性),而是返回BUSY的错误,下面演示一下这种情况。
打开两个客户端窗口,在第一个窗口中执行lua脚本的死循环
eval "while true do end" 0
在第二个窗口中运行get lua
,会得到如下的异常。
(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.
我们会发现执行结果是Busy, 接着我们通过script kill 的命令终止当前执行的脚本,第二个窗口的显示又恢复正常了。
如果当前执行的Lua脚本对Redis的数据进行了修改(SET、DEL等),那么通过SCRIPT KILL 命令是不能终止脚本运行的,因为要保证脚本运行的原子性,如果脚本执行了一部分终止,那就违背了脚本原子性的要求。最终要保证脚本要么都执行,要么都不执行
同样打开两个窗口,第一个窗口运行如下命令
eval "redis.call('set','name','mic') while true do end" 0
在第二个窗口运行
get lua
结果一样,仍然是busy,但是这个时候通过script kill命令,会发现报错,没办法kill。
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
遇到这种情况,只能通过shutdown nosave命令来强行终止redis。
shutdown nosave和shutdown的区别在于 shutdown nosave不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。
了解了lua之后,我们再回过头来看看Redisson的Lua脚本,就不难理解了。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
下面是Redisson中释放锁的代码,在代码中我们发现一个publish的指令redis.call('publish', KEYS[2], ARGV[1])
,这个指令是干啥的呢?
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
Redis提供了一组命令可以让开发者实现“发布/订阅”模式(publish/subscribe) . 该模式同样可以实现进程间的消息传递,它的实现原理是:
发布/订阅模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或多个频道,而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到该消息
发布者发布消息的命令是PUBLISH, 用法是
PUBLISH channel message
比如向channel.1发一条消息:hello
PUBLISH channel.1 “hello”
这样就实现了消息的发送,该命令的返回值表示接收到这条消息的订阅者数量。因为在执行这条命令的时候还没有订阅者订阅该频道,所以返回为0. 另外值得注意的是消息发送出去不会持久化,如果发送之前没有订阅者,那么后续再有订阅者订阅该频道,之前的消息就收不到了
订阅者订阅消息的命令是:
SUBSCRIBE channel [channel …]
该命令同时可以订阅多个频道,比如订阅channel.1的频道:SUBSCRIBE channel.1,执行SUBSCRIBE命令后客户端会进入订阅状态。
一般情况下,我们不会用pub/sub来做消息发送机制,毕竟有这么多MQ技术在。