Redis教程

Redis6.0学习笔记

本文主要是介绍Redis6.0学习笔记,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

文章目录

    • 一、Redis概述入门
      • 1、NoSQL概述
      • 2、Redis介绍
      • 3、Redis安装
      • 4、Redis压力测试
      • 5、基础知识
    • 二、Redis五大基本数据类型
      • 1、Redis-key
      • 2、String类型
      • 3、列表List
      • 4、集合Set
      • 5、哈希Hash
      • 6、有序集合Zset
    • 三、Redis三种特殊数据类型
      • 1、GEO地理位置
      • 2、Hyperloglog
      • 3、Bitmaps
    • 四、事务
    • 五、Java连接Redis操作
      • 1、Jedis
      • 2、SpringBoot整合Redis
    • 六、Redis.conf配置信息
    • 七、Redis持久化
      • 1、RDB(Redis DataBase)
      • 2、AOF(Append Only File)
      • 3、总结
    • 八、Redis发布订阅
    • 九、Redis主从、哨兵和集群
      • 1、主从复制
      • 2、哨兵模式
      • 3、Redis集群
    • 十、Redis缓存
      • 1、缓存穿透
      • 2、缓存击穿
      • 3、缓存雪崩

一、Redis概述入门

1、NoSQL概述

NoSQL是指not only Sql,是一种非关系型数据库。其中NoSQL共有四种分类

  • KV键值
  • 文档型数据库(bson、MongoDB)
  • 列存储数据库(HBase、分布式文件系统)
  • 图关系数据库(存放关系、例如Neo4j)

2、Redis介绍

Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

Redis官网:https://redis.io/

Redis中文官网:http://www.redis.cn/

3、Redis安装

windows安装:https://github.com/dmajkic/redis/downloads(不推荐win开发)

Linux安装:

# 从官网下载redis最新版
wget https://download.redis.io/releases/redis-6.2.4.tar.gz
#移动到opt目录下
mv redis-6.2.4.tar.gz /opt/
# 解压即可
tar -zxvf redis-6.2.4.tar.gz
#安装基本环境
yum install gcc-c++
#进入安装包
cd redis-6.2.4/
# 进行编译安装,Redis默认安装路径(和大多数软件一样) /usr/local/bin
make
make install
#进入redis服务目录
cd /usr/local/bin
#创建配置文件目录
mkdir conf
#将/opt/redis-6.2.4/redis.conf进行备份
cp /opt/redis-6.2.4/redis.conf conf/myredis.conf
#修改为后台启动,进入myredis.conf修改daemonize为yes
redis-server conf/myredis.conf 
#客户端连接测试
redis-cli -p 6379
#关闭程序,cli中先shutdown,后exit
#查看进程
ps -ef|grep redis

4、Redis压力测试

Redis-benchmark官方默认压测工具

序号选项描述默认值
1-h指定服务器主机名127.0.0.1
2-p指定服务器端口号6379
3-s指定服务器socket
4-c指定并发连接数50
5-n指定请求数10000
6-d以字节的形式指定SET/GET值的数据大小3
7-k1=keep alive 0=reconnect1
8-rSET/GET/INCR使用随机key,SADD使用随机值
9-P通过管道传输1
10-q强制退出redis。仅显示query/sec值
11–csv以CSV格式输出
12-l生成循环,永久执行
13-t仅运行以逗号分隔的测试命令列表
14-IIdle模式。仅打开N个idle连接并等待
# 开启服务后在当前目录进行测试
redis-benchmark -h localhost -p 6379 -c 100 -n 100000

5、基础知识

redis默认16个数据库,默认使用第一个,使用select进行切换数据库,Redis6之前是单线程的,因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽,而单线程复杂度低,又不需要CPU上下文切换,也无需加锁。而在Redis6开始支持多线程,默认仍然是不开启,开启需要在redis.conf进行设置,其中Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程顺序执行。

127.0.0.1:6379> PING
PONG
#切换数据库
127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> DBSIZE
(integer) 0
127.0.0.1:6379[1]> set name shawn
OK
127.0.0.1:6379[1]> get name
"shawn"
127.0.0.1:6379[1]> keys *
1) "name"
#清除数据库
127.0.0.1:6379[1]> FLUSHDB
OK
127.0.0.1:6379[1]> keys *
(empty array)
#清除全部数据库
127.0.0.1:6379[1]> FLUSHALL
OK
#关闭服务并退出
127.0.0.1:6379[1]> SHUTDOWN
not connected> exit

二、Redis五大基本数据类型

Redis 是一种开源(BSD 许可)、内存中数据结构存储,用作数据库、缓存和消息代理。 Redis 提供了诸如字符串、散列、列表、集合、带范围查询的排序集合、位图、超级日志、地理空间索引和流等数据结构。 Redis 内置复制、Lua 脚本、LRU 驱逐、事务和不同级别的磁盘持久化,并通过 Redis Sentinel 和 Redis Cluster 自动分区提供高可用性。

