elastic_buffer.h 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. /*
  2. * =====================================================================================
  3. *
  4. * Filename: elastic_buffer.h
  5. *
  6. * Description: elastic buffer for replication between slave and master
  7. *
  8. * Version: 1.0
  9. * Created: 09/08/2020 10:02:05 PM
  10. * Revision: none
  11. * Compiler: gcc
  12. *
  13. * Author: Norton, yangshuang68@jd.com
  14. * Company: JD.com, Inc.
  15. *
  16. * =====================================================================================
  17. */
  18. #ifndef __ELASTIC_BUFFER_H__
  19. #define __ELASTIC_BUFFER_H__
  #include <stddef.h>
  #include <stdint.h>
  #include <string>
  // size of each buffer node's storage: 1 MiB (1 << 20 bytes)
  22. #define BUFFER_SIZE (1 << 20)
  /**
   * One node of the singly linked chain of buffers managed by ElasticBuffer.
   *
   * The node owns sData and releases it in its destructor. It does not own
   * sNext: destroying a node leaves the rest of the chain untouched.
   *
   * Copying a node is unsafe (both copies would release the same sData),
   * so nodes should only be handled through pointers.
   */
  typedef struct Buffer
  {
    unsigned int sDataLen;  // length of the data currently held in sData
    struct Buffer* sNext;   // following node in the chain
    char* sData;            // storage owned by this node

    // Start from a well-defined empty state, so destroying a node that was
    // never given storage does not release an indeterminate pointer.
    Buffer()
      : sDataLen(0),
        sNext(NULL),
        sData(NULL)
    {
    }

    ~Buffer()
    {
      // NOTE(review): assumes sData is allocated with new char[...];
      // confirm against the allocation site in the implementation file.
      // Deleting a null pointer is a no-op, so no explicit check is needed.
      delete[] sData;
      sNext = NULL;
      sData = NULL;
    }
  } Buffer_t;
  35. class ElasticBuffer
  36. {
  37. private:
  38. Buffer_t* mHeadBuffer;
  39. Buffer_t* mWritingBuffer;
  40. Buffer_t* mReadingBuffer;
  41. int mReadLen;
  42. int mCacheBufferNum;
  43. public:
  44. ElasticBuffer()
  45. :
  46. mHeadBuffer(NULL),
  47. mWritingBuffer(NULL),
  48. mReadingBuffer(NULL),
  49. mReadLen(0),
  50. mCacheBufferNum(0)
  51. {
  52. }
  53. virtual ~ElasticBuffer();
  54. int appendStrValue(const std::string& value);
  55. int getStrValue(std::string& value);
  56. Buffer_t* getHeadBuffer() { return mHeadBuffer; }
  57. Buffer_t* nextBuffer(const Buffer_t* curr) { return curr->sNext; }
  58. void resetElasticBuffer()
  59. {
  60. mReadLen = 0;
  61. mWritingBuffer = mHeadBuffer;
  62. while (mWritingBuffer)
  63. {
  64. mWritingBuffer->sDataLen = 0;
  65. mWritingBuffer = mWritingBuffer->sNext;
  66. }
  67. mReadingBuffer = mWritingBuffer = mHeadBuffer;
  68. }
  69. // raw data batch copy
  70. char* getWritingPos(int dataLen = 0, bool atomicLen = false);
  71. char* drawingWritingPos(int dataLen);
  72. char* getReadingPos() { return mReadingBuffer->sData + mReadLen; }
  73. char* drawingReadingPos(int dataLen);
  74. // use fixed buffer size with previours buffer unit to prevent TCP sticky
  75. int getBufferSize() {
  76. return mCacheBufferNum > 0 ?
  77. (BUFFER_SIZE * (mCacheBufferNum - 1) + mWritingBuffer->sDataLen) : 0; }
  78. private:
  79. int assignStrValue(
  80. std::string& value,
  81. int dataLen);
  82. int expandElasticBuffer(int dataLen);
  83. char* encodeLength(
  84. char *p,
  85. uint32_t len);
  86. int decodeLength(
  87. char* &cur,
  88. int pLen,
  89. uint32_t &len);
  90. };
  91. #endif // __ELASTIC_BUFFER_H__