Redis教程

Redis【三】持久化&管道&事务&消息队列

本文主要是介绍Redis【三】持久化&管道&事务&消息队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录

一、线程IO模型

二、持久化

1.快照的原理

2.AOF原理

3.AOF重写

4.fsync

5.Redis4.0混合持久化

三、管道(Pipeline)

四、事务

1.事务简介

2.Redis事务不具备原子性

3.watch

五、PubSub

1.消息多播

2.PubSub

3.PubSub的缺点

六、Stream

1.结构介绍

2.相关命令

3.PEL如何避免消息丢失

4.Stream的高可用

5.分区Partition

七、Info指令


一、线程IO模型

Redis是单线程程序,由于它的所有数据都在内存中,所有的运算都是内存级别的运算,所以能非常快速的处理数据。正因为它是单线程程序,所以对于一些复杂度为O(n)级别的指令要小心使用,否则可能造成Redis卡顿。Redis底层采用非阻塞IO处理客户端连接事件以及读写事件。非阻塞IO的读写方法是当前能读多少就读多少,能写多少就写多少。能读多少取决于内核为套接字分配的读缓冲区内部的数据字节数,能写多少取决于内核为套接字分配的写缓冲区的空闲空间字节数。读方法和写方法都会通过返回值来告知程序实际读写了多少字节。

非阻塞IO采用一种事件机制来处理侦听连接就绪、读写、连接事件,在程序中我们轮询来捕获相应的事件进行不同的逻辑处理。那么我们的程序是如何得知事件发生的呢?比如select函数是操作系统提供给用户程序的API,输入是读写描述符列表,输出是与之对应的可读可写事件。因为我们通过socket系统调用同时处理多个通道描述符的读写事件,因此我们将这类系统调用称为多路复用API。

Redis会将每个客户端套接字都关联一个指令队列。客户端的指令通过队列来排序处理,先来先服务。同样的,Redis会为每个客户端套接字关联一个响应队列。Redis服务器通过响应队列来将指令的返回结果回复给客户端。如果队列为空,那么意味着连接暂时处于空闲状态。

二、持久化

Redis的数据全部都在内存里,如果突然宕机,数据就会全部丢失,因此必须有一种机制来保证Redis的数据不会因为故障而丢失,这种机制就是Redis的持久化机制。

Redis的持久化机制有两种,一种是快照,另一种是AOF日志。

  • 快照是一次性全量备份;AOF日志是连续的增量备份。
  • 快照是内存内存数据的二进制序列化形式,在存储上非常紧凑;而AOF日志记录的是内存数据修改的指令记录文本。

AOF日志在长期运行过程中变得无比庞大,数据库重启时需要加载AOF日志进行指令重放,这个过程无比漫长,所以需要定期进行AOF重写,给AOF日志进行瘦身。

1.快照的原理

Redis是单线程程序,这个线程要同时负责多个客户端套接字的并发读写操作和内存数据结构的逻辑读写。在服务线上请求的同时,Redis还需要进行内存快照,内存快照要求Redis必须进行文件IO操作。文件IO操作会严重拖慢服务器的性能。

为了不阻塞线上业务,Redis需要一边做持久化,一边响应客户端的请求,且持久化的同时,内存数据结构可能还在改变。Redis使用操作系统的多进程COW(Copy On Write写时复制)机制来实现快照持久化。Redis在持久化时会fork一个子进程(子进程刚产生时,他和父进程共享内存里面的代码段和数据段)。子进程做数据持久化,不会修改现有的内存数据结构,他只是对数据结构进行遍历读取,然后序列化到磁盘。父进程持续服务客户端请求,然后对内存数据及进行不间断的修改。

父进程对内存数据进行修改是采用COW机制,父进程对该数据进行修改时会先将被共享的数据复制一份出来然后对这个副本进行修改,这时子进程看到的数据还是之前的原始数据,子进程因为数据没有变化,他能看到的内存里的数据在进程产生的一瞬间就被“凝固”了,再也不会改变,这也是为什么Redis的持久化叫快照的原因(注意:在序列化操作过程中发生的数据修改没法被记录)。

2.AOF原理

AOF日志存储的是Redis服务器的顺序指令序列,AOF日志只记录对内存进行修改的指令记录。假设AOF日志记录了自Redis实例创建以来所有的修改性指令序列,那么就可以通过对一个空的Redis实例顺序执行所有的指令(指令重放),来恢复Redis当前实例的内存数据结构的状态。

Redis会在收到客户端修改指令后,进行参数效验,逻辑处理,如果没有问题,就立刻将改指令文本存储到AOF日志中(先执行指令,再存盘)。

3.AOF重写

