redis是通过fork子进程来做aofrewrite,同时为了保证aof的连续性,父进程把aofrewrite期间的写命令缓存起来,等收割完子进程之后再追加到新的aof文件。如果期间写入量较大的话收割时就要有大量的写磁盘操作,造成性能下降。

为了提高aofrewrite效率,redis通过在父子进程间建立管道,把aofrewrite期间的写命令通过管道同步给子进程,追加写盘的操作也就转交给了子进程。

上图是aofrewrite的流程,标注为基本的函数调用关系。

  • 1 - 首先,通过命令或是事件触发aofrewrite,调用rewriteAppendOnlyFileBackground()函数

    • 该函数会fork出一个子进程
  • 2 - 父进程记录子进程的pid并开始缓存写命令

    • 当pid不为-1时就会执行aofRewriteBufferAppend()把写命令缓存起来
  • 3 - 子进程调用rewriteAppendOnlyFile(tmpfile)函数创建新的aof文件

    • 调用rewriteAppendOnlyFileRio()函数遍历redis把所有key-value以命令的方式写入新aof文件
    • 完成后调用exitFromChild(0)退出
  • 4 - 子进程退出后父进程调用backgroundRewriteDoneHandler()来处理

    • 调用aofRewriteBufferWrite()函数把积攒的写命令缓存写入子进程创建的临时aof文件
    • 最后rename()用新的aof文件替换掉原来的aof文件

在aofrewrite过程中,如果redis本身数据量较大子进程执行时间较长,或者写入流量较高,就会导致aof-rewrite-buffer积攒较多,父进程就要进行大量写磁盘操作,这对于redis来说显然是不够高效的。

为了提高aofrewrite效率,redis使用pipe来优化,下图中红色标注即为优化的部分:

image.png

优化点:

  • 1 - 父进程建立管道

    • 共三条管道,分别为一条数据管道,和两条控制管道
    • 数据管道用来传输数据,控制管道用来做父子进程交互,控制何时停止数据传输
    • 注册写事件aofChildWriteDiffData()向数据管道写数据
  • 3 - 子进程从管道读数据

    • 子进程在生成新aof文件时会定期调用aofReadDiffFromParent()从管道读取数据,并缓存下来
  • 4 - 父子进程交互

    • 子进程生成新aof文件后会通过控制管道向父进程发送”!”,发起停止数据传输请求
    • 父进程收到停止信号后激活读事件处理函数aofChildPipeReadable(),设置server.aof_stop_sending_diff=1停止数据传输,并向子进程回复”!”,表示同意停止
    • 子进程收到父进程的应答,调用rioWrite()把积攒的数据追加到新的aof文件,最后退出

细心的读者会发现,aofRewriteBufferAppend()和aofRewriteBufferWrite()这一对函数仍然保留,父进程还是要把aof-rewrite-buffer写盘吗?是的,这是因为父子进程是异步结构,父子间总会有那么一点代沟,aof-rewrite-buffer还是需要保留的,不过这个时候父进程写盘的数据量就很小了,几乎可以忽略。

aofrewrite的触发条件

    1. 执行bgrewriteaof命令。
    1. serverCron时间事件检测到aof文件大小超限。

命令的触发不必详述,主要来看下serverCron的触发:

也就是说aof文件大小超过了server.aof_rewrite_min_size,并且增长率大于server.aof_rewrite_perc时就会触发(增长率计算的基数server.aof_rewrite_base_size是上次aofrewrite完之后aof文件的大小)。

目前云redis设置server.aof_rewrite_min_size为内存规格的1/4,server.aof_rewrite_perc为100。

管道建立

