Linux安全网 - Linux操作系统_Linux 命令_Linux教程_Linux黑客

会员投稿 投稿指南 本期推荐:
搜索:
您的位置: Linux安全网 > Linux编程 > 数据库管理 > » 正文

REDIS AOF的实现

来源: wudongxu 分享至:
Redis AOF 上文我们介绍了Redis的主框架,以及两种持久化大概原理。本文我们将从源码角度分析Redis AOF的相关实现。(本文基于的版本为2.4.2)
1. 相关配置项
首先我们看一下redis.conf里的关于AOF的配置选项:
Appendonly(yes,no):是否开启AOF持久化
Appendfilename(log/appendonly.aof):AOF日志文件
Appendfsync(always,everysec,no):AOF日志文件同步的频率,always代表每次写都进行fsync,everysec每秒钟一次,no不主动fsync,由OS自己来完成。
no-appendfsync-on-rewrite(yes,no):进行rewrite时,是否需要fsync
auto-aof-rewrite-percentage(100):当AOF文件增长了这个比例(这里是增加了一倍),则后台rewrite自动运行
auto-aof-rewrite-min-size(64mb):进行后面rewrite要求的最小AOF文件大小。这两个选项共同决定了后面rewrite进程是否到达运行的时机
注:rewrite是指当AOF很大的时候,通过重写内存的数据来删除原来的AOF文件,生成最新的内存数据的AOF日志,即把当前的结果逆转化为相应的操作命令写到AOF文件中。
通过上面的选项我们可以知道redis的三个AOF处理流程:
 每次更新操作进行的AOF写操作(涉及同步频率)
 Rewrite,当满足auto-aof-rewrite-percentage,auto-aof-rewrite-min-size时后面自动运行rewrite操作。
 Rewrite,当收到bgrewriteaof客户端命令时,马上运行后面rewrite操作
注:当某个key过期的时候也会写AOF,其实它跟第一种很类似,这里就不再介绍。下面我们将分别介绍这三个流程。
在redis的较新版本中(不知道从哪个版本开始)增加了两个新的子进程:
 REDIS_BIO_CLOSE_FILE,负责所有的close file操作
 REDIS_BIO_AOF_FSYNC,负责fsync操作
因为这两个操作都可能会引起阻塞,如果在主线程中完成的话,会影响系统对事件的响应,所以这里统一由相应的线程来完成,每个线程都有一个自己的bio_jobs list,用来保存需要的处理的job任务。其相应的代码在bio.c(线程处理函数为bioProcessBackgroundJobs)里,这两个线程在initServer时创建bioInit()。
注:标准命令格式:如set aaa xiang
*3\r\n$3\r\nset\r\n$3\r\n\aaa\r\n$5\r\n\xiang\r\n
其中*3表示该命令的参数个数,后面的数字表示每个参数的长度。

2. AOF的处理流程
2.1 每次更新操作的AOF写

主要涉及的配置是:Appendfsync,no-appendfsync-on-rewrite。该操作的入口在(redis.c):

void call(redisClient *c) {
    dirty = server.dirty;  //上次的脏数据个数
    c->cmd->proc(c);    //执行命令操作,如果该操作是一个更新操作,则server.dirty会增加
    dirty = server.dirty-dirty; //此次执行导致的脏数据个数
    …
    if (server.appendonly && dirty > 0) //有脏数据并且开启了AOF功能
        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc); //将数据保存到server.aofbuf
…
}

我们再来看一下feedAppendOnlyFile的实现

