MANIFEST在RocksDB中是一个单独的文件,而这个文件所保存的数据基本是来自于VersionEdit这个结构.
MANIFEST包含了两个文件,一个log文件一个包含最新MANIFEST文件名的文件,Manifest的log文件名是这样 MANIFEST-(seqnumber),这个seq会一直增长.只有当 超过了指定的大小之后,MANIFEST会刷新一个新的文件,当新的文件刷新到磁盘(并且文件名更新)之后,老的文件会被删除掉.这里可以认为每一次MANIFEST的更新都代表一次snapshot.
下面就是MANIFEST的基本文件组成:
在RocksDB中任意时间存储引擎的状态都会保存为一个Version(也就是SST的集合),而每次对Version的修改都是一个VersionEdit,而最终这些VersionEdit就是 组成manifest-log文件的内容.
下面就是MANIFEST的log文件的基本构成:
version = { version-edit* }
manifest-log-file = { version, version-edit* }
= { version-edit* }
class VersionSet {
public:
VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
~VersionSet();
.......................
private:
struct ManifestWriter;
friend class Version;
.................................
// Opened lazily
unique_ptr<log::Writer> descriptor_log_;
// generates a increasing version number for every new version
uint64_t current_version_number_;
// Queue of writers to the manifest file
std::deque<ManifestWriter*> manifest_writers_;
..........................................
这里最关键的两个数据结构是descriptor_log_和manifest_writers_,前一个表示了当前manifest-log文件,后一个表示需要写入到manifest-log文件中的内容.
下面就是ManifestWriter的结构,可以看到其中包含了一个VersionEdit的数组,这个数组就是即将要写入到manifest-log文件中的内容.
// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
Status status;
bool done;
InstrumentedCondVar cv;
ColumnFamilyData* cfd;
const autovector<VersionEdit*>& edit_list;
explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
const autovector<VersionEdit*>& e)
: done(false), cv(mu), cfd(_cfd), edit_list(e) {}
};
然后我们来看RocksDB如何来创建以及写入文件,下面所有的代码都是包含在VersionSet::LogAndApply这个函数中.
首先在每次LogAndApply的时候都会创建一个新的ManifesWriter加入到manifest_writers_队列中.这里只有当之前保存在队列中 的所有Writer都写入完毕之后才会加入到队列,否则就会等待.
接下来就是保存对应的数据到batch_edits中(manifest_writers_).
autovector<VersionEdit*> batch_edits;
....................................
if (w.edit_list.front()->IsColumnFamilyManipulation()) {
// no group commits for column family add or drop
LogAndApplyCFHelper(w.edit_list.front());
batch_edits.push_back(w.edit_list.front());
} else {
v = new Version(column_family_data, this, current_version_number_++);
for (const auto& writer : manifest_writers_) {
if (writer->edit_list.front()->IsColumnFamilyManipulation() ||
break;
}
last_writer = writer;
for (const auto& edit : writer->edit_list) {
...........................................
batch_edits.push_back(edit);
}
}
builder->SaveTo(v->storage_info());
}
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
new_descriptor_log = true;
} else {
pending_manifest_file_number_ = manifest_file_number_;
}
if (new_descriptor_log) {
// if we're writing out new snapshot make sure to persist max column family
if (column_family_set_->GetMaxColumnFamily() > 0) {
w.edit_list.front()->SetMaxColumnFamily(
column_family_set_->GetMaxColumnFamily());
}
}
如果需要创建新的manifest-log文件,则开始构造对应的文件信息并创建文件.
if (new_descriptor_log) {
// create manifest file
ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
pending_manifest_file_number_);
unique_ptr<WritableFile> descriptor_file;
EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
s = NewWritableFile(
env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
&descriptor_file, opt_env_opts);
if (s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get());
}
开始写入对应的VersionEdit的record到文件(最后我们会来看这个record的构成),这里看到写入完成后会调用Sync来刷新内容到磁盘,等这些操作都做完之后,则会更新Current文件也就是更新最新的manifest-log文件名到CURRENT文件中.
CURRENT文件更新完毕之后,就可以删除老的mainfest文件了.
// Append the old mainfest file to the obsolete_manifests_ list to be deleted
if (s.ok() && new_descriptor_log) {
obsolete_manifests_.emplace_back(
DescriptorFileName("", manifest_file_number_));
}
最后则是更新manifest_writers_队列,唤醒之前阻塞的内容.
// wake up all the waiting writers
while (true) {
ManifestWriter* ready = manifest_writers_.front();
manifest_writers_.pop_front();
if (ready != &w) {
ready->status = s;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
具体的文件格式可以看RocksDB的wiki,这里我来介绍下对应的源码. 通过上面的分析我们可以看到最终是通过VersionEdit::EncodeTo来序列化数据,而VersionEdit主要包含了比如log_number/last_sequence_这些字段,这里还有一个比较关键的信息被序列化了,那就是FileMetaData,也就是SST文件的元信息.
struct FileMetaData {
FileDescriptor fd;
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
SequenceNumber smallest_seqno; // The smallest seqno in this file
SequenceNumber largest_seqno; // The largest seqno in this file
.........................................
// File size compensated by deletion entry.
// This is updated in Version::UpdateAccumulatedStats() first time when the
// file is created or loaded. After it is updated (!= 0), it is immutable.
uint64_t compensated_file_size;
// These values can mutate, but they can only be read or written from
// single-threaded LogAndApply thread
uint64_t num_entries; // the number of entries.
uint64_t num_deletions; // the number of deletion entries.
uint64_t raw_key_size; // total uncompressed key size.
uint64_t raw_value_size; // total uncompressed value size.
int refs; // Reference count
bool being_compacted; // Is this file undergoing compaction?
bool init_stats_from_file; // true if the data-entry stats of this file
// has initialized from file.
bool marked_for_compaction; // True if client asked us nicely to compact this
// file.