Logger子系统在env->create阶段由toku_logger_create进行初步的初始化工作。代码片段如下:

  1. int toku_logger_create (TOKULOGGER *resultp) {
  2. TOKULOGGER CALLOC(result);
  3. if (result==0) return get_error_errno();
  4. result->is_open=false;
  5. result->write_log_files = true;
  6. result->trim_log_files = true;
  7. result->directory=0;
  8. result->lg_max = 100<<20; // 100MB default
  9. // lsn is uninitialized
  10. result->inbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
  11. result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
  12. // written_lsn is uninitialized
  13. // fsynced_lsn is uninitialized
  14. result->last_completed_checkpoint_lsn = ZERO_LSN;
  15. // next_log_file_number is uninitialized
  16. // n_in_file is uninitialized
  17. toku_logfilemgr_create(&result->logfilemgr);
  18. *resultp=result;
  19. ml_init(&result->input_lock);
  20. toku_mutex_init(&result->output_condition_lock, NULL);
  21. toku_cond_init(&result->output_condition, NULL);
  22. result->output_is_available = true;
  23. return 0;
  24. }

Logger子系统在env->open阶段,调用toku_logger_open函数进行进一步的初始化。函数toku_logger_opentoku_logger_open_with_last_xid的简单封装。Env->open最终调用toku_logger_open_with_last_xid解析redo log file获取下一个可用的lsn,下一个可用的redo log file的序列号index并打开相应redo log file。在env->open时,调用toku_logger_open_with_last_xid的最后一个参数last_xid为TXNID_NONE,表示由toku_logger_open_with_last_xid指定事务子系统初始化时最新的txnid。 解析redo log file的过程在函数toku_logfilemgr_init实现,依次解析redo log目录下的每一个文件名符合特定格式的redo log file,从中读取最后一个log entry的lsn保存下来。Redo log文件名遵循”log$index.tokulog$version”格式,$index是64位无符号整数表示的redo log file的序列号index,$version是32位无符号整数表示版本信息。 如果最新的redo log file最后一个log entry是LT_shutdown(表示正常关闭不需要进行recovery),那么把对应的txnid记录在last_xid_if_clean_shutdown变量,作为TokuDB事务子系统初始化时最新的txnid。在解析redo log file的时候,还会用最新的redo log file的最后一个log entry的lsn更新logger的lsn,written_lsn,fsynced_lsn。接着,toku_logger_find_next_unused_log_file找到下一个可用的redo log文件的序列号,并创建新的redo log file。每个redo log file最开始的12个字节是固定的,首先是8个字节的magic字符串“tokulogg“,紧接着4个字节是log的版本信息。代码片段如下:

  1. int
  2. toku_logger_open_with_last_xid(const char *directory /* redo log dir */, TOKULOGGER logger, TXNID last_xid) {
  3. if (logger->is_open) return EINVAL;
  4. TXNID last_xid_if_clean_shutdown = TXNID_NONE;
  5. r = toku_logfilemgr_init(logger->logfilemgr, directory, &last_xid_if_clean_shutdown);
  6. if ( r!=0 )
  7. return r;
  8. logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr);
  9. logger->written_lsn = logger->lsn;
  10. logger->fsynced_lsn = logger->lsn;
  11. logger->inbuf.max_lsn_in_buf = logger->lsn;
  12. logger->outbuf.max_lsn_in_buf = logger->lsn;
  13. r = open_logdir(logger, directory);
  14. if (r!=0) return r;
  15. long long nexti;
  16. r = toku_logger_find_next_unused_log_file(logger->directory, &nexti);
  17. if (r!=0) return r;
  18. logger->next_log_file_number = nexti;
  19. r = open_logfile(logger);
  20. if (r!=0) return r;
  21. if (last_xid == TXNID_NONE) {
  22. last_xid = last_xid_if_clean_shutdown;
  23. }
  24. toku_txn_manager_set_last_xid_from_logger(logger->txn_manager, last_xid);
  25. logger->is_open = true;
  26. return 0;
  27. }