Redis有五大基本数据类型:

  • String(字符串类型)
  • Hash(哈希,类似java的Map)
  • List(列表)
  • Set(集合)
  • ZSet(有序集合)

1、Redis-key

127.0.0.1:6379> set name shawn
OK
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> exists name #是否存在
(integer) 1
127.0.0.1:6379> type name #类型
string
127.0.0.1:6379> move name 1
(integer) 1
127.0.0.1:6379> set age 1
OK
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> expire age 10 #设置过期时间
(integer) 1
127.0.0.1:6379> ttl age #查看还有多久过期
(integer) 7
127.0.0.1:6379> get age
(nil)

2、String类型

# ======================================================
# set、get、del、append、strlen
# ======================================================
127.0.0.1:6379> set name shawn
OK
127.0.0.1:6379> append name ,hello #追加
(integer) 11
127.0.0.1:6379> strlen name #字符串长度
(integer) 11
127.0.0.1:6379> get name 
"shawn,hello"
127.0.0.1:6379> del name #删除
(integer) 1
127.0.0.1:6379> keys *
(empty array)
# ======================================================
# incr、decr      一定要是数字才能进行加减,+1 和 -1。
# incrby、decrby  命令将 key 中储存的数字加上指定的增量值。
# ======================================================
127.0.0.1:6379> set views 0
OK
127.0.0.1:6379> incr views #自增1
(integer) 1
127.0.0.1:6379> decr views #自减1
(integer) 0
127.0.0.1:6379> incrby views 10 #自增10
(integer) 10
127.0.0.1:6379> decrby views 5 #自减5
(integer) 5
127.0.0.1:6379> get views
"5"
# ======================================================
# range [范围]
# getrange 获取指定区间范围内的值,类似between...and的关系,从零到负一表示全部
# setrange 设置指定区间范围内的值,格式是setrange key值 具体值
# ======================================================
127.0.0.1:6379> set name hello,shawn
OK
127.0.0.1:6379> getrange name 6 11
"shawn"
127.0.0.1:6379> setrange name 6 shanw22
(integer) 13
127.0.0.1:6379> get name
"hello,shanw22"
# ======================================================
# setex(set with expire) 设置过期时间
# setnx(set if not exist)不存在就设置(分布式锁常用)
# ======================================================
127.0.0.1:6379> setex key1 30 hello #设置key1值为hello,过期时间30s
OK
127.0.0.1:6379> ttl key1
(integer) 25
127.0.0.1:6379> setnx key1 hello #过期后成功进行设置
(integer) 1
127.0.0.1:6379> setnx key1 hello #设置失败
(integer) 0
# ======================================================
# mset   同时设置多组k-v
# mget   同时获取多组k-v
# msetnx 当所有 key 都成功设置,返回 1。如果所有给定 key 都设置失败(至少有一个 key 已经存在),那么返回 # 0。该操作为原子性操作,要么都成功,要么失败
# ======================================================
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
127.0.0.1:6379> msetnx k1 v1 k4 v4 #原子操作
(integer) 0
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
# 可以缓存对象
127.0.0.1:6379> msetnx user:1:name shawn user:1:age 18
(integer) 1
127.0.0.1:6379> mget user:1:name user:1:age
1) "shawn"
2) "18"
# ======================================================
# getset(先get再set)
# ======================================================
127.0.0.1:6379> getset db redis
(nil)
127.0.0.1:6379> getset db mysql
"redis"
#=======================================================
#Redis中的Value可以是字符串,也可以是数字

3、列表List

list相当于双向链表,可以用作队列,也可以作栈,可以做消息队列,在两端操作效率高,最中间操作效率会低

# ======================================================
# Lpush:将一个或多个值插入到列表头部。(左)
# rpush:将一个或多个值插入到列表尾部。(右)
# lrange:返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。
# 其中 0 表示列表的第一个元素, 1 表示列表的第二个元素,以此类推。
# 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。 
# lpop 命令用于移除并返回列表的第一个元素。当列表 key 不存在时,返回 nil
# rpop 移除列表的最后一个元素,返回值为移除的元素
# ======================================================
127.0.0.1:6379> lpush list one
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lrange list 0 -1 #获取list中的值
1) "two"
2) "one"
127.0.0.1:6379> rpush list three
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
3) "three"
127.0.0.1:6379> lpop list
"two"
127.0.0.1:6379> rpop list
"three"
# ======================================================
# lindex,按照索引下标获得元素(-1代表最后一个,0代表是第一个)
# llen 用于返回列表的长度。
# lrem key 根据参数 COUNT 的值,移除列表中与参数 VALUE 相等的元素
# ltrim key 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被 删除。
# rpoplpush 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
# lset key index value 将列表 key 下标为 index 的元素的值设置为 value
# ======================================================
127.0.0.1:6379> lindex list 0
"one"
127.0.0.1:6379> llen list
(integer) 1
127.0.0.1:6379> lrem list 2 one #移除2个one的值,这里只有一个,故删除了1个
(integer) 1
127.0.0.1:6379> rpoplpush list mylist
"hello"
127.0.0.1:6379> lset list 0 hi #第0个value更新,key不存在会报错
OK
# ======================================================
# linsert key before/after pivot value 用于在列表的元素前或者后插入元素 
# 将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。
# ======================================================
127.0.0.1:6379> lrange list 0 -1
1) "hi"
2) "hello1"
127.0.0.1:6379> linsert list after hi new #在hi后插入new
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "hi"
2) "new"
3) "hello1"