aofrewrite触发之后进入rewriteAppendOnlyFileBackground()函数:

  1. pid_t childpid;
  2. long long start;
  3. if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
  4. if (aofCreatePipes() != C_OK) return C_ERR;
  5. openChildInfoPipe();
  6. start = ustime();
  7. if ((childpid = fork()) == 0) {
  8. ...

OK,重点来了,在fork之前调用了aofCreatePipes()函数来创建管道(openChildInfoPipe()函数只是用来收集子进程copy-on-write用到的内存,就不详细展开了):

  1. int aofCreatePipes(void) {
  2. int fds[6] = {-1, -1, -1, -1, -1, -1};
  3. int j;
  4. if (pipe(fds) == -1) goto error; /* parent -> children data. 父进程向子进程写数据的管道*/
  5. if (pipe(fds+2) == -1) goto error; /* children -> parent ack. 子进程向父进程发起停止传输的控制管道*/
  6. if (pipe(fds+4) == -1) goto error; /* parent -> children ack. 父进程向子进程回复的控制管道*/
  7. /* Parent -> children data is non blocking. */
  8. if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
  9. if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
  10. if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
  11. //注册读事件处理函数,负责处理子进程要求停止数据传输的消息
  12. server.aof_pipe_write_data_to_child = fds[1]; //父进程向子进程写数据的fd
  13. server.aof_pipe_read_data_from_parent = fds[0]; //子进程从父进程读数据的fd
  14. server.aof_pipe_write_ack_to_parent = fds[3]; //子进程向父进程发起停止消息的fd
  15. server.aof_pipe_read_ack_from_child = fds[2]; //父进程从子进程读取停止消息的fd
  16. server.aof_pipe_write_ack_to_child = fds[5]; //父进程向子进程回复消息的fd
  17. server.aof_pipe_read_ack_from_parent = fds[4]; //子进程从父进程读取回复消息的fd
  18. server.aof_stop_sending_diff = 0; //是否停止管道传输标记位
  19. return C_OK;
  20. ...
  21. }

父进程与管道传输

管道建立起来了我们再来看看fork之后父进程和子进程如何工作,首先看下父进程:

  1. /* Parent */
  2. server.stat_fork_time = ustime()-start;
  3. server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
  4. latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
  5. ...
  6. server.aof_rewrite_scheduled = 0;
  7. server.aof_rewrite_time_start = time(NULL);
  8. server.aof_child_pid = childpid;
  9. updateDictResizePolicy();
  10. /* We set appendseldb to -1 in order to force the next call to the
  11. * feedAppendOnlyFile() to issue a SELECT command, so the differences
  12. * accumulated by the parent into server.aof_rewrite_buf will start
  13. * with a SELECT statement and it will be safe to merge. */
  14. server.aof_selected_db = -1;
  15. ...

父进程这里做的事情并不多,主要是信息的记录和一些标记位设置

  • 记录fork消耗的时间,info命令可以查看上次fork的耗时latest_fork_usec,单位微秒
  • 设置server.aof_rewrite_scheduled = 0,防止serverCron再次触发aofrewrite
  • 设置server.aof_child_pid为子进程pid,其不为-1时redis才会向aof-rewrite-buffer缓存写命令
  • updateDictResizePolicy()禁止所有hash数据结构resize,这是为了尽量避免子进程copy-on-write进行内存拷贝
  • 设置server.aof_selected_db = -1,下一次的aof日志会强制加上select,这是为了保证命令执行到正确的db

接下来就是缓存写命令和管道通信部分了,入口是在feedAppendOnlyFile():

  1. void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
  2. ...
  3. if (server.aof_child_pid != -1)
  4. aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
  5. ...
  6. }

server.aof_child_pid在这时就生效了,开始缓存写命令:

  1. void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
  2. listNode *ln = listLast(server.aof_rewrite_buf_blocks);
  3. while(len) {
  4. /* If we already got at least an allocated block, try appending
  5. * at least some piece into it. */
  6. if (block) {
  7. unsigned long thislen = (block->free < len) ? block->free : len;
  8. if (thislen) { /* The current block is not already full. */
  9. memcpy(block->buf+block->used, s, thislen);
  10. block->used += thislen;
  11. block->free -= thislen;
  12. s += thislen;
  13. len -= thislen;
  14. }
  15. }
  16. if (len) { /* First block to allocate, or need another block. */
  17. int numblocks;
  18. block = zmalloc(sizeof(*block));
  19. block->free = AOF_RW_BUF_BLOCK_SIZE;
  20. block->used = 0;
  21. listAddNodeTail(server.aof_rewrite_buf_blocks,block);
  22. /* Log every time we cross more 10 or 100 blocks, respectively
  23. * as a notice or warning. */
  24. numblocks = listLength(server.aof_rewrite_buf_blocks);
  25. int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
  26. LL_NOTICE;
  27. serverLog(level,"Background AOF buffer size: %lu MB",
  28. aofRewriteBufferSize()/(1024*1024));
  29. }
  30. }
  31. }
  32. if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
  33. aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
  34. AE_WRITABLE, aofChildWriteDiffData, NULL);
  35. }
  36. }

