db_process_rocks.cc 94 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637
  1. /*
  2. * =====================================================================================
  3. *
  4. * Filename: db_process_rocks.cc
  5. *
  6. * Description:
  7. *
  8. * Version: 1.0
  9. * Created: 09/08/2020 10:02:05 PM
  10. * Revision: none
  11. * Compiler: gcc
  12. *
  13. * Author: Norton, yangshuang68@jd.com
  14. * Company: JD.com, Inc.
  15. *
  16. * =====================================================================================
  17. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <limits.h>
#include <errno.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>

#include <algorithm>
#include <bitset>
#include <fstream>
#include <map>
#include <new>
#include <sstream>
#include <string>
#include <vector>

#include "db_process_rocks.h"
#include <protocol.h>
#include <log.h>
#include "proc_title.h"
#include "table_def_manager.h"
#include "buffer_pool.h"
#include <daemon.h>
#include "mysql_error.h"
// #define DEBUG_INFO
// NOTE(review): PRINT_STAT is not referenced in the visible part of this file;
// presumably it toggles statistics output elsewhere — confirm.
#define PRINT_STAT
#define BITS_OF_BYTE 8 /* bits of one byte */
// 1 MiB (1 << 20). Presumably an upper bound on a replication payload — confirm at use sites.
#define MAX_REPLICATE_LEN (1UL << 20)
// Key passed to RocksDBConn::get_entry() (COLUMN_META_DATA) by get_replicate_end_key().
// NOTE(review): "REPLICAE" looks like a typo, but this literal is the lookup key
// itself; changing it changes which RocksDB entry is read.
static const std::string gReplicatePrefixKey = "__ROCKS_REPLICAE_PREFIX_KEY__";
// File-wide comparator instance; its users are outside the visible code — confirm its role there.
CommKeyComparator gInternalComparator;
  49. RocksdbProcess::RocksdbProcess(RocksDBConn *conn)
  50. {
  51. mDBConn = conn;
  52. strncpy(name, "helper", 6);
  53. titlePrefixSize = 0;
  54. procTimeout = 0;
  55. mTableDef = NULL;
  56. mCompoundKeyFieldNums = -1;
  57. mExtraValueFieldNums = -1;
  58. mNoBitmapKey = true;
  59. mPrevMigrateKey = "";
  60. mCurrentMigrateKey = "";
  61. mUncommitedMigId = -1;
  62. mOrderByUnit = NULL;
  63. mReplUnit = NULL;
  64. }
  65. RocksdbProcess::~RocksdbProcess()
  66. {
  67. }
  68. int RocksdbProcess::check_table()
  69. {
  70. // no table concept in rocksdb
  71. return (0);
  72. }
  73. void RocksdbProcess::init_ping_timeout(void)
  74. {
  75. // only for frame adapt
  76. return;
  77. }
  78. void RocksdbProcess::use_matched_rows(void)
  79. {
  80. // only for frame adapt, no actual meanings
  81. return;
  82. }
  83. int RocksdbProcess::Init(int GroupID, const DbConfig *Config, DTCTableDefinition *tdef, int slave)
  84. {
  85. int ret;
  86. SelfGroupID = GroupID;
  87. mTableDef = tdef;
  88. std::vector<int> dtcFieldIndex;
  89. int totalFields = mTableDef->num_fields();
  90. for (int i = 0; i <= totalFields; i++)
  91. {
  92. //bug fix volatile不在db中
  93. if (mTableDef->is_volatile(i))
  94. continue;
  95. dtcFieldIndex.push_back(i);
  96. }
  97. totalFields = dtcFieldIndex.size();
  98. if (totalFields <= 0)
  99. {
  100. log_error("field can not be empty!");
  101. return -1;
  102. }
  103. mCompoundKeyFieldNums = mTableDef->uniq_fields();
  104. if (mCompoundKeyFieldNums <= 0)
  105. {
  106. log_error("not found unique constraint in any field!");
  107. return -1;
  108. }
  109. mExtraValueFieldNums = totalFields - mCompoundKeyFieldNums;
  110. log_info("total fields:%d, uniqKeyNum:%d, valueNum:%d", totalFields, mCompoundKeyFieldNums, mExtraValueFieldNums);
  111. // create map relationship
  112. uint8_t keyIndex;
  113. uint8_t *uniqFields = mTableDef->uniq_fields_list();
  114. for (int idx = 0; idx < mCompoundKeyFieldNums; idx++)
  115. {
  116. keyIndex = *(uniqFields + idx);
  117. dtcFieldIndex[keyIndex] = -1;
  118. mFieldIndexMapping.push_back(keyIndex);
  119. }
  120. if (dtcFieldIndex.size() <= 0)
  121. {
  122. log_error("no value field!");
  123. return -1;
  124. }
  125. // classify the unique keys into two types: Integer fixed len and elastic string type,
  126. // no need to do collecting if the key is binary
  127. // int keyType = mTableDef->key_type();
  128. mKeyfield_types.resize(mCompoundKeyFieldNums);
  129. // mKeyfield_types[0] = keyType;
  130. {
  131. // shrink string keys or integer keys into the head of the array
  132. int fieldType;
  133. // int moveHeadIdx = -1;
  134. for (size_t idx = 0; idx < mFieldIndexMapping.size(); idx++)
  135. {
  136. fieldType = mTableDef->field_type(mFieldIndexMapping[idx]);
  137. mKeyfield_types[idx] = fieldType;
  138. log_info("fieldId:%d, fieldType:%d", mFieldIndexMapping[idx], fieldType);
  139. switch (fieldType)
  140. {
  141. case DField::Signed:
  142. case DField::Unsigned:
  143. case DField::Float:
  144. case DField::Binary:
  145. break;
  146. case DField::String:
  147. {
  148. mNoBitmapKey = false;
  149. break;
  150. }
  151. default:
  152. log_error("unexpected field type! type:%d", fieldType);
  153. return -1;
  154. };
  155. }
  156. }
  157. // remove key from vector
  158. auto start = std::remove_if(dtcFieldIndex.begin(), dtcFieldIndex.end(),
  159. [](const int idx) { return idx == -1; });
  160. dtcFieldIndex.erase(start, dtcFieldIndex.end());
  161. // append value maps
  162. mFieldIndexMapping.insert(mFieldIndexMapping.end(), dtcFieldIndex.begin(), dtcFieldIndex.end());
  163. {
  164. mReverseFieldIndexMapping.resize(mFieldIndexMapping.size());
  165. for (size_t idx1 = 0; idx1 < mFieldIndexMapping.size(); idx1++)
  166. {
  167. mReverseFieldIndexMapping[mFieldIndexMapping[idx1]] = idx1;
  168. }
  169. }
  170. // init replication tag key
  171. ret = get_replicate_end_key();
  172. std::stringstream ss;
  173. ss << "rocks helper meta info, keysize:" << mCompoundKeyFieldNums << " valuesize:"
  174. << mExtraValueFieldNums << " rocksdb fields:[";
  175. for (size_t idx = 0; idx < mFieldIndexMapping.size(); idx++)
  176. {
  177. log_info("%d, type:%d", mFieldIndexMapping[idx], idx < mCompoundKeyFieldNums ? mKeyfield_types[idx] : -1);
  178. if (idx == 0)
  179. ss << mFieldIndexMapping[idx];
  180. else
  181. ss << ", " << mFieldIndexMapping[idx];
  182. }
  183. ss << "]";
  184. log_info("%s", ss.str().c_str());
  185. return ret;
  186. }
  187. int RocksdbProcess::startReplListener()
  188. {
  189. // init replication related
  190. mReplUnit = new RocksdbReplication(mDBConn);
  191. if (!mReplUnit)
  192. {
  193. log_error("%s", "create replication unit failed");
  194. return -1;
  195. }
  196. return mReplUnit->initializeReplication();
  197. }
/*
 * Intended to load the replication end key, stored under gReplicatePrefixKey in
 * the COLUMN_META_DATA column family, into mReplEndKey.
 *
 * NOTE(review): the unconditional `return 0;` disables the lookup. Everything
 * after it is unreachable, so this function always returns 0 and never touches
 * mReplEndKey. Confirm whether the disable is intentional before re-enabling
 * or deleting the body.
 *
 * @return 0 (always, while the early return is in place). The disabled body
 *         would return -1 on a read error other than "key not found".
 */