Redis提供了bgrewriteaof指令用于对AOF日志进行瘦身,其原理就是开辟一个子进程对内存遍历,转换成一系列Redis的操作指令,序列化到一个新的AOF日志文件中。序列化完毕后再将操作期间发生的增量AOF日志追加到这个新的AOF日志文件中,追加完毕后就立即替代旧的AOF日志文件,瘦身工作就完成了。

4.fsync

AOF日志是以文件的形式存在的,当程序对AOF日志文件进行写操作时,实际上时将内容写到了内核为文件描述符分配的一个内存缓存中,然后内核会异步将脏数据刷回到磁盘。这就意味着如果出现掉电宕机等情况,AOF日志内容可能还没有来得及完全刷到磁盘中,这个时候就可能会出现日志丢失。Linux的glibc提供了fsync(int fd)函数可以将指定文件的内容强制从内核缓存刷新到磁盘。只要Redis进行实时调用fsync就可以保证AOF日志不丢失。fsync是一个文件IO操作,他很慢,如果Redis每次执行一条指令就要fsync一次,那它的性能就会大打折扣。

所以在生产环境的服务器中,Redis通常是每隔1s左右执行一次fsync操作,这个1s的周期是可以配置的。这是在数据安全性和性能之间做一个折中,在保持高性能的同时,尽可能减少数据丢失。

5.Redis4.0混合持久化

重启Redis时,如果使用快照备份的方式恢复内存状态,会丢失大量数据;如果使用AOF日志方式,那么启动会非常耗时。Redis4.0为了解决这个问题,带来了一个新的持久化选项,即混合持久化。将快照文件的内容和增量AOF日志文件存在一起(注意,这是增量AOF日志文件,就是从持久化开始到持久化结束期间发生的增量AOF日志。不是全量,这种日志文件一般很小)。于是,在Redis重启的时候,可以先加载快照中的内容,然后再进行指令重放,这就保证了重启效率和数据安全。

三、管道(Pipeline)

当我们使用客户端对Redis进行一次操作时,客户端将请求传送给服务器,服务器处理完毕后,再将响应回复给客户端。这个过程要花费一个网络数据包来回的时间。如果连续执行多条指令,那就会花费多个网络数据包的来回时间。

假设客户端执行两条指令,那么站在socket层面,客户端需要经历“写-读-写-读”四个操作才能完整地执行这两条指令。如果将上述操作调整顺序,改成“写-写-读-读”这两个指令依然可以顺利完成,那么其实两次写可以合并,两次读操作也可以合并,将合并后地写发送给服务器,服务器将结果合并响应给客户端,这样两条指令就只花费了一次网络数据交互。这就是管道操作的本质,它并不是服务器的什么特性,而是客户端对指令列表改变读写顺序从而大幅节省了IO时间,管道中指令越多,效果越好。

四、事务

1.事务简介

为了确保连续多个操作的原子性,一个成熟的数据库通常都会有事务支持,Redis也一样,不同于关系数据库,我们无需理解那么多的事务模型就可以直接使用。不过正是因为简单,它的事务模型很不严格,这要求我们不能像使用关系数据库的事务一样来使用Redis事务。

Redis提供事务操作指令:multi(开启事务)、exec(提交事务)、discard(丢弃事务)

在开启事务之后,提交事务之前,Redis对这期间的指令都不执行,而是缓存在服务器的一个事务队列中,服务器一旦收到exec指令,才开始执行整个事务队列中的指令,执行完毕之后一次性返回所有指令的运行结果。而如果收到discard指令,则服务器丢弃事务队列中的所有指令。

2.Redis事务不具备原子性

如上操作,开启事务之后执行了如下几个操作(其中QUEUED表示加入事务队列成功):

  • 设置age,并对其自增1
  • 设置name,并对其自增1
  • 设置sex

事务的原子性只得是,同一个事务内得操作,要么一起成功,要么一起失败,可是通过上述示例可以看出Redis事务并不具备这种原子性,因为我们成功将age自增1,并且成功设置了sex。

上面得Redis事务在发送每个指令到事务队列时都要经过一次网络读写,当一个事务内部得指令过多时,需要得网络IO时间也会线性增长,所以我们在执行事务时可以结合管道做优化。像这种:

Pipeline pipeline = jedis.pipelined();
pipeline.multi();
 //……执行一些指令
pipeline.exec();
pipeline.close();

3.watch

watch相当于一个乐观锁,我们可以利用它来解决并发修改问题。比如两个客户端A和B要修改某个数据(比如容器结构中的存储数据或者字符串)假设客户端A先读取数据然后去内存中处理完后将其修改到Redis中,如果在A修改过程中该数据已经被B修改并更新到Redis中了,那对于该数据的操作就是不安全的。

