由于键空间通知比较耗CPU, 所以 Redis默认是关闭键空间事件通知的, 需要手动修改redis.conf 的notify-keyspace-events开启redis key的过期提醒
K:keyspace事件,事件以__keyspace@<db>__为前缀进行发布; E:keyevent事件,事件以__keyevent@<db>__为前缀进行发布; g:一般性的,非特定类型的命令,比如del,expire,rename等; $:String 特定命令; l:List 特定命令; s:Set 特定命令; h:Hash 特定命令; z:Sorted 特定命令; x:过期事件,当某个键过期并删除时会产生该事件; e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件; A:g$lshzxe的别名,因此”AKE”意味着所有事件。
notify-keyspace-events Ex 表示开启键过期事件提醒
import os import time import redis import random from concurrent.futures import ThreadPoolExecutor from loguru import logger def redis_init(): redis_string = os.environ.get('REDIS_URL') redis_conn = redis.Redis.from_url(redis_string, decode_responses=True, health_check_interval=30, retry_on_timeout=True, socket_keepalive=True) redis_conn.ping() return redis_conn def set_delayed_notification(redis_conn): for i in range(0, 10): delayed = random.randint(2, 10) redis_conn.set(f'n_{i}', '1', delayed) logger.info(f'n_{i} set delayed notice time {delayed}') def hook_notice(redis_conn): sub = redis_conn.pubsub() #获得一个订阅者 sub.psubscribe(["__keyevent@*__:expired"]) #订阅__keyevent@*__:expired频道 for item in sub.listen(): #监听这个频道,阻塞式 key = str(item['data']) #取出通知内容 logger.info(f'notice {key}') if __name__ == "__main__": reids_conn = redis_init() executor = ThreadPoolExecutor(max_workers=2) executor.submit(hook_notice, reids_conn) executor.submit(set_delayed_notification, reids_conn) time.sleep(10)
关于这种方式在实际应用中的缺点主要有,强依赖于redis的稳定,如果redis出现波动可能会造成某些键过期之后不通知,并且这种方式需要对通知的时间精准度要有一定的容忍度,可能会提前或滞后几百毫秒左右。
redis的过期基于redisDb数据结构
typedef struct redisDb { dict *dict; /* 键空间 key space */ dict *expires; /* 过期字典 */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ } redisDb;
键空间(key space):dict字典用来保存数据库中的所有键值对
过期字典(expires):保存数据库中所有键的过期时间,过期时间用UNIX时间戳表示,且值为long long整数。
int expireIfNeeded(redisDb *db, robj *key) { /* 取出键的过期时间 */ mstime_t when = getExpire(db,key); mstime_t now; /* 没有过期时间返回0*/ if (when < 0) return 0; /* No expire for this key */ /* 服务器loading时*/ if (server.loading) return 0; /* 根据一定规则获取当前时间*/ now = server.lua_caller ? server.lua_time_start : mstime(); /* 如果当前的是从(Slave)服务器 0认为key为无效 */ if (server.masterhost != NULL) return now > when; /* key未过期,返回 0 */ if (now <= when) return 0; /* 删除键 */ server.stat_expiredkeys++; propagateexpire(db,key,server.lazyfree_lazy_expire); notifykeyspaceevent(notify_expired, "expired",key,db->id); return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); }
for (j = 0; j < dbs_per_call; j++) { int expired; redisDb *db = server.db+(current_db % server.dbnum); current_db++; /* 超过25%的key已过期,则继续. */ do { unsigned long num, slots; long long now, ttl_sum; int ttl_samples; /* 如果该db没有设置过期key,则继续看下个db*/ if ((num = dictSize(db->expires)) == 0) { db->avg_ttl = 0; break; } slots = dictSlots(db->expires); now = mstime(); /*但少于1%时,需要调整字典大小*/ if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; expired = 0; ttl_sum = 0; ttl_samples = 0; if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;// 20 while (num--) { dictEntry *de; long long ttl; if ((de = dictGetRandomKey(db->expires)) == NULL) break; ttl = dictGetSignedIntegerVal(de)-now; if (activeExpireCycleTryExpire(db,de,now)) expired++; if (ttl > 0) { /* We want the average TTL of keys yet not expired. */ ttl_sum += ttl; ttl_samples++; } } /* Update the average TTL stats for this database. */ if (ttl_samples) { long long avg_ttl = ttl_sum/ttl_samples; /样本获取移动平均值 */ if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); } iteration++; if ((iteration & 0xf) == 0) { /* 每迭代16次检查一次 */ long long elapsed = ustime()-start; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); if (elapsed > timelimit) timelimit_exit = 1; } /* 超过时间限制则退出*/ if (timelimit_exit) return; /* 在当前db中,如果少于25%的key过期,则停止继续删除过期key */ } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); }
依次遍历16个db,每个db选择20个key,判断key是否过期,如果当次选的key少于少于25%过期则进行下一个db如果多余25%再2234再来一次,并且在总用时超过250ms也停止这一个过程。