rocksdb_replication.cc 23 KB


  1. //////////////////////////////////////////////////////////////////////
  2. //
  3. //
  4. // replication unit under Master-slave Architechture
  5. //
  6. //////////////////////////////////////////////////////////////////////
  7. #include "rocksdb_replication.h"
  8. #include <dbconfig.h>
  9. #include "log.h"
  10. #include "common/poll_thread.h"
  11. #include "common/dtcutils.h"
  12. #include "rocksdb_replication_util.h"
  13. #include "rocksdb_direct_context.h"
  14. #include "db_process_rocks.h"
  15. #define REPLICATION_STATE_KEY "replication_state_key__"
  16. #define REPLICATION_END_KEY "replication_end_key__"
  17. #define REPLICATION_HOLDER_VALUE "9876543210"
  18. extern DTCConfig *gConfig;
  19. // operation level, [prev state][current state]
  20. static bool gReplicationState[(int)ReplicationState::eReplicationMax + 1][(int)ReplicationState::eReplicationMax + 1] = {
  21. {true, true, true, false, true},
  22. {false, true, true, false, false},
  23. {false, true, true, true, true},
  24. {false, true, false, true, true},
  25. {true, false, false, false, true}
  26. };
  27. int getReplicationState(RocksDBConn* rocksdb)
  28. {
  29. std::string state;
  30. RocksDBConn::RocksItr_t rocksItr;
  31. int ret = rocksdb->get_entry(REPLICATION_STATE_KEY, state, RocksDBConn::COLUMN_META_DATA);
  32. if (ret == -RocksDBConn::ERROR_KEY_NOT_FOUND)
  33. {
  34. return (int)ReplicationState::eReplicationNoOpr;
  35. }
  36. else if (ret < 0)
  37. {
  38. log_error("get the replication state from rocksdb failed! ret:%d", ret);
  39. return -1;
  40. }
  41. return std::stoi(state);
  42. }
  43. int updateReplicationState(RocksDBConn* rocksdb, int state)
  44. {
  45. int prevState = getReplicationState(rocksdb);
  46. if (!gReplicationState[(int)prevState][(int)state])
  47. {
  48. log_error("incompatible state, prev:%d, curr:%d", prevState, state);
  49. return -1;
  50. }
  51. int ret = rocksdb->replace_entry(REPLICATION_STATE_KEY, std::to_string(state), true, RocksDBConn::COLUMN_META_DATA);
  52. if (ret < 0)
  53. {
  54. log_error("init replication status failed! errcode:%d", ret);
  55. }
  56. return ret;
  57. }
  58. RocksReplicationChannel::RocksReplicationChannel(
  59. WorkerType type,
  60. RocksDBConn* rocksdb,
  61. PollThread* poll,
  62. int fd)
  63. :
  64. PollerObject(poll, fd),
  65. mWorkerType(type),
  66. mRocksdb(rocksdb),
  67. mLocalReplicationThread(poll)
  68. {
  69. mPacketBuffer = new ElasticBuffer();
  70. }
  71. RocksReplicationChannel::~RocksReplicationChannel()
  72. {
  73. // call base to free the connection
  74. // free elastic buffer
  75. if (mPacketBuffer) free(mPacketBuffer);
  76. }
  77. int RocksReplicationChannel::attachThread()
  78. {
  79. enable_input();
  80. int ret = PollerObject::attach_poller();
  81. if (ret < 0)
  82. {
  83. log_error("attach thread failed!, fd:%d", netfd);
  84. return -1;
  85. }
  86. return 0;
  87. }
  88. void RocksReplicationChannel::input_notify()
  89. {
  90. log_error("come into rocksdb replication inputNotify");
  91. switch (mWorkerType)
  92. {
  93. case WorkerType::eReplListener:
  94. return handleAccept();
  95. case WorkerType::eReplChannel:
  96. return handleReplication();
  97. default:
  98. log_error("unkonwn replication worker type:%d", (int)mWorkerType);
  99. }
  100. return;
  101. }
  102. void RocksReplicationChannel::output_notify()
  103. {
  104. log_error("OutputNotify:: can never come here!!!");
  105. return;
  106. }
  107. void RocksReplicationChannel::hangup_notify()
  108. {
  109. log_error("HangupNotify:: close connection!");
  110. // close socket
  111. PollerObject::detach_poller();
  112. delete this;
  113. }
  114. int RocksReplicationChannel::triggerReplication()
  115. {
  116. // do fully replication from the end of rocksdb
  117. std::string tempValue;
  118. RocksDBConn::RocksItr_t rocksItr;
  119. int ret = mRocksdb->get_last_entry(mReplStartKey, tempValue, rocksItr);
  120. if (ret < 0)
  121. {
  122. log_error("get the start replication key from rocksdb failed!");
  123. return -1;
  124. }
  125. // get replication end key from rocksdb in 'COLUMN_META_DATA' column family
  126. mReplEndKey = "";
  127. ret = mRocksdb->get_entry(REPLICATION_END_KEY, mReplEndKey, RocksDBConn::COLUMN_META_DATA);
  128. if (ret < 0 && ret != -RocksDBConn::ERROR_KEY_NOT_FOUND)
  129. {
  130. log_error("get the start replication key from rocksdb failed! ret:%d", ret);
  131. return -1;
  132. }
  133. ret = slaveConstructRequest(RocksdbReplication::eReplSync);
  134. if (ret < 0)
  135. {
  136. log_error("construct slave sync request failed!");
  137. return -1;
  138. }
  139. ret = sendReplicationData();
  140. if (ret < 0)
  141. {
  142. log_error("send sync replication message failed!");
  143. return -1;
  144. }
  145. int state = (int)ReplicationState::eReplicationRegistry;
  146. ret = updateReplicationState(mRocksdb, state);
  147. if (ret < 0)
  148. {
  149. log_error("init replication state failed!");
  150. return -1;
  151. }
  152. return 0;
  153. }
  154. // accept slave connection and create replication channel
  155. void RocksReplicationChannel::handleAccept()
  156. {
  157. log_error("handle accept!!!!!");
  158. int slaveFd;
  159. struct sockaddr_in peer;
  160. socklen_t peerSize = sizeof(peer);
  161. // extracts all the connected connections in the pending queue until return
  162. // EAGAIN
  163. while (true)
  164. {
  165. slaveFd = accept(netfd, (struct sockaddr*)&peer, &peerSize);
  166. if (-1 == slaveFd)
  167. {
  168. if (errno == EINTR)
  169. {
  170. // system call "accept" was interrupted by signal before a valid connection
  171. // arrived, go on acceptting
  172. continue;
  173. }
  174. if(errno == EAGAIN || errno == EWOULDBLOCK)
  175. {
  176. // no remaining connection on the pending queue, break out
  177. // log_notice("accept new client error: %m, %d", errno);
  178. return;
  179. }
  180. // accept error
  181. log_error("accept slave connectting failed, netfd:%d, errno:%d", netfd, errno);
  182. return;
  183. }
  184. log_error("accept slave connectting, peerAddr:%s, slaveFd:%d", inet_ntoa(peer.sin_addr), slaveFd);
  185. // construct replication worker
  186. RocksReplicationChannel* channel = new RocksReplicationChannel(RocksReplicationChannel::eReplChannel, mRocksdb, mLocalReplicationThread, slaveFd);
  187. if (!channel)
  188. {
  189. close(slaveFd);
  190. log_error("create replication channel failed! peerAddr:%s", inet_ntoa(peer.sin_addr));
  191. // return -RocksdbReplication::eAcceptSlaveFailed;
  192. return;
  193. }
  194. int ret = channel->attachThread();
  195. if (ret < 0)
  196. {
  197. delete channel;
  198. log_error("add replication channel to epoll failed! slaveAddr:%s, fd:%d", inet_ntoa(peer.sin_addr), slaveFd);
  199. // return -RocksdbReplication::eAcceptSlaveFailed;
  200. return;
  201. }
  202. }
  203. return;
  204. }
  205. // do replication in the channel
  206. void RocksReplicationChannel::handleReplication()
  207. {
  208. int ret = recieveReplicationData();
  209. if (ret < 0) goto HANDLER_END;
  210. // check header
  211. assert (mPacketHeader.sMagic == REPLICATON_HEADER_MAGIC);
  212. {
  213. RocksdbReplication::ReplicationType type = GET_REQUEST_TYPE(mPacketHeader.sPacketFlag);
  214. CLEAR_REQUEST_TYPE(mPacketHeader.sPacketFlag , type);
  215. switch (type)
  216. {
  217. case RocksdbReplication::eReplSync:
  218. ret = handleReplicationRegister();
  219. break;
  220. case RocksdbReplication::eReplReqAck:
  221. ret = handleReplicationRequest();
  222. break;
  223. case RocksdbReplication::eReplRepAck:
  224. ret = handleReplicationResponse();
  225. break;
  226. case RocksdbReplication::eReplFin:
  227. ret = handleReplicationFinished();
  228. break;
  229. default:
  230. ret = -1;
  231. log_error("unkonwn request type:%d", (int)type);
  232. }
  233. }
  234. HANDLER_END:
  235. if (ret < 0)
  236. {
  237. int state = (int)ReplicationState::eReplicationFailed;
  238. int ret = updateReplicationState(mRocksdb, state);
  239. if (ret < 0)
  240. {
  241. log_error("update replication state failed!");
  242. }
  243. hangup_notify();
  244. }
  245. return;
  246. }
  247. // master handle slave replication registry
  248. int RocksReplicationChannel::handleReplicationRegister()
  249. {
  250. // get replication start key and end key
  251. int ret;
  252. std::string startKey = "", endKey = "";
  253. if (HAS_START_KEY(mPacketHeader.sPacketFlag))
  254. {
  255. ret = mPacketBuffer->getStrValue(startKey);
  256. if (ret != 0)
  257. {
  258. log_error("get start key failed!");
  259. return -1;
  260. }
  261. }
  262. CLEAR_START_KEY(mPacketHeader.sPacketFlag);
  263. if (HAS_END_KEY(mPacketHeader.sPacketFlag))
  264. {
  265. ret = mPacketBuffer->getStrValue(endKey);
  266. if (ret != 0)
  267. {
  268. log_error("get start key failed!");
  269. return -1;
  270. }
  271. CLEAR_END_KEY(mPacketHeader.sPacketFlag);
  272. }
  273. else
  274. {
  275. // bring back end key
  276. SET_END_KEY(mPacketHeader.sPacketFlag);
  277. }
  278. ret = masterFillRangeKV(startKey, endKey);
  279. if (ret < 0)
  280. {
  281. log_error("get range key-value failed! key:%s, value:%s", startKey.c_str(), endKey.c_str());
  282. return -1;
  283. }
  284. ret = sendReplicationData();
  285. if (ret < 0)
  286. {
  287. log_error("send replication data to slave failed! key:%s, value:%s", startKey.c_str(), endKey.c_str());
  288. return -1;
  289. }
  290. // master no need to hold on the replication state
  291. // int state = (int)ReplicationState::eReplicationFullSync;
  292. // ret = updateReplicationState(mRocksdb, state);
  293. // if (ret < 0)
  294. // {
  295. // log_error("update replication state failed!");
  296. // return -1;
  297. // }
  298. return 0;
  299. }
  300. // master handle replication request
  301. int RocksReplicationChannel::handleReplicationRequest()
  302. {
  303. // get replication start key and end key
  304. std::string startKey = "", endKey = "";
  305. if (HAS_START_KEY(mPacketHeader.sPacketFlag) == 0)
  306. {
  307. log_error("no replication start key in request!");
  308. return -1;
  309. }
  310. int ret = mPacketBuffer->getStrValue(startKey);
  311. if (ret != 0)
  312. {
  313. log_error("get start key failed!");
  314. return -1;
  315. }
  316. if (HAS_END_KEY(mPacketHeader.sPacketFlag) == 0)
  317. {
  318. log_error("no replication end key in request!");
  319. return -1;
  320. }
  321. ret = mPacketBuffer->getStrValue(endKey);
  322. if (ret != 0 || endKey.empty())
  323. {
  324. log_error("get start key failed! ret:%d", ret);
  325. return -1;
  326. }
  327. CLEAR_START_KEY(mPacketHeader.sPacketFlag);
  328. CLEAR_END_KEY(mPacketHeader.sPacketFlag);
  329. ret = masterFillRangeKV(startKey, endKey);
  330. if (ret < 0)
  331. {
  332. log_error("get range key-value failed! key:%s, value:%s", startKey.c_str(), endKey.c_str());
  333. return -1;
  334. }
  335. ret = sendReplicationData();
  336. if (ret < 0)
  337. {
  338. log_error("send replication data to slave failed! key:%s, value:%s", startKey.c_str(), endKey.c_str());
  339. return -1;
  340. }
  341. return 0;
  342. }
  343. // slave side
  344. int RocksReplicationChannel::handleReplicationResponse()
  345. {
  346. int ret;
  347. // response from handshake with 'replication sync' cmd
  348. if (HAS_END_KEY(mPacketHeader.sPacketFlag))
  349. {
  350. ret = mPacketBuffer->getStrValue(mReplEndKey);
  351. if (ret != 0)
  352. {
  353. log_error("get start key failed!");
  354. return -1;
  355. }
  356. assert(!mReplEndKey.empty());
  357. ret = mRocksdb->insert_entry(REPLICATION_END_KEY, mReplEndKey, true, RocksDBConn::COLUMN_META_DATA);
  358. if (ret >= 0)
  359. {
  360. }
  361. else if (ret == RocksDBConn::ERROR_DUPLICATE_KEY)
  362. {
  363. log_info("insert duplicate replication end key! key:%s", mReplEndKey.c_str());
  364. }
  365. else
  366. {
  367. log_error("insert the replication end key into rocksdb failed! ret:%d, key:%s", ret, mReplEndKey.c_str());
  368. return -1;
  369. }
  370. }
  371. ret = slaveFillRangeKV();
  372. if (ret < 0)
  373. {
  374. log_error("save range key-value failed!");
  375. return -1;
  376. }
  377. ret = slaveConstructRequest(RocksdbReplication::eReplReqAck);
  378. if (ret < 0)
  379. {
  380. log_error("slave side construct replication request failed!");
  381. return -1;
  382. }
  383. ret = sendReplicationData();
  384. if (ret < 0)
  385. {
  386. log_error("send replication data to slave failed! startKey:%s, endKey:%s", mReplStartKey.c_str(), mReplEndKey.c_str());
  387. return -1;
  388. }
  389. return 0;
  390. }
  391. // slave do fully replication finished
  392. int RocksReplicationChannel::handleReplicationFinished()
  393. {
  394. int ret;
  395. // response from handshake with 'replication sync' cmd
  396. if (HAS_END_KEY(mPacketHeader.sPacketFlag))
  397. {
  398. ret = mPacketBuffer->getStrValue(mReplEndKey);
  399. if (ret != 0)
  400. {
  401. log_error("get start key failed!");
  402. return -1;
  403. }
  404. assert(!mReplEndKey.empty());
  405. // save replication end key to prevent crash
  406. ret = mRocksdb->insert_entry(REPLICATION_END_KEY, mReplEndKey, true, RocksDBConn::COLUMN_META_DATA);
  407. if (ret >= 0)
  408. {
  409. }
  410. else if (ret == RocksDBConn::ERROR_DUPLICATE_KEY)
  411. {
  412. log_info("insert duplicate replication end key! key:%s", mReplEndKey.c_str());
  413. }
  414. else
  415. {
  416. log_error("insert the replication end key into rocksdb failed! ret:%d, key:%s", ret, mReplEndKey.c_str());
  417. return -1;
  418. }
  419. }
  420. ret = slaveFillRangeKV();
  421. if (ret < 0)
  422. {
  423. log_error("save range key-value failed!");
  424. return -1;
  425. }
  426. // replication has safely done, remove meta data from storage and close the connection
  427. assert(!mReplEndKey.empty());
  428. ret = mRocksdb->delete_entry(mReplEndKey, true, RocksDBConn::COLUMN_META_DATA);
  429. if (ret < 0)
  430. {
  431. log_error("remove replication end key failed! key:%s, ret:%d", mReplEndKey.c_str(), ret);
  432. return -1;
  433. }
  434. int state = (int)ReplicationState::eReplicationFinished;
  435. ret = updateReplicationState(mRocksdb, state);
  436. if (ret < 0)
  437. {
  438. log_error("update replication state failed!");
  439. return -1;
  440. }
  441. // close the connection
  442. hangup_notify();
  443. log_error("do rocksdb fully replication finished!");
  444. return 0;
  445. }
  446. // query data in range of (startKey, endKey]
  447. int RocksReplicationChannel::masterFillRangeKV(
  448. std::string& startKey,
  449. std::string& endKey)
  450. {
  451. mPacketBuffer->resetElasticBuffer();
  452. // save header
  453. char* pos = mPacketBuffer->getWritingPos(sizeof(ReplicationPacket_t), true);
  454. memmove((void*)pos, (void*)&mPacketHeader, sizeof(ReplicationPacket_t));
  455. mPacketBuffer->drawingWritingPos(sizeof(ReplicationPacket_t));
  456. // save replication end key
  457. int ret;
  458. bool finished = false;
  459. std::string value;
  460. RocksDBConn::RocksItr_t rocksItr;
  461. if (endKey.empty())
  462. {
  463. // get end key from rocksdb
  464. ret = mRocksdb->get_last_entry(endKey, value, rocksItr);
  465. if (ret < 0)
  466. {
  467. log_error("get the start replication key from rocksdb failed!");
  468. return -1;
  469. }
  470. mPacketBuffer->appendStrValue(endKey);
  471. }
  472. if (endKey.empty())
  473. {
  474. // no data in storage
  475. finished = true;
  476. goto RANGE_QUERY_END;
  477. }
  478. ret = mRocksdb->retrieve_start(startKey, value, rocksItr, false);
  479. while (mPacketBuffer->getBufferSize() + 2*sizeof(int) + startKey.length() + value.length() < REPLICATION_PACKET_SIZE) {
  480. if (ret < 0)
  481. {
  482. log_error("query rocksdb failed! key:%s, ret:%d", startKey.c_str(), ret);
  483. mRocksdb->retrieve_end(rocksItr);
  484. return -1;
  485. }
  486. else if (ret == 1)
  487. {
  488. finished = true;
  489. log_info("no matched key:%s", startKey.c_str());
  490. break;
  491. }
  492. // save kv
  493. ret = mPacketBuffer->appendStrValue(startKey);
  494. if (ret < 0)
  495. {
  496. log_error("append key failed! key:%s", startKey.c_str());
  497. return -1;
  498. }
  499. ret = mPacketBuffer->appendStrValue(value);
  500. if (ret < 0)
  501. {
  502. log_error("append value failed! value:%s", value.c_str());
  503. return -1;
  504. }
  505. // skip the range of fully replication
  506. if([&startKey, &endKey]() -> bool{
  507. return startKey.compare(endKey) >= 0;
  508. }())
  509. {
  510. finished = true;
  511. break;
  512. }
  513. ret = mRocksdb->next_entry(rocksItr, startKey, value);
  514. };
  515. mRocksdb->retrieve_end(rocksItr);
  516. RANGE_QUERY_END:
  517. ReplicationPacket_t* respHeader = (ReplicationPacket_t*)(mPacketBuffer->getHeadBuffer()->sData);
  518. if (finished) {
  519. SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplFin);
  520. int state = (int)ReplicationState::eReplicationFinished;
  521. int ret = updateReplicationState(mRocksdb, state);
  522. if (ret < 0)
  523. {
  524. log_error("update replication state failed!");
  525. return -1;
  526. }
  527. } else {
  528. SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplRepAck);
  529. }
  530. respHeader->sRawPacketLen = mPacketBuffer->getBufferSize();
  531. return 0;
  532. }
  533. int RocksReplicationChannel::slaveConstructRequest(RocksdbReplication::ReplicationType rType)
  534. {
  535. // build replication sync request
  536. CLEAR_BITS(mPacketHeader.sPacketFlag);
  537. switch (rType)
  538. {
  539. case RocksdbReplication::eReplSync:
  540. SET_REQUEST_TYPE(mPacketHeader.sPacketFlag, RocksdbReplication::eReplSync);
  541. break;
  542. case RocksdbReplication::eReplReqAck:
  543. SET_REQUEST_TYPE(mPacketHeader.sPacketFlag, RocksdbReplication::eReplReqAck);
  544. break;
  545. default:
  546. log_error("unsupport request type:%d", rType);
  547. return -1;
  548. }
  549. if (!mReplStartKey.empty()) SET_START_KEY(mPacketHeader.sPacketFlag);
  550. if (!mReplEndKey.empty()) SET_END_KEY(mPacketHeader.sPacketFlag);
  551. mPacketBuffer->resetElasticBuffer();
  552. char* pos = mPacketBuffer->getWritingPos(sizeof(ReplicationPacket_t), true);
  553. if (!pos)
  554. {
  555. log_error("allocate space for raw data failed!");
  556. return -1;
  557. }
  558. memcpy((void*)pos, (void*)&mPacketHeader, sizeof(ReplicationPacket_t));
  559. mPacketBuffer->drawingWritingPos(sizeof(ReplicationPacket_t));
  560. int ret;
  561. // padding replication key
  562. if (!mReplStartKey.empty())
  563. {
  564. ret = mPacketBuffer->appendStrValue(mReplStartKey);
  565. if (ret < 0)
  566. {
  567. log_error("append start key failed! key:%s", mReplStartKey.data());
  568. return -1;
  569. }
  570. }
  571. if (!mReplEndKey.empty())
  572. {
  573. ret = mPacketBuffer->appendStrValue(mReplEndKey);
  574. if (ret < 0)
  575. {
  576. log_error("append end key failed! key:%s", mReplEndKey.data());
  577. return -1;
  578. }
  579. }
  580. // set packet len
  581. ReplicationPacket_t* header = (ReplicationPacket_t*)mPacketBuffer->getHeadBuffer()->sData;
  582. header->sRawPacketLen = mPacketBuffer->getBufferSize();
  583. return 0;
  584. }
  585. // save kv those replicating from master
  586. int RocksReplicationChannel::slaveFillRangeKV()
  587. {
  588. int ret;
  589. std::string key, value;
  590. std::map<std::string, std::string> newEntries;
  591. while (true)
  592. {
  593. mReplStartKey = std::move(key);
  594. ret = mPacketBuffer->getStrValue(key);
  595. if (ret < 0)
  596. {
  597. log_error("get key from elastic buffer failed!");
  598. return -1;
  599. }
  600. else if (1 == ret) break;
  601. ret = mPacketBuffer->getStrValue(value);
  602. if (ret != 0)
  603. {
  604. log_error("get value from elsatic buffer failed! key:%s", key.c_str());
  605. return -1;
  606. }
  607. newEntries[key] = value;
  608. //std::string dKey, dVal;
  609. //extern HelperProcessBase* helperProc;
  610. //RocksdbProcess* p_rocksdb_process = dynamic_cast<RocksdbProcess*>(helperProc);
  611. //if (p_rocksdb_process != NULL){
  612. // ret = p_rocksdb_process->decodeInternalKV(
  613. // key, value, dKey, dVal);
  614. //}
  615. //log_error("replication kv, :%s,%s", dKey.c_str(), dVal.c_str());
  616. }
  617. ret = mRocksdb->batch_update(std::set<std::string>(), newEntries, true);
  618. if (ret !=0)
  619. {
  620. log_error("batch update replication keys failed!");
  621. return -1;
  622. }
  623. return 0;
  624. }
  625. int RocksReplicationChannel::recieveReplicationData()
  626. {
  627. // recieve header
  628. memset((void*)&mPacketHeader, 0, sizeof(ReplicationPacket_t));
  629. int dataLen = sizeof(ReplicationPacket_t);
  630. int ret = ReplicationUtil::recieveMessage(netfd, (char*)&mPacketHeader, dataLen);
  631. if (ret != dataLen)
  632. {
  633. log_error("blocking read error! readLen:%d", ret);
  634. return -1;
  635. }
  636. // recieve kv
  637. mPacketBuffer->resetElasticBuffer();
  638. dataLen = mPacketHeader.sRawPacketLen - sizeof(ReplicationPacket_t);
  639. while (dataLen > 0)
  640. {
  641. int singleReadLen = dataLen > BUFFER_SIZE ? BUFFER_SIZE : dataLen;
  642. char* pos = mPacketBuffer->getWritingPos(singleReadLen);
  643. if (!pos)
  644. {
  645. log_error("allocate space for raw data failed!");
  646. return -1;
  647. }
  648. int ret = ReplicationUtil::recieveMessage(netfd, pos, singleReadLen);
  649. if (ret != singleReadLen)
  650. {
  651. log_error("blocking read error! readLen:%d", ret);
  652. return -1;
  653. }
  654. mPacketBuffer->drawingWritingPos(singleReadLen);
  655. dataLen -= singleReadLen;
  656. }
  657. return 0;
  658. }
  659. int RocksReplicationChannel::sendReplicationData()
  660. {
  661. int ret;
  662. Buffer_t* cur = mPacketBuffer->getHeadBuffer();
  663. while (cur)
  664. {
  665. ret = ReplicationUtil::sendMessage(netfd, cur->sData, cur->sDataLen);
  666. if (ret != cur->sDataLen)
  667. {
  668. log_error("send replication data failed!");
  669. return -1;
  670. }
  671. cur = mPacketBuffer->nextBuffer(cur);
  672. }
  673. return 0;
  674. }
  675. /////////////////////////////////////////////////////////////////////
  676. //
  677. // RocksdbReplication implementation
  678. //
  679. /////////////////////////////////////////////////////////////////////
  680. RocksdbReplication::RocksdbReplication(RocksDBConn* rocksdb)
  681. :
  682. mRocksdb(rocksdb),
  683. mGlobalReplicationThread(new PollThread("RocksdbReplicator"))
  684. {
  685. }
  686. int RocksdbReplication::initializeReplication()
  687. {
  688. // initialize thread
  689. assert(mGlobalReplicationThread);
  690. int ret = mGlobalReplicationThread->initialize_thread();
  691. if (ret < 0)
  692. {
  693. log_error("initialize thread poll failed.");
  694. return -1;
  695. }
  696. // running thread
  697. mGlobalReplicationThread->running_thread();
  698. ret = startMasterListener();
  699. if (ret < 0)
  700. {
  701. log_error("start listener thread failed.");
  702. return -1;
  703. }
  704. return 0;
  705. }
  706. // every rocksdb instance will listen on the replication port
  707. int RocksdbReplication::startMasterListener()
  708. {
  709. // initialize replication state
  710. int state = (int)ReplicationState::eReplicationNoOpr;
  711. int ret = updateReplicationState(mRocksdb, state);
  712. if (ret < 0)
  713. {
  714. log_error("init replication state failed!");
  715. return -1;
  716. }
  717. // bind port
  718. int socketFd, masterPort;
  719. std::string masterIp = dtc::utils::get_local_ip();
  720. // std::string masterIp = "0.0.0.0";
  721. if (masterIp.empty())
  722. {
  723. log_error("get local ip failed!");
  724. return -1;
  725. }
  726. masterPort = gConfig->get_int_val("cache", "HelperReplPort", -1);
  727. // masterPort = 40411;
  728. if (masterPort <= 0)
  729. {
  730. log_error("get table def failed!");
  731. return -1;
  732. }
  733. socketFd = ReplicationUtil::sockBind(masterIp, masterPort, 256);
  734. if (socketFd < 0)
  735. {
  736. log_error("bind addr to socket failed!");
  737. return -1;
  738. }
  739. log_error("rocksdb replication start to listen on addr:%s:%d, fd:%d", masterIp.c_str(), masterPort, socketFd);
  740. RocksReplicationChannel* listener = new RocksReplicationChannel(RocksReplicationChannel::eReplListener, mRocksdb, mGlobalReplicationThread, socketFd);
  741. if (!listener)
  742. {
  743. close(socketFd);
  744. log_error("start master replication failed!");
  745. return -RocksdbReplication::eStartMasterFailed;
  746. }
  747. ret = listener->attachThread();
  748. if (ret < 0)
  749. {
  750. delete listener;
  751. log_error("attach thread failed! fd:%d", socketFd);
  752. return -RocksdbReplication::eStartMasterFailed;
  753. }
  754. //while (true)
  755. {
  756. // listener->handleAccept();
  757. // sleep(7);
  758. }
  759. return 0;
  760. }
  761. int RocksdbReplication::startSlaveReplication(
  762. const std::string& masterIp,
  763. int masterPort)
  764. {
  765. // create connection to the master and add it to the poll with blocking mode
  766. int fd = ReplicationUtil::connectServer(masterIp, masterPort);
  767. if (fd < 0) return -ReplicationErr::eConnectRefused;
  768. // create replication channel
  769. RocksReplicationChannel* channel = new RocksReplicationChannel(RocksReplicationChannel::eReplChannel, mRocksdb, mGlobalReplicationThread, fd);
  770. if (!channel)
  771. {
  772. close(fd);
  773. log_error("create replication channel failed! addr:%s:%d", masterIp.c_str(), masterPort);
  774. return -RocksdbReplication::eStartMasterFailed;
  775. }
  776. int ret = channel->attachThread();
  777. if (ret < 0)
  778. {
  779. delete channel;
  780. log_error("attach thread failed! fd:%d", fd);
  781. return -RocksdbReplication::eStartMasterFailed;
  782. }
  783. log_error("create replication channel success, fd:%d", fd);
  784. return channel->triggerReplication();
  785. }
  786. int RocksdbReplication::getReplicationState()
  787. {
  788. return ::getReplicationState(mRocksdb);
  789. }