watch会在事务开始之前盯住一个关键变量(禁止在mutil于exec之间执行watch指令),当提交事务时,Redis首先会检查关键变量自watch(被盯住)之后是否被修改了,如果关键变量被修改了,exec指令就会返回NULL告知客户端执行失败,这个时候客户端一般会选择重试。当服务器给exec指令返回一个NULL回复时,客户端知道了事务执行是失败的,通常客户端都会抛出一个WatchError之类的错误,不过也有些语言(Jedis)不会抛出异常,而是从exec()方法中返回NULL。

public static void main(String[] args) throws InterruptedException {
    Jedis jedis = new Jedis();
    while(true){
        //每次在事务之前盯住这个list
        jedis.watch("list");
        
        //第一次10s期间我对列表进行了push操作
        Thread.sleep(10000);

        Transaction tx = jedis.multi();
        tx.rpush("list","yhj");
        List<Object> res = tx.exec();
        if(res != null){
            break;
        }else{
            System.out.println("有人对该list进行了push或者pop操作");
        }
    }
}

另外Redis提供UNWATCH命令,可以取消对所有关键变量的监控。

五、PubSub

1.消息多播

消息多播允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组进行消费,它是分布式系统常用的一种解耦方式,用于将多个消息组的逻辑进行拆分。支持消息多播,多个消费组的逻辑就可以放到不同的子系统中。

如果只是普通的消息队列,不支持消息多播的话,就得将多个不同的消费组逻辑串联起来放在一个子系统中,进行连续消费。

2.PubSub

为了支持消息多播,Redis不能再依赖于那5中基本数据类型了,它是单独使用了一个模块来支持消息多播,这个模块的名字叫PubSub,也就是发布者/订阅者模式。Redis提供了发布/订阅相关的操作命令:

PSUBSCRIBE pattern [pattern ...]               //订阅一个或多个符合给定模式的频道。
PUBSUB subcommand [argument [argument ...]]    //查看订阅与发布系统状态。
PUBLISH channel message                        //将信息发送到指定的频道。
PUNSUBSCRIBE [pattern [pattern ...]]           //退订所有给定模式的频道。
SUBSCRIBE channel [channel ...]                //订阅指定的一个或多个频道。
UNSUBSCRIBE [channel [channel ...]]            //退订指定的频道。

下面使用Jedis演示PubSub:

public class PubSubProcessor extends JedisPubSub {
    // 接收到订阅频道消息后置处理
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("接收到[" + channel + "]的消息:\t" + message );
    }

    //订阅成功后置处理
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("成功订阅频道["+channel+"]");
    }

    // 取消订阅后置处理
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("取消[" + channel + "]成功");
    }
}

生产者:

public class Publisher {
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        jedis.publish("codehole","java");
        jedis.publish("codehole","python");
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        //订阅频道
        jedis.subscribe(new PubSubProcessor(),"codehole");
    }
}

启动消费者之后会首先会成功订阅频道,然后启动生产者,生产者给频道发送消息,然后消费者会马上接收到,并处理,处理完毕之后还是一直侦听消息。如下为消费者输出,

上面程序是基于名称订阅的,消费者订阅一个频道必须明确指明名称。如果想订阅多个频道,那就subscribe多个名称。消费者可以订阅多个频道,生产者向多个频道发送消息,消费者都可以接收到。为了简化订阅的繁琐,Redis提供了模式订阅功能Pattern Subscribe,这样就可以一次性订阅多个频道,即使生产新增了同模式的频道,消费者也可以立即收到消息。

//以"codehole."开头的频道的消息都是可以被该消费者接收到的
jedis.psubscribe(new PubSubProcessor(),"codehole.*");

JedisPubSub中也提供了一些关于模式订阅的相关处理方法:

//接收到消息的后置处理
public void onPMessage(String pattern, String channel, String message) { 
    //pattern表示是哪一种模式
}

// 订阅成功的后置处理
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
}

3.PubSub的缺点

PubSub的生产者传递过来一个消息,Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息会被丢弃掉。如果一开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息,但是当挂掉的消费者重连上的时候,在断连期间的消息就会丢失。如果Redis停机重启,PubSub的消息是不会持久化的,宕机就相当于一个消费者都没有了,所有的消息会被直接丢弃。

六、Stream

1.结构介绍

Stream是Redis5.0新增的数据结构,它是一个新的强大的支持多播可持久化消息队列,Redis Stream极大地借鉴了Kafka的设计。它的结构如下:

每个Stream都有一个唯一的名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建。每个Stream都可以挂多个消费组(Consumer Group),每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一名称,消费组不会自动创建,他需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。

消息ID的形式是timestampInMills-sequence,例如1234567891111-5,他表示当前的消息在毫秒时间戳1234567891111时产生,并且是该毫秒内产生的第5条消息。消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是“整数-整数”,而且后面加入的消息ID必须要大于前面的消息ID。

每个消费组的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每个消费组都消费到。

同一个消费组可以挂接多个消费者,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。