void feedAppendOnlyFile(struct redisCommand…{
if (dictid != server.appendseldb){ //当月操作的db与上一次不一样,所以要重新写一个新的select db命令,当rewrite的时候也会把appendseldb置为-1
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.appendseldb = dictid;
}
…
buf = catAppendOnlyGenericCommand(buf,argc,argv); //转换为标准命令格式
server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); //将命令写到aofbuf,这个buf会在serverCron当Appendfsync到满足时fsync到文件
if (server.bgrewritechildpid != -1) //如果有bgrewrite子进程的话,则也必须把该命令保存到bgrewritebuf,以便在子进程结束时,把新的变更追加到rewrite后的文件
    server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
…
}
可以看到到上面AOF操作也只是写到buf中,并没有进行写操作,下面我们将查看该过程。通过查看代码我们可以知道flushAppendOnlyFile()函数是进行真正的写操作。另外我们可以知道该函数会在beforeSleep及serverCron中调用。其中beforeSleep是aeMain循环,每次进行事件处理前必须调用一次:
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
而serverCron则先判断是否有延迟的flush操作:
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
下面我们来看一下该函数flushAppendOnlyFile:
flushAppendOnlyFile(int force){
…
    if (server.appendfsync == APPENDFSYNC_EVERYSEC) //如果我们设置的fsync频率为每s
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;//判断是否已经有fsync job在等待fsync线程的处理
    if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
        if (sync_in_progress) {  //已经有fsync job在等待处理了,先不write也不把该job放到fsync线程处理队列里,如果之前并没有延迟fsync的job,则标志现在已经有这样的情况并且设置这个时间为server.unixtime.如果之前已经有延迟的fsync job,则如果这个延迟小于2s,则直接返回再等待,否则就需要flush了。
            if (server.aof_flush_postponed_start == 0) {
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                return;
            }
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
server.aof_flush_postponed_start = 0;
nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); //write buf这里是nonblock的,此时并没有被fsync到磁盘,会直接返回
    if (server.no_appendfsync_on_rewrite &&
        (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
            return; //判断no_appendfsync_on_rewrite条件
    if (server.appendfsync == APPENDFSYNC_ALWAYS) { //如果是APPENDFSYNC_ALWAYS,则必须马上调用fsync,此时主线程就会被阻塞
        aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
        server.lastfsync = server.unixtime;
    } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
                server.unixtime > server.lastfsync)) { //如果没有等待的job则把该job加到fsync线程的job队列里
        if (!sync_in_progress) aof_background_fsync(server.appendfd);
        server.lastfsync = server.unixtime;
    }
}
通过上面的介绍我们可以知道即使Appendfsync设置为alawy,并不是每次执行完一条更新命令就直接写(write+fsync)aof file,这个过程(write+fsync)会被推迟到事件处理流程结束后beforeSleep后进行;如果在beforeSleep时也没有提交给fsync线程,if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) && if (sync_in_progress),则该次的请求会被标志为server.aof_flush_postponed_start,那么在调用serverCron时会再次调用flushAppendOnlyFile,看是否现在能够进行write并且把该job提交给fsync线程。[同样的貌似everysec,也并不是真正的每1s fsync一次]

2.2 后面自动运行rewrite
该操作涉及的配置:auto-aof-rewrite-percentage,auto-aof-rewrite-min-size。
该过程是在serverCron里判断,是满足到达运行bgrewrite的时机:
serverCron(){
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
} else {
…//判断是否需要rdbSaveBackground,后面运行save rdb
         if (server.bgsavechildpid == -1 &&
             server.bgrewritechildpid == -1 &&
             server.auto_aofrewrite_perc &&
             server.appendonly_current_size > server.auto_aofrewrite_min_size)
         { //当前没有后面rewrite子进程,并且满足了auto_aofrewrite_min_size
            long long base = server.auto_aofrewrite_base_size ?
                            server.auto_aofrewrite_base_size : 1;
            long long growth = (server.appendonly_current_size*100/base) - 100;
            if (growth >= server.auto_aofrewrite_perc) { //判断增长比例
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
}}
rewriteAppendOnlyFileBackground()函数也在下面的情况中出现,所以我们在下面一起分析。

2.3 客户端发送bgrewriteaof命令
通过查找readonlyCommandTable表,我们可以看到当客户端发送bgrewriteaof命令过来的时候,服务器调用bgrewriteaofCommand函数来进行处理。该函数会判断当前是否已经有bgrewritechildpid存在,或者bgsavechildpid存在则标志server.aofrewrite_scheduled = 1,需要进行bgrewrite,但不是现在,而是在serverCron处理的时候。否则则直接调用rewriteAppendOnlyFileBackground,创建bgrewrite进程,进行rewrite操作。
rewriteAppendOnlyFileBackground(){
if ((childpid = fork()) == 0) { //后台子进程
        if (server.ipfd > 0) close(server.ipfd); //关闭listen套接字
        if (server.sofd > 0) close(server.sofd);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); //新的aof临时文件名,这个在rewriteAppendOnlyFile里又使用了一个新的tempfile name
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) … //rewrite并写到新的tempfile
}else{
        server.aofrewrite_scheduled = 0; //子进程已经被调度
        server.bgrewritechildpid = childpid; //作为判断是否有rewrite子进程的标志
        updateDictResizePolicy(); //此时应disable resize dict
        server.appendseldb = -1; //以使得下一个更新操作先写select db命令
        return REDIS_OK;
}
}
下面我们看一下子进程是如何完成该工作的:
rewriteAppendOnlyFile(char *filename){
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); //打开一个新的tempfile
fp = fopen(tmpfile,"w");
for (j = 0; j < server.dbnum; j++) { //对所有的db库遍历
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; //对每个db先写select db命令
        redisDb *db = server.db+j;
        if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
        if (fwriteBulkLongLong(fp,j) == 0) goto werr; //db id
        while((de = dictNext(di)) != NULL) { //获得该db内的每个dictEntry
            keystr = dictGetEntryKey(de); //获得key值
            o = dictGetEntryVal(de); //获得value值
            initStaticStringObject(key,keystr); //将keystr转换为robj的类型,
            if (o->type == REDIS_STRING) { //下面就是一个一个的判断value的类型,以选择它对应的命令,及encoding方式,我们这里就举REDIS_STRING类型为例
                char cmd[]="*3\r\n$3\r\nSET\r\n"; //先写命令
                if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                if (fwriteBulkObject(fp,&key) == 0) goto werr;  //写key
                if (fwriteBulkObject(fp,o) == 0) goto werr;  //写value
            }else if(…)
            Else if…
        }
}
    fflush(fp); //fsync文件并close
    aof_fsync(fileno(fp));
