/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistics.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... } */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    UNUSED(eventLoop);
    UNUSED(id);
    UNUSED(clientData);
    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    // Update the cached time
    updateCachedTime();
    // Run once every 100 milliseconds
    run_with_period(100) {
        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                server.stat_net_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                server.stat_net_output_bytes);
    }
    /* We have just LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed for the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * LRU_CLOCK_RESOLUTION define. */
    unsigned long lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);
    // Record the peak memory usage since the server started
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();
    // Run once every 100 milliseconds
    run_with_period(100) {
        /* Sample the RSS and other metrics here since this is a relatively slow call.
         * We must sample the zmalloc_used at the same time we take the rss, otherwise
         * the frag ratio calculation may be off (ratio of two samples at different times) */
        server.cron_malloc_stats.process_rss = zmalloc_get_rss();
        server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
        /* Sampling the allocator info can be slow too.
         * The fragmentation ratio it'll show is potentially more accurate:
         * it excludes other RSS pages such as shared libraries, LUA and other non-zmalloc
         * allocations, and allocator reserved pages that can be purged (all not actual frag) */
        zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
                                   &server.cron_malloc_stats.allocator_active,
                                   &server.cron_malloc_stats.allocator_resident);
        /* in case the allocator isn't providing these stats, fake them so that
         * fragmentation info still shows some (inaccurate) metrics */
        if (!server.cron_malloc_stats.allocator_resident) {
            /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS,
             * so we must deduct it in order to be able to calculate a correct
             * "allocator fragmentation" ratio */
            size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
            server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
        }
        if (!server.cron_malloc_stats.allocator_active)
            server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
        if (!server.cron_malloc_stats.allocator_allocated)
            server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
    }
    // A SIGTERM was received: shut the server down safely (avoiding unsafe operations)
    if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
        serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }
    // Show some information about non-empty databases
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }
    // Show information about connected clients
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            serverLog(LL_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }
    // Some asynchronous operations on clients
    clientsCron();

    // Background operations on the databases
    databasesCron();
    // Start a scheduled AOF rewrite if no RDB save or AOF rewrite is in progress
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }
    // Check whether a background RDB save or AOF rewrite child has terminated
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }
        // Trigger an AOF rewrite if the configured growth conditions are met
        if (server.aof_state == AOF_ON &&
            server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }
    // If an AOF buffer flush was postponed in a previous cycle, try to flush it now
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however trying once per second is enough in case 'hz' is set to
     * a higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }
    // Free the clients queued for asynchronous release
    freeClientsInAsyncFreeQueue();

    // Clear the paused-clients state if the pause has expired
    clientsArePaused();
    /* Replication cron function -- used to reconnect to master,
     * detect transfer failures, start background RDB transfers and so forth. */
    run_with_period(1000) replicationCron();
    run_with_period(100) {
        // In cluster mode, run the cluster-related tasks every 100 milliseconds
        if (server.cluster_enabled) clusterCron();
    }
    run_with_period(100) {
        // In Sentinel mode, run the Sentinel timer tasks every 100 milliseconds
        if (server.sentinel_mode) sentinelTimer();
    }
    run_with_period(1000) {
        // Once per second, close timed-out socket connections (used by MIGRATE)
        migrateCloseTimedoutSockets();
    }
    /* Start a scheduled BGSAVE if the corresponding flag is set. This is
     * useful when we are forced to postpone a BGSAVE because an AOF
     * rewrite is in progress.
     *
     * Note: this code must be after the replicationCron() call above so
     * make sure when refactoring this file to keep this order. This is useful
     * because we want to give priority to RDB savings for replication. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }
    server.cronloops++;
    return 1000/server.hz;
}
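The header comment mentions the run_with_period(milliseconds) macro, and the function returns 1000/server.hz, which the event loop uses as the number of milliseconds until serverCron fires again. The throttling itself is driven by server.cronloops: a guarded block runs only on iterations that fall on the requested period. Below is a minimal, self-contained sketch of that idea; should_run_with_period, hz and cronloops are illustrative stand-ins, not the actual macro from server.c.

#include <stdio.h>

/* Minimal approximation of run_with_period(): a guarded block runs only once
 * every `ms` milliseconds, even though the cron function itself fires `hz`
 * times per second. */
static int should_run_with_period(int ms, int hz, long long cronloops) {
    /* If the requested period is no longer than one cron tick, run every time;
     * otherwise run only on every (ms / tick-length)-th iteration. */
    if (ms <= 1000/hz) return 1;
    return (cronloops % (ms/(1000/hz))) == 0;
}

int main(void) {
    int hz = 10;    /* cron fires 10 times per second, i.e. one tick every 100 ms */
    for (long long loops = 0; loops < 30; loops++) {
        if (should_run_with_period(1000, hz, loops))
            printf("iteration %lld: the 1000ms task runs\n", loops);
    }
    return 0;
}

With hz = 10 every cron tick is 100 ms, so the 1000 ms block executes on iterations 0, 10 and 20 of this loop, i.e. once per second, which matches how the run_with_period(1000) sections above behave.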
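The automatic AOF rewrite trigger in the listing boils down to a percentage check: growth = current_size*100/base - 100, and a rewrite starts once growth reaches aof_rewrite_perc, provided the AOF already exceeds aof_rewrite_min_size. The standalone sketch below reproduces that check with example numbers; aof_rewrite_needed is a hypothetical helper written for illustration, not a Redis function.

#include <stdio.h>

/* Illustrative re-implementation of the AOF auto-rewrite condition above:
 * rewrite when the AOF has grown by at least `rewrite_perc` percent over the
 * size it had after the last rewrite (`base_size`). */
static int aof_rewrite_needed(long long current_size, long long base_size,
                              long long rewrite_perc, long long min_size) {
    if (current_size <= min_size) return 0;           /* too small to bother */
    long long base = base_size ? base_size : 1;       /* avoid division by zero */
    long long growth = (current_size*100/base) - 100; /* growth in percent */
    return growth >= rewrite_perc;
}

int main(void) {
    long long mb = 1024*1024;
    /* Example: a 150 MB AOF whose base size was 60 MB has grown by 150%,
     * which is above a 100% threshold and above a 64 MB minimum size. */
    printf("rewrite? %d\n",
           aof_rewrite_needed(150*mb, 60*mb, 100, 64*mb));
    return 0;
}

With the shipped defaults (auto-aof-rewrite-percentage 100, auto-aof-rewrite-min-size 64mb), an AOF that measured 60 MB after its last rewrite triggers a new rewrite once it grows past roughly 120 MB.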