Redis 是Remote Dictionary Service 的简称;也是远程字典服务;
Redis 是内存数据库,KV数据库,数据结构数据库;
Redis 应用非常广泛,如Twitter、暴雪娱乐、Github、Stack Overflow、腾讯、阿里巴巴、京东、华为、新浪微博等,很多中小型公司也在使用;
Redis命令查看:http://redis.cn/commands.html
记录朋友圈点赞数、评论数和点击数(hash) 记录朋友圈说说列表(排序),便于快速显示朋友圈(zset) 记录文章的标题、摘要、作者和封面,用于列表页展示(hash) 记录朋友圈的点赞用户ID列表,评论ID列表,用于显示和去重计数(zset) 缓存热点数据,减少数据库压力(hash) 如果朋友圈说说ID是整数id,可使用redis来分配朋友圈说说id(计数器)(string) 通过集合(set)的交并差集运算来实现记录好友关系(set) 游戏业务中,每局战绩存储(list)
git clone https://gitee.com/mirrors/redis.git -b 6.2 cd redis make make test make install # 默认安装在 /usr/local/bin # redis-server 是服务端程序 # redis-cli 是客户端程序
mkdir redis-data # 把redis文件夹下 redis.conf 拷贝到 redis-data # 修改 redis.conf # requirepass 修改密码 123456 # daemonize yes cd redis-data redis-server redis.conf # 通过 redis-cli 访问 redis-server redis-cli -h 127.0.0.1 -a 123456
字符数组,该字符串是动态字符串raw,字符串长度小于1M时,加倍扩容;超过1M每次只多扩1M; 字符串最大长度为512M;
注意:redis字符串是二进制安全字符串;可以存储图片,二进制协议等二进制数据;
# 设置 key 的 value 值 SET key val # 获取 key 的 value GET key # 执行原子加一的操作 INCR key # 执行原子加一个整数的操作 INCRBY key increment # 执行原子减一的操作 DECR key # 执行原子减一个整数的操作 DECRBY key decrement # 如果key不存在,这种情况下等同SET命令。 当key存在时,什么也不做 SETNX key value # 删除 key val 键值对 DEL key # 设置或者清空key的value(字符串)在offset处的bit值。 SETBIT key offset value # 返回key对应的string在offset处的bit值 GETBIT key offset # 统计字符串被设置为1的bit数. BITCOUNT key
字符串长度小于等于 20 且能转成整数,则使用 int 存储;
字符串长度小于等于 44,则使用 embstr 存储;
字符串长度大于 44,则使用 raw 存储;
OBJECT encoding +key 可以查看存储结构是什么
为什么redis字符串存储小于等于44字节时,是 embstr 类型,而超过44是 raw 类型;
typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj; redis所有的key value都是用这个结构进行描述的 struct __attribute__ ((__packed__)) sdshdr8 { uint8_t len; /* used */ uint8_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44 robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createEmbeddedStringObject(ptr,len); else return createRawStringObject(ptr,len); }
redisObject 占用16个字节; sdshdr8 占用 3+x+1 个字节(后面加1是因为 char buf[] 要预留一个 \0 );
redis 内存分配器认为 大于 64个字节为大字符串;所以留给小字符串的大小为 64 - 16 - 3 - 1 = 44 ;
SET role:10001 '{["name"]:"mark",["sex"]:"male",["age"]:30}'
GET role:10001
# 统计阅读数 累计加1 incr reads //如果没有reads 首先会创建reads = 0 然后再调用incr # 累计加100 incrby reads 100
# 加锁 setnx lock 1 # 释放锁 del lock
# 月签到功能 10001 用户id 202106 2021年6月份的签到 6月份的第1天 setbit sign:10001:202106 1 1 # 计算 2021年6月份 的签到情况 bitcount sign:10001:202106 # 获取 2021年6月份 第二天的签到情况 1 已签到 0 没有签到 getbit sign:10001:202106 2
双向链表实现,列表首尾操作(删除和增加)时间复杂度 O(1) ;查找中间元素时间复杂度为 O(n) ; 列表中数据是否压缩的依据: (数据太大会进行gzip压缩)
1. 元素长度小于 48,不压缩;
2. 元素压缩前后长度差不超过 8,不压缩;
# 从队列的左侧入队一个或多个元素 LPUSH key value [value ...] # 从队列的左侧弹出一个元素 LPOP key # 从队列的右侧入队一个或多个元素 RPUSH key value [value ...] # 从队列的右侧弹出一个元素 RPOP key # 返回从队列的 start 和 end 之间的元素 0, 1 2 -1是最后一个 -2是倒数第二 -3... LRANGE key start end # 从存于 key 的列表里移除前 count 次出现的值为 value 的元素 LREM key count value # 它是 RPOP 的阻塞版本,因为这个命令会在给定list无法弹出任何元素的时候阻塞连接 BRPOP key timeout
/* Minimum ziplist size in bytes for attempting compression. */ #define MIN_COMPRESS_BYTES 48 /* quicklistNode is a 32 byte struct describing a ziplist for a quicklist. * We use bit fields keep the quicklistNode at 32 bytes. * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k). * encoding: 2 bits, RAW=1, LZF=2. * container: 2 bits, NONE=1, ZIPLIST=2. * recompress: 1 bit, bool, true if node is temporary decompressed for usage. * attempted_compress: 1 bit, boolean, used for verifying during testing. * extra: 10 bits, free for future use; pads out the remainder of 32 bits */ typedef struct quicklistNode { struct quicklistNode *prev; struct quicklistNode *next; unsigned char *zl; unsigned int sz; /* ziplist size in bytes */ unsigned int count : 16; /* count of items in ziplist */ unsigned int encoding : 2; /* RAW==1 or LZF==2 */ unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ unsigned int recompress : 1; /* was this node previous compressed? */ unsigned int attempted_compress : 1; /* node can't compress; too small */ unsigned int extra : 10; /* more bits to steal for future usage */ } quicklistNode; typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; /* total count of all entries in all ziplists */ unsigned long len; /* number of quicklistNodes */ int fill : QL_FILL_BITS; /* fill factor for individual nodes */ unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ unsigned int bookmark_count: QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist;
LPUSH + LPOP # 或者 RPUSH + RPOP
LPUSH + RPOP # 或者 RPUSH + LPOP
LPUSH + BRPOP +timeout # 或者 RPUSH + BLPOP +timeout
操作与队列一样,但是在不同系统间;
# 在某些业务场景下,需要获取固定数量的记录;比如获取最近50条战绩;这些记录需要按照插入的先 后顺序返回; lpush says '{["name"]:"xiaoming", ["text"]:"祝大家儿童节快乐!", ["picture"]:["url://image-20210601172741434.jpg", "url://image20210601172741435.jpg"], timestamp = 1231231230}' lpush says '{["name"]:"xiaoli", ["text"]:"祝大家儿童节快乐!", ["picture"]:["url://image-20210601172742434.jpg", "url://image20210601172741436.jpg"], timestamp = 1231231231}' lpush says '{["name"]:"xiaohua】", ["text"]:"祝大家儿童节快乐!", ["picture"]:["url://image-20210601172743434.jpg", "url://image20210601172741437.jpg"], timestamp = 1231231232}' lpush says '{["name"]:"xiaoxin】", ["text"]:"一切只为渴望更优秀的你", ["picture"]:["url://image-20210601172744434.jpg", "url://image20210601172741438.jpg"], timestamp = 1231231233}' lpush says '{["name"]:"xiaoai】", ["text"]:"hello 0Voice! hello to better self", ["picture"]:["url://image-20210601172745439.jpg", "url://image-20210601172741435.jpg"], timestamp = 1231231234}' lpush says '{["name"]:"xiaomei", ["text"]:"2021届学员真牛逼!", ["picture"]:["url://image-20210601172745434.jpg", "url://image20210601172741440.jpg"], timestamp = 1231231235}' # 裁剪最近5条记录 ltrim says 0 4 lrange says 0 -1
实际项目中需要保证命令的原子性,所以一般用 lua 脚本 或者使用 pipeline 命令;
-- redis lua脚本 local record = KEYS[1] redis.call("LPUSH", "says", record) redis.call("LTRIM", "says", 0, 4)
散列表,在很多高级语言当中包含这种数据结构;c++ unordered_map 通过 key 快速索引 value;
# 获取 key 对应 hash 中的 field 对应的值 HGET key field # 设置 key 对应 hash 中的 field 对应的值 HSET key field value # 设置多个hash键值对 HMSET key field1 value1 field2 value2 ... fieldn valuen # 获取多个field的值 HMGET key field1 field2 ... fieldn # 给 key 对应 hash 中的 field 对应的值加一个整数值 HINCRBY key field increment # 获取 key 对应的 hash 有多少个键值对 HLEN key # 删除 key 对应的 hash 的键值对,该键为field HDEL key field
节点数量大于 512(hash-max-ziplist-entries) 或所有字符串长度大于 64(hash-max-ziplistvalue),则使用 dict 实现;
节点数量小于等于 512 且有一个字符串长度小于 64,则使用 ziplist 实现;
hmset hash:10001 name mark age 18 sex male # 与 string 比较 set hash:10001 '{["name"]:"mark",["sex"]:"male",["age"]:18}' # 假设现在修改 mark的年龄为19岁 # hash: hset hash:10001 age 19 # string: get role:10001 # 将得到的字符串调用json解密,取出字段,修改 age 值 # 再调用json加密 set role:10001 '{["name"]:"mark",["sex"]:"male",["age"]:19}'
# 将用户id作为 key # 商品id作为 field # 商品数量作为 value # 注意:这些物品是按照我们添加顺序来显示的; # 添加商品: hset MyCart:10001 40001 1 lpush MyItem:10001 40001 # 增加数量: hincrby MyCart:10001 40001 1 hincrby MyCart:10001 40001 -1 // 减少数量1 # 显示所有物品数量: hlen MyCart:10001 # 删除商品: hdel MyCart:10001 40001 lrem MyItem:10001 1 40001 # 获取所有物品: lrange MyItem:10001 # 40001 40002 40003 hget MyCart:10001 40001 hget MyCart:10001 40002 hget MyCart:10001 40003
集合;用来存储唯一性字段,不要求有序;
# 添加一个或多个指定的member元素到集合的 key中 SADD key member [member ...] # 计算集合元素个数 SCARD key # SMEMBERS key SMEMBERS key # 返回成员 member 是否是存储的集合 key的成员 SISMEMBER key member # 随机返回key集合中的一个或者多个元素,不删除这些元素 SRANDMEMBER key [count] # 从存储在key的集合中移除并返回一个或多个随机元素 SPOP key [count] # 返回一个集合与给定集合的差集的元素 SDIFF key [key ...] # 返回指定所有的集合的成员的交集 SINTER key [key ...] # 返回给定的多个集合的并集中的所有成员 SUNION key [key ...]
元素都为整数且节点数量小于等于 512(set-max-intset-entries),则使用整数数组存储;
元素当中有一个不是整数或者节点数量大于 512,则使用字典存储;
# 添加抽奖用户 sadd Award:1 10001 10002 10003 10004 10005 10006 sadd Award:1 10009 # 查看所有抽奖用户 smembers Award:1 # 抽取多名幸运用户 srandmember Award:1 10 # 如果抽取一等奖1名,二等奖2名,三等奖3名,该如何操作?
sadd follow:A mark king darren mole vico sadd follow:C mark king darren sinter follow:A follow:C
sadd follow:A mark king darren mole vico sadd follow:C mark king darren # C可能认识的人: sdiff follow:A follow:C
有序集合;用来实现排行榜;它是一个有序结构;
# 添加到键为key有序集合(sorted set)里面 ZADD key [NX|XX] [CH] [INCR] score member [score member ...] # 从键为key有序集合中删除 member 的键值对 ZREM key member [member ...] # 返回有序集key中,成员member的score值 ZSCORE key member # 为有序集key的成员member的score值加上增量increment ZINCRBY key increment member # 返回key的有序集元素个数 ZCARD key # 返回有序集key中成员member的排名 ZRANK key member # 返回存储在有序集合key中的指定范围的元素 ZRANGE key start stop [WITHSCORES] # 返回有序集key中,指定区间内的成员(逆序) ZREVRANGE key start stop [WITHSCORES]
节点数量大于 128或者有一个字符串长度大于64,则使用跳表(skiplist);
节点数量小于等于128(zset-max-ziplist-entries)且所有字符串长度小于等于64(zset-maxziplist-value),则使用 ziplist 存储;
# 点击新闻: zincrby hot:20210203 1 10001 zincrby hot:20210203 1 10002 zincrby hot:20210203 1 10003 zincrby hot:20210203 1 10004 zincrby hot:20210203 1 10005 zincrby hot:20210203 1 10006 zincrby hot:20210203 1 10007 zincrby hot:20210203 1 10008 zincrby hot:20210203 1 10009 zincrby hot:20210203 1 10010 # 获取排行榜: zrevrange hot:20210203 0 9 withscores
将消息序列化成一个字符串作为 zset 的member;这个消息的到期处理时间作为score,然后用多 个线程轮询zset获取到期的任务进行处理。
def delay(msg): msg.id = str(uuid.uuid4()) #保证 member 唯一 value = json.dumps(msg) retry_ts = time.time() + 5 # 5s后重试 redis.zadd("delay-queue", retry_ts, value) # 使用连接池 def loop(): while True: values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1) if not values: time.sleep(1) continue value = values[0] success = redis.zrem("delay-queue", value) if success: msg = json.loads(value) handle_msg(msg) # 缺点:loop 是多线程竞争,两个线程都从zrangebyscore获取到数据,但是zrem一个成功一个失 败, # 优化:为了避免多余的操作,可以使用lua脚本原子执行这两个命令 # 解决:漏斗限流
生产者将定时任务 hash 到不同的 redis 实体中,为每一个redis实体分配一个dispatcher进程,用 来定时获取redis中超时事件并发布到不同的消费者中;
系统限定用户的某个行为在指定的时间里只能发生N次;
# 指定用户 user_id 的某个行为 action 在特定时间内 period 只允许发生做多的次数 max_count def is_action_allowed(userid, action, period, max_count): key = 'hist:%s:%s' % (userid, action) now_ts = int(time.time()*1000) # 毫秒时间戳 with client.pipeline() as pipe: # 记录行为 pipe.zadd(key, now_ts, now_ts) # 移除时间窗口之前的行为记录,剩下的都是时间窗口内的 pipe.zremrangebyscore(key, 0, now_ts - period * 1000) # 获取时间窗口内的行为数量 pipe.zcard(key) # 设置过期时间,避免冷用户持续占用内存 时间窗口的长度+1秒 pipe.expire(key, period + 1) _,_,current_count,_ = pipe.execute() return current_count <= max_count can_reply = is_action_allowed(10001, "replay", 60, 5) if can_reply: do_reply() else: raise ActionThresholdOverflow() # 维护一次时间窗口,将窗口外的记录全部清理掉,只保留窗口内的记录; # 缺点:记录了所有时间窗口内的数据,如果这个量很大,不适合做这样的限流;