rocksdb_replication.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. //////////////////////////////////////////////////////////////////////
  2. //
  3. //
  4. // replication unit under Master-slave Architechture
  5. //
  6. //////////////////////////////////////////////////////////////////////
  7. #ifndef __ROCKSDB_REPLICATION_H_
  8. #define __ROCKSDB_REPLICATION_H_
  9. #include "common/poller.h"
  10. #include "elastic_buffer.h"
  11. #include "rocksdb_conn.h"
  12. #define REPLICATON_HEADER_MAGIC 0x7654321
  13. #define REPLICATION_PACKET_SIZE (1 * (1 << 20)) // 1M
  14. #define CLEAR_BITS(packetFlag) (packetFlag &= 0x0)
  15. // 2 bit for setting whether the key and value exist in the packet
  16. #define SET_START_KEY(packetFlag) (packetFlag |= 0x01)
  17. #define CLEAR_START_KEY(packetFlag) (packetFlag &= -2)
  18. #define HAS_START_KEY(packetFlag) (packetFlag & 0x01)
  19. #define SET_END_KEY(packetFlag) (packetFlag |= 0x02)
  20. #define CLEAR_END_KEY(packetFlag) (packetFlag &= -3)
  21. #define HAS_END_KEY(packetFlag) (packetFlag & 0x02)
  22. // 6 bit for replication type
  23. enum class ReplicationType;
  24. #define SET_REQUEST_TYPE(packetFlag, type) (packetFlag |= (type << 2))
  25. #define GET_REQUEST_TYPE(packetFlag) ((RocksdbReplication::ReplicationType)((packetFlag & 0xff) >> 2))
  26. #define CLEAR_REQUEST_TYPE(packetFlag, type) (packetFlag &= (-((type << 2)+1)))
  27. typedef struct ReplicationPacket
  28. {
  29. int sMagic = REPLICATON_HEADER_MAGIC;
  30. // char sReplicationType;
  31. unsigned int sPacketFlag;
  32. int sRawPacketLen;
  33. char kvRows[0];
  34. ReplicationPacket()
  35. {
  36. sPacketFlag = 0;
  37. sRawPacketLen = 0;
  38. }
  39. } ReplicationPacket_t;
  40. class PollThread;
  41. class RocksdbReplication
  42. {
  43. public:
  44. enum ReplicationType : unsigned char
  45. {
  46. eReplSync, // slave register
  47. eReplReqAck, // slave ask for replication
  48. eReplRepAck, // response from master
  49. eReplFin, // replication finished
  50. // can never bigger than the 'eReplMax'
  51. eReplMax = 63 // ((2 << 5) -1)
  52. };
  53. enum ReplicationErr
  54. {
  55. eConnectRefused,
  56. eStartMasterFailed,
  57. eAcceptSlaveFailed
  58. };
  59. private:
  60. RocksDBConn* mRocksdb;
  61. PollThread* mGlobalReplicationThread;
  62. public:
  63. RocksdbReplication(RocksDBConn* rocksdb);
  64. virtual ~RocksdbReplication() {}
  65. int initializeReplication();
  66. //////////////////////////////////////////
  67. // master api
  68. //////////////////////////////////////////
  69. //////////////////////////////////////////
  70. // slave api
  71. //////////////////////////////////////////
  72. int startSlaveReplication(
  73. const std::string& masterIp,
  74. int masterPort);
  75. int getReplicationState();
  76. private:
  77. int startMasterListener();
  78. // int updateReplicationState(int state);
  79. };
  80. class RocksReplicationChannel : public PollerObject
  81. {
  82. public:
  83. enum WorkerType
  84. {
  85. eReplListener, // master wait for slave connect
  86. eReplChannel // real replication channel
  87. };
  88. private:
  89. WorkerType mWorkerType;
  90. RocksDBConn* mRocksdb;
  91. PollThread* mLocalReplicationThread; // reference to mGlobalReplicationThread
  92. ReplicationPacket_t mPacketHeader;
  93. ElasticBuffer *mPacketBuffer;
  94. // slave temporary variable
  95. std::string mReplStartKey;
  96. std::string mReplEndKey;
  97. public:
  98. RocksReplicationChannel(
  99. WorkerType type,
  100. RocksDBConn* rocksdb,
  101. PollThread* poll,
  102. int fd);
  103. virtual ~RocksReplicationChannel();
  104. int attachThread();
  105. virtual void input_notify(void);
  106. virtual void output_notify(void);
  107. virtual void hangup_notify(void);
  108. int triggerReplication();
  109. private:
  110. public:
  111. void handleAccept();
  112. void handleReplication();
  113. int handleReplicationRegister();
  114. int handleReplicationRequest();
  115. int handleReplicationResponse();
  116. int handleReplicationFinished();
  117. int recieveReplicationData();
  118. int sendReplicationData();
  119. int masterFillRangeKV(std::string& startKey, std::string& endKey);
  120. int slaveFillRangeKV();
  121. int slaveConstructRequest(RocksdbReplication::ReplicationType rType);
  122. };
  123. #endif // __ROCKSDB_REPLICATION_H_