rpc_buffer.h 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*
  2. Copyright (c) 2020 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __RPC_BUFFER_H__
  14. #define __RPC_BUFFER_H__
  15. #ifdef _WIN32
  16. #include <workflow/PlatformSocket.h>
  17. #else
  18. #include <sys/uio.h>
  19. #endif
  20. #include <stddef.h>
  21. #include <string.h>
  22. #include <list>
  23. namespace srpc
  24. {
  25. static constexpr int BUFFER_PIECE_MIN_SIZE = 2 * 1024;
  26. static constexpr int BUFFER_PIECE_MAX_SIZE = 256 * 1024;
  27. static constexpr int BUFFER_MODE_COPY = 0;
  28. static constexpr int BUFFER_MODE_NOCOPY = 1;
  29. static constexpr int BUFFER_MODE_GIFT_NEW = 2;
  30. static constexpr int BUFFER_MODE_GIFT_MALLOC = 3;
  31. static constexpr bool BUFFER_FETCH_MOVE = true;
  32. static constexpr bool BUFFER_FETCH_STAY = false;
  33. /**
  34. * @brief Buffer Class
  35. * @details
  36. * - Thread Safety : NO
  37. * - All buffer should allocated by new char[...] or malloc
  38. * - Gather buffer piece by piece
  39. * - Get buffer one by one
  40. */
  41. class RPCBuffer
  42. {
  43. public:
  44. /**
  45. * @brief Free all buffer and rewind every state to initialize.
  46. * @note NEVER fail
  47. */
  48. void clear();
  49. /**
  50. * @brief Get all size of buffer
  51. */
  52. size_t size() const { return size_; }
  53. /**
  54. * @brief Cut current buffer at absolutely offset. Current buffer keeps
  55. * the first part and gives the second part to the out buffer.
  56. * @param[in] offset where to cut
  57. * @param[in] out points to out buffer
  58. * @return actual give how many bytes to out
  59. * @note this will cause current buffer rewind()
  60. */
  61. size_t cut(size_t offset, RPCBuffer *out);
  62. public:
  63. /**
  64. * @brief For write. Add one buffer allocated by RPCBuffer
  65. * @param[in,out] buf a pointer to a buffer
  66. * @param[in,out] size points to the expect size of buffer, return actual size
  67. * @return false if OOM, or your will get a buffer actual size <=expect-size
  68. * @note Ownership of this buffer remains with the stream
  69. */
  70. bool acquire(void **buf, size_t *size);
  71. /**
  72. * @brief For write. Add one buffer allocated by RPCBuffer
  73. * @param[in,out] buf a pointer to a buffer
  74. * @return 0 if OOM, or the actual size of the buffer
  75. * @note Ownership of this buffer remains with the stream
  76. */
  77. size_t acquire(void **buf);
  78. /**
  79. * @brief For write. Add one buffer
  80. * @param[in] buf upstream name
  81. * @param[in] buflen consistent-hash functional
  82. * @param[in] mode BUFFER_MODE_XXX
  83. * @return false if OOM
  84. * @note BUFFER_MODE_NOCOPY mean in deconstructor ignore this buffer
  85. * @note BUFFER_MODE_COPY mean memcpy this buffer right here at once
  86. * @note BUFFER_MODE_GIFT_NEW mean in deconstructor delete this buffer
  87. * @note BUFFER_MODE_GIFT_MALLOC mean in deconstructor free this buffer
  88. */
  89. bool append(void *buf, size_t buflen, int mode);
  90. bool append(const void *buf, size_t buflen, int mode);
  91. /**
  92. * @brief For write. Backs up a number of bytes of last buffer
  93. * @param[in] count how many bytes back up
  94. * @return the actual bytes backup
  95. * @note It is affect the buffer with both acquire and append
  96. * @note count should be less than or equal to the size of the last buffer
  97. */
  98. size_t backup(size_t count);
  99. /**
  100. * @brief For write. Add one buffer use memcpy
  101. * @param[in] buf buffer
  102. * @param[in] buflen buffer size
  103. * @return false if OOM
  104. */
  105. bool write(const void *buf, size_t buflen);
  106. public:
  107. /**
  108. * @brief For workflow message encode.
  109. * @param[in] iov iov vector
  110. * @param[in] count iov vector count
  111. * @return use how many iov
  112. * @retval -1 when count <=0, and set errno EINVAL
  113. */
  114. int encode(struct iovec *iov, int count);
  115. /**
  116. * @brief merge all buffer into one piece
  117. * @param[out] iov pointer and length of result
  118. * @return suceess or OOM
  119. * @retval 0 success
  120. * @retval -1 OOM
  121. */
  122. int merge_all(struct iovec& iov);
  123. public:
  124. /**
  125. * @brief For read. Reset buffer position to head
  126. * @note NEVER fail
  127. */
  128. void rewind();
  129. /**
  130. * @brief For read. Get one buffer, NO move offset
  131. * @param[in,out] buf a pointer to a buffer
  132. * @return 0 if no more data to read, or the forward size available
  133. */
  134. size_t peek(const void **buf);
  135. /**
  136. * @brief For read. Get one buffer by except size, move offset
  137. * @param[in,out] buf a pointer to a buffer
  138. * @param[in,out] size except buffer size, and return actual size
  139. * @return false if OOM, or your will get a buffer actual size <=expect-size
  140. */
  141. bool fetch(const void **buf, size_t *size);
  142. /**
  143. * @brief For read. Get one buffer, move offset
  144. * @param[in,out] buf a pointer to a buffer
  145. * @return 0 if no more data to read, or the forward size available
  146. */
  147. size_t fetch(const void **buf);
  148. /**
  149. * @brief For read. Fill one buffer with memcpy, move offset
  150. * @param[in] buf buffer wait to fill
  151. * @param[in] buflen except buffer size
  152. * @return true if fill buffer exactly bytes, false if no more data to read
  153. */
  154. bool read(void *buf, size_t buflen);
  155. /**
  156. * @brief For read. move offset, positive mean skip, negative mean backward
  157. * @param[in] offset except move offset
  158. * @return actual move offset
  159. * @note If offset=0, do nothing at all
  160. */
  161. long seek(long offset);
  162. public:
  163. void set_piece_min_size(size_t size) { piece_min_size_ = size; }
  164. void set_piece_max_size(size_t size) { piece_max_size_ = size; }
  165. RPCBuffer() = default;
  166. ~RPCBuffer();
  167. RPCBuffer(const RPCBuffer&) = delete;
  168. RPCBuffer(RPCBuffer&&) = delete;
  169. RPCBuffer& operator=(const RPCBuffer&) = delete;
  170. RPCBuffer& operator=(RPCBuffer&&) = delete;
  171. private:
  172. struct buffer_t
  173. {
  174. void *buf;
  175. size_t buflen;
  176. bool is_nocopy;
  177. bool is_new;
  178. };
  179. void clear_list_buffer();
  180. size_t internal_fetch(const void **buf, bool move_or_stay);
  181. long read_skip(long offset);
  182. long read_back(long offset);
  183. std::list<buffer_t> buffer_list_;
  184. std::pair<std::list<buffer_t>::iterator, size_t> cur_;
  185. size_t size_ = 0;
  186. size_t piece_min_size_ = BUFFER_PIECE_MIN_SIZE;
  187. size_t piece_max_size_ = BUFFER_PIECE_MAX_SIZE;
  188. bool init_read_over_ = false;
  189. size_t last_piece_left_ = 0;
  190. };
  191. } // namespace srpc
  192. #endif