// Repairer constructor (fragment): receives the database name and the
// caller's Options. The only member initializer visible in this listing is
// icmp_, which is built from options.comparator.
// NOTE(review): the listing skips original lines 48-49 and everything after
// line 50, so the remaining initializers and the constructor body cannot be
// reviewed from here.
47 Repairer(
const std::string& dbname,
const Options& options)
50 icmp_(options.comparator),
// Fragment of the top-level repair sequence.
// NOTE(review): the enclosing function header is not part of this listing;
// presumably this is the Run() method that RepairDB invokes -- confirm
// against the full file.
// Visible steps: FindFiles(), ConvertLogFilesToTables(), WriteDescriptor(),
// then a size total over tables_ for the summary message. The status checks
// between these calls are not shown.
71 Status status = FindFiles();
73 ConvertLogFilesToTables();
75 status = WriteDescriptor();
// Add up meta.file_size of every entry in tables_ for the report below.
78 unsigned long long bytes = 0;
79 for (
size_t i = 0; i <
tables_.size(); i++) {
80 bytes +=
tables_[i].meta.file_size;
// Summary format string (the call that consumes it is not shown).
// tables_.size() is narrowed to int to match %d; %llu presumably receives
// `bytes` -- the argument line is missing from this listing.
83 "**** Repaired leveldb %s; "
84 "recovered %d files; %llu bytes. "
85 "Some data may have been lost. "
88 static_cast<int>(
tables_.size()),
// Fragment of the directory scan.
// NOTE(review): the function header is not shown; presumably this is
// FindFiles(), the first call in the repair sequence -- confirm.
// Lists the entries of dbname_ through env_->GetChildren and distributes
// them into manifests_ (by file name), logs_ and table_numbers_ (by file
// number). The conditions that pick a destination are not in this listing.
117 std::vector<std::string> filenames;
118 Status status = env_->GetChildren(dbname_, &filenames);
// An empty directory listing is treated specially (branch body not shown).
122 if (filenames.empty()) {
128 for (
size_t i = 0; i < filenames.size(); i++) {
131 manifests_.push_back(filenames[i]);
// Keep next_file_number_ strictly greater than every file number seen here.
133 if (number + 1 > next_file_number_) {
134 next_file_number_ = number + 1;
137 logs_.push_back(number);
139 table_numbers_.push_back(number);
// Calls ConvertLogToTable() for every log number stored in logs_.
// A conversion failure is reported through Log() with the text
// "ignoring conversion error"; the log file name is then handed to
// ArchiveFile().
// NOTE(review): original line 153 (presumably the status check that guards
// the Log call) and the closing braces are missing from this listing.
149 void ConvertLogFilesToTables() {
150 for (
size_t i = 0; i < logs_.size(); i++) {
151 std::string logname =
LogFileName(dbname_, logs_[i]);
152 Status status = ConvertLogToTable(logs_[i]);
154 Log(options_.info_log,
"Log #%llu: ignoring conversion error: %s",
155 (
unsigned long long) logs_[i],
156 status.ToString().c_str());
158 ArchiveFile(logname);
// Converts the log file with number `log` into a table file.
// Visible flow: open the log with env_->NewSequentialFile, read its records
// through a log::Reader, allocate a MemTable, then pass an iterator over that
// MemTable to BuildTable() under a freshly allocated file number. An output
// file with non-zero size is appended to table_numbers_.
// Parameter: log -- number of the log file to convert.
// Returns: a Status (the return statements are not visible here).
// NOTE(review): many original lines are omitted from this listing (for
// example 164-166, 173-177, 205-213, 217-224), including the code that moves
// records into the MemTable and any cleanup of lfile, mem and iter.
162 Status ConvertLogToTable(
uint64_t log) {
// Reporter given to log::Reader. Corruption() logs the log number, the
// number of dropped bytes and the status text via info_log.
163 struct LogReporter :
public log::Reader::Reporter {
167 virtual void Corruption(
size_t bytes,
const Status& s) {
// NOTE(review): `bytes` is a size_t narrowed to int so it matches %d.
169 Log(info_log,
"Log #%llu: dropping %d bytes; %s",
170 (
unsigned long long) lognum,
171 static_cast<int>(bytes),
172 s.ToString().c_str());
178 SequentialFile* lfile;
179 Status status = env_->NewSequentialFile(logname, &lfile);
185 LogReporter reporter;
187 reporter.info_log = options_.info_log;
188 reporter.lognum = log;
193 log::Reader reader(lfile, &reporter,
false,
200 MemTable*
mem =
new MemTable(icmp_);
203 while (reader.ReadRecord(&record, &scratch)) {
// Records shorter than 12 bytes take a separate branch (body not shown).
// NOTE(review): 12 is presumably the write-batch header size -- confirm.
204 if (record.size() < 12) {
214 Log(options_.info_log,
"Log #%llu: ignoring %s",
215 (
unsigned long long) log,
216 status.ToString().c_str());
// Take the next unused file number for the output table.
225 meta.number = next_file_number_++;
226 Iterator* iter = mem->NewIterator();
227 status =
BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
// Only an output file with non-zero size is recorded in table_numbers_.
232 if (meta.file_size > 0) {
233 table_numbers_.push_back(meta.number);
236 Log(options_.info_log,
"Log #%llu: %d ops saved to Table #%llu %s",
237 (
unsigned long long) log,
239 (
unsigned long long) meta.number,
240 status.ToString().c_str());
// Walks table_numbers_, runs ScanTable() on a TableInfo for each number,
// logs a failing status as "ignoring", and appends TableInfo entries to
// tables_.
// NOTE(review): the branch structure (original lines 247, 250, 255-256) is
// not shown, so which path reaches tables_.push_back cannot be confirmed from
// this listing. `kept` is declared but never used in the visible lines.
244 void ExtractMetaData() {
245 std::vector<TableInfo> kept;
246 for (
size_t i = 0; i < table_numbers_.size(); i++) {
248 t.meta.number = table_numbers_[i];
249 Status status = ScanTable(&t);
251 std::string fname =
TableFileName(dbname_, table_numbers_[i]);
252 Log(options_.info_log,
"Table #%llu: ignoring %s",
253 (
unsigned long long) table_numbers_[i],
254 status.ToString().c_str());
257 tables_.push_back(t);
// Fills in *t for the table identified by t->meta.number: the file size
// (env_->GetFileSize), the smallest and largest keys, and max_sequence,
// gathered by iterating the table through table_cache_.
// Parameter: t -- in: meta.number; out: meta.file_size, meta.smallest,
//                 meta.largest, max_sequence.
// Returns: a Status; a non-ok iterator status replaces it after the loop.
// NOTE(review): several original lines (263-264, 266, 269, 271, 274,
// 277-283, 285, 289-290) are missing, including the guards around the key
// parsing and the smallest-key assignment.
262 Status ScanTable(TableInfo* t) {
265 Status status = env_->GetFileSize(fname, &t->meta.file_size);
267 Iterator* iter = table_cache_->NewIterator(
268 ReadOptions(), t->meta.number, t->meta.file_size);
270 ParsedInternalKey parsed;
272 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
273 Slice key = iter->key();
// An unparsable key is reported through Log(); the condition is not shown.
275 Log(options_.info_log,
"Table #%llu: unparsable key %s",
276 (
unsigned long long) t->meta.number,
// NOTE(review): presumably smallest is set only for the first key while
// largest is overwritten on every iteration -- the guard is not visible.
284 t->meta.smallest.DecodeFrom(key);
286 t->meta.largest.DecodeFrom(key);
// Track the highest sequence number seen in this table.
287 if (parsed.sequence > t->max_sequence) {
288 t->max_sequence = parsed.sequence;
// Surface any error the iterator hit while scanning.
291 if (!iter->status().ok()) {
292 status = iter->status();
296 Log(options_.info_log,
"Table #%llu: %d entries %s",
297 (
unsigned long long) t->meta.number,
299 status.ToString().c_str());
// Writes a descriptor describing the recovered tables.
// Visible flow: open a writable file at `tmp`, compute the largest
// max_sequence over tables_, populate edit_ (comparator name, log number 0,
// next file number, last sequence, one AddFile per table), encode edit_ into
// one record appended through log::Writer, close the file, and pass every
// name in manifests_ to ArchiveFile().
// Returns: a Status (the return statements are not visible here).
// NOTE(review): the error-handling structure and the step that installs the
// new descriptor are not part of this listing.
303 Status WriteDescriptor() {
306 Status status = env_->NewWritableFile(tmp, &file);
312 for (
size_t i = 0; i < tables_.size(); i++) {
313 if (max_sequence < tables_[i].max_sequence) {
314 max_sequence = tables_[i].max_sequence;
318 edit_.SetComparatorName(icmp_.user_comparator()->Name());
319 edit_.SetLogNumber(0);
320 edit_.SetNextFile(next_file_number_);
321 edit_.SetLastSequence(max_sequence);
323 for (
size_t i = 0; i < tables_.size(); i++) {
325 const TableInfo& t = tables_[i];
// NOTE(review): the first AddFile argument is 0 -- presumably the level
// every recovered table is placed at; confirm against VersionEdit.
326 edit_.AddFile(0, t.meta.number, t.meta.file_size,
327 t.meta.smallest, t.meta.largest);
332 log::Writer log(file);
334 edit_.EncodeTo(&record);
335 status = log.AddRecord(record);
338 status = file->Close();
// tmp is deleted at two visible points (here and at original line 356);
// the guarding conditions are not shown -- presumably failure cleanup.
344 env_->DeleteFile(tmp);
347 for (
size_t i = 0; i < manifests_.size(); i++) {
348 ArchiveFile(dbname_ +
"/" + manifests_[i]);
356 env_->DeleteFile(tmp);
// Moves `fname` into a "lost" subdirectory next to it, e.g. dir/foo is
// renamed to dir/lost/foo, and logs the outcome of the rename.
// Parameter: fname -- path of the file to archive.
// The result of env_->CreateDir is not checked in the visible code; only the
// RenameFile status is captured and logged.
// NOTE(review): the declaration of new_dir and the guard around the assign()
// call (original lines 368-369, 371) are missing from this listing, so the
// behavior for a name without '/' cannot be fully confirmed here.
362 void ArchiveFile(
const std::string& fname) {
// Locate the last path separator to split directory from base name.
367 const char* slash = strrchr(fname.c_str(),
'/');
370 new_dir.assign(fname.data(), slash - fname.data());
372 new_dir.append(
"/lost");
373 env_->CreateDir(new_dir);
374 std::string new_file = new_dir;
375 new_file.append(
"/");
// Use the whole name when there is no '/', otherwise the part after it.
376 new_file.append((slash == NULL) ? fname.c_str() : slash + 1);
377 Status s = env_->RenameFile(fname, new_file);
378 Log(options_.info_log,
"Archiving %s: %s\n",
379 fname.c_str(), s.ToString().c_str());
// Body fragment that constructs a Repairer for (dbname, options) and returns
// the Status of its Run() call.
// NOTE(review): the enclosing function header is not shown; presumably this
// is RepairDB(dbname, options) -- confirm against the full file.
385 Repairer repairer(dbname, options);
386 return repairer.Run();
InternalFilterPolicy const ipolicy_
static void SetContents(WriteBatch *batch, const Slice &contents)
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
uint64_t next_file_number_
std::string TempFileName(const std::string &dbname, uint64_t number)
void Log(Logger *info_log, const char *format,...)
bool ParseInternalKey(const Slice &internal_key, ParsedInternalKey *result)
Status RepairDB(const std::string &dbname, const Options &options)
std::string TableFileName(const std::string &name, uint64_t number)
std::string DescriptorFileName(const std::string &dbname, uint64_t number)
unsigned long long uint64_t
std::vector< uint64_t > logs_
std::vector< std::string > manifests_
std::string EscapeString(const Slice &value)
Status SetCurrentFile(Env *env, const std::string &dbname, uint64_t descriptor_number)
std::vector< uint64_t > table_numbers_
TableCache * table_cache_
std::string const dbname_
static Status InsertInto(const WriteBatch *batch, MemTable *memtable)
std::vector< TableInfo > tables_
std::string LogFileName(const std::string &name, uint64_t number)
static int Count(const WriteBatch *batch)
Options SanitizeOptions(const std::string &dbname, const InternalKeyComparator *icmp, const InternalFilterPolicy *ipolicy, const Options &src)
Status BuildTable(const std::string &dbname, Env *env, const Options &options, TableCache *table_cache, Iterator *iter, FileMetaData *meta)
SequenceNumber max_sequence
InternalKeyComparator const icmp_
port::AtomicPointer counter[kNumThreads]
static Status IOError(const Slice &msg, const Slice &msg2=Slice())