Browse Source

fix rocksdb_helper

shzhulin3 1 year ago
parent
commit
d28cde35c7

+ 3 - 3
README.md

@@ -162,11 +162,11 @@ fields:返回指定字段值,多个字段用逗号隔开
 
 ### 源码编译
 
-建议通过isearch_env镜像进行源码编译,获取isearch_env镜像的方式为: `docker pull intelligentsearch/isearch_env:2.0`
+建议通过isearch_env镜像进行源码编译,获取isearch_env镜像的方式为: `docker pull intelligentsearch/isearch_env:latest`
 
-也可以自行编译isearch_env镜像,Dockerfile文件位于dockerfiles\env目录: `docker build -t intelligentsearch/isearch_env:2.0 .`
+也可以自行编译isearch_env镜像,Dockerfile文件位于dockerfiles\env目录: `docker build -t intelligentsearch/isearch_env:latest .`
 
-然后运行容器: `docker run -itd intelligentsearch/isearch_env:2.0`
+然后运行容器: `docker run -itd intelligentsearch/isearch_env:latest`
 
 进入容器: `docker exec -it 容器id /bin/bash`
 

+ 1 - 2
src/search_local/index_storage/rocksdb_helper/db_main.cc

@@ -133,7 +133,7 @@ static int accept_connection(int fd)
 		if (newfd < 0 && errno == EINVAL)
 		{
 			if (getppid() == (pid_t)1)
-			{ // �������Ѿ��˳�
+			{ 
 				log_error("dtc father process not exist. helper[%d] exit now.", getpid());
 				exit(0);
 			}
@@ -372,7 +372,6 @@ int check_db_version(void)
 		case 2:
 		{
 			log_debug("no need to check rocksdb!");
-			// checker guass db version
 			break;
 		}
 	}

+ 77 - 4
src/search_local/index_storage/rocksdb_helper/db_process_rocks.cc

@@ -823,6 +823,11 @@ int RocksdbProcess::saveRow(
       if (ret != 0)
       {
         log_error("parse field value failed! compoundValue:%s", compoundValue.c_str());
+        for(int i = 0; i < mTableDef->num_fields()+1; i++){
+          if((*row)[i].str.ptr != NULL){
+            free((*row)[i].str.ptr);
+          }
+        }
         delete row;
         return -1;
       }
@@ -833,6 +838,11 @@ int RocksdbProcess::saveRow(
     ret = condition_filter(fieldValue, fieldId, mTableDef->field_type(fieldId), Condition);
     if (ret < 0)
     {
+      for(int i = 0; i < mTableDef->num_fields()+1; i++){
+        if((*row)[i].str.ptr != NULL){
+          free((*row)[i].str.ptr);
+        }
+      }
       delete row;
       log_error("string[%s] conver to value[%d] error: %d", fieldValue.c_str(), mTableDef->field_type(fieldId), ret);
       return (-2);
@@ -840,6 +850,11 @@ int RocksdbProcess::saveRow(
     else if (ret == 1)
     {
       // condition is not matched
+      for(int i = 0; i < mTableDef->num_fields()+1; i++){
+        if((*row)[i].str.ptr != NULL){
+          free((*row)[i].str.ptr);
+        }
+      }
       delete row;
       return 0;
     }
@@ -848,6 +863,11 @@ int RocksdbProcess::saveRow(
     ret = str2Value(fieldValue, mTableDef->field_type(fieldId), (*row)[fieldId]);
     if (ret < 0)
     {
+      for(int i = 0; i < mTableDef->num_fields()+1; i++){
+        if((*row)[i].str.ptr != NULL){
+          free((*row)[i].str.ptr);
+        }
+      }
       delete row;
       log_error("string[%s] conver to value[%d] error: %d", fieldValue.c_str(), mTableDef->field_type(fieldId), ret);
       return (-2);
@@ -857,6 +877,11 @@ int RocksdbProcess::saveRow(
   // Task->update_key(row);
   ret = Task->append_row(row);
 
+  for(int i = 0; i < mTableDef->num_fields()+1; i++){
+    if((*row)[i].str.ptr != NULL){
+      free((*row)[i].str.ptr);
+    }
+  }
   delete row;
 
   if (ret < 0)
@@ -1755,6 +1780,54 @@ int RocksdbProcess::process_delete(DTCTask *Task)
     return -1;
   }
 
+  std::set<int> req_ids;
+  std::vector<std::string> quick_keys(mCompoundKeyFieldNums);
+  quick_keys[0] = prefixKey;
+  DTCFieldValue *req_condition = (DTCFieldValue*)Task->request_condition();
+  if(req_condition != NULL){
+    for ( int i = 0; i < req_condition->num_fields(); i++ ){
+      int req_field_id = req_condition->field_id(i);
+      if ( mTableDef->is_volatile(req_field_id) )
+        continue;
+      int rocks_fid = translate_field_idx(req_field_id);
+      assert(rocks_fid >= 0 && rocks_fid < mCompoundKeyFieldNums + mExtraValueFieldNums);
+      if ( req_field_id == 0 || rocks_fid == 0 || rocks_fid >= mCompoundKeyFieldNums) 
+        continue;
+      req_ids.insert(rocks_fid);
+      std::string& va = quick_keys[rocks_fid];
+      ret = value2Str(req_condition->field_value(i), req_field_id, va);
+      assert( ret == 0 );
+    }
+    bool hit_all_key = true;
+    for ( int idx = 1; idx < mCompoundKeyFieldNums; idx++ ){
+      if(req_ids.find(idx) == req_ids.end()){
+        hit_all_key = false;
+      }
+    }
+    if(true == hit_all_key){
+      std::stringstream ss;
+      for(auto it :req_ids) { ss << it << ","; }
+      std::stringstream sskey;
+      for(auto key_it :quick_keys) { sskey << key_it << ","; }
+      log_debug("hit all unique keys, goto quick delete, req_ids: %s, keys: %s, mCompoundKeyFieldNums: %d", 
+        ss.str().c_str(), sskey.str().c_str(), mCompoundKeyFieldNums);
+      std::string quick_rocks_Key;
+      std::string keyBitmaps;
+      encode_bitmap_keys(quick_keys, keyBitmaps);
+      quick_rocks_Key = std::move(key_format::Encode(quick_keys, mKeyfield_types));
+
+      ret = mDBConn->delete_entry(quick_rocks_Key);
+      if ( ret != 0 ) {
+        log_error("deleteEntry failed! ");
+        Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "deleteEntry failed!");
+        return -1;
+      }
+      log_debug("quick delete success");
+      return 0;
+    }
+    req_ids.clear();
+  }
+
   if (mKeyfield_types[0] == DField::String)
     std::transform(prefixKey.begin(), prefixKey.end(), prefixKey.begin(), ::tolower);
 
@@ -3102,10 +3175,10 @@ int RocksdbProcess::split_values(
   char *head = const_cast<char *>(compoundValue.data());
   for (int idx = 0; idx < mExtraValueFieldNums; idx++)
   {
-    if(idx == mExtraValueFieldNums-1){ // extend field no need to parse
-      values.push_back("");
-      break;
-    }
+    //if(idx == mExtraValueFieldNums-1){ // extend field no need to parse
+    //  values.push_back("");
+    //  break;
+    //}
     ret = get_value_by_id(head, mFieldIndexMapping[mCompoundKeyFieldNums + idx], value);
     assert(ret == 0);
     values.push_back(std::move(value));

+ 17 - 7
src/search_local/index_storage/rocksdb_helper/elastic_buffer.cc

@@ -1,10 +1,20 @@
-/////////////////////////////////////////////////////////////
-//
-//  Elastic buffer for replication between slave and master
-//      created by qiuyu
-//        1/11/2021
-//
-/////////////////////////////////////////////////////////////
+/*
+ * =====================================================================================
+ *
+ *       Filename:  elastic_buffer.cc
+ *
+ *    Description:  elastic buffer for replication between slave and master
+ *
+ *        Version:  1.0
+ *        Created:  09/08/2020 10:02:05 PM
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Norton, yangshuang68@jd.com
+ *        Company:  JD.com, Inc.
+ *
+ * =====================================================================================
+ */
 
 #include "elastic_buffer.h"
 #include "log.h"

+ 18 - 7
src/search_local/index_storage/rocksdb_helper/elastic_buffer.h

@@ -1,10 +1,20 @@
-/////////////////////////////////////////////////////////////
-//
-//  Elastic buffer for replication between slave and master
-//      created by qiuyu
-//        1/11/2021
-//
-/////////////////////////////////////////////////////////////
+/*
+ * =====================================================================================
+ *
+ *       Filename:  elastic_buffer.h 
+ *
+ *    Description:  elastic buffer for replication between slave and master
+ *
+ *        Version:  1.0
+ *        Created:  09/08/2020 10:02:05 PM
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Norton, yangshuang68@jd.com
+ *        Company:  JD.com, Inc.
+ *
+ * =====================================================================================
+ */
 
 #ifndef __ELASTIC_BUFFER_H__
 #define __ELASTIC_BUFFER_H__
@@ -57,6 +67,7 @@ class ElasticBuffer
     Buffer_t* nextBuffer(const Buffer_t* curr) { return curr->sNext; }
     void resetElasticBuffer()
     {
+      mReadLen = 0;
       mWritingBuffer = mHeadBuffer;
       while (mWritingBuffer)
       {

+ 4 - 4
src/search_local/index_storage/rocksdb_helper/key_format.cc

@@ -514,14 +514,14 @@ void key_format::DecodeBytes(const std::string &src, std::string &dst)
 	{
 		dst = "";
 	}
-	//std::stringstream oss_dst;
+	std::stringstream oss_dst;
 	for (size_t i = 0; i < src.length(); i += SEGMENT_SIZE)
 	{
 		char padding_bytes = ENCODER_MARKER - src[i + 7];
-		//oss_dst << src.substr(i, SEGMENT_SIZE - 1 - padding_bytes);
-		dst += src.substr(i, SEGMENT_SIZE - 1 - padding_bytes);
+		oss_dst << src.substr(i, SEGMENT_SIZE - 1 - padding_bytes);
+		//dst += src.substr(i, SEGMENT_SIZE - 1 - padding_bytes);
 	}
-	//dst = oss_dst.str();
+	dst = oss_dst.str();
 }
 
 void key_format::DecodeBytes(const std::string &src, uint64_t &dst)

+ 26 - 15
src/search_local/index_storage/rocksdb_helper/rocksdb_replication.cc

@@ -23,11 +23,11 @@ extern DTCConfig *gConfig;
 
 // operation level, [prev state][current state]
 static bool gReplicationState[(int)ReplicationState::eReplicationMax + 1][(int)ReplicationState::eReplicationMax + 1] = {
-  {true, true, true, false, false},
+  {true, true, true, false, true},
   {false, true, true, false, false},
   {false, true, true, true, true},
   {false, true, false, true, true},
-  {false, false, false, false, true}
+  {true, false, false, false, true}
 };
 
 int getReplicationState(RocksDBConn* rocksdb)
@@ -248,6 +248,7 @@ void RocksReplicationChannel::handleReplication()
   
   {
     RocksdbReplication::ReplicationType type = GET_REQUEST_TYPE(mPacketHeader.sPacketFlag);
+    CLEAR_REQUEST_TYPE(mPacketHeader.sPacketFlag , type);
     switch (type)
     {
     case RocksdbReplication::eReplSync:
@@ -549,8 +550,8 @@ int RocksReplicationChannel::masterFillRangeKV(
     goto RANGE_QUERY_END;
   }
   
-  ret = mRocksdb->retrieve_start(startKey, value, rocksItr, true);
-  do {
+  ret = mRocksdb->retrieve_start(startKey, value, rocksItr, false);
+  while (mPacketBuffer->getBufferSize() + 2*sizeof(int) + startKey.length() + value.length() < REPLICATION_PACKET_SIZE) {
     if (ret < 0)
     {  
       log_error("query rocksdb failed! key:%s, ret:%d", startKey.c_str(), ret);
@@ -588,15 +589,25 @@ int RocksReplicationChannel::masterFillRangeKV(
     }
 
     ret = mRocksdb->next_entry(rocksItr, startKey, value);
-  } while (mPacketBuffer->getBufferSize() < REPLICATION_PACKET_SIZE);
+  };
       
   mRocksdb->retrieve_end(rocksItr);
   
 RANGE_QUERY_END:
   ReplicationPacket_t* respHeader = (ReplicationPacket_t*)(mPacketBuffer->getHeadBuffer()->sData);
   
-  if (finished) SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplFin); 
-  else SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplRepAck);
+  if (finished) {
+    SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplFin);
+    int state = (int)ReplicationState::eReplicationFinished;
+    int ret = updateReplicationState(mRocksdb, state);
+    if (ret < 0)
+    {
+      log_error("update replication state failed!");
+      return -1;
+    }
+  } else {
+    SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplRepAck);
+  }
 
   respHeader->sRawPacketLen = mPacketBuffer->getBufferSize();
   
@@ -691,15 +702,15 @@ int RocksReplicationChannel::slaveFillRangeKV()
 
     newEntries[key] = value;
     
-    std::string dKey, dVal;
-    extern HelperProcessBase* helperProc;
-    RocksdbProcess* p_rocksdb_process = dynamic_cast<RocksdbProcess*>(helperProc);
-    if (p_rocksdb_process != NULL){
-      ret = p_rocksdb_process->decodeInternalKV(
-        key, value, dKey, dVal);
-    }
+    //std::string dKey, dVal;
+    //extern HelperProcessBase* helperProc;
+    //RocksdbProcess* p_rocksdb_process = dynamic_cast<RocksdbProcess*>(helperProc);
+    //if (p_rocksdb_process != NULL){
+    //  ret = p_rocksdb_process->decodeInternalKV(
+    //    key, value, dKey, dVal);
+    //}
     
-    log_error("replication kv, :%s,%s", dKey.c_str(), dVal.c_str());
+    //log_error("replication kv, :%s,%s", dKey.c_str(), dVal.c_str());
   }
 
   ret = mRocksdb->batch_update(std::set<std::string>(), newEntries, true);

+ 2 - 1
src/search_local/index_storage/rocksdb_helper/rocksdb_replication.h

@@ -13,7 +13,7 @@
 #include "rocksdb_conn.h"
 
 #define REPLICATON_HEADER_MAGIC 0x7654321
-#define REPLICATION_PACKET_SIZE (10 * (1 << 20)) // 10M
+#define REPLICATION_PACKET_SIZE (1 * (1 << 20)) // 1M
 
 #define CLEAR_BITS(packetFlag) (packetFlag &= 0x0)
 
@@ -30,6 +30,7 @@
 enum class ReplicationType;
 #define SET_REQUEST_TYPE(packetFlag, type) (packetFlag |= (type << 2))
 #define GET_REQUEST_TYPE(packetFlag) ((RocksdbReplication::ReplicationType)((packetFlag & 0xff) >> 2))
+#define CLEAR_REQUEST_TYPE(packetFlag, type) (packetFlag &= (-((type << 2)+1)))
 
 typedef struct ReplicationPacket
 {