4、集合Set

set中的值不能重复,是无序不重复的

# ======================================================
# sadd 将一个或多个成员元素加入到集合中,不能重复
# smembers 返回集合中的所有的成员。
# sismember 命令判断成员元素是否是集合的成员。
# scard,获取集合里面的元素个数
# rem key value 用于移除集合中的一个或多个成员元素
# ======================================================
127.0.0.1:6379> sadd myset hello
(integer) 1
127.0.0.1:6379> sadd myset shawn
(integer) 1
127.0.0.1:6379> smembers myset 
1) "shawn"
2) "hello"
127.0.0.1:6379> sismember myset hello
(integer) 1
127.0.0.1:6379> scard myset
(integer) 2
127.0.0.1:6379> srem myset hello
(integer) 1
# ======================================================
# randmember key 命令用于返回集合中的一个随机元素。
# spop key 用于移除集合中的指定 key 的一个或多个随机元素
# smove SOURCE DESTINATION MEMBER,将指定成员 member 元素从 source 集合移动到 destination 集合。
# 数字集合类 差集:sdiff;交集:sinter;并集:sunion(社交软件共同关注等操作)
# ======================================================
127.0.0.1:6379> sadd k1 a b c
(integer) 3
127.0.0.1:6379> sadd k2 b c d
(integer) 3
127.0.0.1:6379> sdiff k1 k2
1) "a"
127.0.0.1:6379> sinter k1 k2
1) "b"
2) "c"
127.0.0.1:6379>  sunion k1 k2
1) "a"
2) "c"
3) "b"
4) "d"

5、哈希Hash

Map集合,相当于key-Map,通常来存储经常变动的对象

# ======================================================
# hset、hget 命令用于为哈希表中的字段赋值 。
# hmset、hmget 同时将多个field-value对设置到哈希表中。会覆盖哈希表中已存在的字段。 # hgetall 用于返回哈希表中,所有的字段和值。
# hdel    用于删除哈希表 key 中的一个或多个指定字段
# ======================================================
127.0.0.1:6379> hset myhash field shawn
(integer) 1
127.0.0.1:6379> hget myhash field
"shawn"
127.0.0.1:6379> hmset myhash field hello field1 world
OK
127.0.0.1:6379> hmget myhash field  field1 
1) "hello"
2) "world"
127.0.0.1:6379> hgetall myhash
1) "field"
2) "hello"
3) "field1"
4) "world"
127.0.0.1:6379> hdel myhash field
(integer) 1
# ======================================================
# hlen 获取哈希表中字段的数量。
# hexists 查看哈希表的指定字段是否存在。
# hkeys 获取哈希表中的所有域(field)。
# hvals 返回哈希表所有域(field)的值。
# ======================================================
127.0.0.1:6379> hlen myhash #字段数
(integer) 1
127.0.0.1:6379> hexists myhash field
(integer) 0
127.0.0.1:6379> hkeys myhash
1) "field1"
127.0.0.1:6379> hvals myhash
1) "world"
# ======================================================
# hincrby 为哈希表中的字段值加上指定增量值
# hsetnx 为哈希表中不存在的的字段赋值
# ======================================================
127.0.0.1:6379> hset myhash field 1
(integer) 1
127.0.0.1:6379> hincrby myhash field 1
(integer) 2
127.0.0.1:6379> hsetnx myhash field shawn
(integer) 0

6、有序集合Zset

Zset增加了权重参数score,可以用来设置任务的重要程度,例如排行榜应用,Top N

# ======================================================
# zadd    将一个或多个成员元素及其分数值加入到有序集当中。
# zrange  返回有序集中,指定区间内的成员
# ======================================================
127.0.0.1:6379> zadd myset 1 one 2 two
(integer) 2
(integ127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
# ======================================================
# zrangebyscore 返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大)次序排列。

# ======================================================
127.0.0.1:6379> zadd salary 2500 Amy 3500 Mike 200 Shawn
(integer) 3
127.0.0.1:6379> zrangebyscore salary -inf +inf #正序
1) "Shawn"
2) "Amy"
3) "Mike"
127.0.0.1:6379> zrangebyscore salary -inf 2500 WITHSCORES #带上score进行查询
1) "Shawn"
2) "200"
3) "Amy"
4) "2500"
# ======================================================
# zrem 移除有序集中的一个或多个成员
# zcard   命令用于计算集合中元素的数量。
# zcount  计算有序集合中指定分数区间的成员数量。
# zrank  返回有序集中指定成员的排名。其中有序集成员按分数值递增(从小到大)顺序排列。
# zrevrank 返回有序集中成员的排名。其中有序集成员按分数值递减(从大到小)排序。
# ======================================================
127.0.0.1:6379> zrem salary Shawn
(integer) 1
127.0.0.1:6379> zcard salary
(integer) 2
127.0.0.1:6379> zcount salary -inf 2500
(integer) 1
127.0.0.1:6379> zrank salary Mike #Mike的薪水排名
(integer) 1
127.0.0.1:6379> zrevrank salary Mike
(integer) 0

