分布式锁的选择,目前市面上使用最广的就是redis和zookeeper,这两种实现都有各自的有点。
zookeeper:可用性高,性能低,并发低。
redis:性能高,可用性中等,并发高
根据业务场景选择合适的技术实现,下面通过应用redis的特性,实现分布式锁。
与业务代码低耦合,侵入性低。
使用方便,简单
组件开发
利用redis的串行操作特点,将相关的锁设置key,给key设置过期时间。编写为一个luna脚本,实现一体操作。
redisson是开源的redis客户端,提供了丰富的数据操作。也提供了分布式锁的实现。已经有相关开源项目,就不要重复造轮子。
根据相关的业务开发需求,基于spring的aop的特性,采用注解的方式进行实现。组件的开发参考了相关开源项目实现,也融入了自身的的实现
参考开源的git地址:https://github.com/kekingcn/spring-boot-klock-starter
package com.xxx.component.redislock.annotation; import com.xxx.component.redislock.model.LockRange; import com.xxx.component.redislock.model.LockType; import com.xxx.component.redislock.strategy.LockExceptionStrategy; import com.xxx.component.redislock.strategy.LockTimeOutStrategy; import com.xxx.component.redislock.strategy.ReleaseExceptionStrategy; import com.xxx.component.redislock.strategy.ReleaseTimeOutStrategy; import com.xxx.component.redislock.util.Constant; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(value = {ElementType.METHOD}) @Retention(value = RetentionPolicy.RUNTIME) public @interface RedisLock { /** * 锁前缀 * @return */ String preFix() default ""; /** * 锁的范围 * * @return */ LockRange lockRange() default LockRange.normal; /** * 锁后缀 * * @return */ String postFix() default ""; /** * 参数key值拼接符 * @return */ String separator() default "."; /** * 锁的名称 * @return name */ String name() default ""; /** * 锁类型,默认可重入锁 * @return lockType */ LockType lockType() default LockType.Reentrant; /** * 尝试加锁,最多等待时间 * @return waitTime */ long waitTime() default Constant.waitTime; /** *上锁以后xxx秒自动解锁 * @return leaseTime */ long leaseTime() default Constant.leaseTime; /** * 自定义业务key * @return keys */ String[] keys() default {}; /** * 加锁异常处理策略 * @return lockTimeoutStrategy */ LockExceptionStrategy lockExceptionStrategy() default LockExceptionStrategy.FAIL_FAST; /** * 释放锁异常处理策略 * @return releaseTimeoutStrategy */ ReleaseExceptionStrategy releaseExceptionStrategy() default ReleaseExceptionStrategy.FAIL_FAST; /** * 加锁超时的处理策略 * * @return lockTimeoutStrategy */ LockTimeOutStrategy lockTimeoutStrategy() default LockTimeOutStrategy.FAIL_FAST; /** * 释放锁时已超时的处理策略 * * @return releaseTimeoutStrategy */ ReleaseTimeOutStrategy releaseTimeoutStrategy() default ReleaseTimeOutStrategy.NO_OPERATION; /** * 本地参数变量 * * @return */ String[] localArgs() default {}; }
自定义错误处理@CustomLockExceptionStrategy
加解锁,发生超时或其它异常时,需要自定义错误处理。eg:记录加锁出错数据等,个人建议这里采用异步处理。
package com.xxx.component.redislock.annotation; import java.lang.annotation.*; /** * @Author Buck Wang * @Description 自定义redissson 加锁异常处理类 * @Date 2021/4/7 15:06 */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface CustomLockExceptionStrategy { //实现 CustomLockExceptionStrategyInter的处理类 Class name(); //自定义的业务类型 String busType() default ""; }
@Aspect @Component @Order(0) public class RedisLockAspect { private static final Logger logger = LoggerFactory.getLogger(RedisLockAspect.class); @Autowired LockFactory lockFactory; @Autowired private LockInfoProvider lockInfoProvider; @Around(value = "@annotation(redisLock)") public Object around(ProceedingJoinPoint joinPoint, RedisLock redisLock) throws Throwable { LockInfo lockInfo = lockInfoProvider.get(joinPoint, redisLock); LockRes lockResCurrent = new LockRes(lockInfo, LockResEnum.LockDealStatusEnum.EXCEPTION.getKey()); Lock lock = lockFactory.getLock(lockInfo); lockResCurrent.setLock(lock); //加锁 lock(lockResCurrent); try { return joinPoint.proceed(); } catch (Throwable e) { e.printStackTrace(); throw e; } finally { try { //释放锁 releaseLock(lockResCurrent); } catch (Throwable e) { throw e; } finally { LockThreadLocalUtil.removeLockRes(lockResCurrent); } } } }
注意点
aop的Order执行级别需要调高优先级,值越小,越先执行。注解是在方法上,有些会涉及到事务,事务会涉及到回滚,事务的实现也是aop,所以redislock的aop需要比事务的aop的越先执行,也就是锁的order值比事务的aop值小,默认的@Order的value是Ordered.LOWEST_PRECEDENCE=2147483647
代码实现
xml配置文件
目前项目中基于springmvc搭建的框架,版本是3.2.5。考虑到redis异常,自动下线错误节点,redis采用的是哨兵模式。
spring的xml配置
<?xml version="1.0" encoding="UTF-8"?> <beans default-lazy-init="true" xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://redisson.org/schema/redisson http://redisson.org/schema/redisson/redisson.xsd" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:redisson="http://redisson.org/schema/redisson"> <aop:aspectj-autoproxy expose-proxy="true"/> <context:component-scan base-package="com.xxx.component.redislock"></context:component-scan> <redisson:client id="redisLockComponent" > <redisson:sentinel-servers master-name="${redislock.sentinelmaster}" slave-connection-pool-size="500" master-connection-pool-size="500" idle-connection-timeout="60000" slave-connection-minimum-idle-size="32" master-connection-minimum-idle-size="32" connect-timeout="10000" timeout="3000" ping-timeout="1000"> <redisson:sentinel-address value="${redislock.addr1}" /> <redisson:sentinel-address value="${redislock.addr2}" /> <redisson:sentinel-address value="${redislock.addr3}" /> </redisson:sentinel-servers> </redisson:client> </beans>
springboot
针对springboot,redisson有对应的jar,进行代码实现。当然你也可以自己代码实现,下面是代码。
Config config = Config.fromYAML("yaml的内容"); 或 Resource resource = ctx.getResource(redisLockConfig.getFile()); InputStream is = resource.getInputStream(); Config config = Config.fromYAML(is); Redisson.create(config);
redisson的yaml的配置
sentinelServersConfig: idleConnectionTimeout: 10000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 subscriptionsPerConnection: 5 loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {} slaveSubscriptionConnectionMinimumIdleSize: 1 slaveSubscriptionConnectionPoolSize: 50 slaveConnectionMinimumIdleSize: 32 slaveConnectionPoolSize: 64 masterConnectionMinimumIdleSize: 32 masterConnectionPoolSize: 64 readMode: "SLAVE" sentinelAddresses: - "redis://10.xxx.32.94:26379" - "redis://10.xxx.32.95:26379" - "redis://10.xxx.32.106:26379" masterName: "mymaster" database: 0 threads: 0 nettyThreads: 0 codec: !<org.redisson.codec.JsonJacksonCodec> {} transportMode: "NIO"
注意点
#redislock,哨兵模式配置 redislock.sentinelmaster=mymaster redislock.addr1=redis://10.xx.32.xx:26379 26379这个是哨兵的端口
org.redisson.RedissonLock#scheduleExpirationRenewal
private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = renewExpirationAsync(threadId); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can't update lock " + getName() + " expiration", future.cause()); return; } if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) { task.cancel(); } }
leaseTime的值设置为-1,定时任务,internalLockLeaseTime / 3,执行一次自动续约。续约的时间为internalLockLeaseTime。如internalLockLeaseTime 没做设置,默认是30000ms,如果想看到自动延时,注意不要使用debug断点执行方法,这样定时任务续约的执行也会卡住,可以采用线程sleep的方法,在观察key的剩余过期时间的变化,就知道是否自动延时。
redisson设置锁的key是hashmap类型。
#查看key是否存在,不存在返回值为0 HGETALL key #秒的方式,查看key的剩余时间,返回-1,代表没有设置过期时间,-2代表key已经过期了。 ttl key #毫秒的方式,查看key的剩余时间 Pttl key
注解不生效
主要的原因是没开启aop功能,导致不生效。
<aop:aspectj-autoproxy expose-proxy=“true”/>
嵌套调用,aop不生效
expose-proxy=“true”,属性必须是true。eg:同一个类中非aop注解的方法,调用带有aop注解的方法,需要先获取到此类的代理,在进行调用。
OrderService有方法method1,method2带有注解aop的方法。method1->method2
OrderService orderService=(OrderService) AopContext.currentProxy(); orderService.method2()