在高并发的场景下,用传统的关系型数据库很难满足性能方面要求,越来越挑剔的用户也要求我们的系统不仅要关注功能特性,同时追求极致的产品体验,也就是所说的高性和高可用,那么不免要引入第三方缓存中间件NoSQL,比较熟知有:mongoDb、memercache等,还有今天的主题redis,这些中间件在互联网公司使用非常频繁,而我所在唯品会,这些缓存中间件大量使用,比如商品使用memercache,同时redis也在公司内部大量使用,而我当时在做开放平台的网关时,技术选型就是redis。
作为开源键值型NoSQL技术,redis越来越受欢迎,主要是因为它的稳定性、灵活性及强大的功能,它被广泛用在企业级中,被一些公司广泛采用,包括初创型公司及大型技术公司,比如twitter、UBer。
在实际工作中,我们通常选用redis缓存MYSQL数据库的查询结果、包括网站评论、API计数等,还有基于redis做分布式锁,那么Redis到底有哪些特性,让这么多开发者眷恋redis,现总结一下:
由于redis是单线程的设计也给Redis带来一些问题:
暂略,具体可以参考redis的官网:https://redis.io/
在之前的主从模式下,如果master出问题,则需要运维手工将从节点上升为主节点,特别是线上出问题需要人工介入,影响用户体验,也是不可接受的,因此在redis2.8版本引入稳定版sentinel自动选举解决此问题。
消息队列好处非常之多,应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位,当前使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,各种开源的 MQ 已经足够使用了,为什么需要用 Redis实现MQ呢?有些简单的业务场景,可能不需要重量级的 MQ 组件(相比 Redis 来说,Kafka 和 RabbitMQ 都算是重量级的消息队列),用redis消息功能还是非常不错的选择。
redis实现消息队列主要有三种方式,分别是List、Streams、pub/sub 模式,下面分别介绍一下:
Redis列表是简单的字符串列表,按照插入顺序排序,所以常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理,具体可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop,具体示例如下:
/** * 消息生产者 */ public class Producer extends Thread { public static final String MESSAGE_KEY = "message:queue"; private Jedis jedis; private String producerName; private volatile int count; public Producer(String name) { this.producerName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void putMessage(String message) { Long size = jedis.lpush(MESSAGE_KEY, message); System.out.println(producerName + ": 当前未被处理消息条数为:" + size); count++; } public int getCount() { return count; } @Override public void run() { try { while (true) { putMessage(StringUtils.generate32Str()); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException{ Producer producer = new Producer("myProducer"); producer.start(); for(; ;) { System.out.println("main : 已存储消息条数:" + producer.getCount()); TimeUnit.SECONDS.sleep(10); } } } /** * 消息消费者 */ public class Customer extends Thread{ private String customerName; private volatile int count; private Jedis jedis; public Customer(String name) { this.customerName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void processMessage() { String message = jedis.rpop(Producer.MESSAGE_KEY); if(message != null) { count++; handle(message); } } public void handle(String message) { System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条"); } @Override public void run() { while (true) { processMessage(); } } public static void main(String[] args) { Customer customer = new Customer("yamikaze"); customer.start(); } }
以下是List的一常用命令。
通过 LPUSH,RPOP 这样的方式,会存在一个性能风险点,就是消费者如果想要及时的处理数据,就要在程序中写个类似 while(true) 这样的逻辑,不停的去调用 RPOP 或 LPOP 命令,这就会给消费者程序带来些不必要的性能损失。所以,Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据,这种方式就节省了不必要的 CPU 开销。
消息模式一般是两种,分别是点对点(Point-to-Point)和发布订阅,而List实现方式其实就是点对点的模式,下边我们再看下Redis 的发布订阅模式(消息多播),"发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式。
具体样例如下:
public class Publisher { private Jedis publisherJedis; private String channel; public Publisher(Jedis publishJedis,String channel){ this.publisherJedis=publishJedis; this.channel=channel; } public void startPublish(){ try{ BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); while(true){ System.out.println("请输入message:"); String line = reader.readLine(); if(!"quit".equals(line)){ publisherJedis.publish(channel, line); }else{ break; } } }catch(Exception e){ e.printStackTrace(); } } } public class Subscriber extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println("Channel:" + channel + ",Message:" + message); } @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels); } }
订阅发布:模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。
Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃,而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。
Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。它就像是个仅追加内容的消息链表,把所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容,而且消息是持久化的,基本参考了kafka的设计思想,具体不多讲,可以了解一下kafka:https://blog.csdn.net/weixin_42152237/article/details/119764900
对于数据管理,redis采用的是定期删除+惰性删除策略,为什么采用这些样的策略管理方式呢,因为定时删除,用一个定时器来负责监视key,过期则自动删除。虽然内存及时释放,但是十分消耗CPU资源。在大并发请求下,我们应该保证业务正常运作,而不是删除key,因此采用了定时删除和惰性删除策略。所谓定时删除,一般默认100毫秒检测一次,每次检查只是随机抽样,而惰性删除,当获取某个key的时候,则先检查key是否设置了过期时间,如果过期了此时就会删除。
以上策略,确实是缓减了对CPU的占用,尽可能保证业务正常运作,但存一个问题,因为定时删除不一定能清除掉数据,同时,如果对于某些KEY用户很长一段时间没有访问,也可能会造成内存占用越来越多,为了解决这个问题,redis 还供了其它配置来回收内存,就是在redis.conf配置文件可以找到“# maxmemory-policy volatile-lru”,来配制相关机制:
redis提供了不同级别的持久化方式,一种是RDB,另一种是AOF,可以同时开启两种持久化方式,在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
Redis虽然读取写入的速度都特别快,但是也会产生读压力特别大的情况,为了分担读压力,Redis支持主从复制,Redis的主从结构可以采用一主多从或者级联结构,Redis主从复制可以根据是否是全量分为全量同步和增量同步。
Redis全量复制一般发生在Slave初始化阶段,这时Slave需要将Master上的所有数据都复制一份。具体步骤如下:
1)从服务器连接主服务器,发送SYNC命令;
2)主服务器接收到SYNC命名后,开始执行BGSAVE命令生成RDB文件并使用缓冲区记录此后执行的所有写命令;
3)主服务器BGSAVE执行完后,向所有从服务器发送快照文件,并在发送期间继续记录被执行的写命令;
4)从服务器收到快照文件后丢弃所有旧数据,载入收到的快照;
5)主服务器快照发送完毕后开始向从服务器发送缓冲区中的写命令;
6)从服务器完成对快照的载入,开始接收命令请求,并执行来自主服务器缓冲区的写命令;
完成上面几个步骤后就完成了从服务器数据初始化的所有操作,从服务器此时可以接收来自用户的读请求。
Redis增量复制是指Slave初始化后开始正常工作时主服务器发生的写操作同步到从服务器的过程。 增量复制的过程主要是主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令。
主从刚刚连接的时候,进行全量同步,全同步结束后,进行增量同步。当然,如果有需要,slave 在任何时候都可以发起全量同步,redis 策略是,无论如何,首先会尝试进行增量同步,如不成功,要求从机进行全量同步。
redis cluster提供的功能主要有:数据自动分片、hash tags等功能,具体如下:
Redis cluster是Redis的分布式解决方案,在3.0版本推出后有效地解决了redis分布式方面的需求,在3.0之前为了解决容量高可用用方面的需求基本上只能通过客户端分片+redis sentinel或者代理(twemproxy、codis)方案解决、redis cluster非常优雅地解决了redis集群方面的问题。
Redis采用去中心的设计方案,通过虚拟16384个槽,将每个key映射到每一个具体的槽上,而每个redis节点可以负责管理一定数量的槽,假设有三个redis-cluster中有三个主节点,其槽可能分布如下图:
1)、Gossip协议
2)、meet消息:用于通知新节点加入,消息发送这通知消息接收者加入集群,消息接收者回复pong消息,当新节点加入到集群后各节点就通过ping、pong消息进行信息交换。
3)、ping:集群中每个节点每秒向集群中多个节点发送ping消息,用于节点活性检测和状态交换。
4)、pong:pong作为meet消息和ping消息的响应消息,响应自身状态。也可以通过pong消息向集群中其他阶段广播自身的状态。
5)、fail:fail消息用于向集群中其他阶段广播某个节点已经下线。
6)、消息格式:这些ping、pong等消息具体包括哪些信息呢?在cluster.h头文件中可以看到消息主要是由消息头和消息体组成:
/* 消息*/ typedef struct { char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */ uint32_t totlen; /* Total length of this message */ uint16_t ver; /* Protocol version, currently set to 1. */ uint16_t port; /* TCP base port number. */ uint16_t type; /* Message type */ uint16_t count; /* Only used for some kind of messages. */ uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch advertised by its master if it is a slave. */ uint64_t offset; /* Master replication offset if node is a master or processed replication offset if node is a slave. */ char sender[CLUSTER_NAMELEN]; /* Name of the sender node */ unsigned char myslots[CLUSTER_SLOTS/8]; char slaveof[CLUSTER_NAMELEN]; char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */ char notused1[34]; /* 34 bytes reserved for future usage. */ uint16_t cport; /* Sender TCP cluster bus port */ uint16_t flags; /* Sender node flags */ unsigned char state; /* Cluster state from the POV of the sender */ unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */ union clusterMsgData data; /* message body*/ } clusterMsg;
/* message body difinition */ union clusterMsgData { /* PING, MEET and PONG */ struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1]; } ping; /* FAIL */ struct { clusterMsgDataFail about; } fail; /* PUBLISH */ struct { clusterMsgDataPublish msg; } publish; /* UPDATE */ struct { clusterMsgDataUpdate nodecfg; } update; };
/* MEET, PING and PONG*/ typedef struct { char nodename[CLUSTER_NAMELEN]; uint32_t ping_sent; uint32_t pong_received; char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */ uint16_t port; /* base port last time it was seen */ uint16_t cport; /* cluster port last time it was seen */ uint16_t flags; /* node->flags copy */ uint32_t notused1; } clusterMsgDataGossip;
/* fail message */ typedef struct { char nodename[CLUSTER_NAMELEN]; } clusterMsgDataFail;
当集群出现容量限制或者其他一些原因需要扩容时,redis cluster提供了比较优雅的集群扩容方案。
首先将新节点加入到集群中,可以通过在集群中任何一个客户端执行cluster meet 新节点ip:端口,或者通过redis-trib add node添加,新添加的节点默认在集群中都是主节点。
迁移数据的大致流程是,首先需要确定哪些槽需要被迁移到目标节点,然后获取槽中key,将槽中的key全部迁移到目标节点,然后向集群所有主节点广播槽(数据)全部迁移到了目标节点。直接通过redis-trib工具做数据迁移很方便。 现在假设将节点A的槽10迁移到B节点,过程如下:
B:cluster setslot 10 importing A.nodeId
A:cluster setslot 10 migrating B.nodeId
循环获取槽中key,将key迁移到B节点
A:cluster getkeysinslot 10 100
A:migrate B.ip B.port “” 0 5000 keys key1[ key2…]
向集群广播槽已经迁移到B节点
cluster setslot 10 node B.nodeId
缩容的大致过程与扩容一致,需要判断下线的节点是否是主节点,以及主节点上是否有槽,若主节点上有槽,需要将槽迁移到集群中其他主节点,槽迁移完成之后,需要向其他节点广播该节点准备下线(cluster forget nodeId)。最后需要将该下线主节点的从节点指向其他主节点,当然最好是先将从节点下线。
节点之间通过gossip消息进行通信,当A节点发送PING消息给B节点,若没有收到B节点回复的PONG消息,持续cluster_node_timeout时长,则A节点会判定B节点已经下线了。待A判定B节点下线后,就会向集群广播B节点下线的消息。若集群中大部分节点都认为B节点下线后就会真正地下线B节点。
当某个节点判断另外一个节点下线后,相应的节点状态会跟随消息在集群内传播,当集群中半数以上的主节点都标记该节点下线时,就会出发下线B节点的操作。集群中每个节点在收到其他节点发送的pfail状态时,都会尝试触发下线的操作,只要当前节点是主节点且半数以上主节点判定某节点下线就会向集群中广播fail消息,立即下线问题节点,从而触发从节点的故障转移流程。
当问题节点下线后,如果该下线节点是带有槽的主节点,则需要从它的从节点选出一个替换它,当问题节点的从节点发现其主节点下线时,将会出发故障恢复流程。
确定参与选主的节点,并不是所有的从节点都能参与到故障恢复的流程中,若从节点与问题主节点的断线时间超过cluster_node_timeout * cluster-slave-validity-factor时,该从节点不能参与到后续恢复流程。选举过程如下:
cluster内部通过一个延迟出发的机制,从节点中具有更大复制偏移量的从节点具有优先发起选主的权利。从节点维护着一个执行故障选主的时间,并且有定时任务检测选主时间,若达到故障选主时间后,则发起选主。
1) 更新配置纪元(epoch)
配置纪元是一个只增不减的整数,每个主节点自身都维护一个配置纪元,所有主节点的配置纪元都不想等,从节点会复制主节点的配置纪元。
2)广播选主消息
在集群内广播选主消息(FAIL_AUTH_REQUEST),并记录已发送过消息的状态,保证该从节点在一个配置纪元内只能发起一次选举。
只有持有槽的主节点才会处理故障选主消息,因为每个持有槽的节点在一个配置纪元内都有唯一的一张选票,当接到第一个请求投票的从节点消息时回复FAILOVER_AUTH_ACK消息作为投票,之后相同配置纪元内的其他从节点的选主消息将忽略。
投票过程其实是一个领导者选举的过程,如集群内有N个持有槽的主节点代表有N张票,只要有N/2 + 1张票投给同一个从节点,则该从节点就将晋升为master。每个配置纪元代表了一次选主的周期,在开始投票后的cluster-node-timeout * 2的时间内没有获得足够数量的投票,则本次选举作废,需要发起下一次选主。
当从节点收到足够选票后,触发替换直接点的操作:
1)当前从节点取消复制变为主节点。
2)执行cluster delslot葱啊做撤销故障主节点负责的槽,并执行cluster addslot把这些槽委派给自己,
3)向集群广播PONG消息,通知集群内所有节点当前从节点变为主节点并接管故障主节点的槽信息。
由于redis-cluster采用去中心化的架构,集群的主节点各自负责一部分槽,客户端不确定key到底会映射到哪个节点上。redis-cluste通过重定向解决这个问题。在没有使用cluster模式时,redis对请求的处理很简单,若key存在于自身节点,则直接返回结果,若key不存在则告诉客户端key不存在,在使用cluster模式时,对请求的处理就变得复杂起来。在cluster模式下,节点对请求的处理过程如下:
1、检查当前key是否存在当前NODE?
1)通过crc16(key)/16384计算出slot
2)查询负责该slot负责的节点,得到节点指针
3)该指针与自身节点比较
2、若slot不是由自身负责,则返回MOVED重定向
3、若slot由自身负责,且key在slot中,则返回该key对应结果
4、若key不存在此slot中,检查该slot是否正在迁出(MIGRATING)?
5、若key正在迁出,返回ASK错误重定向客户端到迁移的目的服务器上
6、若Slot未迁出,检查Slot是否导入中?
7、若Slot导入中且有ASKING标记,则直接操作
8、否则返回MOVED重定向
每个客户端可以随便发起请求到集群中的任意一个节点,包括从节点。节点将会解析请求并计算该key对应slot是否由该节点负责,若不是由该节点负责的槽则返回一个MOVED错误。如下:
127.0.0.1:6379> get key1 (error) MOVED 9189 127.0.0.1:6382
其中MOVED错误信息会包含该key对应的slot以及slot所在节点的ip和端口。
客户端并不要求保存slot与节点的对应关系,但是为了高性能,客户端应该保存一下slot与节点的对应关系。发送请求时只需要先计算key对应的slot然后通过slot获取对应的节点,待客户端收到MOVED错误时更新一下slot与节点的对应关系。
ASK重定向主要是解决slot迁移时,同一个槽信息存在两个节点上,但是槽中的key还没有全部迁移完成,避免客户端重新获取slot重新获取slot与节点的对应关系。
127.0.0.1:6379> get key1 (error) ASK 9189 127.0.0.1:6382
其格式与MOVED错误差不多,包括该key对应的slot,以及迁出目节点的ip和端口。
在集群模式下,所有的publish命令都会向所有节点(包括从节点)进行广播,造成每条publish数据都会在集群内所有节点传播一次,加重了带宽负担,对于在有大量节点的集群中频繁使用pub,会严重消耗带宽,不建议使用。
对multi-key操作支持不够,虽然支持hash tag,但在迁移槽的过程中也会出现不可用。只支持单层复制;不支持节点自动发现,必须手动广播meet消息。
就nosql来说,Redis成绩已经很惊人了,就说支持多线程的memcached,速度似乎也只能达到这个级别。我们平常除了把redis当为缓存来使用外,而随着版本的迭代,功能已越来越强大,也希望给大家带来更多惊喜。