期望在同一时间段,不允许相同的库存被操作
@Transactional(rollbackFor = Exception.class) @DistributedLock(prefix = LOCK_STOCK_SYNC_PREFIX, key = "#skuCode") @Override public void syncStock(String skuCode, final StockSyncCmd cmd) { stockSyncCmdExe.execute(skuCode, cmd); }
// DistributedLockAdvice @Around("process()") public Object aroundExec(ProceedingJoinPoint pjp) throws Throwable { /* 通过连接点pjp获取所需属性,解析获取key... */ final Map<String, RLock> locks = lock(keys); try { return pjp.proceed(); } finally { unlock(locks); } }
在Redis主从集群模式下,Client A对Master节点上的锁会异步复制到Slave节点,但是在这个过程中,如果Master节点发生宕机,Slave节点成为Master节点后,Client B又对这个原Slave现Master节点上了同一把锁,那这个时候就发生了多Client对同一业务ID上锁的情况
经过JMeter并发测试,发现达不到预期效果
分析原因
综上
如果业务请求1先释放锁,然后在还未提交事务的时间节点,有业务请求2进来请求的还是同一个SKU Code,因为已经释放了锁,但业务请求1的事务还未提交,所以这个时候查DB数据还是原始数据,那这个时候问题就出现了,会被乐观锁拒绝更新
解决方案
@Aspect @Order(1) @Component @Slf4j public class DistributedLockAdvice { }
@Order注解可以决定IOC容器加载Bean的顺序
假设业务需求现在要求同时操作一批SKU Code,也就是把一个集合的SKU Code加锁看作为一次原子操作
@Around("process()") public Object aroundExec(ProceedingJoinPoint pjp) throws Throwable { // 获取method,args,prefix,key... Set<String> keys = new HashSet(); // 解析EL表达式将结果逐一添加进keys Map locks = this.lock(keys); try { return pjp.proceed(); } finally { unlock(locks); } } private Map<String, RLock> lock(Set<String> keys) { Map<String, RLock> locks = new HashMap<>(6); for (String key : keys) { // get lock instance final RLock lock = redissonClient.getLock(key); // lock lock.lock(); // add to list locks.put(key, lock); } return locks; }
当两个请求同时到来时,有可能会出现死锁的情况,都不释放已经获取的锁,都在等待唤醒通知,获取下一把锁。
在不考虑替换整体解决方案的前提下,用TreeSet可以避免该问题