85 template <
class T,
class V>
86 static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
87 if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
88 if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
97 ClipToRange(&result.
max_open_files, 64 + kNumNonTableCacheFiles, 50000);
117 :
env_(raw_options.env),
118 internal_comparator_(raw_options.comparator),
119 internal_filter_policy_(raw_options.filter_policy),
121 &internal_filter_policy_, raw_options)),
126 shutting_down_(NULL),
135 bg_compaction_scheduled_(false),
136 manual_compaction_(NULL),
137 consecutive_compaction_errors_(0) {
224 std::vector<std::string> filenames;
228 for (
size_t i = 0; i < filenames.size(); i++) {
242 keep = (live.find(number) != live.end());
247 keep = (live.find(number) != live.end());
262 static_cast<unsigned long long>(number));
290 dbname_,
"does not exist (create_if_missing is false)");
295 dbname_,
"exists (error_if_exists is true)");
312 std::vector<std::string> filenames;
317 std::set<uint64_t> expected;
321 std::vector<uint64_t> logs;
322 for (
size_t i = 0; i < filenames.size(); i++) {
324 expected.erase(number);
325 if (type ==
kLogFile && ((number >= min_log) || (number == prev_log)))
326 logs.push_back(number);
329 if (!expected.empty()) {
331 snprintf(buf,
sizeof(buf),
"%d missing files; e.g.",
332 static_cast<int>(expected.size()));
337 std::sort(logs.begin(), logs.end());
338 for (
size_t i = 0; i < logs.size(); i++) {
365 virtual void Corruption(
size_t bytes,
const Status& s) {
366 Log(info_log,
"%s%s: dropping %d bytes; %s",
367 (this->status == NULL ?
"(ignoring error) " :
""),
368 fname, static_cast<int>(bytes), s.
ToString().c_str());
369 if (this->status != NULL && this->status->
ok()) *this->status = s;
385 LogReporter reporter;
388 reporter.fname = fname.c_str();
397 (
unsigned long long) log_number);
404 while (reader.
ReadRecord(&record, &scratch) &&
406 if (record.
size() < 12) {
425 if (last_seq > *max_sequence) {
426 *max_sequence = last_seq;
441 if (status.
ok() && mem != NULL) {
447 if (mem != NULL) mem->
Unref();
461 (
unsigned long long) meta.
number);
471 (
unsigned long long) meta.
number,
500 assert(
imm_ != NULL);
532 int max_level_with_files = 1;
536 for (
int level = 1; level < config::kNumLevels; level++) {
538 max_level_with_files = level;
543 for (
int level = 0; level < max_level_with_files; level++) {
550 assert(level + 1 < config::kNumLevels);
555 manual.
level = level;
560 begin_storage =
InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
561 manual.
begin = &begin_storage;
566 end_storage =
InternalKey(*end, 0, static_cast<ValueType>(0));
567 manual.
end = &end_storage;
571 while (!manual.
done) {
605 }
else if (
imm_ == NULL &&
639 int seconds_to_sleep = 1;
641 seconds_to_sleep *= 2;
669 m->
done = (c == NULL);
674 "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
692 f->smallest, f->largest);
696 static_cast<unsigned long long>(f->number),
698 static_cast<unsigned long long>(f->file_size),
716 "Compaction error: %s", status.
ToString().c_str());
740 if (compact->
builder != NULL) {
745 assert(compact->
outfile == NULL);
748 for (
size_t i = 0; i < compact->
outputs.size(); i++) {
756 assert(compact != NULL);
757 assert(compact->
builder == NULL);
767 compact->
outputs.push_back(out);
782 assert(compact != NULL);
783 assert(compact->
outfile != NULL);
784 assert(compact->
builder != NULL);
787 assert(output_number != 0);
813 if (s.
ok() && current_entries > 0) {
822 "Generated table #%llu: %lld keys, %lld bytes",
823 (
unsigned long long) output_number,
824 (
unsigned long long) current_entries,
825 (
unsigned long long) current_bytes);
844 for (
size_t i = 0; i < compact->
outputs.size(); i++) {
864 assert(compact->
builder == NULL);
865 assert(compact->
outfile == NULL);
879 std::string current_user_key;
880 bool has_current_user_key =
false;
908 current_user_key.clear();
909 has_current_user_key =
false;
910 last_sequence_for_key = kMaxSequenceNumber;
912 if (!has_current_user_key ||
914 Slice(current_user_key)) != 0) {
917 has_current_user_key =
true;
918 last_sequence_for_key = kMaxSequenceNumber;
921 if (last_sequence_for_key <= compact->smallest_snapshot) {
937 last_sequence_for_key = ikey.
sequence;
941 " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
942 "%d smallest_snapshot: %d",
951 if (compact->
builder == NULL) {
979 if (status.
ok() && compact->
builder != NULL) {
990 for (
int which = 0; which < 2; which++) {
995 for (
size_t i = 0; i < compact->
outputs.size(); i++) {
1019 static void CleanupIteratorState(
void* arg1,
void* arg2) {
1020 IterState*
state =
reinterpret_cast<IterState*
>(arg1);
1022 state->mem->Unref();
1023 if (state->imm != NULL) state->imm->Unref();
1024 state->version->Unref();
1025 state->mu->Unlock();
1033 IterState* cleanup =
new IterState;
1038 std::vector<Iterator*> list;
1051 cleanup->mem =
mem_;
1052 cleanup->imm =
imm_;
1058 return internal_iter;
1074 std::string*
value) {
1088 if (imm != NULL) imm->
Ref();
1091 bool have_stat_update =
false;
1099 if (mem->
Get(lkey, value, &s)) {
1101 }
else if (imm != NULL && imm->
Get(lkey, value, &s)) {
1104 s = current->
Get(options, lkey, value, &stats);
1105 have_stat_update =
true;
1110 if (have_stat_update && current->
UpdateStats(stats)) {
1114 if (imm != NULL) imm->
Unref();
1126 ? reinterpret_cast<const SnapshotImpl*>(options.
snapshot)->number_
1175 Writer* last_writer = &w;
1176 if (status.
ok() && my_batch != NULL) {
1188 if (status.
ok() && options.
sync) {
1209 if (ready == last_writer)
break;
1226 assert(result != NULL);
1233 size_t max_size = 1 << 20;
1234 if (size <= (128<<10)) {
1235 max_size = size + (128<<10);
1238 *last_writer = first;
1239 std::deque<Writer*>::iterator iter =
writers_.begin();
1241 for (; iter !=
writers_.end(); ++iter) {
1243 if (w->
sync && !first->sync) {
1248 if (w->
batch != NULL) {
1250 if (size > max_size) {
1256 if (result == first->batch) {
1274 bool allow_delay = !force;
1292 allow_delay =
false;
1294 }
else if (!force &&
1298 }
else if (
imm_ != NULL) {
1338 Slice in = property;
1339 Slice prefix(
"leveldb.");
1347 if (!ok || level >= config::kNumLevels) {
1351 snprintf(buf,
sizeof(buf),
"%d",
1356 }
else if (in ==
"stats") {
1358 snprintf(buf,
sizeof(buf),
1360 "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1361 "--------------------------------------------------\n"
1364 for (
int level = 0; level < config::kNumLevels; level++) {
1366 if (
stats_[level].micros > 0 || files > 0) {
1369 "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1380 }
else if (in ==
"sstables") {
1389 const Range* range,
int n,
1399 for (
int i = 0; i < n; i++) {
1402 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1405 sizes[i] = (limit >= start ? limit - start : 0);
1418 batch.
Put(key, value);
1419 return Write(opt, &batch);
1425 return Write(opt, &batch);
1469 std::vector<std::string> filenames;
1472 if (filenames.empty()) {
1482 for (
size_t i = 0; i < filenames.size(); i++) {
1486 if (result.
ok() && !del.
ok()) {
uint64_t ApproximateOffsetOf(Version *v, const InternalKey &key)
std::string DebugString() const
void MaybeIgnoreError(Status *s) const
bool IsBaseLevelForKey(const Slice &user_key)
virtual Status Write(const WriteOptions &options, WriteBatch *updates)
void Add(const Slice &key, const Slice &value)
virtual Status LockFile(const std::string &fname, FileLock **lock)=0
virtual Status DeleteFile(const std::string &fname)=0
const int kNumNonTableCacheFiles
size_t ApproximateMemoryUsage()
virtual void CompactRange(const Slice *begin, const Slice *end)
void AddInputDeletions(VersionEdit *edit)
void SetPrevLogNumber(uint64_t num)
int PickLevelForMemTableOutput(const Slice &smallest_user_key, const Slice &largest_user_key)
virtual Status Write(const WriteOptions &options, WriteBatch *updates)=0
bool OverlapInLevel(int level, const Slice *smallest_user_key, const Slice *largest_user_key)
void DecodeFrom(const Slice &s)
void SetLogNumber(uint64_t num)
static void SetContents(WriteBatch *batch, const Slice &contents)
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
void * Acquire_Load() const
bool starts_with(const Slice &x) const
bool ReadRecord(Slice *record, std::string *scratch)
std::string OldInfoLogFileName(const std::string &dbname)
virtual Status RenameFile(const std::string &src, const std::string &target)=0
void Evict(uint64_t file_number)
const char * data() const
int64_t TEST_MaxNextLevelOverlappingBytes()
CompactionState(Compaction *c)
int64_t MaxNextLevelOverlappingBytes()
Status TEST_CompactMemTable()
void TEST_CompactRange(int level, const Slice *begin, const Slice *end)
Status CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Compaction * CompactRange(int level, const InternalKey *begin, const InternalKey *end)
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
int consecutive_compaction_errors_
const char * LevelSummary(LevelSummaryStorage *scratch) const
port::AtomicPointer has_imm_
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
void SetLastSequence(uint64_t s)
void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey &smallest, const InternalKey &largest)
static void SetSequence(WriteBatch *batch, SequenceNumber seq)
const Snapshot * snapshot
virtual Status status() const =0
bool RecordReadSample(Slice key)
bool IsTrivialMove() const
void Log(Logger *info_log, const char *format,...)
std::string InfoLogFileName(const std::string &dbname)
Status RecoverLogFile(uint64_t log_number, VersionEdit *edit, SequenceNumber *max_sequence) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Iterator * MakeInputIterator(Compaction *c)
virtual bool GetProperty(const Slice &property, std::string *value)
void * NoBarrier_Load() const
Status WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Status AddRecord(const Slice &slice)
virtual void SleepForMicroseconds(int micros)=0
void Release_Store(void *v)
WriteBatch * BuildBatchGroup(Writer **last_writer)
Iterator * NewDBIterator(DBImpl *db, const Comparator *user_key_comparator, Iterator *internal_iter, SequenceNumber sequence, uint32_t seed)
bool ParseInternalKey(const Slice &internal_key, ParsedInternalKey *result)
Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu)
void Delete(const Slice &key)
uint64_t LogNumber() const
virtual Slice value() const =0
void SetComparatorName(const Slice &name)
static Slice Contents(const WriteBatch *batch)
Iterator * TEST_NewInternalIterator()
bool NeedsCompaction() const
Version * current() const
Status DestroyDB(const std::string &dbname, const Options &options)
ManualCompaction * manual_compaction_
CompactionStats stats_[config::kNumLevels]
uint64_t MaxOutputFileSize() const
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
bool UpdateStats(const GetStats &stats)
static Status InvalidArgument(const Slice &msg, const Slice &msg2=Slice())
std::string TableFileName(const std::string &name, uint64_t number)
void remove_prefix(size_t n)
Status Recover(VersionEdit *edit) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
bool ConsumeDecimalNumber(Slice *in, uint64_t *val)
const SnapshotImpl * New(SequenceNumber seq)
Status OpenCompactionOutputFile(CompactionState *compact)
std::string DescriptorFileName(const std::string &dbname, uint64_t number)
virtual void ReleaseSnapshot(const Snapshot *snapshot)
unsigned long long uint64_t
static Status Open(const Options &options, const std::string &name, DB **dbptr)
void AddLiveFiles(std::set< uint64_t > *live)
std::vector< Output > outputs
std::deque< Writer * > writers_
Status SetCurrentFile(Env *env, const std::string &dbname, uint64_t descriptor_number)
uint64_t LastSequence() const
std::string CurrentFileName(const std::string &dbname)
void SetLastSequence(SequenceNumber seq)
int NumLevelFiles(int level) const
virtual Status NewSequentialFile(const std::string &fname, SequentialFile **result)=0
virtual void SeekToFirst()=0
void MarkFileNumberUsed(uint64_t number)
std::string const dbname_
static void Append(WriteBatch *dst, const WriteBatch *src)
uint64_t FileSize() const
virtual Status CreateDir(const std::string &dirname)=0
virtual Status Delete(const WriteOptions &, const Slice &key)
std::set< uint64_t > pending_outputs_
std::string DebugString() const
Output * current_output()
void RegisterCleanup(CleanupFunction function, void *arg1, void *arg2)
const InternalKeyComparator internal_comparator_
port::AtomicPointer shutting_down_
virtual Status Put(const WriteOptions &, const Slice &key, const Slice &value)
static Status InsertInto(const WriteBatch *batch, MemTable *memtable)
virtual void GetApproximateSizes(const Range *range, int n, uint64_t *sizes)
virtual Status Get(const ReadOptions &options, const Slice &key, std::string *value)
Iterator * NewMergingIterator(const Comparator *cmp, Iterator **list, int n)
const FilterPolicy * filter_policy
void DeleteFile(int level, uint64_t file)
static SequenceNumber Sequence(const WriteBatch *batch)
virtual Iterator * NewIterator(const ReadOptions &)
Cache * NewLRUCache(size_t capacity)
virtual void Schedule(void(*function)(void *arg), void *arg)=0
bool Get(const LookupKey &key, std::string *value, Status *s)
uint64_t PrevLogNumber() const
void Add(const CompactionStats &c)
int num_input_files(int which) const
const Comparator * user_comparator() const
void EncodeTo(std::string *dst) const
DBImpl(const Options &options, const std::string &dbname)
Status MakeRoomForWrite(bool force) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Status Get(const ReadOptions &, const LookupKey &key, std::string *val, GetStats *stats)
std::string LogFileName(const std::string &name, uint64_t number)
virtual bool Valid() const =0
virtual Status UnlockFile(FileLock *lock)=0
void SetNextFile(uint64_t num)
static int Count(const WriteBatch *batch)
Options SanitizeOptions(const std::string &dbname, const InternalKeyComparator *icmp, const InternalFilterPolicy *ipolicy, const Options &src)
void DeleteObsoleteFiles()
virtual Status NewLogger(const std::string &fname, Logger **result)=0
Status BuildTable(const std::string &dbname, Env *env, const Options &options, TableCache *table_cache, Iterator *iter, FileMetaData *meta)
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Status DoCompactionWork(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
uint64_t ManifestFileNumber() const
Iterator * NewIterator(const ReadOptions &options, uint64_t file_number, uint64_t file_size, Table **tableptr=NULL)
Status InstallCompactionResults(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Status FinishCompactionOutputFile(CompactionState *compact, Iterator *input)
void Delete(const SnapshotImpl *s)
virtual Slice key() const =0
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
const std::string dbname_
virtual Status DeleteDir(const std::string &dirname)=0
Compaction *const compaction
const InternalKey * begin
static void BGWork(void *db)
Iterator * NewInternalIterator(const ReadOptions &, SequenceNumber *latest_snapshot, uint32_t *seed)
void ReuseFileNumber(uint64_t file_number)
uint64_t NumEntries() const
static size_t ByteSize(const WriteBatch *batch)
void AddIterators(const ReadOptions &, std::vector< Iterator * > *iters)
SequenceNumber max_sequence
TableCache * table_cache_
std::string ToString() const
SequenceNumber smallest_snapshot
virtual const Snapshot * GetSnapshot()
virtual Status Delete(const WriteOptions &options, const Slice &key)=0
void RecordReadSample(Slice key)
void Put(const Slice &key, const Slice &value)
FileMetaData * input(int which, int i) const
bool bg_compaction_scheduled_
virtual bool FileExists(const std::string &fname)=0
const Comparator * comparator
bool ShouldStopBefore(const Slice &internal_key)
static Status IOError(const Slice &msg, const Slice &msg2=Slice())
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
std::string LockFileName(const std::string &dbname)
Compaction * PickCompaction()
void CleanupCompaction(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
std::string ToString() const
int64_t NumLevelBytes(int level) const
virtual uint64_t NowMicros()=0
SnapshotImpl * oldest() const