共计 7731 个字符,预计需要花费 20 分钟才能阅读完成。
Redis 算是缓存界的老大哥了,最近做的事情对 Redis 依赖较多,使用了里面的发布订阅功能,事务功能以及 SortedSet 等数据结构,后面准备好好学习总结一下 Redis 的一些知识点。
先看下 redis 发布订阅的结构:
其中发布者跟订阅者之间通过 channel 进行交互,channel 分为两种模式。
一、redis 发布订阅命令简介
redis 中为发布订阅(pub/sub)功能提供了六个命令,分为两种模式。
- 由 subscribe,unsubscribe 组成,它们是负责订阅有确定名称的 channel,例如 subscribe test 表示订阅名字为 test 的 channel。
- 由 psubscribe,punsubscribe 组成,是负责订阅模糊名字的 channel,例如 psubscribe test* 表示订阅所有以 test 开头的 channel。
最后再加上发布命令 publish 以及查看订阅相关信息的 pubsub 命令组成。
二、redis 发布订阅源码分析
redis 所有的命令及其处理函数都放在了 server.c 文件的开头,从其中找出发布订阅功能相关的命令信息。
{"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
{"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
{"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
这里可以看出创建一条命令需要很多参数,我们这里只需要关注前两个参数,第一个参数表示命令的内容,第二个表示该命令对应的处理函数。
普通模式订阅 subscribe 函数:
该命令支持多个参数,即 subscribe channel1,channel2…
void subscribeCommand(client *c) {
int j;
// 这里挨个处理 subscribe 的参数,因为命令本身被作为参数 0 所以从 1 开始处理后面的参数
for (j = 1; j < c->argc; j++)
// 订阅每个频道
pubsubSubscribeChannel(c,c->argv[j]);
// 这里设置客户端的状态,下面会解释这个状态的作用
c->flags |= CLIENT_PUBSUB;
}
在 server.c 文件中,processCommand 函数是在调用具体命令函数之前的判断逻辑,其中有一段:
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
这里注释也写的很清楚,就是当 client 处于 pub/sub 上下文时,只接收订阅相关命令以及一个 ping 命令,这就解释了上面 subscribeCommand 函数中为什么要设置客户端 flag 字段。
接下来看下订阅的具体逻辑:
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
// 把指定 channel 加入到 client 的 pubsub_channels 哈希表中
// 不成功说明已经订阅了该频道
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
// 这里是把该 channel 加入到 client 的哈希表中,引用加 1
incrRefCount(channel);
// 在 server 的发布订阅哈希表中查找指定 channel
de = dictFind(server.pubsub_channels,channel);
// 如果该 channel 还不存在,则创建
if (de == NULL) {
// 创建一个空 list
clients = listCreate();
// 把 channel 加入到 server 的哈希表中,value 就是该 channel 的所有订阅者
dictAdd(server.pubsub_channels,channel,clients);
// 该 channel 引用加 1
incrRefCount(channel);
} else {clients = dictGetVal(de);
}
// 把 client 加入到该 channel 的订阅列表中
listAddNodeTail(clients,c);
}
// 一系列通知客户端的操作
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
总结一下,订阅其实就是把指定 channel 分别加入到 client 跟 server 的 pub/sub 哈希表中,然后在 server 端保存订阅了该 channle 的所有 client 列表,如下图:
下面看一下 publish 发布命令:
例如:publish channelName msg
void publishCommand(client *c) {
// 发布逻辑
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
// 这里是关于集群或者 AOF 的操作
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
// 返回给 client 通知了的订阅者数
addReplyLongLong(c,receivers);
}
重点看下发布函数的源码:
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
// 根据上面的订阅源码,这里就是取出订阅该 channel 的所有 clients
de = dictFind(server.pubsub_channels,channel);
if (de) {
// 获取 client 的链表
list *list = dictGetVal(de);
listNode *ln;
listIter li;
// 由 client 链表创建它的迭代器,c++ 代码真是无力吐槽
listRewind(list,&li);
// 遍历所有 client 并发送消息
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
// 开始模糊匹配的逻辑处理,模糊模式使用的是链表而不是哈希表,后面会讲
if (listLength(server.pubsub_patterns)) {
// 创建模糊规则的迭代器 li
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
// 遍历所有的模糊模式,如果匹配成功则发送消息
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
// 判断当前 channel 是否可以匹配模糊规则
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
从上面的 publish 处理函数可以看出每次进行消息发布的时候,都会向普通模式跟模糊模式发布消息,同时也能看出 普通模式跟模糊模式使用的是两种不同的数据结构,下面看下模糊订阅模式。
模糊模式订阅 psubscribe 函数:
//psubscribe 命令对应的处理函数
void psubscribeCommand(client *c) {
int j;
// 挨个订阅 client 指定的 pattern
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
// 修改 client 状态
c->flags |= CLIENT_PUBSUB;
}
int pubsubSubscribePattern(client *c, robj *pattern) {
int retval = 0;
// 判断 client 是否已经订阅该 pattern,这里与普通模式不同,是个链表
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
// 把指定 pattern 加入到 client 的 pattern 链表中
listAddNodeTail(c->pubsub_patterns,pattern);
// 引用计数 +1
incrRefCount(pattern);
// 这里是创建一个 pattern 对象,并指向该 client,加入到 server 的 pattern 链表中
// 从这里可以看出,多个 client 订阅同一个 pattern 会创建多个 patter 对象,与普通模式不同
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
}
// 通知客户端
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
通过分析上面的源码可以总结一下模糊订阅中的数据结构,如下图:
注:正如上面提到的,模糊模式中,一个 pat 对象中包含一个 pattern 规则跟一个 client 指针,也就是说当多个 client 模糊订阅同一个 pattern 时同样会为每个 client 都创建一个节点。
普通模式取消订阅 unsubscribe 函数:
取消就相对简单了,说白了就是把上面锁保存在 server 跟 client 端的数据删除。
取消订阅入口
void unsubscribeCommand(client *c) {
// 如果该命令没有参数,则把 channel 全部取消
if (c->argc == 1) {pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
// 迭代取消置顶 channel
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
// 如果 channel 被全部取消,则修改 client 状态,这样 client 就可以发送其他命令了
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
// 一次性取消订阅所有 channel
int pubsubUnsubscribeAllChannels(client *c, int notify) {
// 取出 client 端所有的 channel
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
int count = 0;
while((de = dictNext(di)) != NULL) {robj *channel = dictGetKey(de);
// 最终也是挨个取消 channel
count += pubsubUnsubscribeChannel(c,channel,notify);
}
// 如果 client���面都没有订阅,依然返回响应
if (notify && count == 0) {addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReply(c,shared.nullbulk);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
// 释放空间
dictReleaseIterator(di);
return count;
}
// 取消订阅指定 channel
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
// 从 client 中删除指定 channel
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
// 删除服务端该 channel 中的指定 client
de = dictFind(server.pubsub_channels,channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
// 如果删除完以后 channel 没有了订阅者,则把 channel 也删除
dictDelete(server.pubsub_channels,channel);
}
}
// 返回 client 响应
if (notify) {addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
// 引用计数 -1
decrRefCount(channel);
return retval;
}
由于模糊模式的取消订阅与普通模式类似,这里就不再贴代码了。
三、redis 发布订阅总结
整个发布订阅的代码比较简单清晰,一个值得思考的问题时普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配,需要遍历挨个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。
下面关于 Redis 的文章您也可能喜欢,不妨参考下:
Ubuntu 14.04 下 Redis 安装及简单测试 http://www.linuxidc.com/Linux/2014-05/101544.htm
Redis 主从复制基本配置 http://www.linuxidc.com/Linux/2015-03/115610.htm
CentOS 7 下 Redis 的安装与配置 http://www.linuxidc.com/Linux/2017-02/140363.htm
Ubuntu 14.04 安装 Redis 与简单配置 http://www.linuxidc.com/Linux/2017-01/139075.htm
Ubuntu 16.04 环境中安装 PHP7.0 Redis 扩展 http://www.linuxidc.com/Linux/2016-09/135631.htm
Redis 单机 & 集群离线安装部署 http://www.linuxidc.com/Linux/2017-03/141403.htm
CentOS 7.0 安装 Redis 3.2.1 详细过程和使用常见问题 http://www.linuxidc.com/Linux/2016-09/135071.htm
Ubuntu 16.04 环境中安装 PHP7.0 Redis 扩展 http://www.linuxidc.com/Linux/2016-09/135631.htm
Ubuntu 15.10 下 Redis 集群部署文档 http://www.linuxidc.com/Linux/2016-06/132340.htm
Redis 实战 中文 PDF http://www.linuxidc.com/Linux/2016-04/129932.htm
本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-11/148307.htm