int RocksdbProcess::get_replicate_end_key()
{
    return 0;
    std::string value;
    std::string fullKey = gReplicatePrefixKey;
    int ret = mDBConn->get_entry(fullKey, value, RocksDBConn::COLUMN_META_DATA);
    // A missing key is not an error: mReplEndKey is then set to the empty value.
    if (ret < 0 && ret != -RocksDBConn::ERROR_KEY_NOT_FOUND)
    {
        log_error("query replicate end key failed! ret:%d", ret);
        return -1;
    }
    else
    {
        mReplEndKey = value;
    }
    return 0;
}
  215. inline int RocksdbProcess::value_add_to_str(
  216. const DTCValue *additionValue,
  217. int ifield_type,
  218. std::string &baseValue)
  219. {
  220. log_debug("value_add_to_str ifield_type[%d]", ifield_type);
  221. if (additionValue == NULL)
  222. {
  223. log_error("value can not be null!");
  224. return -1;
  225. }
  226. switch (ifield_type)
  227. {
  228. case DField::Signed:
  229. {
  230. long long va = strtoll(baseValue.c_str(), NULL, 10);
  231. va += (long long)additionValue->s64;
  232. baseValue = std::to_string(va);
  233. break;
  234. }
  235. case DField::Unsigned:
  236. {
  237. unsigned long long va = strtoull(baseValue.c_str(), NULL, 10);
  238. va += (unsigned long long)additionValue->u64;
  239. baseValue = std::to_string(va);
  240. break;
  241. }
  242. case DField::Float:
  243. {
  244. double va = strtod(baseValue.c_str(), NULL);
  245. va += additionValue->flt;
  246. baseValue = std::to_string(va);
  247. break;
  248. }
  249. case DField::String:
  250. case DField::Binary:
  251. log_error("string type can not do add operation!");
  252. break;
  253. default:
  254. log_error("unexpected field type! type:%d", ifield_type);
  255. return -1;
  256. };
  257. return 0;
  258. }
  259. inline int RocksdbProcess::value2Str(
  260. const DTCValue *Value,
  261. int fieldId,
  262. std::string &strValue)
  263. {
  264. const DTCValue *defaultValue;
  265. bool valueNull = false;
  266. if (Value == NULL)
  267. {
  268. log_info("value is null, use user default value!");
  269. defaultValue = mTableDef->default_value(fieldId);
  270. valueNull = true;
  271. }
  272. int ifield_type = mTableDef->field_type(fieldId);
  273. {
  274. switch (ifield_type)
  275. {
  276. case DField::Signed:
  277. {
  278. int64_t val;
  279. if (valueNull)
  280. val = defaultValue->s64;
  281. else
  282. val = Value->s64;
  283. strValue = std::move(std::to_string((long long)val));
  284. break;
  285. }
  286. case DField::Unsigned:
  287. {
  288. uint64_t val;
  289. if (valueNull)
  290. val = defaultValue->u64;
  291. else
  292. val = Value->u64;
  293. strValue = std::move(std::to_string((unsigned long long)val));
  294. break;
  295. }
  296. case DField::Float:
  297. {
  298. double val;
  299. if (valueNull)
  300. val = defaultValue->flt;
  301. else
  302. val = Value->flt;
  303. strValue = std::move(std::to_string(val));
  304. break;
  305. }
  306. case DField::String:
  307. case DField::Binary:
  308. {
  309. // value whether be "" or NULL ????
  310. // in current, regard NULL as empty string, not support NULL attribute here
  311. if (valueNull)
  312. strValue = std::move(std::string(defaultValue->str.ptr, defaultValue->str.len));
  313. else
  314. {
  315. if (Value->str.is_empty())
  316. {
  317. log_info("empty str value!");
  318. strValue = "";
  319. return 0;
  320. }
  321. strValue = std::move(std::string(Value->str.ptr, Value->str.len));
  322. /*if ( mkey_type == DField::String )
  323. {
  324. // case insensitive
  325. std::transform(strValue.begin(), strValue.end(), strValue.begin(), ::tolower);
  326. }*/
  327. }
  328. break;
  329. }
  330. default:
  331. log_error("unexpected field type! type:%d", ifield_type);
  332. return -1;
  333. };
  334. }
  335. return 0;
  336. }
  337. inline int RocksdbProcess::setdefault_value(
  338. int field_type,
  339. DTCValue &Value)
  340. {
  341. switch (field_type)
  342. {
  343. case DField::Signed:
  344. Value.s64 = 0;
  345. break;
  346. case DField::Unsigned:
  347. Value.u64 = 0;
  348. break;
  349. case DField::Float:
  350. Value.flt = 0.0;
  351. break;
  352. case DField::String:
  353. Value.str.len = 0;
  354. Value.str.ptr = 0;
  355. break;
  356. case DField::Binary:
  357. Value.bin.len = 0;
  358. Value.bin.ptr = 0;
  359. break;
  360. default:
  361. Value.s64 = 0;
  362. };
  363. return (0);
  364. }
  365. inline int RocksdbProcess::str2Value(
  366. const std::string &Str,
  367. int field_type,
  368. DTCValue &Value)
  369. {
  370. if (Str == NULL)
  371. {
  372. log_debug("Str is NULL, field_type: %d. Check mysql table definition.", field_type);
  373. setdefault_value(field_type, Value);
  374. return (0);
  375. }
  376. switch (field_type)
  377. {
  378. case DField::Signed:
  379. errno = 0;
  380. Value.s64 = strtoll(Str.c_str(), NULL, 10);
  381. if (errno != 0)
  382. return (-1);
  383. break;
  384. case DField::Unsigned:
  385. errno = 0;
  386. Value.u64 = strtoull(Str.c_str(), NULL, 10);
  387. if (errno != 0)
  388. return (-1);
  389. break;
  390. case DField::Float:
  391. errno = 0;
  392. Value.flt = strtod(Str.c_str(), NULL);
  393. if (errno != 0)
  394. return (-1);
  395. break;
  396. case DField::String:
  397. {
  398. char* p = (char*)calloc(Str.length() , sizeof(char));
  399. memcpy((void*)p , (void*)Str.data() , Str.length());
  400. Value.str.ptr = p;
  401. Value.str.len = Str.length();
  402. }
  403. break;
  404. case DField::Binary:
  405. {
  406. char* p = (char*)calloc(Str.length() , sizeof(char));
  407. memcpy((void*)p , (void*)Str.data() , Str.length());
  408. Value.bin.ptr = p;
  409. Value.bin.len = Str.length();
  410. }
  411. break;
  412. default:
  413. log_error("type[%d] invalid.", field_type);
  414. return -1;
  415. }
  416. return 0;
  417. }
  418. int RocksdbProcess::condition_filter(
  419. const std::string &rocksValue,
  420. int fieldid,
  421. int fieldType,
  422. const DTCFieldValue *condition)
  423. {
  424. if (condition == NULL)
  425. return 0;
  426. bool matched;
  427. // find out the condition value
  428. for (int idx = 0; idx < condition->num_fields(); idx++)
  429. {
  430. if (mTableDef->is_volatile(idx))
  431. {
  432. log_error("volatile field, idx:%d", idx);
  433. return -1;
  434. }
  435. int fId = condition->field_id(idx);
  436. if (fId != fieldid)
  437. continue;
  438. // DTC support query condition
  439. /* enum {
  440. EQ = 0,
  441. NE = 1,
  442. LT = 2,
  443. LE = 3,
  444. GT = 4,
  445. GE = 5,
  446. TotalComparison
  447. }; */
  448. int comparator = condition->field_operation(idx);
  449. const DTCValue *condValue = condition->field_value(idx);
  450. switch (fieldType)
  451. {
  452. case DField::Signed:
  453. // matched = is_matched_template(strtoll(rocksValue.c_str(), NULL, 10), comparator, condValue.s64);
  454. matched = is_matched<int64_t>(strtoll(rocksValue.c_str(), NULL, 10), comparator, condValue->s64);
  455. if (!matched)
  456. {
  457. log_info("not match the condition, lv:%s, rv:%lld, com:%d", rocksValue.c_str(),
  458. (long long)condValue->s64, comparator);
  459. return 1;
  460. }
  461. break;
  462. case DField::Unsigned:
  463. // matched = is_matched_template(strtoull(rocksValue.c_str(), NULL, 10), comparator, condValue.u64);
  464. matched = is_matched<uint64_t>(strtoull(rocksValue.c_str(), NULL, 10), comparator, condValue->u64);
  465. if (!matched)
  466. {
  467. log_info("not match the condition, lv:%s, rv:%llu, com:%d", rocksValue.c_str(),
  468. (unsigned long long)condValue->u64, comparator);
  469. return 1;
  470. }
  471. break;
  472. case DField::Float:
  473. // matched = is_matched_template(strtod(rocksValue.c_str(), NULL, 10), comparator, condValue.flt);
  474. matched = is_matched<double>(strtod(rocksValue.c_str(), NULL), comparator, condValue->flt);
  475. if (!matched)
  476. {
  477. log_info("not match the condition, lv:%s, rv:%lf, com:%d", rocksValue.c_str(),
  478. condValue->flt, comparator);
  479. return 1;
  480. }
  481. break;
  482. case DField::String:
  483. matched = is_matched(rocksValue.c_str(), comparator, condValue->str.ptr, (int)rocksValue.length(), condValue->str.len, false);
  484. if (!matched)
  485. {
  486. log_info("not match the condition, lv:%s, rv:%s, com:%d", rocksValue.c_str(),
  487. std::string(condValue->str.ptr, condValue->str.len).c_str(), comparator);
  488. return 1;
  489. }
  490. case DField::Binary:
  491. // matched = is_matched_template(rocksValue.c_str(), comparator, condValue.str.ptr, (int)rocksValue.length(), condValue.str.len);
  492. matched = is_matched(rocksValue.c_str(), comparator, condValue->bin.ptr, (int)rocksValue.length(), condValue->bin.len, true);
  493. if (!matched)
  494. {
  495. log_info("not match the condition, lv:%s, rv:%s, com:%d", rocksValue.c_str(),
  496. std::string(condValue->bin.ptr, condValue->bin.len).c_str(), comparator);
  497. return 1;
  498. }
  499. break;
  500. default:
  501. log_error("field[%d] type[%d] invalid.", fieldid, fieldType);
  502. return -1;
  503. }
  504. }
  505. return 0;
  506. }
  507. int RocksdbProcess::condition_filter(
  508. const std::string &rocksValue,
  509. const std::string &condValue,
  510. int fieldType,
  511. int comparator)
  512. {
  513. bool matched;
  514. switch (fieldType)
  515. {
  516. case DField::Signed:
  517. matched = is_matched<int64_t>(strtoll(rocksValue.c_str(), NULL, 10), comparator, strtoll(condValue.c_str(), NULL, 10));
  518. if (!matched)
  519. {
  520. log_info("not match the condition, lv:%s, rv:%s, com:%d", rocksValue.c_str(),
  521. condValue.c_str(), comparator);
  522. return 1;
  523. }
  524. break;
  525. case DField::Unsigned:
  526. matched = is_matched<uint64_t>(strtoull(rocksValue.c_str(), NULL, 10), comparator, strtoull(condValue.c_str(), NULL, 10));
  527. if (!matched)
  528. {
  529. log_info("not match the condition, lv:%s, rv:%s, com:%d", rocksValue.c_str(),
  530. condValue.c_str(), comparator);
  531. return 1;
  532. }
  533. break;
  534. case DField::Float:
  535. matched = is_matched<double>(strtod(rocksValue.c_str(), NULL), comparator, strtod(condValue.c_str(), NULL));
  536. if (!matched)
  537. {
  538. log_info("not match the condition, lv:%s, rv:%s, com:%d", rocksValue.c_str(),
  539. condValue.c_str(), comparator);
  540. return 1;
  541. }
  542. break;
  543. case DField::String:
  544. case DField::Binary:
  545. {
  546. matched = is_matched(rocksValue.c_str(), comparator, condValue.c_str(), (int)rocksValue.length(), (int)condValue.length(), false);
  547. if (!matched)
  548. {
  549. log_info("not match the condition, lv:%s, rv:%s, com:%d", rocksValue.c_str(),
  550. condValue.c_str(), comparator);
  551. return 1;
  552. }
  553. }
  554. break;
  555. default:
  556. log_error("invalid field type[%d].", fieldType);
  557. return -1;
  558. }
  559. return 0;
  560. }
  561. template <class... Args>
  562. bool RocksdbProcess::is_matched_template(Args... len)
  563. {
  564. return is_matched(len...);
  565. }
  566. template <class T>
  567. bool RocksdbProcess::is_matched(
  568. const T lv,
  569. int comparator,
  570. const T rv)
  571. {
  572. /* enum {
  573. EQ = 0,
  574. NE = 1,
  575. LT = 2,
  576. LE = 3,
  577. GT = 4,
  578. GE = 5,
  579. TotalComparison
  580. }; */
  581. switch (comparator)
  582. {
  583. case 0:
  584. return lv == rv;
  585. case 1:
  586. return lv != rv;
  587. case 2:
  588. return lv < rv;
  589. case 3:
  590. return lv <= rv;
  591. case 4:
  592. return lv > rv;
  593. case 5:
  594. return lv >= rv;
  595. default:
  596. log_error("unsupport comparator:%d", comparator);
  597. }
  598. return false;
  599. }
  600. template bool RocksdbProcess::is_matched<int64_t>(const int64_t lv, int comp, const int64_t rv);
  601. template bool RocksdbProcess::is_matched<uint64_t>(const uint64_t lv, int comp, const uint64_t rv);
  602. template bool RocksdbProcess::is_matched<double>(const double lv, int comp, const double rv);
  603. int RocksdbProcess::memcmp_ignore_case(
  604. const void* lv,
  605. const void* rv,
  606. int count)
  607. {
  608. int iret = 0;
  609. for (int i = 0; i < count; i++){
  610. char lv_buffer = tolower(((char*)lv)[i]);
  611. char rv_buffer = tolower(((char*)rv)[i]);
  612. iret = memcmp(&lv_buffer , &rv_buffer , sizeof(char));
  613. if (iret != 0){
  614. return iret;
  615. }
  616. }
  617. return iret;
  618. }
  619. //template<>
  620. bool RocksdbProcess::is_matched(
  621. const char *lv,
  622. int comparator,
  623. const char *rv,
  624. int lLen,
  625. int rLen,
  626. bool caseSensitive)
  627. {
  628. /* enum {
  629. EQ = 0,
  630. NE = 1,
  631. LT = 2,
  632. LE = 3,
  633. GT = 4,
  634. GE = 5,
  635. TotalComparison
  636. }; */
  637. int ret;
  638. int minLen = lLen <= rLen ? lLen : rLen;
  639. switch (comparator)
  640. {
  641. case 0:
  642. if (caseSensitive)
  643. return lLen == rLen && !memcmp(lv, rv, minLen);
  644. return lLen == rLen && !memcmp_ignore_case(lv, rv, minLen);
  645. case 1:
  646. if (lLen != rLen)
  647. return true;
  648. if (caseSensitive)
  649. return memcmp(lv, rv, minLen);
  650. return memcmp_ignore_case(lv, rv, minLen);
  651. case 2:
  652. if (caseSensitive)
  653. ret = memcmp(lv, rv, minLen);
  654. else
  655. ret = memcmp_ignore_case(lv, rv, minLen);
  656. return ret < 0 || (ret == 0 && lLen < rLen);
  657. case 3:
  658. if (caseSensitive)
  659. ret = memcmp(lv, rv, minLen);
  660. else
  661. ret = memcmp_ignore_case(lv, rv, minLen);
  662. //log_error("iret:%d , len:%d ,rLen:%d", ret , lLen , rLen);
  663. return ret < 0 || (ret == 0 && lLen <= rLen);
  664. case 4:
  665. if (caseSensitive)
  666. ret = memcmp(lv, rv, minLen);
  667. else
  668. ret = memcmp_ignore_case(lv, rv, minLen);
  669. return ret > 0 || (ret == 0 && lLen > rLen);
  670. case 5:
  671. if (caseSensitive)
  672. ret = memcmp(lv, rv, minLen);
  673. else
  674. ret = memcmp_ignore_case(lv, rv, minLen);
  675. return ret > 0 || (ret == 0 && lLen >= rLen);
  676. default:
  677. log_error("unsupport comparator:%d", comparator);
  678. }
  679. return false;
  680. }
  681. int RocksdbProcess::saveRow(
  682. const std::string &compoundKey,
  683. const std::string &compoundValue,
  684. bool countOnly,
  685. int &totalRows,
  686. DTCTask *Task)
  687. {
  688. if (mCompoundKeyFieldNums + mExtraValueFieldNums <= 0)
  689. {
  690. log_error("no fields in the table! key:%s");
  691. return (-1);
  692. }
  693. int ret;
  694. // decode the compoundKey and check whether it is matched
  695. std::vector<std::string> keys;
  696. key_format::Decode(compoundKey, mKeyfield_types, keys);
  697. if (keys.size() != mCompoundKeyFieldNums)
  698. {
  699. // unmatched row
  700. log_error("unmatched row, fullKey:%s, keyNum:%lu, definitionFieldNum:%d",
  701. compoundKey.c_str(), keys.size(), mCompoundKeyFieldNums);
  702. return -1;
  703. }
  704. if (countOnly)
  705. {
  706. totalRows++;
  707. return 0;
  708. }
  709. // decode key bitmap
  710. int bitmapLen = 0;
  711. decodeBitmapKeys(compoundValue, keys, bitmapLen);
  712. //DBConn.Row[0]是key的值,mTableDef->Field[0]也是key,
  713. //因此从1开始。而结果Row是从0开始的(不包括key)
  714. RowValue *row = new RowValue(mTableDef);
  715. const DTCFieldValue *Condition = Task->request_condition();
  716. std::string fieldValue;
  717. char *valueHead = const_cast<char *>(compoundValue.data()) + bitmapLen;
  718. for (size_t idx = 0; idx < mFieldIndexMapping.size(); idx++)
  719. {
  720. int fieldId = mFieldIndexMapping[idx];
  721. if (idx < mCompoundKeyFieldNums)
  722. {
  723. fieldValue = keys[idx];
  724. }
  725. else
  726. {
  727. ret = get_value_by_id(valueHead, fieldId, fieldValue);
  728. if (ret != 0)
  729. {
  730. log_error("parse field value failed! compoundValue:%s", compoundValue.c_str());
  731. for(int i = 0; i < mTableDef->num_fields()+1; i++){
  732. if((*row)[i].str.ptr != NULL){
  733. free((*row)[i].str.ptr);
  734. }
  735. }
  736. delete row;
  737. return -1;
  738. }
  739. }
  740. log_info("save row, fieldId:%d, val:%s", fieldId, fieldValue.data());
  741. // do condition filter
  742. ret = condition_filter(fieldValue, fieldId, mTableDef->field_type(fieldId), Condition);
  743. if (ret < 0)
  744. {
  745. for(int i = 0; i < mTableDef->num_fields()+1; i++){
  746. if((*row)[i].str.ptr != NULL){
  747. free((*row)[i].str.ptr);
  748. }
  749. }
  750. delete row;
  751. log_error("string[%s] conver to value[%d] error: %d", fieldValue.c_str(), mTableDef->field_type(fieldId), ret);
  752. return (-2);
  753. }
  754. else if (ret == 1)
  755. {
  756. // condition is not matched
  757. for(int i = 0; i < mTableDef->num_fields()+1; i++){
  758. if((*row)[i].str.ptr != NULL){
  759. free((*row)[i].str.ptr);
  760. }
  761. }
  762. delete row;
  763. return 0;
  764. }
  765. // fill the value
  766. ret = str2Value(fieldValue, mTableDef->field_type(fieldId), (*row)[fieldId]);
  767. if (ret < 0)
  768. {
  769. for(int i = 0; i < mTableDef->num_fields()+1; i++){
  770. if((*row)[i].str.ptr != NULL){
  771. free((*row)[i].str.ptr);
  772. }
  773. }
  774. delete row;
  775. log_error("string[%s] conver to value[%d] error: %d", fieldValue.c_str(), mTableDef->field_type(fieldId), ret);
  776. return (-2);
  777. }
  778. }
  779. // Task->update_key(row);
  780. ret = Task->append_row(row);
  781. for(int i = 0; i < mTableDef->num_fields()+1; i++){
  782. if((*row)[i].str.ptr != NULL){
  783. free((*row)[i].str.ptr);
  784. }
  785. }
  786. delete row;
  787. if (ret < 0)
  788. {
  789. log_error("append row to task failed!");
  790. return (-3);
  791. }
  792. // totalRows++;
  793. return (0);
  794. }
  795. int RocksdbProcess::save_direct_row(
  796. const std::string &prefixKey,
  797. const std::string &compoundKey,
  798. const std::string &compoundValue,
  799. DirectRequestContext *reqCxt,
  800. DirectResponseContext *respCxt,
  801. int &totalRows)
  802. {
  803. if (mCompoundKeyFieldNums + mExtraValueFieldNums <= 0)
  804. {
  805. log_error("no fields in the table! key:%s", prefixKey.c_str());
  806. return (-1);
  807. }
  808. int ret;
  809. // decode the compoundKey and check whether it is matched
  810. std::vector<std::string> keys;
  811. key_format::Decode(compoundKey, mKeyfield_types, keys);
  812. if (keys.size() != mCompoundKeyFieldNums)
  813. {
  814. // unmatched row
  815. log_error("unmatched row, key:%s, fullKey:%s, keyNum:%lu, definitionFieldNum:%d",
  816. prefixKey.c_str(), compoundKey.c_str(), keys.size(), mCompoundKeyFieldNums);
  817. return -1;
  818. }
  819. // decode key bitmap
  820. int bitmapLen = 0;
  821. decodeBitmapKeys(compoundValue, keys, bitmapLen);
  822. std::string realValue = compoundValue.substr(bitmapLen);
  823. std::vector<std::string> values;
  824. split_values(realValue, values);
  825. assert(values.size() == mExtraValueFieldNums);
  826. int fieldId, rocksFId;
  827. std::string fieldValue;
  828. std::vector<QueryCond>& condFields = ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sFieldConds;
  829. for (size_t idx = 0; idx < condFields.size(); idx++)
  830. {
  831. fieldId = condFields[idx].sFieldIndex;
  832. rocksFId = translate_field_idx(fieldId);
  833. if (rocksFId < mCompoundKeyFieldNums)
  834. {
  835. fieldValue = keys[rocksFId];
  836. }
  837. else
  838. {
  839. fieldValue = values[rocksFId - mCompoundKeyFieldNums];
  840. }
  841. // do condition filter
  842. ret = condition_filter(fieldValue, condFields[idx].sCondValue, mTableDef->field_type(fieldId), condFields[idx].sCondOpr);
  843. if (ret < 0)
  844. {
  845. log_error("condition filt failed! key:%s", prefixKey.c_str());
  846. return -1;
  847. }
  848. else if (ret == 1)
  849. {
  850. // condition is not matched
  851. return 0;
  852. }
  853. }
  854. // deal with order by syntax
  855. if (mOrderByUnit || ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sOrderbyFields.size() > 0)
  856. {
  857. if (!mOrderByUnit)
  858. {
  859. // build order by unit
  860. int heapSize = ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStart >= 0 && ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStep > 0 ?
  861. ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStart + ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStep : 50;
  862. mOrderByUnit = new RocksdbOrderByUnit(mTableDef, heapSize,
  863. mReverseFieldIndexMapping, ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sOrderbyFields);
  864. assert(mOrderByUnit);
  865. }
  866. struct OrderByUnitElement element;
  867. element.mRocksKeys.swap(keys);
  868. element.mRocksValues.swap(values);
  869. mOrderByUnit->add_row(element);
  870. return 0;
  871. }
  872. // limit condition control
  873. ret = 0;
  874. if (((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStart >= 0 && ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStep > 0)
  875. {
  876. if (totalRows < ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStart)
  877. {
  878. // not reach to the range of limitation
  879. totalRows++;
  880. return 0;
  881. }
  882. // leaving from the range of limitaion
  883. if (((RangeQueryRows_t*)respCxt->sDirectRespValue.uRangeQueryRows)->sRowValues.size() == ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStep - 1) ret = 1;
  884. }
  885. // build row
  886. build_direct_row(keys, values, respCxt);
  887. totalRows++;
  888. return ret;
  889. }
  890. void RocksdbProcess::build_direct_row(
  891. const std::vector<std::string> &keys,
  892. const std::vector<std::string> &values,
  893. DirectResponseContext *respCxt)
  894. {
  895. int rocksFId;
  896. std::string row, fieldValue;
  897. for (size_t idx1 = 0; idx1 < mReverseFieldIndexMapping.size(); idx1++)
  898. {
  899. rocksFId = mReverseFieldIndexMapping[idx1];
  900. if (rocksFId < mCompoundKeyFieldNums)
  901. {
  902. fieldValue = keys[rocksFId];
  903. }
  904. else
  905. {
  906. fieldValue = values[rocksFId - mCompoundKeyFieldNums];
  907. }
  908. int dataLen = fieldValue.length();
  909. //row.append(std::string((char *)&dataLen, 4)).append(fieldValue);
  910. row += std::string((char *)&dataLen, 4);
  911. row += fieldValue;
  912. }
  913. ((RangeQueryRows_t*)respCxt->sDirectRespValue.uRangeQueryRows)->sRowValues.push_front(row);
  914. return;
  915. }
  916. int RocksdbProcess::update_row(
  917. const std::string &prefixKey,
  918. const std::string &compoundKey,
  919. const std::string &compoundValue,
  920. DTCTask *Task,
  921. std::string &newKey,
  922. std::string &newValue)
  923. {
  924. if (mCompoundKeyFieldNums + mExtraValueFieldNums <= 0)
  925. {
  926. log_error("no fields in the table!");
  927. return (-1);
  928. }
  929. int ret;
  930. // decode the compoundKey and check whether it is matched
  931. std::vector<std::string> keys;
  932. key_format::Decode(compoundKey, mKeyfield_types, keys);
  933. if (keys.size() != mCompoundKeyFieldNums)
  934. {
  935. // unmatched row
  936. log_error("unmatched row, key:%s, fullKey:%s, keyNum:%lu, definitionFieldNum:%d",
  937. prefixKey.c_str(), compoundKey.c_str(), keys.size(), mCompoundKeyFieldNums);
  938. return -1;
  939. }
  940. bool upKey = false, upVal = false;
  941. const DTCFieldValue *updateInfo = Task->request_operation();
  942. whether_update_key(updateInfo, upKey, upVal);
  943. int keyBitmapLen = 0;
  944. if (upKey)
  945. {
  946. // recover keys for the next update
  947. decodeBitmapKeys(compoundValue, keys, keyBitmapLen);
  948. }
  949. else
  950. {
  951. // no need to create bitmap and compound key again
  952. keyBitmapLen = get_key_bitmap_len(compoundValue);
  953. assert(keyBitmapLen >= 0);
  954. }
  955. std::string bitmapKey = compoundValue.substr(0, keyBitmapLen);
  956. std::string realValue = compoundValue.substr(keyBitmapLen);
  957. std::string fieldValue;
  958. const DTCFieldValue *Condition = Task->request_condition();
  959. char *valueHead = (char *)realValue.data();
  960. for (size_t idx = 1; idx < mFieldIndexMapping.size(); idx++)
  961. {
  962. int dtcfid = mFieldIndexMapping[idx];
  963. if (idx < mCompoundKeyFieldNums)
  964. {
  965. fieldValue = keys[idx];
  966. }
  967. else
  968. {
  969. ret = get_value_by_id(valueHead, dtcfid, fieldValue);
  970. if (ret != 0)
  971. {
  972. log_error("parse field value failed! compoundValue:%s, key:%s", realValue.c_str(), prefixKey.c_str());
  973. return -1;
  974. }
  975. }
  976. // do condition filter
  977. ret = condition_filter(fieldValue, dtcfid, mTableDef->field_type(dtcfid), Condition);
  978. if (ret < 0)
  979. {
  980. log_error("string[%s] conver to value[%d] error: %d, %m", fieldValue.c_str(), mTableDef->field_type(dtcfid), ret);
  981. return (-2);
  982. }
  983. else if (ret == 1)
  984. {
  985. // condition is not matched
  986. return 1;
  987. }
  988. }
  989. // update value
  990. std::vector<std::string> values;
  991. if (upVal)
  992. {
  993. // translate the plane raw value to individual field
  994. split_values(realValue, values);
  995. }
  996. for (int i = 0; i < updateInfo->num_fields(); i++)
  997. {
  998. const int fid = updateInfo->field_id(i);
  999. if (mTableDef->is_volatile(fid))
  1000. continue;
  1001. int rocksFId = translate_field_idx(fid);
  1002. assert(rocksFId >= 0 && rocksFId < mCompoundKeyFieldNums + mExtraValueFieldNums);
  1003. // if not update the value field, the rocksfid must be less than 'mCompoundKeyFieldNums', so it would not touch
  1004. // the container of 'values'
  1005. std::string &va = rocksFId < mCompoundKeyFieldNums ? keys[rocksFId] : values[rocksFId - mCompoundKeyFieldNums];
  1006. switch (updateInfo->field_operation(i))
  1007. {
  1008. case DField::Set:
  1009. value2Str(updateInfo->field_value(i), fid, va);
  1010. break;
  1011. case DField::Add:
  1012. value_add_to_str(updateInfo->field_value(i), updateInfo->field_type(i), va);
  1013. break;
  1014. default:
  1015. log_error("unsupport operation, opr:%d", updateInfo->field_operation(i));
  1016. return -1;
  1017. };
  1018. }
  1019. if (upKey)
  1020. {
  1021. bitmapKey.clear();
  1022. encode_bitmap_keys(keys, bitmapKey);
  1023. newKey = std::move(key_format::Encode(keys, mKeyfield_types));
  1024. }
  1025. else
  1026. newKey = compoundKey;
  1027. if (upVal)
  1028. shrink_value(values, newValue);
  1029. else
  1030. newValue = std::move(realValue);
  1031. newValue = std::move(bitmapKey.append(newValue));
  1032. return 0;
  1033. }
  1034. // query value of the key
  1035. int RocksdbProcess::process_select(DTCTask *Task)
  1036. {
  1037. log_info("come into process select!");
  1038. #ifdef PRINT_STAT
  1039. mSTime = GET_TIMESTAMP();
  1040. #endif
  1041. int ret, i;
  1042. int haslimit = !Task->count_only() && (Task->requestInfo.limit_start() || Task->requestInfo.limit_count());
  1043. // create resultWriter
  1044. ret = Task->prepare_result_no_limit();
  1045. if (ret != 0)
  1046. {
  1047. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "task prepare-result error");
  1048. log_error("task prepare-result error: %d, %m", ret);
  1049. return (-2);
  1050. }
  1051. // prefix key
  1052. std::string prefixKey;
  1053. ret = value2Str(Task->request_key(), 0, prefixKey);
  1054. if (ret != 0 || prefixKey.empty())
  1055. {
  1056. log_error("dtc primary key can not be empty!");
  1057. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get dtc primary key failed!");
  1058. return -1;
  1059. }
  1060. if (mKeyfield_types[0] == DField::String)
  1061. std::transform(prefixKey.begin(), prefixKey.end(), prefixKey.begin(), ::tolower);
  1062. // encode the key to rocksdb format
  1063. std::string fullKey = std::move(key_format::encode_bytes(prefixKey));
  1064. if (fullKey.empty())
  1065. {
  1066. log_error("encode primary key failed! key:%s", prefixKey.c_str());
  1067. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "encode primary key failed!");
  1068. return -1;
  1069. }
  1070. std::string encodePreKey = fullKey;
  1071. std::string value;
  1072. RocksDBConn::RocksItr_t rocksItr;
  1073. ret = mDBConn->retrieve_start(fullKey, value, rocksItr, true);
  1074. if (ret < 0)
  1075. {
  1076. log_error("query rocksdb failed! key:%s, ret:%d", prefixKey.c_str(), ret);
  1077. Task->set_error(ret, __FUNCTION__, "retrieve primary key failed!");
  1078. mDBConn->retrieve_end(rocksItr);
  1079. return -1;
  1080. }
  1081. else if (ret == 1)
  1082. {
  1083. // not found the key
  1084. Task->set_total_rows(0);
  1085. log_info("no matched key:%s", prefixKey.c_str());
  1086. mDBConn->retrieve_end(rocksItr);
  1087. return 0;
  1088. }
  1089. log_info("begin to iterate key:%s", prefixKey.c_str());
  1090. // iterate the matched prefix key and find out the real one from start to end
  1091. int totalRows = 0;
  1092. bool countOnly = Task->count_only();
  1093. while (true)
  1094. {
  1095. ret = key_matched(encodePreKey, fullKey);
  1096. if (ret != 0)
  1097. {
  1098. // prefix key not matched, reach to the end
  1099. break;
  1100. }
  1101. // save row
  1102. ret = saveRow(fullKey, value, countOnly, totalRows, Task);
  1103. if (ret < 0)
  1104. {
  1105. // ignore the incorrect key and keep going
  1106. log_error("save row failed! key:%s", prefixKey.c_str());
  1107. }
  1108. // move iterator to the next key
  1109. ret = mDBConn->next_entry(rocksItr, fullKey, value);
  1110. if (ret < 0)
  1111. {
  1112. log_error("iterate rocksdb failed! key:%s", prefixKey.c_str());
  1113. Task->set_error(ret, __FUNCTION__, "iterate rocksdb failed!");
  1114. mDBConn->retrieve_end(rocksItr);
  1115. return -1;
  1116. }
  1117. else if (ret == 1)
  1118. {
  1119. // reach to the storage end
  1120. break;
  1121. }
  1122. // has remaining value in rocksdb
  1123. }
  1124. if (Task->count_only())
  1125. {
  1126. Task->set_total_rows(totalRows);
  1127. }
  1128. //bug fixed确认客户端带Limit限制
  1129. if (haslimit)
  1130. { // 获取总行数
  1131. Task->set_total_rows(totalRows, 1);
  1132. }
  1133. mDBConn->retrieve_end(rocksItr);
  1134. #ifdef PRINT_STAT
  1135. mETime = GET_TIMESTAMP();
  1136. insert_stat(OprType::eQuery, mETime - mSTime);
  1137. #endif
  1138. return (0);
  1139. }
  1140. int RocksdbProcess::process_insert(DTCTask *Task)
  1141. {
  1142. log_info("come into process insert!!!");
  1143. #ifdef PRINT_STAT
  1144. mSTime = GET_TIMESTAMP();
  1145. #endif
  1146. int ret;
  1147. set_title("INSERT...");
  1148. int totalFields = mTableDef->num_fields();
  1149. // initialize the fixed empty string vector
  1150. std::bitset<256> filledMap;
  1151. std::vector<std::string> keys(mCompoundKeyFieldNums);
  1152. std::vector<std::string> values(mExtraValueFieldNums);
  1153. ret = value2Str(Task->request_key(), 0, keys[0]);
  1154. if (ret != 0 || keys[0].empty())
  1155. {
  1156. log_error("dtc primary key can not be empty!");
  1157. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get dtc primary key failed!");
  1158. return -1;
  1159. }
  1160. else
  1161. {
  1162. filledMap[0] = 1;
  1163. }
  1164. log_info("insert key:%s", keys[0].c_str());
  1165. if (Task->request_operation())
  1166. {
  1167. int fid, rocksFId;
  1168. const DTCFieldValue *updateInfo = Task->request_operation();
  1169. for (int i = 0; i < updateInfo->num_fields(); ++i)
  1170. {
  1171. fid = updateInfo->field_id(i);
  1172. if (mTableDef->is_volatile(fid))
  1173. continue;
  1174. rocksFId = translate_field_idx(fid);
  1175. assert(rocksFId >= 0 && rocksFId < mCompoundKeyFieldNums + mExtraValueFieldNums);
  1176. if (fid == 0)
  1177. continue;
  1178. std::string &va = rocksFId < mCompoundKeyFieldNums ? keys[rocksFId] : values[rocksFId - mCompoundKeyFieldNums];
  1179. ret = value2Str(updateInfo->field_value(i), fid, va);
  1180. assert(ret == 0);
  1181. filledMap[fid] = 1;
  1182. }
  1183. }
  1184. // set default value
  1185. for (int i = 1; i <= totalFields; ++i)
  1186. {
  1187. int rocksFId;
  1188. if (mTableDef->is_volatile(i))
  1189. continue;
  1190. if (filledMap[i])
  1191. continue;
  1192. rocksFId = translate_field_idx(i);
  1193. assert(rocksFId >= 0 && rocksFId < mCompoundKeyFieldNums + mExtraValueFieldNums);
  1194. std::string &va = rocksFId < mCompoundKeyFieldNums ? keys[rocksFId] : values[rocksFId - mCompoundKeyFieldNums];
  1195. ret = value2Str(mTableDef->default_value(i), i, va);
  1196. assert(ret == 0);
  1197. }
  1198. #ifdef DEBUG_INFO
  1199. std::stringstream ss;
  1200. ss << "insert row:[";
  1201. for (size_t idx = 0; idx < mCompoundKeyFieldNums; idx++)
  1202. {
  1203. ss << keys[idx];
  1204. if (idx != mCompoundKeyFieldNums - 1)
  1205. ss << ",";
  1206. }
  1207. ss << "]";
  1208. log_error("%s", ss.str().c_str());
  1209. #endif
  1210. // convert string type 'key' into lower case and build case letter bitmap
  1211. std::string keyBitmaps;
  1212. encode_bitmap_keys(keys, keyBitmaps);
  1213. std::string rocksKey, rocksValue;
  1214. rocksKey = std::move(key_format::Encode(keys, mKeyfield_types));
  1215. // value use plane style to organize, no need to encode
  1216. ret = shrink_value(values, rocksValue);
  1217. if (ret != 0)
  1218. {
  1219. std::string rkey;
  1220. value2Str(Task->request_key(), 0, rkey);
  1221. log_error("shrink value failed, key:%s", rkey.c_str());
  1222. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "shrink buff failed!");
  1223. return -1;
  1224. }
  1225. // add key bitmaps to the rocksdb value field
  1226. keyBitmaps.append(rocksValue);
  1227. log_info("save kv, key:%s, value:%s", rocksKey.c_str(), rocksValue.c_str());
  1228. ret = mDBConn->insert_entry(rocksKey, keyBitmaps, true);
  1229. if (ret != 0)
  1230. {
  1231. std::string rkey;
  1232. value2Str(Task->request_key(), 0, rkey);
  1233. if (ret != -ER_DUP_ENTRY)
  1234. {
  1235. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "insert key failed!");
  1236. log_error("insert key failed, ret:%d, key:%s", ret, rkey.c_str());
  1237. }
  1238. else
  1239. {
  1240. Task->set_error_dup(ret, __FUNCTION__, "insert entry failed!");
  1241. log_error("insert duplicate key : %s", rkey.c_str());
  1242. }
  1243. return (-1);
  1244. }
  1245. Task->resultInfo.set_affected_rows(1);
  1246. #ifdef PRINT_STAT
  1247. mETime = GET_TIMESTAMP();
  1248. insert_stat(OprType::eInsert, mETime - mSTime);
  1249. #endif
  1250. return (0);
  1251. }
  1252. // update the exist rows matched the condition
  1253. int RocksdbProcess::process_update(DTCTask *Task)
  1254. {
  1255. log_info("come into rocks update");
  1256. #ifdef PRINT_STAT
  1257. mSTime = GET_TIMESTAMP();
  1258. #endif
  1259. int ret, affectRows = 0;
  1260. // prefix key
  1261. std::string prefixKey;
  1262. ret = value2Str(Task->request_key(), 0, prefixKey);
  1263. if (ret != 0)
  1264. {
  1265. log_error("get dtc primary key failed! key");
  1266. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get key failed!");
  1267. return -1;
  1268. }
  1269. log_info("update key:%s", prefixKey.c_str());
  1270. if (Task->request_operation() == NULL)
  1271. {
  1272. log_info("request operation info is null!");
  1273. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "update field not found");
  1274. return (-1);
  1275. }
  1276. if (Task->request_operation()->has_type_commit() == 0)
  1277. {
  1278. // pure volatile fields update, always succeed
  1279. log_info("update pure volatile fields!");
  1280. Task->resultInfo.set_affected_rows(affectRows);
  1281. return (0);
  1282. }
  1283. const DTCFieldValue *updateInfo = Task->request_operation();
  1284. if (updateInfo == NULL)
  1285. {
  1286. // no need to do update
  1287. log_info("request update info is null!");
  1288. Task->resultInfo.set_affected_rows(affectRows);
  1289. return (0);
  1290. }
  1291. set_title("UPDATE...");
  1292. bool updateKey = false, updateValue = false;
  1293. whether_update_key(updateInfo, updateKey, updateValue);
  1294. if (!updateKey && !updateValue)
  1295. {
  1296. // no need to do update
  1297. log_info("no change for the row!");
  1298. Task->resultInfo.set_affected_rows(affectRows);
  1299. return (0);
  1300. }
  1301. if (mKeyfield_types[0] == DField::String)
  1302. std::transform(prefixKey.begin(), prefixKey.end(), prefixKey.begin(), ::tolower);
  1303. // encode the key to rocksdb format
  1304. std::string fullKey = std::move(key_format::encode_bytes(prefixKey));
  1305. std::string encodePreKey = fullKey;
  1306. std::string compoundValue;
  1307. RocksDBConn::RocksItr_t rocksItr;
  1308. ret = mDBConn->retrieve_start(fullKey, compoundValue, rocksItr, true);
  1309. if (ret < 0)
  1310. {
  1311. log_info("retrieve rocksdb failed, key:%s", prefixKey.c_str());
  1312. Task->set_error_dup(ret, __FUNCTION__, "retrieve rocksdb failed!");
  1313. mDBConn->retrieve_end(rocksItr);
  1314. return -1;
  1315. }
  1316. else if (ret == 1)
  1317. {
  1318. // not found the key
  1319. log_info("no matched key:%s", prefixKey.c_str());
  1320. Task->resultInfo.set_affected_rows(affectRows);
  1321. mDBConn->retrieve_end(rocksItr);
  1322. return 0;
  1323. }
  1324. // retrieve the range keys one by one
  1325. std::set<std::string> deletedKeys;
  1326. std::map<std::string, std::string> newEntries;
  1327. if (updateKey)
  1328. {
  1329. // Will update the row, so we need to keep the whole row in the memory for checking
  1330. // the unique constraints
  1331. // Due to the limitaion of the memory, we can not hold all rows in the memory sometimes,
  1332. // so use the LRU to evit the oldest row
  1333. std::set<std::string> originKeys; // keys located in rocksdb those before doing update
  1334. while (true)
  1335. {
  1336. // find if the key has been update before, if yes, should rollback the previously update
  1337. auto itr = newEntries.find(fullKey);
  1338. if (itr != newEntries.end())
  1339. {
  1340. log_info("duplicated entry, key:%s", fullKey.c_str());
  1341. // report alarm
  1342. Task->set_error_dup(-ER_DUP_ENTRY, __FUNCTION__, "duplicate key!");
  1343. mDBConn->retrieve_end(rocksItr);
  1344. return -1;
  1345. }
  1346. ret = key_matched(encodePreKey, fullKey);
  1347. if (ret != 0)
  1348. {
  1349. // prefix key not matched, reach to the end
  1350. break;
  1351. }
  1352. // update row
  1353. std::string newKey, newValue;
  1354. ret = update_row(prefixKey, fullKey, compoundValue, Task, newKey, newValue);
  1355. if (ret < 0)
  1356. {
  1357. // ignore the incorrect key and keep going
  1358. log_error("save row failed! key:%s, compoundValue:%s", fullKey.c_str(), compoundValue.c_str());
  1359. }
  1360. else if (ret == 1)
  1361. {
  1362. // key matched, but condition mismatched, keep on retrieve
  1363. originKeys.insert(fullKey);
  1364. }
  1365. else
  1366. {
  1367. {
  1368. ret = rocks_key_matched(fullKey, newKey);
  1369. if (ret == 0)
  1370. {
  1371. log_info("duplicated entry, newkey:%s", newKey.c_str());
  1372. Task->set_error_dup(-ER_DUP_ENTRY, __FUNCTION__, "duplicate key!");
  1373. mDBConn->retrieve_end(rocksItr);
  1374. return -1;
  1375. }
  1376. }
  1377. if ( originKeys.find(newKey) == originKeys.end()
  1378. && newEntries.find(newKey) == newEntries.end()
  1379. && deletedKeys.find(newKey) == deletedKeys.end() )
  1380. {
  1381. // there are so many duplcate string save in the different containers, this will
  1382. // waste too much space, we can optimize it in the future
  1383. affectRows++;
  1384. deletedKeys.insert(fullKey);
  1385. newEntries[newKey] = newValue;
  1386. }
  1387. else
  1388. {
  1389. // duplicate key, ignore it
  1390. log_info("duplicated entry, newkey:%s", newKey.c_str());
  1391. Task->set_error_dup(-ER_DUP_ENTRY, __FUNCTION__, "duplicate key!");
  1392. mDBConn->retrieve_end(rocksItr);
  1393. return -1;
  1394. }
  1395. }
  1396. // move iterator to the next key
  1397. ret = mDBConn->next_entry(rocksItr, fullKey, compoundValue);
  1398. if (ret < 0)
  1399. {
  1400. log_error("retrieve compoundValue failed, key:%s", prefixKey.c_str());
  1401. Task->set_error_dup(ret, __FUNCTION__, "get next entry failed!");
  1402. mDBConn->retrieve_end(rocksItr);
  1403. return -1;
  1404. }
  1405. else if (ret == 1)
  1406. {
  1407. // no remaining rows in the storage
  1408. break;
  1409. }
  1410. // has remaining compoundValue in rocksdb
  1411. }
  1412. }
  1413. else
  1414. {
  1415. // do not break the unique key constraints, no need to hold the entire row in memory
  1416. // iterate the matched prefix key and find out the real one from start to end
  1417. while (true)
  1418. {
  1419. ret = key_matched(encodePreKey, fullKey);
  1420. if (ret != 0)
  1421. {
  1422. // prefix key not matched, reach to the end
  1423. break;
  1424. }
  1425. // update row
  1426. std::string newKey, newValue;
  1427. ret = update_row(prefixKey, fullKey, compoundValue, Task, newKey, newValue);
  1428. if (ret < 0)
  1429. {
  1430. // ignore the incorrect key and keep going
  1431. log_error("save row failed! key:%s, compoundValue:%s", fullKey.c_str(), compoundValue.c_str());
  1432. }
  1433. else if (ret == 1)
  1434. {
  1435. // key matched, but condition mismatched, keep on retrieve
  1436. }
  1437. else
  1438. {
  1439. affectRows++;
  1440. newEntries[fullKey] = newValue;
  1441. }
  1442. // move iterator to the next key
  1443. ret = mDBConn->next_entry(rocksItr, fullKey, compoundValue);
  1444. if (ret < 0)
  1445. {
  1446. log_error("retrieve compoundValue failed, key:%s", prefixKey.c_str());
  1447. Task->set_error_dup(ret, __FUNCTION__, "get next entry failed!");
  1448. mDBConn->retrieve_end(rocksItr);
  1449. return -1;
  1450. }
  1451. else if (ret == 1)
  1452. {
  1453. // reach to the storage end
  1454. break;
  1455. }
  1456. // has remaining compoundValue in rocksdb
  1457. }
  1458. }
  1459. #ifdef DEBUG_INFO
  1460. std::vector<std::string> keys;
  1461. std::stringstream ss;
  1462. ss << "delete keys:[";
  1463. auto itr = deletedKeys.begin();
  1464. while (itr != deletedKeys.end())
  1465. {
  1466. keys.clear();
  1467. key_format::Decode(*itr, mKeyfield_types, keys);
  1468. ss << "(";
  1469. for (size_t idx = 0; idx < keys.size(); idx++)
  1470. {
  1471. ss << keys[idx];
  1472. if (idx != keys.size() - 1)
  1473. ss << ",";
  1474. }
  1475. ss << ") ";
  1476. itr++;
  1477. }
  1478. ss << "]";
  1479. ss << "new keys:[";
  1480. auto itrM = newEntries.begin();
  1481. while (itrM != newEntries.end())
  1482. {
  1483. keys.clear();
  1484. key_format::Decode(itrM->first, mKeyfield_types, keys);
  1485. ss << "(";
  1486. for (size_t idx = 0; idx < keys.size(); idx++)
  1487. {
  1488. ss << keys[idx];
  1489. if (idx != keys.size() - 1)
  1490. ss << ",";
  1491. }
  1492. ss << ") ";
  1493. itrM++;
  1494. }
  1495. ss << "]";
  1496. log_error("%s", ss.str().c_str());
  1497. #endif
  1498. // do atomic update
  1499. ret = mDBConn->batch_update(deletedKeys, newEntries, true);
  1500. if (ret != 0)
  1501. {
  1502. log_error("batch update rocksdb failed! key:%s", prefixKey.c_str());
  1503. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "update failed!");
  1504. return -1;
  1505. }
  1506. mDBConn->retrieve_end(rocksItr);
  1507. Task->resultInfo.set_affected_rows(affectRows);
  1508. #ifdef PRINT_STAT
  1509. mETime = GET_TIMESTAMP();
  1510. insert_stat(OprType::eUpdate, mETime - mSTime);
  1511. #endif
  1512. return (0);
  1513. }
  1514. int RocksdbProcess::whether_update_key(
  1515. const DTCFieldValue *UpdateInfo,
  1516. bool &updateKey,
  1517. bool &updateValue)
  1518. {
  1519. int fieldId;
  1520. updateKey = false;
  1521. updateValue = false;
  1522. for (int i = 0; i < UpdateInfo->num_fields(); i++)
  1523. {
  1524. const int fid = UpdateInfo->field_id(i);
  1525. if (mTableDef->is_volatile(fid))
  1526. continue;
  1527. assert(fid < mFieldIndexMapping.size());
  1528. for (size_t idx = 0; idx < mFieldIndexMapping.size(); idx++)
  1529. {
  1530. if (fid == mFieldIndexMapping[idx])
  1531. {
  1532. if (idx < mCompoundKeyFieldNums)
  1533. updateKey = true;
  1534. else
  1535. updateValue = true;
  1536. break;
  1537. }
  1538. }
  1539. }
  1540. return 0;
  1541. }
  1542. int RocksdbProcess::process_delete(DTCTask *Task)
  1543. {
  1544. int ret, affectRows = 0;
  1545. // prefix key
  1546. std::string prefixKey;
  1547. ret = value2Str(Task->request_key(), 0, prefixKey);
  1548. if (ret != 0)
  1549. {
  1550. log_error("get dtc primary key failed! key");
  1551. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get key failed!");
  1552. return -1;
  1553. }
  1554. std::set<int> req_ids;
  1555. std::vector<std::string> quick_keys(mCompoundKeyFieldNums);
  1556. quick_keys[0] = prefixKey;
  1557. DTCFieldValue *req_condition = (DTCFieldValue*)Task->request_condition();
  1558. if(req_condition != NULL){
  1559. for ( int i = 0; i < req_condition->num_fields(); i++ ){
  1560. int req_field_id = req_condition->field_id(i);
  1561. if ( mTableDef->is_volatile(req_field_id) )
  1562. continue;
  1563. int rocks_fid = translate_field_idx(req_field_id);
  1564. assert(rocks_fid >= 0 && rocks_fid < mCompoundKeyFieldNums + mExtraValueFieldNums);
  1565. if ( req_field_id == 0 || rocks_fid == 0 || rocks_fid >= mCompoundKeyFieldNums)
  1566. continue;
  1567. req_ids.insert(rocks_fid);
  1568. std::string& va = quick_keys[rocks_fid];
  1569. ret = value2Str(req_condition->field_value(i), req_field_id, va);
  1570. assert( ret == 0 );
  1571. }
  1572. bool hit_all_key = true;
  1573. for ( int idx = 1; idx < mCompoundKeyFieldNums; idx++ ){
  1574. if(req_ids.find(idx) == req_ids.end()){
  1575. hit_all_key = false;
  1576. }
  1577. }
  1578. if(true == hit_all_key){
  1579. std::stringstream ss;
  1580. for(auto it :req_ids) { ss << it << ","; }
  1581. std::stringstream sskey;
  1582. for(auto key_it :quick_keys) { sskey << key_it << ","; }
  1583. log_debug("hit all unique keys, goto quick delete, req_ids: %s, keys: %s, mCompoundKeyFieldNums: %d",
  1584. ss.str().c_str(), sskey.str().c_str(), mCompoundKeyFieldNums);
  1585. std::string quick_rocks_Key;
  1586. std::string keyBitmaps;
  1587. encode_bitmap_keys(quick_keys, keyBitmaps);
  1588. quick_rocks_Key = std::move(key_format::Encode(quick_keys, mKeyfield_types));
  1589. ret = mDBConn->delete_entry(quick_rocks_Key);
  1590. if ( ret != 0 ) {
  1591. log_error("deleteEntry failed! ");
  1592. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "deleteEntry failed!");
  1593. return -1;
  1594. }
  1595. log_debug("quick delete success");
  1596. return 0;
  1597. }
  1598. req_ids.clear();
  1599. }
  1600. if (mKeyfield_types[0] == DField::String)
  1601. std::transform(prefixKey.begin(), prefixKey.end(), prefixKey.begin(), ::tolower);
  1602. // encode the key to rocksdb format
  1603. std::string fullKey = std::move(key_format::encode_bytes(prefixKey));
  1604. std::string encodePreKey = fullKey;
  1605. std::string compoundValue;
  1606. RocksDBConn::RocksItr_t rocksItr;
  1607. ret = mDBConn->retrieve_start(fullKey, compoundValue, rocksItr, true);
  1608. if (ret < 0)
  1609. {
  1610. log_error("retrieve rocksdb failed! key:%s", fullKey.c_str());
  1611. Task->set_error_dup(ret, __FUNCTION__, "retrieve failed!");
  1612. mDBConn->retrieve_end(rocksItr);
  1613. return -1;
  1614. }
  1615. else if (ret == 1)
  1616. {
  1617. // not found the key
  1618. Task->resultInfo.set_affected_rows(affectRows);
  1619. mDBConn->retrieve_end(rocksItr);
  1620. log_info("no matched key:%s", prefixKey.c_str());
  1621. return 0;
  1622. }
  1623. // iterate the matched prefix key and find out the real one from start to end
  1624. bool condMatch = true;
  1625. int bitmapLen = 0;
  1626. DTCFieldValue *condition;
  1627. std::set<std::string> deleteKeys;
  1628. while (true)
  1629. {
  1630. // check whether the key is in the range of the prefix of the 'fullKey'
  1631. ret = key_matched(encodePreKey, fullKey);
  1632. if (ret != 0)
  1633. {
  1634. // prefix key not matched, reach to the end
  1635. break;
  1636. }
  1637. // decode the compoundKey and check whether it is matched
  1638. std::string realValue;
  1639. std::vector<std::string> keys;
  1640. std::vector<std::string> values;
  1641. key_format::Decode(fullKey, mKeyfield_types, keys);
  1642. assert(keys.size() > 0);
  1643. ret = prefixKey.compare(keys[0]);
  1644. // if ( ret != 0 ) goto NEXT_ENTRY;
  1645. if (ret != 0)
  1646. break;
  1647. if (keys.size() != mCompoundKeyFieldNums)
  1648. {
  1649. // unmatched row
  1650. log_error("unmatched row, fullKey:%s, keyNum:%lu, definitionFieldNum:%d",
  1651. fullKey.c_str(), keys.size(), mCompoundKeyFieldNums);
  1652. ret = 0;
  1653. }
  1654. // decode key bitmap
  1655. decodeBitmapKeys(compoundValue, keys, bitmapLen);
  1656. realValue = compoundValue.substr(bitmapLen);
  1657. split_values(realValue, values);
  1658. if (values.size() != mExtraValueFieldNums)
  1659. {
  1660. log_info("unmatched row, fullKey:%s, value:%s, keyNum:%lu, valueNum:%lu",
  1661. fullKey.c_str(), compoundValue.c_str(), keys.size(), values.size());
  1662. ret = 0;
  1663. }
  1664. // condition filter
  1665. condition = (DTCFieldValue *)Task->request_condition();
  1666. for (size_t idx = 1; idx < mFieldIndexMapping.size(); idx++)
  1667. {
  1668. int fieldId = mFieldIndexMapping[idx];
  1669. std::string &fieldValue = idx < mCompoundKeyFieldNums ? keys[idx] : values[idx - mCompoundKeyFieldNums];
  1670. // do condition filter
  1671. ret = condition_filter(fieldValue, fieldId, mTableDef->field_type(fieldId), condition);
  1672. if (ret < 0)
  1673. {
  1674. log_error("string[%s] conver to value[%d] error: %d, %m", fieldValue.c_str(), mTableDef->field_type(fieldId), ret);
  1675. condMatch = false;
  1676. break;
  1677. }
  1678. else if (ret == 1)
  1679. {
  1680. // condition is not matched
  1681. condMatch = false;
  1682. break;
  1683. }
  1684. }
  1685. if (condMatch)
  1686. {
  1687. #ifdef DEBUG_INFO
  1688. std::stringstream ss;
  1689. ss << "delete key:[";
  1690. for (size_t idx = 0; idx < mCompoundKeyFieldNums; idx++)
  1691. {
  1692. ss << keys[idx];
  1693. if (idx != mCompoundKeyFieldNums - 1)
  1694. ss << ",";
  1695. }
  1696. ss << "]";
  1697. log_error("%s", ss.str().c_str());
  1698. #endif
  1699. deleteKeys.insert(fullKey);
  1700. affectRows++;
  1701. }
  1702. NEXT_ENTRY:
  1703. // move iterator to the next key
  1704. ret = mDBConn->next_entry(rocksItr, fullKey, compoundValue);
  1705. if (ret < 0)
  1706. {
  1707. Task->set_error_dup(ret, __FUNCTION__, "get next entry failed!");
  1708. mDBConn->retrieve_end(rocksItr);
  1709. return -1;
  1710. }
  1711. else if (ret == 1)
  1712. {
  1713. // reach to the end of the storage
  1714. break;
  1715. }
  1716. }
  1717. // delete key from Rocksdb storage, inside the 'retrieve end' for transaction isolation, this delete will be not seen before release this retrieve
  1718. ret = mDBConn->batch_update(deleteKeys, std::map<std::string, std::string>(), true);
  1719. if (ret != 0)
  1720. {
  1721. log_error("batch update rocksdb failed! key:%s", prefixKey.c_str());
  1722. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "update failed!");
  1723. return -1;
  1724. }
  1725. mDBConn->retrieve_end(rocksItr);
  1726. Task->resultInfo.set_affected_rows(affectRows);
  1727. return (0);
  1728. }
  1729. int RocksdbProcess::process_task(DTCTask *Task)
  1730. {
  1731. if (Task == NULL)
  1732. {
  1733. log_error("Task is NULL!%s", "");
  1734. return (-1);
  1735. }
  1736. mTableDef = TableDefinitionManager::Instance()->get_cur_table_def();
  1737. switch (Task->request_code())
  1738. {
  1739. case DRequest::Nop:
  1740. case DRequest::Purge:
  1741. case DRequest::Flush:
  1742. return 0;
  1743. case DRequest::Get:
  1744. return process_select(Task);
  1745. case DRequest::Insert:
  1746. return process_insert(Task);
  1747. case DRequest::Update:
  1748. return process_update(Task);
  1749. case DRequest::Delete:
  1750. return process_delete(Task);
  1751. case DRequest::Replace:
  1752. return process_replace(Task);
  1753. case DRequest::ReloadConfig:
  1754. return process_reload_config(Task);
  1755. // master-slave replication
  1756. case DRequest::Replicate:
  1757. return ProcessReplicate(Task);
  1758. // cluster scaleable
  1759. //case DRequest::Migrate:
  1760. // return ProcessMigrate();
  1761. default:
  1762. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__, "invalid request-code");
  1763. return (-1);
  1764. }
  1765. }
  1766. //add by frankyang 处理更新过的交易日志
  1767. int RocksdbProcess::process_replace(DTCTask *Task)
  1768. {
  1769. int ret, affectRows = 0;
  1770. set_title("REPLACE...");
  1771. // primary key in DTC
  1772. std::vector<std::string> keys, values;
  1773. keys.resize(mCompoundKeyFieldNums);
  1774. values.resize(mExtraValueFieldNums);
  1775. std::string strKey;
  1776. value2Str(Task->request_key(), 0, strKey);
  1777. keys[0] = strKey;
  1778. // deal update info
  1779. const DTCFieldValue *updateInfo = Task->request_operation();
  1780. if (updateInfo != NULL)
  1781. {
  1782. for (int idx = 0; idx < updateInfo->num_fields(); idx++)
  1783. {
  1784. const int fid = updateInfo->field_id(idx);
  1785. if (mTableDef->is_volatile(fid))
  1786. continue;
  1787. std::string fieldValue;
  1788. switch (updateInfo->field_operation(idx))
  1789. {
  1790. case DField::Set:
  1791. {
  1792. ret = value2Str(updateInfo->field_value(idx), fid, fieldValue);
  1793. if (ret != 0)
  1794. {
  1795. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get value failed!");
  1796. log_error("translate value failed");
  1797. return (-1);
  1798. }
  1799. break;
  1800. }
  1801. case DField::Add:
  1802. {
  1803. // add additional value to the defaule value
  1804. const DTCValue *addValue = updateInfo->field_value(idx);
  1805. const DTCValue *defValue = mTableDef->default_value(idx);
  1806. switch (updateInfo->field_type(idx))
  1807. {
  1808. case DField::Signed:
  1809. fieldValue = std::move(std::to_string((long long)(addValue->s64 + defValue->s64)));
  1810. break;
  1811. case DField::Unsigned:
  1812. fieldValue = std::move(std::to_string((unsigned long long)(addValue->u64 + defValue->u64)));
  1813. break;
  1814. case DField::Float:
  1815. fieldValue = std::move(std::to_string(addValue->flt + defValue->flt));
  1816. break;
  1817. default:
  1818. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "unkonwn field type!");
  1819. log_error("unsupport field data type:%d", updateInfo->field_type(idx));
  1820. return -1;
  1821. }
  1822. break;
  1823. }
  1824. default:
  1825. log_error("unsupport field operation:%d", updateInfo->field_operation(idx));
  1826. break;
  1827. }
  1828. int rocksidx = translate_field_idx(fid);
  1829. assert(rocksidx >= 0 && rocksidx < mCompoundKeyFieldNums + mExtraValueFieldNums);
  1830. rocksidx < mCompoundKeyFieldNums ? keys[rocksidx] = std::move(fieldValue) : values[rocksidx - mCompoundKeyFieldNums] = std::move(fieldValue);
  1831. }
  1832. }
  1833. // deal default value
  1834. uint8_t mask[32];
  1835. FIELD_ZERO(mask);
  1836. if (updateInfo)
  1837. updateInfo->build_field_mask(mask);
  1838. for (int i = 1; i <= mTableDef->num_fields(); i++)
  1839. {
  1840. if (FIELD_ISSET(i, mask) || mTableDef->is_volatile(i))
  1841. continue;
  1842. std::string fieldValue;
  1843. ret = value2Str(mTableDef->default_value(i), i, fieldValue);
  1844. if (ret != 0)
  1845. {
  1846. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get value failed!");
  1847. log_error("translate value failed");
  1848. return (-1);
  1849. }
  1850. int rocksidx = translate_field_idx(i);
  1851. assert(rocksidx >= 0 && rocksidx < mCompoundKeyFieldNums + mExtraValueFieldNums);
  1852. rocksidx < mCompoundKeyFieldNums ? keys[rocksidx] = std::move(fieldValue) : values[rocksidx - mCompoundKeyFieldNums] = std::move(fieldValue);
  1853. }
  1854. // convert string type 'key' into lower case and build case letter bitmap
  1855. std::string keyBitmaps;
  1856. encode_bitmap_keys(keys, keyBitmaps);
  1857. std::string rocksKey, rocksValue;
  1858. rocksKey = std::move(key_format::Encode(keys, mKeyfield_types));
  1859. shrink_value(values, rocksValue);
  1860. // add key bitmaps to the rocksdb value field
  1861. keyBitmaps.append(rocksValue);
  1862. ret = mDBConn->replace_entry(rocksKey, keyBitmaps, true);
  1863. if (ret == 0)
  1864. {
  1865. Task->resultInfo.set_affected_rows(1);
  1866. return 0;
  1867. }
  1868. log_error("replace key failed, key:%s, code:%d", rocksKey.c_str(), ret);
  1869. Task->set_error_dup(ret, __FUNCTION__, "replace key failed!");
  1870. return -1;
  1871. }
  1872. int RocksdbProcess::ProcessReplicate(DTCTask *Task)
  1873. {
  1874. log_info("come into rocksdb replicate!");
  1875. int ret, totalRows = 0;
  1876. // create resultWriter
  1877. ret = Task->prepare_result_no_limit();
  1878. if (ret != 0)
  1879. {
  1880. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "task prepare-result error");
  1881. log_error("task prepare-result error: %d, %m", ret);
  1882. return (-2);
  1883. }
  1884. // key for replication start:
  1885. std::string startKey, prevPrimaryKey, compoundKey, compoundValue;
  1886. RocksDBConn::RocksItr_t rocksItr;
  1887. // whether start a newly replication or not
  1888. uint32_t start = Task->requestInfo.limit_start();
  1889. uint32_t count = Task->requestInfo.limit_count();
  1890. // if full replicate start from 0 and the start key is empty, means it's a newly replication
  1891. bool isBeginRepl = (start == 0);
  1892. if (likely(!isBeginRepl))
  1893. {
  1894. // replicate with user given key
  1895. ret = value2Str(Task->request_key(), 0, startKey);
  1896. if (ret != 0 || startKey.empty())
  1897. {
  1898. log_error("replicate key can not be empty!");
  1899. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get replicate key failed!");
  1900. return -1;
  1901. }
  1902. // encode the key to rocksdb format
  1903. compoundKey = std::move(key_format::encode_bytes(startKey));
  1904. if (compoundKey.empty())
  1905. {
  1906. log_error("encode primary key failed! key:%s", startKey.c_str());
  1907. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "encode replicate key failed!");
  1908. return -1;
  1909. }
  1910. prevPrimaryKey = compoundKey;
  1911. ret = mDBConn->search_lower_bound(compoundKey, compoundValue, rocksItr);
  1912. }
  1913. else
  1914. {
  1915. #if 0
  1916. // get the last key for replication finished tag
  1917. ret = mDBConn->get_last_entry(compoundKey, compoundValue, rocksItr);
  1918. if ( ret < 0 )
  1919. {
  1920. // replicate error, let the user decide to try again
  1921. log_error("get last key failed!");
  1922. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "get last replicate key failed!");
  1923. return -1;
  1924. }
  1925. else if ( ret == 1 )
  1926. {
  1927. // empty database
  1928. Task->set_total_rows(0);
  1929. return 0;
  1930. }
  1931. // set the finished key of replicating into meta data column family
  1932. // delete the odd migrate-end-key from that may insert by the previous slave
  1933. mReplEndKey = compoundKey;
  1934. // use replace api to instead of insert, in case there has multi slave replicator, all
  1935. // of them should always replicate to the latest one key
  1936. ret = mDBConn->replace_entry(gReplicatePrefixKey, mReplEndKey, true, RocksDBConn::COLUMN_META_DATA);
  1937. if ( ret != 0 )
  1938. {
  1939. log_error("save replicating-finished-key failed! key:%s", mReplEndKey.c_str());
  1940. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "save replicating finished key failed!");
  1941. return -1;
  1942. }
  1943. #endif
  1944. // do forward retrieving for reducing duplicate replication
  1945. startKey = "";
  1946. prevPrimaryKey = "";
  1947. ret = mDBConn->get_first_entry(compoundKey, compoundValue, rocksItr);
  1948. }
  1949. if (ret < 0)
  1950. {
  1951. log_error("query rocksdb failed! isBeginRepl:%d, key:%s", isBeginRepl, startKey.c_str());
  1952. Task->set_error_dup(ret, __FUNCTION__, "do replication failed!");
  1953. mDBConn->retrieve_end(rocksItr);
  1954. return -1;
  1955. }
  1956. else if (ret == 1)
  1957. {
  1958. // not found the key
  1959. Task->set_total_rows(0);
  1960. log_info("do full replication finished! %s", startKey.c_str());
  1961. mDBConn->retrieve_end(rocksItr);
  1962. return 0;
  1963. }
  1964. // iterate the matched prefix key and find out the real one from start to end
  1965. int replLen = 0;
  1966. while (true)
  1967. {
  1968. // 1.skip the user given key
  1969. // 2.the same prefix key only get once
  1970. if (key_matched(prevPrimaryKey, compoundKey) == 0)
  1971. {
  1972. // ignore the matched key that has been migrated in the previous call
  1973. }
  1974. else
  1975. {
  1976. // save row
  1977. ret = saveRow(compoundKey, compoundValue, false, totalRows, Task);
  1978. if (ret < 0)
  1979. {
  1980. // ignore the incorrect key and keep going
  1981. log_error("save row failed! key:%s, value:%s", compoundKey.c_str(), compoundValue.c_str());
  1982. }
  1983. key_format::get_format_key(compoundKey, mKeyfield_types[0], prevPrimaryKey);
  1984. }
  1985. // move iterator to the next key
  1986. ret = mDBConn->next_entry(rocksItr, compoundKey, compoundValue);
  1987. if (ret < 0)
  1988. {
  1989. log_error("iterate rocksdb failed! key:%s", startKey.c_str());
  1990. Task->set_error(ret, __FUNCTION__, "iterate rocksdb failed!");
  1991. mDBConn->retrieve_end(rocksItr);
  1992. return -1;
  1993. }
  1994. else if (ret == 1)
  1995. {
  1996. // reach to the end
  1997. break;
  1998. }
  1999. // has remaining value in rocksdb
  2000. if (totalRows >= count)
  2001. break;
  2002. // replLen += (compoundKey,length() + compoundValue.length());
  2003. // if ( relpLen >= MAX_REPLICATE_LEN )
  2004. // {
  2005. // // avoid network congestion
  2006. // break;
  2007. // }
  2008. }
  2009. Task->set_total_rows(totalRows);
  2010. mDBConn->retrieve_end(rocksItr);
  2011. return (0);
  2012. }
  2013. int RocksdbProcess::process_direct_query(
  2014. DirectRequestContext *reqCxt,
  2015. DirectResponseContext *respCxt)
  2016. {
  2017. log_info("come into process direct query!");
  2018. #ifdef PRINT_STAT
  2019. mSTime = GET_TIMESTAMP();
  2020. #endif
  2021. int ret;
  2022. std::vector<QueryCond> primaryKeyConds;
  2023. ret = analyse_primary_key_conds(reqCxt, primaryKeyConds);
  2024. #if 0
  2025. std::vector<QueryCond>::iterator iter = primaryKeyConds.begin();
  2026. for (; iter != primaryKeyConds.end(); ++iter){
  2027. std::vector<int> fieldTypes;
  2028. fieldTypes.push_back(DField::Signed);
  2029. std::vector<std::string> fieldValues;
  2030. int ipos = iter->sCondValue.find_last_of("#");
  2031. std::string stemp = iter->sCondValue.substr(ipos + 1);
  2032. key_format::Decode(stemp , fieldTypes , fieldValues);
  2033. log_error("field index:%d , condopr:%d , condvalue:%s" ,
  2034. iter->sFieldIndex ,
  2035. iter->sCondOpr ,
  2036. fieldValues[0].c_str());
  2037. }
  2038. #endif
  2039. if (ret != 0)
  2040. {
  2041. log_error("query condition incorrect in query context!");
  2042. respCxt->sRowNums = -EC_ERROR_BASE;
  2043. return -1;
  2044. }
  2045. // prefix key
  2046. std::string prefixKey = primaryKeyConds[0].sCondValue;
  2047. if (prefixKey.empty())
  2048. {
  2049. log_error("dtc primary key can not be empty!");
  2050. respCxt->sRowNums = -EC_ERROR_BASE;
  2051. return -1;
  2052. }
  2053. if (mKeyfield_types[0] == DField::String)
  2054. std::transform(prefixKey.begin(), prefixKey.end(), prefixKey.begin(), ::tolower);
  2055. // encode the key to rocksdb format
  2056. std::string fullKey = std::move(key_format::encode_bytes(prefixKey));
  2057. std::string encodePreKey = fullKey;
  2058. int totalRows = 0;
  2059. std::string value;
  2060. RocksDBConn::RocksItr_t rocksItr;
  2061. bool forwardDirection = (primaryKeyConds[0].sCondOpr == (uint8_t)CondOpr::eEQ ||
  2062. primaryKeyConds[0].sCondOpr == (uint8_t)CondOpr::eGT ||
  2063. primaryKeyConds[0].sCondOpr == (uint8_t)CondOpr::eGE);
  2064. bool backwardEqual = primaryKeyConds[0].sCondOpr == (uint8_t)CondOpr::eLE;
  2065. log_debug("forwardDirection:%d , backwardEqual:%d", (int)forwardDirection , (int)backwardEqual);
  2066. if (backwardEqual)
  2067. {
  2068. // if the query condtion is < || <=, use seek_for_prev to seek in the total_order_seek mode
  2069. // will not use the prefix seek features, eg:
  2070. // 1. we have the flowing union key in the rocks (101,xx), (102,xx), (103,xx)
  2071. // 2. we use total_order_seek features for ranging query with primary key '102', and this
  2072. // lead the rocksdb doesn't use prefix_extractor to match the prefix key, so it use the
  2073. // entire key for comparing, and seek_for_prev will stop in the last key that <= the
  2074. // target key, so the iterator point to the key '(101, xx)', that's not what we want,
  2075. // wo need it point to the '(102,xx)'
  2076. // do primary key equal query first in this section
  2077. ret = mDBConn->retrieve_start(fullKey, value, rocksItr, true);
  2078. if (ret < 0)
  2079. {
  2080. log_error("query rocksdb failed! key:%s, ret:%d", prefixKey.c_str(), ret);
  2081. respCxt->sRowNums = -EC_ERROR_BASE;
  2082. mDBConn->retrieve_end(rocksItr);
  2083. return -1;
  2084. }
  2085. else if (ret == 0)
  2086. {
  2087. while (true)
  2088. {
  2089. ret = key_matched(encodePreKey, fullKey);
  2090. if (ret != 0)
  2091. {
  2092. // prefix key not matched, reach to the end
  2093. break;
  2094. }
  2095. // save row
  2096. ret = save_direct_row(prefixKey, fullKey, value, reqCxt, respCxt, totalRows);
  2097. if (ret < 0)
  2098. {
  2099. // ignore the incorrect key and keep going
  2100. log_error("save row failed! key:%s, value:%s", fullKey.c_str(), value.c_str());
  2101. }
  2102. else if (ret == 1)
  2103. break;
  2104. // move iterator to the next key
  2105. ret = mDBConn->next_entry(rocksItr, fullKey, value);
  2106. if (ret < 0)
  2107. {
  2108. log_error("iterate rocksdb failed! key:%s", prefixKey.c_str());
  2109. respCxt->sRowNums = -EC_ERROR_BASE;
  2110. mDBConn->retrieve_end(rocksItr);
  2111. return -1;
  2112. }
  2113. else if (ret == 1)
  2114. {
  2115. // reach to the storage end
  2116. break;
  2117. }
  2118. // has remaining value in rocksdb
  2119. }
  2120. }
  2121. }
  2122. // range query in the following section
  2123. ret = mDBConn->retrieve_start(fullKey, value, rocksItr, false, forwardDirection);
  2124. if (ret < 0)
  2125. {
  2126. log_error("query rocksdb failed! key:%s, ret:%d", prefixKey.c_str(), ret);
  2127. respCxt->sRowNums = -EC_ERROR_BASE;
  2128. mDBConn->retrieve_end(rocksItr);
  2129. return -1;
  2130. }
  2131. else if (ret == 1)
  2132. {
  2133. // not found the key
  2134. log_info("no matched key:%s", prefixKey.c_str());
  2135. respCxt->sRowNums = 0;
  2136. mDBConn->retrieve_end(rocksItr);
  2137. return 0;
  2138. }
  2139. // iterate the matched prefix key and find out the real one from start to end
  2140. while (true)
  2141. {
  2142. ret = range_key_matched(fullKey, primaryKeyConds);
  2143. #if 0
  2144. std::vector<std::string> rocksValues;
  2145. std::vector<int> fieldTypes;
  2146. fieldTypes.push_back(DField::String);
  2147. fieldTypes.push_back(DField::String);
  2148. fieldTypes.push_back(DField::Signed);
  2149. fieldTypes.push_back(DField::Signed);
  2150. fieldTypes.push_back(DField::Signed);
  2151. key_format::Decode(fullKey, fieldTypes, rocksValues);
  2152. for (int i = 0; i < rocksValues[0].length(); i++){
  2153. log_error("No:%d, is %d \n" , i , (int)rocksValues[0][i]);
  2154. }
  2155. int ipos = rocksValues[0].find_last_of("#");
  2156. std::string stemp = rocksValues[0].substr(ipos + 1);
  2157. std::vector<std::string> rocksValues001;
  2158. std::vector<int> fieldTypes001;
  2159. fieldTypes001.push_back(DField::Signed);
  2160. key_format::Decode(stemp , fieldTypes001 , rocksValues001);
  2161. log_error("primary value:%s", rocksValues001[0].c_str());
  2162. for (size_t i = 0; i < rocksValues.size(); i++)
  2163. {
  2164. log_error("value:%s", rocksValues[i].c_str() );
  2165. }
  2166. #endif
  2167. if (ret == -1)
  2168. {
  2169. // prefix key not matched, reach to the end
  2170. break;
  2171. }
  2172. if (ret == 0)
  2173. {
  2174. // save row
  2175. ret = save_direct_row(prefixKey, fullKey, value, reqCxt, respCxt, totalRows);
  2176. if (ret < 0)
  2177. {
  2178. // ignore the incorrect key and keep going
  2179. log_error("save row failed! key:%s, value:%s", fullKey.c_str(), value.c_str());
  2180. }
  2181. else if (ret == 1)
  2182. break;
  2183. }
  2184. // move iterator to the next key
  2185. if (forwardDirection)
  2186. {
  2187. ret = mDBConn->next_entry(rocksItr, fullKey, value);
  2188. }
  2189. else
  2190. {
  2191. ret = mDBConn->prev_entry(rocksItr, fullKey, value);
  2192. }
  2193. if (ret < 0)
  2194. {
  2195. log_error("iterate rocksdb failed! key:%s", prefixKey.c_str());
  2196. respCxt->sRowNums = 0;
  2197. mDBConn->retrieve_end(rocksItr);
  2198. return -1;
  2199. }
  2200. else if (ret == 1)
  2201. {
  2202. // reach to the storage end
  2203. break;
  2204. }
  2205. // has remaining value in rocksdb
  2206. }
  2207. // generate response rows in order by container
  2208. if (mOrderByUnit)
  2209. {
  2210. OrderByUnitElement element;
  2211. int start = ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStart >= 0 && ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStep > 0 ?
  2212. ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sLimitCond.sLimitStart : 0;
  2213. while (true)
  2214. {
  2215. ret = mOrderByUnit->get_row(element);
  2216. if (0 == ret)
  2217. {
  2218. delete mOrderByUnit;
  2219. mOrderByUnit = NULL;
  2220. break;
  2221. }
  2222. if (start != 0)
  2223. {
  2224. start--;
  2225. continue;
  2226. }
  2227. build_direct_row(element.mRocksKeys, element.mRocksValues, respCxt);
  2228. }
  2229. }
  2230. respCxt->sRowNums = ((RangeQueryRows_t*)respCxt->sDirectRespValue.uRangeQueryRows)->sRowValues.size();
  2231. mDBConn->retrieve_end(rocksItr);
  2232. #ifdef PRINT_STAT
  2233. mETime = GET_TIMESTAMP();
  2234. insert_stat(OprType::eDirectQuery, mETime - mSTime);
  2235. log_debug("cost time: %lld", (long long int)(mETime - mSTime));
  2236. #endif
  2237. return (0);
  2238. }
  2239. int RocksdbProcess::TriggerReplication(
  2240. const std::string& masterIp,
  2241. int masterPort)
  2242. {
  2243. return mReplUnit->startSlaveReplication(masterIp, masterPort);
  2244. }
  2245. int RocksdbProcess::QueryReplicationState()
  2246. {
  2247. return mReplUnit->getReplicationState();
  2248. }
  2249. int RocksdbProcess::encode_dtc_key(
  2250. const std::string &oKey,
  2251. std::string &codedKey)
  2252. {
  2253. int keyLen = oKey.length();
  2254. static const int maxLen = 10240;
  2255. assert(sizeof(int) + keyLen <= maxLen);
  2256. static thread_local char keyBuff[maxLen];
  2257. char *pos = keyBuff;
  2258. *(int *)pos = keyLen;
  2259. pos += sizeof(int);
  2260. memcpy((void *)pos, (void *)oKey.data(), keyLen);
  2261. codedKey.assign(keyBuff, keyLen + sizeof(int));
  2262. return 0;
  2263. }
  2264. int RocksdbProcess::decode_keys(
  2265. const std::string &compoundKey,
  2266. std::vector<std::string> &keys)
  2267. {
  2268. int ret;
  2269. std::string keyField;
  2270. char *head = const_cast<char *>(compoundKey.data());
  2271. // decode dtckey first
  2272. int keyLen = *(int *)head;
  2273. head += sizeof(int);
  2274. keyField.assign(head, keyLen);
  2275. head += keyLen;
  2276. keys.push_back(std::move(keyField));
  2277. // decode other key fields
  2278. for (int idx = 1; idx < mCompoundKeyFieldNums; idx++)
  2279. {
  2280. ret = get_value_by_id(head, mFieldIndexMapping[idx], keyField);
  2281. assert(ret == 0);
  2282. keys.push_back(std::move(keyField));
  2283. }
  2284. return 0;
  2285. }
  2286. int RocksdbProcess::encode_rocks_key(
  2287. const std::vector<std::string> &keys,
  2288. std::string &rocksKey)
  2289. {
  2290. assert(keys.size() == mCompoundKeyFieldNums);
  2291. // evaluate space
  2292. static int align = 1 << 12;
  2293. int valueLen = 0, fid, fsize;
  2294. int totalLen = align;
  2295. char *valueBuff = (char *)malloc(totalLen);
  2296. // encode key first
  2297. int keyLen = keys[0].length();
  2298. *(int *)valueBuff = keyLen;
  2299. valueLen += sizeof(int);
  2300. memcpy((void *)valueBuff, (void *)keys[0].data(), keyLen);
  2301. valueLen += keyLen;
  2302. for (size_t idx = 1; idx < mCompoundKeyFieldNums; idx++)
  2303. {
  2304. fid = mFieldIndexMapping[idx];
  2305. switch (mTableDef->field_type(fid))
  2306. {
  2307. case DField::Signed:
  2308. {
  2309. fsize = mTableDef->field_size(fid);
  2310. if (fsize > sizeof(int32_t))
  2311. {
  2312. if (valueLen + sizeof(int64_t) > totalLen)
  2313. {
  2314. // expand buff
  2315. totalLen = (valueLen + sizeof(int64_t) + align - 1) & -align;
  2316. valueBuff = expand_buff(totalLen, valueBuff);
  2317. if (!valueBuff)
  2318. return -1;
  2319. }
  2320. *(int64_t *)(valueBuff + valueLen) = strtoll(keys[idx].c_str(), NULL, 10);
  2321. valueLen += sizeof(int64_t);
  2322. }
  2323. else
  2324. {
  2325. if (valueLen + sizeof(int32_t) > totalLen)
  2326. {
  2327. // expand buff
  2328. totalLen = (valueLen + sizeof(int32_t) + align - 1) & -align;
  2329. valueBuff = expand_buff(totalLen, valueBuff);
  2330. if (!valueBuff)
  2331. return -1;
  2332. }
  2333. *(int32_t *)(valueBuff + valueLen) = strtol(keys[idx].c_str(), NULL, 10);
  2334. valueLen += sizeof(int32_t);
  2335. }
  2336. break;
  2337. }
  2338. case DField::Unsigned:
  2339. {
  2340. fsize = mTableDef->field_size(fid);
  2341. if (fsize > sizeof(uint32_t))
  2342. {
  2343. if (valueLen + sizeof(uint64_t) > totalLen)
  2344. {
  2345. // expand buff
  2346. totalLen = (valueLen + sizeof(uint64_t) + align - 1) & -align;
  2347. valueBuff = expand_buff(totalLen, valueBuff);
  2348. if (!valueBuff)
  2349. return -1;
  2350. }
  2351. *(uint64_t *)(valueBuff + valueLen) = strtoull(keys[idx].c_str(), NULL, 10);
  2352. valueLen += sizeof(uint64_t);
  2353. }
  2354. else
  2355. {
  2356. if (valueLen + sizeof(uint32_t) > totalLen)
  2357. {
  2358. // expand buff
  2359. totalLen = (valueLen + sizeof(uint32_t) + align - 1) & -align;
  2360. valueBuff = expand_buff(totalLen, valueBuff);
  2361. if (!valueBuff)
  2362. return -1;
  2363. }
  2364. *(uint32_t *)(valueBuff + valueLen) = strtoul(keys[idx].c_str(), NULL, 10);
  2365. valueLen += sizeof(uint32_t);
  2366. }
  2367. break;
  2368. }
  2369. case DField::Float:
  2370. {
  2371. fsize = mTableDef->field_size(fid);
  2372. if (fsize > sizeof(float))
  2373. {
  2374. if (valueLen + sizeof(double) > totalLen)
  2375. {
  2376. // expand buff
  2377. totalLen = (valueLen + sizeof(double) + align - 1) & -align;
  2378. valueBuff = expand_buff(totalLen, valueBuff);
  2379. if (!valueBuff)
  2380. return -1;
  2381. }
  2382. *(double *)(valueBuff + valueLen) = strtod(keys[idx].c_str(), NULL);
  2383. valueLen += sizeof(double);
  2384. }
  2385. else
  2386. {
  2387. if (valueLen + sizeof(float) > totalLen)
  2388. {
  2389. // expand buff
  2390. totalLen = (valueLen + sizeof(float) + align - 1) & -align;
  2391. valueBuff = expand_buff(totalLen, valueBuff);
  2392. if (!valueBuff)
  2393. return -1;
  2394. }
  2395. *(float *)(valueBuff + valueLen) = strtof(keys[idx].c_str(), NULL);
  2396. valueLen += sizeof(float);
  2397. }
  2398. break;
  2399. }
  2400. case DField::String:
  2401. case DField::Binary:
  2402. {
  2403. int len = keys[idx].length();
  2404. fsize = len + sizeof(int);
  2405. {
  2406. if (valueLen + fsize > totalLen)
  2407. {
  2408. // expand buff
  2409. totalLen = (valueLen + fsize + align - 1) & -align;
  2410. valueBuff = expand_buff(totalLen, valueBuff);
  2411. if (!valueBuff)
  2412. return -1;
  2413. }
  2414. *(int *)(valueBuff + valueLen) = len;
  2415. valueLen += sizeof(int);
  2416. if (len > 0)
  2417. memcpy((void *)(valueBuff + valueLen), (void *)keys[idx].data(), len);
  2418. valueLen += len;
  2419. }
  2420. break;
  2421. }
  2422. default:
  2423. log_error("unexpected field type! type:%d", mTableDef->field_type(fid));
  2424. return -1;
  2425. };
  2426. }
  2427. rocksKey.assign(valueBuff, valueLen);
  2428. free(valueBuff);
  2429. return 0;
  2430. }
  2431. // 1. convert string type key into lower case
  2432. // 2. create bit map for those been converted keys
  2433. void RocksdbProcess::encode_bitmap_keys(
  2434. std::vector<std::string> &keys,
  2435. std::string &keyBitmaps)
  2436. {
  2437. if (mNoBitmapKey)
  2438. return;
  2439. std::vector<char> keyLocationBitmap, keyCaseBitmap;
  2440. int8_t localBits = 0;
  2441. bool hasBeenConverted = false;
  2442. for (size_t idx = 0; idx < keys.size(); idx++)
  2443. {
  2444. switch (mKeyfield_types[idx])
  2445. {
  2446. default:
  2447. hasBeenConverted = false;
  2448. break;
  2449. case DField::String:
  2450. {
  2451. // maybe need convert
  2452. std::vector<char> currentKeyBitmap;
  2453. hasBeenConverted = convert_to_lower(keys[idx], currentKeyBitmap);
  2454. if (hasBeenConverted)
  2455. {
  2456. keyCaseBitmap.insert(keyCaseBitmap.end(),
  2457. std::make_move_iterator(currentKeyBitmap.begin()),
  2458. std::make_move_iterator(currentKeyBitmap.end()));
  2459. }
  2460. }
  2461. }
  2462. // record key location bitmap
  2463. if (hasBeenConverted)
  2464. {
  2465. int shift = BITS_OF_BYTE - 1 - 1 - idx % (BITS_OF_BYTE - 1);
  2466. localBits = (localBits >> shift | 1U) << shift;
  2467. }
  2468. // the last boundary bit in this section and has remaining keys, need to set the
  2469. // head bit for indicading
  2470. if ((idx + 1) % (BITS_OF_BYTE - 1) == 0 || idx == keys.size() - 1)
  2471. {
  2472. if (idx != keys.size() - 1)
  2473. localBits |= 128U;
  2474. keyLocationBitmap.push_back((char)localBits);
  2475. localBits = 0;
  2476. }
  2477. }
  2478. // shrink bits to buffer
  2479. keyBitmaps.append(
  2480. std::string(keyLocationBitmap.begin(), keyLocationBitmap.end()))
  2481. .append(
  2482. std::string(keyCaseBitmap.begin(), keyCaseBitmap.end()));
  2483. }
  2484. void RocksdbProcess::decodeBitmapKeys(
  2485. const std::string &rocksValue,
  2486. std::vector<std::string> &keys,
  2487. int &bitmapLen)
  2488. {
  2489. bitmapLen = 0;
  2490. if (mNoBitmapKey)
  2491. return;
  2492. int8_t sectionBits;
  2493. std::vector<char> keyLocationBitmap;
  2494. // decode key location bitmap
  2495. while (true)
  2496. {
  2497. sectionBits = rocksValue[bitmapLen];
  2498. keyLocationBitmap.push_back(sectionBits);
  2499. bitmapLen++;
  2500. if ((sectionBits & 0x80) == 0)
  2501. break;
  2502. }
  2503. int shift = 0;
  2504. for (size_t idx = 0; idx < keys.size(); idx++)
  2505. {
  2506. sectionBits = keyLocationBitmap[idx / (BITS_OF_BYTE - 1)];
  2507. shift = BITS_OF_BYTE - 1 - 1 - idx % (BITS_OF_BYTE - 1);
  2508. switch (mKeyfield_types[idx])
  2509. {
  2510. default:
  2511. assert((sectionBits >> shift & 1U) == 0);
  2512. break;
  2513. case DField::String:
  2514. {
  2515. if ((sectionBits >> shift & 1U) == 0)
  2516. {
  2517. // no need to do convert
  2518. }
  2519. else
  2520. {
  2521. // recovery the origin key
  2522. recover_to_upper(rocksValue, bitmapLen, keys[idx]);
  2523. }
  2524. }
  2525. }
  2526. }
  2527. }
  2528. int RocksdbProcess::get_key_bitmap_len(const std::string &rocksValue)
  2529. {
  2530. int bitmapLen = 0;
  2531. if (mNoBitmapKey)
  2532. return bitmapLen;
  2533. int8_t sectionBits;
  2534. std::deque<char> keyLocationBitmap;
  2535. // decode key location bitmap
  2536. while (true)
  2537. {
  2538. sectionBits = rocksValue[bitmapLen];
  2539. keyLocationBitmap.push_back(sectionBits);
  2540. bitmapLen++;
  2541. if ((sectionBits & 0x80) == 0)
  2542. break;
  2543. }
  2544. int shift = 0;
  2545. while (keyLocationBitmap.size() != 0)
  2546. {
  2547. sectionBits = keyLocationBitmap.front();
  2548. for (int8_t idx = 1; idx < BITS_OF_BYTE; idx++)
  2549. {
  2550. shift = BITS_OF_BYTE - 1 - idx;
  2551. if ((sectionBits >> shift & 1U) == 1)
  2552. {
  2553. // collect the key bitmap len
  2554. int8_t keyBits;
  2555. while (true)
  2556. {
  2557. keyBits = (int8_t)rocksValue[bitmapLen++];
  2558. if ((keyBits & 0x80) == 0)
  2559. break;
  2560. }
  2561. }
  2562. }
  2563. keyLocationBitmap.pop_front();
  2564. }
  2565. return bitmapLen;
  2566. }
  2567. bool RocksdbProcess::convert_to_lower(
  2568. std::string &key,
  2569. std::vector<char> &keyCaseBitmap)
  2570. {
  2571. bool hasConverted = false;
  2572. int8_t caseBits = 0;
  2573. char lowerBase = 'a' - 'A';
  2574. for (size_t idx = 0; idx < key.length(); idx++)
  2575. {
  2576. char &cv = key.at(idx);
  2577. if (cv >= 'A' && cv <= 'Z')
  2578. {
  2579. cv += lowerBase;
  2580. int shift = BITS_OF_BYTE - 1 - 1 - idx % (BITS_OF_BYTE - 1);
  2581. caseBits = (caseBits >> shift | 1U) << shift;
  2582. hasConverted = true;
  2583. }
  2584. if ((idx + 1) % (BITS_OF_BYTE - 1) == 0 || idx == key.length() - 1)
  2585. {
  2586. if (idx != key.length() - 1)
  2587. caseBits |= 128U;
  2588. keyCaseBitmap.push_back((char)caseBits);
  2589. caseBits = 0;
  2590. }
  2591. }
  2592. return hasConverted;
  2593. }
  2594. void RocksdbProcess::recover_to_upper(
  2595. const std::string &rocksValue,
  2596. int &bitmapLen,
  2597. std::string &key)
  2598. {
  2599. int shift;
  2600. int kIdx = 0;
  2601. bool hasRemaining = true;
  2602. char upperBase = 'a' - 'A';
  2603. int8_t sectionBits;
  2604. do
  2605. {
  2606. sectionBits = rocksValue[bitmapLen];
  2607. shift = BITS_OF_BYTE - 1 - 1 - kIdx % (BITS_OF_BYTE - 1);
  2608. if (sectionBits >> shift & 1U)
  2609. {
  2610. // convert to upper mode
  2611. char &cc = key[kIdx];
  2612. assert(cc >= 'a' && cc <= 'z');
  2613. cc -= upperBase;
  2614. }
  2615. kIdx++;
  2616. if (kIdx % (BITS_OF_BYTE - 1) == 0)
  2617. {
  2618. bitmapLen++;
  2619. hasRemaining = (sectionBits & 0x80) != 0;
  2620. }
  2621. } while (hasRemaining);
  2622. }
  2623. int RocksdbProcess::shrink_value(
  2624. const std::vector<std::string> &values,
  2625. std::string &rocksValue)
  2626. {
  2627. assert(values.size() == mExtraValueFieldNums);
  2628. // evaluate space
  2629. static int align = 1 << 12;
  2630. int valueLen = 0, fid, fsize;
  2631. int totalLen = align;
  2632. char *valueBuff = (char *)malloc(totalLen);
  2633. for (size_t idx = 0; idx < mExtraValueFieldNums; idx++)
  2634. {
  2635. fid = mFieldIndexMapping[mCompoundKeyFieldNums + idx];
  2636. switch (mTableDef->field_type(fid))
  2637. {
  2638. case DField::Signed:
  2639. {
  2640. fsize = mTableDef->field_size(fid);
  2641. if (fsize > sizeof(int32_t))
  2642. {
  2643. if (valueLen + sizeof(int64_t) > totalLen)
  2644. {
  2645. // expand buff
  2646. totalLen = (valueLen + sizeof(int64_t) + align - 1) & -align;
  2647. valueBuff = expand_buff(totalLen, valueBuff);
  2648. if (!valueBuff)
  2649. return -1;
  2650. }
  2651. *(int64_t *)(valueBuff + valueLen) = strtoll(values[idx].c_str(), NULL, 10);
  2652. valueLen += sizeof(int64_t);
  2653. }
  2654. else
  2655. {
  2656. if (valueLen + sizeof(int32_t) > totalLen)
  2657. {
  2658. // expand buff
  2659. totalLen = (valueLen + sizeof(int32_t) + align - 1) & -align;
  2660. valueBuff = expand_buff(totalLen, valueBuff);
  2661. if (!valueBuff)
  2662. return -1;
  2663. }
  2664. *(int32_t *)(valueBuff + valueLen) = strtol(values[idx].c_str(), NULL, 10);
  2665. valueLen += sizeof(int32_t);
  2666. }
  2667. break;
  2668. }
  2669. case DField::Unsigned:
  2670. {
  2671. fsize = mTableDef->field_size(fid);
  2672. if (fsize > sizeof(uint32_t))
  2673. {
  2674. if (valueLen + sizeof(uint64_t) > totalLen)
  2675. {
  2676. // expand buff
  2677. totalLen = (valueLen + sizeof(uint64_t) + align - 1) & -align;
  2678. valueBuff = expand_buff(totalLen, valueBuff);
  2679. if (!valueBuff)
  2680. return -1;
  2681. }
  2682. *(uint64_t *)(valueBuff + valueLen) = strtoull(values[idx].c_str(), NULL, 10);
  2683. valueLen += sizeof(uint64_t);
  2684. }
  2685. else
  2686. {
  2687. if (valueLen + sizeof(uint32_t) > totalLen)
  2688. {
  2689. // expand buff
  2690. totalLen = (valueLen + sizeof(uint32_t) + align - 1) & -align;
  2691. valueBuff = expand_buff(totalLen, valueBuff);
  2692. if (!valueBuff)
  2693. return -1;
  2694. }
  2695. *(uint32_t *)(valueBuff + valueLen) = strtoul(values[idx].c_str(), NULL, 10);
  2696. valueLen += sizeof(uint32_t);
  2697. }
  2698. break;
  2699. }
  2700. case DField::Float:
  2701. {
  2702. fsize = mTableDef->field_size(fid);
  2703. if (fsize > sizeof(float))
  2704. {
  2705. if (valueLen + sizeof(double) > totalLen)
  2706. {
  2707. // expand buff
  2708. totalLen = (valueLen + sizeof(double) + align - 1) & -align;
  2709. valueBuff = expand_buff(totalLen, valueBuff);
  2710. if (!valueBuff)
  2711. return -1;
  2712. }
  2713. *(double *)(valueBuff + valueLen) = strtod(values[idx].c_str(), NULL);
  2714. valueLen += sizeof(double);
  2715. }
  2716. else
  2717. {
  2718. if (valueLen + sizeof(float) > totalLen)
  2719. {
  2720. // expand buff
  2721. totalLen = (valueLen + sizeof(float) + align - 1) & -align;
  2722. valueBuff = expand_buff(totalLen, valueBuff);
  2723. if (!valueBuff)
  2724. return -1;
  2725. }
  2726. *(float *)(valueBuff + valueLen) = strtof(values[idx].c_str(), NULL);
  2727. valueLen += sizeof(float);
  2728. }
  2729. break;
  2730. }
  2731. case DField::String:
  2732. case DField::Binary:
  2733. {
  2734. int len = values[idx].length();
  2735. fsize = len + sizeof(int);
  2736. {
  2737. if (valueLen + fsize > totalLen)
  2738. {
  2739. // expand buff
  2740. totalLen = (valueLen + fsize + align - 1) & -align;
  2741. valueBuff = expand_buff(totalLen, valueBuff);
  2742. if (!valueBuff)
  2743. return -1;
  2744. }
  2745. *(int *)(valueBuff + valueLen) = len;
  2746. valueLen += sizeof(int);
  2747. if (len > 0)
  2748. memcpy((void *)(valueBuff + valueLen), (void *)values[idx].data(), len);
  2749. valueLen += len;
  2750. }
  2751. break;
  2752. }
  2753. default:
  2754. log_error("unexpected field type! type:%d", mTableDef->field_type(fid));
  2755. return -1;
  2756. };
  2757. }
  2758. rocksValue.assign(valueBuff, valueLen);
  2759. free(valueBuff);
  2760. return 0;
  2761. }
  2762. int RocksdbProcess::split_values(
  2763. const std::string &compoundValue,
  2764. std::vector<std::string> &values)
  2765. {
  2766. int ret;
  2767. std::string value;
  2768. char *head = const_cast<char *>(compoundValue.data());
  2769. for (int idx = 0; idx < mExtraValueFieldNums; idx++)
  2770. {
  2771. //if(idx == mExtraValueFieldNums-1){ // extend field no need to parse
  2772. // values.push_back("");
  2773. // break;
  2774. //}
  2775. ret = get_value_by_id(head, mFieldIndexMapping[mCompoundKeyFieldNums + idx], value);
  2776. assert(ret == 0);
  2777. values.push_back(std::move(value));
  2778. }
  2779. return 0;
  2780. }
  2781. // translate dtcfid to rocksfid
  2782. int RocksdbProcess::translate_field_idx(int dtcfid)
  2783. {
  2784. for (size_t idx = 0; idx < mFieldIndexMapping.size(); idx++)
  2785. {
  2786. if (mFieldIndexMapping[idx] == dtcfid)
  2787. return idx;
  2788. }
  2789. return -1;
  2790. }
  2791. int RocksdbProcess::get_value_by_id(
  2792. char *&valueHead,
  2793. int fieldId,
  2794. std::string &fieldValue)
  2795. {
  2796. assert(valueHead);
  2797. // evaluate space
  2798. int fsize;
  2799. int fieldType = mTableDef->field_type(fieldId);
  2800. switch (fieldType)
  2801. {
  2802. case DField::Signed:
  2803. {
  2804. fsize = mTableDef->field_size(fieldId);
  2805. if (fsize > sizeof(int32_t))
  2806. {
  2807. int64_t value = *(int64_t *)(valueHead);
  2808. valueHead += sizeof(int64_t);
  2809. fieldValue = std::move(std::to_string(value));
  2810. }
  2811. else
  2812. {
  2813. int32_t value = *(int32_t *)(valueHead);
  2814. valueHead += sizeof(int32_t);
  2815. fieldValue = std::move(std::to_string(value));
  2816. }
  2817. break;
  2818. }
  2819. case DField::Unsigned:
  2820. {
  2821. fsize = mTableDef->field_size(fieldId);
  2822. if (fsize > sizeof(uint32_t))
  2823. {
  2824. uint64_t value = *(uint64_t *)(valueHead);
  2825. valueHead += sizeof(uint64_t);
  2826. fieldValue = std::move(std::to_string(value));
  2827. }
  2828. else
  2829. {
  2830. uint32_t value = *(uint32_t *)(valueHead);
  2831. valueHead += sizeof(uint32_t);
  2832. fieldValue = std::move(std::to_string(value));
  2833. }
  2834. break;
  2835. }
  2836. case DField::Float:
  2837. {
  2838. fsize = mTableDef->field_size(fieldId);
  2839. if (fsize <= sizeof(float))
  2840. {
  2841. float value = *(float *)(valueHead);
  2842. valueHead += sizeof(float);
  2843. fieldValue = std::move(std::to_string(value));
  2844. }
  2845. else
  2846. {
  2847. double value = *(double *)(valueHead);
  2848. valueHead += sizeof(double);
  2849. fieldValue = std::move(std::to_string(value));
  2850. }
  2851. break;
  2852. }
  2853. case DField::String:
  2854. case DField::Binary:
  2855. {
  2856. int len;
  2857. {
  2858. len = *(int *)(valueHead);
  2859. valueHead += sizeof(int);
  2860. fieldValue = std::move(std::string(valueHead, len));
  2861. valueHead += len;
  2862. }
  2863. break;
  2864. }
  2865. default:
  2866. log_error("unexpected field type! type:%d", fieldType);
  2867. return -1;
  2868. };
  2869. return 0;
  2870. }
  2871. char *RocksdbProcess::expand_buff(int len, char *oldPtr)
  2872. {
  2873. char *newPtr = (char *)realloc((void *)oldPtr, len);
  2874. if (!newPtr)
  2875. {
  2876. log_error("realloc memory failed!");
  2877. free(oldPtr);
  2878. }
  2879. return newPtr;
  2880. }
  2881. // check two rocksdb key whether equal or not
  2882. int RocksdbProcess::rocks_key_matched(const std::string &rocksKey1, const std::string &rocksKey2)
  2883. {
  2884. return rocksKey1.compare(rocksKey2);
  2885. }
  2886. // check whether the key in the query conditon range matched or not
  2887. // 1 : in the range but not matched
  2888. // 0: key matched
  2889. // -1: in the out of the range
  2890. int RocksdbProcess::range_key_matched(
  2891. const std::string &rocksKey,
  2892. const std::vector<QueryCond> &keyConds)
  2893. {
  2894. std::string primaryKey;
  2895. int fieldType = mKeyfield_types[0];
  2896. key_format::decode_primary_key(rocksKey, fieldType, primaryKey);
  2897. int ret;
  2898. for (size_t idx = 0; idx < keyConds.size(); idx++)
  2899. {
  2900. ret = condition_filter(primaryKey, keyConds[idx].sCondValue, fieldType, keyConds[idx].sCondOpr);
  2901. if (ret != 0)
  2902. {
  2903. // check boundary value
  2904. switch (keyConds[idx].sCondOpr)
  2905. {
  2906. /* enum {
  2907. EQ = 0,
  2908. NE = 1,
  2909. LT = 2,
  2910. LE = 3,
  2911. GT = 4,
  2912. GE = 5,
  2913. }; */
  2914. case 0:
  2915. case 1: // not support now
  2916. case 3:
  2917. case 5:
  2918. return -1;
  2919. case 2:
  2920. case 4:
  2921. return primaryKey.compare(keyConds[idx].sCondValue) == 0 ? 1 : -1;
  2922. default:
  2923. log_error("unsupport condition:%d", keyConds[idx].sCondOpr);
  2924. }
  2925. }
  2926. }
  2927. return 0;
  2928. }
  2929. int RocksdbProcess::analyse_primary_key_conds(
  2930. DirectRequestContext *reqCxt,
  2931. std::vector<QueryCond> &primaryKeyConds)
  2932. {
  2933. std::vector<QueryCond>& queryConds = ((RangeQuery_t*)reqCxt->sPacketValue.uRangeQuery)->sFieldConds;
  2934. auto itr = queryConds.begin();
  2935. while (itr != queryConds.end())
  2936. {
  2937. if (itr->sFieldIndex == 0)
  2938. {
  2939. switch ((CondOpr)itr->sCondOpr)
  2940. {
  2941. case CondOpr::eEQ:
  2942. case CondOpr::eLT:
  2943. case CondOpr::eLE:
  2944. case CondOpr::eGT:
  2945. case CondOpr::eGE:
  2946. break;
  2947. case CondOpr::eNE:
  2948. default:
  2949. log_error("unsupport query expression now! condExpr:%d", itr->sCondOpr);
  2950. return -1;
  2951. }
  2952. primaryKeyConds.push_back(*itr);
  2953. itr = queryConds.erase(itr);
  2954. }
  2955. else
  2956. {
  2957. itr++;
  2958. }
  2959. }
  2960. if (primaryKeyConds.size() <= 0)
  2961. {
  2962. log_error("no explicit primary key in query context!");
  2963. return -1;
  2964. }
  2965. return 0;
  2966. }
  2967. void RocksdbProcess::init_title(int group, int role)
  2968. {
  2969. titlePrefixSize = snprintf(name, sizeof(name), "helper%d%c", group, MACHINEROLESTRING[role]);
  2970. memcpy(title, name, titlePrefixSize);
  2971. title[titlePrefixSize++] = ':';
  2972. title[titlePrefixSize++] = ' ';
  2973. title[titlePrefixSize] = '\0';
  2974. title[sizeof(title) - 1] = '\0';
  2975. }
  2976. void RocksdbProcess::set_title(const char *status)
  2977. {
  2978. strncpy(title + titlePrefixSize, status, sizeof(title) - 1 - titlePrefixSize);
  2979. set_proc_title(title);
  2980. }
  2981. int RocksdbProcess::process_reload_config(DTCTask *Task)
  2982. {
  2983. const char *keyStr = gConfig->get_str_val("cache", "CacheShmKey");
  2984. int cacheKey = 0;
  2985. if (keyStr == NULL)
  2986. {
  2987. cacheKey = 0;
  2988. log_notice("CacheShmKey not set!");
  2989. return -1;
  2990. }
  2991. else if (!strcasecmp(keyStr, "none"))
  2992. {
  2993. log_crit("CacheShmKey set to NONE, Cache disabled");
  2994. return -1;
  2995. }
  2996. else if (isdigit(keyStr[0]))
  2997. {
  2998. cacheKey = strtol(keyStr, NULL, 0);
  2999. }
  3000. else
  3001. {
  3002. log_crit("Invalid CacheShmKey value \"%s\"", keyStr);
  3003. return -1;
  3004. }
  3005. CacheInfo stInfo;
  3006. DTCBufferPool bufPool;
  3007. memset(&stInfo, 0, sizeof(stInfo));
  3008. stInfo.ipcMemKey = cacheKey;
  3009. stInfo.keySize = TableDefinitionManager::Instance()->get_cur_table_def()->key_format();
  3010. stInfo.readOnly = 1;
  3011. if (bufPool.cache_open(&stInfo))
  3012. {
  3013. log_error("%s", bufPool.Error());
  3014. Task->set_error(-EC_RELOAD_CONFIG_FAILED, __FUNCTION__, "open cache error!");
  3015. return -1;
  3016. }
  3017. bufPool.reload_table();
  3018. log_error("cmd notify work helper reload table, tableIdx : [%d], pid : [%d]", bufPool.shm_table_idx(), getpid());
  3019. return 0;
  3020. }
  3021. void RocksdbProcess::insert_stat(
  3022. RocksdbProcess::OprType oprType,
  3023. int64_t timeElapse)
  3024. {
  3025. assert(oprType >= OprType::eInsert && oprType < OprType::eDelete);
  3026. int opr = (int)oprType;
  3027. if (timeElapse < 1000)
  3028. mOprTimeCost[opr][(int)TimeZone::eStatLevel0]++;
  3029. else if (timeElapse < 2000)
  3030. mOprTimeCost[opr][(int)TimeZone::eStatLevel1]++;
  3031. else if (timeElapse < 3000)
  3032. mOprTimeCost[opr][(int)TimeZone::eStatLevel2]++;
  3033. else if (timeElapse < 4000)
  3034. mOprTimeCost[opr][(int)TimeZone::eStatLevel3]++;
  3035. else if (timeElapse < 5000)
  3036. mOprTimeCost[opr][(int)TimeZone::eStatLevel4]++;
  3037. else
  3038. mOprTimeCost[opr][(int)TimeZone::eStatLevel5]++;
  3039. mTotalOpr++;
  3040. if (mTotalOpr % 10000 == 0)
  3041. print_stat_info();
  3042. return;
  3043. }
  3044. void RocksdbProcess::print_stat_info()
  3045. {
  3046. int totalNum;
  3047. std::stringstream ss;
  3048. ss << "time cost per opr:\n";
  3049. ss << "totalOpr:" << mTotalOpr << "\n";
  3050. for (unsigned char idx0 = 0; idx0 <= (unsigned char)OprType::eQuery; idx0++)
  3051. {
  3052. switch ((OprType)idx0)
  3053. {
  3054. case OprType::eInsert:
  3055. {
  3056. totalNum = 0;
  3057. ss << "Insert:[";
  3058. for (unsigned char idx1 = 0; idx1 < (unsigned char)TimeZone::eStatMax; idx1++)
  3059. {
  3060. ss << mOprTimeCost[idx0][idx1];
  3061. if (idx1 != (unsigned char)TimeZone::eStatMax - 1)
  3062. ss << ", ";
  3063. totalNum += mOprTimeCost[idx0][idx1];
  3064. }
  3065. ss << "] total:" << totalNum << "\n";
  3066. break;
  3067. }
  3068. case OprType::eUpdate:
  3069. {
  3070. totalNum = 0;
  3071. ss << "Update:[";
  3072. for (unsigned char idx1 = 0; idx1 < (unsigned char)TimeZone::eStatMax; idx1++)
  3073. {
  3074. ss << mOprTimeCost[idx0][idx1];
  3075. if (idx1 != (unsigned char)TimeZone::eStatMax - 1)
  3076. ss << ", ";
  3077. totalNum += mOprTimeCost[idx0][idx1];
  3078. }
  3079. ss << "] total:" << totalNum << "\n";
  3080. break;
  3081. }
  3082. case OprType::eDirectQuery:
  3083. {
  3084. totalNum = 0;
  3085. ss << "DirectQuery:[";
  3086. for (unsigned char idx1 = 0; idx1 < (unsigned char)TimeZone::eStatMax; idx1++)
  3087. {
  3088. ss << mOprTimeCost[idx0][idx1];
  3089. if (idx1 != (unsigned char)TimeZone::eStatMax - 1)
  3090. ss << ", ";
  3091. totalNum += mOprTimeCost[idx0][idx1];
  3092. }
  3093. ss << "] total:" << totalNum << "\n";
  3094. break;
  3095. }
  3096. case OprType::eQuery:
  3097. {
  3098. totalNum = 0;
  3099. ss << "Query:[";
  3100. for (unsigned char idx1 = 0; idx1 < (unsigned char)TimeZone::eStatMax; idx1++)
  3101. {
  3102. ss << mOprTimeCost[idx0][idx1];
  3103. if (idx1 != (unsigned char)TimeZone::eStatMax - 1)
  3104. ss << ", ";
  3105. totalNum += mOprTimeCost[idx0][idx1];
  3106. }
  3107. ss << "] total:" << totalNum << "\n";
  3108. break;
  3109. }
  3110. case OprType::eReplace:
  3111. {
  3112. totalNum = 0;
  3113. ss << "Replace:[";
  3114. for (unsigned char idx1 = 0; idx1 < (unsigned char)TimeZone::eStatMax; idx1++)
  3115. {
  3116. ss << mOprTimeCost[idx0][idx1];
  3117. if (idx1 != (unsigned char)TimeZone::eStatMax - 1)
  3118. ss << ", ";
  3119. totalNum += mOprTimeCost[idx0][idx1];
  3120. }
  3121. ss << "] total:" << totalNum << "\n";
  3122. break;
  3123. }
  3124. case OprType::eDelete:
  3125. {
  3126. totalNum = 0;
  3127. ss << "Delete:[";
  3128. for (unsigned char idx1 = 0; idx1 < (unsigned char)TimeZone::eStatMax; idx1++)
  3129. {
  3130. ss << mOprTimeCost[idx0][idx1];
  3131. if (idx1 != (unsigned char)TimeZone::eStatMax - 1)
  3132. ss << ", ";
  3133. totalNum += mOprTimeCost[idx0][idx1];
  3134. }
  3135. ss << "] total:" << totalNum << "\n";
  3136. break;
  3137. }
  3138. }
  3139. }
  3140. log_error("%s", ss.str().c_str());
  3141. return;
  3142. }
  3143. // use for debuging
  3144. int RocksdbProcess::decodeInternalKV(
  3145. const std::string encodeKey,
  3146. const std::string encodeVal,
  3147. std::string& decodeKeys,
  3148. std::string& decodeVals)
  3149. {
  3150. int ret;
  3151. // decode the compoundKey and check whether it is matched
  3152. std::vector<std::string> keys;
  3153. key_format::Decode(encodeKey, mKeyfield_types, keys);
  3154. if ( keys.size() != mCompoundKeyFieldNums )
  3155. {
  3156. log_error("unmatched row, fullKey:%s, keyNum:%lu, definitionFieldNum:%d",
  3157. encodeKey.c_str(), keys.size(), mCompoundKeyFieldNums);
  3158. return -1;
  3159. }
  3160. // decode key bitmap
  3161. int bitmapLen = 0;
  3162. decodeBitmapKeys(encodeVal, keys, bitmapLen);
  3163. std::string fieldValue;
  3164. char *valueHead = const_cast<char*>(encodeVal.data()) + bitmapLen;
  3165. for (size_t idx = 0; idx < mFieldIndexMapping.size(); idx++)
  3166. {
  3167. int fieldId = mFieldIndexMapping[idx];
  3168. if ( idx < mCompoundKeyFieldNums )
  3169. {
  3170. fieldValue = keys[idx];
  3171. decodeKeys.append(fieldValue);
  3172. if (idx != mCompoundKeyFieldNums - 1) decodeKeys.append(",");
  3173. }
  3174. else
  3175. {
  3176. ret = get_value_by_id(valueHead, fieldId, fieldValue);
  3177. if ( ret != 0 )
  3178. {
  3179. log_error("parse field value failed! compoundValue:%s", encodeVal.c_str());
  3180. return -1;
  3181. }
  3182. decodeVals.append(fieldValue);
  3183. if (idx != mFieldIndexMapping.size() - 1) decodeVals.append(",");
  3184. }
  3185. }
  3186. return(0);
  3187. }