到这里,TokuDB的logger子系统就初始化好了,在处理DDL或者DML或者TokuDB执行checkpoint的时候,都需要先写rollback(undo)log,redo log。Rollback在之前的月报MySQL · TokuDB · 事务子系统和 MVCC 实现 谈到过,这里不再赘述。

TokuDB的logger有两个buffer:inbuf和outbuf。Inbuf表示接收log entry的buffer,而outbuf表示写到redo log文件的buffer。这两个buffer是如何切换的呢?当inbuf满或者inbuf里的free space无法满足新来的log entry的存储需求时,需要触发redo buffer flush过程,即将inbuf日志flush到redo log文件里。这个过程比较耗时,而且很可能inbuf里面还有free space,只是由于当前这个log entry比较大而无法满足存储需求,TokuDB实现了output permission机制,使得需要free space的请求等待在output permission的条件变量上,其他client thread上下文的redo log请求可以继续使用inbuf写日志。等待上一个flush完成后(即条件变量被signaled),检查当前inbuf的free space,如果可以满足这条redo log entry就直接返回,说明别的线程帮我们flush好了。如果free space不够,需要在当前线程的上下文去做flush,实际上是把inbuf和outbuf互换,然后把outbuf写到redo log文件中。写完之后适当调整inbuf的大小使之满足当前redo log entry请求。最后唤醒等待inbuf提供足够空间的线程(阻塞在output permission上的线程)。简而言之,把redo log buffer拆分成inbuf和outbuf,最重要的作用是在redo log flush的时候不会阻塞新的log entry写入,感兴趣的朋友可以看一下函数toku_logger_maybe_fsync的实现,这里就不一一展开了。函数toku_logger_make_space_in_inbuf的代码片段如下:

  1. void
  2. toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
  3. {
  4. if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
  5. return;
  6. }
  7. ml_unlock(&logger->input_lock);
  8. LSN fsynced_lsn;
  9. // 等待前面的redo log flush完成
  10. grab_output(logger, &fsynced_lsn);
  11. ml_lock(&logger->input_lock);
  12. if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
  13. // 其他线程帮助flush redo log,直接返回。
  14. release_output(logger, fsynced_lsn);
  15. return;
  16. }
  17. if (logger->inbuf.n_in_buf > 0) {
  18. // 交换inbuf,outbuf
  19. swap_inbuf_outbuf(logger);
  20. // 把outbuf里的日志写回
  21. write_outbuf_to_logfile(logger, &fsynced_lsn);
  22. }
  23. // 适当调整inbuf大小
  24. if (n_bytes_needed > logger->inbuf.buf_size) {
  25. int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed);
  26. assert(new_size < (1<<30)); // inbuf必须小于1G
  27. XREALLOC_N(new_size, logger->inbuf.buf);
  28. logger->inbuf.buf_size = new_size;
  29. }
  30. // 唤醒等待flush redo log的线程
  31. release_output(logger, fsynced_lsn);

TokuDB崩溃恢复过程

前面提到MySQL重启过程中会调用db_env_create创建env实例,进行参数设置和callback设置,然后调用env->open来做进一步初始化。同样env->open也是一个回调函数,它是在db_env_create设置的,指向env_open函数。 在env_open里调用validate_env判断是否需要进行recovery。validate_env函数返回时表明这个env是否是emptyenv (env目录为空,且不存在rollback文件,不存在数据文件),是否是newnev (env目录不存在),是否是emptyrollback (env目录存在,rollback文件为空)。 如果满足条件 !emptyenv && !new_env && is_set(DB_RECOVERY) 就尝试进行recovery。简单地说recovery的条件就是env存在,log_dir存在,redo log存在。 判断是否真正做recovery的函数是tokuft_needs_recovery。代码如下:

  1. int tokuft_needs_recovery(const char *log_dir, bool ignore_log_empty) {
  2. int needs_recovery;
  3. int r;
  4. TOKULOGCURSOR logcursor = NULL;
  5. r = toku_logcursor_create(&logcursor, log_dir);
  6. if (r != 0) {
  7. needs_recovery = true; goto exit;
  8. }
  9. struct log_entry *le;
  10. le = NULL;
  11. r = toku_logcursor_last(logcursor, &le);
  12. if (r == 0) {
  13. needs_recovery = le->cmd != LT_shutdown;
  14. }
  15. else {
  16. needs_recovery = !(r == DB_NOTFOUND && ignore_log_empty);
  17. }
  18. exit:
  19. if (logcursor) {
  20. r = toku_logcursor_destroy(&logcursor);
  21. assert(r == 0);
  22. }
  23. return needs_recovery;
  24. }

