Feathercoin  0.5.0
P2P Digital Currency
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros
db_impl.cc
Go to the documentation of this file.
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/db_impl.h"
6 
7 #include <algorithm>
8 #include <set>
9 #include <string>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <vector>
13 #include "db/builder.h"
14 #include "db/db_iter.h"
15 #include "db/dbformat.h"
16 #include "db/filename.h"
17 #include "db/log_reader.h"
18 #include "db/log_writer.h"
19 #include "db/memtable.h"
20 #include "db/table_cache.h"
21 #include "db/version_set.h"
23 #include "leveldb/db.h"
24 #include "leveldb/env.h"
25 #include "leveldb/status.h"
26 #include "leveldb/table.h"
27 #include "leveldb/table_builder.h"
28 #include "port/port.h"
29 #include "table/block.h"
30 #include "table/merger.h"
32 #include "util/coding.h"
33 #include "util/logging.h"
34 #include "util/mutexlock.h"
35 
36 namespace leveldb {
37 
// Open-file slots withheld from the table cache for files that are not
// tables; the DBImpl constructor subtracts this from max_open_files, and
// SanitizeOptions keeps max_open_files at least 64 above it.
const int kNumNonTableCacheFiles = 10;
39 
40 // Information kept for every waiting writer
44  bool sync;
45  bool done;
47 
48  explicit Writer(port::Mutex* mu) : cv(mu) { }
49 };
50 
53 
54  // Sequence numbers < smallest_snapshot are not significant since we
55  // will never have to service a snapshot below smallest_snapshot.
56  // Therefore if we have seen a sequence number S <= smallest_snapshot,
57  // we can drop all entries for the same key with sequence numbers < S.
59 
60  // Files produced by compaction
61  struct Output {
65  };
66  std::vector<Output> outputs;
67 
68  // State kept for output being generated
71 
73 
74  Output* current_output() { return &outputs[outputs.size()-1]; }
75 
77  : compaction(c),
78  outfile(NULL),
79  builder(NULL),
80  total_bytes(0) {
81  }
82 };
83 
84 // Fix user-supplied options to be reasonable
// Clamps *ptr into [minvalue, maxvalue].
//
// The bounds are converted to T and the comparison is done in T. The bounds
// must be representable in T anyway, since either one may be stored into
// *ptr. Converting *ptr to V instead truncates it when V is narrower than T
// (e.g. a size_t option clipped with int bounds). A huge size_t value then
// wraps to a negative int and gets clamped to the minimum, or wraps into
// range and is not clamped at all.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  const T lo = static_cast<T>(minvalue);
  const T hi = static_cast<T>(maxvalue);
  if (*ptr > hi) *ptr = hi;
  if (*ptr < lo) *ptr = lo;
}
90 Options SanitizeOptions(const std::string& dbname,
91  const InternalKeyComparator* icmp,
92  const InternalFilterPolicy* ipolicy,
93  const Options& src) {
94  Options result = src;
95  result.comparator = icmp;
96  result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
97  ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
98  ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
99  ClipToRange(&result.block_size, 1<<10, 4<<20);
100  if (result.info_log == NULL) {
101  // Open a log file in the same directory as the db
102  src.env->CreateDir(dbname); // In case it does not exist
103  src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
104  Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
105  if (!s.ok()) {
106  // No place suitable for logging
107  result.info_log = NULL;
108  }
109  }
110  if (result.block_cache == NULL) {
111  result.block_cache = NewLRUCache(8 << 20);
112  }
113  return result;
114 }
115 
116 DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
117  : env_(raw_options.env),
118  internal_comparator_(raw_options.comparator),
119  internal_filter_policy_(raw_options.filter_policy),
120  options_(SanitizeOptions(dbname, &internal_comparator_,
121  &internal_filter_policy_, raw_options)),
122  owns_info_log_(options_.info_log != raw_options.info_log),
123  owns_cache_(options_.block_cache != raw_options.block_cache),
124  dbname_(dbname),
125  db_lock_(NULL),
126  shutting_down_(NULL),
127  bg_cv_(&mutex_),
128  mem_(new MemTable(internal_comparator_)),
129  imm_(NULL),
130  logfile_(NULL),
131  logfile_number_(0),
132  log_(NULL),
133  seed_(0),
134  tmp_batch_(new WriteBatch),
135  bg_compaction_scheduled_(false),
136  manual_compaction_(NULL),
137  consecutive_compaction_errors_(0) {
138  mem_->Ref();
139  has_imm_.Release_Store(NULL);
140 
141  // Reserve ten files or so for other uses and give the rest to TableCache.
142  const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
143  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
144 
147 }
148 
150  // Wait for background work to finish
151  mutex_.Lock();
152  shutting_down_.Release_Store(this); // Any non-NULL value is ok
153  while (bg_compaction_scheduled_) {
154  bg_cv_.Wait();
155  }
156  mutex_.Unlock();
157 
158  if (db_lock_ != NULL) {
160  }
161 
162  delete versions_;
163  if (mem_ != NULL) mem_->Unref();
164  if (imm_ != NULL) imm_->Unref();
165  delete tmp_batch_;
166  delete log_;
167  delete logfile_;
168  delete table_cache_;
169 
170  if (owns_info_log_) {
171  delete options_.info_log;
172  }
173  if (owns_cache_) {
174  delete options_.block_cache;
175  }
176 }
177 
179  VersionEdit new_db;
180  new_db.SetComparatorName(user_comparator()->Name());
181  new_db.SetLogNumber(0);
182  new_db.SetNextFile(2);
183  new_db.SetLastSequence(0);
184 
185  const std::string manifest = DescriptorFileName(dbname_, 1);
186  WritableFile* file;
187  Status s = env_->NewWritableFile(manifest, &file);
188  if (!s.ok()) {
189  return s;
190  }
191  {
192  log::Writer log(file);
193  std::string record;
194  new_db.EncodeTo(&record);
195  s = log.AddRecord(record);
196  if (s.ok()) {
197  s = file->Close();
198  }
199  }
200  delete file;
201  if (s.ok()) {
202  // Make "CURRENT" file that points to the new manifest file.
203  s = SetCurrentFile(env_, dbname_, 1);
204  } else {
205  env_->DeleteFile(manifest);
206  }
207  return s;
208 }
209 
211  if (s->ok() || options_.paranoid_checks) {
212  // No change needed
213  } else {
214  Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
215  *s = Status::OK();
216  }
217 }
218 
220  // Make a set of all of the live files
221  std::set<uint64_t> live = pending_outputs_;
222  versions_->AddLiveFiles(&live);
223 
224  std::vector<std::string> filenames;
225  env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
226  uint64_t number;
227  FileType type;
228  for (size_t i = 0; i < filenames.size(); i++) {
229  if (ParseFileName(filenames[i], &number, &type)) {
230  bool keep = true;
231  switch (type) {
232  case kLogFile:
233  keep = ((number >= versions_->LogNumber()) ||
234  (number == versions_->PrevLogNumber()));
235  break;
236  case kDescriptorFile:
237  // Keep my manifest file, and any newer incarnations'
238  // (in case there is a race that allows other incarnations)
239  keep = (number >= versions_->ManifestFileNumber());
240  break;
241  case kTableFile:
242  keep = (live.find(number) != live.end());
243  break;
244  case kTempFile:
245  // Any temp files that are currently being written to must
246  // be recorded in pending_outputs_, which is inserted into "live"
247  keep = (live.find(number) != live.end());
248  break;
249  case kCurrentFile:
250  case kDBLockFile:
251  case kInfoLogFile:
252  keep = true;
253  break;
254  }
255 
256  if (!keep) {
257  if (type == kTableFile) {
258  table_cache_->Evict(number);
259  }
260  Log(options_.info_log, "Delete type=%d #%lld\n",
261  int(type),
262  static_cast<unsigned long long>(number));
263  env_->DeleteFile(dbname_ + "/" + filenames[i]);
264  }
265  }
266  }
267 }
268 
270  mutex_.AssertHeld();
271 
272  // Ignore error from CreateDir since the creation of the DB is
273  // committed only when the descriptor is created, and this directory
274  // may already exist from a previous failed creation attempt.
276  assert(db_lock_ == NULL);
278  if (!s.ok()) {
279  return s;
280  }
281 
284  s = NewDB();
285  if (!s.ok()) {
286  return s;
287  }
288  } else {
290  dbname_, "does not exist (create_if_missing is false)");
291  }
292  } else {
295  dbname_, "exists (error_if_exists is true)");
296  }
297  }
298 
299  s = versions_->Recover();
300  if (s.ok()) {
302 
303  // Recover from all newer log files than the ones named in the
304  // descriptor (new log files may have been added by the previous
305  // incarnation without registering them in the descriptor).
306  //
307  // Note that PrevLogNumber() is no longer used, but we pay
308  // attention to it in case we are recovering a database
309  // produced by an older version of leveldb.
310  const uint64_t min_log = versions_->LogNumber();
311  const uint64_t prev_log = versions_->PrevLogNumber();
312  std::vector<std::string> filenames;
313  s = env_->GetChildren(dbname_, &filenames);
314  if (!s.ok()) {
315  return s;
316  }
317  std::set<uint64_t> expected;
318  versions_->AddLiveFiles(&expected);
319  uint64_t number;
320  FileType type;
321  std::vector<uint64_t> logs;
322  for (size_t i = 0; i < filenames.size(); i++) {
323  if (ParseFileName(filenames[i], &number, &type)) {
324  expected.erase(number);
325  if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
326  logs.push_back(number);
327  }
328  }
329  if (!expected.empty()) {
330  char buf[50];
331  snprintf(buf, sizeof(buf), "%d missing files; e.g.",
332  static_cast<int>(expected.size()));
333  return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
334  }
335 
336  // Recover in the order in which the logs were generated
337  std::sort(logs.begin(), logs.end());
338  for (size_t i = 0; i < logs.size(); i++) {
339  s = RecoverLogFile(logs[i], edit, &max_sequence);
340 
341  // The previous incarnation may not have written any MANIFEST
342  // records after allocating this log number. So we manually
343  // update the file number allocation counter in VersionSet.
344  versions_->MarkFileNumberUsed(logs[i]);
345  }
346 
347  if (s.ok()) {
349  versions_->SetLastSequence(max_sequence);
350  }
351  }
352  }
353 
354  return s;
355 }
356 
358  VersionEdit* edit,
360  struct LogReporter : public log::Reader::Reporter {
361  Env* env;
362  Logger* info_log;
363  const char* fname;
364  Status* status; // NULL if options_.paranoid_checks==false
365  virtual void Corruption(size_t bytes, const Status& s) {
366  Log(info_log, "%s%s: dropping %d bytes; %s",
367  (this->status == NULL ? "(ignoring error) " : ""),
368  fname, static_cast<int>(bytes), s.ToString().c_str());
369  if (this->status != NULL && this->status->ok()) *this->status = s;
370  }
371  };
372 
373  mutex_.AssertHeld();
374 
375  // Open the log file
376  std::string fname = LogFileName(dbname_, log_number);
377  SequentialFile* file;
378  Status status = env_->NewSequentialFile(fname, &file);
379  if (!status.ok()) {
380  MaybeIgnoreError(&status);
381  return status;
382  }
383 
384  // Create the log reader.
385  LogReporter reporter;
386  reporter.env = env_;
387  reporter.info_log = options_.info_log;
388  reporter.fname = fname.c_str();
389  reporter.status = (options_.paranoid_checks ? &status : NULL);
390  // We intentionally make log::Reader do checksumming even if
391  // paranoid_checks==false so that corruptions cause entire commits
392  // to be skipped instead of propagating bad information (like overly
393  // large sequence numbers).
394  log::Reader reader(file, &reporter, true/*checksum*/,
395  0/*initial_offset*/);
396  Log(options_.info_log, "Recovering log #%llu",
397  (unsigned long long) log_number);
398 
399  // Read all the records and add to a memtable
400  std::string scratch;
401  Slice record;
402  WriteBatch batch;
403  MemTable* mem = NULL;
404  while (reader.ReadRecord(&record, &scratch) &&
405  status.ok()) {
406  if (record.size() < 12) {
407  reporter.Corruption(
408  record.size(), Status::Corruption("log record too small"));
409  continue;
410  }
411  WriteBatchInternal::SetContents(&batch, record);
412 
413  if (mem == NULL) {
414  mem = new MemTable(internal_comparator_);
415  mem->Ref();
416  }
417  status = WriteBatchInternal::InsertInto(&batch, mem);
418  MaybeIgnoreError(&status);
419  if (!status.ok()) {
420  break;
421  }
422  const SequenceNumber last_seq =
424  WriteBatchInternal::Count(&batch) - 1;
425  if (last_seq > *max_sequence) {
426  *max_sequence = last_seq;
427  }
428 
430  status = WriteLevel0Table(mem, edit, NULL);
431  if (!status.ok()) {
432  // Reflect errors immediately so that conditions like full
433  // file-systems cause the DB::Open() to fail.
434  break;
435  }
436  mem->Unref();
437  mem = NULL;
438  }
439  }
440 
441  if (status.ok() && mem != NULL) {
442  status = WriteLevel0Table(mem, edit, NULL);
443  // Reflect errors immediately so that conditions like full
444  // file-systems cause the DB::Open() to fail.
445  }
446 
447  if (mem != NULL) mem->Unref();
448  delete file;
449  return status;
450 }
451 
453  Version* base) {
454  mutex_.AssertHeld();
455  const uint64_t start_micros = env_->NowMicros();
457  meta.number = versions_->NewFileNumber();
458  pending_outputs_.insert(meta.number);
459  Iterator* iter = mem->NewIterator();
460  Log(options_.info_log, "Level-0 table #%llu: started",
461  (unsigned long long) meta.number);
462 
463  Status s;
464  {
465  mutex_.Unlock();
466  s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
467  mutex_.Lock();
468  }
469 
470  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
471  (unsigned long long) meta.number,
472  (unsigned long long) meta.file_size,
473  s.ToString().c_str());
474  delete iter;
475  pending_outputs_.erase(meta.number);
476 
477 
478  // Note that if file_size is zero, the file has been deleted and
479  // should not be added to the manifest.
480  int level = 0;
481  if (s.ok() && meta.file_size > 0) {
482  const Slice min_user_key = meta.smallest.user_key();
483  const Slice max_user_key = meta.largest.user_key();
484  if (base != NULL) {
485  level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
486  }
487  edit->AddFile(level, meta.number, meta.file_size,
488  meta.smallest, meta.largest);
489  }
490 
492  stats.micros = env_->NowMicros() - start_micros;
493  stats.bytes_written = meta.file_size;
494  stats_[level].Add(stats);
495  return s;
496 }
497 
499  mutex_.AssertHeld();
500  assert(imm_ != NULL);
501 
502  // Save the contents of the memtable as a new Table
503  VersionEdit edit;
505  base->Ref();
506  Status s = WriteLevel0Table(imm_, &edit, base);
507  base->Unref();
508 
509  if (s.ok() && shutting_down_.Acquire_Load()) {
510  s = Status::IOError("Deleting DB during memtable compaction");
511  }
512 
513  // Replace immutable memtable with the generated Table
514  if (s.ok()) {
515  edit.SetPrevLogNumber(0);
516  edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
517  s = versions_->LogAndApply(&edit, &mutex_);
518  }
519 
520  if (s.ok()) {
521  // Commit to the new state
522  imm_->Unref();
523  imm_ = NULL;
524  has_imm_.Release_Store(NULL);
526  }
527 
528  return s;
529 }
530 
531 void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
532  int max_level_with_files = 1;
533  {
534  MutexLock l(&mutex_);
536  for (int level = 1; level < config::kNumLevels; level++) {
537  if (base->OverlapInLevel(level, begin, end)) {
538  max_level_with_files = level;
539  }
540  }
541  }
542  TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
543  for (int level = 0; level < max_level_with_files; level++) {
544  TEST_CompactRange(level, begin, end);
545  }
546 }
547 
548 void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
549  assert(level >= 0);
550  assert(level + 1 < config::kNumLevels);
551 
552  InternalKey begin_storage, end_storage;
553 
554  ManualCompaction manual;
555  manual.level = level;
556  manual.done = false;
557  if (begin == NULL) {
558  manual.begin = NULL;
559  } else {
560  begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
561  manual.begin = &begin_storage;
562  }
563  if (end == NULL) {
564  manual.end = NULL;
565  } else {
566  end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
567  manual.end = &end_storage;
568  }
569 
570  MutexLock l(&mutex_);
571  while (!manual.done) {
572  while (manual_compaction_ != NULL) {
573  bg_cv_.Wait();
574  }
575  manual_compaction_ = &manual;
577  while (manual_compaction_ == &manual) {
578  bg_cv_.Wait();
579  }
580  }
581 }
582 
584  // NULL batch means just wait for earlier writes to be done
585  Status s = Write(WriteOptions(), NULL);
586  if (s.ok()) {
587  // Wait until the compaction completes
588  MutexLock l(&mutex_);
589  while (imm_ != NULL && bg_error_.ok()) {
590  bg_cv_.Wait();
591  }
592  if (imm_ != NULL) {
593  s = bg_error_;
594  }
595  }
596  return s;
597 }
598 
600  mutex_.AssertHeld();
602  // Already scheduled
603  } else if (shutting_down_.Acquire_Load()) {
604  // DB is being deleted; no more background compactions
605  } else if (imm_ == NULL &&
606  manual_compaction_ == NULL &&
608  // No work to be done
609  } else {
611  env_->Schedule(&DBImpl::BGWork, this);
612  }
613 }
614 
615 void DBImpl::BGWork(void* db) {
616  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
617 }
618 
620  MutexLock l(&mutex_);
621  assert(bg_compaction_scheduled_);
622  if (!shutting_down_.Acquire_Load()) {
624  if (s.ok()) {
625  // Success
627  } else if (shutting_down_.Acquire_Load()) {
628  // Error most likely due to shutdown; do not wait
629  } else {
630  // Wait a little bit before retrying background compaction in
631  // case this is an environmental problem and we do not want to
632  // chew up resources for failed compactions for the duration of
633  // the problem.
634  bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
635  Log(options_.info_log, "Waiting after background compaction error: %s",
636  s.ToString().c_str());
637  mutex_.Unlock();
639  int seconds_to_sleep = 1;
640  for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
641  seconds_to_sleep *= 2;
642  }
643  env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
644  mutex_.Lock();
645  }
646  }
647 
648  bg_compaction_scheduled_ = false;
649 
650  // Previous compaction may have produced too many files in a level,
651  // so reschedule another compaction if needed.
653  bg_cv_.SignalAll();
654 }
655 
657  mutex_.AssertHeld();
658 
659  if (imm_ != NULL) {
660  return CompactMemTable();
661  }
662 
663  Compaction* c;
664  bool is_manual = (manual_compaction_ != NULL);
665  InternalKey manual_end;
666  if (is_manual) {
668  c = versions_->CompactRange(m->level, m->begin, m->end);
669  m->done = (c == NULL);
670  if (c != NULL) {
671  manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
672  }
674  "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
675  m->level,
676  (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
677  (m->end ? m->end->DebugString().c_str() : "(end)"),
678  (m->done ? "(end)" : manual_end.DebugString().c_str()));
679  } else {
680  c = versions_->PickCompaction();
681  }
682 
683  Status status;
684  if (c == NULL) {
685  // Nothing to do
686  } else if (!is_manual && c->IsTrivialMove()) {
687  // Move file to next level
688  assert(c->num_input_files(0) == 1);
689  FileMetaData* f = c->input(0, 0);
690  c->edit()->DeleteFile(c->level(), f->number);
691  c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
692  f->smallest, f->largest);
693  status = versions_->LogAndApply(c->edit(), &mutex_);
695  Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
696  static_cast<unsigned long long>(f->number),
697  c->level() + 1,
698  static_cast<unsigned long long>(f->file_size),
699  status.ToString().c_str(),
700  versions_->LevelSummary(&tmp));
701  } else {
702  CompactionState* compact = new CompactionState(c);
703  status = DoCompactionWork(compact);
704  CleanupCompaction(compact);
705  c->ReleaseInputs();
707  }
708  delete c;
709 
710  if (status.ok()) {
711  // Done
712  } else if (shutting_down_.Acquire_Load()) {
713  // Ignore compaction errors found during shutting down
714  } else {
716  "Compaction error: %s", status.ToString().c_str());
718  bg_error_ = status;
719  }
720  }
721 
722  if (is_manual) {
724  if (!status.ok()) {
725  m->done = true;
726  }
727  if (!m->done) {
728  // We only compacted part of the requested range. Update *m
729  // to the range that is left to be compacted.
730  m->tmp_storage = manual_end;
731  m->begin = &m->tmp_storage;
732  }
733  manual_compaction_ = NULL;
734  }
735  return status;
736 }
737 
739  mutex_.AssertHeld();
740  if (compact->builder != NULL) {
741  // May happen if we get a shutdown call in the middle of compaction
742  compact->builder->Abandon();
743  delete compact->builder;
744  } else {
745  assert(compact->outfile == NULL);
746  }
747  delete compact->outfile;
748  for (size_t i = 0; i < compact->outputs.size(); i++) {
749  const CompactionState::Output& out = compact->outputs[i];
750  pending_outputs_.erase(out.number);
751  }
752  delete compact;
753 }
754 
756  assert(compact != NULL);
757  assert(compact->builder == NULL);
758  uint64_t file_number;
759  {
760  mutex_.Lock();
761  file_number = versions_->NewFileNumber();
762  pending_outputs_.insert(file_number);
764  out.number = file_number;
765  out.smallest.Clear();
766  out.largest.Clear();
767  compact->outputs.push_back(out);
768  mutex_.Unlock();
769  }
770 
771  // Make the output file
772  std::string fname = TableFileName(dbname_, file_number);
773  Status s = env_->NewWritableFile(fname, &compact->outfile);
774  if (s.ok()) {
775  compact->builder = new TableBuilder(options_, compact->outfile);
776  }
777  return s;
778 }
779 
781  Iterator* input) {
782  assert(compact != NULL);
783  assert(compact->outfile != NULL);
784  assert(compact->builder != NULL);
785 
786  const uint64_t output_number = compact->current_output()->number;
787  assert(output_number != 0);
788 
789  // Check for iterator errors
790  Status s = input->status();
791  const uint64_t current_entries = compact->builder->NumEntries();
792  if (s.ok()) {
793  s = compact->builder->Finish();
794  } else {
795  compact->builder->Abandon();
796  }
797  const uint64_t current_bytes = compact->builder->FileSize();
798  compact->current_output()->file_size = current_bytes;
799  compact->total_bytes += current_bytes;
800  delete compact->builder;
801  compact->builder = NULL;
802 
803  // Finish and check for file errors
804  if (s.ok()) {
805  s = compact->outfile->Sync();
806  }
807  if (s.ok()) {
808  s = compact->outfile->Close();
809  }
810  delete compact->outfile;
811  compact->outfile = NULL;
812 
813  if (s.ok() && current_entries > 0) {
814  // Verify that the table is usable
816  output_number,
817  current_bytes);
818  s = iter->status();
819  delete iter;
820  if (s.ok()) {
822  "Generated table #%llu: %lld keys, %lld bytes",
823  (unsigned long long) output_number,
824  (unsigned long long) current_entries,
825  (unsigned long long) current_bytes);
826  }
827  }
828  return s;
829 }
830 
831 
833  mutex_.AssertHeld();
834  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
835  compact->compaction->num_input_files(0),
836  compact->compaction->level(),
837  compact->compaction->num_input_files(1),
838  compact->compaction->level() + 1,
839  static_cast<long long>(compact->total_bytes));
840 
841  // Add compaction outputs
842  compact->compaction->AddInputDeletions(compact->compaction->edit());
843  const int level = compact->compaction->level();
844  for (size_t i = 0; i < compact->outputs.size(); i++) {
845  const CompactionState::Output& out = compact->outputs[i];
846  compact->compaction->edit()->AddFile(
847  level + 1,
848  out.number, out.file_size, out.smallest, out.largest);
849  }
850  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
851 }
852 
854  const uint64_t start_micros = env_->NowMicros();
855  int64_t imm_micros = 0; // Micros spent doing imm_ compactions
856 
857  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
858  compact->compaction->num_input_files(0),
859  compact->compaction->level(),
860  compact->compaction->num_input_files(1),
861  compact->compaction->level() + 1);
862 
863  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
864  assert(compact->builder == NULL);
865  assert(compact->outfile == NULL);
866  if (snapshots_.empty()) {
868  } else {
870  }
871 
872  // Release mutex while we're actually doing the compaction work
873  mutex_.Unlock();
874 
875  Iterator* input = versions_->MakeInputIterator(compact->compaction);
876  input->SeekToFirst();
877  Status status;
878  ParsedInternalKey ikey;
879  std::string current_user_key;
880  bool has_current_user_key = false;
881  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
882  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
883  // Prioritize immutable compaction work
884  if (has_imm_.NoBarrier_Load() != NULL) {
885  const uint64_t imm_start = env_->NowMicros();
886  mutex_.Lock();
887  if (imm_ != NULL) {
888  CompactMemTable();
889  bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
890  }
891  mutex_.Unlock();
892  imm_micros += (env_->NowMicros() - imm_start);
893  }
894 
895  Slice key = input->key();
896  if (compact->compaction->ShouldStopBefore(key) &&
897  compact->builder != NULL) {
898  status = FinishCompactionOutputFile(compact, input);
899  if (!status.ok()) {
900  break;
901  }
902  }
903 
904  // Handle key/value, add to state, etc.
905  bool drop = false;
906  if (!ParseInternalKey(key, &ikey)) {
907  // Do not hide error keys
908  current_user_key.clear();
909  has_current_user_key = false;
910  last_sequence_for_key = kMaxSequenceNumber;
911  } else {
912  if (!has_current_user_key ||
913  user_comparator()->Compare(ikey.user_key,
914  Slice(current_user_key)) != 0) {
915  // First occurrence of this user key
916  current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
917  has_current_user_key = true;
918  last_sequence_for_key = kMaxSequenceNumber;
919  }
920 
921  if (last_sequence_for_key <= compact->smallest_snapshot) {
922  // Hidden by a newer entry for the same user key
923  drop = true; // (A)
924  } else if (ikey.type == kTypeDeletion &&
925  ikey.sequence <= compact->smallest_snapshot &&
926  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
927  // For this user key:
928  // (1) there is no data in higher levels
929  // (2) data in lower levels will have larger sequence numbers
930  // (3) data in layers that are being compacted here and have
931  // smaller sequence numbers will be dropped in the next
932  // few iterations of this loop (by rule (A) above).
933  // Therefore this deletion marker is obsolete and can be dropped.
934  drop = true;
935  }
936 
937  last_sequence_for_key = ikey.sequence;
938  }
939 #if 0
941  " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
942  "%d smallest_snapshot: %d",
943  ikey.user_key.ToString().c_str(),
944  (int)ikey.sequence, ikey.type, kTypeValue, drop,
945  compact->compaction->IsBaseLevelForKey(ikey.user_key),
946  (int)last_sequence_for_key, (int)compact->smallest_snapshot);
947 #endif
948 
949  if (!drop) {
950  // Open output file if necessary
951  if (compact->builder == NULL) {
952  status = OpenCompactionOutputFile(compact);
953  if (!status.ok()) {
954  break;
955  }
956  }
957  if (compact->builder->NumEntries() == 0) {
958  compact->current_output()->smallest.DecodeFrom(key);
959  }
960  compact->current_output()->largest.DecodeFrom(key);
961  compact->builder->Add(key, input->value());
962 
963  // Close output file if it is big enough
964  if (compact->builder->FileSize() >=
965  compact->compaction->MaxOutputFileSize()) {
966  status = FinishCompactionOutputFile(compact, input);
967  if (!status.ok()) {
968  break;
969  }
970  }
971  }
972 
973  input->Next();
974  }
975 
976  if (status.ok() && shutting_down_.Acquire_Load()) {
977  status = Status::IOError("Deleting DB during compaction");
978  }
979  if (status.ok() && compact->builder != NULL) {
980  status = FinishCompactionOutputFile(compact, input);
981  }
982  if (status.ok()) {
983  status = input->status();
984  }
985  delete input;
986  input = NULL;
987 
989  stats.micros = env_->NowMicros() - start_micros - imm_micros;
990  for (int which = 0; which < 2; which++) {
991  for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
992  stats.bytes_read += compact->compaction->input(which, i)->file_size;
993  }
994  }
995  for (size_t i = 0; i < compact->outputs.size(); i++) {
996  stats.bytes_written += compact->outputs[i].file_size;
997  }
998 
999  mutex_.Lock();
1000  stats_[compact->compaction->level() + 1].Add(stats);
1001 
1002  if (status.ok()) {
1003  status = InstallCompactionResults(compact);
1004  }
1007  "compacted to: %s", versions_->LevelSummary(&tmp));
1008  return status;
1009 }
1010 
1011 namespace {
1012 struct IterState {
1013  port::Mutex* mu;
1014  Version* version;
1015  MemTable* mem;
1016  MemTable* imm;
1017 };
1018 
1019 static void CleanupIteratorState(void* arg1, void* arg2) {
1020  IterState* state = reinterpret_cast<IterState*>(arg1);
1021  state->mu->Lock();
1022  state->mem->Unref();
1023  if (state->imm != NULL) state->imm->Unref();
1024  state->version->Unref();
1025  state->mu->Unlock();
1026  delete state;
1027 }
1028 } // namespace
1029 
1031  SequenceNumber* latest_snapshot,
1032  uint32_t* seed) {
1033  IterState* cleanup = new IterState;
1034  mutex_.Lock();
1035  *latest_snapshot = versions_->LastSequence();
1036 
1037  // Collect together all needed child iterators
1038  std::vector<Iterator*> list;
1039  list.push_back(mem_->NewIterator());
1040  mem_->Ref();
1041  if (imm_ != NULL) {
1042  list.push_back(imm_->NewIterator());
1043  imm_->Ref();
1044  }
1045  versions_->current()->AddIterators(options, &list);
1046  Iterator* internal_iter =
1047  NewMergingIterator(&internal_comparator_, &list[0], list.size());
1048  versions_->current()->Ref();
1049 
1050  cleanup->mu = &mutex_;
1051  cleanup->mem = mem_;
1052  cleanup->imm = imm_;
1053  cleanup->version = versions_->current();
1054  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
1055 
1056  *seed = ++seed_;
1057  mutex_.Unlock();
1058  return internal_iter;
1059 }
1060 
1062  SequenceNumber ignored;
1063  uint32_t ignored_seed;
1064  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1065 }
1066 
1068  MutexLock l(&mutex_);
1070 }
1071 
1073  const Slice& key,
1074  std::string* value) {
1075  Status s;
1076  MutexLock l(&mutex_);
1077  SequenceNumber snapshot;
1078  if (options.snapshot != NULL) {
1079  snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
1080  } else {
1081  snapshot = versions_->LastSequence();
1082  }
1083 
1084  MemTable* mem = mem_;
1085  MemTable* imm = imm_;
1086  Version* current = versions_->current();
1087  mem->Ref();
1088  if (imm != NULL) imm->Ref();
1089  current->Ref();
1090 
1091  bool have_stat_update = false;
1093 
1094  // Unlock while reading from files and memtables
1095  {
1096  mutex_.Unlock();
1097  // First look in the memtable, then in the immutable memtable (if any).
1098  LookupKey lkey(key, snapshot);
1099  if (mem->Get(lkey, value, &s)) {
1100  // Done
1101  } else if (imm != NULL && imm->Get(lkey, value, &s)) {
1102  // Done
1103  } else {
1104  s = current->Get(options, lkey, value, &stats);
1105  have_stat_update = true;
1106  }
1107  mutex_.Lock();
1108  }
1109 
1110  if (have_stat_update && current->UpdateStats(stats)) {
1112  }
1113  mem->Unref();
1114  if (imm != NULL) imm->Unref();
1115  current->Unref();
1116  return s;
1117 }
1118 
1120  SequenceNumber latest_snapshot;
1121  uint32_t seed;
1122  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1123  return NewDBIterator(
1124  this, user_comparator(), iter,
1125  (options.snapshot != NULL
1126  ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
1127  : latest_snapshot),
1128  seed);
1129 }
1130 
1132  MutexLock l(&mutex_);
1133  if (versions_->current()->RecordReadSample(key)) {
1135  }
1136 }
1137 
1139  MutexLock l(&mutex_);
1140  return snapshots_.New(versions_->LastSequence());
1141 }
1142 
1144  MutexLock l(&mutex_);
1145  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
1146 }
1147 
1148 // Convenience methods
1149 Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1150  return DB::Put(o, key, val);
1151 }
1152 
1153 Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1154  return DB::Delete(options, key);
1155 }
1156 
1157 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1158  Writer w(&mutex_);
1159  w.batch = my_batch;
1160  w.sync = options.sync;
1161  w.done = false;
1162 
1163  MutexLock l(&mutex_);
1164  writers_.push_back(&w);
1165  while (!w.done && &w != writers_.front()) {
1166  w.cv.Wait();
1167  }
1168  if (w.done) {
1169  return w.status;
1170  }
1171 
1172  // May temporarily unlock and wait.
1173  Status status = MakeRoomForWrite(my_batch == NULL);
1174  uint64_t last_sequence = versions_->LastSequence();
1175  Writer* last_writer = &w;
1176  if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
1177  WriteBatch* updates = BuildBatchGroup(&last_writer);
1178  WriteBatchInternal::SetSequence(updates, last_sequence + 1);
1179  last_sequence += WriteBatchInternal::Count(updates);
1180 
1181  // Add to log and apply to memtable. We can release the lock
1182  // during this phase since &w is currently responsible for logging
1183  // and protects against concurrent loggers and concurrent writes
1184  // into mem_.
1185  {
1186  mutex_.Unlock();
1187  status = log_->AddRecord(WriteBatchInternal::Contents(updates));
1188  if (status.ok() && options.sync) {
1189  status = logfile_->Sync();
1190  }
1191  if (status.ok()) {
1192  status = WriteBatchInternal::InsertInto(updates, mem_);
1193  }
1194  mutex_.Lock();
1195  }
1196  if (updates == tmp_batch_) tmp_batch_->Clear();
1197 
1198  versions_->SetLastSequence(last_sequence);
1199  }
1200 
1201  while (true) {
1202  Writer* ready = writers_.front();
1203  writers_.pop_front();
1204  if (ready != &w) {
1205  ready->status = status;
1206  ready->done = true;
1207  ready->cv.Signal();
1208  }
1209  if (ready == last_writer) break;
1210  }
1211 
1212  // Notify new head of write queue
1213  if (!writers_.empty()) {
1214  writers_.front()->cv.Signal();
1215  }
1216 
1217  return status;
1218 }
1219 
1220 // REQUIRES: Writer list must be non-empty
1221 // REQUIRES: First writer must have a non-NULL batch
1223  assert(!writers_.empty());
1224  Writer* first = writers_.front();
1225  WriteBatch* result = first->batch;
1226  assert(result != NULL);
1227 
1228  size_t size = WriteBatchInternal::ByteSize(first->batch);
1229 
1230  // Allow the group to grow up to a maximum size, but if the
1231  // original write is small, limit the growth so we do not slow
1232  // down the small write too much.
1233  size_t max_size = 1 << 20;
1234  if (size <= (128<<10)) {
1235  max_size = size + (128<<10);
1236  }
1237 
1238  *last_writer = first;
1239  std::deque<Writer*>::iterator iter = writers_.begin();
1240  ++iter; // Advance past "first"
1241  for (; iter != writers_.end(); ++iter) {
1242  Writer* w = *iter;
1243  if (w->sync && !first->sync) {
1244  // Do not include a sync write into a batch handled by a non-sync write.
1245  break;
1246  }
1247 
1248  if (w->batch != NULL) {
1249  size += WriteBatchInternal::ByteSize(w->batch);
1250  if (size > max_size) {
1251  // Do not make batch too big
1252  break;
1253  }
1254 
1255  // Append to *reuslt
1256  if (result == first->batch) {
1257  // Switch to temporary batch instead of disturbing caller's batch
1258  result = tmp_batch_;
1259  assert(WriteBatchInternal::Count(result) == 0);
1260  WriteBatchInternal::Append(result, first->batch);
1261  }
1262  WriteBatchInternal::Append(result, w->batch);
1263  }
1264  *last_writer = w;
1265  }
1266  return result;
1267 }
1268 
1269 // REQUIRES: mutex_ is held
1270 // REQUIRES: this thread is currently at the front of the writer queue
1272  mutex_.AssertHeld();
1273  assert(!writers_.empty());
1274  bool allow_delay = !force;
1275  Status s;
1276  while (true) {
1277  if (!bg_error_.ok()) {
1278  // Yield previous error
1279  s = bg_error_;
1280  break;
1281  } else if (
1282  allow_delay &&
1283  versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
1284  // We are getting close to hitting a hard limit on the number of
1285  // L0 files. Rather than delaying a single write by several
1286  // seconds when we hit the hard limit, start delaying each
1287  // individual write by 1ms to reduce latency variance. Also,
1288  // this delay hands over some CPU to the compaction thread in
1289  // case it is sharing the same core as the writer.
1290  mutex_.Unlock();
1291  env_->SleepForMicroseconds(1000);
1292  allow_delay = false; // Do not delay a single write more than once
1293  mutex_.Lock();
1294  } else if (!force &&
1296  // There is room in current memtable
1297  break;
1298  } else if (imm_ != NULL) {
1299  // We have filled up the current memtable, but the previous
1300  // one is still being compacted, so we wait.
1301  Log(options_.info_log, "Current memtable full; waiting...\n");
1302  bg_cv_.Wait();
1303  } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
1304  // There are too many level-0 files.
1305  Log(options_.info_log, "Too many L0 files; waiting...\n");
1306  bg_cv_.Wait();
1307  } else {
1308  // Attempt to switch to a new memtable and trigger compaction of old
1309  assert(versions_->PrevLogNumber() == 0);
1310  uint64_t new_log_number = versions_->NewFileNumber();
1311  WritableFile* lfile = NULL;
1312  s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1313  if (!s.ok()) {
1314  // Avoid chewing through file number space in a tight loop.
1315  versions_->ReuseFileNumber(new_log_number);
1316  break;
1317  }
1318  delete log_;
1319  delete logfile_;
1320  logfile_ = lfile;
1321  logfile_number_ = new_log_number;
1322  log_ = new log::Writer(lfile);
1323  imm_ = mem_;
1326  mem_->Ref();
1327  force = false; // Do not force another compaction if have room
1329  }
1330  }
1331  return s;
1332 }
1333 
1334 bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1335  value->clear();
1336 
1337  MutexLock l(&mutex_);
1338  Slice in = property;
1339  Slice prefix("leveldb.");
1340  if (!in.starts_with(prefix)) return false;
1341  in.remove_prefix(prefix.size());
1342 
1343  if (in.starts_with("num-files-at-level")) {
1344  in.remove_prefix(strlen("num-files-at-level"));
1345  uint64_t level;
1346  bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
1347  if (!ok || level >= config::kNumLevels) {
1348  return false;
1349  } else {
1350  char buf[100];
1351  snprintf(buf, sizeof(buf), "%d",
1352  versions_->NumLevelFiles(static_cast<int>(level)));
1353  *value = buf;
1354  return true;
1355  }
1356  } else if (in == "stats") {
1357  char buf[200];
1358  snprintf(buf, sizeof(buf),
1359  " Compactions\n"
1360  "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1361  "--------------------------------------------------\n"
1362  );
1363  value->append(buf);
1364  for (int level = 0; level < config::kNumLevels; level++) {
1365  int files = versions_->NumLevelFiles(level);
1366  if (stats_[level].micros > 0 || files > 0) {
1367  snprintf(
1368  buf, sizeof(buf),
1369  "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1370  level,
1371  files,
1372  versions_->NumLevelBytes(level) / 1048576.0,
1373  stats_[level].micros / 1e6,
1374  stats_[level].bytes_read / 1048576.0,
1375  stats_[level].bytes_written / 1048576.0);
1376  value->append(buf);
1377  }
1378  }
1379  return true;
1380  } else if (in == "sstables") {
1381  *value = versions_->current()->DebugString();
1382  return true;
1383  }
1384 
1385  return false;
1386 }
1387 
1389  const Range* range, int n,
1390  uint64_t* sizes) {
1391  // TODO(opt): better implementation
1392  Version* v;
1393  {
1394  MutexLock l(&mutex_);
1395  versions_->current()->Ref();
1396  v = versions_->current();
1397  }
1398 
1399  for (int i = 0; i < n; i++) {
1400  // Convert user_key into a corresponding internal key.
1401  InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1402  InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1403  uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1404  uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1405  sizes[i] = (limit >= start ? limit - start : 0);
1406  }
1407 
1408  {
1409  MutexLock l(&mutex_);
1410  v->Unref();
1411  }
1412 }
1413 
1414 // Default implementations of convenience methods that subclasses of DB
1415 // can call if they wish
1416 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1417  WriteBatch batch;
1418  batch.Put(key, value);
1419  return Write(opt, &batch);
1420 }
1421 
1422 Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1423  WriteBatch batch;
1424  batch.Delete(key);
1425  return Write(opt, &batch);
1426 }
1427 
1428 DB::~DB() { }
1429 
1430 Status DB::Open(const Options& options, const std::string& dbname,
1431  DB** dbptr) {
1432  *dbptr = NULL;
1433 
1434  DBImpl* impl = new DBImpl(options, dbname);
1435  impl->mutex_.Lock();
1436  VersionEdit edit;
1437  Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
1438  if (s.ok()) {
1439  uint64_t new_log_number = impl->versions_->NewFileNumber();
1440  WritableFile* lfile;
1441  s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1442  &lfile);
1443  if (s.ok()) {
1444  edit.SetLogNumber(new_log_number);
1445  impl->logfile_ = lfile;
1446  impl->logfile_number_ = new_log_number;
1447  impl->log_ = new log::Writer(lfile);
1448  s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1449  }
1450  if (s.ok()) {
1451  impl->DeleteObsoleteFiles();
1452  impl->MaybeScheduleCompaction();
1453  }
1454  }
1455  impl->mutex_.Unlock();
1456  if (s.ok()) {
1457  *dbptr = impl;
1458  } else {
1459  delete impl;
1460  }
1461  return s;
1462 }
1463 
1465 }
1466 
1467 Status DestroyDB(const std::string& dbname, const Options& options) {
1468  Env* env = options.env;
1469  std::vector<std::string> filenames;
1470  // Ignore error in case directory does not exist
1471  env->GetChildren(dbname, &filenames);
1472  if (filenames.empty()) {
1473  return Status::OK();
1474  }
1475 
1476  FileLock* lock;
1477  const std::string lockname = LockFileName(dbname);
1478  Status result = env->LockFile(lockname, &lock);
1479  if (result.ok()) {
1480  uint64_t number;
1481  FileType type;
1482  for (size_t i = 0; i < filenames.size(); i++) {
1483  if (ParseFileName(filenames[i], &number, &type) &&
1484  type != kDBLockFile) { // Lock file will be deleted at end
1485  Status del = env->DeleteFile(dbname + "/" + filenames[i]);
1486  if (result.ok() && !del.ok()) {
1487  result = del;
1488  }
1489  }
1490  }
1491  env->UnlockFile(lock); // Ignore error since state is already gone
1492  env->DeleteFile(lockname);
1493  env->DeleteDir(dbname); // Ignore error in case dir contains other files
1494  }
1495  return result;
1496 }
1497 
1498 } // namespace leveldb
uint64_t ApproximateOffsetOf(Version *v, const InternalKey &key)
std::string DebugString() const
Definition: version_set.cc:579
void MaybeIgnoreError(Status *s) const
Definition: db_impl.cc:210
bool IsBaseLevelForKey(const Slice &user_key)
virtual Status Write(const WriteOptions &options, WriteBatch *updates)
Definition: db_impl.cc:1157
FileMetaData meta
Definition: repair.cc:96
void Add(const Slice &key, const Slice &value)
virtual Status LockFile(const std::string &fname, FileLock **lock)=0
virtual Status DeleteFile(const std::string &fname)=0
const int kNumNonTableCacheFiles
Definition: db_impl.cc:38
size_t ApproximateMemoryUsage()
Definition: memtable.cc:31
virtual void CompactRange(const Slice *begin, const Slice *end)
Definition: db_impl.cc:531
void AddInputDeletions(VersionEdit *edit)
void SetPrevLogNumber(uint64_t num)
Definition: version_edit.h:43
Cache * block_cache
Definition: options.h:98
int PickLevelForMemTableOutput(const Slice &smallest_user_key, const Slice &largest_user_key)
Definition: version_set.cc:507
virtual Status Write(const WriteOptions &options, WriteBatch *updates)=0
bool OverlapInLevel(int level, const Slice *smallest_user_key, const Slice *largest_user_key)
Definition: version_set.cc:500
Status NewDB()
Definition: db_impl.cc:178
void DecodeFrom(const Slice &s)
Definition: dbformat.h:153
std::string * value
Definition: version_set.cc:270
bool create_if_missing
Definition: options.h:45
void SetLogNumber(uint64_t num)
Definition: version_edit.h:39
static void SetContents(WriteBatch *batch, const Slice &contents)
Definition: write_batch.cc:136
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
Definition: filename.cc:75
void * Acquire_Load() const
Definition: port_win.cc:128
bool starts_with(const Slice &x) const
Definition: slice.h:75
bool ReadRecord(Slice *record, std::string *scratch)
Definition: log_reader.cc:59
bool empty() const
Definition: snapshot.h:37
std::string OldInfoLogFileName(const std::string &dbname)
Definition: filename.cc:63
bool owns_cache_
Definition: db_impl.h:125
virtual Status RenameFile(const std::string &src, const std::string &target)=0
void Evict(uint64_t file_number)
Definition: table_cache.cc:115
const char * data() const
Definition: slice.h:40
int64_t TEST_MaxNextLevelOverlappingBytes()
Definition: db_impl.cc:1067
VersionEdit * edit()
Definition: version_set.h:334
virtual Status Sync()=0
CompactionState(Compaction *c)
Definition: db_impl.cc:76
virtual ~DB()
Definition: db_impl.cc:1428
int64_t MaxNextLevelOverlappingBytes()
port::CondVar bg_cv_
Definition: db_impl.h:137
Status TEST_CompactMemTable()
Definition: db_impl.cc:583
void TEST_CompactRange(int level, const Slice *begin, const Slice *end)
Definition: db_impl.cc:548
Status CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:498
Compaction * CompactRange(int level, const InternalKey *begin, const InternalKey *end)
Options const options_
Definition: repair.cc:104
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:38
bool start
Definition: db_bench.cc:282
int consecutive_compaction_errors_
Definition: db_impl.h:173
const char * LevelSummary(LevelSummaryStorage *scratch) const
port::AtomicPointer has_imm_
Definition: db_impl.h:140
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:656
void SetLastSequence(uint64_t s)
Definition: version_set.h:212
void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey &smallest, const InternalKey &largest)
Definition: version_edit.h:62
static void SetSequence(WriteBatch *batch, SequenceNumber seq)
Definition: write_batch.cc:94
const Snapshot * snapshot
Definition: options.h:159
virtual Status status() const =0
bool RecordReadSample(Slice key)
Definition: version_set.cc:449
bool IsTrivialMove() const
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:27
std::string InfoLogFileName(const std::string &dbname)
Definition: filename.cc:58
Status RecoverLogFile(uint64_t log_number, VersionEdit *edit, SequenceNumber *max_sequence) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:357
Iterator * MakeInputIterator(Compaction *c)
virtual bool GetProperty(const Slice &property, std::string *value)
Definition: db_impl.cc:1334
void * NoBarrier_Load() const
Definition: port_win.cc:138
Status WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:452
Status AddRecord(const Slice &slice)
Definition: log_writer.cc:27
virtual void SleepForMicroseconds(int micros)=0
void Release_Store(void *v)
Definition: port_win.cc:134
const InternalKey * end
Definition: db_impl.h:164
virtual Status Close()=0
WriteBatch * BuildBatchGroup(Writer **last_writer)
Definition: db_impl.cc:1222
size_t block_size
Definition: options.h:106
Iterator * NewDBIterator(DBImpl *db, const Comparator *user_key_comparator, Iterator *internal_iter, SequenceNumber sequence, uint32_t seed)
Definition: db_iter.cc:307
bool ParseInternalKey(const Slice &internal_key, ParsedInternalKey *result)
Definition: dbformat.h:176
Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: version_set.cc:825
void Delete(const Slice &key)
Definition: write_batch.cc:105
uint64_t LogNumber() const
Definition: version_set.h:221
static Status OK()
Definition: status.h:32
InternalKey smallest
Definition: version_edit.h:22
virtual Slice value() const =0
void SetComparatorName(const Slice &name)
Definition: version_edit.h:35
bool empty() const
Definition: slice.h:46
static Slice Contents(const WriteBatch *batch)
Iterator * TEST_NewInternalIterator()
Definition: db_impl.cc:1061
port::Mutex * mu
Definition: db_impl.cc:1013
virtual void Next()=0
bool NeedsCompaction() const
Definition: version_set.h:251
Version * current() const
Definition: version_set.h:185
Status DestroyDB(const std::string &dbname, const Options &options)
Definition: db_impl.cc:1467
uint64_t SequenceNumber
Definition: dbformat.h:63
ManualCompaction * manual_compaction_
Definition: db_impl.h:167
CompactionStats stats_[config::kNumLevels]
Definition: db_impl.h:190
virtual ~Snapshot()
Definition: db_impl.cc:1464
uint64_t MaxOutputFileSize() const
Definition: version_set.h:343
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
Definition: db.h:44
bool UpdateStats(const GetStats &stats)
Definition: version_set.cc:436
static Status InvalidArgument(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:44
std::string TableFileName(const std::string &name, uint64_t number)
Definition: filename.cc:32
void remove_prefix(size_t n)
Definition: slice.h:59
Status Recover(VersionEdit *edit) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:269
bool ConsumeDecimalNumber(Slice *in, uint64_t *val)
Definition: logging.cc:57
const SnapshotImpl * New(SequenceNumber seq)
Definition: snapshot.h:41
WriteBatch * tmp_batch_
Definition: db_impl.h:148
Logger * info_log
Definition: options.h:68
MemTable * mem_
Definition: write_batch.cc:115
Status OpenCompactionOutputFile(CompactionState *compact)
Definition: db_impl.cc:755
bool owns_cache_
Definition: repair.cc:106
bool error_if_exists
Definition: options.h:49
unsigned int uint32_t
Definition: stdint.h:21
std::string DescriptorFileName(const std::string &dbname, uint64_t number)
Definition: filename.cc:37
virtual void ReleaseSnapshot(const Snapshot *snapshot)
Definition: db_impl.cc:1143
size_t size() const
Definition: slice.h:43
unsigned long long uint64_t
Definition: stdint.h:22
static Status Open(const Options &options, const std::string &name, DB **dbptr)
Definition: db_impl.cc:1430
uint32_t seed_
Definition: db_impl.h:144
bool owns_info_log_
Definition: db_impl.h:124
MTState * state
Definition: db_test.cc:1708
Iterator * NewIterator()
Definition: memtable.cc:78
void AddLiveFiles(std::set< uint64_t > *live)
std::vector< Output > outputs
Definition: db_impl.cc:66
std::deque< Writer * > writers_
Definition: db_impl.h:147
Status SetCurrentFile(Env *env, const std::string &dbname, uint64_t descriptor_number)
Definition: filename.cc:121
uint64_t LastSequence() const
Definition: version_set.h:209
std::string CurrentFileName(const std::string &dbname)
Definition: filename.cc:45
void SetLastSequence(SequenceNumber seq)
Definition: version_edit.h:51
Env *const env_
Definition: db_impl.h:120
int NumLevelFiles(int level) const
SnapshotList snapshots_
Definition: db_impl.h:150
Version * version
Definition: db_impl.cc:1014
virtual Status NewSequentialFile(const std::string &fname, SequentialFile **result)=0
virtual void SeekToFirst()=0
MemTable * imm_
Definition: db_impl.h:139
WriteBatch * batch
Definition: db_impl.cc:43
void MarkFileNumberUsed(uint64_t number)
MemTable * imm
Definition: db_impl.cc:1016
std::string const dbname_
Definition: repair.cc:100
uint64_t NewFileNumber()
Definition: version_set.h:191
bool paranoid_checks
Definition: options.h:57
Env *const env_
Definition: repair.cc:101
static void Append(WriteBatch *dst, const WriteBatch *src)
Definition: write_batch.cc:141
uint64_t FileSize() const
virtual Status CreateDir(const std::string &dirname)=0
virtual Status Delete(const WriteOptions &, const Slice &key)
Definition: db_impl.cc:1153
const char * base
Definition: testharness.cc:17
int max_open_files
Definition: options.h:90
WritableFile * logfile_
Definition: db_impl.h:141
std::set< uint64_t > pending_outputs_
Definition: db_impl.h:154
std::string DebugString() const
Definition: dbformat.cc:34
void RegisterCleanup(CleanupFunction function, void *arg1, void *arg2)
Definition: iterator.cc:26
const InternalKeyComparator internal_comparator_
Definition: db_impl.h:121
port::AtomicPointer shutting_down_
Definition: db_impl.h:136
virtual Status Put(const WriteOptions &, const Slice &key, const Slice &value)
Definition: db_impl.cc:1149
Writer(port::Mutex *mu)
Definition: db_impl.cc:48
void BackgroundCall()
Definition: db_impl.cc:619
static Status InsertInto(const WriteBatch *batch, MemTable *memtable)
Definition: write_batch.cc:128
virtual void GetApproximateSizes(const Range *range, int n, uint64_t *sizes)
Definition: db_impl.cc:1388
virtual Status Get(const ReadOptions &options, const Slice &key, std::string *value)
Definition: db_impl.cc:1072
int level() const
Definition: version_set.h:330
Iterator * NewMergingIterator(const Comparator *cmp, Iterator **list, int n)
Definition: merger.cc:186
const FilterPolicy * filter_policy
Definition: options.h:136
void DeleteFile(int level, uint64_t file)
Definition: version_edit.h:75
static SequenceNumber Sequence(const WriteBatch *batch)
Definition: write_batch.cc:90
virtual Iterator * NewIterator(const ReadOptions &)
Definition: db_impl.cc:1119
bool owns_info_log_
Definition: repair.cc:105
Cache * NewLRUCache(size_t capacity)
Definition: cache.cc:321
port::Mutex mutex_
Definition: memenv.cc:374
virtual void Schedule(void(*function)(void *arg), void *arg)=0
bool Get(const LookupKey &key, std::string *value, Status *s)
Definition: memtable.cc:108
uint64_t PrevLogNumber() const
Definition: version_set.h:225
void Add(const CompactionStats &c)
Definition: db_impl.h:184
int num_input_files(int which) const
Definition: version_set.h:337
const Comparator * user_comparator() const
Definition: db_impl.h:196
void EncodeTo(std::string *dst) const
Definition: version_edit.cc:41
DBImpl(const Options &options, const std::string &dbname)
Definition: db_impl.cc:116
const Options options_
Definition: db_impl.h:123
Status MakeRoomForWrite(bool force) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:1271
Status Get(const ReadOptions &, const LookupKey &key, std::string *val, GetStats *stats)
Definition: version_set.cc:337
std::string LogFileName(const std::string &name, uint64_t number)
Definition: filename.cc:27
virtual bool Valid() const =0
virtual Status UnlockFile(FileLock *lock)=0
void SetNextFile(uint64_t num)
Definition: version_edit.h:47
static int Count(const WriteBatch *batch)
Definition: write_batch.cc:82
port::Mutex mutex_
Definition: db_impl.h:135
Options SanitizeOptions(const std::string &dbname, const InternalKeyComparator *icmp, const InternalFilterPolicy *ipolicy, const Options &src)
Definition: db_impl.cc:90
void DeleteObsoleteFiles()
Definition: db_impl.cc:219
virtual Status NewLogger(const std::string &fname, Logger **result)=0
Stats stats
Definition: db_bench.cc:291
SequenceNumber sequence
Definition: dbformat.h:72
Status BuildTable(const std::string &dbname, Env *env, const Options &options, TableCache *table_cache, Iterator *iter, FileMetaData *meta)
Definition: builder.cc:17
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:599
MemTable * mem_
Definition: db_impl.h:138
Status DoCompactionWork(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:853
uint64_t ManifestFileNumber() const
Definition: version_set.h:188
Iterator * NewIterator(const ReadOptions &options, uint64_t file_number, uint64_t file_size, Table **tableptr=NULL)
Definition: table_cache.cc:76
Status InstallCompactionResults(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:832
Status FinishCompactionOutputFile(CompactionState *compact, Iterator *input)
Definition: db_impl.cc:780
void Delete(const SnapshotImpl *s)
Definition: snapshot.h:52
signed long long int64_t
Definition: stdint.h:18
virtual Slice key() const =0
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
Slice user_key() const
Definition: dbformat.h:159
const std::string dbname_
Definition: db_impl.h:126
virtual Status DeleteDir(const std::string &dirname)=0
bool ok() const
Definition: status.h:52
Compaction *const compaction
Definition: db_impl.cc:52
size_t write_buffer_size
Definition: options.h:83
FileType
Definition: filename.h:20
SequenceNumber number_
Definition: snapshot.h:18
MemTable * mem
Definition: db_impl.cc:1015
const InternalKey * begin
Definition: db_impl.h:163
Status bg_error_
Definition: db_impl.h:172
static void BGWork(void *db)
Definition: db_impl.cc:615
Iterator * NewInternalIterator(const ReadOptions &, SequenceNumber *latest_snapshot, uint32_t *seed)
Definition: db_impl.cc:1030
void ReuseFileNumber(uint64_t file_number)
Definition: version_set.h:196
uint64_t NumEntries() const
static size_t ByteSize(const WriteBatch *batch)
void AddIterators(const ReadOptions &, std::vector< Iterator * > *iters)
Definition: version_set.cc:239
SequenceNumber max_sequence
Definition: repair.cc:97
uint64_t logfile_number_
Definition: db_impl.h:142
TableCache * table_cache_
Definition: db_impl.h:129
FileLock * db_lock_
Definition: db_impl.h:132
std::string ToString() const
Definition: slice.h:66
SequenceNumber smallest_snapshot
Definition: db_impl.cc:58
virtual const Snapshot * GetSnapshot()
Definition: db_impl.cc:1138
virtual Status Delete(const WriteOptions &options, const Slice &key)=0
Definition: db_impl.cc:1422
port::CondVar cv
Definition: db_impl.cc:46
void RecordReadSample(Slice key)
Definition: db_impl.cc:1131
void Put(const Slice &key, const Slice &value)
Definition: write_batch.cc:98
FileMetaData * input(int which, int i) const
Definition: version_set.h:340
bool bg_compaction_scheduled_
Definition: db_impl.h:157
virtual bool FileExists(const std::string &fname)=0
const Comparator * comparator
Definition: options.h:41
log::Writer * log_
Definition: db_impl.h:143
bool ShouldStopBefore(const Slice &internal_key)
static Status IOError(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:47
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
Definition: db_impl.cc:1416
std::string LockFileName(const std::string &dbname)
Definition: filename.cc:49
Compaction * PickCompaction()
void CleanupCompaction(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:738
VersionSet * versions_
Definition: db_impl.h:169
std::string ToString() const
Definition: status.cc:36
int64_t NumLevelBytes(int level) const
virtual ~DBImpl()
Definition: db_impl.cc:149
virtual uint64_t NowMicros()=0
SnapshotImpl * oldest() const
Definition: snapshot.h:38