在常见的分布式锁中有以下三种实现:
在 Redis 中有 3 个重要命令,通过这三个命令可以实现分布式锁
public class DistributedLock implements Lock { private static JedisPool JEDIS_POOL = null; private static int EXPIRE_SECONDS = 60; public static void setJedisPool(JedisPool jedisPool, int expireSecond) { JEDIS_POOL = jedisPool; EXPIRE_SECONDS = expireSecond; } private String lockKey; private String lockValue; private DistributedLock(String lockKey) { this.lockKey = lockKey; } public static DistributedLock newLock(String lockKey) { return new DistributedLock(lockKey); } @Override public void lock() { if (!tryLock()) { throw new IllegalStateException("未获取到锁"); } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return tryLock(0, null); } @Override public boolean tryLock(long time, TimeUnit unit) { Jedis conn = null; String retIdentifier = null; try { conn = JEDIS_POOL.getResource(); lockKey = UUID.randomUUID().toString(); // 获取锁的超时时间,超过这个时间则放弃获取锁 long end = 0; if (time != 0) { end = System.currentTimeMillis() + unit.toMillis(time); } do { if (conn.setnx(lockKey, lockValue) == 1) { conn.expire(lockKey, EXPIRE_SECONDS); return true; } try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } while (System.currentTimeMillis() < end); } catch (JedisException e) { if (lockValue.equals(conn.get(lockKey))) { conn.del(lockKey); } e.printStackTrace(); } finally { if (conn != null) { conn.close(); } } return false; } @Override public void unlock() { Jedis conn = null; try { conn = JEDIS_POOL.getResource(); if (lockValue.equals(conn.get(lockKey))) { conn.del(lockKey); } } catch (JedisException e) { e.printStackTrace(); } finally { if (conn != null) { conn.close(); } } } @Override public Condition newCondition() { return null; } }
上面的代码中也有一个问题,setnx 和 expire 是分为两步进行了,虽然在 catch 中处理异常并尝试将可能出现锁删除,但这种方式并不友好,一个好的方案是通过执行 lua 脚本来实现。在 Spring Redis Lock 和 Redission 都是通过 lua 脚本实现的
local lockClientId = redis.call('GET', KEYS[1]) if lockClientId == ARGV[1] then redis.call('PEXPIRE', KEYS[1], ARGV[2]) return true elseif not lockClientId then redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]) return true end return false
1. 引入库
在 Spring Boot 项目会根据 Spring Boot 依赖管理自动配置版本号
Maven
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2. 配置 redis
在 application-xxx.yml
中配置
spring: redis: host: 127.0.0.1 port: 6379 timeout: 2500 password: xxxxx
3. 增加配置
RedisLockConfig.java
import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.integration.redis.util.RedisLockRegistry; @Configuration public class RedisLockConfig { @Bean public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) { return new RedisLockRegistry(redisConnectionFactory, "redis-lock", TimeUnit.MINUTES.toMillis(10)); } }
4. 使用
@Autowired private RedisLockRegistry lockRegistry; Lock lock = lockRegistry.obtain(key); boolean locked = false; try { locked = lock.tryLock(); if (!locked) { // 没有获取到锁的逻辑 } // 获取锁的逻辑 } finally { // 一定要解锁 if (locked) { lock.unlock(); } }
Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("xxxxxx").setDatabase(0); RedissonClient redissonClient = Redisson.create(config); RLock rLock = redissonClient.getLock("lockKey"); boolean locked = false; try { /* * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败 * leaseTime 锁的持有时间,超过这个时间锁会自动失效 */ locked = rLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS); if (!locked) { // 没有获取锁的逻辑 } // 获取锁的逻辑 } catch (Exception e) { throw new RuntimeException("aquire lock fail"); } finally { if(locked) rLock.unlock(); }
优点:redis 本身的性能比较高,即使存在大量的 setnx 命令也不会有所下降
缺点:
针对第 2 个缺点,在 Redission 通过续约机制,每隔一段时间去检测锁是否还在进行,如果还在运行就将对应的 key 增加一定的时间,保证在锁运行的情况下不会发生 key 到了过期时间自动删除的情况
基于zookeeper临时有序节点可以实现的分布式锁。
大致步骤:客户端对某个方法加锁时,在 zookeeper 上的与该方法对应的指定节点的目录下,生成一个唯一的临时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
当第一个节点申请锁 xxxlock 时如下: 在 xxxlock 持久节点下,创建一个 lock 的临时有序节点,此时因为 lock 为有序节点中序号最小的一个,则此时获取到锁
当第一个节点还在处理业务逻辑未释放锁时,第二节点申请 xxxlock 锁,创建一个 lock 的临时有序节点,此时因为 lock 不是有序节点中序号最小的一个,则此时不能获取到锁,需要一直等到 lock:1 节点删除后才能获取到锁,此时 lock:2 会 watch 它的上一个节点(即 lock:1)等到 lock:1 删除后在获取锁
当第一个节点还在处理业务逻辑未释放锁时,第二节点还在排队,第三个节点申请锁时,创建一个 lock 的临时有序节点,此时因为 lock 不是有序节点中序号最小的一个,则此时不能获取到锁,需要一直等到上面的节点( lock:1 和 lock:2 )节点删除后才能获取到锁,此时 lock:3 会 watch 它的上一个节点(即 lock:2)等到 lock:2 删除后在获取锁
Maven.
<dependency> <!-- spring integration --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-zookeeper</artifactId> </dependency>
Gradle.
compile "org.springframework.integration:spring-integration-zookeeper:5.1.2.RELEASE"
增加配置
@Configuration public class ZookeeperLockConfig { @Value("${zookeeper.host}") private String zkUrl; @Bean public CuratorFrameworkFactoryBean curatorFrameworkFactoryBean() { return new CuratorFrameworkFactoryBean(zkUrl); } @Bean public ZookeeperLockRegistry zookeeperLockRegistry(CuratorFramework curatorFramework) { return new ZookeeperLockRegistry(curatorFramework, "/lock"); } }
使用
@Autowired private ZookeeperLockRegistry lockRegistry; Lock lock = lockRegistry.obtain(key); boolean locked = false; try { locked = lock.tryLock(); if (!locked) { // 没有获取到锁的逻辑 } // 获取锁的逻辑 } finally { // 一定要解锁 if (locked) { lock.unlock(); } }
Maven
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.1.0</version> </dependency>
使用
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, new RetryNTimes(retryCount, elapsedTimeMs)); InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "lock name"); mutex.acquire(); // 获取锁 mutex.acquire(long time, TimeUnit unit) // 获取锁并设置最大等待时间 mutex.release(); // 释放锁
优点:
缺点:
create table distributed_lock ( id int(11) unsigned NOT NULL auto_increment primary key, key_name varchar(30) unique NOT NULL comment '锁名', update_time datetime default current_timestamp on update current_timestamp comment '更新时间' )ENGINE=InnoDB comment '数据库锁';
方式一:通过 insert 和 delete 实现
使用数据库唯一索引,当我们想获取一个锁的时候,就 insert 一条数据,如果 insert 成功则获取到锁,获取锁之后,通过 delete 语句来删除锁
这种方式实现,锁不会等待,如果想设置获取锁的最大时间,需要自己实现
方式二:通过for update 实现
以下操作需要在事务中进行
select * from distributed_lock where key_name = 'lock' for update;
在查询语句后面增加 for update
,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。for update
的另一个特性就是会阻塞,这样也间接实现了一个阻塞队列,但是 for update
的阻塞时间是由数据库决定的,而不是程序决定的。
在 MySQL 8 中,for update
语句可以加上 nowait
来实现非阻塞用法
select * from distributed_lock where key_name = 'lock' for update nowait;
在 InnoDB 引擎在加锁的时候,只有通过索引查询时才会使用行级锁,否则为表锁,而且如果查询不到数据的时候也会升级为表锁。
这种方式需要在数据库中实现已经存在数据的情况下使用。
优点:
如果项目中已经使用了数据库在不引入其他中间件的情况下,可以直接使用数据库,减少依赖
直接借助数据库,容易理解。
缺点:
从性能角度(从高到低)缓存 > Zookeeper >= 数据库
从可靠性角度(从高到低)Zookeeper > 缓存 > 数据库
问题、实现 | Redis | Zookeeper | 数据库 |
---|---|---|---|
性能 | 高 | 中 | 低 |
可靠性 | 中 | 高 | 低 |
过期删除 | 有,设置过期时间,或者手动删除 | 执行业务逻辑后手动删除 | 1. for update 事务完成后,数据库自动释放 2. insert 方式执行业务逻辑后手动删除 |
阻塞队列 | 无,需要客户端自旋解决 | 通过监听上一个 lock 解决,watch 机制 | 1. for update 数据库自己解决 2. insert 方式需要客户端自旋解决 |
超时时间内业务未完成问题 | 需要自己写续约机制完成,Redission 内部自己实现了 | 无这问题 | 1. for update 执行时间过长,可能导致事务本身超时 2. insert 方式无此问题 |
项目异常导致锁未手动删除的情况 | redis 有过期时间,过期时间后自动删除 | session 断开后,临时节点自动删除 | 1. for update 机制数据库会自动清除 2. insert 方式就得自己想解决方案了 |