此处使用Redis的setNx命令和expire命令和del命令来实现分布式锁。
首先我们要知道, 我们的redis执行命令是队列方式的,并不存在多个命令同时运行,所有命令都是串行的访问。那么这就说明我们多个客户端连接Redis的时候不存在其并发的问题。
其实实现分布式锁并不仅仅可以使用Redis完成,也可以使用其他的方式来完成,最主要的目的就是有一个地方能作为锁状态,然后通过这个锁的状态来实现代码中的功能。只要我们这个锁操作的时候是是串行的,那么就能实现分布式锁。
其实有一个问题,为什么我们不使用Java中的synchronized而要去搞一个分布式锁呢?其实就是因为现在都是分布式环境,而Java内置的synchronized是针对单个Java进程的锁,而分布式环境下有n个Java进程,而分布式锁实现的多个Java进程之间的锁。
那么为什么我们要使用setNx命令,而不使用其他命令呢?例如get命令,这种当我们获取到key以后,可能已经是脏数据了,而我们的setNx的意思是,我们设置一个key,如果此key已经存在,那么则返回0,不存在则返回1并设置成功,我们就可以利用这个方式来实现所谓的分布式锁。
注意,分布式锁实现最重要的地方就是有一个步骤能做到串行且不会脏数据。
废话不多说直接上现成的方法。
/** * Redis 锁工具类 * * @author dh * @date 20211028103258 **/ @Component public class RedisLockHelper { @Autowired public RedisTemplate redisTemplate; /** * 获取锁 * @param key 锁key * @param seconds 最大锁时间 * @return true:成功,false:失败 */ public boolean lock(String key,Long seconds){ return (Boolean) redisTemplate.execute((RedisCallback) connection -> { /** 如果不存在,那么则true,则允许执行, */ Boolean acquire = connection.setNX(key.getBytes(), String.valueOf(key).getBytes()); /** 防止死锁,将其key设置过期时间 */ connection.expire(key.getBytes(), seconds); if (acquire) { return true; } return false; }); } /** * 删除锁 * @param key */ public void delete(String key) { redisTemplate.delete(key); } }
如果理解力强的朋友拿到这个方法就很快的能实现业务中的功能,我们这里给一个防止重复提交的实现案例。
防重复提交注解RepeatSubmitIntercept
/** * 重复提交拦截注解 * @author dh */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RepeatSubmitIntercept { /** * 最大阻挡时间,默认5s */ long maxTime() default 5L; /** * 重复提交时返回msg */ String errorTitle() default "当前操作重复!"; /** * 拦截方式: * 1、如果为0:那么则根据当前用户拦截,那么当前方法该用户在上次请求完成前内只能访问一次. * 2、如果为1:那么则根据当前指定参数名进行拦截,那么当前方法该用户同一参数在上次请求完成前只能访问一次. */ int type() default 0; /** * 拦截方式: * 如果拦截方式为0,那么根据请求头来判断用户 */ String userHead() default CacheConstants.AUTHORIZATION_HEADER; /** * 如果拦截方式为1时,指定的参数名称集合 * @return */ String []parameters() default {}; /** * redis中key前缀,一般不需要修改此 */ String redis_lock_prefix() default "super_bridal_repeat_submit_lock_prefix_"; /** * 当该方法处于被拦截状态时,重复尝试次数,0则不尝试 * @return */ int rewaitCount() default 0; }
aop
/** * 防重复提交的注解 * * @param point * @return * @throws Throwable */ @Around("@annotation(包名.........RepeatSubmitIntercept)") public Object noRepeatSubmitAround(ProceedingJoinPoint point) throws Throwable { HttpServletRequest request = ServletUtils.getRequest(); String uriStringBase64 = Base64.getEncoder().encodeToString(request.getRequestURI().getBytes()); MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); RepeatSubmitIntercept repeatSubmitIntercept = method.getAnnotation(RepeatSubmitIntercept.class); if (repeatSubmitIntercept.maxTime() < 1L) { throw new RepeatSubmitInterceptException("重复提交拦截器报错--设置最大阻挡时间错误,至少大于1s", 500); } if (StringUtils.isBlank(repeatSubmitIntercept.errorTitle())) { throw new RepeatSubmitInterceptException("重复提交拦截器报错--错误信息提醒请勿设置为空/空串", 500); } if (StringUtils.isBlank(repeatSubmitIntercept.redis_lock_prefix())) { throw new RepeatSubmitInterceptException("重复提交拦截器报错--前缀Key不能为空/空串", 500); } String token = Convert.toStr(ServletUtils.getRequest().getHeader(repeatSubmitIntercept.userHead())); StringBuilder key = new StringBuilder() .append(repeatSubmitIntercept.redis_lock_prefix()) .append(token) .append("/") .append(uriStringBase64); if (StringUtils.isEmpty(token)) { throw new RepeatSubmitInterceptException("重复提交拦截器报错--当前拦截方式为[用户拦截],但其请求头中token为空!", 500); } /** 用户拦截的方式 */ if (repeatSubmitIntercept.type() == 0) { /** 此处应该使用请求头中token作为key,那么此处不做其他操作. */ } else if (repeatSubmitIntercept.type() == 1) { /** 从请求参数中获取key */ // ...................省略 } else { throw new RepeatSubmitInterceptException("重复提交拦截器报错--当前拦截方式为未设置!", 500); } if (redisLockHelper.lock(key.toString(), repeatSubmitIntercept.maxTime())) { return execute(key.toString(), point); } else { /** * 1、判断允许重复等待 * 2、重复等待操作 * */ if (repeatSubmitIntercept.rewaitCount() > 0) { int i = 0; while (i < repeatSubmitIntercept.rewaitCount()) { /** 暂停100ms再去拿 */ Thread.sleep(100); i++; if (redisLockHelper.lock(key.toString(), repeatSubmitIntercept.maxTime())) { return execute(key.toString(), point); } } } } throw new RepeatSubmitInterceptException(repeatSubmitIntercept.errorTitle(), 500); }
注意这里的RepeatSubmitInterceptException是自定义的异常。
使用的地方
@GetMapping("/test1") @RepeatSubmitIntercept() public AjaxResult test1(){ System.out.println("进入了请求:" + System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return AjaxResult.success(); }
该实现中如有问题欢迎留言。