virtual_db_rocks.h 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. * Tencent is pleased to support the open source community by making wwsearch
  3. * available.
  4. *
  5. * Copyright (C) 2018-present Tencent. All Rights Reserved.
  6. *
  7. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  8. * use this file except in compliance with the License. You may obtain a copy of
  9. * the License at
  10. *
  11. * https://opensource.org/licenses/Apache-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing, software
  14. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  15. * WARRANTIES OF ANY KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations under the License.
  17. */
  18. #pragma once
  19. #include "codec.h"
  20. #include "codec_doclist_impl.h"
  21. #include "header.h"
  22. #include "virtual_db.h"
  23. #include "db/db_impl.h"
  24. #include "rocksdb/cache.h"
  25. #include "rocksdb/db.h"
  26. #include "rocksdb/merge_operator.h"
  27. #include "rocksdb/options.h"
  28. #include "rocksdb/slice.h"
  29. #include "rocksdb/statistics.h"
  30. #include "rocksdb/utilities/db_ttl.h"
  31. #include "virtual_db_rocks_write_queue.h"
  32. namespace wwsearch {
  33. namespace merge {
  34. class OptimizeMerger;
  35. }
  36. // Simple merger for test
  37. class ConcatMergeOperator : public rocksdb::MergeOperator {
  38. private:
  39. public:
  40. // Constructor
  41. ConcatMergeOperator() {}
  42. virtual ~ConcatMergeOperator() {}
  43. // merge all value of one key in db.
  44. virtual bool FullMergeV2(
  45. const rocksdb::MergeOperator::MergeOperationInput& merge_in,
  46. rocksdb::MergeOperator::MergeOperationOutput* merge_out) const override {
  47. if (merge_in.existing_value) {
  48. merge_out->new_value.assign(merge_in.existing_value->data(),
  49. merge_in.existing_value->size());
  50. }
  51. for (auto item : merge_in.operand_list) {
  52. merge_out->new_value.append(item.data(), item.size());
  53. }
  54. return true;
  55. }
  56. // merge some value of one key in db.
  57. virtual bool PartialMergeMulti(const rocksdb::Slice& key,
  58. const std::deque<rocksdb::Slice>& operand_list,
  59. std::string* new_value,
  60. rocksdb::Logger* logger) const override {
  61. for (auto item : operand_list) {
  62. new_value->append(item.data(), item.size());
  63. return true;
  64. }
  65. }
  66. // Return name of the merger.
  67. virtual const char* Name() const override { return "ConcatMergeOperator"; }
  68. private:
  69. };
  70. // Another optimize merger.
  71. namespace merge {
  72. struct DocList {
  73. DocumentID doc_id_;
  74. DocumentState doc_state_;
  75. } __attribute__((packed));
  76. typedef struct DocList DocList;
  77. } // namespace merge
  78. // This merger will merge doclist into one doclist.
  79. class DocListMergeOperator : public rocksdb::MergeOperator {
  80. private:
  81. Codec* codec_;
  82. merge::OptimizeMerger* merger_;
  83. public:
  84. // Constructor
  85. DocListMergeOperator(Codec* codec, merge::OptimizeMerger* merger)
  86. : codec_(codec), merger_(merger) {}
  87. virtual ~DocListMergeOperator();
  88. // New global single instance
  89. static DocListMergeOperator* NewInstance(VDBParams* params);
  90. // merge all doclist of inverted index's value with order.
  91. virtual bool FullMergeV2(
  92. const rocksdb::MergeOperator::MergeOperationInput& merge_in,
  93. rocksdb::MergeOperator::MergeOperationOutput* merge_out) const override;
  94. // merge some doclist of inverted index's value with order.
  95. virtual bool PartialMergeMulti(const rocksdb::Slice& key,
  96. const std::deque<rocksdb::Slice>& operand_list,
  97. std::string* new_value,
  98. rocksdb::Logger* logger) const override;
  99. // deprecated api
  100. virtual bool FullMerge(const rocksdb::Slice& key,
  101. const rocksdb::Slice* existing_value,
  102. const std::deque<std::string>& operand_list,
  103. std::string* new_value,
  104. rocksdb::Logger* logger) const override;
  105. // deprecated api
  106. virtual bool PartialMerge(const rocksdb::Slice& key,
  107. const rocksdb::Slice& left_operand,
  108. const rocksdb::Slice& right_operand,
  109. std::string* new_value,
  110. rocksdb::Logger* logger) const override;
  111. // TODO:
  112. // support multi partial merge
  113. // support full merge v2
  114. // support shouldmerge()
  115. // Do not change the merge name.
  116. virtual const char* Name() const override { return "DocListMergeOperator"; }
  117. private:
  118. // Inner api do the real merge job
  119. bool DocListMerge(std::vector<DocListReaderCodec*>& items,
  120. std::string* new_value, rocksdb::Logger* logger,
  121. bool purge_deleted = false) const;
  122. // Decode packed doclist into fixed size doclist.
  123. std::pair<merge::DocList*, size_t> DecodeDocList(
  124. std::vector<std::string>& alloc_buffer, const char* data,
  125. size_t data_len) const;
  126. // Inner api do the real merge job
  127. bool DoMerge(std::string* new_value,
  128. std::vector<std::pair<merge::DocList*, size_t>>& doc_lists,
  129. bool full_merge) const;
  130. };
  131. // RocksDB snapshot wrapper
  132. class VirtualDBRocksSnapshot : public VirtualDBSnapshot {
  133. private:
  134. const rocksdb::Snapshot* snapshot_;
  135. public:
  136. VirtualDBRocksSnapshot(const rocksdb::Snapshot* snapshot)
  137. : snapshot_(snapshot) {}
  138. virtual ~VirtualDBRocksSnapshot() {}
  139. // Get current snapshot.
  140. inline const rocksdb::Snapshot* GetSnapshot() { return this->snapshot_; }
  141. private:
  142. };
  143. // RocksDB read option wrapper
  144. class VirtualDBRocksReadOption : public VirtualDBReadOption {
  145. public:
  146. virtual ~VirtualDBRocksReadOption() {}
  147. private:
  148. };
  149. // RocksDB write option wrapper
  150. class VirtualDBRocksWriteOption : public VirtualDBWriteOption {
  151. public:
  152. VirtualDBRocksWriteOption() : options_(rocksdb::WriteOptions()) {}
  153. virtual ~VirtualDBRocksWriteOption() {}
  154. rocksdb::WriteOptions& GetWriteOptions() { return options_; }
  155. private:
  156. rocksdb::WriteOptions options_;
  157. };
  158. // RocksDB wrapper
  159. class VirtualDBRocksImpl : public VirtualDB {
  160. private:
  161. // outer reference
  162. VDBParams* params_;
  163. // new reference.
  164. rocksdb::DB* db_;
  165. DocListMergeOperator* merger_;
  166. // rocksdb::DBWithTTL* db_;
  167. rocksdb::Options options_;
  168. std::vector<rocksdb::ColumnFamilyHandle*> column_famil_handles_;
  169. std::vector<rocksdb::ColumnFamilyDescriptor> column_families_;
  170. std::shared_ptr<DbRocksWriteQueue> write_queue_;
  171. public:
  172. explicit VirtualDBRocksImpl(
  173. VDBParams* params, const std::shared_ptr<DbRocksWriteQueue>& write_queue);
  174. ~VirtualDBRocksImpl();
  175. virtual bool Open() override;
  176. virtual rocksdb::DB* GetDb() { return db_; }
  177. virtual std::vector<rocksdb::ColumnFamilyHandle*>& ColumnFamilyHandle();
  178. virtual VirtualDBSnapshot* NewSnapshot();
  179. virtual void ReleaseSnapshot(VirtualDBSnapshot*);
  180. // Flush to db.
  181. virtual SearchStatus FlushBuffer(WriteBuffer* write_buffer) override;
  182. // virtual SearchStatus FlushBuffer(const std::string* write_buffer) = 0;
  183. virtual WriteBuffer* NewWriteBuffer(const std::string* write_buffer) override;
  184. // Release
  185. virtual void ReleaseWriteBuffer(WriteBuffer* buffer) override;
  186. // Get one k/v
  187. virtual SearchStatus Get(StorageColumnType column, const std::string& key,
  188. std::string& value,
  189. VirtualDBSnapshot* snapshot = nullptr) override;
  190. // MultiGet will set status's size to keys's size to indicate each key's
  191. // return status.
  192. virtual void MultiGet(std::vector<StorageColumnType> columns,
  193. std::vector<std::string>& keys,
  194. std::vector<std::string>& values,
  195. std::vector<SearchStatus>& status,
  196. VirtualDBSnapshot* snapshot = nullptr) override;
  197. // New iterator for read.
  198. virtual Iterator* NewIterator(StorageColumnType column,
  199. VirtualDBReadOption* options) override;
  200. // Trigger RocksDB Compact one range.
  201. virtual SearchStatus CompactRange(StorageColumnType column,
  202. const std::string& begin,
  203. const std::string& end) override;
  204. virtual SearchStatus DropDB() override;
  205. // Drop rocksdb instance.
  206. static bool DropDB(const char* path) {
  207. rocksdb::DestroyDB(path, rocksdb::Options());
  208. }
  209. // Test api
  210. rocksdb::Cache* GetTableCache() {
  211. rocksdb::DBImpl* db_impl = dynamic_cast<rocksdb::DBImpl*>(db_);
  212. if (nullptr == db_impl) {
  213. assert(false);
  214. }
  215. return db_impl->TEST_table_cache();
  216. }
  217. // Get Rocksdb statistics.
  218. const std::shared_ptr<rocksdb::Statistics>& GetDBStatistics() const {
  219. return options_.statistics;
  220. }
  221. private:
  222. void InitDBOptions();
  223. // new one family
  224. rocksdb::ColumnFamilyOptions NewColumnFamilyOptions(
  225. const rocksdb::Options& options, StorageColumnType column);
  226. // release
  227. void Clear() {
  228. for (auto cfh : this->column_famil_handles_) delete cfh;
  229. this->column_famil_handles_.clear();
  230. if (nullptr != this->db_) delete this->db_;
  231. this->db_ = nullptr;
  232. }
  233. };
  234. } // namespace wwsearch