在很多场景我们需要限制用户操作频次,比如发验证码、访问接口... ...使用redis可以有多种方式实现频次限制。
laravel路由限流中间件用的就是这种方法,根据用户访问接口、用户id等生成唯一key,并设置有效期限,在有效期限内,每接到一个访问就把key的value + 1,并返回在该有效期内还可以访问多少次,在有效期内value到达限制值即返回429错误,并返回多少秒后会重新计算和重新计算的时间点。
if(key 不存在 || key 过期){ setnx(key,1,limit); }else{ if(get(key))< limit){ incr(key); return "还可以写入limit-get(key)个数据"; }else{ return "超出限制,需要等待ttl(key) s"; } }
这种算法把时间划为“时间段”,比如1分钟内限制访问600次,如果某个用户在10:00:55-10:00:59秒内疯狂点击了580次,正常来说,在10:00:55秒之前他最多只能点击20次,但是按我们这种算法,到10:01:00我们会把上一分钟的数据清空,所以他10:01:00-10:01:01就可以再点击600次,即没有“时间滑块”的概念,我们没有做到任何一个时间点前后一分钟内都只能在限制次数内访问。
但是它非常轻量,实现简单,又不会随着限制次数增大而增大空间占用,非常合适做接口访问限制。
laravel中有此方式的其他实现方法,ThrottleRequestsWithRedis.php,它extends 了 ThrottleRequests,引用了\vendor\laravel\framework\src\Illuminate\Redis\Limiters\DurationLimiter.php以限流。它维持了一个hash key,{start:起始时间戳,end:超时时间戳,count:当前访问次数}。它给key了双倍过期时间以对抗网络波动等问题,但是实现原理和key过期是一样的。
这段代码是由lua编写,这是为了保证其原子性,否则就要单独写锁的业务了。
/** * Get the Lua script for acquiring a lock. * * KEYS[1] - The limiter name * ARGV[1] - Current time in microseconds 当前时间:毫秒、浮点数 * ARGV[2] - Current time in seconds 当前时间:秒 * ARGV[3] - Duration of the bucket 限制时间 * ARGV[4] - Allowed number of tasks 最大尝试次数 * * @return string */ protected function luaScript() { return <<<'LUA' --初始化函数 local function reset() redis.call('HMSET', KEYS[1], 'start', ARGV[2], 'end', ARGV[2] + ARGV[3], 'count', 1) return redis.call('EXPIRE', KEYS[1], ARGV[3] * 2) end --不存在时 初始化 if redis.call('EXISTS', KEYS[1]) == 0 then return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1} end --在时间窗内 if ARGV[1] >= redis.call('HGET', KEYS[1], 'start') and ARGV[1] <= redis.call('HGET', KEYS[1], 'end') then return { tonumber(redis.call('HINCRBY', KEYS[1], 'count', 1)) <= tonumber(ARGV[4]), redis.call('HGET', KEYS[1], 'end'), ARGV[4] - redis.call('HGET', KEYS[1], 'count') } end --保底 return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1} LUA; }
简单限流相对于上述时间段限流不同之处在于它维护了一个“一段时间内已经处理的请求”数据,它可以限制在任何滑动时间内处理的请求数量都在限制之内。
维护这个数据可以用redis的队列、hset、zset...只要能存储请求,并且能存储这些请求的逐出优先级即可。
if(key 不存在 || key 过期){ 初始化key并设置过期时间; }else{ if(count(key))< limit){ 写入新元素并设置过期时间; 将key中超出限制的元素逐出 可以用hash、list、set或者自己写数组按时间等记录 有明确而唯一的先后顺序 //要确保写入后key长度还在限制内; return "还可以写入limit-count(key)个数据"; }else{ return "超出限制,需要等待ttl(最近需要逐出的元素)"; } }
它的缺点也很明显,需要维持当前允许的数据列,这个开销随着limit而增大,如果接口允许某个用户1分钟6w次点击,难道要维持6w个未被逐出的元素吗?
假设我们有一个漏斗限流器,容量60滴水,频率限制为1小时允许360次访问,即每过10s漏斗就会流出1滴水(一次访问机会),这是漏斗流出速度,用户的访问频率是流入速度,在漏斗容量范围内,用户可以把水装满,即第0.1秒内我们就可以访问60次,把漏斗中灌满,之后就每秒判断一次有没有水漏出来,如果有1滴水漏出就说明有了新的访问空间被腾出,没有就等一会儿再去询问。
假设经过了1分钟我们都没访问,此时漏斗已经流走6滴水,空出了6个访问空间,此时我们以每10s一次的频率访问接口,流入和流出速度达到平衡,漏斗内会一直维持在6个访问空间。假如我们访问速度比流出快,漏斗很快灌满,此时需要再次等待腾出访问空间再访问。
漏斗限流器所占空间和限制大小无关。
redis4.0提供了redis-cell限流模块(这个模块使用Rust编写,需要安装此插件),它就使用了漏斗算法。
> cl.throttle kaka:write 20 30 60 1) (integer) 0 # 0 表示允许,1 表示拒绝 2) (integer) 5 # 漏斗容量 capacity 3) (integer) 4 # 漏斗剩余空间 left_quota,从0开始计算 4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒) 5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)