bench_db.cpp 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. /*
  2. * Tencent is pleased to support the open source community by making wwsearch
  3. * available.
  4. *
  5. * Copyright (C) 2018-present Tencent. All Rights Reserved.
  6. *
  7. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  8. * use this file except in compliance with the License. You may obtain a copy of
  9. * the License at
  10. *
  11. * https://opensource.org/licenses/Apache-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing, software
  14. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  15. * WARRANTIES OF ANY KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations under the License.
  17. */
  18. #include <time.h>
  19. #include <thread>
  20. #include "bench_db.h"
  21. #include "random_creater.h"
  22. #include "include/header.h"
  23. #include "include/index_wrapper.h"
  24. // Bench design:
  25. // -d data dir,example /data1/test/db_0
  26. // -p intance of db
  27. // -n thread per instance
  28. // -t run times per instance
  29. // -m mode,merge or put?
  30. namespace wwsearch {
  31. static int kThreadStop = false;
  32. const char *BenchDB::Description =
  33. "-d [data dir] -p [instance of db] -n [threads per db] -t [run times per "
  34. "instance] -m [0:not merge 1:merge]";
  35. const char *BenchDB::Usage = "Benchmark for db put";
  36. static constexpr int kFlushLogMaxSize = 20 * 1024 * 1024;
  37. void BenchDB::Run(wwsearch::ArgsHelper &args) {
  38. printf("doing %s\n", __FUNCTION__);
  39. uint32_t instance_num = args.UInt('p');
  40. uint32_t mode = args.UInt('m');
  41. int thread_num = args.UInt('n');
  42. assert(thread_num > 0);
  43. uint64_t run_times = args.UInt64('t');
  44. std::vector<DefaultIndexWrapper *> indexers;
  45. std::vector<std::thread *> new_threads;
  46. std::vector<std::thread *> staticstic_threads;
  47. std::vector<RandomCreater *> random_creaters;
  48. unsigned int begin_seed = time(NULL);
  49. printf("mode=%d\n", mode);
  50. char buffer[256];
  51. for (size_t i = 0; i < instance_num; i++) {
  52. snprintf(buffer, sizeof(buffer), "%s/db_%d", args.String('d').c_str(), i);
  53. DefaultIndexWrapper *indexer = new DefaultIndexWrapper();
  54. indexer->DBParams().path = buffer;
  55. auto search_stuats = indexer->Open();
  56. if (!search_stuats.OK()) {
  57. printf("open db fail,ret:%d,msg:%s\n", search_stuats.GetCode(),
  58. search_stuats.GetState().c_str());
  59. }
  60. assert(search_stuats.OK());
  61. indexers.push_back(indexer);
  62. snprintf(buffer, sizeof(buffer), "./staticstic_log/bench_db_%d", i);
  63. Staticstic *staticstic = new Staticstic(buffer, 10, kFlushLogMaxSize);
  64. printf("instance [%d],see log [%s]\n", i, buffer);
  65. for (size_t i = 0; i < thread_num; i++) {
  66. RandomCreater *random_creater = new RandomCreater();
  67. random_creater->Init(begin_seed++);
  68. random_creaters.push_back(random_creater);
  69. std::thread *new_thread = new std::thread(
  70. BenchDB::ThreadRun, std::ref(*(indexers.back())),
  71. run_times / thread_num, std::ref(*staticstic), random_creater, mode);
  72. new_threads.push_back(new_thread);
  73. }
  74. std::thread *staticstic_thread =
  75. new std::thread(BenchDB::PrintStaitic, std::ref(*staticstic));
  76. staticstic_threads.push_back(staticstic_thread);
  77. }
  78. for (size_t i = 0; i < new_threads.size(); i++) {
  79. new_threads[i]->join();
  80. delete new_threads[i];
  81. delete random_creaters[i];
  82. }
  83. kThreadStop = true;
  84. for (size_t i = 0; i < staticstic_threads.size(); i++) {
  85. staticstic_threads[i]->join();
  86. delete staticstic_threads[i];
  87. }
  88. SearchLogDebug("Run finish");
  89. return;
  90. }
  91. void BenchDB::ThreadRun(DefaultIndexWrapper &wrapper, uint64_t run_times,
  92. Staticstic &staticstic, RandomCreater *random_creater,
  93. int mode) {
  94. char buffer[200];
  95. #define ATTR_SIZE (6)
  96. const char *format[ATTR_SIZE] = {
  97. "name_%010u", "pinyin_%010u", "english_%010u", "1%10u", "mail%10u", "%u"};
  98. while (run_times-- > 0 && !kThreadStop) {
  99. wwsearch::Document document;
  100. DocumentID doc_id = random_creater->GetUInt64();
  101. if (0 == doc_id) doc_id = 1;
  102. document.SetID(doc_id); // start from 1
  103. uint32_t v;
  104. for (int field_id = 0; field_id < ATTR_SIZE; field_id++) {
  105. wwsearch::IndexFieldFlag flag;
  106. flag.SetStoredField();
  107. flag.SetTokenize();
  108. if (field_id == 3) flag.SetSuffixBuild();
  109. snprintf(buffer, sizeof(buffer), format[field_id],
  110. random_creater->GetUInt32());
  111. auto field = document.AddField();
  112. field->SetMeta(field_id, flag);
  113. field->SetString(buffer);
  114. }
  115. std::string str;
  116. assert(document.SerializeToBytes(str, 0));
  117. SearchStatus status;
  118. auto db = wrapper.vdb_;
  119. auto buffer = db->NewWriteBuffer(nullptr);
  120. if (mode == 0) {
  121. status =
  122. buffer->Put(kStoredFieldColumn, random_creater->GetString(10), str);
  123. } else {
  124. status =
  125. buffer->Merge(kStoredFieldColumn, random_creater->GetString(10), str);
  126. }
  127. if (!status.OK()) {
  128. printf("mode:%d,buffer->put/merge ret:%d,msg:%s\n", mode,
  129. status.GetCode(), status.GetState().c_str());
  130. }
  131. assert(status.OK());
  132. timeval begin, end;
  133. gettimeofday(&begin, NULL);
  134. status = db->FlushBuffer(buffer);
  135. gettimeofday(&end, NULL);
  136. if (!status.OK()) {
  137. printf("mode:%d,flush ret:%d,msg:%s\n", mode, status.GetCode(),
  138. status.GetState().c_str());
  139. }
  140. staticstic.AddStat("BenchDB", status.GetCode(), begin, &end);
  141. db->ReleaseWriteBuffer(buffer);
  142. }
  143. }
  144. void BenchDB::PrintStaitic(Staticstic &staticstic) {
  145. while (!kThreadStop) {
  146. staticstic.Report();
  147. sleep(10);
  148. }
  149. }
  150. } // namespace wwsearch