redis用链表server.aof_rewrite_buf_blocks来缓存aofrewrite期间的写命令,链表的每个节点最大10MB;重点是在最后的写事件注册,当server.aof_pipe_write_data_to_child这个fd没有注册事件时,就注册写事件函数aofChildWriteDiffData:

子进程和管道传输

接下来看下子进程:

  1. ...
  2. /* Child */
  3. char tmpfile[256];
  4. closeListeningSockets(0);
  5. redisSetProcTitle("redis-aof-rewrite");
  6. snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
  7. if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
  8. ...
  9. exitFromChild(0);
  10. } else {
  11. exitFromChild(1);
  12. }
  13. ...

子进程首先关闭监听端口,然后就进入rewriteAppendOnlyFile()函数:

  1. int rewriteAppendOnlyFile(char *filename) {
  2. ...
  3. snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
  4. fp = fopen(tmpfile,"w");
  5. ...
  6. server.aof_child_diff = sdsempty();
  7. ...
  8. if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
  9. ...

首先打开一个临时aof文件,并初始化server.aof_child_diff缓存准备从父进程读数据,然后就调用rewriteAppendOnlyFileRio()来写aof文件和读取管道中的数据:

  1. int rewriteAppendOnlyFileRio(rio *aof) {
  2. ...
  3. if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
  4. processed = aof->processed_bytes;
  5. aofReadDiffFromParent();
  6. }
  7. ...
  8. }

在遍历redis把key-value写入新aof文件过程中,新aof文件每增长10K就会调用aofReadDiffFromParent()从管道中读取数据追加到server.aof_child_diff:

  1. ssize_t aofReadDiffFromParent(void) {
  2. char buf[65536]; /* Default pipe buffer size on most Linux systems. */
  3. ssize_t nread, total = 0;
  4. while ((nread =
  5. read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
  6. server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
  7. total += nread;
  8. }
  9. return total;
  10. }

停止管道传输

子进程在遍历完redis生成好新的aof文件之后就要准备退出了,那么退出前要先告诉父进程停止管道传输,依然回到rewriteAppendOnlyFile()函数来看:

  1. int rewriteAppendOnlyFile(char *filename) {
  2. ...
  3. /* Ask the master to stop sending diffs. */
  4. if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
  5. if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
  6. goto werr;
  7. /* We read the ACK from the server using a 10 seconds timeout. Normally
  8. * it should reply ASAP, but just in case we lose its reply, we are sure
  9. * the child will eventually get terminated. */
  10. if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
  11. byte != '!') goto werr;
  12. serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
  13. /* Read the final diff if any. */
  14. aofReadDiffFromParent();
  15. /* Write the received diff to the file. */
  16. "Concatenating %.2f MB of AOF diff received from parent.",
  17. (double) sdslen(server.aof_child_diff) / (1024*1024));
  18. if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
  19. goto werr;
  20. /* Make sure data will not remain on the OS's output buffers */
  21. if (fflush(fp) == EOF) goto werr;
  22. if (fsync(fileno(fp)) == -1) goto werr;
  23. if (fclose(fp) == EOF) goto werr;
  24. ...
  25. }

这里写的就很直接了:

  • 使用write向控制管道写入”!”发起停止请求,然后读取返回结果,超时时间为10s
  • 超时就goto werr异常退出,10s内读取到”!”就继续
  • 再次调用aofReadDiffFromParent()从数据管道读取数据确保管道中没有遗留
  • 最后rioWrite()把server.aof_child_diff积攒的数据追加到新的aof文件

