bench_index.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*
  2. * Tencent is pleased to support the open source community by making wwsearch
  3. * available.
  4. *
  5. * Copyright (C) 2018-present Tencent. All Rights Reserved.
  6. *
  7. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  8. * use this file except in compliance with the License. You may obtain a copy of
  9. * the License at
  10. *
  11. * https://opensource.org/licenses/Apache-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing, software
  14. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  15. * WARRANTIES OF ANY KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations under the License.
  17. */
#include <time.h>
#include <atomic>
#include <thread>
#include "bench_index.h"
#include "random_creater.h"
#include "staticstic.h"
#include "include/header.h"
#include "include/index_wrapper.h"
#include "include/virtual_db_mock.h"
#include "rocksdb/perf_context.h"
  27. namespace rocksdb {
  28. extern thread_local PerfContext perf_context;
  29. extern __thread PerfLevel perf_level; // = kEnableCount;
  30. } // namespace rocksdb
  31. thread_local uint64_t last_perf_count = 0;
  32. namespace wwsearch {
  33. static int kThreadStop = false;
  34. static constexpr int kFlushLogMaxSize = 20 * 1024 * 1024;
  35. // * merge operator
  36. // * multi instance
  37. // * batch write
  38. const char *BenchIndex::Description =
  39. "-d [data dir] -p [instance of db] -n [threads per db] -e [debug] \n"
  40. "\t\t-f [run times per thread] -g [batch num] -h [str attr len] \n "
  41. "\t\t-i [numeric att num] -j [string attr num] -k [suffix attr num] -l \n"
  42. "\t\t[max uin num] -o [mode 1:add 2:kv debug] -r [rocks "
  43. "perf:1=disable,2=count,3=time,4=time and lock] -t [mmseg path] -s [suffix "
  44. "or not 1/0] -m [mock or not 1/0]";
  45. const char *BenchIndex::Usage = "Benchmark for index ";
  46. void BenchIndex::Run(wwsearch::ArgsHelper &args) {
  47. printf("doing %s\n", __FUNCTION__);
  48. uint32_t instance_num = args.UInt('p');
  49. int thread_num = args.UInt('n');
  50. assert(thread_num > 0);
  51. BenchIndexParams job_params;
  52. job_params.debug_ = args.UInt64('e');
  53. job_params.run_times_ = args.UInt64('f');
  54. job_params.batch_num_ = args.UInt64('g');
  55. job_params.str_len_ = args.UInt64('h');
  56. job_params.nummeric_attr_num_ = args.UInt64('i');
  57. job_params.string_attr_num_ = args.UInt64('j');
  58. job_params.suffix_attr_num_ = args.UInt64('k');
  59. job_params.max_uin_num_ = args.UInt64('l');
  60. job_params.index_type = args.UInt64('o');
  61. job_params.perf_rocks = args.UInt64('r');
  62. job_params.mock = args.UInt64('m');
  63. std::vector<DefaultIndexWrapper *> indexers;
  64. std::vector<std::thread *> new_threads;
  65. std::vector<std::thread *> staticstic_threads;
  66. std::vector<Staticstic *> staticstics;
  67. std::vector<RandomCreater *> random_creaters;
  68. unsigned int begin_seed = time(NULL);
  69. char buffer[256];
  70. job_params.Print();
  71. for (size_t i = 0; i < instance_num; i++) {
  72. snprintf(buffer, sizeof(buffer), "%s/db_%d", args.String('d').c_str(), i);
  73. DefaultIndexWrapper *indexer = new DefaultIndexWrapper();
  74. indexer->DBParams().path = buffer;
  75. indexer->DBParams().mmseg_instance_num_ = thread_num;
  76. if (args.Have('t')) {
  77. indexer->DBParams().mmseg_dict_dir_ = args.String('t');
  78. } else {
  79. indexer->DBParams().mmseg_dict_dir_ = "./";
  80. }
  81. indexer->Config().SetLogLevel(wwsearch::kSearchLogLevelError);
  82. if (job_params.mock == 1) {
  83. VirtualDB *vdb = new VirtualDBMock(indexer->codec_);
  84. indexer->SetVdb(vdb);
  85. }
  86. auto search_stuats = indexer->Open();
  87. if (!search_stuats.OK()) {
  88. printf("open db fail,ret:%d,msg:%s\n", search_stuats.GetCode(),
  89. search_stuats.GetState().c_str());
  90. }
  91. assert(search_stuats.OK());
  92. indexers.push_back(indexer);
  93. snprintf(buffer, sizeof(buffer), "./staticstic_log/bench_index_%d", i);
  94. Staticstic *staticstic = new Staticstic(buffer, 10, kFlushLogMaxSize);
  95. staticstics.push_back(staticstic);
  96. printf("instance [%d],see log [%s]\n", i, buffer);
  97. for (size_t i = 0; i < thread_num; i++) {
  98. RandomCreater *random_creater = new RandomCreater();
  99. random_creater->Init(begin_seed++);
  100. random_creaters.push_back(random_creater);
  101. std::thread *new_thread = new std::thread(
  102. BenchIndex::ThreadRun, std::ref(*(indexers.back())),
  103. std::ref(*staticstic), random_creater, std::ref(job_params));
  104. new_threads.push_back(new_thread);
  105. }
  106. std::thread *staticstic_thread =
  107. new std::thread(BenchIndex::PrintStaitic, std::ref(*staticstic));
  108. staticstic_threads.push_back(staticstic_thread);
  109. }
  110. for (size_t i = 0; i < new_threads.size(); i++) {
  111. new_threads[i]->join();
  112. delete new_threads[i];
  113. delete random_creaters[i];
  114. }
  115. kThreadStop = true;
  116. for (size_t i = 0; i < staticstic_threads.size(); i++) {
  117. staticstic_threads[i]->join();
  118. delete staticstic_threads[i];
  119. delete staticstics[i];
  120. }
  121. for (auto indexer : indexers) {
  122. delete indexer;
  123. }
  124. SearchLogDebug("Run finish");
  125. return;
  126. }
  127. static void InitStringField(wwsearch::IndexField *field, uint16_t field_id,
  128. const std::string &word, bool suffix = false) {
  129. ::wwsearch::IndexFieldFlag flag;
  130. flag.SetDocValue();
  131. flag.SetStoredField();
  132. flag.SetTokenize();
  133. flag.SetInvertIndex();
  134. flag.SetInvertIndex();
  135. if (suffix) {
  136. flag.SetSuffixBuild();
  137. }
  138. field->SetMeta(field_id, flag);
  139. field->SetString(word);
  140. }
  141. static void InitUint32Field(wwsearch::IndexField *field, uint16_t field_id,
  142. uint32_t value) {
  143. ::wwsearch::IndexFieldFlag flag;
  144. flag.SetDocValue();
  145. flag.SetStoredField();
  146. flag.SetTokenize();
  147. flag.SetInvertIndex();
  148. field->SetMeta(field_id, flag);
  149. field->SetUint32(value);
  150. }
  151. static void InitUint64Field(wwsearch::IndexField *field, uint16_t field_id,
  152. uint64_t value) {
  153. ::wwsearch::IndexFieldFlag flag;
  154. flag.SetDocValue();
  155. flag.SetStoredField();
  156. flag.SetTokenize();
  157. flag.SetInvertIndex();
  158. field->SetMeta(field_id, flag);
  159. field->SetUint64(value);
  160. }
  161. void BenchIndex::CollectRocksDBPerf(Staticstic *staticstic, uint64_t count,
  162. bool quit) {
  163. last_perf_count += count;
  164. if (last_perf_count == 0) return;
  165. // printf("%s\n",rocksdb::perf_context.ToString(true).c_str());
  166. if (last_perf_count >= 100 || quit) {
  167. // NOTE: real is nana seconds
  168. #define ROCKS_STAT(metric) \
  169. { \
  170. timeval begin, end; \
  171. begin.tv_sec = 0; \
  172. begin.tv_usec = 0; \
  173. uint64_t cost = (rocksdb::perf_context.metric) / last_perf_count; \
  174. end.tv_sec = (cost / 1000000); \
  175. end.tv_usec = (cost % 1000000); \
  176. if (cost != 0) staticstic->AddStat(#metric, 0, begin, &end); \
  177. }
  178. ROCKS_STAT(user_key_comparison_count);
  179. ROCKS_STAT(block_cache_hit_count);
  180. ROCKS_STAT(block_read_count);
  181. ROCKS_STAT(block_read_byte);
  182. ROCKS_STAT(block_read_time);
  183. ROCKS_STAT(block_checksum_time);
  184. ROCKS_STAT(block_decompress_time);
  185. ROCKS_STAT(get_read_bytes);
  186. ROCKS_STAT(multiget_read_bytes);
  187. ROCKS_STAT(iter_read_bytes);
  188. ROCKS_STAT(internal_key_skipped_count);
  189. ROCKS_STAT(internal_delete_skipped_count);
  190. ROCKS_STAT(internal_recent_skipped_count);
  191. ROCKS_STAT(internal_merge_count);
  192. ROCKS_STAT(get_snapshot_time);
  193. ROCKS_STAT(get_from_memtable_time);
  194. ROCKS_STAT(get_from_memtable_count);
  195. ROCKS_STAT(get_post_process_time);
  196. ROCKS_STAT(get_from_output_files_time);
  197. ROCKS_STAT(seek_on_memtable_time);
  198. ROCKS_STAT(seek_on_memtable_count);
  199. ROCKS_STAT(next_on_memtable_count);
  200. ROCKS_STAT(prev_on_memtable_count);
  201. ROCKS_STAT(seek_child_seek_time);
  202. ROCKS_STAT(seek_child_seek_count);
  203. ROCKS_STAT(seek_min_heap_time);
  204. ROCKS_STAT(seek_max_heap_time);
  205. ROCKS_STAT(seek_internal_seek_time);
  206. ROCKS_STAT(find_next_user_entry_time);
  207. ROCKS_STAT(write_wal_time);
  208. ROCKS_STAT(write_memtable_time);
  209. ROCKS_STAT(write_delay_time);
  210. ROCKS_STAT(write_scheduling_flushes_compactions_time);
  211. ROCKS_STAT(write_pre_and_post_process_time);
  212. ROCKS_STAT(write_thread_wait_nanos);
  213. ROCKS_STAT(db_mutex_lock_nanos);
  214. ROCKS_STAT(db_condition_wait_nanos);
  215. ROCKS_STAT(merge_operator_time_nanos);
  216. ROCKS_STAT(read_index_block_nanos);
  217. ROCKS_STAT(read_filter_block_nanos);
  218. ROCKS_STAT(new_table_block_iter_nanos);
  219. ROCKS_STAT(new_table_iterator_nanos);
  220. ROCKS_STAT(block_seek_nanos);
  221. ROCKS_STAT(find_table_nanos);
  222. ROCKS_STAT(bloom_memtable_hit_count);
  223. ROCKS_STAT(bloom_memtable_miss_count);
  224. ROCKS_STAT(bloom_sst_hit_count);
  225. ROCKS_STAT(bloom_sst_miss_count);
  226. ROCKS_STAT(key_lock_wait_time);
  227. ROCKS_STAT(key_lock_wait_count);
  228. ROCKS_STAT(env_new_sequential_file_nanos);
  229. ROCKS_STAT(env_new_random_access_file_nanos);
  230. ROCKS_STAT(env_new_writable_file_nanos);
  231. ROCKS_STAT(env_reuse_writable_file_nanos);
  232. ROCKS_STAT(env_new_random_rw_file_nanos);
  233. ROCKS_STAT(env_new_directory_nanos);
  234. ROCKS_STAT(env_file_exists_nanos);
  235. ROCKS_STAT(env_get_children_nanos);
  236. ROCKS_STAT(env_get_children_file_attributes_nanos);
  237. ROCKS_STAT(env_delete_file_nanos);
  238. ROCKS_STAT(env_create_dir_nanos);
  239. ROCKS_STAT(env_create_dir_if_missing_nanos);
  240. ROCKS_STAT(env_delete_dir_nanos);
  241. ROCKS_STAT(env_get_file_size_nanos);
  242. ROCKS_STAT(env_get_file_modification_time_nanos);
  243. ROCKS_STAT(env_rename_file_nanos);
  244. ROCKS_STAT(env_link_file_nanos);
  245. ROCKS_STAT(env_lock_file_nanos);
  246. ROCKS_STAT(env_unlock_file_nanos);
  247. ROCKS_STAT(env_new_logger_nanos);
  248. rocksdb::perf_context.Reset();
  249. last_perf_count = 0;
  250. }
  251. }
  252. void BenchIndex::ThreadRun(DefaultIndexWrapper &wrapper, Staticstic &staticstic,
  253. RandomCreater *random_creater,
  254. BenchIndexParams &params) {
  255. // rocksdb
  256. rocksdb::perf_level = (rocksdb::PerfLevel)(params.perf_rocks);
  257. char buffer[200];
  258. #define ATTR_SIZE (6)
  259. const char *format[ATTR_SIZE] = {
  260. "name_%010u", "pinyin_%010u", "english_%010u", "1%10u", "mail%10u", "%u"};
  261. uint64_t run_times = params.run_times_;
  262. assert(params.batch_num_ > 0);
  263. printf("Start thread\n");
  264. while (run_times-- > 0 && !kThreadStop) {
  265. std::vector<wwsearch::DocumentUpdater *> documents;
  266. for (size_t i = 0; i < params.batch_num_; i++) {
  267. wwsearch::DocumentUpdater *document_updater =
  268. new wwsearch::DocumentUpdater();
  269. wwsearch::Document &document = document_updater->New();
  270. DocumentID doc_id = random_creater->GetUInt64();
  271. while (0 == doc_id) {
  272. random_creater->GetUInt64();
  273. }
  274. document.SetID(doc_id); // start from 1
  275. uint32_t field_id = 0;
  276. for (size_t i = 0; i < params.nummeric_attr_num_; i++) {
  277. InitUint32Field(document.AddField(), field_id++,
  278. random_creater->GetUInt32());
  279. }
  280. for (size_t i = 0; i < params.string_attr_num_; i++) {
  281. InitStringField(document.AddField(), field_id++,
  282. random_creater->GetString(params.str_len_), false);
  283. }
  284. for (size_t i = 0; i < params.suffix_attr_num_; i++) {
  285. InitStringField(document.AddField(), field_id++,
  286. random_creater->GetString(params.str_len_), true);
  287. }
  288. /*
  289. uint32_t v;
  290. //
  291. name,pinyin,english,phone,mail,usrid,exattrs,corpid,ppartyid,type,update
  292. for (int field_id = 0; field_id < ATTR_SIZE; field_id++) {
  293. wwsearch::IndexFieldFlag flag;
  294. flag.SetStoredField();
  295. flag.SetTokenize();
  296. flag.SetInvertIndex();
  297. if (field_id == 3) flag.SetSuffixBuild();
  298. snprintf(buffer, sizeof(buffer), format[field_id],
  299. random_creater->GetUInt32());
  300. auto field = document.AddField();
  301. field->SetMeta(field_id, flag);
  302. field->SetString(buffer);
  303. }
  304. */
  305. documents.push_back(document_updater);
  306. }
  307. TableID table;
  308. table.business_type = 0;
  309. table.partition_set = random_creater->GetUInt32() % params.max_uin_num_;
  310. wwsearch::SearchTracer tracer;
  311. timeval begin, end;
  312. gettimeofday(&begin, NULL);
  313. bool success = false;
  314. if (params.index_type == 1) {
  315. success = wrapper.index_writer_->AddDocuments(table, documents, nullptr,
  316. &tracer);
  317. } else if (params.index_type == 2) {
  318. success = wrapper.index_writer_->AddOrUpdateDocuments(table, documents,
  319. nullptr, &tracer);
  320. } else if (params.index_type == 3) {
  321. success = wrapper.index_writer_->UpdateDocuments(table, documents,
  322. nullptr, &tracer);
  323. } else {
  324. // donothings
  325. // assert(false);
  326. }
  327. gettimeofday(&end, NULL);
  328. for (auto &du : documents) {
  329. staticstic.AddStat("BenchIndex", du->Status().GetCode(), begin, &end);
  330. staticstic.AddStat("OK", success ? 0 : -1, begin, &end);
  331. staticstic.AddStat("Keys", 0,
  332. tracer.Get(wwsearch::TracerType::kRealInsertKeys),
  333. begin, &end);
  334. staticstic.AddStat("Keys", 1,
  335. tracer.Get(wwsearch::TracerType::kDocumentCount),
  336. begin, &end);
  337. if (!du->Status().OK() && !du->Status().DocumentExist()) {
  338. printf("Other error,ret:%d,msg:%s]n", du->Status().GetCode(),
  339. du->Status().GetState().c_str());
  340. assert(false);
  341. }
  342. delete du;
  343. }
  344. printf("release documents run_times %llu ... \n", run_times);
  345. documents.clear();
  346. CollectRocksDBPerf(&staticstic, params.batch_num_);
  347. // rocksdb perf time.
  348. }
  349. CollectRocksDBPerf(&staticstic, 0, true);
  350. }
  351. void BenchIndex::PrintStaitic(Staticstic &staticstic) {
  352. while (!kThreadStop) {
  353. staticstic.Report();
  354. sleep(10);
  355. }
  356. }
  357. } // namespace wwsearch