三、Redis三种特殊数据类型

1、GEO地理位置

GEO 的数据结构总共有六个常用命令:geoadd、geopos、geodist、georadius、 georadiusbymember、gethash
官方文档:https://www.redis.net.cn/order/3685.html

因为存在中文,redis客户端启动时命令redis-cli -p 6379 --raw

geoadd

# 语法
geoadd key longitude latitude member ...
# 将给定的空间元素(纬度、经度、名字)添加到指定的键里面。
# 这些数据会以有序集he的形式被储存在键里面,从而使得georadius和georadiusbymember这样的命令可以在之后通过位置查询取得这些元素。
# geoadd命令以标准的x,y格式接受参数,所以用户必须先输入经度,然后再输入纬度。
# geoadd能够记录的坐标是有限的:非常接近两极的区域无法被索引。
# 有效的经度介于-180-180度之间,有效的纬度介于-85.05112878 度至 85.05112878 度之间。当用户尝试输入一个超出范围的经度或者纬度时,geoadd命令将返回一个错误。
#===============================================
127.0.0.1:6379> geoadd china:city 116.23 40.22 北京
(integer) 1
127.0.0.1:6379> geoadd china:city 106.54 29.40 重庆 108.93 34.23 西安 114.02 30.58 武汉
(integer) 3

geopos

# 语法
geopos key member [member...]
#从key里返回所有给定位置元素的位置(经度和纬度)
#===============================================
127.0.0.1:6379> geopos china:city 北京
1) 1) "116.23000055551528931"
   2) "40.2200010338739844"

geodist

# 指定单位的参数 unit 必须是以下单位的其中一个:
# m 表示单位为米。
# km 表示单位为千米。
# mi 表示单位为英里。
# ft 表示单位为英尺。
# 如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位。
#==================================================
127.0.0.1:6379> geodist china:city 北京 重庆 km
"1491.6716"

georadious

以给定的经纬度为中心, 找出某一半径内的元素

# 附近范围内查询,比如附近的人功能的实现,count限制查询出来的数量
127.0.0.1:6379> georadius china:city 100 30 1000 km 
重庆
西安
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count 2
重庆
635.2850
106.54000014066696167
29.39999880018641676
西安
963.3171
108.92999857664108276
34.23000121926852302

georadiusbymember

#找出指定元素旁边的位置
127.0.0.1:6379> georadiusbymember china:city 北京 1000 km
北京
西安

geohash

该命令将返回11个字符的Geohash字符串

# Redis使用geohash将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似表示距离越近。很少使用
127.0.0.1:6379> geohash china:city 北京 重庆
wx4sucu47r0
wm5z22h53v0

zrem

# geo底层使用了zset,故可以用此方法进行删除
127.0.0.1:6379> zrange china:city 0 -1
重庆
西安
武汉
北京
127.0.0.1:6379> zrem china:city 北京
1

2、Hyperloglog

Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的,固定12KB。可以用来计数网站用户量(允许小量容错)

127.0.0.1:6379> pfadd mykey a b c d e f g #创建第一组元素
(integer) 1
127.0.0.1:6379> pfcount mykey #统计元素的基数数量
(integer) 7
127.0.0.1:6379> pfadd mykey1 s f v b r t y u a  #创建第二组
(integer) 1
127.0.0.1:6379> pfmerge mykey2 mykey mykey1 #并集
OK
127.0.0.1:6379> pfcount mykey2
(integer) 12

3、Bitmaps

位存储。统计用户信息,活跃,不活跃,未登录等两个状态,都可以使用Bitmaps(只有0和1)

# 使用 bitmap 来记录上述事例中一周的打卡记录如下所示:
# 周一:1,周二:0,周三:0,周四:1,周五:1,周六:0,周天:0 (1 为打卡,0 为不打卡)
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 0
(integer) 0
127.0.0.1:6379> setbit sign 4 1
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> setbit sign 6 0
(integer) 0
127.0.0.1:6379> getbit sign 1 #查询某一天是否打卡
(integer) 1  
127.0.0.1:6379> bitcount sign  #统计本周打卡天数
(integer) 4

四、事务

Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。若为编译型错误,则事务无法执行。事务的执行是按顺序执行的,且事务没有隔离级别概念。

Redis事务:

  • 开启事务()
  • 命令入队()
  • 执行事务()
127.0.0.1:6379> multi #开启事务
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> get k1
QUEUED
127.0.0.1:6379(TX)> exec #执行事务
1) OK
2) OK
3) "v1"
#=================================
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> discard  #放弃事务
OK

悲观锁

悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿到这个数据就会block直到它拿到锁。传统的关系型数据库里面就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在操作之前先上锁。