那么父进程是如何处理”!”的呢,还记得之前注册的读事件aofChildPipeReadable()吧,子进程向控制管道发送”!”就会激活:

很简单,标记server.aof_stop_sending_diff=1,给子进程回复”!”,并且把自己从事件循环删掉,自此父子进程间通信完成,剩下的就是父进程等待子进程退出进行收尾工作。

父进程收尾

serverCron()中会调用wait3()来收割子进程:

  1. if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
  2. ldbPendingChildren())
  3. {
  4. int statloc;
  5. pid_t pid;
  6. if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
  7. int exitcode = WEXITSTATUS(statloc);
  8. int bysignal = 0;
  9. if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
  10. if (pid == -1) {
  11. serverLog(LL_WARNING,"wait3() returned an error: %s. "
  12. "rdb_child_pid = %d, aof_child_pid = %d",
  13. strerror(errno),
  14. (int) server.rdb_child_pid,
  15. (int) server.aof_child_pid);
  16. } else if (pid == server.rdb_child_pid) {
  17. backgroundSaveDoneHandler(exitcode,bysignal);
  18. if (!bysignal && exitcode == 0) receiveChildInfo();
  19. } else if (pid == server.aof_child_pid) {
  20. backgroundRewriteDoneHandler(exitcode,bysignal);
  21. if (!bysignal && exitcode == 0) receiveChildInfo();
  22. } else {
  23. if (!ldbRemoveChild(pid)) {
  24. serverLog(LL_WARNING,
  25. "Warning, detected child with unmatched pid: %ld",
  26. (long)pid);
  27. }
  28. }
  29. updateDictResizePolicy();
  30. closeChildInfoPipe();
  31. }

如果收割到的pid是server.aof_child_pid就进入backgroundRewriteDoneHandler():

  1. void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
  2. ...
  3. /* Flush the differences accumulated by the parent to the
  4. * rewritten AOF. */
  5. latencyStartMonitor(latency);
  6. snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
  7. (int)server.aof_child_pid);
  8. newfd = open(tmpfile,O_WRONLY|O_APPEND);
  9. if (newfd == -1) {
  10. serverLog(LL_WARNING,
  11. "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
  12. goto cleanup;
  13. }
  14. if (aofRewriteBufferWrite(newfd) == -1) {
  15. serverLog(LL_WARNING,
  16. "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
  17. close(newfd);
  18. goto cleanup;
  19. }
  20. latencyEndMonitor(latency);
  21. latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

首先会打开子进程生成的新aof文件,并调用aofRewriteBufferWrite()把server.aof_rewrite_buf_blocks中剩余的数据追加到新aof文件。

  1. /* Rename the temporary file. This will not unlink the target file if
  2. * it exists, because we reference it with "oldfd". */
  3. latencyStartMonitor(latency);
  4. if (rename(tmpfile,server.aof_filename) == -1) {
  5. serverLog(LL_WARNING,
  6. "Error trying to rename the temporary AOF file %s into %s: %s",
  7. tmpfile,
  8. server.aof_filename,
  9. strerror(errno));
  10. close(newfd);
  11. if (oldfd != -1) close(oldfd);
  12. goto cleanup;
  13. }
  14. latencyEndMonitor(latency);
  15. latencyAddSampleIfNeeded("aof-rename",latency);

之后把新aof文件rename为server.aof_filename记录的文件名。

  1. /* Asynchronously close the overwritten AOF. */
  2. if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

使用bio后台线程来close原来的aof文件。

  1. cleanup:
  2. aofClosePipes();
  3. aofRewriteBufferReset();
  4. aofRemoveTempFile(server.aof_child_pid);
  5. server.aof_child_pid = -1;
  6. server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
  7. server.aof_rewrite_time_start = -1;
  8. /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */

本文介绍了redis的aofrewrite基础实现以及利用pipe的优化,云Redis4.0已经上线,欢迎使用。