在我们日常开发中,基本都会使用到数据库来进行数据的存储,在一般的系统中通常不会存在高并发的情况,所以单用数据库也不会有什么问题,可是一旦涉及到大量数据的需求,例如买票、商品订单等等,或者是网站首页访问量瞬间较大的时候,我们单一只使用数据库的话,这些压力会直接面向磁盘,而磁盘的读/写速度比较慢的问题存在着严重的性能弊端,一瞬间如果有成千上万的请求,那么就需要系统在短时间内完成成千上万的读/写操作,这个时候的数据库往往承受不了,会进一步导致系统瘫痪,最终服务器宕机。
存在上述问题时,项目中通常会引入NoSql技术,这是一种基于内存的数据库,并且提供一定的持久化功能。
我们现在要说的redis技术其实也是NoSql技术中心的一种,但是引入redis有可能会出现缓存的穿透、击穿、雪崩等问题。这里结合其他大佬的一些经验和自己的总结,来整理一份笔记,对这三种问题来一个简单的描述并说一下这些问题的解决方法。
缓存穿透: key对应的数据源并不存在,每次针对key的请求从缓存中获取不到,请求都会落到数据源,比如用一个不存在的id去获取信息,不论缓存还是数据库都没有,如果有人利用类似的漏洞对网站进行攻击,会直接压垮数据库。
缓存击穿: key对应的数据存在,但在redis中已经过期,此时如果有大量的请求并发,这些请求发现缓存过期后会从后端DB中加载数据,并且回设到缓存,如果这时的并发请求很大的话,会瞬间将数据库压垮,进一步造成服务器瘫痪。
缓存雪崩: 当用于缓存的服务器重启或者大量的缓存因为其他原因在某一时间段失效,在缓存失效的时候,也会直接给后端系统,例如DB等,带来很大压力。
一个一定不存在的缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错的考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,这样也就失去了缓存的意义。
总结出了两种较常用的方法,可以有效的解决缓存穿透的问题。
布隆过滤器: 大概就是将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。
详解: 判断一个元素是否存在一个数组里面,如下图,利用二进制去做一个存储,占用内存比较小,0代表不存在,1代表存在,添加和查询的效率很快,当保存了一个数值会经过一个算法将对应的值保存到布隆过滤器集合上的某一个位置,某个位置上可能会存在多个key,当传进来一个不存在的key值,和集合进行匹配,如果匹配不上会返回一个null。
缺点:
粗暴方式: 把空的数据也缓存进来,比如空字符串、空对象、空数组或者list。
if(list != null && list.size() > 0) { redisOperator.set("subDog:" + rootDogId, JsonUtils.objectToJson(list)); } else { redisOperator.set("subDog:" + rootDogId, JsonUtils.objectToJson(list), 5 * 60); }
key可能会在某个时间点被超高并发的访问,是一种非常热点的数据,这个时候,需要考虑一个问题,缓存被击穿的问题。
使用互斥锁(mutex key)
业界比较常用的做法,是使用mutex。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db,而是先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX或者Memcache的ADD)去set一个mutex key,当操作返回成功时,再进行load db的操作并回设缓存;否则,就重试整个get缓存的方法。
SETNX,是「SET if Not eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现锁的效果。
public String get(String key){ String value = redis.get(key); if (value == null) //代表缓存值过期 { //设置3min的超时,防着del操作失败的时候,下次缓存期一直不能load db if (redis.setnx(key_mutex, 1, 3 * 60) == 1) //代表设置成功 { value = db.get(key); redis.set(key, value, expire_secs); redis.del(key_mutex); } else { //这个时候代表同时候其他线程已经load db并回设到缓存了,这时候重试获取缓存值即可 sleep(50); get(key); //重试 } } else { return value; } }
缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前置则是针对某一个key。
缓存正常从Redis中获取,示意图如下:
缓存失效瞬间示意图如下:
缓存失效时的雪崩效应对底层系统的冲击非常可怕!大多数系统设计者考虑用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。还有一个简单方案就时讲缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
加锁排队,代码如下:
//伪代码 public object GetProductListNew() { int cacheTime = 30; String cacheKey = "product_list"; String lockKey = cacheKey; String cacheValue = CacheHelper.get(cacheKey); if (cacheValue != null) { return cacheValue; } else { synchronized(lockKey) { cacheValue = CacheHelper.get(cacheKey); if (cacheValue != null) { return cacheValue; } else { //这里一般是sql查询数据 cacheValue = GetProductListFromDB(); CacheHelper.Add(cacheKey, cacheValue, cacheTime); } } return cacheValue; } }
加锁排队只是为了减轻数据库的压力,并没有提高系统吞吐量。假设在高并发下,缓存重建期间key是锁着的,这是过来1000个请求999个都在阻塞的。同样会导致用户等待超时,这是个治标不治本的方法!
注意:加锁排队的解决方式分布式环境的并发问题,有可能还要解决分布式锁的问题;线程还会被阻塞,用户体验很差!因此,在真正的高并发场景下很少使用!
随机值代码
//伪代码 public object GetProductListNew() { int cacheTime = 30; String cacheKey = "product_list"; //缓存标记 String cacheSign = cacheKey + "_sign"; String sign = CacheHelper.Get(cacheSign); //获取缓存值 String cacheValue = CacheHelper.Get(cacheKey); if (sign != null) { return cacheValue; //未过期,直接返回 } else { CacheHelper.Add(cacheSign, "1", cacheTime); ThreadPool.QueueUserWorkItem((arg) -> { //这里一般是 sql查询数据 cacheValue = GetProductListFromDB(); //日期设缓存时间的2倍,用于脏读 CacheHelper.Add(cacheKey, cacheValue, cacheTime * 2); }); return cacheValue; } }
解释说明:
关于缓存崩溃的解决方法,这里提出了三种方案:使用锁或队列、设置过期标志更新缓存、为key设置不同的缓存失效时间,还有一种被称为“二级缓存”的解决方法。
整理如下:
1.设置对应热点key永不过期
2.过期时间错开,过期时间使用随机生成,并且热点数据的过期时间设置的长一点,非热点数据可以设置短一点
3.多缓存结合,例如:请求进入,可以现请求redis,当redis中不存在的时候再去请求memcache,如果都没有再去请求db
4.采购第三方Redis(阿里云或者腾讯云上的redis)
引用大佬的一句话,针对业务系统,永远都是具体情况具体分析,没有最好,只有最合适。
参考相关连接:
https://www.cnblogs.com/xichji/p/11286443.html
https://blog.csdn.net/weixin_45445879/article/details/109321652