我们知道,RocksDB作为KV存储,采用WAL(Write-Ahead Log,预写日志)方式记录日志:每次修改数据之前,先把操作写入日志保存起来,然后再进行相应的操作。这样当发生意外导致还未写到磁盘中的数据丢失时,就可以利用log文件进行恢复:通过读取磁盘中已有的内容和WAL日志,即可恢复到最新的状态。其中,memtable和尚未写入磁盘的immutable memtable中的数据,需要从log文件中读出来重放,再把重建出的memtable写入SSTable即可。
整个恢复流程大致如下图所示:
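在DBImpl::Recover()函数中,如果数据库不存在且create_if_missing为true,会调用NewDB()生成全新的MANIFEST和CURRENT文件;随后通过versions_->Recover()恢复当前的version信息,再获取WAL目录下的文件列表,筛选出需要重放的日志文件。如果没有需要恢复的日志文件则直接返回,否则调用RecoverLogFiles()进行恢复。这一步恢复的是memtable及immutable memtable中还未持久化到SSTable中的数据。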
接下来的日志重放就是恢复内存中的数据了:重做日志即是把日志中记录的操作读取出来,再重新写入到RocksDB的memtable中;如果memtable的大小超过了写缓存(write buffer)的大小而需要刷写,就把它写成SSTable保存。这一过程在RecoverLogFiles()函数中实现,具体流程如下:
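对每个日志文件,先用LogFileName()得到log文件名,然后通过env_->NewSequentialFile()打开日志文件;如果打开成功,就构造log::Reader(其reporter中保存了info_log,用于输出损坏记录的日志)逐条读取记录,并通过WriteBatchInternal::InsertInto()把记录插入memtable;当某个列族需要刷写时,调用WriteLevel0TableForRecovery()把它的memtable写成level 0的SSTable。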
WriteLevel0TableForRecovery()函数比较简单,具体过程如下:
先释放DB的互斥锁(mutex_.Unlock()),调用BuildTable()把memtable构建成SSTable;如果创建成功,则调用EventHelpers::LogAndNotifyTableFileCreation()写入event logger并通知listeners;重新加锁后,如果生成的SST文件大小大于0,则调用edit->AddFile()把该文件加入VersionEdit,之后由LogAndApply()写入MANIFEST文件。
Status DBImpl::Recover()部分代码如下
s = env_->LockFile(LockFileName(dbname_), &db_lock_); //文件加锁 if (!s.ok()) { return s; } s = env_->FileExists(CurrentFileName(dbname_)); if (s.IsNotFound()) { if (db_options_.create_if_missing) { s = NewDB(); //生成全新的manifest和current文件 is_new_db = true; } else { return Status::InvalidArgument( dbname_, "does not exist (create_if_missing is false)"); } } else if (s.ok()) { if (db_options_.error_if_exists) { return Status::InvalidArgument( dbname_, "exists (error_if_exists is true)"); } } else { // Unexpected error reading file assert(s.IsIOError()); return s; } // Check for the IDENTITY file and create it if not there s = env_->FileExists(IdentityFileName(dbname_)); if (s.IsNotFound()) { s = SetIdentityFile(env_, dbname_); if (!s.ok()) { return s; } } else if (!s.ok()) { assert(s.IsIOError()); return s; } } Status s = versions_->Recover(column_families, read_only); // 恢复当前version信息 if (db_options_.paranoid_checks && s.ok()) { s = CheckConsistency(); //检查文件一致性 } if (s.ok()) { SequenceNumber max_sequence(kMaxSequenceNumber); default_cf_handle_ = new ColumnFamilyHandleImpl( versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); single_column_family_mode_ = versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; // Recover from all newer log files than the ones named in the // descriptor (new log files may have been added by the previous // incarnation without registering them in the descriptor). // // Note that prev_log_number() is no longer used, but we pay // attention to it in case we are recovering a database // produced by an older version of rocksdb. 
const uint64_t min_log = versions_->MinLogNumber(); const uint64_t prev_log = versions_->prev_log_number(); std::vector<std::string> filenames; s = env_->GetChildren(db_options_.wal_dir, &filenames); // 获取wal文件目录 std::vector<uint64_t> logs; for (size_t i = 0; i < filenames.size(); i++) { uint64_t number; FileType type; if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) { if (is_new_db) { return Status::Corruption( "While creating a new Db, wal_dir contains " "existing log file: ", filenames[i]); } else if ((number >= min_log) || (number == prev_log)) { logs.push_back(number); //存储当前已有的日志文件 } } } if (logs.size() > 0 && error_if_log_file_exist) { //如果文件有错误 return Status::Corruption("" "The db was opened in readonly mode with error_if_log_file_exist" "flag but a log file already exists"); } if (!logs.empty()) { //如果文件没有缺失 // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); //排序日志文件 s = RecoverLogFiles(logs, &max_sequence, read_only); //重做日志操作 if (!s.ok()) { // Clear memtables if recovery failed for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); } } } SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence()); } // Initial value max_total_in_memory_state_ = 0; for (auto cfd : *versions_->GetColumnFamilySet()) { auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * mutable_cf_options->max_write_buffer_number; } return s; }
DBImpl::RecoverLogFiles部分代码
// Replays the WAL files listed in `log_numbers` into the column families'
// memtables.
//
// Each record read from a log is re-applied with
// WriteBatchInternal::InsertInto(). When not `read_only`:
//   * column families handed out by flush_scheduler_ during replay are
//     written to level-0 tables (WriteLevel0TableForRecovery) and given a
//     fresh memtable;
//   * after all logs are replayed, every non-empty memtable is flushed and a
//     VersionEdit with log number `max_log_number + 1` is applied via
//     LogAndApply().
//
// @param log_numbers  numbers of the WAL files to replay; the caller sorts
//                     them so they are replayed in generation order.
// @param max_sequence in/out: largest sequence number replayed so far;
//                     kMaxSequenceNumber means "none seen yet".
// @param read_only    when true, nothing is flushed and no VersionEdit is
//                     applied.
// @return OK on success; otherwise the first error that the configured
//         WALRecoveryMode does not allow to be ignored.
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                               SequenceNumber* max_sequence, bool read_only) {
  // Receives corruption reports from log::Reader. When `status` is non-null
  // the first reported error is stored there, which stops the replay loop.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr when corruption should only be logged
    virtual void Corruption(size_t bytes, const Status& s) override {
      Log(InfoLogLevel::WARN_LEVEL, info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""), fname,
          static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) {
        *this->status = s;
      }
    }
  };

  // WriteLevel0TableForRecovery() and LogAndApply(&mutex_) below rely on the
  // DB mutex being held.
  mutex_.AssertHeld();
  Status status;
  // One VersionEdit per column family; the lookups below assert that every
  // column family has an entry.
  std::unordered_map<uint32_t, VersionEdit> version_edits;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }

  int job_id = next_job_id_.fetch_add(1);
  {
    // Announce the start of recovery together with the log files to replay.
    auto stream = event_logger_.Log();
    stream << "job" << job_id << "event" << "recovery_started";
    stream << "log_files";
    stream.StartArray();
    for (auto log_number : log_numbers) {
      stream << log_number;
    }
    stream.EndArray();
  }

  bool continue_replay_log = true;
  for (auto log_number : log_numbers) {
    // Mark the log's number as used so the file-number allocator skips it.
    versions_->MarkFileNumberUsedDuringRecovery(log_number, env_);
    // Open the log file
    std::string fname = LogFileName(db_options_.wal_dir, log_number);
    unique_ptr<SequentialFileReader> file_reader;
    {
      unique_ptr<SequentialFile> file;
      status = env_->NewSequentialFile(fname, &file, env_options_);
      if (!status.ok()) {
        MaybeIgnoreError(&status);
        if (!status.ok()) {
          return status;
        } else {
          // Fail with one log file, but that's ok.
          // Try next one.
          continue;
        }
      }
      file_reader.reset(new SequentialFileReader(std::move(file)));
    }

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = db_options_.info_log.get();
    reporter.fname = fname.c_str();
    if (!db_options_.paranoid_checks ||
        db_options_.wal_recovery_mode ==
            WALRecoveryMode::kSkipAnyCorruptedRecords) {
      reporter.status = nullptr;
    } else {
      reporter.status = &status;
    }
    // We intentially make log::Reader do checksumming even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/,
                       0 /*initial_offset*/);
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
        db_options_.wal_recovery_mode, !continue_replay_log);

    // Determine if we should tolerate incomplete records at the tail end of
    // the log
    bool report_eof_inconsistency;
    if (db_options_.wal_recovery_mode ==
        WALRecoveryMode::kAbsoluteConsistency) {
      // in clean shutdown we don't expect any error in the log files
      report_eof_inconsistency = true;
    } else {
      // for other modes ignore only incomplete records in the last log file
      // which is presumably due to write in progress during restart
      report_eof_inconsistency = false;
      // TODO krad: Evaluate if we need to move to a more strict mode where we
      // restrict the inconsistency to only the last log
    }

    // Read all the records and add to a memtable
    std::string scratch;
    Slice record;
    WriteBatch batch;

    if (!continue_replay_log) {
      // Point-in-time recovery stopped earlier: this log is only reported.
      uint64_t bytes;
      if (env_->GetFileSize(fname, &bytes).ok()) {
        auto info_log = db_options_.info_log.get();
        Log(InfoLogLevel::WARN_LEVEL, info_log, "%s: dropping %d bytes",
            fname.c_str(), static_cast<int>(bytes));
      }
    }

    while (continue_replay_log &&
           reader.ReadRecord(&record, &scratch, report_eof_inconsistency) &&
           status.ok()) {
      // 12 bytes is presumably the WriteBatch header size (sequence + count);
      // shorter records are reported as corruption and skipped.
      if (record.size() < 12) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);

      // If column family was not found, it might mean that the WAL write
      // batch references to the column family that was dropped after the
      // insert. We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore missing column families to true
      status = WriteBatchInternal::InsertInto(
          &batch, column_family_memtables_.get(), true, log_number);

      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reporter.Corruption(record.size(), status);
        continue;
      }

      const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                      WriteBatchInternal::Count(&batch) - 1;
      if ((*max_sequence == kMaxSequenceNumber) ||
          (last_seq > *max_sequence)) {
        *max_sequence = last_seq;
      }

      if (!read_only) {
        // we can do this because this is called before client has access to
        // the DB and there is only a single thread operating on DB
        ColumnFamilyData* cfd;

        while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
          cfd->Unref();
          // If this asserts, it means that InsertInto failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= log_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }

          // Start a fresh memtable for the column family just flushed.
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 *max_sequence);
        }
      }
    }

    if (!status.ok()) {
      if (db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
        // We should ignore all errors unconditionally
        status = Status::OK();
      } else if (db_options_.wal_recovery_mode ==
                 WALRecoveryMode::kPointInTimeRecovery) {
        // We should ignore the error but not continue replaying
        status = Status::OK();
        continue_replay_log = false;

        Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
            "Point in time recovered to log #%" PRIu64 " seq #%" PRIu64,
            log_number, *max_sequence);
      } else {
        assert(db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
               db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kAbsoluteConsistency);
        return status;
      }
    }

    flush_scheduler_.Clear();
    if ((*max_sequence != kMaxSequenceNumber) &&
        (versions_->LastSequence() < *max_sequence)) {
      // Advance the global last sequence to the largest one replayed.
      versions_->SetLastSequence(*max_sequence);
    }
  }

  // Guard: log_numbers.back() below is undefined on an empty vector.
  if (!read_only && !log_numbers.empty()) {
    // no need to refcount since client still doesn't have access
    // to the DB and can not drop column families while we iterate
    auto max_log_number = log_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > max_log_number) {
        // Column family cfd has already flushed the data
        // from all logs. Memtable has to be empty because
        // we filter the updates based on log_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
        if (!status.ok()) {
          // Recovery failed
          break;
        }
        cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                               *max_sequence);
      }

      // write MANIFEST with update
      // writing log_number in the manifest means that any log file
      // with number strongly less than (log_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered max_log_number, we want all logs
      // with numbers `<= max_log_number` (includes this one) to be ignored
      edit->SetLogNumber(max_log_number + 1);
      // we must mark the next log number as used, even though it's
      // not actually used. that is because VersionSet assumes
      // VersionSet::next_file_number_ always to be strictly greater than any
      // log number
      versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1);
      status = versions_->LogAndApply(
          cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
      if (!status.ok()) {
        // Recovery failed
        break;
      }
    }
  }

  event_logger_.Log() << "job" << job_id << "event" << "recovery_finished";

  return status;
}
mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); //开始时间 FileMetaData meta; meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); ReadOptions ro; ro.total_order_seek = true; Arena arena; Status s; TableProperties table_properties; { ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" " Level-0 table #%" PRIu64 ": started", cfd->GetName().c_str(), meta.fd.GetNumber()); bool paranoid_file_checks = cfd->GetLatestMutableCFOptions()->paranoid_file_checks; { mutex_.Unlock(); TableFileCreationInfo info; s = BuildTable( //新建sstable dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), iter.get(), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), snapshots_.GetAll(), GetCompressionFlush(*cfd->ioptions()), cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->internal_stats(), Env::IO_HIGH, &info.table_properties); LogFlush(db_options_.info_log); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s", cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), s.ToString().c_str()); // output to event logger if (s.ok()) { //写入到logger类 info.db_name = dbname_; info.cf_name = cfd->GetName(); info.file_path = TableFileName(db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); info.file_size = meta.fd.GetFileSize(); info.job_id = job_id; EventHelpers::LogAndNotifyTableFileCreation( &event_logger_, db_options_.listeners, meta.fd, info); } mutex_.Lock(); } } ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. 
int level = 0; if (s.ok() && meta.fd.GetFileSize() > 0) { //文件大小大于0,表示存在,写入到manifest edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, meta.smallest_seqno, meta.largest_seqno, meta.marked_for_compaction, meta.priv_meta); } InternalStats::CompactionStats stats(1); stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.fd.GetFileSize(); stats.num_output_files = 1; cfd->internal_stats()->AddCompactionStats(level, stats); cfd->internal_stats()->AddCFStats( InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); return s; } Status DBImpl::FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options, env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots_.GetAll(), job_context, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(0U), GetCompressionFlush(*cfd->ioptions()), stats_, &event_logger_); FileMetaData file_meta; // Within flush_job.Run, rocksdb may call event listener to notify // file creation and deletion. // // Note that flush_job.Run will unlock and lock the db_mutex, // and EventListener callback will be called when the db_mutex // is unlocked by the current thread. 
Status s = flush_job.Run(&file_meta); if (s.ok()) { InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, mutable_cf_options); if (made_progress) { *made_progress = 1; } VersionStorageInfo::LevelSummaryStorage tmp; LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(), cfd->current()->storage_info()->LevelSummary(&tmp)); } if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks && bg_error_.ok()) { // if a bad error happened (not ShutdownInProgress) and paranoid_checks is // true, mark DB read-only bg_error_ = s; } RecordFlushIOStats(); #ifndef ROCKSDB_LITE if (s.ok()) { // may temporarily unlock and lock the mutex. NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, job_context->job_id, flush_job.GetTableProperties()); } #endif // ROCKSDB_LITE file_meta.FreePrivateMetadata(); return s; }