0%

Redis源码剖析之命令执行核心

以下涉及到的源码均为redis5.0-rc3版本的代码【点击查看官方源码

Redis命令执行

Redis执行命令时都会先建立一个客户端,然后由客户端去和服务器连接,在执行命令(使用lua脚本执行亦如此,只不过lua的客户端是个伪客户端而已)。redis的命令执行中有一个核心部分,就是call()方法,call函数声明如下:

1
void call(client *c, int flags)

可见有两个参数,client是代表客户端(redis的命令执行都是以客户端向服务端发送的方式,请求执行的一方即为客户端);flags是一个特殊标识。

命令执行附加标识

call方法的flags标识可以设置以下情况的值:

flags宏定义值 描述
CMD_CALL_NONE 代表不设标识
CMD_CALL_SLOWLOG 有此标识的时候,会去检查命令的执行速度,以便决策是否加入慢日志中
CMD_CALL_STATS 统计命令被执行的数量
CMD_CALL_PROPAGATE_AOF 如果命令会改变值或客户端强制命令传播,则将命令追加到AOF文件
CMD_CALL_PROPAGATE_REPL 如果命令会改变值或客户端强制命令传播,则将命令传播给服务器的从节点
CMD_CALL_PROPAGATE PROPAGATE_AOF和PROPAGATE_REPL两个标识的别名
CMD_CALL_FULL SLOWLOG,STATS,PROPAGATE三个标识的别名

而在执行传播的行为上会依赖客户端的flags,特殊的情况如下:

  1. 如果客户端的标识为CLIENT_FORCE_AOF或者CLIENT_FORCE_REPL,而call方法设置的flags为CMD_CALL_PROPAGATE_AOF/REPL,这种情况下即便命令没有改变值也会被传播。
  2. 如果客户端的flags被设置成CLIENT_PREVENT_REPL_PROP 或 CLIENT_PREVENT_AOF_PROP,基本执行的命令会影响数据库值,那么也不会被传播给从节点,同样亦不会被追加写入AOF文件。

但是有点需要注明的是,无论客户端的标识设置的是什么,如果call函数的flags没有被设置成CMD_CALL_PROPAGATE_AOF或 CMD_CALL_PROPAGATE_REPL,那么AOF命令追加和从节点复制都将永远不会发生;

客户端(client)的flags可以被如下API修改:

1
2
3
4
forceCommandPropagation(client *c, int flags);
preventCommandPropagation(client *c);
preventCommandAOF(client *c);
preventCommandReplication(client *c);

核心函数CALL的源码

call函数的具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
void call(client *c, int flags) {
long long dirty, start, duration;
int client_old_flags = c->flags;

//将命令发送至监视器节点的客户端(仅当这些命令不是从aof中读取的时候)
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}

/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
//初始化,
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);

/* Call the command. */
dirty = server.dirty;
start = ustime();
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;

/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

//当执行命令的是lua脚本的时候,如果命令的flags或者客户端的flags是强制传播行为,那么我们将强制命令调用者去传播lua脚本
if (c->flags & CLIENT_LUA && server.lua_caller) {
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF;
}

//如果需要,将命令加入慢日志,统计命令热度等信息
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
if (flags & CMD_CALL_STATS) {
//计算命令的统计数据
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}

//命令的复制和向AOF传播
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;

//检查命令操作是否改变数据,若是则进行传播向aof追加和复制(主从or集群)
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

//如果客户端强制命令向aof追加写入/节点复制,则重置flags为能影响数据的命令
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

/* However prevent AOF / replication propagation if the command
* implementatino called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;

//调用传播方法
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}

//恢复旧的复制标志(原因是可能执行命令的递归调用)
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
if (server.also_propagate.numops) {
int j;
redisOp *rop;

if (flags & CMD_CALL_PROPAGATE) {
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
}
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;
server.stat_numcommands++;
}