乐观锁

乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁。但是在更新的时候会判断一下再此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量,乐观锁策略:提交版本必须大于记录当前版本才能执行更新。

# 用watch监视,成功就修改,可以用来做乐观锁
127.0.0.1:6379> watch money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set money 100
QUEUED
#此时新开一个客户端,运行
127.0.0.1:6379> set money 500
OK
#回到第一个,执行事务,发现监视内容发送变化,修改失败
127.0.0.1:6379(TX)> exec
(nil)
# 若要放弃监视,使用unwatch
# 一但执行 EXEC 开启事务的执行后,无论事务使用执行成功, WARCH 对变量的监控都将被取消。故当事务执行失败后,需重新执行WATCH命令对变量进行监控,并开启新的事务进行操作。

五、Java连接Redis操作

1、Jedis

Jedis是Redis官方推荐的Java连接开发工具。

首先新建一个空的maven项目

<!--进入maven仓库查找最新版-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.6.0</version>
</dependency>
// 成功连接,输出pong,jedis中已经集成了常用的API,使用.即可查询
public static void main(String[] args) {
        //连接本地的 Redis 服务
        Jedis jedis = new Jedis("localhost",6379);
        // 如果 Redis 服务设置了密码,需要下面这行,没有就不需要
        // jedis.auth("123456");
        System.out.println("连接成功");
        //查看服务是否运行
        System.out.println("服务正在运行: "+jedis.ping());
    }

2、SpringBoot整合Redis

简单使用

首先在pom.xml中导入依赖

<!--spring2.0后底层使用lettuce,性能更高,2.0之前采用jedis-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置application.yml

#配置redis
spring:
  redis:
    host: 127.0.0.1
    port: 6379

测试

@SpringBootTest
class RedisSpringApplicationTests {
	//redisTemplate 操作不同的数据类型,api和我们的指令是一样的
	//opsForValue 操作字符申类似string
	//opsForList 操作list 类List
	//opsForSet
	//opsForHash
	//opsForZSet
	//opsForGeo
	//opsForHyperLogLog
	@Autowired
	RedisTemplate<String, String> redisTemplate;
	@Test
	void contextLoads() {
		redisTemplate.opsForValue().set("k","v");
	}
}

源码分析

External Libraries中找到Redis的自动配置类,在RedisProperties.class也可以看到配置信息

image-20210607170901524

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
    public RedisAutoConfiguration() {
    }
	@Bean
	//我们可以自定义一个redisTemplate替换默认。下面注解意思是如果Spring容器中有了RedisTemplate对象了,这个自动配置的RedisTemplate不会实例化。
    @ConditionalOnMissingBean(name = {"redisTemplate"})
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        //默认的没有进行过多操作,也没有序列化,不能进行对象传输
        //我们一般使用<String,Object>
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
    @Bean
    @ConditionalOnMissingBean
	//String类型常用,单独提出来一个方法
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

工具封装(可直接使用)

自定义RedisTemplate

