1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| void call(client *c, int flags) { long long dirty, start, duration; int client_old_flags = c->flags;
//将命令发送至监视器节点的客户端(仅当这些命令不是从aof中读取的时候) if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); }
/* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ //初始化, c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); redisOpArray prev_also_propagate = server.also_propagate; redisOpArrayInit(&server.also_propagate);
/* Call the command. */ dirty = server.dirty; start = ustime(); c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0;
/* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ if (server.loading && c->flags & CLIENT_LUA) flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
//当执行命令的是lua脚本的时候,如果命令的flags或者客户端的flags是强制传播行为,那么我们将强制命令调用者去传播lua脚本 if (c->flags & CLIENT_LUA && server.lua_caller) { if (c->flags & CLIENT_FORCE_REPL) server.lua_caller->flags |= CLIENT_FORCE_REPL; if (c->flags & CLIENT_FORCE_AOF) server.lua_caller->flags |= CLIENT_FORCE_AOF; }
//如果需要,将命令加入慢日志,统计命令热度等信息 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); } if (flags & CMD_CALL_STATS) { //计算命令的统计数据 c->lastcmd->microseconds += duration; c->lastcmd->calls++; }
//命令的复制和向AOF传播 if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { int propagate_flags = PROPAGATE_NONE;
//检查命令操作是否改变数据,若是则进行传播向aof追加和复制(主从or集群) if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
//如果客户端强制命令向aof追加写入/节点复制,则重置flags为能影响数据的命令 if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command * implementatino called preventCommandPropagation() or similar, * or if we don't have the call() flags to do so. */ if (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL)) propagate_flags &= ~PROPAGATE_REPL; if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF;
//调用传播方法 /* Call propagate() only if at least one of AOF / replication * propagation is needed. Note that modules commands handle replication * in an explicit way, so we never replicate them automatically. */ if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); }
//恢复旧的复制标志(原因是可能执行命令的递归调用) c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); c->flags |= client_old_flags & (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
/* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */ if (server.also_propagate.numops) { int j; redisOp *rop;
if (flags & CMD_CALL_PROPAGATE) { for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; int target = rop->target; /* Whatever the command wish is, we honor the call() flags. */ if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; if (target) propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); } } redisOpArrayFree(&server.also_propagate); } server.also_propagate = prev_also_propagate; server.stat_numcommands++; }
|