tokuft_needs_recovery尝试读取最后一条redo log entry,如果不是LT_shutdown,就需要真正做recovery。读取最后一条redo log entry的代码片段如下:

如果需要做recovery,TokuDB会调用do_recovery进行恢复,恢复的时候先做redo log apply,然后进行undo rollback。代码片段如下:

  1. static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_dir) {
  2. r = toku_logcursor_create(&logcursor, log_dir);
  3. assert(r == 0);
  4. scan_state_init(&renv->ss);
  5. for (unsigned i=0; 1; i++) {
  6. // 读取前一个log entry,第一次读的是最后一个log entry
  7. le = NULL;
  8. r = toku_logcursor_prev(logcursor, &le);
  9. if (r != 0) {
  10. if (r == DB_NOTFOUND)
  11. break;
  12. rr = DB_RUNRECOVERY;
  13. goto errorexit;
  14. }
  15. // backward阶段处理log entry
  16. assert(renv->ss.ss == BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
  17. renv->ss.ss == BACKWARD_NEWER_CHECKPOINT_END);
  18. logtype_dispatch_assign(le, toku_recover_backward_, r, renv);
  19. if (r != 0) {
  20. rr = DB_RUNRECOVERY;
  21. goto errorexit;
  22. }
  23. if (renv->goforward)
  24. break;
  25. }
  26. for (unsigned i=0; 1; i++) {
  27. // forward阶段处理log entry,首先处理的是checkpoint begin的那个log entry
  28. assert(renv->ss.ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
  29. renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
  30. logtype_dispatch_assign(le, toku_recover_, r, renv);
  31. if (r != 0) {
  32. rr = DB_RUNRECOVERY;
  33. goto errorexit;
  34. }
  35. // 读取下一个log entry
  36. le = NULL;
  37. r = toku_logcursor_next(logcursor, &le);
  38. if (r != 0) {
  39. if (r == DB_NOTFOUND)
  40. break;
  41. rr = DB_RUNRECOVERY;
  42. goto errorexit;
  43. }
  44. }
  45. // parse redo log结束
  46. assert(renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
  47. r = toku_logcursor_destroy(&logcursor);
  48. assert(r == 0);
  49. // 重启logger
  50. toku_logger_restart(renv->logger, lastlsn);
  51. // abort所有未提交的事务
  52. recover_abort_all_live_txns(renv);
  53. r = toku_checkpoint(renv->cp, renv->logger, NULL, NULL, NULL, NULL, RECOVERY_CHECKPOINT);
  54. assert(r == 0);
  55. return 0;
  56. }

Scan log entry分别两个阶段:backward阶段和forward阶段。这两个阶段是由scan_state状态机控制的。在scan开始之前在scan_state_init函数中把状态机ss的初始状态设置为BACKWARD_NEWER_CHECKPOINT_END。

  • Backward阶段:从最后一个log entry开始向前读,直到读到checkpoint end。对在这个过程中读到的每一个log entry调用logtype_dispatch_assign(le, toku_recover_backward_, r, renv)。在这个阶段对于checkpoint以外的操作,toku_recover_backward_前缀的处理函数都是noop。当读到checkpoint end的log entry时,会把ss状态设置为BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END,并记录这个checkpoint的begin_lsn和lsn。然后继续向前scan直到读到checkpoint begin的log entry,确保ss中记录的checkpoint_begin_lsn和log entry的lsn是相等的,然后 把ss的状态设置为FORWARD_BETWEEN_CHECKPOINT_BEGIN_END,并设置renv->goforward为TRUE。
  1. static void scan_state_init(struct scan_state *ss) {
  2. ss->ss = BACKWARD_NEWER_CHECKPOINT_END;
  3. ss->checkpoint_begin_lsn = ZERO_LSN;
  4. ss->checkpoint_end_lsn = ZERO_LSN;
  5. ss->checkpoint_num_fassociate = 0;
  6. ss->checkpoint_num_xstillopen = 0;
  7. ss->last_xid = 0;
  8. }
  9. static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
  10. switch (renv->ss.ss) {
  11. case BACKWARD_NEWER_CHECKPOINT_END:
  12. renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END;
  13. renv->ss.checkpoint_begin_lsn.lsn = l->lsn_begin_checkpoint.lsn;
  14. renv->ss.checkpoint_end_lsn.lsn = l->lsn.lsn;
  15. renv->ss.checkpoint_end_timestamp = l->timestamp;
  16. return 0;
  17. case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
  18. abort();
  19. default:
  20. break;
  21. }
  22. abort();
  23. }
  24. static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
  25. int r;
  26. switch (renv->ss.ss) {
  27. case BACKWARD_NEWER_CHECKPOINT_END:
  28. // incomplete checkpoint, nothing to do
  29. r = 0;
  30. break;
  31. case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
  32. assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
  33. renv->ss.ss = FORWARD_BETWEEN_CHECKPOINT_BEGIN_END;
  34. renv->ss.checkpoint_begin_timestamp = l->timestamp;
  35. renv->goforward = true;
  36. r = 0;
  37. break;
  38. default:
  39. abort();
  40. break;
  41. }
  42. return r;
  43. }
  44. static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
  45. int r;
  46. TXN_MANAGER mgr = toku_logger_get_txn_manager(renv->logger);
  47. switch (renv->ss.ss) {
  48. case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
  49. assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
  50. invariant(renv->ss.last_xid == TXNID_NONE);
  51. renv->ss.last_xid = l->last_xid;
  52. toku_txn_manager_set_last_xid_from_recovered_checkpoint(mgr, l->last_xid);
  53. r = 0;
  54. break;
  55. case FORWARD_NEWER_CHECKPOINT_END:
  56. assert(l->lsn.lsn > renv->ss.checkpoint_end_lsn.lsn);
  57. // Verify last_xid is no older than the previous begin
  58. invariant(l->last_xid >= renv->ss.last_xid);
  59. // Verify last_xid is no older than the newest txn
  60. invariant(l->last_xid >= toku_txn_manager_get_last_xid(mgr));
  61. r = 0; // ignore it (log only has a begin checkpoint)
  62. break;
  63. default:
  64. abort();
  65. break;
  66. }
  67. return r;
  68. }
  69. static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
  70. int r;
  71. switch (renv->ss.ss) {
  72. case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
  73. assert(l->lsn_begin_checkpoint.lsn == renv->ss.checkpoint_begin_lsn.lsn);
  74. assert(l->lsn.lsn == renv->ss.checkpoint_end_lsn.lsn);
  75. assert(l->num_fassociate_entries == renv->ss.checkpoint_num_fassociate);
  76. assert(l->num_xstillopen_entries == renv->ss.checkpoint_num_xstillopen);
  77. renv->ss.ss = FORWARD_NEWER_CHECKPOINT_END;
  78. r = 0;
  79. break;
  80. case FORWARD_NEWER_CHECKPOINT_END:
  81. assert(0);
  82. return 0;
  83. default:
  84. assert(0);
  85. return 0;
  86. }
  87. return r;
  • 向后读:从当前位置读4个字节的长度len1,然后读1个字节cmd。然后按照不同cmd的定义来读log entry。
  • 向前读:从当前位置读nocrc的长度len2,把文件指针向前移动len2个字节。从那个位置向后读。