@Configuration
public class RedisConfig {
    // 编写自己的RedisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        //序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash采用String序列方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

创建工具类

@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================
    /**
     * 指定缓存失效时间
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }

    // ============================String=============================
    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 递增
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    // ================================Map=================================
    /**
     * HashGet
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }

    // ============================set=============================
    /**
     * 根据key获取Set中的所有值
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将数据放入set缓存
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 将set数据放入缓存
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0) {
                expire(key, time);
            }
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */
    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================
    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */
    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */
    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

六、Redis.conf配置信息

config get *获取全部配置信息

配置文件信息在/opt/redis-6.2.4/redis.conf,常用配置信息如下

# 绑定IP,这里指所有ipv4和ipv6都可以访问
bind * -::* 
# 受保护的 默认开启,若想外网连接必须关闭
protected-mode yes
# 端口号
port 6379
# 客户端闲置N秒后关闭连接(0禁用)
timeout 0
# 向客户端发送 TCP ACK 检测连接是否断开,保证连接活跃。单位秒,默认300秒发送一次,如果等于0 就是禁用。
tcp-keepalive 300
#==================general=================
# 默认情况下,Redis不会作为守护程序运行。如果需要,请设置为 yes
daemonize yes
# 可通过upstart和systemd管理Redis守护进程
supervised no
# 以后台进程方式运行redis,则需要指定pid文件
pidfile /var/run/redis_6379.pid
# 日志级别
loglevel notice
# 指定日志文件名称。指定为空时将输出到标准输出设备中。如果Redis以守护进程启动,当日志文件名称为空时,日志将会输出到 /dev/null。
logfile ""
# 数据库个数
databases 16
# redis 启动的时候显示日志
always-show-logo no
#==================snapshotting 快照=================
save 900 1 #900s有一个key发生改变,触发save
save 300 10 #300s有10个key发生改变,触发save
save 60 10000 #60s有10000个key发生改变,触发save
# 默认值为yes。当启用了RDB且最后一次后台保存数据失败,Redis是否停止接收数据。
stop-writes-on-bgsave-error yes
# 使用压缩rdb文件 yes:压缩,但是需要一些cpu的消耗。no:不压缩,需要更多的磁盘空间
rdbcompression yes
# 是否校验rdb文件,更有利于文件的容错性,但是在保存rdb文件的时候,会有大概10%的性能损耗
rdbchecksum yes
# rdb 文件得文件名称
dbfilename dump.rdb
# rdb文件是否删除同步锁
rdb-del-sync-files no
# 设置 rdb 文件存放得路径
dir ./
#==================replication 主从复制=================
#当本机为从服务时,设置主服务的IP及端口
replicaof <masterip> <masterport>
#当本机为从服务时,设置主服务的连接密码。
masterauth <master-password>
#本机为从服务时,设置主服务的用户名。
masteruser <username>
#当slave失去与master的连接,或正在拷贝中,如果为yes,slave会响应客户端的请求,数据可能不同步甚至没有数据,如果为no,slave会返回错误"SYNC with master in progress"
replica-serve-stale-data yes
#如果为yes,slave实例只读,如果为no,slave实例可读可写。
replica-read-only yes
#指定slave定期ping master的周期,默认10秒钟。
repl-ping-replica-period 10
#从服务ping主服务的超时时间,若超过repl-timeout设置的时间,slave就会认为master已经宕了。
repl-timeout 60
#在slave和master同步后(发送psync/sync),后续的同步是否设置成TCP_NODELAY.假如设置成yes,则redis会合并小的TCP包从而节省带宽,但会增加同步延迟(40ms),造成master与slave数据不一致 假如设置成no,则redis master会立即发送同步数据,没有延迟。
repl-disable-tcp-nodelay no
#当 master 不能正常工作的时候,Redis Sentinel 会从 slaves 中选出一个新的 master,这个值越小,就越会被优先选中,但是如果是 0 那是意味着这个 slave 不可能被选中。默认优先级为 100。
replica-priority 100
#==================security 安全=================
#ACL日志的最大长度,默认是128M
acllog-max-len 128
#ACL外部配置文件所在位置
aclfile /etc/redis/users.acl
#当前redis服务的访问密码,默认是不需要密码
requirepass 123456
#也可以命令行设置
config set requirepass "123456"
#测试ping,发现需要验证127.0.0.1:6379> ping
NOAUTH Authentication required. # 验证
127.0.0.1:6379> auth 123456
OK
#==================限制=================
# 设置最大客户连接数
maxclients 10000
# 内存限制字节数
maxmemory <bytes>
# maxmemory-policy 内存达到上限的处理策略
#volatile-lru:利用LRU算法移除设置过过期时间的key。
#volatile-random:随机移除设置过过期时间的key。
#volatile-ttl:移除即将过期的key,根据最近过期时间来删除(辅以TTL)  
#allkeys-lru:利用LRU算法移除任何key。
#allkeys-random:随机移除任何key。
#noeviction:不移除任何key,只是返回一个写错误。
maxmemory-policy noeviction
#==================append only模式=================
#Redis的持久化存储提供两种方式:RDB与AOF。RDB是默认配置(常用)AOF需要手动开启
appendonly no
# 配置文件名字
appendfilename "appendonly.aof"
# appendfsync aof持久化策略的配置
# no表示不执行fsync,由操作系统保证数据同步到磁盘,速度最快
# always表示每次写入都执行fsync,以保证数据同步到磁盘
# everysec表示每秒执行一次fsync,可能会导致丢失这1s数据
appendfsync everysec
#重写时是否可以运用Appendfsync,用默认no即可,保证数据安全性
No-appendfsync-on-rewrite no
# 设置重写的基准值
Auto-aof-rewrite-min-size 100
#设置重写的基准值
Auto-aof-rewrite-percentage 64mb
#==================cluster 集群=====================
# 启用集群模式
cluster-enabled yes      
# 设置当前节点连接超时毫秒数
cluster-node-timeout 15000     
#设置当前节点集群配置文件路径
cluster-config-file node_6381.conf             

七、Redis持久化

Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能!

1、RDB(Redis DataBase)

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失,且备份时需要消耗内存。

image-20210605140429685

RDB快照

# 对于RDB来说,提供了三种机制:save、bgsave、自动触发。
# 自动触发在redis.conf下进行配置
# 三种情况保存的rdb文件可以进行配置,默认在当前目录
127.0.0.1:6379> bgsave
Background saving started
127.0.0.1:6379> save
OK
# 若要恢复Redis数据,只需要将dump.rdb文件放到对应dir目录下,Redis会自动进行数据恢复
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin"

2、AOF(Append Only File)

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

#若需要使用aof,需要在配置信息里开启
#aof正常恢复
#将有数据的aof文件复制一份保存到对应目录(config get dir)恢复:重启redis然后重新加载
#若aof文件异常,redis将无法启动,可进行修复
redis-check-aof --fix appendonly.aof

3、总结

1、RDB 持久化方式能够在指定的时间间隔内对数据进行快照存储
2、AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
3、只做缓存,可以不使用任何持久化
4、同时开启两种持久化方式时

  • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
  • RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,建议不要只使用AOF,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。

5、性能建议

  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留 save 900 1 这条规则。
  • 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
  • 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。

八、Redis发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 Redis 客户端可以订阅任意数量的频道。

image-20210605195534013

redis发布订阅常用命令

序号命令描述
1PSUBSCRIBE pattern [pattern …]订阅一个或多个符合给定模式的频道
2PUBSUB subcommand [argument [argument …]]查看订阅与发布系统状态
3PUBLISH channel message将信息发送到指定的频道
4PUNSUBSCRIBE [pattern [pattern …]]退订所有给定模式的频道
5SUBSCRIBE channel [channel …]订阅给定的一个或多个频道的信息
6UNSUBSCRIBE [channel [channel …]]退订给定的频道

测试

#开启一个客户端,订阅一个频道
127.0.0.1:6379> SUBSCRIBE shawn
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "shawn"
3) (integer) 1
#打开另一个客户端,发送消息
127.0.0.1:6379> PUBLISH shawn hello
(integer) 1
#第一个客户端收到订阅消息
1) "message"
2) "shawn"
3) "hello"

原理

  • Redis是使用C实现的,通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解

  • Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能

  • 通过 SUBSCRIBE 命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 channel ,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中

  • 通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的 channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者

  • Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息

使用场景

  • Pub/Sub构建实时消息系统
  • Pub/Sub构建的实时聊天系统

九、Redis主从、哨兵和集群

这里实验都在一台机器上,故只修改端口,正式操作时应该分布在不同的机器中

1、主从复制

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。 Master以写为主,Slave 以读为主。默认每台Redis服务器都是主节点,单台Redis内存不应超过20G。

对于读多写少的电商

主从复制作用

  • 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  • 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
  • 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
  • 高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。

环境配置

#查看信息
127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:0
master_failover_state:no-failover
master_replid:c75ea02227de8882aa3c60c9b22559e3076270b0
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

配置主从复制,至少一主二从

#生成三份配置文件,myredis.conf这里我已经存在了
cp conf/myredis.conf conf/myredis01.conf 
cp conf/myredis.conf conf/myredis02.conf
#其次修改配置文件,下面是我其中一个配置
#依次修改port端口号、daemonize为yes、pidfile文件、logfile文件、dbfilename文件
port 6370
daemonize yes
pidfile /var/run/redis_6370.pid
logfile "6370.log"
dbfilename "dump6370.rdb"
#保证文件不会重复,最后开启服务,开启三个终端
redis-server conf/myredis.conf 
redis-server conf/myredis01.conf 
redis-server conf/myredis02.conf 
#查看是否成功开启
ps -ef|grep redis

命令行配置(效果暂时,一般是配置文件配置)

#仅在从机进行配置即可,我的两个从机端口为6370和6371
127.0.0.1:6370> SLAVEOF 127.0.0.1 6379
OK
127.0.0.1:6371> SLAVEOF 127.0.0.1 6379
OK
#此时查看主机信息可以看见两个从机已经连接
127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6371,state=online,offset=280,lag=1
slave1:ip=127.0.0.1,port=6370,state=online,offset=280,lag=1
master_failover_state:no-failover
master_replid:d0f2fce55c4ee9f4403b7ff342ca7e43ef38d470
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:280
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:280
# 从机使用此命令可以重新变为主机
127.0.0.1:6371> SLAVEOF no one 

配置文件配置

# 进入REPLICATION部分,修改从机配置文件
replicaof <masterip> <masterport>

测试细节

  • 主机能读写,从机只能读,且从机会自动复制主机内容
  • 主机宕机,从机只能进行读操作
  • 若命令行操作,从机宕机,重新启动后变为主机,重新设置变为从机后可获取主机最新信息

复制原理

Slave 启动成功连接到 master 后会发送一个sync命令,Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。但是只要是重新连接master,一次完全同步(全量复制)将被自动执行

  • 全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  • 增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步

2、哨兵模式

哨兵模式能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

一般哨兵模式要开启6个进程,假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

img

测试配置

# 一主二从配置不变,加入哨兵进程
# 进入redis目录
cd /usr/local/bin/
# 复制3个哨兵配置文件sentinel.conf
cp /opt/redis-6.2.4/sentinel.conf conf/sentinel1.conf 
cp /opt/redis-6.2.4/sentinel.conf conf/sentinel2.conf 
cp /opt/redis-6.2.4/sentinel.conf conf/sentinel3.conf 

依次修改3份哨兵配置文件,保证端口、pid文件和日志文件不重名,日志文件在/tmp目录下

port 26381
daemonize yes
pidfile "/var/run/redis-sentinel26381.pid"
logfile "26381.log"
dir "/tmp"
#这里是最重要的,后四个依次是master别名,master的ip、端口号以及得票多少才能成为主机,一般是哨兵一半加一
sentinel monitor mymaster 127.0.0.1 6379 2
#在当前目录下依次启动,即完成哨兵模式
redis-sentinel conf/sentinel1.conf
redis-sentinel conf/sentinel2.conf
redis-sentinel conf/sentinel3.conf
#此时若6379主机宕机后,哨兵模式会自动选举产生新的主服务器,当6379重启后,自动变成从机,可以进入/tmp查看日志

