为什么使用Nosql
1、单机Mysql时代
90年代,一个网站的访问量一般不会太大,单个数据库完全够用。随着用户增多,网站出现以下问题
数据量增加到一定程度,单机数据库就放不下了
数据的索引(B+ Tree),一个机器内存也存放不下
访问量变大后(读写混合),一台服务器承受不住。
2、Memcached(缓存) + Mysql + 垂直拆分(读写分离)
网站80%的情况都是在读,每次都要去查询数据库的话就十分的麻烦!所以说我们希望减轻数据库的压力,我们可以使用缓存来保证效率!
优化过程经历了以下几个过程:
优化数据库的数据结构和索引(难度大)
文件缓存,通过IO流获取比每次都访问数据库效率略高,但是流量爆炸式增长时候,IO流也承受不了
MemCache,当时最热门的技术,通过在数据库和数据库访问层之间加上一层缓存,第一次访问时查询数据库,将结果保存到缓存,后续的查询先检查缓存,若有直接拿去使用,效率显著提升。
3、分库分表 + 水平拆分 + Mysql集群
信息分散保存在每个集群中
4、如今最近的年代
如今信息量井喷式增长,各种各样的数据出现(用户定位数据,图片数据等),大数据的背景下关系型数据库(RDBMS)无法满足大量数据要求。Nosql数据库就能轻松解决这些问题。
目前一个基本的互联网项目
为什么要用NoSQL ?
用户的个人信息,社交网络,地理位置。用户自己产生的数据,用户日志等等爆发式增长!
这时候我们就需要使用NoSQL数据库的,Nosql可以很好的处理以上的情况!
什么是Nosql
NoSQL = Not Only SQL(不仅仅是SQL)
Not Only Structured Query Language
关系型数据库:列+行,同一个表下数据的结构是一样的。
非关系型数据库:数据存储没有固定的格式,并且可以进行横向扩展。
Nosql特点:
方便扩展(数据之间没有关系,很好扩展!)
大数据量高性能(Redis一秒可以写8万次,读11万次,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高!)
数据类型是多样型的!(不需要事先设计数据库,随取随用)
传统的 RDBMS 和 NoSQL
传统的 RDBMS(关系型数据库)
大数据时代的3V :主要是描述问题的
海量Velume
多样Variety
实时Velocity
大数据时代的3高 : 主要是对程序的要求
高并发
高可扩
高性能
Nosql的四大分类
KV键值对
新浪:Redis
美团:Redis + Tair
阿里、百度:Redis + Memcache
文档型数据库(bson数据格式):
MongoDB(掌握)
基于分布式文件存储的数据库。C++编写,用于处理大量文档。
MongoDB是RDBMS和NoSQL的中间产品。MongoDB是非关系型数据库中功能最丰富的,NoSQL中最像关系型数据库的数据库。
ConthDB
列存储数据库
HBase(大数据必学)
分布式文件系统
图关系数据库
用于广告推荐,社交网络
Neo4j、InfoGrid
Redis是什么?
Redis(Remote Dictionary Server ),即远程字典服务。
是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Redis能该干什么?
内存存储、持久化,内存是断电即失的,所以需要持久化(RDB、AOF)
高效率、用于高速缓冲
发布订阅系统
地图信息分析
计时器、计数器(eg:浏览量)
特性
多样的数据类型
持久化
集群
事务
环境搭建
官网:https://redis.io/
推荐使用Linux服务器学习。
windows版本的Redis已经停更很久了…
Windows安装
https://github.com/dmajkic/redis
解压安装包
开启redis-server.exe
启动redis-cli.exe测试
Linux安装
1.redis官网下载安装包!
2.解压Redis的安装包!程序一般放在 /opt 目录下
3.基本环境安装
yum install gcc-c++ # 然后进入redis目录下执行 make # 然后执行 make install
4.redis默认安装路径 /usr/local/bin
5.将redis的配置文件复制到 程序安装目录 /usr/local/bin/luo-config下
6.redis默认不是后台启动的,需要修改配置文件!
7.通过制定的配置文件启动redis服务
8.使用redis-cli连接指定的端口号测试,Redis的默认端口6379
9.查看redis进程是否开启
测试性能
redis-benchmark:Redis官方提供的性能测试工具,参数选项如下
简单测试
# 测试:100个并发连接 100000请求 redis-benchmark -h localhost -p 6379 -c 100 -n 100000
测试分析
基础知识
redis默认有16个数据库
16个数据库为:DB 0~DB 15
默认使用DB 0 ,可以使用select n切换到DB n,DBSIZE可以查看当前数据库的大小,与key数量相关。
127.0.0.1:6379> config get databases # 命令行查看数据库数量databases 1) "databases" 2) "16" 127.0.0.1:6379> select 8 # 切换数据库 DB 8 OK 127.0.0.1:6379[8]> dbsize # 查看数据库大小 (integer) 0 # 不同数据库之间 数据是不能互通的,并且dbsize 是根据库中key的个数。 127.0.0.1:6379> set name sakura OK 127.0.0.1:6379> SELECT 8 OK 127.0.0.1:6379[8]> get name # db8中并不能获取db0中的键值对。 (nil) 127.0.0.1:6379[8]> DBSIZE (integer) 0 127.0.0.1:6379[8]> SELECT 0 OK 127.0.0.1:6379> keys * 1) "counter:__rand_int__" 2) "mylist" 3) "name" 4) "key:__rand_int__" 5) "myset:__rand_int__" 127.0.0.1:6379> DBSIZE # size和key个数相关 (integer) 5
keys * :查看当前数据库中所有的key。
flushdb:清空当前数据库中的键值对。
flushall:清空所有数据库的键值对。
Redis是单线程的,Redis是基于内存操作的。
所以Redis的性能瓶颈不是CPU,而是机器内存和网络带宽。
那么为什么Redis的速度如此快呢,性能这么高呢?QPS达到10W+
Redis为什么单线程还这么快?
误区1:高性能的服务器一定是多线程的?
误区2:多线程(CPU上下文会切换!)一定比单线程效率高!
核心:Redis是将所有的数据放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!),对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个CPU上的,在内存存储数据情况下,单线程就是最佳的方案。
Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。它支持字符串、哈希表、列表、集合、有序集合,位图,hyperloglogs等数据类型。内置复制、Lua脚本、LRU收回、事务以及不同级别磁盘持久化功能,同时通过Redis Sentinel提供高可用,通过Redis Cluster提供自动分区。
Redis-key
在redis中无论什么数据类型,在数据库中都是以key-value形式保存,通过进行对Redis-key的操作,来完成对数据库中数据的操作。
下面学习的命令:
exists key:判断键是否存在
del key:删除键值对
move key db:将键值对移动到指定数据库
expire key second:设置键值对的过期时间
type key:查看value的数据类型
127.0.0.1:6379> keys * # 查看当前数据库所有key (empty list or set) 127.0.0.1:6379> set name qinjiang # set key OK 127.0.0.1:6379> set age 20 OK 127.0.0.1:6379> keys * 1) "age" 2) "name" 127.0.0.1:6379> move age 1 # 将键值对移动到指定数据库 (integer) 1 127.0.0.1:6379> EXISTS age # 判断键是否存在 (integer) 0 # 不存在 127.0.0.1:6379> EXISTS name (integer) 1 # 存在 127.0.0.1:6379> SELECT 1 OK 127.0.0.1:6379[1]> keys * 1) "age" 127.0.0.1:6379[1]> del age # 删除键值对 (integer) 1 # 删除个数 127.0.0.1:6379> set age 20 OK 127.0.0.1:6379> EXPIRE age 15 # 设置键值对的过期时间 (integer) 1 # 设置成功 开始计数 127.0.0.1:6379> ttl age # 查看key的过期剩余时间 (integer) 13 127.0.0.1:6379> ttl age (integer) 11 127.0.0.1:6379> ttl age (integer) 9 127.0.0.1:6379> ttl age (integer) -2 # -2 表示key过期,-1表示key未设置过期时间 127.0.0.1:6379> get age # 过期的key 会被自动delete (nil) 127.0.0.1:6379> keys * 1) "name" 127.0.0.1:6379> type name # 查看value的数据类型 string
关于TTL命令
Redis的key,通过TTL命令返回key的过期时间,一般来说有3种:
当前key没有设置过期时间,所以会返回-1.
当前key有设置过期时间,而且key已经过期,所以会返回-2.
当前key有设置过期时间,且key还没有过期,故会返回key的正常剩余时间.
关于重命名RENAME和RENAMENX
RENAME key newkey修改 key 的名称
RENAMENX key newkey仅当 newkey 不存在时,将 key 改名为 newkey 。
set user:1{name:luo ,age: 18},设置一个user:1对象值为json字符来保存一个对象,这里的key是一个巧妙的设计,user:{id}:{filed},
Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)
一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。
列表可以经过规则定义将其变为队列、栈、双端队列等
Redis中List是可以进行双端操作的,所以命令也就分为了LXXX和RLLL两类,有时候L也表示List例如LLEN
应用:
消息排队!消息队列(Lpush Rpop),栈(Lpush Lpop)
Redis的Set是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。
Redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。
集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。
应用:微博关注及粉丝,共同关注和共同爱好
Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。
Set就是一种简化的Hash,只变动key,而value使用默认值填充。可以将一个Hash表作为一个对象进行存储,表中存放对象的信息。
Hash变更的数据user name age,尤其是用户信息之类的,经常变动的信息!Hash更适合于对象的存储,Sring更加适合字符串存储!
不同的是每个元素都会关联一个double类型的分数(score)。redis正是通过分数来为集合中的成员进行从小到大的排序。
score相同:按字典顺序排序
有序集合的成员是唯一的,但分数(score)却可以重复。
-------------------ZADD--ZCARD--ZCOUNT-------------- 127.0.0.1:6379> ZADD myzset 1 m1 2 m2 3 m3 # 向有序集合myzset中添加成员m1 score=1 以及成员m2 score=2.. (integer) 2 127.0.0.1:6379> ZCARD myzset # 获取有序集合的成员数 (integer) 2 127.0.0.1:6379> ZCOUNT myzset 0 1 # 获取score在 [0,1]区间的成员数量 (integer) 1 127.0.0.1:6379> ZCOUNT myzset 0 2 (integer) 2 ----------------ZINCRBY--ZSCORE-------------------------- 127.0.0.1:6379> ZINCRBY myzset 5 m2 # 将成员m2的score +5 "7" 127.0.0.1:6379> ZSCORE myzset m1 # 获取成员m1的score "1" 127.0.0.1:6379> ZSCORE myzset m2 "7" --------------ZRANK--ZRANGE----------------------------------- 127.0.0.1:6379> ZRANK myzset m1 # 获取成员m1的索引,索引按照score排序,score相同索引值按字典顺序顺序增加 (integer) 0 127.0.0.1:6379> ZRANK myzset m2 (integer) 2 127.0.0.1:6379> ZRANGE myzset 0 1 # 获取索引在 0~1的成员 1) "m1" 2) "m3" 127.0.0.1:6379> ZRANGE myzset 0 -1 # 获取全部成员 1) "m1" 2) "m3" 3) "m2" #testset=>{abc,add,amaze,apple,back,java,redis} score均为0 ------------------ZRANGEBYLEX--------------------------------- 127.0.0.1:6379> ZRANGEBYLEX testset - + # 返回所有成员 1) "abc" 2) "add" 3) "amaze" 4) "apple" 5) "back" 6) "java" 7) "redis" 127.0.0.1:6379> ZRANGEBYLEX testset - + LIMIT 0 3 # 分页 按索引显示查询结果的 0,1,2条记录 1) "abc" 2) "add" 3) "amaze" 127.0.0.1:6379> ZRANGEBYLEX testset - + LIMIT 3 3 # 显示 3,4,5条记录 1) "apple" 2) "back" 3) "java" 127.0.0.1:6379> ZRANGEBYLEX testset (- [apple # 显示 (-,apple] 区间内的成员 1) "abc" 2) "add" 3) "amaze" 4) "apple" 127.0.0.1:6379> ZRANGEBYLEX testset [apple [java # 显示 [apple,java]字典区间的成员 1) "apple" 2) "back" 3) "java" -----------------------ZRANGEBYSCORE--------------------- 127.0.0.1:6379> ZRANGEBYSCORE myzset 1 10 # 返回score在 [1,10]之间的的成员 1) "m1" 2) "m3" 3) "m2" 127.0.0.1:6379> ZRANGEBYSCORE myzset 1 5 1) "m1" 2) "m3" --------------------ZLEXCOUNT----------------------------- 127.0.0.1:6379> ZLEXCOUNT testset - + (integer) 7 127.0.0.1:6379> ZLEXCOUNT testset [apple [java (integer) 3 ------------------ZREM--ZREMRANGEBYLEX--ZREMRANGBYRANK--ZREMRANGEBYSCORE-------------------------------- 127.0.0.1:6379> ZREM testset abc # 移除成员abc (integer) 1 127.0.0.1:6379> ZREMRANGEBYLEX testset [apple [java # 移除字典区间[apple,java]中的所有成员 (integer) 3 127.0.0.1:6379> ZREMRANGEBYRANK testset 0 1 # 移除排名0~1的所有成员 (integer) 2 127.0.0.1:6379> ZREMRANGEBYSCORE myzset 0 3 # 移除score在 [0,3]的成员 (integer) 2 # testset=> {abc,add,apple,amaze,back,java,redis} score均为0 # myzset=> {(m1,1),(m2,2),(m3,3),(m4,4),(m7,7),(m9,9)} ----------------ZREVRANGE--ZREVRANGEBYSCORE--ZREVRANGEBYLEX----------- 127.0.0.1:6379> ZREVRANGE myzset 0 3 # 按score递减排序,然后按索引,返回结果的 0~3 1) "m9" 2) "m7" 3) "m4" 4) "m3" 127.0.0.1:6379> ZREVRANGE myzset 2 4 # 返回排序结果的 索引的2~4 1) "m4" 2) "m3" 3) "m2" 127.0.0.1:6379> ZREVRANGEBYSCORE myzset 6 2 # 按score递减顺序 返回集合中分数在[2,6]之间的成员 1) "m4" 2) "m3" 3) "m2" 127.0.0.1:6379> ZREVRANGEBYLEX testset [java (add # 按字典倒序 返回集合中(add,java]字典区间的成员 1) "java" 2) "back" 3) "apple" 4) "amaze" -------------------------ZREVRANK------------------------------ 127.0.0.1:6379> ZREVRANK myzset m7 # 按score递减顺序,返回成员m7索引 (integer) 1 127.0.0.1:6379> ZREVRANK myzset m2 (integer) 4 # mathscore=>{(xm,90),(xh,95),(xg,87)} 小明、小红、小刚的数学成绩 # enscore=>{(xm,70),(xh,93),(xg,90)} 小明、小红、小刚的英语成绩 -------------------ZINTERSTORE--ZUNIONSTORE----------------------------------- 127.0.0.1:6379> ZINTERSTORE sumscore 2 mathscore enscore # 将mathscore enscore进行合并 结果存放到sumscore (integer) 3 127.0.0.1:6379> ZRANGE sumscore 0 -1 withscores # 合并后的score是之前集合中所有score的和 1) "xm" 2) "160" 3) "xg" 4) "177" 5) "xh" 6) "188" 127.0.0.1:6379> ZUNIONSTORE lowestscore 2 mathscore enscore AGGREGATE MIN # 取两个集合的成员score最小值作为结果的 (integer) 3 127.0.0.1:6379> ZRANGE lowestscore 0 -1 withscores 1) "xm" 2) "70" 3) "xg" 4) "87" 5) "xh" 6) "93"
应用案例:
set排序 存储班级成绩表 工资表排序!
普通消息,1.重要消息 2.带权重进行判断
排行榜应用实现,取Top N测试
底层使用经纬度定位地理坐标并用一个有序集合zset保存,所以zset命令也可以使用
有效经纬度
有效的经度从-180度到180度。
有效的纬度从-85.05112878度到85.05112878度。
指定单位的参数 unit 必须是以下单位的其中一个:
m 表示单位为米。
km 表示单位为千米。
mi 表示单位为英里。
ft 表示单位为英尺。
应用:通过georadius就可以完成 附近的人功能
----------------georadius--------------------- 127.0.0.1:6379> GEORADIUS china:city 120 30 500 km withcoord withdist # 查询经纬度(120,30)坐标500km半径内的成员 1) 1) "hangzhou" 2) "29.4151" 3) 1) "120.20000249147415" 2) "30.199999888333501" 2) 1) "shanghai" 2) "205.3611" 3) 1) "121.40000134706497" 2) "31.400000253193539" ------------geohash--------------------------- 127.0.0.1:6379> geohash china:city yichang shanghai # 获取成员经纬坐标的geohash表示 1) "wmrjwbr5250" 2) "wtw6ds0y300"
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。
花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。
因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
其底层使用string数据类型
----------PFADD--PFCOUNT--------------------- 127.0.0.1:6379> PFADD myelemx a b c d e f g h i j k # 添加元素 (integer) 1 127.0.0.1:6379> type myelemx # hyperloglog底层使用String string 127.0.0.1:6379> PFCOUNT myelemx # 估算myelemx的基数 (integer) 11 127.0.0.1:6379> PFADD myelemy i j k z m c b v p q s (integer) 1 127.0.0.1:6379> PFCOUNT myelemy (integer) 11 ----------------PFMERGE----------------------- 127.0.0.1:6379> PFMERGE myelemz myelemx myelemy # 合并myelemx和myelemy 成为myelemz OK 127.0.0.1:6379> PFCOUNT myelemz # 估算基数 (integer) 17
应用场景:
网页的访问量(UV):一个用户多次访问,也只能算作一个人。
传统实现,用set存储用户的id,然后每次进行比较。当用户变多之后这种方式及其浪费空间,而我们的目的只是计数,Hyperloglog就能帮助我们利用最小的空间完成。
如果允许容错,那么一定可以使用Hyperloglog !
如果不允许容错,就使用set或者自己的数据类型即可 !
使用位存储,信息状态只有 0 和 1
Bitmap是一串连续的2进制数字(0或1),每一位所在的位置为偏移(offset),在bitmap上可执行AND,OR,XOR,NOT以及其它位操作
------------setbit--getbit-------------- 127.0.0.1:6379> setbit sign 0 1 # 设置sign的第0位为 1 (integer) 0 127.0.0.1:6379> setbit sign 2 1 # 设置sign的第2位为 1 不设置默认 是0 (integer) 0 127.0.0.1:6379> setbit sign 3 1 (integer) 0 127.0.0.1:6379> setbit sign 5 1 #假设星期六签到了,状态为1 (integer) 0 127.0.0.1:6379> type sign string 127.0.0.1:6379> getbit sign 2 # 获取第2位的数值 (integer) 1 127.0.0.1:6379> getbit sign 3 (integer) 1 127.0.0.1:6379> getbit sign 4 # 未设置默认是0 (integer) 0 -----------bitcount---------------------------- 127.0.0.1:6379> BITCOUNT sign # 统计sign中为1的位数 (integer) 4
应用场景
签到统计、状态统计
Redis的单条命令是保证原子性的,但是redis事务不能保证原子性
Redis事务本质:一组命令的集合。
----------------- 队列 set set set 执行 -------------------
事务中每条命令都会被序列化,执行过程中按顺序执行,不允许其他命令进行干扰。
一次性,顺序性,排他性
单独的隔离操作,事务中所有命令都会被序列化、按顺序地执行,事务在执行的过程中,不被其他客户端发送来的命令所打断
Redis事务没有隔离级别的概念,Redis单条命令是保证原子性的,但是事务不保证原子性!
Redis事务操作过程
开启事务(multi)
命令入队
执行事务(exec)
所以事务中的命令在加入时都没有被执行,直到提交时才会开始执行(Exec)一次性完成。
127.0.0.1:6379> multi # 开启事务 OK 127.0.0.1:6379> set k1 v1 # 命令入队 QUEUED 127.0.0.1:6379> set k2 v2 # .. QUEUED 127.0.0.1:6379> get k1 QUEUED 127.0.0.1:6379> set k3 v3 QUEUED 127.0.0.1:6379> keys * QUEUED 127.0.0.1:6379> exec # 事务执行 1) OK 2) OK 3) "v1" 4) OK 5) 1) "k3" 2) "k2" 3) "k1"
取消事务(discurd)
127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> DISCARD # 放弃事务 OK 127.0.0.1:6379> EXEC (error) ERR EXEC without MULTI # 当前未开启事务 127.0.0.1:6379> get k1 # 被放弃事务中命令并未执行 (nil)
事务错误
代码语法错误(编译时异常)所有的命令都不执行
127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> error k1 # 这是一条语法错误命令 (error) ERR unknown command `error`, with args beginning with: `k1`, # 会报错但是不影响后续命令入队 127.0.0.1:6379> get k2 QUEUED 127.0.0.1:6379> EXEC (error) EXECABORT Transaction discarded because of previous errors. # 执行报错 127.0.0.1:6379> get k1 (nil) # 其他命令并没有被执行
代码逻辑错误 (运行时异常) 其他命令可以正常执行 >>> 所以不保证事务原子性
127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> INCR k1 # 这条命令逻辑错误(对字符串进行增量) QUEUED 127.0.0.1:6379> get k2 QUEUED 127.0.0.1:6379> exec 1) OK 2) OK 3) (error) ERR value is not an integer or out of range # 运行时报错 4) "v2" # 其他命令正常执行 # 虽然中间有一条命令报错了,但是后面的指令依旧正常执行成功了。 # 所以说Redis单条指令保证原子性,但是Redis事务不能保证原子性。
监控
悲观锁:
很悲观,认为什么时候都会出现问题,无论做什么都会加锁
乐观锁:
很乐观,认为什么时候都不会出现问题,所以不会上锁!更新数据的时候去判断一下,在此期间是否有人修改过这个数据
获取version
更新的时候比较version
使用watch key监控指定数据,相当于乐观锁加锁。
正常执行
127.0.0.1:6379> set money 100 # 设置余额:100 OK 127.0.0.1:6379> set use 0 # 支出使用:0 OK 127.0.0.1:6379> watch money # 监视money (上锁) OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> DECRBY money 20 QUEUED 127.0.0.1:6379> INCRBY use 20 QUEUED 127.0.0.1:6379> exec # 监视值没有被中途修改,事务正常执行 1) (integer) 80 2) (integer) 20
测试多线程修改值,使用watch可以当做redis的乐观锁操作(相当于getversion)
我们启动另外一个客户端模拟插队线程。
线程1:
127.0.0.1:6379> watch money # money上锁 OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> DECRBY money 20 QUEUED 127.0.0.1:6379> INCRBY use 20 QUEUED 127.0.0.1:6379> # 此时事务并没有执行
模拟线程插队,另一个客户端,线程2:
127.0.0.1:6379> INCRBY money 500 # 修改了线程一中监视的money (integer) 600
回到线程1,执行事务:
127.0.0.1:6379> EXEC # 执行之前,另一个线程修改了我们的值,这个时候就会导致事务执行失败 (nil) # 没有结果,说明事务执行失败 127.0.0.1:6379> get money # 线程2 修改生效 "600" 127.0.0.1:6379> get use # 线程1事务执行失败,数值没有被修改 "0"
锁已经失效了,解锁获取最新值,然后再加锁进行事务。
unwatch进行解锁。
注意:每次提交执行exec后都会自动释放锁,不管是否成功
使用Java来操作Redis,Jedis是Redis官方推荐使用的Java连接redis的客户端。
<!--导入jredis的包--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> </dependency> <!--fastjson--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.70</version> </dependency>
1、开启防火墙
systemctl start firewalld
2、开放指定端口
firewall-cmd --zone=public --add-port=1935/tcp --permanent
命令含义:
–zone #作用域
–add-port=1935/tcp #添加端口,格式为:端口/通讯协议
–permanent #永久生效,没有此参数重启后失效
3、重启防火墙
firewall-cmd --reload
重启redis-server
redis-server luoconfig/redis.conf
测试事务
public class TestTX { public static void main(String[] args) { Jedis jedis = new Jedis("39.99.xxx.xx", 6379); JSONObject jsonObject = new JSONObject(); jsonObject.put("hello", "world"); jsonObject.put("name", "kuangshen"); // 开启事务 Transaction multi = jedis.multi(); String result = jsonObject.toJSONString(); // jedis.watch(result) try { multi.set("user1", result); multi.set("user2", result); // 执行事务 multi.exec(); }catch (Exception e){ // 放弃事务 multi.discard(); } finally { // 关闭连接 System.out.println(jedis.get("user1")); System.out.println(jedis.get("user2")); jedis.close(); } } }
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
springboot 2.x后 ,原来使用的 Jedis 被 lettuce 替换。
jedis:采用的直连,多个线程操作的话,是不安全的。如果要避免不安全,使用jedis pool连接池!更像BIO模式
lettuce:采用netty,实例可以在多个线程中共享,不存在线程不安全的情况!可以减少线程数据了,更像NIO模式
我们在学习SpringBoot自动配置的原理时,整合一个组件并进行配置一定会有一个自动配置类xxxAutoConfiguration,并且在spring.factories中也一定能找到这个类的完全限定名。Redis也不例外。
还存在一个RedisProperties类
源码分析
想知道如何编写配置文件然后连接Redis,就需要阅读RedisProperties,还有一些连接池相关的配置。注意使用时一定使用Lettuce的连接池,jedis不生效。
在测试类中测试Redis Template
@SpringBootTest class RedisSpringbootApplicationTests { @Autowired private RedisTemplate redisTemplate; @Test void contextLoads() { // redisTemplate 操作不同的数据类型,api和我们的指令是一样的 // opsForValue 操作字符串 类似String // opsForList 操作List 类似List // opsForHah 操作hash // 除了基本的操作,我们常用的方法都可以直接通过redisTemplate操作,比如事务和基本的CRUD // 获取连接对象 //RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); //connection.flushDb(); //connection.flushAll(); redisTemplate.opsForValue().set("mykey","kuangshen"); System.out.println(redisTemplate.opsForValue().get("mykey")); } }
存储在redis中非正常显示
默认的序列化器是采用JDK序列化器
如果对象不适用序列化则会报错
自定义redisTemplate
@Configuration public class RedisConfig { @Bean //自定义 public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { //一般使用<String, Object> RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(redisConnectionFactory); //Json序列化配置 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); //String的序列化 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key采用String的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // value序列化方式采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash的value序列化方式采用jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
使用RedisTemplate需要频繁调用.opForxxx然后才能进行对应的操作,这样使用起来代码效率低下,工作中一般不会这样使用,而是将这些常用的公共API抽取出来封装成为一个工具类
RedisUtil
public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; // =============================common============================ /** * 指定缓存失效时间 * @param key 键 * @param time 时间(秒) * @return */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据key 获取过期时间 * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } /** * 判断key是否存在 * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * @param key 可以传一个值 或多个 */ @SuppressWarnings("unchecked") public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { redisTemplate.delete(key[0]); } else { redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key)); } } } // ============================String============================= /** * 普通缓存获取 * @param key 键 * @return 值 */ public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 递增 * @param key 键 * @param delta 要增加几(大于0) * @return */ public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * * 递减 * @param key 键 * @param delta 要减少几(小于0) * @return */ public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } // ================================Map================================= /** * HashGet * @param key 键 不能为null * @param item 项 不能为null * @return 值 */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * @param key 键 * @return 对应的多个键值 */ public Map<Object, Object> hmget(String key) { return redisTemplate.opsForHash().entries(key); } /** * HashSet * @param key 键 * @param map 对应多个键值 * @return true 成功 false 失败 */ public boolean hmset(String key, Map<String, Object> map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map<String, Object> map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除hash表中的值 * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } /** * 判断hash表中是否有该项的值 * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * @param key 键 * @param item 项 * @param by 要增加几(大于0) * @return */ public double hincr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); } /** * hash递减 * @param key 键 * @param item 项 * @param by 要减少记(小于0) * @return */ public double hdecr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); } // ============================set============================= /** * 根据key获取Set中的所有值 * @param key 键 * @return */ public Set<Object> sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0) expire(key, time); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * @param key 键 * @return */ public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } // ===============================list================================= /** * 获取list缓存的内容 * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 * @return */ public List<Object> lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * @param key 键 * @return */ public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 * @return */ public Object lGetIndex(String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } } }
启动时通过配置文件来启动
配置文件unit单位对大小写不敏感
可以使用 include 组合多个配置问题
网络
bind 127.0.0.1 #绑定ip
protected-mode yes #保护模式
port 6379 #端口设置
通用GENERAL
daemonize yes #以守护进程的方式运行,默认是no,需要开启,不然一退出就结束
pidfile /var/run/redis_6379.pid #如果以后台方式运行,需要指定一个pid文件
logfile #文件日志名
快照
持久化,在规定时间内执行多少次操作就会持久化到文件.rdb .aof
redis是内存数据库,断电即失
save 900 1 #900秒内如果至少有1个key发生变化,就进行持久化操作
save 60 10000
stop-writes-on-bgsave-error yes #持久化如果出错是否继续工作
rdbcompression yes #是否压缩rdb文件,需要消耗资源
rdbchecksum yes #保存rdb文件的时候,进行错误的检查校验
dir ./ #rdb文件保存的目录
安全security
requirepass 123 #默认没有密码
限制client
maxcilents 10000 #设置能连上redis的客户端的数量
maxmemory < bytes > # redis 设置最大内存容量
maxmemory-policy noeviction #内存到达上限之后的处理策略,六种方式
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
2、allkeys-lru : 删除lru算法的key
3、volatile-random:随机删除即将过期key
4、allkeys-random:随机删除
5、volatile-ttl : 删除即将过期的
6、noeviction : 永不过期,返回错误
append only 模式 aof配置
appendonly no #默认不开启aof模式,默认使用rdb方式持久化
appendfilename “appendonly.aof” #持久化文件的名字
appendfysnc always #每次write后都会调用fsync,消耗性能
appendfysnc everysec #每秒调用一次fsync,可能丢失一秒的数据
appendfysnc no #不执行fsync,由操作系统自动调度刷磁盘
在指定时间间隔后,将内存中的数据集快照写入数据库 ;在恢复时候,直接读取快照文件,进行数据的恢复
在进行 RDB 的时候,redis 的主线程是不会做 io 操作的,主线程会 fork 一个子线程来完成该操作,RDB最后一次持久化的数据可能丢失
Redis 调用forks。同时拥有父进程和子进程。
子进程将数据集写入到一个临时 RDB 文件中
当子进程完成对新 RDB 文件的写入时,Redis 用新 RDB 文件替换原来的 RDB 文件,并删除旧的 RDB 文件
这种工作方式使得 Redis 可以从写时复制(copy-on-write)机制中获益(因为是使用子进程进行写操作,而父进程依然可以接收来自客户端的请求
默认情况下, Redis 将数据库快照保存在名字为 dump.rdb的二进制文件中。文件名可以在配置文件中进行自定义dbfilename dump.rbd
flushall 命令也会触发rdb规则持久化 ;
满足配置条件中的触发条件 ;
退出redis,也会产生rdb文件
bgsave和save对比
命令 save bgsave
IO类型 同步 异步
阻塞? 是 是(阻塞发生在fock(),通常非常快)
复杂度 O(n) O(n)
优点 不会消耗额外的内存 不阻塞客户端命令
缺点 阻塞客户端命令 需要fock子进程,消耗内存
恢复rdb文件
只需要将rdb文件放到redis启动目录就可以,redis启动时会自动检查rdb文件并恢复数据
查看需要存放的位置
127.0.0.1:6379>config get dir "dir" "/usr/local/bin" #如果在这个目录下存在dump.rdb文件,启动就会自动恢复其中的数据
优点:
适合大规模的数据恢复
对数据的完整性要求不高
缺点:
需要一定的时间间隔进行操作,如果redis意外宕机了,这个最后一次修改的数据就没有了。
fork进程的时候,会占用一定的内容空间。
Append Only File
将我们所有的命令都记录下来,history,恢复的时候就把这个文件全部再执行一遍
如果需要使用aof,需要修改配置文件
如果这个aof文件有错位,这时候redis是启动不起来的,需要修复这个aof文件
redis给我们提供了一个工具redis-check-aof --fix,redis-check-aof --fix appendonly.aof,错误的数据会被丢弃
优点
每一次修改都会同步,文件的完整性会更加好
每秒同步一次,可能会丢失一秒的数据
从不同步,效率最高
缺点
相对于数据文件来说,aof远远大于rdb,修复速度比rdb慢!
Aof运行效率也要比rdb慢,所以我们redis默认的配置就是rdb持久化
RDB | AOF | |
---|---|---|
启动优先级 | 低 | 高 |
体积 | 小 | 大 |
恢复速度 | 快 | 慢 |
数据安全性 | 丢数据 | 根据策略决定 |
一般来说, 如果想达到足以媲美 PostgreSQL 的数据安全性, 你应该同时使用两种持久化功能。
如果你非常关心你的数据, 但仍然可以承受数分钟以内的数据丢失, 那么你可以只使用 RDB 持久化。
有很多用户都只使用 AOF 持久化, 但并不推荐这种方式: 因为定时生成 RDB 快照(snapshot)非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快。
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
命令
PSUBSCRIBE pattern [pattern…] 订阅一个或多个符合给定模式的频道。
PUNSUBSCRIBE pattern [pattern…] 退订一个或多个符合给定模式的频道。
PUBSUB subcommand [argument[argument]] 查看订阅与发布系统状态。
PUBLISH channel message 向指定频道发布消息
SUBSCRIBE channel [channel…] 订阅给定的一个或多个频道。
SUBSCRIBE channel [channel…] 退订一个或多个频道
测试
------------订阅端---------------------- 127.0.0.1:6379> SUBSCRIBE luo # 订阅luo频道 Reading messages... (press Ctrl-C to quit) # 等待接收消息 1) "subscribe" # 订阅成功的消息 2) "luo" 3) (integer) 1 1) "message" # 接收到来自sakura频道的消息 "hello world" 2) "luo" 3) "hello world" 1) "message" # 接收到来自luo频道的消息 "hello i am luo" 2) "luo" 3) "hello i am luo" --------------消息发布端------------------- 127.0.0.1:6379> PUBLISH luo "hello world" # 发布消息到luo频道 (integer) 1 127.0.0.1:6379> PUBLISH luo "hello i am luo" # 发布消息 (integer) 1 -----------------查看活跃的频道------------ 127.0.0.1:6379> PUBSUB channels 1) "luo"
原理
每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构, 结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息,其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。
客户端订阅,就被链接到对应频道的链表的尾部,退订则就是将客户端节点从链表中移除。
缺点
如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息。
应用
消息订阅:公众号订阅,微博关注等等(起始更多是使用消息队列来进行实现)
多人在线聊天室。
稍微复杂的场景,我们就会使用消息中间件MQ处理。
概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(Master/Leader),后者称为从节点(Slave/Follower), 数据的复制是单向的!只能由主节点复制到从节点(主节点以写为主、从节点以读为主)。
默认情况下,每台Redis服务器都是主节点,一个主节点可以有0个或者多个从节点,但每个从节点只能由一个主节点。
作用
数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余的方式。
故障恢复:当主节点故障时,从节点可以暂时替代主节点提供服务,是一种服务冗余的方式
负载均衡:在主从复制的基础上,配合读写分离,由主节点进行写操作,从节点进行读操作,分担服务器的负载;尤其是在多读少写的场景下,通过多个从节点分担负载,提高并发量。
高可用(集群)基石:主从复制还是哨兵和集群能够实施的基础。
配置文件有一个replication模块
查看当前库的信息:info replication
127.0.0.1:6379> info replication # Replication role:master connected_slaves:0 #连接的从机数 master_failover_state:no-failover master_replid:b343b9ac18e7233142bdeb2edff7455f43ebfe9e master_replid2:0000000000000000000000000000000000000000 master_repl_offset:0 second_repl_offset:-1 repl_backlog_active:0 repl_backlog_size:1048576 repl_backlog_first_byte_offset:0 repl_backlog_histlen:0
启动多个服务,就需要多个配置文件。每个配置文件对应修改以下信息:
端口号,pid文件名,日志文件名,rdb文件名
启动单机多服务集群:
默认情况下,每台Redis服务器都是主节点;==我们一般情况下只用配置从机就好了!
认老大!一主(79)二从(80,81)
使用SLAVEOF host port就可以为从机配置主机了
127.0.0.1:6381> slaveof 127.0.0.1 6379 OK 127.0.0.1:6381> info replication # Replication role:slave master_host:127.0.0.1 master_port:6379 master_link_status:up master_last_io_seconds_ago:3 master_sync_in_progress:0 slave_repl_offset:0 slave_priority:100 slave_read_only:1 replica_announced:1 connected_slaves:0 master_failover_state:no-failover master_replid:170974f44972ee990f3172cba5d110bb79dd3742 master_replid2:0000000000000000000000000000000000000000 master_repl_offset:0 second_repl_offset:-1 repl_backlog_active:1 repl_backlog_size:1048576 repl_backlog_first_byte_offset:1 repl_backlog_histlen:0
真实开发中应该在从机的配置文件中进行配置,这样的话是永久的
规则
============================= 127.0.0.1:6380[1]> keys * 1) "k1" 127.0.0.1:6380[1]> set k2 v2 (error) READONLY You can't write against a read only replica. 127.0.0.1:6380[1]> get k1 "v1" 127.0.0.1:6380[1]>
全量复制:slave服务在接收到数据库文件后,将其存盘并加载到内存中
增量复制:Master继续将所有收集到的修改命令一次传给slave,完成同步
但只要是重新连接master,一次完全同步(全量复制)将自动进行
层层链路
及时79断开,80还是不可以进行写操作
127.0.0.1:6381> slaveof 127.0.0.1 6380
手动另外选择主机
从机手动执行命令slaveof no one,这样执行以后从机会独立出来成为一个主机
(自动选举老大模式)
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵(Sentinel)模式。
哨兵是一个独立的进程,独立运行,原理是哨兵通过发送命令,等待Redis服务器响应,从而监听多个Redis实例
哨兵通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。
使用多个哨兵进行监控,各个哨兵之间还会进行监控,形成多哨兵模式
假设主服务宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程成为客观下线(真正把主机干掉后)。
建立一个conf文件,vim sentinel.conf
sentinel monitor luo-master 127.0.0.1 6379 1
监控的主节点的名字、IP 和端口
数字1表示 :当一个哨兵主观认为主机断开,就可以客观认为主机故障,然后开始选举新的主机。
启动哨兵模式
redis-sentinel luo-config/sentinel.conf
如果master断开,就会随机选择一个服务器(投票算法)
断开的主机回来时只能归并到新的主机下成为从机
优点:
哨兵集群,基于主从复制模式,所有主从复制的优点,它都有
主从可以切换,故障可以转移,系统的可用性更好
哨兵模式是主从模式的升级,手动到自动,更加健壮
缺点:
Redis不好在线扩容,集群容量一旦达到上限,在线扩容就十分麻烦
实现哨兵模式的配置其实是很麻烦的,里面有很多配置项
哨兵模式的全部配置
# Example sentinel.conf # 哨兵sentinel实例运行的端口 默认26379 port 26379 #如果有哨兵集群需要配置多个端口 # 哨兵sentinel的工作目录 dir /tmp # 哨兵sentinel监控的redis主节点的 ip port # master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。 # quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了 # sentinel monitor <master-name> <ip> <redis-port> <quorum> sentinel monitor mymaster 127.0.0.1 6379 1 # 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码 # 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码 # sentinel auth-pass <master-name> <password> sentinel auth-pass mymaster MySUPER--secret-0123passw0rd # 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒 # sentinel down-after-milliseconds <master-name> <milliseconds> sentinel down-after-milliseconds mymaster 30000 # 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行同步, 这个数字越小,完成failover所需的时间就越长, 但是如果这个数字越大,就意味着越多的slave因为replication而不可用。 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。 # sentinel parallel-syncs <master-name> <numslaves> sentinel parallel-syncs mymaster 1 # 故障转移的超时时间 failover-timeout 可以用在以下这些方面: #1. 同一个sentinel对同一个master两次failover之间的间隔时间。 #2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。 #3.当想要取消一个正在进行的failover所需要的时间。 #4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了 # 默认三分钟 # sentinel failover-timeout <master-name> <milliseconds> sentinel failover-timeout mymaster 180000 # SCRIPTS EXECUTION #配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。 #对于脚本的运行结果有以下规则: #若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10 #若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。 #如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。 #一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。 #通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本, #这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数, #一个是事件的类型, #一个是事件的描述。 #如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。 #通知脚本 # sentinel notification-script <master-name> <script-path> sentinel notification-script mymaster /var/redis/notify.sh # 客户端重新配置主节点参数脚本 # 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。 # 以下参数将会在调用脚本时传给脚本: # <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> # 目前<state>总是“failover”, # <role>是“leader”或者“observer”中的一个。 # 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的 # 这个脚本应该是通用的,能被多次调用,不是针对性的。 # sentinel client-reconfig-script <master-name> <script-path> sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
概念
在默认情况下,用户请求数据时,会先在缓存(Redis)中查找,若没找到即缓存未命中,再在数据库中进行查找,数据库没找到时查询失败,数量少可能问题不大,可是一旦大量的请求数据(例如秒杀场景)缓存都没有命中的话,就会全部转移到数据库上,造成数据库极大的压力,就有可能导致数据库崩溃。网络安全中也有人恶意使用这种手段进行攻击被称为洪水攻击。
解决方案
一次请求若在缓存和数据库中都没找到,即使放回一个空的对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据会从缓存中获取,保护后端数据源
缺点:
概念
相较于缓存穿透,缓存击穿的目的性更强,一个存在的key,同时有大量的请求,在缓存过期的一刻,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。这就是缓存被击穿,只是针对其中某个key的缓存不可用而导致击穿,但是其他的key依然可以使用缓存响应。
比如热搜排行上,一个热点新闻被同时大量访问就可能导致缓存击穿。
解决方案
1.设置热点数据永不过期
这样就不会出现热点数据过期的情况,但是当Redis内存空间满的时候也会清理部分数据,而且此种方案会占用空间,一旦热点数据多了起来,就会占用部分空间。
2.加互斥锁(分布式锁)
在访问key之前,采用SETNX(set if not exists)来设置另一个短期key来锁住当前key的访问,访问结束再删除该短期key。保证同时刻只有一个线程访问。这样对锁的要求就十分高。
概念
大量的key设置了相同的过期时间,导致在缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。某一时间段,缓解集中过期失效
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某企节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的.无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的.很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。
扩展:
分布式锁
setnx key value 上锁,del key释放锁
锁没有释放,可以设置过期时间expir key seconds
上锁之后突然出现异常无法设置过期时间,上锁的同时设置过期时间
set key value nx ex seconds
• @GetMaping (“testLock”)
public void testLockO {
//1.取锁,setnx ,后两个参数设置过期时间
Boolean lock = redisTemplate. opsForValue (). setlfAbsent ( k: “lock”, V: “111”,3, TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock) {
Object value = redisTemplate.opsForValue(). get(num);
//2. 1.判断num 为空return
if(StringUtils. isEmpty(value)){
return; }
//2.2有值就转成成int
int num = Integer, parseinS: value+”");
/,2. 3把redis的numMl 1
redisTemplate. opsForValue() . set("num”, ++num);
「2 4释放二.del
redisTemplate, delete ( key: “lock”); }
else{
//3.获取锁失败,隔0.1秒后再获取
try{
Thread.sleep(100);
testLock();
}catch(InterrruptExpection e){
e.printStackTrace();
}}}
存在问题一,误删其他人的锁
解决:
通过uuid表示不同操作set key uuid nx ex seconds,然后比较锁的uuid和当前的uuid
String uuid = UUID.randomUUID().toString()
Boolean lock = redisTemplate.opsForValue (). setlfAbsent ( k: “lock”, V: "111”,3, TimeUnit.SECONDS);
···
String lockUuid = (String)redisTemplate.opsForValue ().get(“lock”)
if(uuid.equals(lockUuid)) …
问题二,不保证原子性,lua脚本保证原子性