本文介绍日常工作中redis的使用,涉及到redis的数据结构、对应的命令、持久化配置和Lua脚本,以及基于redis的分布式锁实现方案,使用redis时这些都是必会的基础知识,建议保存以下命令
# 查看当前库中key的数量
dbsize
# 清空当前库
flushdb
# 清空所有库
flushall
# 查看当前库下所有key
keys *
# 当前库下是否有指定key
exists key1
# 查看key的值类型
type key1
# 删除key
del key1
# 设置指定key的过期时间,单位秒
expire key1 10
# 查看key的剩余过期时间,-1表示永不过期,-2表示已过期
ttl key1
# 监视key
watch key1
# 取消监视key
unwatch key1
是key/value的数据结构,一个key对应一个string类型的value,单个value最大512M
这个是最常用的数据结构了
命令
# 设置key/value
set key1 value1
# 设置key/value的同时设置过期时间
setex key1 10 value1
# 获取key的值
get key1
# 设置新值,同时返回旧值
getset key1 value1
# 一次设置多个key/value,后面的....表示还可以写key3 value3等
mset key1 value1 key2 value2 ...
# 一次获取多个key的值
mget key1 key2
# 追加内容到指定key的值后面
append key1 xxx
# 获取值的长度
strlen key1
# 只有在key不存在时才成功
setnx key1 value1
# 只有在所有key不存在时才成功
msetnx key1 value1 key2 value2 ...
# 给指定key的值加1
incr key1
# 减1
decr key1
# 给指定key的值加指定数值,本例是加2
incrby key1 2
# 给指定key的值减指定数值,本例是减2
decrby key1 2
# 获取指定key的值中指定范围的字符,如值为abcdefg,取1至2返回bc,即包含1和2两个位置的字符
getrange key1 1 2
# 设置指定位置的值,指定开始位置,然后直接覆盖,如下例中值为abcdefg,从第1个位置开始覆盖为cb,则结果为acbdefg
setrange key1 1 cb
双向链表,无序可重复的集合,一般用来做队列
# 从表头添加元素,value2是新的表头
lpush key1 value1 value2 ...
# 从表尾添加元素,value2是新的表尾
rpush key1 value1 value2 ...
# 从表头弹出元素
lpop key1
# 从表尾弹出元素
rpop key1
# 从key1表尾弹出一个元素,再加到key2表头
rpoplpush key1 key2
# 从表中查看指定索引的范围的元素
lrange key1 0 2
# 查看整个链表
lrange key1 1 0 -1
# 获取链表中从左向右指定索引的元素
lindex key1 1
# 获取链表中最后一个元素
lindex key1 -1
# 获取链表长度
llen key1
# 向链表中的value1前面插入value2
linsert key1 before value1 value2
# 向链表中value1后面插入value2
linsert key1 after value1 value2
# 从链表中删除一个值为value1的元素,从左向右
lrem key1 1 value1
# 从链表中删除一个值为value1的元素,从右向左
lrem key1 -1 value1
# 删除链表中所有值为value1的元素
lrem key1 0 value1
无序不可重复的集合,常用来排除重复数据和随机抽奖功能
# 向集合中添加元素,重复元素会自动跳过
sadd key1 value1 value2 ...
# 取出集合所有元素
smembers key1
# 判断集合中是否存在某个元素
sismember key1 value1
# 获取集合中的元素个数
scard key1
# 从集合中删除指定元素
srem key1 value1 value2 ...
# 随机从集合中弹出一个元素并删除该元素
spop key1
# 随机从集合中取出元素,但不会删除元素,后面的1表示取出元素的个数
srandmember key1 1
# 求两个集合交集
sinter key1 key2
# 求两个集合并集
sunion key1 key2
# 求两个集合差集
sdiff key1 key2
有序不可重复的集合,常用来做排行榜
# 添加元素,相同value不同score会覆盖score
zadd key1 score1 value1 score2 value2
# 获取元素数量
zcard key1
# 取出全部元素,从小到大
zrange key1 0 -1
# 取出部分元素,从小到大
zrange key1 0 4
# 取出全部元素,从大到小
zrevrange key1 0 -1
# 取出部分元素,从大到小
zrevrange key1 0 4
# 取出score在指定范围内的元素,从小到大,其中min和max是score的范围
zrangebyscore key1 min max withscores
# 取出score在指定范围内的元素,从大到小
zrevrangebyscore key1 max min withscores
# 为指定value的元素的score递增,其中1是每次递增多少,可以为负数
zincrby key1 1 value1
# 删除指定元素
zrem key1 value1
# 统计集合中score在范围内的元素个数
zcount key1 min max
# 返回指定值在集合中的排名,从小到大,排名从0开始
zrank key1 value1
# 返回指定值在集合中的排名,从大到小
zrevrank key1 value1
# 添加一个键值对
hset key1 field1 value1
# 获取键值
hget key1 field1
# 批量设置键值对
hmset key1 field1 value1 field2 value2 ...
# 检查键是否存在
hexists key1 field1
# 获取所有键
hkeys key1
# 获取所有值
hvals key1
# 键值递增,后面的1表示每次递增多少,可以为负数,当是负数时表示递减
hincrby key1 field1 1
# 键不存在时成功
hsetnx key1 field1 value1
# 获取所有键值对,奇数为键,偶数为值
hgetall key1
bitmap以bit为单位设置各个位的值(要么是0,要么是1),根据实际应用场景可以设计出节省空间的算法,如布隆过滤器,本文以记录用户签到为例,假设用户ID为1,每年一个key,并且key=用户ID_年份,如1_2021
ID=1的用户在2021-01-01这一天签到,这一天是2021年第1天(也就是第0天),可以执行以下命令,保存签到记录
# 设置1_2021这个key的第0个bit值为1,以此表示第0天签到成功
setbit 1_2021 0 1
这种情况要么按月来设定key值,要么单独查询2021-02-01这一天是否签到,如果签到则总次数就减1
通过上面的例子,可以看到以bit为单位存储非常节省空间,用8个bit就可以表示8天内的签到情况。也可以用bitmap来存储所有用户一天内的签到情况,这种就以用户ID作为bit的偏移量,如果用户ID很大,超过了bitmap的最大范围,可以通过用户ID分片到不同的bitmap上
在同一个key内添加多个位置(经纬度),计算位置各个位置之间的距离,也可指定圆心按半径查找符合条件的位置,可实现附近的xxx功能
# 向key1添加一个叫company的位置,经纬度为116.404844 39.915378
geoadd key1 116.404844 39.915378 company
# 向key1添加一个叫home的位置,经纬度为116.370924 39.930871
geoadd key1 116.370924 39.930871 home
# 查询指定位置的经纬度
geopos key1 company
# 查询多个位置的经纬度
geopos key1 company home
# 计算两个位置的距离,单位是m
geodist key1 company home
# 计算两个位置的距离,指定单位为km
geodist key1 company home km
# 以指定经纬度为圆心,查询指定半径内的所有位置,其中116.370924 39.930871是圆心点的经纬度,2000 m是半径大小,单位m,withdist表示输出符合条件的位置与圆心的距离,withcoord表示输出符合条件的位置的经纬度,asc表示按距离从小到大排序
georadius key1 116.370924 39.930871 2000 m withdist withcoord asc
# 以指定位置为圆心,查询指定半径内的所有位置,返回结果中包含圆心自身,其他可选参数与上一条georadius相同
georadiusbymember key1 home 4000 m
redis大部分时间用来做缓存,通常数据丢失也可以恢复,但是有时候也会用来存储热门的数据,或者nginx直接连接redis做一些重要数据的存储(丢失后很难恢复),所以redis中的数据需要持久化
redis提供了RDB和AOF两种持久化方式,RDB是对当前数据的全量备份(理解成快照),AOF采用追加的方式记录所有写入的命令,所以一般AOF文件更大,可能导致硬盘被占满,这一点需要注意,需要及时的对AOF文件进行瘦身
执行save
和bgsave
命令会生成一份当前内存数据的快照到.rdb
文件内,其中bgsave
命令是另起一个线程去执行,因此不会阻塞主线程
redis默认开启了RDB,redis会自动进行RDB存储,RDB常用配置参数:
save 300 1000 # 每隔300秒,如果有1000个key发生了变化,则备份一次
save 30 10000 # 每隔30秒,如果有10000个key发生了变化,则备份一次
上面的参数不能乱改,要根据redis的写入数据情况来设置,不能太频繁的生成RDB快照,这会影响redis的性能
一般做持久化要同时开启RDB和AOF,下面介绍工作中如何设置redis的AOF,编辑redis.conf配置文件,修改以下配置项
# 开启AOF
appendonly yes
# AOF文件名
appendfilename "xxx.aof"
# 将命令刷入磁盘的间隔,everysec是每秒一次
appendfsync everysec
# 在执行bgrewrite的时候不向磁盘刷新数据
no-appendfsync-on-rewrite no
关于appendfsync的意思:在计算机组成原理中,我们知道相对于内存而言对磁盘的读写是很慢的,所以CPU将数据写入内存缓冲区,定时或存满后再写入磁盘,redis的AOF中appendfsync
这个配置就是设置多久写入磁盘一次,设为一秒是比较保险的,如果发生故障只会丢失最近1秒内的数据
RDB定期对内存中的数据进行快照,会影响redis的性能,所以不能太频繁
RDB在快照之间如何发生错误会丢失此段时间内的数据
RDB在重启redis时恢复速度更快,不像AOF那样需要一条一条命令执行
AOF可设置每秒追加一次写入命令到aof文件中,所以发生故障时丢失数据最少
AOF文件中保存的是redis的写入命令,所以可以打开文件进行修改,删除不需要的命令
AOF的缺点是要记录redis做的每一步写入命令,所以文件很大,需要及时进行瘦身
redis中默认就支持lua脚本,我们通常会使用lua脚本来代替redis事务,可解决超卖和少卖的情况
以下是redis的lua脚本特性:
原子操作,lua脚本会作为一个整体执行,不会被其他连接的命令中断,因此可替代事务
lua脚本加载后可重复使用
减少网络请求的开销,一次性发出一个lua脚本到redis,redis执行完后返回结果,不用多次请求
eval 脚本 参数数量 参数名1 参数名2 值1 值2
示例:
eval "return KEYS[1]..ARGV[1]" 1 key1 val1
# 输出
key1val1
lua脚本中KEYS
和ARGV
两个数组名是固定的,且索引从1开始,上面例子中参数数量为1,参数名1为key1,值1为val1,而脚本是拼接key1和val1,所以结果就是key1val1了
注意:如果脚本有多个参数,则参数名写在一起,参数值写在一起,如key1 key2 val1 val2
,而不是key1 val1 key2 val2
加载脚本的目的是重复使用,通过script load命令实现,返回一个sha1的hash值,之后通过此值可调用已加载的脚本
script load "return KEYS[1]..ARGV[1]"
# 假设返回3783a90bf1f43b15a1e06c4e7664da956ed959d9
调用脚本
# 3783a90bf1f43b15a1e06c4e7664da956ed959d9 是script load返回的结果
# 1 是参数数量
# key1 是参数名1
# val1 是参数值1
evalsha 3783a90bf1f43b15a1e06c4e7664da956ed959d9 1 key1 val1
指定加载脚本返回的sha1 hash值来调用脚本,并指定参数数量和参数名以及参数值
script exists 3783a90bf1f43b15a1e06c4e7664da956ed959d9
本例使用spring提供的RedisTemplate,首先引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
定义一个RedisClient,对常用命令进行封装
@Component
public class RedisClient {
@Autowired
@Qualifier("redisTemplate")
private RedisTemplate redisTemplate;
/**
* 执行脚本
* @param clazz 返回结果类型
* @param script lua脚本
* @param keys lua脚本的参数名
* @param args lua脚本的值
* @param <T>
* @return
*/
public <T> T execute(Class<T> clazz, String script, List<Object> keys, Object... args) {
DefaultRedisScript<T> redisScript = new DefaultRedisScript<>();
redisScript.setResultType(clazz);
redisScript.setScriptText(script);
return (T) redisTemplate.execute(redisScript, keys, args);
}
}
测试使用,先在redis中set两个key出来,分别是test_key1和test_key2,以下lua脚本是对这两个key进行递增操作,分别递增1和2
String script = "redis.call(\"INCRBY\", KEYS[1], ARGV[1])\nredis.call(\"INCRBY\", KEYS[2], ARGV[2])";
ArrayList<Object> keys = Lists.newArrayList("test_key1", "test_key2");
Object execute = redisClient.execute(Object.class, script, keys, 1, 2);
System.out.println(execute);
redis有个setnx命令,在key不存在时才能设置成功,因此也经常用来实现分布式锁,但是只通过setnx命令来做分布式锁是不安全的,假设线程1执行setnx成功并设置10秒后过期,线程2再执行setnx命令肯定是失败的,如果线程1执行过程中发生故障没有及时清除锁,则其他线程只能等待10秒后才能获取锁;如果线程1在10秒内还没有执行完,由于锁已经过期,导致其他线程执行setnx成功,这就存在两个线程同时拿到锁,这样的使用方式肯定是不行的,今天介绍第三方库redisson,redisson帮我们搞定了以上两种情况需要解决的问题
先引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
定义一个工具类,方便使用
@Component
public class RedissonUtils {
@Autowired
private RedissonClient redissonClient;
private static RedissonClient client;
@PostConstruct
private void init() {
client = redissonClient;
}
/**
* 通过指定key加锁
* @param key 指定key
* @return
*/
public static RLock lock(String key) {
RLock lock = client.getLock(key);
lock.lock();
return lock;
}
/**
* 通过指定key加锁指定时间
* @param key 指定key
* @param expire 锁定时间,默认单位:毫秒
* @return
*/
public static RLock lock(String key, long expire) {
RLock lock = client.getLock(key);
lock.lock(expire, TimeUnit.MILLISECONDS);
return lock;
}
/**
* 通过指定key加锁指定时间
* @param key 指定key
* @param expire 锁定时间
* @param unit 锁定时间单位
* @return
*/
public static RLock lock(String key, long expire, TimeUnit unit) {
RLock lock = client.getLock(key);
lock.lock(expire, unit);
return lock;
}
/**
* 通过指定key尝试加锁指定时间,如果加锁失败等待指定时间
* @param key 指定key
* @param wait 加锁等待时间
* @param expire 锁定时间
* @param unit 锁定时间单位
* @return
*/
public static RLock tryLock(String key, long wait, long expire, TimeUnit unit) {
RLock lock = client.getLock(key);
try {
if (lock.tryLock(wait, expire, unit)) {
return lock;
} else {
return null;
}
} catch (InterruptedException e) {
return null;
}
}
/**
* 通过指定key解锁
* @param key
*/
public static void unlock(String key) {
RLock lock = client.getLock(key);
lock.unlock();
}
/**
* 通过指定锁解锁
* @param lock
*/
public static void unlock(RLock lock) {
if (lock != null) {
lock.unlock();
}
}
}
获取锁
RLock lock = RedissonUtils.tryLock(LockKey.XXX, 5L, 10L, TimeUnit.SECONDS);
Assert.isNull(lock, "获取锁失败");
// 取到锁
try {
// 拿到锁后执行的操作
...
} finally {
RedissonUtils.unlock(lock);
}
需要注意的是释放锁一定要放到finally中,防止发生异常而未释放锁
最后说明一下分布式锁也不是非要使用redisson,使用redis实现分布式锁有个致命的问题,当拿到锁后主redis还未同步到从redis时,主redis挂掉,系统切换到从redis,此时其他线程仍然可以拿到锁,这样系统中就有两个线程拿到了锁
作者:飞翔的代码
链接:https://juejin.cn/post/6983845888121241608
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。