配置文件详解

# 哨兵sentinel实例运行的端口 默认26379
port 26379
# 是否后台启动
daemonize yes
# 运行时PID文件
pidfile /var/run/redis-sentinel.pid
# 日志文件(绝对路径)
logfile "/opt/app/redis6/sentinel.log"
# 数据目录
dir "/tmp"
# 哨兵sentinel监控的redis主节点的 ip port 
# master-name  可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# 哨兵连接主节点多长时间没有响应就代表主节点挂了,单位毫秒。默认30000毫秒,30秒。
sentinel down-after-milliseconds mymaster 30000
# 在故障转移时,最多有多少从节点对新的主节点进行同步。这个值越小完成故障转移的时间就越长,这个值越大就意味着越多的从节点因为同步数据而暂时阻塞不可用
sentinel parallel-syncs mymaster 1
# 故障转移的超时时间,默认3分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
#禁止使用SENTINEL SET设置notification-script和client-reconfig-script
sentinel deny-scripts-reconfig yes
# 配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
# 通知脚本
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

3、Redis集群

Redis集群由多个节点(Node)组成,Redis 的数据分布在这些节点中。集群中的节点分为主节点和从节点,只有主节点负责读写请求和集群信息的维护,从节点只进行主节点数据和状态信息的复制。Redis集群采用哈希分区的方式对数据进行分区,哈希分区就是对数据的特征值进行哈希,然后根据哈希值决定数据放在哪个节点。其中redis cluster集群是去中心化的,每个节点都是平等的,连接哪个节点都可以获取和设置数据。

Redis集群的作用有下面几点:

  • 数据分区:突破单机的存储限制,将数据分散到多个不同的节点存储;
  • 负载均衡:每个主节点都可以处理读写请求,提高了并发能力;
  • 高可用:集群有着和哨兵模式类似的故障转移能力,提升集群的稳定性;

普通端口:即客户端访问端口,如默认的6379;

集群端口:普通端口号加10000,如6379的集群端口为16379,用于集群节点之间的通讯

image-20210607155751506

配置

分配6个配置文件

IDIPHost类型从节点
A127.0.0.16381AA
B127.0.0.16382BB
C127.0.0.16383CC
AA127.0.0.16391/
BB127.0.0.16392/
CC127.0.0.16393/
#分别修改6个目录中的redis.conf文件,主要开启集群以及修改端口和文件路径
#举例其中一个
port 6381
port 26381
daemonize yes
pidfile "/var/run/redis-sentinel26381.pid"
logfile "26381.log"
cluster-enabled yes                            # 启用集群模式
cluster-node-timeout 15000                     # 设置当前节点连接超时毫秒数
#设置当前节点集群配置文件路径,该文件由集群自动维护,如果有则使用文件中的配置启动;如果没有,则初始化配置并将配置保存到文件中。
cluster-config-file node_6381.conf             
#=========================================
#启动,前三个表示主机,后三个表示从机
#这里的--cluster-replicas表示每个主节点有几个副本节点
redis-cli --cluster create 127.0.0.1:6381 127.0.0.1:6382 127.0.0.1:6383 127.0.0.1:6391 127.0.0.1:6392 127.0.0.1:6393 --cluster-replicas 1
# -c,使用集群方式登录
redis-cli -c [-h 192.168.30.128] -p 7001 [-a 123456]    
#集群状态
CLUSTER INFO     
#列出节点信息
CLUSTER NODES                  

十、Redis缓存

1、缓存穿透

缓存穿透是指查询一个根本不存在的数据,缓存层和持久层都不会命中。在日常工作中出于容错的考虑,如果从持久层查不到数据则不写入缓存层,缓存穿透将导致不存在的数据每次请求都要到持久层去查询,失去了缓存保护后端持久的意义

image-20210608101514039

2、缓存击穿

系统中存在以下两个问题时需要引起注意:当前key是一个热点key(例如一个秒杀活动),并发量非常大;重建缓存不能在短时间完成,可能是一个复杂计算,例如复杂的SQL、多次IO、多个依赖等。在缓存失效的瞬间,有大量线程来重建缓存,造成后端负载加大,甚至可能会让应用崩溃。

image-20210607170758835

3、缓存雪崩

由于缓存层承载着大量请求,有效地保护了存储层,但是如果缓存层由于某些原因不可用(宕机)或者大量缓存由于超时时间相同在同一时间段失效(大批key失效/热点数据失效),大量请求直接到达存储层,存储层压力过大导致系统雪崩。

image-20210607170538553


参考文章:
https://blog.csdn.net/wsdc0521/article/details/106316972
https://blog.csdn.net/weixin_43445935/article/details/115393205
https://www.bilibili.com/video/BV1S54y1R7SB?p=12&spm_id_from=pageDriver

这篇关于Redis6.0学习笔记的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!