fclose(fp);
rename(tmpfile,filename); //将它rename为temp-rewriteaof-bg-%d.aof名字,不明白这里为什么要使用一个新的tmpfile:temp-rewriteaof-%d.aof
}
至此子进程完成rewrite操作。下面我们将看父进程也就是主线程在获得子进程退出时做了些什么操作,父进程在serverCron里通过server.bgrewritechildpid来判断是否需要等待子进程退出的信号。
    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            if (pid == server.bgsavechildpid) {
                backgroundSaveDoneHandler(statloc); //后台save rdb进程
            } else {
                backgroundRewriteDoneHandler(statloc);  //有后台rewrite子进程退出,调用该函数进行处理
            }
            updateDictResizePolicy();
        }
    }
下面我们看一下backgroundRewriteDoneHandler作了哪些操作:(这里使用了一些技巧解决了一些之前aof存在的缺陷及问题)
backgroundRewriteDoneHandler(int statloc){
if (!bysignal && exitcode == 0) { //判断退出状态
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.bgrewritechildpid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND); //打开子进程rewrite的临时文件
        …
        nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf)); //将bgrewritebuf写的tempfile里
/*当rename的是oldfile是真正的存在,并且该文件没有被open,也即没有其它的进程引用它,那么此时rename它的话会导致该文件的unlink操作,这就会导致主线程阻塞;这里解决的办法是先open(O_NONBLOCK),来增加引用计数,这里不用管是否open成功,因为如果这个file本身就不存在则也就不会有unlink的问题.如果该文件已经被打开过了,则先把它的oldfd=-1,然后rename,这时不会有unlink操作,然后由后台线程进行close操作,因为此时close就会导致unlink阻塞*/
        if (server.appendfd == -1) //如果oldfile文件没有被open,客户端可以通过发送命令来disable aof
           oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK); //增加oldfile的引用计数,防止rename导致的unlink阻塞
        else
           oldfd=-1; //这里置为-1是为了rename失败时close使用,否则在下面该值又会被置为旧的aof fd,然后在后台进行close
        rename(tmpfile,server.appendfilename); //这里的rename已经不会导致unlink
        if (server.appendfd == -1) {
            close(newfd); //如果现在aof disable,则close新的aof file
        } else {
            oldfd = server.appendfd;  //还原oldfd
            server.appendfd = newfd; //设置newfd为新的aof fd
            if (server.appendfsync == APPENDFSYNC_ALWAYS)
                aof_fsync(newfd); //直接fsync阻塞
            else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
                aof_background_fsync(newfd); //将该fsync放到fsync线程队列里
            server.appendseldb = -1; /* Make sure SELECT is re-issued */
            aofUpdateCurrentSize();
            server.auto_aofrewrite_base_size = server.appendonly_current_size;
            sdsfree(server.aofbuf); //清除aofbuf,因为这些已经存在bgrewritebuf里,被写入现在的aof file了
            server.aofbuf = sdsempty();
        }
        if (oldfd != -1) bioCreateBackgroundJob
(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); //后台close线程
        …
}
}
下面的这篇文章也解释了新版本解决了一些旧版本aof存在的问题:http://www.hoterran.info/redis-aof-backgroud-thread

3. 总结
通过这篇文章我们学习了AOF涉及的大多数内容,其实质可以分为两种内容:server接受到一条更新操作时向aofbuf里写这条命令,然后在结束一次事件循环后(beforeSleep),进行fsync操作,此时会根据配置的sync频率来选择是直接(alawy)由主线程fsync还是由fsync线程来sync(everysec);其二就是rewrite操作,该操作是由后台子进程来实现,子进程利用copy-on-write获得与父进程一样的地址空间,它把现在的所有db的所有dict表的内容重新还原为命令的形式写到一个临时的文件里,同时父进程现在必须把新的更新操作缓存到bgrewrietbuf里,当子进程结束的时候(已经把之前的数据写到临时文件里),父进程在serverCron的时候把刚才bgrewritebuf里的内容追加到子进程的临时文件里,然后rename这个临时文件为配置文件指定的文件名。这样就完成了一次rewrite及互换的操作。

Tags:
分享至:
最新图文资讯
1 2 3 4 5 6
验证码:点击我更换图片 理智评论文明上网,拒绝恶意谩骂 用户名:
关于我们 - 联系我们 - 广告服务 - 友情链接 - 网站地图 - 版权声明 - 发展历史