消费者内部会有一个状态变量pending_ids,它记录了当前已经被客户端读取,但是还没有ack消息。如果客户端没有ack,这个变量里面的消息ID就会越来越多,一旦某个消息被ack,他就开始减少。这个pending_ids变量在Redis官方被称为PEL,也就是Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理

2.相关命令

消息队列相关命令

XADD key ID field value [field value ...]        // 向队列添加消息,如果指定的队列不存在,则创建一个队列
    //key :队列名称,如果不存在就创建
    //ID :消息ID,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
    //field value : 消息内容(键值对形式)。

XDEL key ID [ID ...]        //从Stream中删除消息
XLEN    //获取Stream消息长度(即有几条消息)
DEL    //删除整个Stream消息列表中的所有消息
XRANGE key start end [COUNT count]    //获取消息列表,会自动过滤已经删除的消息
    //start :开始值, - 表示最小值
    //end :结束值, + 表示最大值
    //count :数量

消费者常用的命令如下:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]         //创建消费者组
    //key :队列名称,如果不存在就创建
    //groupname :组名。
    //$ : 表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略。
    /*
        如:
        从头开始消费:XGROUP CREATE mystream consumer-group-name 0-0  
        从尾部开始消费:XGROUP CREATE mystream consumer-group-name $
    */
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]    //以阻塞或非阻塞方式获取消息列表(该命令可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有消息时可以阻塞等待。该指令可以忽略消费组的存在,将Stream当成普通的消息队列(list)使用)
    //count :数量
    //milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式。如果超时后还没有消息到来就返回nil。如果设置为0则表示永远阻塞,直到消息的到来。
    //key :队列名
    //id :消息 ID

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]      //读取消费者组中的消息(组内消费)
    //group :消费组名
    //consumer :消费者名。
    //count : 读取数量。
    //milliseconds : 阻塞毫秒数。
    //key : 队列名。
    //ID : 消息 ID。

XINFO                    //查看Stream和消费者组的相关信息;
XINFO GROUPS key         //打印该Stream的消费者组信息;
XINFO STREAM key         //打印Stream信息

XACK key group ID[ID ...]//将消息标记为"已处理"

 如下从Stream头部读取两条消息:

客户端如果想要使用XREAD进行顺序消费,那么一定要记住当前消费到哪里了,也就是返回的消息ID。下次继续调用XREAD时,将上次返回的最后一个消息ID作为参数传进去,就可以继续消费后续消息。

创建消费组:

组内消费:

上图中“>”表示从当前消费组的last_delivered_id后面开始读。消费完一条新消息后,对应的消息ID就会进入消费者的PEL结构里,客户端处理完毕后使用XACK指令通知服务器,本条消息已经处理完毕,该消息就会从PEL中移除。该消费组的状态如下:

如果有多个消费者,则可以通过xinfo consumers指令观察每个消费者的状态。

给服务器回复XACK:

Stream在每个消费者结构中保存了正在处理中的消息ID列表PEL,如果消费者收到了消息,处理完毕但是没有回复ack,就会导致PEL列表不断增长,如果有很多消费组的话,那么这个PEL占用的内存就会放大。

3.PEL如何避免消息丢失

当客户端读取Stream消息时,在Redis服务器将消息回复给客户端的过程中,如果客户端突然断开了连接,那么这个消息还没有被客户端收到就丢失了。不过没关系,PEL中已经保存了发出去的消息ID待客户端重新连接上之后,可以再次收到PEL中的消息ID列表。此时xreadgroup的起始消息ID必须是任意有效的消息ID,一般将参数设置为0-0,表示读取所有的PEL消息以及自last_delivered_id之后的新消息。

4.Stream的高可用

Stream的高可用是建立在主从复制基础上的,他和其他数据结构的复制机制没有区别,也就是说在Sentinel和Cluster集群环境下,Stream是可以支持高可用的。不过鉴于Redis的指令复制是异步的,在failover发生时,Redis可能会丢失极小一部分数据,这一点Redis的其他数据结构也一样。

5.分区Partition

Redis的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个Stream,然后在客户端使用一定的策略来生产消息到不同的Stream。

七、Info指令

Info指令显示地信息繁多,分为9大块,每个块都有非常多地参数,Info可以一次性获取所有信息,也可以按块获取信息。这9大块如下:

  1. Server:服务器运行地环境参数。
  2. Clients:客户端相关信息。
  3. Memory:服务器运行内存统计数据。
  4. Persistence:持久化信息。
  5. Stats:通用统计数据。
  6. Replication:主从复制相关信息。
  7. CPU:CPU使用情况。
  8. Cluster:集群信息。
  9. KeySpace:键值对统计数量信息。

 

参考自《Redis深度历险》

这篇关于Redis【三】持久化&管道&事务&消息队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!