Feathercoin  0.5.0
P2P Digital Currency
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros
db_bench.cc
Go to the documentation of this file.
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include <sys/types.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include "db/db_impl.h"
9 #include "db/version_set.h"
10 #include "leveldb/cache.h"
11 #include "leveldb/db.h"
12 #include "leveldb/env.h"
13 #include "leveldb/write_batch.h"
14 #include "port/port.h"
15 #include "util/crc32c.h"
16 #include "util/histogram.h"
17 #include "util/mutexlock.h"
18 #include "util/random.h"
19 #include "util/testutil.h"
20 
21 // Comma-separated list of operations to run in the specified order
22 // Actual benchmarks:
23 // fillseq -- write N values in sequential key order in async mode
24 // fillrandom -- write N values in random key order in async mode
25 // overwrite -- overwrite N values in random key order in async mode
26 // fillsync -- write N/100 values in random key order in sync mode
27 // fill100K -- write N/1000 100K values in random order in async mode
28 // deleteseq -- delete N keys in sequential order
29 // deleterandom -- delete N keys in random order
30 // readseq -- read N times sequentially
31 // readreverse -- read N times in reverse order
32 // readrandom -- read N times in random order
33 // readmissing -- read N missing keys in random order
34 // readhot -- read N times in random order from 1% section of DB
35 // seekrandom -- N random seeks
36 // crc32c -- repeated crc32c of 4K of data
37 // acquireload -- load N*1000 times
38 // Meta operations:
39 // compact -- Compact the entire DB
40 // stats -- Print DB stats
41 // sstables -- Print sstable info
42 // heapprofile -- Dump a heap profile (if supported by this port)
43 static const char* FLAGS_benchmarks =
44  "fillseq,"
45  "fillsync,"
46  "fillrandom,"
47  "overwrite,"
48  "readrandom,"
49  "readrandom," // Extra run to allow previous compactions to quiesce
50  "readseq,"
51  "readreverse,"
52  "compact,"
53  "readrandom,"
54  "readseq,"
55  "readreverse,"
56  "fill100K,"
57  "crc32c,"
58  "snappycomp,"
59  "snappyuncomp,"
60  "acquireload,"
61  ;
62 
63 // Number of key/values to place in database
64 static int FLAGS_num = 1000000;
65 
66 // Number of read operations to do. If negative, do FLAGS_num reads.
67 static int FLAGS_reads = -1;
68 
69 // Number of concurrent threads to run.
70 static int FLAGS_threads = 1;
71 
72 // Size of each value
73 static int FLAGS_value_size = 100;
74 
75 // Arrange to generate values that shrink to this fraction of
76 // their original size after compression
77 static double FLAGS_compression_ratio = 0.5;
78 
79 // Print histogram of operation timings
80 static bool FLAGS_histogram = false;
81 
82 // Number of bytes to buffer in memtable before compacting
83 // (initialized to default value by "main")
84 static int FLAGS_write_buffer_size = 0;
85 
86 // Number of bytes to use as a cache of uncompressed data.
87 // Negative means use default settings.
88 static int FLAGS_cache_size = -1;
89 
90 // Maximum number of files to keep open at the same time (use default if == 0)
91 static int FLAGS_open_files = 0;
92 
93 // Bloom filter bits per key.
94 // Negative means use default settings.
95 static int FLAGS_bloom_bits = -1;
96 
97 // If true, do not destroy the existing database. If you set this
98 // flag and also specify a benchmark that wants a fresh database, that
99 // benchmark will fail.
100 static bool FLAGS_use_existing_db = false;
101 
102 // Use the db with the following name.
103 static const char* FLAGS_db = NULL;
104 
105 namespace leveldb {
106 
107 namespace {
108 
109 // Helper for quickly generating random data.
110 class RandomGenerator {
111  private:
112  std::string data_;
113  int pos_;
114 
115  public:
116  RandomGenerator() {
117  // We use a limited amount of data over and over again and ensure
118  // that it is larger than the compression window (32KB), and also
119  // large enough to serve all typical value sizes we want to write.
120  Random rnd(301);
121  std::string piece;
122  while (data_.size() < 1048576) {
123  // Add a short fragment that is as compressible as specified
124  // by FLAGS_compression_ratio.
125  test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
126  data_.append(piece);
127  }
128  pos_ = 0;
129  }
130 
131  Slice Generate(int len) {
132  if (pos_ + len > data_.size()) {
133  pos_ = 0;
134  assert(len < data_.size());
135  }
136  pos_ += len;
137  return Slice(data_.data() + pos_ - len, len);
138  }
139 };
140 
141 static Slice TrimSpace(Slice s) {
142  int start = 0;
143  while (start < s.size() && isspace(s[start])) {
144  start++;
145  }
146  int limit = s.size();
147  while (limit > start && isspace(s[limit-1])) {
148  limit--;
149  }
150  return Slice(s.data() + start, limit - start);
151 }
152 
153 static void AppendWithSpace(std::string* str, Slice msg) {
154  if (msg.empty()) return;
155  if (!str->empty()) {
156  str->push_back(' ');
157  }
158  str->append(msg.data(), msg.size());
159 }
160 
161 class Stats {
162  private:
163  double start_;
164  double finish_;
165  double seconds_;
166  int done_;
170  Histogram hist_;
171  std::string message_;
172 
173  public:
174  Stats() { Start(); }
175 
176  void Start() {
177  next_report_ = 100;
178  last_op_finish_ = start_;
179  hist_.Clear();
180  done_ = 0;
181  bytes_ = 0;
182  seconds_ = 0;
183  start_ = Env::Default()->NowMicros();
184  finish_ = start_;
185  message_.clear();
186  }
187 
188  void Merge(const Stats& other) {
189  hist_.Merge(other.hist_);
190  done_ += other.done_;
191  bytes_ += other.bytes_;
192  seconds_ += other.seconds_;
193  if (other.start_ < start_) start_ = other.start_;
194  if (other.finish_ > finish_) finish_ = other.finish_;
195 
196  // Just keep the messages from one thread
197  if (message_.empty()) message_ = other.message_;
198  }
199 
200  void Stop() {
201  finish_ = Env::Default()->NowMicros();
202  seconds_ = (finish_ - start_) * 1e-6;
203  }
204 
205  void AddMessage(Slice msg) {
206  AppendWithSpace(&message_, msg);
207  }
208 
209  void FinishedSingleOp() {
210  if (FLAGS_histogram) {
211  double now = Env::Default()->NowMicros();
212  double micros = now - last_op_finish_;
213  hist_.Add(micros);
214  if (micros > 20000) {
215  fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
216  fflush(stderr);
217  }
218  last_op_finish_ = now;
219  }
220 
221  done_++;
222  if (done_ >= next_report_) {
223  if (next_report_ < 1000) next_report_ += 100;
224  else if (next_report_ < 5000) next_report_ += 500;
225  else if (next_report_ < 10000) next_report_ += 1000;
226  else if (next_report_ < 50000) next_report_ += 5000;
227  else if (next_report_ < 100000) next_report_ += 10000;
228  else if (next_report_ < 500000) next_report_ += 50000;
229  else next_report_ += 100000;
230  fprintf(stderr, "... finished %d ops%30s\r", done_, "");
231  fflush(stderr);
232  }
233  }
234 
235  void AddBytes(int64_t n) {
236  bytes_ += n;
237  }
238 
239  void Report(const Slice& name) {
240  // Pretend at least one op was done in case we are running a benchmark
241  // that does not call FinishedSingleOp().
242  if (done_ < 1) done_ = 1;
243 
244  std::string extra;
245  if (bytes_ > 0) {
246  // Rate is computed on actual elapsed time, not the sum of per-thread
247  // elapsed times.
248  double elapsed = (finish_ - start_) * 1e-6;
249  char rate[100];
250  snprintf(rate, sizeof(rate), "%6.1f MB/s",
251  (bytes_ / 1048576.0) / elapsed);
252  extra = rate;
253  }
254  AppendWithSpace(&extra, message_);
255 
256  fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
257  name.ToString().c_str(),
258  seconds_ * 1e6 / done_,
259  (extra.empty() ? "" : " "),
260  extra.c_str());
261  if (FLAGS_histogram) {
262  fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
263  }
264  fflush(stdout);
265  }
266 };
267 
268 // State shared by all concurrent executions of the same benchmark.
269 struct SharedState {
270  port::Mutex mu;
271  port::CondVar cv;
272  int total;
273 
274  // Each thread goes through the following states:
275  // (1) initializing
276  // (2) waiting for others to be initialized
277  // (3) running
278  // (4) done
279 
281  int num_done;
282  bool start;
283 
284  SharedState() : cv(&mu) { }
285 };
286 
287 // Per-thread state for concurrent executions of the same benchmark.
288 struct ThreadState {
289  int tid; // 0..n-1 when running in n threads
290  Random rand; // Has different seeds for different threads
291  Stats stats;
292  SharedState* shared;
293 
294  ThreadState(int index)
295  : tid(index),
296  rand(1000 + index) {
297  }
298 };
299 
300 } // namespace
301 
302 class Benchmark {
303  private:
306  DB* db_;
307  int num_;
311  int reads_;
313 
314  void PrintHeader() {
315  const int kKeySize = 16;
317  fprintf(stdout, "Keys: %d bytes each\n", kKeySize);
318  fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
319  FLAGS_value_size,
320  static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
321  fprintf(stdout, "Entries: %d\n", num_);
322  fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
323  ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
324  / 1048576.0));
325  fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
326  (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
327  / 1048576.0));
328  PrintWarnings();
329  fprintf(stdout, "------------------------------------------------\n");
330  }
331 
332  void PrintWarnings() {
333 #if defined(__GNUC__) && !defined(__OPTIMIZE__)
334  fprintf(stdout,
335  "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
336  );
337 #endif
338 #ifndef NDEBUG
339  fprintf(stdout,
340  "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
341 #endif
342 
343  // See if snappy is working by attempting to compress a compressible string
344  const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
345  std::string compressed;
346  if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
347  fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
348  } else if (compressed.size() >= sizeof(text)) {
349  fprintf(stdout, "WARNING: Snappy compression is not effective\n");
350  }
351  }
352 
354  fprintf(stderr, "LevelDB: version %d.%d\n",
355  kMajorVersion, kMinorVersion);
356 
357 #if defined(__linux)
358  time_t now = time(NULL);
359  fprintf(stderr, "Date: %s", ctime(&now)); // ctime() adds newline
360 
361  FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
362  if (cpuinfo != NULL) {
363  char line[1000];
364  int num_cpus = 0;
365  std::string cpu_type;
366  std::string cache_size;
367  while (fgets(line, sizeof(line), cpuinfo) != NULL) {
368  const char* sep = strchr(line, ':');
369  if (sep == NULL) {
370  continue;
371  }
372  Slice key = TrimSpace(Slice(line, sep - 1 - line));
373  Slice val = TrimSpace(Slice(sep + 1));
374  if (key == "model name") {
375  ++num_cpus;
376  cpu_type = val.ToString();
377  } else if (key == "cache size") {
378  cache_size = val.ToString();
379  }
380  }
381  fclose(cpuinfo);
382  fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
383  fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
384  }
385 #endif
386  }
387 
388  public:
390  : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
391  filter_policy_(FLAGS_bloom_bits >= 0
392  ? NewBloomFilterPolicy(FLAGS_bloom_bits)
393  : NULL),
394  db_(NULL),
395  num_(FLAGS_num),
396  value_size_(FLAGS_value_size),
397  entries_per_batch_(1),
398  reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
399  heap_counter_(0) {
400  std::vector<std::string> files;
401  Env::Default()->GetChildren(FLAGS_db, &files);
402  for (int i = 0; i < files.size(); i++) {
403  if (Slice(files[i]).starts_with("heap-")) {
404  Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
405  }
406  }
407  if (!FLAGS_use_existing_db) {
408  DestroyDB(FLAGS_db, Options());
409  }
410  }
411 
413  delete db_;
414  delete cache_;
415  delete filter_policy_;
416  }
417 
418  void Run() {
419  PrintHeader();
420  Open();
421 
422  const char* benchmarks = FLAGS_benchmarks;
423  while (benchmarks != NULL) {
424  const char* sep = strchr(benchmarks, ',');
425  Slice name;
426  if (sep == NULL) {
427  name = benchmarks;
428  benchmarks = NULL;
429  } else {
430  name = Slice(benchmarks, sep - benchmarks);
431  benchmarks = sep + 1;
432  }
433 
434  // Reset parameters that may be overriddden bwlow
435  num_ = FLAGS_num;
436  reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
437  value_size_ = FLAGS_value_size;
438  entries_per_batch_ = 1;
439  write_options_ = WriteOptions();
440 
441  void (Benchmark::*method)(ThreadState*) = NULL;
442  bool fresh_db = false;
443  int num_threads = FLAGS_threads;
444 
445  if (name == Slice("fillseq")) {
446  fresh_db = true;
447  method = &Benchmark::WriteSeq;
448  } else if (name == Slice("fillbatch")) {
449  fresh_db = true;
450  entries_per_batch_ = 1000;
451  method = &Benchmark::WriteSeq;
452  } else if (name == Slice("fillrandom")) {
453  fresh_db = true;
454  method = &Benchmark::WriteRandom;
455  } else if (name == Slice("overwrite")) {
456  fresh_db = false;
457  method = &Benchmark::WriteRandom;
458  } else if (name == Slice("fillsync")) {
459  fresh_db = true;
460  num_ /= 1000;
461  write_options_.sync = true;
462  method = &Benchmark::WriteRandom;
463  } else if (name == Slice("fill100K")) {
464  fresh_db = true;
465  num_ /= 1000;
466  value_size_ = 100 * 1000;
467  method = &Benchmark::WriteRandom;
468  } else if (name == Slice("readseq")) {
469  method = &Benchmark::ReadSequential;
470  } else if (name == Slice("readreverse")) {
471  method = &Benchmark::ReadReverse;
472  } else if (name == Slice("readrandom")) {
473  method = &Benchmark::ReadRandom;
474  } else if (name == Slice("readmissing")) {
475  method = &Benchmark::ReadMissing;
476  } else if (name == Slice("seekrandom")) {
477  method = &Benchmark::SeekRandom;
478  } else if (name == Slice("readhot")) {
479  method = &Benchmark::ReadHot;
480  } else if (name == Slice("readrandomsmall")) {
481  reads_ /= 1000;
482  method = &Benchmark::ReadRandom;
483  } else if (name == Slice("deleteseq")) {
484  method = &Benchmark::DeleteSeq;
485  } else if (name == Slice("deleterandom")) {
486  method = &Benchmark::DeleteRandom;
487  } else if (name == Slice("readwhilewriting")) {
488  num_threads++; // Add extra thread for writing
489  method = &Benchmark::ReadWhileWriting;
490  } else if (name == Slice("compact")) {
491  method = &Benchmark::Compact;
492  } else if (name == Slice("crc32c")) {
493  method = &Benchmark::Crc32c;
494  } else if (name == Slice("acquireload")) {
495  method = &Benchmark::AcquireLoad;
496  } else if (name == Slice("snappycomp")) {
497  method = &Benchmark::SnappyCompress;
498  } else if (name == Slice("snappyuncomp")) {
499  method = &Benchmark::SnappyUncompress;
500  } else if (name == Slice("heapprofile")) {
501  HeapProfile();
502  } else if (name == Slice("stats")) {
503  PrintStats("leveldb.stats");
504  } else if (name == Slice("sstables")) {
505  PrintStats("leveldb.sstables");
506  } else {
507  if (name != Slice()) { // No error message for empty name
508  fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
509  }
510  }
511 
512  if (fresh_db) {
513  if (FLAGS_use_existing_db) {
514  fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
515  name.ToString().c_str());
516  method = NULL;
517  } else {
518  delete db_;
519  db_ = NULL;
520  DestroyDB(FLAGS_db, Options());
521  Open();
522  }
523  }
524 
525  if (method != NULL) {
526  RunBenchmark(num_threads, name, method);
527  }
528  }
529  }
530 
531  private:
532  struct ThreadArg {
534  SharedState* shared;
535  ThreadState* thread;
536  void (Benchmark::*method)(ThreadState*);
537  };
538 
539  static void ThreadBody(void* v) {
540  ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
541  SharedState* shared = arg->shared;
542  ThreadState* thread = arg->thread;
543  {
544  MutexLock l(&shared->mu);
545  shared->num_initialized++;
546  if (shared->num_initialized >= shared->total) {
547  shared->cv.SignalAll();
548  }
549  while (!shared->start) {
550  shared->cv.Wait();
551  }
552  }
553 
554  thread->stats.Start();
555  (arg->bm->*(arg->method))(thread);
556  thread->stats.Stop();
557 
558  {
559  MutexLock l(&shared->mu);
560  shared->num_done++;
561  if (shared->num_done >= shared->total) {
562  shared->cv.SignalAll();
563  }
564  }
565  }
566 
567  void RunBenchmark(int n, Slice name,
568  void (Benchmark::*method)(ThreadState*)) {
569  SharedState shared;
570  shared.total = n;
571  shared.num_initialized = 0;
572  shared.num_done = 0;
573  shared.start = false;
574 
575  ThreadArg* arg = new ThreadArg[n];
576  for (int i = 0; i < n; i++) {
577  arg[i].bm = this;
578  arg[i].method = method;
579  arg[i].shared = &shared;
580  arg[i].thread = new ThreadState(i);
581  arg[i].thread->shared = &shared;
582  Env::Default()->StartThread(ThreadBody, &arg[i]);
583  }
584 
585  shared.mu.Lock();
586  while (shared.num_initialized < n) {
587  shared.cv.Wait();
588  }
589 
590  shared.start = true;
591  shared.cv.SignalAll();
592  while (shared.num_done < n) {
593  shared.cv.Wait();
594  }
595  shared.mu.Unlock();
596 
597  for (int i = 1; i < n; i++) {
598  arg[0].thread->stats.Merge(arg[i].thread->stats);
599  }
600  arg[0].thread->stats.Report(name);
601 
602  for (int i = 0; i < n; i++) {
603  delete arg[i].thread;
604  }
605  delete[] arg;
606  }
607 
608  void Crc32c(ThreadState* thread) {
609  // Checksum about 500MB of data total
610  const int size = 4096;
611  const char* label = "(4K per op)";
612  std::string data(size, 'x');
613  int64_t bytes = 0;
614  uint32_t crc = 0;
615  while (bytes < 500 * 1048576) {
616  crc = crc32c::Value(data.data(), size);
617  thread->stats.FinishedSingleOp();
618  bytes += size;
619  }
620  // Print so result is not dead
621  fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
622 
623  thread->stats.AddBytes(bytes);
624  thread->stats.AddMessage(label);
625  }
626 
627  void AcquireLoad(ThreadState* thread) {
628  int dummy;
629  port::AtomicPointer ap(&dummy);
630  int count = 0;
631  void *ptr = NULL;
632  thread->stats.AddMessage("(each op is 1000 loads)");
633  while (count < 100000) {
634  for (int i = 0; i < 1000; i++) {
635  ptr = ap.Acquire_Load();
636  }
637  count++;
638  thread->stats.FinishedSingleOp();
639  }
640  if (ptr == NULL) exit(1); // Disable unused variable warning.
641  }
642 
643  void SnappyCompress(ThreadState* thread) {
644  RandomGenerator gen;
645  Slice input = gen.Generate(Options().block_size);
646  int64_t bytes = 0;
647  int64_t produced = 0;
648  bool ok = true;
649  std::string compressed;
650  while (ok && bytes < 1024 * 1048576) { // Compress 1G
651  ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
652  produced += compressed.size();
653  bytes += input.size();
654  thread->stats.FinishedSingleOp();
655  }
656 
657  if (!ok) {
658  thread->stats.AddMessage("(snappy failure)");
659  } else {
660  char buf[100];
661  snprintf(buf, sizeof(buf), "(output: %.1f%%)",
662  (produced * 100.0) / bytes);
663  thread->stats.AddMessage(buf);
664  thread->stats.AddBytes(bytes);
665  }
666  }
667 
668  void SnappyUncompress(ThreadState* thread) {
669  RandomGenerator gen;
670  Slice input = gen.Generate(Options().block_size);
671  std::string compressed;
672  bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
673  int64_t bytes = 0;
674  char* uncompressed = new char[input.size()];
675  while (ok && bytes < 1024 * 1048576) { // Compress 1G
676  ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
677  uncompressed);
678  bytes += input.size();
679  thread->stats.FinishedSingleOp();
680  }
681  delete[] uncompressed;
682 
683  if (!ok) {
684  thread->stats.AddMessage("(snappy failure)");
685  } else {
686  thread->stats.AddBytes(bytes);
687  }
688  }
689 
690  void Open() {
691  assert(db_ == NULL);
692  Options options;
693  options.create_if_missing = !FLAGS_use_existing_db;
694  options.block_cache = cache_;
695  options.write_buffer_size = FLAGS_write_buffer_size;
696  options.max_open_files = FLAGS_open_files;
697  options.filter_policy = filter_policy_;
698  Status s = DB::Open(options, FLAGS_db, &db_);
699  if (!s.ok()) {
700  fprintf(stderr, "open error: %s\n", s.ToString().c_str());
701  exit(1);
702  }
703  }
704 
705  void WriteSeq(ThreadState* thread) {
706  DoWrite(thread, true);
707  }
708 
709  void WriteRandom(ThreadState* thread) {
710  DoWrite(thread, false);
711  }
712 
713  void DoWrite(ThreadState* thread, bool seq) {
714  if (num_ != FLAGS_num) {
715  char msg[100];
716  snprintf(msg, sizeof(msg), "(%d ops)", num_);
717  thread->stats.AddMessage(msg);
718  }
719 
720  RandomGenerator gen;
721  WriteBatch batch;
722  Status s;
723  int64_t bytes = 0;
724  for (int i = 0; i < num_; i += entries_per_batch_) {
725  batch.Clear();
726  for (int j = 0; j < entries_per_batch_; j++) {
727  const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
728  char key[100];
729  snprintf(key, sizeof(key), "%016d", k);
730  batch.Put(key, gen.Generate(value_size_));
731  bytes += value_size_ + strlen(key);
732  thread->stats.FinishedSingleOp();
733  }
734  s = db_->Write(write_options_, &batch);
735  if (!s.ok()) {
736  fprintf(stderr, "put error: %s\n", s.ToString().c_str());
737  exit(1);
738  }
739  }
740  thread->stats.AddBytes(bytes);
741  }
742 
743  void ReadSequential(ThreadState* thread) {
744  Iterator* iter = db_->NewIterator(ReadOptions());
745  int i = 0;
746  int64_t bytes = 0;
747  for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
748  bytes += iter->key().size() + iter->value().size();
749  thread->stats.FinishedSingleOp();
750  ++i;
751  }
752  delete iter;
753  thread->stats.AddBytes(bytes);
754  }
755 
756  void ReadReverse(ThreadState* thread) {
757  Iterator* iter = db_->NewIterator(ReadOptions());
758  int i = 0;
759  int64_t bytes = 0;
760  for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
761  bytes += iter->key().size() + iter->value().size();
762  thread->stats.FinishedSingleOp();
763  ++i;
764  }
765  delete iter;
766  thread->stats.AddBytes(bytes);
767  }
768 
769  void ReadRandom(ThreadState* thread) {
770  ReadOptions options;
771  std::string value;
772  int found = 0;
773  for (int i = 0; i < reads_; i++) {
774  char key[100];
775  const int k = thread->rand.Next() % FLAGS_num;
776  snprintf(key, sizeof(key), "%016d", k);
777  if (db_->Get(options, key, &value).ok()) {
778  found++;
779  }
780  thread->stats.FinishedSingleOp();
781  }
782  char msg[100];
783  snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
784  thread->stats.AddMessage(msg);
785  }
786 
787  void ReadMissing(ThreadState* thread) {
788  ReadOptions options;
789  std::string value;
790  for (int i = 0; i < reads_; i++) {
791  char key[100];
792  const int k = thread->rand.Next() % FLAGS_num;
793  snprintf(key, sizeof(key), "%016d.", k);
794  db_->Get(options, key, &value);
795  thread->stats.FinishedSingleOp();
796  }
797  }
798 
799  void ReadHot(ThreadState* thread) {
800  ReadOptions options;
801  std::string value;
802  const int range = (FLAGS_num + 99) / 100;
803  for (int i = 0; i < reads_; i++) {
804  char key[100];
805  const int k = thread->rand.Next() % range;
806  snprintf(key, sizeof(key), "%016d", k);
807  db_->Get(options, key, &value);
808  thread->stats.FinishedSingleOp();
809  }
810  }
811 
812  void SeekRandom(ThreadState* thread) {
813  ReadOptions options;
814  std::string value;
815  int found = 0;
816  for (int i = 0; i < reads_; i++) {
817  Iterator* iter = db_->NewIterator(options);
818  char key[100];
819  const int k = thread->rand.Next() % FLAGS_num;
820  snprintf(key, sizeof(key), "%016d", k);
821  iter->Seek(key);
822  if (iter->Valid() && iter->key() == key) found++;
823  delete iter;
824  thread->stats.FinishedSingleOp();
825  }
826  char msg[100];
827  snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
828  thread->stats.AddMessage(msg);
829  }
830 
831  void DoDelete(ThreadState* thread, bool seq) {
832  RandomGenerator gen;
833  WriteBatch batch;
834  Status s;
835  for (int i = 0; i < num_; i += entries_per_batch_) {
836  batch.Clear();
837  for (int j = 0; j < entries_per_batch_; j++) {
838  const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
839  char key[100];
840  snprintf(key, sizeof(key), "%016d", k);
841  batch.Delete(key);
842  thread->stats.FinishedSingleOp();
843  }
844  s = db_->Write(write_options_, &batch);
845  if (!s.ok()) {
846  fprintf(stderr, "del error: %s\n", s.ToString().c_str());
847  exit(1);
848  }
849  }
850  }
851 
852  void DeleteSeq(ThreadState* thread) {
853  DoDelete(thread, true);
854  }
855 
856  void DeleteRandom(ThreadState* thread) {
857  DoDelete(thread, false);
858  }
859 
860  void ReadWhileWriting(ThreadState* thread) {
861  if (thread->tid > 0) {
862  ReadRandom(thread);
863  } else {
864  // Special thread that keeps writing until other threads are done.
865  RandomGenerator gen;
866  while (true) {
867  {
868  MutexLock l(&thread->shared->mu);
869  if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
870  // Other threads have finished
871  break;
872  }
873  }
874 
875  const int k = thread->rand.Next() % FLAGS_num;
876  char key[100];
877  snprintf(key, sizeof(key), "%016d", k);
878  Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
879  if (!s.ok()) {
880  fprintf(stderr, "put error: %s\n", s.ToString().c_str());
881  exit(1);
882  }
883  }
884 
885  // Do not count any of the preceding work/delay in stats.
886  thread->stats.Start();
887  }
888  }
889 
890  void Compact(ThreadState* thread) {
891  db_->CompactRange(NULL, NULL);
892  }
893 
894  void PrintStats(const char* key) {
895  std::string stats;
896  if (!db_->GetProperty(key, &stats)) {
897  stats = "(failed)";
898  }
899  fprintf(stdout, "\n%s\n", stats.c_str());
900  }
901 
902  static void WriteToFile(void* arg, const char* buf, int n) {
903  reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
904  }
905 
906  void HeapProfile() {
907  char fname[100];
908  snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
909  WritableFile* file;
910  Status s = Env::Default()->NewWritableFile(fname, &file);
911  if (!s.ok()) {
912  fprintf(stderr, "%s\n", s.ToString().c_str());
913  return;
914  }
915  bool ok = port::GetHeapProfile(WriteToFile, file);
916  delete file;
917  if (!ok) {
918  fprintf(stderr, "heap profiling not supported\n");
919  Env::Default()->DeleteFile(fname);
920  }
921  }
922 };
923 
924 } // namespace leveldb
925 
926 int main(int argc, char** argv) {
927  FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
928  FLAGS_open_files = leveldb::Options().max_open_files;
929  std::string default_db_path;
930 
931  for (int i = 1; i < argc; i++) {
932  double d;
933  int n;
934  char junk;
935  if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
936  FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
937  } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
938  FLAGS_compression_ratio = d;
939  } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
940  (n == 0 || n == 1)) {
941  FLAGS_histogram = n;
942  } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
943  (n == 0 || n == 1)) {
944  FLAGS_use_existing_db = n;
945  } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
946  FLAGS_num = n;
947  } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
948  FLAGS_reads = n;
949  } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
950  FLAGS_threads = n;
951  } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
952  FLAGS_value_size = n;
953  } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
954  FLAGS_write_buffer_size = n;
955  } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
956  FLAGS_cache_size = n;
957  } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
958  FLAGS_bloom_bits = n;
959  } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
960  FLAGS_open_files = n;
961  } else if (strncmp(argv[i], "--db=", 5) == 0) {
962  FLAGS_db = argv[i] + 5;
963  } else {
964  fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
965  exit(1);
966  }
967  }
968 
969  // Choose a location for the test database if none given with --db=<path>
970  if (FLAGS_db == NULL) {
971  leveldb::Env::Default()->GetTestDirectory(&default_db_path);
972  default_db_path += "/dbbench";
973  FLAGS_db = default_db_path.c_str();
974  }
975 
976  leveldb::Benchmark benchmark;
977  benchmark.Run();
978  return 0;
979 }
virtual void CompactRange(const Slice *begin, const Slice *end)=0
void WriteRandom(ThreadState *thread)
Definition: db_bench.cc:709
void RunBenchmark(int n, Slice name, void(Benchmark::*method)(ThreadState *))
Definition: db_bench.cc:567
virtual Status DeleteFile(const std::string &fname)=0
Cache * block_cache
Definition: options.h:98
virtual void Prev()=0
virtual Status Write(const WriteOptions &options, WriteBatch *updates)=0
std::string * value
Definition: version_set.cc:270
bool create_if_missing
Definition: options.h:45
bool Snappy_Uncompress(const char *input_data, size_t input_length, char *output)
Definition: port_posix.h:145
static void WriteToFile(void *arg, const char *buf, int n)
Definition: db_bench.cc:902
void ReadReverse(ThreadState *thread)
Definition: db_bench.cc:756
void * Acquire_Load() const
Definition: port_win.cc:128
bool starts_with(const Slice &x) const
Definition: slice.h:75
int64_t bytes_
Definition: db_bench.cc:168
const char * data() const
Definition: slice.h:40
std::string message_
Definition: db_bench.cc:171
void ReadWhileWriting(ThreadState *thread)
Definition: db_bench.cc:860
SharedState * shared
Definition: db_bench.cc:292
bool start
Definition: db_bench.cc:282
int tid
Definition: db_bench.cc:289
void SnappyCompress(ThreadState *thread)
Definition: db_bench.cc:643
int num_done
Definition: db_bench.cc:281
void AcquireLoad(ThreadState *thread)
Definition: db_bench.cc:627
int total
Definition: db_bench.cc:272
int next_report_
Definition: db_bench.cc:167
void DeleteRandom(ThreadState *thread)
Definition: db_bench.cc:856
Slice CompressibleString(Random *rnd, double compressed_fraction, int len, std::string *dst)
Definition: testutil.cc:34
virtual Iterator * NewIterator(const ReadOptions &options)=0
void Delete(const Slice &key)
Definition: write_batch.cc:105
virtual Slice value() const =0
virtual void Next()=0
Status DestroyDB(const std::string &dbname, const Options &options)
Definition: db_impl.cc:1467
void SnappyUncompress(ThreadState *thread)
Definition: db_bench.cc:668
virtual void SeekToLast()=0
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
int pos_
Definition: db_bench.cc:113
port::CondVar cv
Definition: db_bench.cc:271
int std::string int dummy
Definition: util.h:164
Definition: db.h:44
double seconds_
Definition: db_bench.cc:165
void(Benchmark::* method)(ThreadState *)
Definition: db_bench.cc:536
Random rand
Definition: db_bench.cc:290
unsigned int uint32_t
Definition: stdint.h:21
bool Snappy_Compress(const char *input, size_t input_length, std::string *output)
size_t size() const
Definition: slice.h:43
double last_op_finish_
Definition: db_bench.cc:169
static Status Open(const Options &options, const std::string &name, DB **dbptr)
Definition: db_impl.cc:1430
bool GetHeapProfile(void(*func)(void *, const char *, int), void *arg)
Definition: port_posix.h:154
virtual void Seek(const Slice &target)=0
void DoWrite(ThreadState *thread, bool seq)
Definition: db_bench.cc:713
const FilterPolicy * NewBloomFilterPolicy(int bits_per_key)
Definition: bloom.cc:91
void ReadHot(ThreadState *thread)
Definition: db_bench.cc:799
virtual void SeekToFirst()=0
void SeekRandom(ThreadState *thread)
Definition: db_bench.cc:812
virtual Status GetTestDirectory(std::string *path)=0
void Crc32c(ThreadState *thread)
Definition: db_bench.cc:608
void ReadMissing(ThreadState *thread)
Definition: db_bench.cc:787
void ReadSequential(ThreadState *thread)
Definition: db_bench.cc:743
void Compact(ThreadState *thread)
Definition: db_bench.cc:890
int max_open_files
Definition: options.h:90
int main(int argc, char **argv)
Definition: db_bench.cc:926
port::Mutex mu
Definition: db_bench.cc:270
void DeleteSeq(ThreadState *thread)
Definition: db_bench.cc:852
const FilterPolicy * filter_policy
Definition: options.h:136
Cache * NewLRUCache(size_t capacity)
Definition: cache.cc:321
uint32_t Value(const char *data, size_t n)
Definition: crc32c.h:20
double finish_
Definition: db_bench.cc:164
virtual Status Get(const ReadOptions &options, const Slice &key, std::string *value)=0
const FilterPolicy * filter_policy_
Definition: db_bench.cc:305
int done_
Definition: db_bench.cc:166
void PrintStats(const char *key)
Definition: db_bench.cc:894
virtual bool Valid() const =0
Stats stats
Definition: db_bench.cc:291
void DoDelete(ThreadState *thread, bool seq)
Definition: db_bench.cc:831
void ReadRandom(ThreadState *thread)
Definition: db_bench.cc:769
signed long long int64_t
Definition: stdint.h:18
virtual Slice key() const =0
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
bool ok() const
Definition: status.h:52
void PrintWarnings()
Definition: db_bench.cc:332
size_t write_buffer_size
Definition: options.h:83
std::string data_
Definition: db_bench.cc:112
void PrintEnvironment()
Definition: db_bench.cc:353
double start_
Definition: db_bench.cc:163
virtual bool GetProperty(const Slice &property, std::string *value)=0
static void ThreadBody(void *v)
Definition: db_bench.cc:539
Histogram hist_
Definition: db_bench.cc:170
int num_initialized
Definition: db_bench.cc:280
std::string ToString() const
Definition: slice.h:66
virtual void StartThread(void(*function)(void *arg), void *arg)=0
void Put(const Slice &key, const Slice &value)
Definition: write_batch.cc:98
WriteOptions write_options_
Definition: db_bench.cc:310
void * arg
Definition: env_posix.cc:716
void WriteSeq(ThreadState *thread)
Definition: db_bench.cc:705
static Env * Default()
Definition: env_posix.cc:800
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
Definition: db_impl.cc:1416
const char * name
Definition: testharness.cc:18
std::string ToString() const
Definition: status.cc:36
virtual uint64_t NowMicros()=0