假设场景,多个线程并发(模拟并发)对库存数量进行扣减,现将库存预置在redis中,然后开启多线程对库存进行扣减
private static final String PRODUCT = "MoonCake"; private static final String PRODUCT_STOCK = PRODUCT + "Stock"; @Autowired private RedissonClient redissonClient; @Autowired private RedisTemplate redisTemplate; @GetMapping("/lockAdd") @ApiOperation("加分布式锁") public void lockAdd() throws Exception { redisTemplate.opsForValue().set(PRODUCT_STOCK, 90);// 预置库存为90 for (int i = 0; i < 100; i++) { new Thread(() -> { try { decreaseStock(); } catch (InterruptedException e) { log.error("error:", e); } }).start(); } } private void decreaseStock() throws InterruptedException { //对数据进行加锁 RLock lock = redissonClient.getLock(PRODUCT); if (lock.tryLock(5, TimeUnit.SECONDS)) { // 获取最新的库存 Integer curStock = (Integer) redisTemplate.opsForValue().get(PRODUCT_STOCK); log.info("Get from redis, curStock=" + curStock); if (curStock > 0) { curStock -= 1; // 更新库存值 redisTemplate.opsForValue().set(PRODUCT_STOCK, curStock); log.info("扣减成功,库存stock:" + curStock); } else { //没库存 log.info("扣减失败,库存不足"); } //解锁 lock.unlock(); } }
获取锁时,一般需要锁的key,可以根据实际业务情况进行设置,这里设置为“MoonCake”
RLock lock = redissonClient.getLock(PRODUCT);
上面新建锁时,主要将name注入,生成一个锁对象RedissonLock
@Override public RLock getLock(String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name); } public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); // 命令行执行器 this.commandExecutor = commandExecutor; // id this.id = commandExecutor.getConnectionManager().getId(); // 内部锁释放时间 this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); // entry名,存储在redis中的形式 this.entryName = id + ":" + name; // 锁状态的发布和订阅 this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }
使用lock.tryLock可以获取锁,该方法的定义如下
/** **@param waitTime: 获取锁的等待时间 **@param leaseTime:获取到锁之后的释放时间 **@param unit:时间单位 **/ public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 等待时间转化为毫秒 long time = unit.toMillis(waitTime); // 当前时间 long current = System.currentTimeMillis(); // 当前线程ID long threadId = Thread.currentThread().getId(); // 尝试获取锁,返回ttl时间 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 获取到锁 if (ttl == null) { return true; } // 没有获取到锁,更新等待时间=waitTime-刚才操作花费时间 time -= System.currentTimeMillis() - current; // 等待时间到,没有获取到锁,获取失败 if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 等待时间还没有用完 current = System.currentTimeMillis(); /** * 订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题: * 基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争. * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败. * 当 this.await 返回 true,进入循环尝试获取锁. */ RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future) if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } try { // 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } /** * 收到锁释放的信号后,在最大等待时间之内,循环尝试获取锁 * 获取锁成功,则返回 true, * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环 */ while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 获取锁成功 if (ttl == null) { return true; } // 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } /** * 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息): */ currentTime = System.currentTimeMillis(); // 当前锁的ttl>0并且ttl<剩余的等待时间 if (ttl >= 0 && ttl < time) { // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { // 在wait time 时间范围内等待可以通过信号量 subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { // 取消订阅 unsubscribe(subscribeFuture, threadId); } }
加锁之后在redis中的数据结构是hash类型
获取锁的Lua脚本
public class RedissonLock extends RedissonExpirable implements RLock { <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 脚本执行参数: 都是从下标1开始 // KEYS[1]=锁的key // ARGV[1]=锁释放时间,ARGV[2]=当前线程Id return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + // 当前锁key存在 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 线程持有的锁计数+1 "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 以毫秒为单位,将锁的TTL过期时间刷新成最新值 "return nil; " + // 返回nil "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 当前锁的key不存在 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 线程持有的锁计数+1 "redis.call('pexpire', KEYS[1], ARGV[1]); " + //以毫秒为单位,将锁的TTL过期时间刷新成最新值 "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", // 以毫秒为单位,返回锁的当前ttl过期时间 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } }
解锁时的参数类似于:
getName() -> MoonCake getChannelName() -> redisson_lock__channel:{MoonCake} LockPubSub.UNLOCK_MESSAGE -> 0 internalLockLeaseTime -> 90000 getLockName(threadId) -> 1a2cef15-9816-4bea-89dc-1a8d9ad096ca:644
主要通过RedissonLock#unlockInnerAsync方法进行解锁,先判断锁是否被当前线程加的,如果不是,直接返回解锁失败;如果是,将当前线程持有的锁的计数器减1,获得减1之后的计数器值A;如果A大于0,代表锁被当前线程加了多次,将锁的过期时间刷新成传入的internalLockLeaseTime;如果A==0,代表锁要被释放了,删除锁,然后发送解锁消息给等待队列。
protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 脚本执行参数: 都是从下标1开始 // KEYS[1]=锁的key, KEYS[2]=消息通道名称 // ARGV[1]=解锁信息,ARGV[2]=锁释放时间,ARGV[2]=当前线程Id return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 当前线程持有锁? "return nil;" + // 不是当前线程持有的锁 "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 当前线程持有锁计数-1 "if (counter > 0) then " + // 锁重入 "redis.call('pexpire', KEYS[1], ARGV[2]); " + // 以毫秒为单位,将锁的TTL过期时间刷新成释放时间 "return 0; " + // 返回0 "else " + // 只锁定了一次,没有锁重入 "redis.call('del', KEYS[1]); " + // 删除锁 "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布锁释放消息,通知等待队列 "return 1; " + // 返回1 "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }