如果对以上内容了解不足,阅读本文会比较吃力。(第一次写,有不合适或者更优解的地方欢迎指正)。
@DistributedLock("testLock") public R testWrite(@DistributedLockParam String param) { System.out.println(new Date()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getId()); System.out.println(new Date()); return R.ok(); }
使用@DistributedLock的方法在执行过程中会持有一个redisson锁,当我用JMeter(压测工具,可以简单理解成几乎同时发送n个请求)进行三次请求的时候,控制台的输入内容如下
可以看到,三次请求得以有序执行,分布式锁生效,下面我会讲解具体的实现思路和代码。
redisson框架的分布式锁基于redis实现,通过访问其中同名的键值来判断当前任务有没有其他线程正在执行,在java代码中,可以在逻辑代码之前进行上锁(lock),逻辑代码之后进行解锁(unlock)来完成。
Spring的aop,在调用 由Bean容器管理的对象 中的方法时,会自动触发aop的执行,因此,我们可以设计一个aop,帮助我们完成上述redisson的功能,这样我们就不需要手动编写上锁解锁的这个过程了。
下面是一个简易的redissonDemo,当然其中缺少很多真正使用时需要用到的参数。
@Autowired private RedissonClient redissonClient; public void lockDemo() { String name = "lockName"; RLock lock = redissonClient.getLock(name); try { lock.lock(); //do someThing Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }
可以看到,我们真正的业务代码是被包含于一个try中的,在业务之前和之后都有一段锁逻辑,因此,aop通知我们选择环绕通知。同时,在创建锁的时候,我们需要一些参数,编写这个aop时我们需要考虑怎样将参数传给它。
上面提到了,我们需要为aop提供参数,然后aop通过这些参数为我们进行上锁解锁操作。在这里我们选择使用将注解设置为切点,因为注解可以完成提供参数的这一需求。这样,aop的原型呼之欲出。
@Autowired private RedissonClient redissonClient; @Pointcut("@annotation(com.xxx.common.aop.distributed.DistributedLock)") public void distributedLockAspect() {} @Around(value = "distributedLockAspect()") public Object doAround(ProceedingJoinPoint pjp) throws Throwable { return doLock(pjp); }
@Target({ ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DistributedLock { /** * 锁的名称。 * 如果lockName可以确定,直接设置该属性。 */ String value(); /** * 是否使用尝试锁。 */ boolean tryLock() default false; /** * 最长等待时间。 * 该字段只有当tryLock()返回true才有效。 */ long waitTime() default 30L; /** * 锁超时时间。 * 如果tryLock为false,且leaseTime设置为0及以下,会变成lock() */ long leaseTime() default 5L; /** * 时间单位。默认为秒。 */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
@Target({ ElementType.METHOD })的含义是当前注解使用在方法上
@Retention(RetentionPolicy.RUNTIME)的含义是当前注解在运行时有效
注解中包含一些上锁时常用的属性,如果从aop中读取这些属性,也可以执行上锁解锁过程,但是事实上,我们在设置锁的时候常常是需要设置 锁粒度 的。
解释一下锁粒度的问题,我们知道,锁是为了将原本异步处理的一些功能同步,防止出现线程安全问题,但是现在这个注解实现时,只有一个value,这个value对于某个方法来说是固定的,但是这样就会存在问题,打个比方,你在超市买东西以后要结账(方法),结果超市虽然有很多个结账台,但是只允许有一个人去结账,其他人都拦在外面,因为第一个人进去结账的时候直接把门带上了(上锁),他锁住了整个方法,导致了其他人无法完成结账,这时我们正确的解决办法是让所有的结账台都能接收一个顾客,也就是我们的结账方法需要能为每个结账台构建独立的锁(其实就是行级锁和表级锁的区分)。
我们的方法是同一个,但是我们需要让它在不同的情况下去生成不同的锁,这个时候首先想到的应该是方法参数,我们可以考虑用方法的参数的不同来决定锁的形态。这样,出现了第二个注解。
@Target({ ElementType.PARAMETER }) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DistributedLockParam { /** * 如果是对象则填写使用哪个属性,如果是基本数据类型或String则按默认 * @return */ String value() default ""; /** * 当前属性在锁目录的顺序,越小越靠前 * **勿重 * @return */ int sort() default 0; }
这个注解的作用是写在方法参数上,当我们需要设置锁粒度的时候,就在对应的参数上填写这个注解,然后在aop中读取包含这个注解的参数的值,拼在真正的锁name中。
由于粒度可能由多于一个元素组成,所以在注解中加入了sort属性用来为粒度参数排序。
由于方法的参数既可能是基本类型,还可能是对象,我们的粒度标记可能 除了标记基本类型外,还需要用来标记对象中的某个属性,甚至这个属性还可能是对象,就需要对象属性的属性……
好吧,先记住这个问题,等我们到了aop逻辑再说。
至于value中的值,在设置中,默认情况是直接使用目标的toString,如果要使用对象的某个属性,则将value修改为对应的属性名,如果是属性的属性,就用 "." 来表示。
当然了,我们既然已经支持多个粒度标记,那么就应该允许选择某一个对象的多个属性作为粒度,这个功能有两种实现方式,第一种是在value值中进行切分,比如 "a;b" 表示用a属性做第一个标记,用b属性做第二个标记。
但是万一我想在这两个之间拼接一个其他对象的某个属性呢?(没有这个万一!guna!),另外我们的value已经允许使用 "." 做深度查询标记了,再加一个分号是不是太乱了。
所以我们放弃了这种实现方式,换成另一种,复用注解。
注解怎么复用我就不说了,这是一个固定流程,在这里只贴出其中的代码。
需要在@DistributedLockParam中加入一个新注解@Repeatable。
@Target({ ElementType.PARAMETER }) @Retention(RetentionPolicy.RUNTIME) @Documented @Repeatable(DistributedLockParams.class) public @interface DistributedLockParam { /** * 如果是对象则填写使用哪个属性,如果是基本数据类型或String则按默认 * @return */ String value() default ""; /** * 当前属性在锁目录的顺序,越小越靠前 * **勿重 * @return */ int sort() default 0; }
然后是需要一个额外的注解作为它的容器。
@Retention(RetentionPolicy.RUNTIME) @Documented @Target({ ElementType.PARAMETER }) public @interface DistributedLockParams { DistributedLockParam[] value(); }
至此,我们用到的三个注解就全部完成了。
然后,坐稳了,我们要加速了。
在之前aop原型中,我们的环绕通知调用了doLock方法。
private Object doLock(ProceedingJoinPoint pjp) throws Throwable { //切点所在的类 Class targetClass = pjp.getTarget().getClass(); //使用了注解的方法 String methodName = pjp.getSignature().getName(); Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes(); Method method = targetClass.getMethod(methodName, parameterTypes); Object[] arguments = pjp.getArgs(); // 根据方法反射获取想要的锁名字 String lockName = getLockName(method, arguments); DistributedLock distributedLock = method.getAnnotation(DistributedLock.class); // 生成锁 RLock lock = lock(lockName, distributedLock); try { return pjp.proceed(); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } }
其中,我们要做的第一步是获取到连接点的方法和参数,这是我们redisson锁的参数存放位置,这部分代码比较固定。
得到这两个元素后,我们可以用getLockName方法拼接锁的name。
private String getLockName(Method method, Object[] arguments) { // 获取注解 DistributedLock distributedLock = method.getAnnotation(DistributedLock.class); // 获取前缀 StringBuilder lockName = new StringBuilder(distributedLock.value()); // 用来存储锁粒度标记 TreeMap<Integer, String> treeMap = new TreeMap<>(); // 遍历参数找到粒度标记 Annotation[][] parameterAnnotations = method.getParameterAnnotations(); for (int i = 0; i < parameterAnnotations.length; i++) { // parameterAnnotations[i]:第i个参数的注解数组 for (Annotation annotation : parameterAnnotations[i]) { // 遍历需要的注解(如果只有一个就是DistributedLockParam,如果一个参数上有多个会自动组合成DistributedLockParams) if (annotation instanceof DistributedLockParam) { // 获取注解 DistributedLockParam distributedLockParam = (DistributedLockParam) annotation; // 把属性放进treemap fillTreeMap(distributedLockParam, arguments[i], treeMap); }else if (annotation instanceof DistributedLockParams) { // 获取注解 DistributedLockParams distributedLockParams = (DistributedLockParams) annotation; // 把属性放进treemap for (DistributedLockParam distributedLockParam : distributedLockParams.value()) { fillTreeMap(distributedLockParam, arguments[i], treeMap); } break; } } } // 收集完毕,拼接lockName separate(lockName, treeMap); return lockName.toString(); } private void fillTreeMap(DistributedLockParam distributedLockParam, Object argument, TreeMap<Integer, String> treeMap) { // 获取属性名 String field = distributedLockParam.value(); if (field.equals("")) { // 基本属性直接用 field = argument.toString(); } else { // 对象反射拿数据 try { String[] values = field.split("\\."); for (int i = 0; i < values.length; i++) { Field declaredField = argument.getClass().getDeclaredField(values[i]); declaredField.setAccessible(true); if (i == values.length - 1) { // 最后一个为真实对象,此时从中提取属性 field = declaredField.get(argument).toString(); // 如果这里不跳出,下一句执行会报错 break; } // 切换到下级对象 argument = declaredField.get(argument); } } catch (NoSuchFieldException | IllegalAccessException e) { throw new RRException("分布式锁参数有误"); } } // 确定好以后放进treeMap中,自动排序 treeMap.put(distributedLockParam.sort(), field); } private void separate(StringBuilder lockName, TreeMap<Integer, String> treeMap) { treeMap.values().forEach(s -> lockName.append(":").append(s)); }
首先,从method上获取@DistributedLock注解,并获取其中的value,这是我们锁名的前缀,如果后续没有拼接操作的话,它就是我们的锁名。
第二步是获取全部的粒度标记,因为有排序功能,索引我们引入treeMap来为我们找到的标记排序。
Annotation[][] parameterAnnotations = method.getParameterAnnotations(); for (int i = 0; i < parameterAnnotations.length; i++) { // parameterAnnotations[i]:第i个参数的注解数组 for (Annotation annotation : parameterAnnotations[i]) { // 遍历需要的注解(如果只有一个就是DistributedLockParam,如果一个参数上有多个会自动组合成DistributedLockParams) if (annotation instanceof DistributedLockParam) { // 获取注解 DistributedLockParam distributedLockParam = (DistributedLockParam) annotation; // 把属性放进treemap fillTreeMap(distributedLockParam, arguments[i], treeMap); }else if (annotation instanceof DistributedLockParams) { // 获取注解 DistributedLockParams distributedLockParams = (DistributedLockParams) annotation; // 把属性放进treemap for (DistributedLockParam distributedLockParam : distributedLockParams.value()) { fillTreeMap(distributedLockParam, arguments[i], treeMap); } break; } } }
其中的这段代码是获取标记的全过程。首先我们会获取参数列表的注解数组,它是一个二维数组,两个index分别表示参数的索引和注解的索引。
我们需要遍历这个数组中的每一个注解,判断它是不是我们需要的@DistributedLockParam和@DistributedLockParams(当我们的注解没有复用的时候,注解就是@DistributedLockParam,但是当我们对这个注解进行复用的时候,我们取到的就是@DistributedLockParams,其中包含所有的@DistributedLockParam)。
获取到注解以后通过fillTreeMap()方法提取其中的数据,对value默认情况下会直接提取目标的toString,如果我们手写了value……
// 对象反射拿数据 try { String[] values = field.split("\\."); for (int i = 0; i < values.length; i++) { Field declaredField = argument.getClass().getDeclaredField(values[i]); declaredField.setAccessible(true); if (i == values.length - 1) { // 最后一个为真实对象,此时从中提取属性 field = declaredField.get(argument).toString(); // 如果这里不跳出,下一句执行会报错 break; } // 切换到下级对象 argument = declaredField.get(argument); }
for递归的受难日到了。
首先我们按照约定的规则切分value,得到了每级属性的名字,然后就需要在反射的层面来找到这个属性了。
一开始,argument是我们的参数本身,通过argument.getClass().getDeclaredField(values[i])可以获得当前我们需要的属性,如果这个时候我们已经探索到最后一级了,就直接将这个属性的存在treeMap中并跳出(递归头),反之,我们需要以这个属性为基础,再次探索(递归体),在递归时,我们的argument实际指代的是当前遍历到的对象,所以在进入下一次递归前,我们首先需要将argument指向我们的下级属性对象。
当上面的逻辑执行完后,我们就得到了一个treeMap,其中包含我们所有的标记,并已经按照sort做了排序。
然后做一步简单的拼接。
private void separate(StringBuilder lockName, TreeMap<Integer, String> treeMap) { treeMap.values().forEach(s -> lockName.append(":").append(s)); }
拼接用":"是因为冒号在redis中是目录展示,类似于我们的"/"。
后面的操作就很简单了。
RLock lock = lock(lockName, distributedLock); try { return pjp.proceed(); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } }
private RLock lock(String lockName, DistributedLock distributedLock) throws InterruptedException { RLock lock = redissonClient.getLock(lockName); // 上锁 if (distributedLock.tryLock()) { lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit()); } else { long leaseTime = distributedLock.leaseTime(); if (leaseTime > 0) { lock.lock(distributedLock.leaseTime(), distributedLock.timeUnit()); } else { lock.lock(); } } return lock; }
提取@DistributedLock中的其他锁参数,然后构建我们的lock并执行上锁,try 返回结果finally解锁一气呵成。
在这个锁中,锁失效也就意味着是aop失效,所以这个问题可以变成,在什么情况下aop会失效。
这里就涉及到aop的原理,我只大致描述一点,有兴趣的可以自己去搜一下。
aop是基于代理模式的,Spring的aop会在我们调用 由Bean容器管理的对象 中的方法时自动触发,所以有两种情况下是无法生效的。
// spring相关的依赖就不贴过来了 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.0</version> </dependency>
import com.xxx.common.aop.distributed.DistributedLock; import com.xxx.common.aop.distributed.DistributedLockParam; import com.xxx.common.aop.distributed.DistributedLockParams; import com.xxx.common.exception.RRException; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.AfterThrowing; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.TreeMap; @Aspect @Component public class DistributedLockAspect { @Autowired private RedissonClient redissonClient; @Pointcut("@annotation(com.xxx.common.aop.distributed.DistributedLock)") public void distributedLockAspect() {} @Around(value = "distributedLockAspect()") public Object doAround(ProceedingJoinPoint pjp) throws Throwable { return doLock(pjp); } private Object doLock(ProceedingJoinPoint pjp) throws Throwable { //切点所在的类 Class targetClass = pjp.getTarget().getClass(); //使用了注解的方法 String methodName = pjp.getSignature().getName(); Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes(); Method method = targetClass.getMethod(methodName, parameterTypes); Object[] arguments = pjp.getArgs(); // 根据方法反射获取想要的锁名字 String lockName = getLockName(method, arguments); DistributedLock distributedLock = method.getAnnotation(DistributedLock.class); // 生成锁 RLock lock = lock(lockName, distributedLock); try { return pjp.proceed(); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } private RLock lock(String lockName, DistributedLock distributedLock) throws InterruptedException { RLock lock = redissonClient.getLock(lockName); // 上锁 if (distributedLock.tryLock()) { lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit()); } else { long leaseTime = distributedLock.leaseTime(); if (leaseTime > 0) { lock.lock(distributedLock.leaseTime(), distributedLock.timeUnit()); } else { lock.lock(); } } return lock; } private String getLockName(Method method, Object[] arguments) { // 获取注解 DistributedLock distributedLock = method.getAnnotation(DistributedLock.class); // 获取前缀 StringBuilder lockName = new StringBuilder(distributedLock.value()); // 用来存储锁粒度标记 TreeMap<Integer, String> treeMap = new TreeMap<>(); // 遍历参数找到粒度标记 Annotation[][] parameterAnnotations = method.getParameterAnnotations(); for (int i = 0; i < parameterAnnotations.length; i++) { // parameterAnnotations[i]:第i个参数的注解数组 for (Annotation annotation : parameterAnnotations[i]) { // 遍历需要的注解(如果只有一个就是DistributedLockParam,如果一个参数上有多个会自动组合成DistributedLockParams) if (annotation instanceof DistributedLockParam) { // 获取注解 DistributedLockParam distributedLockParam = (DistributedLockParam) annotation; // 把属性放进treemap fillTreeMap(distributedLockParam, arguments[i], treeMap); }else if (annotation instanceof DistributedLockParams) { // 获取注解 DistributedLockParams distributedLockParams = (DistributedLockParams) annotation; // 把属性放进treemap for (DistributedLockParam distributedLockParam : distributedLockParams.value()) { fillTreeMap(distributedLockParam, arguments[i], treeMap); } break; } } } // 收集完毕,拼接lockName separate(lockName, treeMap); return lockName.toString(); } private void fillTreeMap(DistributedLockParam distributedLockParam, Object argument, TreeMap<Integer, String> treeMap) { // 获取属性名 String field = distributedLockParam.value(); if (field.equals("")) { // 基本属性直接用 field = argument.toString(); } else { // 对象反射拿数据 try { String[] values = field.split("\\."); for (int i = 0; i < values.length; i++) { Field declaredField = argument.getClass().getDeclaredField(values[i]); declaredField.setAccessible(true); if (i == values.length - 1) { // 最后一个为真实对象,此时从中提取属性 field = declaredField.get(argument).toString(); // 如果这里不跳出,下一句执行会报错 break; } // 切换到下级对象 argument = declaredField.get(argument); } } catch (NoSuchFieldException | IllegalAccessException e) { throw new RRException("分布式锁参数有误"); } } // 确定好以后放进treeMap中,自动排序 treeMap.put(distributedLockParam.sort(), field); } private void separate(StringBuilder lockName, TreeMap<Integer, String> treeMap) { treeMap.values().forEach(s -> lockName.append(":").append(s)); } @AfterThrowing(value = "distributedLockAspect()", throwing="ex") public void afterThrowing(Throwable ex) { throw new RuntimeException(ex); } }
import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target({ ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DistributedLock { /** * 锁的名称。 * 如果lockName可以确定,直接设置该属性。 */ String value(); /** * 是否使用尝试锁。 */ boolean tryLock() default false; /** * 最长等待时间。 * 该字段只有当tryLock()返回true才有效。 */ long waitTime() default 30L; /** * 锁超时时间。 * 如果tryLock为false,且leaseTime设置为0及以下,会变成lock() */ long leaseTime() default 5L; /** * 时间单位。默认为秒。 */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
import java.lang.annotation.*; @Target({ ElementType.PARAMETER }) @Retention(RetentionPolicy.RUNTIME) @Documented @Repeatable(DistributedLockParams.class) public @interface DistributedLockParam { /** * 如果是对象则填写使用哪个属性,如果是基本数据类型或String则按默认 * @return */ String value() default ""; /** * 当前属性在锁目录的顺序,越小越靠前 * **勿重 * @return */ int sort() default 0; }
import java.lang.annotation.*; @Retention(RetentionPolicy.RUNTIME) @Documented @Target({ ElementType.PARAMETER }) public @interface DistributedLockParams { DistributedLockParam[] value(); }