主要涉及的配置是:Appendfsync,no-appendfsync-on-rewrite。该操作的入口在(redis.c):
void call(redisClient *c) { dirty = server.dirty; //上次的脏数据个数 c->cmd->proc(c); //执行命令操作,如果该操作是一个更新操作,则server.dirty会增加 dirty = server.dirty-dirty; //此次执行导致的脏数据个数 … if (server.appendonly && dirty > 0) //有脏数据并且开启了AOF功能 feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc); //将数据保存到server.aofbuf … }
我们再来看一下feedAppendOnlyFile的实现
可以看到到上面AOF操作也只是写到buf中,并没有进行写操作,下面我们将查看该过程。通过查看代码我们可以知道flushAppendOnlyFile()函数是进行真正的写操作。另外我们可以知道该函数会在beforeSleep及serverCron中调用。其中beforeSleep是aeMain循环,每次进行事件处理前必须调用一次:void feedAppendOnlyFile(struct redisCommand…{ if (dictid != server.appendseldb){ //当月操作的db与上一次不一样,所以要重新写一个新的select db命令,当rewrite的时候也会把appendseldb置为-1 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.appendseldb = dictid; } … buf = catAppendOnlyGenericCommand(buf,argc,argv); //转换为标准命令格式 server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); //将命令写到aofbuf,这个buf会在serverCron当Appendfsync到满足时fsync到文件 if (server.bgrewritechildpid != -1) //如果有bgrewrite子进程的话,则也必须把该命令保存到bgrewritebuf,以便在子进程结束时,把新的变更追加到rewrite后的文件 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); … }
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
而serverCron则先判断是否有延迟的flush操作:flushAppendOnlyFile(int force){
…
if (server.appendfsync == APPENDFSYNC_EVERYSEC) //如果我们设置的fsync频率为每s
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;//判断是否已经有fsync job在等待fsync线程的处理
if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
if (sync_in_progress) { //已经有fsync job在等待处理了,先不write也不把该job放到fsync线程处理队列里,如果之前并没有延迟fsync的job,则标志现在已经有这样的情况并且设置这个时间为server.unixtime.如果之前已经有延迟的fsync job,则如果这个延迟小于2s,则直接返回再等待,否则就需要flush了。
if (server.aof_flush_postponed_start == 0) {
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
return;
}
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
server.aof_flush_postponed_start = 0;
nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); //write buf这里是nonblock的,此时并没有被fsync到磁盘,会直接返回
if (server.no_appendfsync_on_rewrite &&
(server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
return; //判断no_appendfsync_on_rewrite条件
if (server.appendfsync == APPENDFSYNC_ALWAYS) { //如果是APPENDFSYNC_ALWAYS,则必须马上调用fsync,此时主线程就会被阻塞
aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
server.lastfsync = server.unixtime;
} else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
server.unixtime > server.lastfsync)) { //如果没有等待的job则把该job加到fsync线程的job队列里
if (!sync_in_progress) aof_background_fsync(server.appendfd);
server.lastfsync = server.unixtime;
}
}
通过上面的介绍我们可以知道即使Appendfsync设置为alawy,并不是每次执行完一条更新命令就直接写(write+fsync)aof file,这个过程(write+fsync)会被推迟到事件处理流程结束后beforeSleep后进行;如果在beforeSleep时也没有提交给fsync线程,if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) && if (sync_in_progress),则该次的请求会被标志为server.aof_flush_postponed_start,那么在调用serverCron时会再次调用flushAppendOnlyFile,看是否现在能够进行write并且把该job提交给fsync线程。[同样的貌似everysec,也并不是真正的每1s
fsync一次]
serverCron(){
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
} else {
…//判断是否需要rdbSaveBackground,后面运行save rdb
if (server.bgsavechildpid == -1 &&
server.bgrewritechildpid == -1 &&
server.auto_aofrewrite_perc &&
server.appendonly_current_size > server.auto_aofrewrite_min_size)
{ //当前没有后面rewrite子进程,并且满足了auto_aofrewrite_min_size
long long base = server.auto_aofrewrite_base_size ?
server.auto_aofrewrite_base_size : 1;
long long growth = (server.appendonly_current_size*100/base) - 100;
if (growth >= server.auto_aofrewrite_perc) { //判断增长比例
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}}
rewriteAppendOnlyFileBackground()函数也在下面的情况中出现,所以我们在下面一起分析。
rewriteAppendOnlyFileBackground(){
if ((childpid = fork()) == 0) { //后台子进程
if (server.ipfd > 0) close(server.ipfd); //关闭listen套接字
if (server.sofd > 0) close(server.sofd);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); //新的aof临时文件名,这个在rewriteAppendOnlyFile里又使用了一个新的tempfile name
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) … //rewrite并写到新的tempfile
}else{
server.aofrewrite_scheduled = 0; //子进程已经被调度
server.bgrewritechildpid = childpid; //作为判断是否有rewrite子进程的标志
updateDictResizePolicy(); //此时应disable resize dict
server.appendseldb = -1; //以使得下一个更新操作先写select db命令
return REDIS_OK;
}
}
下面我们看一下子进程是如何完成该工作的:
rewriteAppendOnlyFile(char *filename){
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); //打开一个新的tempfile
fp = fopen(tmpfile,"w");
for (j = 0; j < server.dbnum; j++) { //对所有的db库遍历
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; //对每个db先写select db命令
redisDb *db = server.db+j;
if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkLongLong(fp,j) == 0) goto werr; //db id
while((de = dictNext(di)) != NULL) { //获得该db内的每个dictEntry
keystr = dictGetEntryKey(de); //获得key值
o = dictGetEntryVal(de); //获得value值
initStaticStringObject(key,keystr); //将keystr转换为robj的类型,
if (o->type == REDIS_STRING) { //下面就是一个一个的判断value的类型,以选择它对应的命令,及encoding方式,我们这里就举REDIS_STRING类型为例
char cmd[]="*3\r\n$3\r\nSET\r\n"; //先写命令
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr; //写key
if (fwriteBulkObject(fp,o) == 0) goto werr; //写value
}else if(…)
Else if…
}
}
fflush(fp); //fsync文件并close
aof_fsync(fileno(fp));
fclose(fp);
rename(tmpfile,filename); //将它rename为temp-rewriteaof-bg-%d.aof名字,不明白这里为什么要使用一个新的tmpfile:temp-rewriteaof-%d.aof
}
至此子进程完成rewrite操作。下面我们将看父进程也就是主线程在获得子进程退出时做了些什么操作,父进程在serverCron里通过server.bgrewritechildpid来判断是否需要等待子进程退出的信号。
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
if (pid == server.bgsavechildpid) {
backgroundSaveDoneHandler(statloc); //后台save rdb进程
} else {
backgroundRewriteDoneHandler(statloc); //有后台rewrite子进程退出,调用该函数进行处理
}
updateDictResizePolicy();
}
}
下面我们看一下backgroundRewriteDoneHandler作了哪些操作:(这里使用了一些技巧解决了一些之前aof存在的缺陷及问题)
backgroundRewriteDoneHandler(int statloc){
if (!bysignal && exitcode == 0) { //判断退出状态
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.bgrewritechildpid);
newfd = open(tmpfile,O_WRONLY|O_APPEND); //打开子进程rewrite的临时文件
…
nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf)); //将bgrewritebuf写的tempfile里
/*当rename的是oldfile是真正的存在,并且该文件没有被open,也即没有其它的进程引用它,那么此时rename它的话会导致该文件的unlink操作,这就会导致主线程阻塞;这里解决的办法是先open(O_NONBLOCK),来增加引用计数,这里不用管是否open成功,因为如果这个file本身就不存在则也就不会有unlink的问题.如果该文件已经被打开过了,则先把它的oldfd=-1,然后rename,这时不会有unlink操作,然后由后台线程进行close操作,因为此时close就会导致unlink阻塞*/
if (server.appendfd == -1) //如果oldfile文件没有被open,客户端可以通过发送命令来disable aof
oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK); //增加oldfile的引用计数,防止rename导致的unlink阻塞
else
oldfd=-1; //这里置为-1是为了rename失败时close使用,否则在下面该值又会被置为旧的aof fd,然后在后台进行close
rename(tmpfile,server.appendfilename); //这里的rename已经不会导致unlink
if (server.appendfd == -1) {
close(newfd); //如果现在aof disable,则close新的aof file
} else {
oldfd = server.appendfd; //还原oldfd
server.appendfd = newfd; //设置newfd为新的aof fd
if (server.appendfsync == APPENDFSYNC_ALWAYS)
aof_fsync(newfd); //直接fsync阻塞
else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
aof_background_fsync(newfd); //将该fsync放到fsync线程队列里
server.appendseldb = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.auto_aofrewrite_base_size = server.appendonly_current_size;
sdsfree(server.aofbuf); //清除aofbuf,因为这些已经存在bgrewritebuf里,被写入现在的aof file了
server.aofbuf = sdsempty();
}
if (oldfd != -1) bioCreateBackgroundJob
(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); //后台close线程
…
}
}
下面的这篇文章也解释了新版本解决了一些旧版本aof存在的问题:http://www.hoterran.info/redis-aof-backgroud-thread