源码分析

    依旧是从DBImpl::GetImpl开始,上一篇文章中我们分析这个函数只分析了Memtable相关的代码,这次我们来看当memtable没有查找到之后,RocksDB是如何处理的.我们可以看到当MemTable中没有找到对应的数据之后(包括删除),RocksDB将会进入对于sst的查找.

    从上面的代码我们可以看到直接从当前的version(sv->current)调用Get方法,因此接下来我们就来详细看这个函数。 这个函数简单来说就是根据所需要查找的key,然后选择对应的文件,这里每次会返回一个文件(key在sst的key范围内),然后循环查找.

    先来看查找之前的初始化

    1. user_comparator(), merge_operator_, info_log_, db_statistics_,
    2. status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
    3. value, value_found, merge_context, range_del_agg, this->env_, seq,
    4. merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
    5. // Pin blocks that we read to hold merge operands
    6. if (merge_operator_) {
    7. pinned_iters_mgr.StartPinning();
    8. }
    9. FilePicker fp(
    10. storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
    11. storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
    12. user_comparator(), internal_comparator());
    13. FdWithKeyRange* f = fp.GetNextFile();

    第一个是GetContext结构,这个类只要是根据传递进来的文件元信息来查找对应的key.然后是FilePicker,这个类主要是根据传递进来的key来选择对应的文件.这里最重要就是GetNextFile这个函数,我们来看这个函数。

    这个函数他会遍历所有的level,然后再遍历每个level的所有的文件,这里会对level 0的文件做一个特殊处理,这是因为只有level0的sst的range不是有序的,因此我们每次查找需要查找所有的文件,也就是会一个个的遍历.

    而在非level0,我们只需要按照二分查找来得到对应的文件即可,如果二分查找不存在,那么我就需要进入下一个level进行查找.

    1. FdWithKeyRange* GetNextFile() {
    2. while (!search_ended_) { // Loops over different levels.
    3. while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
    4. // Loops over all files in current level.
    5. FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
    6. hit_file_level_ = curr_level_;
    7. is_hit_file_last_in_level_ =
    8. curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
    9. int cmp_largest = -1;
    10. if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
    11. // Check if key is within a file's range. If search left bound and
    12. // right bound point to the same find, we are sure key falls in
    13. // range.
    14. assert(
    15. curr_level_ == 0 ||
    16. curr_index_in_curr_level_ == start_index_in_curr_level_ ||
    17. user_comparator_->Compare(user_key_,
    18. ExtractUserKey(f->smallest_key)) <= 0);
    19. int cmp_smallest = user_comparator_->Compare(user_key_,
    20. ExtractUserKey(f->smallest_key));
    21. if (cmp_smallest >= 0) {
    22. cmp_largest = user_comparator_->Compare(user_key_,
    23. ExtractUserKey(f->largest_key));
    24. }
    25. // Setup file search bound for the next level based on the
    26. // comparison results
    27. if (curr_level_ > 0) {
    28. file_indexer_->GetNextLevelIndex(curr_level_,
    29. curr_index_in_curr_level_,
    30. cmp_smallest, cmp_largest,
    31. &search_left_bound_,
    32. &search_right_bound_);
    33. }
    34. // Key falls out of current file's range
    35. if (cmp_smallest < 0 || cmp_largest > 0) {
    36. if (curr_level_ == 0) {
    37. ++curr_index_in_curr_level_;
    38. continue;
    39. } else {
    40. // Search next level.
    41. break;
    42. }
    43. }
    44. }
    45. returned_file_level_ = curr_level_;
    46. if (curr_level_ > 0 && cmp_largest < 0) {
    47. // No more files to search in this level.
    48. search_ended_ = !PrepareNextLevel();
    49. } else {
    50. ++curr_index_in_curr_level_;
    51. }
    52. return f;
    53. }
    54. // Start searching next level.
    55. search_ended_ = !PrepareNextLevel();
    56. }
    57. // Search ended.
    58. return nullptr;

    这里RocksDB使用了一个技巧用来加快二分查找的速度,每次更新sst的时候,RocksDB都会调用FileIndexer::UpdateIndex来更新这样的一个结构,这个结构就是FileIndexer,它主要是用来保存每一个level和level+1的key范围的关联信息,这样当我们在level查找的时候,如果没有查找到信息,那么我们将会迅速得到下一个level需要查找的文件范围.每一个key来进行比较总会有三种情况:

    • 小于当前sst的smallest.
    • 大于当前sst的largest.
    • 处于这个范围.

    那么我们只需要在初始化索引的时候能够得到当前的sst在下一个level中的位置,就可以根据上面三种类型来确定下一个level我们需要进行二分查找的文件范围.在RocksDB中定义了下面三个值.

    1. // Point to a left most file in a lower level that may contain a key,
    2. // which compares greater than smallest of a FileMetaData (upper level)
    3. int32_t smallest_lb;
    4. // Point to a left most file in a lower level that may contain a key,
    5. // which compares greater than largest of a FileMetaData (upper level)
    6. int32_t largest_lb;
    7. // Point to a right most file in a lower level that may contain a key,
    8. // which compares smaller than smallest of a FileMetaData (upper level)
    9. int32_t smallest_rb;
    10. // Point to a right most file in a lower level that may contain a key,
    11. // which compares smaller than largest of a FileMetaData (upper level)
    12. int32_t largest_rb;
    1. level 1: [50 - 60]
    2. level 2: [1 - 40], [45 - 55], [58 - 80]

    此时如果我们查找一个key为49,然后第一次比较,也就是key < level1.sst->smallest,那么我们将会知道我们需要在0和smallest_rb之间来查找,也就是0和1.假设我们查找key是55,也就是 level1.sst->smallest < key < level1.test.largest,此时我们在level2将需要在smallest_rb和largest_rb之间.这里可以看到其实就是计算一个重合的区间。

    来看RocksDB如何根据当前level的比较结果来计算下一个level需要二分查找的文件范围:

    看完上面这些我们继续来看RocksDB对于文件的查找.这里所有对于key的查找都是在table_cache_->Get中.这里我们暂且略过这个函数的实现,最后我们再来详细分析这个函数.

    1. while (f != nullptr) {
    2. ................................
    3. *status = table_cache_->Get(
    4. read_options, *internal_comparator(), f->fd, ikey, &get_context,
    5. cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
    6. IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
    7. fp.IsHitFileLastInLevel()),
    8. fp.GetCurrentLevel());
    9. // TODO: examine the behavior for corrupted key
    10. if (!status->ok()) {
    11. return;
    12. }
    13. .......................
    14. }

    当table_cache_->Get返回之后,我们需要根据get_context来判断返回的结果

    1. switch (get_context.State()) {
    2. case GetContext::kNotFound:
    3. // Keep searching in other files
    4. break;
    5. case GetContext::kMerge:
    6. break;
    7. case GetContext::kFound:
    8. if (fp.GetHitFileLevel() == 0) {
    9. RecordTick(db_statistics_, GET_HIT_L0);
    10. } else if (fp.GetHitFileLevel() == 1) {
    11. RecordTick(db_statistics_, GET_HIT_L1);
    12. } else if (fp.GetHitFileLevel() >= 2) {
    13. RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
    14. }
    15. return;
    16. case GetContext::kDeleted:
    17. // Use empty error message for speed
    18. *status = Status::NotFound();
    19. return;
    20. case GetContext::kCorrupt:
    21. *status = Status::Corruption("corrupted key for ", user_key);
    22. return;
    23. case GetContext::kBlobIndex:
    24. ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
    25. *status = Status::NotSupported(
    26. "Encounter unexpected blob index. Please open DB with "
    27. "rocksdb::blob_db::BlobDB instead.");
    28. return;
    29. }

    如果没有发现对应的值则进入下一次文件查找

    1. f = fp.GetNextFile();

    最后我们来详细分析最核心的函数TableCache::Get,这个函数不仅仅是返回对应的查找结果,并且还会cache相应的文件信息,并且如果row_cache打开,他还会做row cache.这里row cache就是对当前的所需要查找的key在当前sst中对应的value进行cache.

    先来看如果打开了row cache,RocksDB将会如何处理,首先它会计算row cache的key.通过下面的代码我们可以看到row cache的key就是fd_number+seq_no+user_key.

    1. uint64_t fd_number = fd.GetNumber();
    2. auto user_key = ExtractUserKey(k);
    3. // We use the user key as cache key instead of the internal key,
    4. // otherwise the whole cache would be invalidated every time the
    5. // sequence key increases. However, to support caching snapshot
    6. // reads, we append the sequence number (incremented by 1 to
    7. // distinguish from 0) only in this case.
    8. uint64_t seq_no =
    9. options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
    10. // Compute row cache key.
    11. row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
    12. row_cache_id_.size());
    13. AppendVarint64(&row_cache_key, fd_number);
    14. user_key.size());

    然后就是在row cache中进行一次查找.如果有对应的值则直接返回结果,否则则将会在对应的sst读取传递进来的key.

    1. Status s;
    2. TableReader* t = fd.table_reader;
    3. Cache::Handle* handle = nullptr;
    4. if (!done && s.ok()) {
    5. if (t == nullptr) {
    6. s = FindTable(env_options_, internal_comparator, fd, &handle,
    7. options.read_tier == kBlockCacheTier /* no_io */,
    8. true /* record_read_stats */, file_read_hist, skip_filters,
    9. level);
    10. if (s.ok()) {
    11. t = GetTableReaderFromHandle(handle);
    12. }
    13. }
    14. ..........................
    15. }

    上面的代码会直接调用TableCache::FindTable, 这个函数主要是用来实现对应tablereader的读取以及row cache.

    1. Status TableCache::FindTable(const EnvOptions& env_options,
    2. const InternalKeyComparator& internal_comparator,
    3. const FileDescriptor& fd, Cache::Handle** handle,
    4. const bool no_io, bool record_read_stats,
    5. HistogramImpl* file_read_hist, bool skip_filters,
    6. int level,
    7. bool prefetch_index_and_filter_in_cache) {
    8. ...................................................
    9. if (*handle == nullptr) {
    10. if (no_io) { // Don't do IO and return a not-found status
    11. return Status::Incomplete("Table not found in table_cache, no_io is set");
    12. }
    13. unique_ptr<TableReader> table_reader;
    14. s = GetTableReader(env_options, internal_comparator, fd,
    15. false /* sequential mode */, 0 /* readahead */,
    16. record_read_stats, file_read_hist, &table_reader,
    17. skip_filters, level, prefetch_index_and_filter_in_cache);
    18. if (!s.ok()) {
    19. assert(table_reader == nullptr);
    20. RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
    21. // We do not cache error results so that if the error is transient,
    22. // or somebody repairs the file, we recover automatically.
    23. } else {
    24. s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
    25. handle);
    26. if (s.ok()) {
    27. // Release ownership of table reader.
    28. table_reader.release();
    29. }
    30. }
    31. }
    32. return s;
    33. }

    通过上面的代码可以看到实现很简单,就是一般的cache逻辑,读取然后判断是否存在,不存在则插入到cache. 上面的函数会调用 TableCache::GetTableReader,我们来简单看下这个函数.

    1. Status TableCache::GetTableReader(
    2. const EnvOptions& env_options,
    3. const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    4. bool sequential_mode, size_t readahead, bool record_read_stats,
    5. HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
    6. bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
    7. bool for_compaction) {
    8. ..........................................
    9. if (s.ok()) {
    10. ...............................................
    11. s = ioptions_.table_factory->NewTableReader(
    12. TableReaderOptions(ioptions_, env_options, internal_comparator,
    13. skip_filters, level),
    14. std::move(file_reader), fd.GetFileSize(), table_reader,
    15. prefetch_index_and_filter_in_cache);
    16. TEST_SYNC_POINT("TableCache::GetTableReader:0");
    17. }
    18. return s;
    19. }

    可以看到最关键的调用就是调用ioptions_.table_factory->NewTableReader, 这里RocksDB会根据我们配置的不同的sst格式来调用不同的reader,而在RocksDB中默认的格式是基于block.

    1. // Create default block based table factory.
    2. extern TableFactory* NewBlockBasedTableFactory(
    3. const BlockBasedTableOptions& table_options = BlockBasedTableOptions());

    这里我们就不详细分析sst的文件格式了,以后我们会来详细对比这几个文件格式的优劣.这里我们只需要知道最终缓存的tablereader就是一个BlockBasedTable对象(假设使用了基于block的sst format).

    当读取完毕TableReader之后,RocksDB就需要从sst文件中get key了,也就是最终的key查找方式是在每个sst format class的Get方法中实现的。

    和上面一样,这里的get也就是对应的sst format的get.

    最后如果查找到key,则开始缓存对应的kv到row_cache.

    1. size_t charge =
    2. row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
    3. void* row_ptr = new std::string(std::move(*row_cache_entry));
    4. ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,

    这里整个读取流程我们都分析完毕了,不过这里略过了merge,delete range以及不同sst format如何组织以及读取内容,后续我们会详细分析这些略过的内容.