共计 13401 个字符,预计需要花费 34 分钟才能阅读完成。
这周学习了一下 Redis 事务功能的实现原理,本来是想用一篇文章进行总结的,写完以后发现这块内容比较多,而且多个命令之间又互相依赖,放在一篇文章里一方面篇幅会比较大,另一方面文章组织结构会比较乱,不容易阅读。因此把事务这个模块整理成上下两篇文章进行总结。
这篇文章我们重点分析一下 redis 事务命令中的两个辅助命令:watch 跟 unwatch。
一、redis 事务辅助命令简介
依然从 server.c 文件的命令表中找到相应的命令以及它们对应的处理函数。
//watch,unwatch 两个命令我们把它们叫做 redis 事务辅助命令 | |
{"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0}, | |
{"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0}, |
- watch,用于客户端关注某个 key,当这个 key 的值被修改时,整个事务就会执行失败(注:该命令需要在事务开启前使用)。
- unwatch,用于客户端取消已经 watch 的 key。
用法举例如下:
clientA
127.0.0.1:6379> watch a | |
OK | |
127.0.0.1:6379> multi | |
OK | |
127.0.0.1:6379> set b b | |
QUEUED | |
// 在执行前插入 clientB 的操作如下,事务就会执行失败 | |
127.0.0.1:6379> exec | |
(nil) | |
127.0.0.1:6379> |
clientB
127.0.0.1:6379> set a aa | |
OK | |
127.0.0.1:6379> |
二、redis 事务辅助命令源码分析
在看具体执行函数之前首先了解几个数据结构:
// 每个客户端对象中有一个 watched_keys 链表来保存已经 watch 的 key | |
typedef struct client {list *watched_keys; | |
} | |
// 上述链表中每个节点的数据结构 | |
typedef struct watchedKey {//watch 的 key | |
robj *key; | |
// 指向的 DB,后面细说 | |
redisDb *db; | |
} watchedKey; |
关于事务的几个命令所对应的函数都放在了 multi.c 文件中。
一起看下 watch 命令对应处理函数的源码:
void watchCommand(client *c) { | |
int j; | |
// 如果客户端处于事务状态,则返回错误信息 | |
// 由此可以看出,watch 必须在事务开启前使用 | |
if (c->flags & CLIENT_MULTI) {addReplyError(c,"WATCH inside MULTI is not allowed"); | |
return; | |
} | |
// 依次 watch 客户端的各个参数(这里说明 watch 命令可以一次 watch 多个 key) | |
// 注:0 表示命令本身,所以参数从 1 开始 | |
for (j = 1; j < c->argc; j++) | |
watchForKey(c,c->argv[j]); | |
// 返回结果 | |
addReply(c,shared.ok); | |
} | |
// 具体的 watch 操作,代码较长,慢慢分析 | |
void watchForKey(client *c, robj *key) {list *clients = NULL; | |
listIter li; | |
listNode *ln; | |
// 上面已经提到了数据结构 | |
watchedKey *wk; | |
// 首先判断 key 是否已经被客户端 watch | |
//listRewind 这个函数在发布订阅那篇文章里也有,就是把客户端的 watched_keys 赋值给 li | |
listRewind(c->watched_keys,&li); | |
while((ln = listNext(&li))) {wk = listNodeValue(ln); | |
// 这里一个 wk 节点中有 db,key 两个字段 | |
if (wk->db == c->db && equalStringObjects(key,wk->key)) | |
return; | |
} | |
// 开始 watch 指定 key | |
// 整个 watch 操作保存了两套数据结构,一套是在 db->watched_keys 中的字典结构,如下: | |
clients = dictFetchValue(c->db->watched_keys,key); | |
// 如果是 key 第一次出现,则进行初始化 | |
if (!clients) {clients = listCreate(); | |
dictAdd(c->db->watched_keys,key,clients); | |
incrRefCount(key); | |
} | |
// 把当前客户端加到该 key 的 watch 链表中 | |
listAddNodeTail(clients,c); | |
// 另一套是在 c ->watched_keys 中的链表结构:如下 | |
wk = zmalloc(sizeof(*wk)); | |
// 初始化各个字段 | |
wk->key = key; | |
wk->db = c->db; | |
incrRefCount(key); | |
// 加入到链表最后 | |
listAddNodeTail(c->watched_keys,wk); | |
} |
整个 watch 的数据结构比较复杂,我这里画了一张图方便理解:
简单解释一下上面的图,首先 redis 把每个客户端连接包装成了一个 client 对象,上图中 db,watch_keys 就是其中的两个字段(client 对象里面还有很多其他字段,包括上篇文章中提到的 pub/sub)。
- db 字段指向给该 client 对象分配的储存空间,db 对象 中也含有一个 watched_keys 字段,是字典类型(也就是哈希表),以想要 watch 的 key 做 key,存储的链表则是所有 watch 该 key 的客户端。
- watch_keys 字段则是一个链表类型,每个节点类型为 watch_key,其中包含两个字段,key 表示 watch 的 key,db 则指向了当前 client 对象的 db 字段,如上图。
看完 watch 命令的源码以后,再来看一下 unwatch 命令,如果搞明白了上面提到的两套数据结构,那么看 unwatch 的源码应该会比较容易,毕竟就是删除数据结构中对应的内容。
void unwatchCommand(client *c) {// 取消 watch 所有 key | |
unwatchAllKeys(c); | |
// 修改客户端状态 | |
c->flags &= (~CLIENT_DIRTY_CAS); | |
addReply(c,shared.ok); | |
} | |
// 取消 watch 的 key | |
void unwatchAllKeys(client *c) { | |
listIter li; | |
listNode *ln; | |
// 如果客户端没有 watch 任何 key,则直接返回 | |
if (listLength(c->watched_keys) == 0) return; | |
// 注意这里操作的是链表字段 | |
listRewind(c->watched_keys,&li); | |
while((ln = listNext(&li))) {list *clients; | |
watchedKey *wk; | |
// 遍历取出该客户端 watch 的 key | |
wk = listNodeValue(ln); | |
// 取出所有 watch 了该 key 的客户端,这里则是字典(即哈希表) | |
clients = dictFetchValue(wk->db->watched_keys, wk->key); | |
// 空指针判断 | |
serverAssertWithInfo(c,NULL,clients != NULL); | |
// 从 watch 列表中删除该客户端 | |
listDelNode(clients,listSearchKey(clients,c)); | |
// 如果 key 只有一个当前客户端 watch,则删除 | |
if (listLength(clients) == 0) | |
dictDelete(wk->db->watched_keys, wk->key); | |
// 从当前 client 的 watch 列表中删除该 key | |
listDelNode(c->watched_keys,ln); | |
// 减少引用数 | |
decrRefCount(wk->key); | |
// 释放内存 | |
zfree(wk); | |
} | |
} |
最后我们考虑一下 watch 机制的触发时机,现在我们已经把想要 watch 的 key 加入到了 watch 的数据结构中,可以想到触发 watch 的时机应该是修改 key 的内容时,通知到所有 watch 了该 key 的客户端。
感兴趣的用户可以任意选一个修改命令跟踪一下源码,例如 set 命令,我们发现所有对 key 进行修改的命令最后都会调用 touchWatchedKey()函数,而该函数源码就位于 multi.c 文件中,该函数就是触发 watch 机制的关键函数,源码如下:
// 这里入参 db 就是客户端对象中的 db,上文已经提到,不赘述 | |
void touchWatchedKey(redisDb *db, robj *key) {list *clients; | |
listIter li; | |
listNode *ln; | |
// 保存 watchkey 的字典为空,则返回 | |
if (dictSize(db->watched_keys) == 0) return; | |
// 注意这里操作的是字典(即哈希表)数据结构 | |
clients = dictFetchValue(db->watched_keys, key); | |
// 如果没有客户端 watch 该 key,则返回 | |
if (!clients) return; | |
// 把 client 赋值给 li | |
listRewind(clients,&li); | |
// 遍历 watch 了该 key 的客户端,修改他们的状态 | |
while((ln = listNext(&li))) {client *c = listNodeValue(ln); | |
c->flags |= CLIENT_DIRTY_CAS; | |
} | |
} |
跟我们猜测的一样,就是每当 key 的内容被修改时,则遍历所有 watch 了该 key 的客户端,设置相应的状态为 CLIENT_DIRTY_CAS。
三、redis 事务辅助命令总结
上面就是 redis 事务命令中 watch,unwatch 的实现原理,其中最复杂的应该就是 watch 对应的那两套数据结构了,跟之前的 pub/sub 类似,都是使用链表 + 哈希表的结构存储,另外也是通过修改客户端的状态位 FLAG 来通知客户端。
代码比较多,而且 C ++ 代码看上去会比较费劲,需要慢慢读,反复读。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2017-11/148624p2.htm
接着上一篇,这篇文章分析一下 Redis 事务操作中 multi,exec,discard 三个核心命令。
一、redis 事务核心命令简介
redis 事务操作核心命令:
// 用于开启事务 | |
{"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0}, | |
// 用来执行事务中的命令 | |
{"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, | |
// 用来取消事务 | |
{"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0}, |
在 redis 中,事务并不具有 ACID 的概念,换句话说,redis 中的事务仅仅是保证一系列的命令按顺序一个一个执行,如果中间失败了,并不会进行回滚操作。
使用 redis 事务举例如下:
multi | |
OK | |
set a a | |
QUEUED | |
set b b | |
QUEUED | |
set c c | |
QUEUED | |
exec | |
1) OK | |
2) OK | |
3) OK | |
127.0.0.1:6379> |
二、redis 事务核心命令源码分析
关于事务的几个命令所对应的函数都放在 multi.c 文件中。
首先来看一下 multi 命令,该命令用于标记客户端开启事务状态,因此它做的就是修改客户端状态,代码很简单,如下:
void multiCommand(client *c) {// 如果客户端已经是事务模式,则返回错误提示信息 | |
if (c->flags & CLIENT_MULTI) {addReplyError(c,"MULTI calls can not be nested"); | |
return; | |
} | |
// 设置客户端为事务模式 | |
c->flags |= CLIENT_MULTI; | |
// 返回结果 | |
addReply(c,shared.ok); | |
} |
接下来看下 redis 处理命令逻辑中的一段源码:
这段代码在 server.c 文件中的 processCommand 方法中:
// 如果客户端处于事务状态且当前执行的命令不是 exec,discard,multi 跟 watch 命令中的一个 | |
// 则把当前命令加入一个队列 | |
if (c->flags & CLIENT_MULTI && | |
c->cmd->proc != execCommand && c->cmd->proc != discardCommand && | |
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) | |
{// 加入队列 | |
queueMultiCommand(c); | |
// 返回结果 | |
addReply(c,shared.queued); | |
} else {// 执行当前命令 | |
call(c,CMD_CALL_FULL); | |
c->woff = server.master_repl_offset; | |
if (listLength(server.ready_keys)) | |
handleClientsBlockedOnLists();} |
看入队操作源码前,先来熟悉几个数据结构,redis 会把每个连接的客户端封装成一个 client 对象,该对象中含有大量字段用来保存需要的信息,发布订阅功能也使用对应的字段进行存储,事务当然也不例外,如下:
// 每个客户端对象中有一个 mstate 字段用来保存事务上下文 | |
typedef struct client {multiState mstate;} | |
// 事务包装类型 | |
typedef struct multiState {// 当前事务中需要执行的命令数组 | |
multiCmd *commands; | |
// 需要执行的命令数量 | |
int count; | |
// 需要同步复制的最小数量 | |
int minreplicas; | |
// 同步复制超时时间 | |
time_t minreplicas_timeout; | |
} multiState; | |
// 事务中执行命令的封装类型 | |
typedef struct multiCmd {// 参数 | |
robj **argv; | |
// 参数数量 | |
int argc; | |
// 命令本身 | |
struct redisCommand *cmd; | |
} multiCmd; |
了解了基本的数据结构以后,再来看下入队操作:
void queueMultiCommand(client *c) {// 类型前面有说明 | |
multiCmd *mc; | |
int j; | |
// 扩容,每次扩容一个命令的大小 | |
c->mstate.commands = zrealloc(c->mstate.commands, | |
sizeof(multiCmd)*(c->mstate.count+1)); | |
//c++ 中给数组最后一个元素赋值语法实在是有点难懂... | |
mc = c->mstate.commands+c->mstate.count; | |
// 初始化 mc 各个字段 | |
mc->cmd = c->cmd; | |
mc->argc = c->argc; | |
mc->argv = zmalloc(sizeof(robj*)*c->argc); | |
// 把参数一个一个拷贝过来 | |
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); | |
for (j = 0; j < c->argc; j++) | |
incrRefCount(mc->argv[j]); | |
c->mstate.count++; | |
} |
上面是把命令加入事务命令数组的中的逻辑,由于在执行事务过程中也会执行删除事务的操作,因此在看执行事务逻辑之前我们先看下删除事务的实现原理。
当事务执行完成,执行错误或者客户端想取消当前事务,都会跟 discard 命令有联系,一起看下源码:
void discardCommand(client *c) {// 如果当前客户端没有处于事务状态,则返回错误信息 | |
if (!(c->flags & CLIENT_MULTI)) {addReplyError(c,"DISCARD without MULTI"); | |
return; | |
} | |
// 删除事务 | |
discardTransaction(c); | |
// 返回结果 | |
addReply(c,shared.ok); | |
} | |
// 具体的删除逻辑 | |
void discardTransaction(client *c) {// 释放客户端事务资源 | |
freeClientMultiState(c); | |
// 初始化客户端事务资源 | |
initClientMultiState(c); | |
// 状态位还原 | |
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); | |
// 取消已 watch 的 key,该函数上面文章中已经进行过分析,不赘述 | |
unwatchAllKeys(c); | |
} | |
// 释放事务队列中的每个命令 | |
void freeClientMultiState(client *c) { | |
int j; | |
for (j = 0; j < c->mstate.count; j++) { | |
int i; | |
multiCmd *mc = c->mstate.commands+j; | |
// 挨个释放命令的参数 | |
for (i = 0; i < mc->argc; i++) | |
decrRefCount(mc->argv[i]); | |
zfree(mc->argv); | |
} | |
// 最后释放命令本身 | |
zfree(c->mstate.commands); | |
} | |
// 事务相关字段设为初始值 | |
void initClientMultiState(client *c) {c->mstate.commands = NULL; | |
c->mstate.count = 0; | |
} |
到这里,我们已经了解了开启事务模式,把各个命令加入到事务命令执行数组中以及取消事务三个模块的执行原理,最后一起看下事务的执行过程,代码较长,需要慢慢看。
把一系列命令加入到事务命令数组中以后,客户端执行 exec 命令就可以把其中的所有命令挨个执行完成了,分析 exec 命令源码之前,我们应该可以想到 redis 的逻辑应该就是从客户端的事务命令数组中取出所有命令一个一个执行,源码如下:
void execCommand(client *c) { | |
int j; | |
robj **orig_argv; | |
int orig_argc; | |
struct redisCommand *orig_cmd; | |
// 标记是否需要把 MULTI/EXEC 传递到 AOF 或者 slaves 节点 | |
int must_propagate = 0; | |
// 标记当前 redis 节点是否为主节点 | |
int was_master = server.masterhost == NULL; | |
// 如果客户端没有处于事务状态,则返回错误提示信息 | |
if (!(c->flags & CLIENT_MULTI)) {addReplyError(c,"EXEC without MULTI"); | |
return; | |
} | |
// 首先对两个需要终止当前事务的条件进行判断 | |
//1. 当有 WATCH 的 key 被修改时则终止,返回一个 nullmultibulk 对象 | |
//2. 当之前有命令加入事务命令数组出错则终止,例如传入的命令参数数量不对,会返回 execaborterr | |
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr : | |
shared.nullmultibulk); | |
// 删除当前事务信息,前面已经分析过,不赘述 | |
discardTransaction(c); | |
goto handle_monitor; | |
} | |
// 把 watch 的 key 都删除,上面文章已经分析过,不赘述 | |
unwatchAllKeys(c); | |
// 保存当前命令上下文 | |
orig_argv = c->argv; | |
orig_argc = c->argc; | |
orig_cmd = c->cmd; | |
addReplyMultiBulkLen(c,c->mstate.count); | |
// 遍历事务命令数组 | |
for (j = 0; j < c->mstate.count; j++) {// 把事务队列中的命令参数取出赋值给 client,因为命令是在 client 维度执行的 | |
c->argc = c->mstate.commands[j].argc; | |
c->argv = c->mstate.commands[j].argv; | |
c->cmd = c->mstate.commands[j].cmd; | |
// 同步事务操作到 AOF 或者集群中的从节点 | |
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {execCommandPropagateMulti(c); | |
must_propagate = 1; | |
} | |
// 执行具体命令 | |
call(c,CMD_CALL_FULL); | |
// 由于命令可以修改参数的值或者数量,因此重新保存命令上下文 | |
c->mstate.commands[j].argc = c->argc; | |
c->mstate.commands[j].argv = c->argv; | |
c->mstate.commands[j].cmd = c->cmd; | |
} | |
// 恢复原始命令上下文 | |
c->argv = orig_argv; | |
c->argc = orig_argc; | |
c->cmd = orig_cmd; | |
// 事务执行完成,删除该事务,前面已经分析过,不赘述 | |
discardTransaction(c); | |
// 确保 EXEC 会进行传递 | |
if (must_propagate) {int is_master = server.masterhost == NULL; | |
server.dirty++; | |
if (server.repl_backlog && was_master && !is_master) {char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; | |
feedReplicationBacklog(execcmd,strlen(execcmd)); | |
} | |
} | |
//monitor 命令操作 | |
handle_monitor: | |
if (listLength(server.monitors) && !server.loading) | |
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); | |
} |
上面就是事务命令执行的整个逻辑,可以先排除集群跟 AOF 的同步逻辑,专注理解核心逻辑,代码整体逻辑算是比较清晰的,搞明白了前面的几个模块以后,再看执行逻辑就不会太难。
三、redis 事务命令总结
通过上、下两篇文章对 redis 事务各个命令进行了分析,仔细阅读应该可以了解整个事务执行框架,如果有任何问题或者疑惑,欢迎留言评论。
本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-11/148624.htm
这周学习了一下 Redis 事务功能的实现原理,本来是想用一篇文章进行总结的,写完以后发现这块内容比较多,而且多个命令之间又互相依赖,放在一篇文章里一方面篇幅会比较大,另一方面文章组织结构会比较乱,不容易阅读。因此把事务这个模块整理成上下两篇文章进行总结。
这篇文章我们重点分析一下 redis 事务命令中的两个辅助命令:watch 跟 unwatch。
一、redis 事务辅助命令简介
依然从 server.c 文件的命令表中找到相应的命令以及它们对应的处理函数。
//watch,unwatch 两个命令我们把它们叫做 redis 事务辅助命令 | |
{"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0}, | |
{"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0}, |
- watch,用于客户端关注某个 key,当这个 key 的值被修改时,整个事务就会执行失败(注:该命令需要在事务开启前使用)。
- unwatch,用于客户端取消已经 watch 的 key。
用法举例如下:
clientA
127.0.0.1:6379> watch a | |
OK | |
127.0.0.1:6379> multi | |
OK | |
127.0.0.1:6379> set b b | |
QUEUED | |
// 在执行前插入 clientB 的操作如下,事务就会执行失败 | |
127.0.0.1:6379> exec | |
(nil) | |
127.0.0.1:6379> |
clientB
127.0.0.1:6379> set a aa | |
OK | |
127.0.0.1:6379> |
二、redis 事务辅助命令源码分析
在看具体执行函数之前首先了解几个数据结构:
// 每个客户端对象中有一个 watched_keys 链表来保存已经 watch 的 key | |
typedef struct client {list *watched_keys; | |
} | |
// 上述链表中每个节点的数据结构 | |
typedef struct watchedKey {//watch 的 key | |
robj *key; | |
// 指向的 DB,后面细说 | |
redisDb *db; | |
} watchedKey; |
关于事务的几个命令所对应的函数都放在了 multi.c 文件中。
一起看下 watch 命令对应处理函数的源码:
void watchCommand(client *c) { | |
int j; | |
// 如果客户端处于事务状态,则返回错误信息 | |
// 由此可以看出,watch 必须在事务开启前使用 | |
if (c->flags & CLIENT_MULTI) {addReplyError(c,"WATCH inside MULTI is not allowed"); | |
return; | |
} | |
// 依次 watch 客户端的各个参数(这里说明 watch 命令可以一次 watch 多个 key) | |
// 注:0 表示命令本身,所以参数从 1 开始 | |
for (j = 1; j < c->argc; j++) | |
watchForKey(c,c->argv[j]); | |
// 返回结果 | |
addReply(c,shared.ok); | |
} | |
// 具体的 watch 操作,代码较长,慢慢分析 | |
void watchForKey(client *c, robj *key) {list *clients = NULL; | |
listIter li; | |
listNode *ln; | |
// 上面已经提到了数据结构 | |
watchedKey *wk; | |
// 首先判断 key 是否已经被客户端 watch | |
//listRewind 这个函数在发布订阅那篇文章里也有,就是把客户端的 watched_keys 赋值给 li | |
listRewind(c->watched_keys,&li); | |
while((ln = listNext(&li))) {wk = listNodeValue(ln); | |
// 这里一个 wk 节点中有 db,key 两个字段 | |
if (wk->db == c->db && equalStringObjects(key,wk->key)) | |
return; | |
} | |
// 开始 watch 指定 key | |
// 整个 watch 操作保存了两套数据结构,一套是在 db->watched_keys 中的字典结构,如下: | |
clients = dictFetchValue(c->db->watched_keys,key); | |
// 如果是 key 第一次出现,则进行初始化 | |
if (!clients) {clients = listCreate(); | |
dictAdd(c->db->watched_keys,key,clients); | |
incrRefCount(key); | |
} | |
// 把当前客户端加到该 key 的 watch 链表中 | |
listAddNodeTail(clients,c); | |
// 另一套是在 c ->watched_keys 中的链表结构:如下 | |
wk = zmalloc(sizeof(*wk)); | |
// 初始化各个字段 | |
wk->key = key; | |
wk->db = c->db; | |
incrRefCount(key); | |
// 加入到链表最后 | |
listAddNodeTail(c->watched_keys,wk); | |
} |
整个 watch 的数据结构比较复杂,我这里画了一张图方便理解:
简单解释一下上面的图,首先 redis 把每个客户端连接包装成了一个 client 对象,上图中 db,watch_keys 就是其中的两个字段(client 对象里面还有很多其他字段,包括上篇文章中提到的 pub/sub)。
- db 字段指向给该 client 对象分配的储存空间,db 对象 中也含有一个 watched_keys 字段,是字典类型(也就是哈希表),以想要 watch 的 key 做 key,存储的链表则是所有 watch 该 key 的客户端。
- watch_keys 字段则是一个链表类型,每个节点类型为 watch_key,其中包含两个字段,key 表示 watch 的 key,db 则指向了当前 client 对象的 db 字段,如上图。
看完 watch 命令的源码以后,再来看一下 unwatch 命令,如果搞明白了上面提到的两套数据结构,那么看 unwatch 的源码应该会比较容易,毕竟就是删除数据结构中对应的内容。
void unwatchCommand(client *c) {// 取消 watch 所有 key | |
unwatchAllKeys(c); | |
// 修改客户端状态 | |
c->flags &= (~CLIENT_DIRTY_CAS); | |
addReply(c,shared.ok); | |
} | |
// 取消 watch 的 key | |
void unwatchAllKeys(client *c) { | |
listIter li; | |
listNode *ln; | |
// 如果客户端没有 watch 任何 key,则直接返回 | |
if (listLength(c->watched_keys) == 0) return; | |
// 注意这里操作的是链表字段 | |
listRewind(c->watched_keys,&li); | |
while((ln = listNext(&li))) {list *clients; | |
watchedKey *wk; | |
// 遍历取出该客户端 watch 的 key | |
wk = listNodeValue(ln); | |
// 取出所有 watch 了该 key 的客户端,这里则是字典(即哈希表) | |
clients = dictFetchValue(wk->db->watched_keys, wk->key); | |
// 空指针判断 | |
serverAssertWithInfo(c,NULL,clients != NULL); | |
// 从 watch 列表中删除该客户端 | |
listDelNode(clients,listSearchKey(clients,c)); | |
// 如果 key 只有一个当前客户端 watch,则删除 | |
if (listLength(clients) == 0) | |
dictDelete(wk->db->watched_keys, wk->key); | |
// 从当前 client 的 watch 列表中删除该 key | |
listDelNode(c->watched_keys,ln); | |
// 减少引用数 | |
decrRefCount(wk->key); | |
// 释放内存 | |
zfree(wk); | |
} | |
} |
最后我们考虑一下 watch 机制的触发时机,现在我们已经把想要 watch 的 key 加入到了 watch 的数据结构中,可以想到触发 watch 的时机应该是修改 key 的内容时,通知到所有 watch 了该 key 的客户端。
感兴趣的用户可以任意选一个修改命令跟踪一下源码,例如 set 命令,我们发现所有对 key 进行修改的命令最后都会调用 touchWatchedKey()函数,而该函数源码就位于 multi.c 文件中,该函数就是触发 watch 机制的关键函数,源码如下:
// 这里入参 db 就是客户端对象中的 db,上文已经提到,不赘述 | |
void touchWatchedKey(redisDb *db, robj *key) {list *clients; | |
listIter li; | |
listNode *ln; | |
// 保存 watchkey 的字典为空,则返回 | |
if (dictSize(db->watched_keys) == 0) return; | |
// 注意这里操作的是字典(即哈希表)数据结构 | |
clients = dictFetchValue(db->watched_keys, key); | |
// 如果没有客户端 watch 该 key,则返回 | |
if (!clients) return; | |
// 把 client 赋值给 li | |
listRewind(clients,&li); | |
// 遍历 watch 了该 key 的客户端,修改他们的状态 | |
while((ln = listNext(&li))) {client *c = listNodeValue(ln); | |
c->flags |= CLIENT_DIRTY_CAS; | |
} | |
} |
跟我们猜测的一样,就是每当 key 的内容被修改时,则遍历所有 watch 了该 key 的客户端,设置相应的状态为 CLIENT_DIRTY_CAS。
三、redis 事务辅助命令总结
上面就是 redis 事务命令中 watch,unwatch 的实现原理,其中最复杂的应该就是 watch 对应的那两套数据结构了,跟之前的 pub/sub 类似,都是使用链表 + 哈希表的结构存储,另外也是通过修改客户端的状态位 FLAG 来通知客户端。
代码比较多,而且 C ++ 代码看上去会比较费劲,需要慢慢读,反复读。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2017-11/148624p2.htm
