Redis 提供两个订阅模式:频道(channel)订阅
和 glob-style 模式(pattern)频道订阅
。
struct redisServer 和 struct redisClient 都维护了频道和模式频道,前者维护了所有频道和订阅频道的客户端,后者维护了客户端自己订阅的频道。
struct redisServer { ...... /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ ...... } typedef struct redisClient { ...... // 用户感兴趣的频道 dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ // 用户感兴趣的模式 list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ ...... } redisClient; // 模式频道数据结构,list *pubsub_patterns 里的每个节点数据都是struct // pubsubPattern。 typedef struct pubsubPattern { redisClient *client; robj *pattern; } pubsubPattern;
频道订阅是一个 dict,每个 channel 被哈希进相应的桶,每个 channel 对应一个 clients,clients 都订阅了此 channel。当有消息发布的时候,检索 channel,遍历 clients,发布消息。
模式频道订阅是一个 list。当有消息发布的时候,channel 与 glob-style pattern 匹配,发布消息。
两种订阅模式是维护上述两种数据结构的过程,
// 订阅频道 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(redisClient *c, robj *channel) { struct dictEntry *de; list *clients = NULL; int retval = 0; // redisClient.pubsub_channels 中保存客户端订阅的所有频道,可以查看客户端 // 订阅了多少频道以及客户端是否订阅某个频道 // server.pubsub_channels 中保存所有的频道和每个频道的订阅客户端,可以将 // 消息发布到订阅客户端 // 将频道加入redisClient.pubsub_channels /* Add the channel to the client -> channels hash table */ if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); // 在服务器负责维护的channel->clients 哈希表中寻找指定的频道 /* Add the client to the channel -> list of clients hash table */ de = dictFind(server.pubsub_channels,channel); // 未找到客户端指定的频道,需要创建 if (de == NULL) { clients = listCreate(); // 将频道加入server.pubsub_channels dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); // 找到客户端指定的频道,直接获取这个频道 } else { clients = dictGetVal(de); } // 将客户端添加到链表的尾部 listAddNodeTail(clients,c); } // 通知客户端 /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength( c->pubsub_patterns)); return retval; } // 订阅模式频道 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(redisClient *c, robj *pattern) { int retval = 0; // redisClient.pubsub_patterns 中保存客户端订阅的所有模式频道,可以查看 // 客户端订阅了多少频道以及客户端是否订阅某个频道 // server.pubsub_patterns 中保存所有的模式频道和每个模式频道的订阅客户端 // ,可以将消息发布到订阅客户端 // 未订阅模式频道,插入 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; pubsubPattern *pat; // 将模式频道加入redisClient.pubsub_patterns listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); // 将模式频道加入server.pubsub_patterns pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); } // 通知客户端 /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength( c->pubsub_patterns)); return retval; }
发布消息的过程则遍历上述两个数据结构(dict 和list),并将消息发布到匹配频道的所有客户端。
// 发布消息 /* Publish a message */ int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; struct dictEntry *de; listNode *ln; listIter li; // 发布消息有两个步骤, // 指定频道的所有订阅者发布消息 // 指定模式频道的所有订阅者发布消息 // // 寻找频道 /* Send to clients listening for that channel */ de = dictFind(server.pubsub_channels,channel); // 向频道所有订阅者发布信息 if (de) { list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { redisClient *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); receivers++; } } // // 进行glob-style 模式匹配 /* Send to clients listening to matching channels */ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; // 匹配成功,向订阅者发布消息 if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } decrRefCount(channel); } return receivers; }
注意, 只要客户端订阅了频道, 除了SUBCRIBE,UNSUBCRIBE,PSUBCRIBE,PSUBCRIBE,就不能执行其他命令。
int processCommand(redisClient *c) { ...... // 在订阅发布模式下,只允许处理SUBSCRIBE 或者UNSUBSCRIBE 命令 // 从下面的检测条件可以看出:只要存在redisClient.pubsub_channels 或者 // redisClient.pubsub_patterns,就代表处于订阅发布模式下 /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed "in this context"); return REDIS_OK; } ...... }