订阅频道,如果该频道中有新的消息,那么订阅该频道的客户端都可以收到消息。主要涉及的命令:PUBLISH、SUBSCRIBE、PSUBSCRIBE。
订阅某个频道使用SUBSCRIBE命令,如果想订阅多个频道,直接追加即可。
SUBSCRIBE "news.sport" "news.movie"
在redisServer结构中,保存了所有频道的订阅关系:
struct redisServer { // ... // 保存所有频道的订阅关系 dict *pubsub_channels; // ... };
这个字典的键是某个被订阅的频道,而键的值则是一个链表,链表里面记录了所有订阅这个频道的客户端。举例:如果客户端要订阅一个test频道,如果pubsub_channels字典中有这个频道,就添加在这个频道的链表中。否则就在pubsub_channels字典中添加一个key为test的键,值是一个链表,链表中保存客户端的信息。
伪代码:
def subscribe(*all_input_channels): # 遍历输入的所有频道 for channel in all_input_channels: # 如果channel不存在于pubsub_channels字典(没有任何订阅者) # 那么在字典中添加channel键,并设置它的值为空链表 if channel not in server.pubsub_channels: server.pubsub_channels[channel] = [] # 将订阅者添加到频道所对应的链表的末尾 server.pubsub_channels[channel].append(client)
退订和订阅相反,使用的是UNSUBSCRIBE命令。
UNSUBSCRIBE "news.sport" "news.movie"
退订操作就是将该客户端从pubsub_channels中对应频道中的链表中移除。
伪代码:
def unsubscribe(*all_input_channels): # 遍历要退订的所有频道 for channel in all_input_channels: # 在订阅者链表中删除退订的客户端 server.pubsub_channels[channel].remove(client) # 如果频道已经没有任何订阅者了(订阅者链表为空) # 那么将频道从字典中删除 if len(server.pubsub_channels[channel]) == 0: server.pubsub_channels.remove(channel)
订阅模式是什么呢?其实就是通过正在表达式匹配多个频道。new.[ie]t可以匹配上new.it和new.et两个频道,这两个频道的消息,订阅模式的客户端都可以收到。
命令:PSUBSCRIBE。
PSUBSCRIBE "news.*"
服务器也将所有模式的订阅关系都保存在服务器状态的pubsub_patterns属性里面:
struct redisServer { // ... // 保存所有模式订阅关系 list *pubsub_patterns; // ... };
pubsub_patterns属性是一个链表,链表中的每个节点都包含着一个pubsub Pattern结构,这个结构的pattern属性记录了被订阅的模式,而client属性则记录了订阅模式的客户端:
typedef struct pubsubPattern { // 订阅模式的客户端 redisClient *client; // 被订阅的模式 robj *pattern; } pubsubPattern;
订阅模式就是添加一个节点,退订模式就是删除对应的节点。
了解了底层结构,向频道中发消息可以很容易理解了。
当向一个频道发消息时,直接遍历该频道的链表,将消息发送给链表中的所有客户端。之后再遍历模式的链表,找到与频道匹配的节点,将消息发送给该客户端。
给订阅频道的客户端发送消息,伪代码:
def channel_publish(channel, message): # 如果channel键不存在于pubsub_channels字典中 # 那么说明channel频道没有任何订阅者 # 程序不做发送动作,直接返回 if channel not in server.pubsub_channels: return # 运行到这里,说明channel频道至少有一个订阅者 # 程序遍历channel频道的订阅者链表 # 将消息发送给所有订阅者 for subscriber in server.pubsub_channels[channel]: send_message(subscriber, message)
给订阅模式的客户端发送消息,伪代码:
def pattern_publish(channel, message): # 遍历所有模式订阅消息 for pubsubPattern in server.pubsub_patterns: # 如果频道和模式相匹配 if match(channel, pubsubPattern.pattern): # 那么将消息发送给订阅该模式的客户端 send_message(pubsubPattern.client, message)
发送消息的伪代码:
def publish(channel, message): # 将消息发送给channel频道的所有订阅者 channel_publish(channel, message) # 将消息发送给所有和channel频道相匹配的模式的订阅者 pattern_publish(channel, message)
主要涉及几个命令:
PUBSUB CHANNELS[pattern]
查看所有频道的名称,如果指定到了pattern,返回的将是与模式匹配的频道名称:
redis> PUBSUB CHANNELS 1) "news.it" 2) "news.sport" 3) "news.business"
redis> PUBSUB CHANNELS "news.[is]*" 1) "news.it" 2) "news.sport"
返回这些频道的订阅者数量
PUBSUB NUMSUB[channel-1 channel-2...channel-n]
redis> PUBSUB NUMSUB news.it news.sport news.business news.movie 1) "news.it" 2) "3" 3) "news.sport" 4) "2" 5) "news.business" 6) "2" 7) "news.movie" 8) "1"
PUBSUB NUMPAT子命令用于返回服务器当前被订阅模式的数量:
redis> PUBSUB NUMPAT (integer) 3