async_file.h 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /*
  2. * =====================================================================================
  3. *
  4. * Filename: async_file.h
  5. *
  6. * Description: async_file class definition.
  7. *
  8. * Version: 1.0
  9. * Created: 04/01/2021
  10. * Revision: none
  11. * Compiler: gcc
  12. *
  13. * Author: chenyujie, chenyujie28@jd.com@jd.com
  14. * Company: JD.com, Inc.
  15. *
  16. * =====================================================================================
  17. */
  18. #ifndef __HBP_ASYNC_FILE_H
  19. #define __HBP_ASYNC_FILE_H
  20. #include <list>
  21. #include <stdint.h>
  22. #include <dirent.h>
  23. #include <sys/mman.h>
  24. #include <sys/types.h>
  25. #include <sys/stat.h>
  26. #include <fcntl.h>
  27. #include <stdio.h>
  28. #include <unistd.h>
  29. // local
  30. #include "hwc_global.h"
  31. #include "afile_pos.h"
  32. // common
  33. #include "journal_id.h"
  34. #include "buffer.h"
  35. #include "log/log.h"
  36. class CMapBase {
  37. public:
  38. CMapBase();
  39. virtual ~ CMapBase();
  40. public:
  41. int Mount(const char *path, int rw, int size = 0);
  42. void Unlink();
  43. const int Size() {
  44. return _size;
  45. } const char *ErrorMessage() {
  46. return _errmsg;
  47. }
  48. private:
  49. void unmount();
  50. inline void close_fd() {
  51. if (_fd > 0) {
  52. close(_fd);
  53. _fd = -1;
  54. }
  55. }
  56. inline void unmap() {
  57. if (_map) {
  58. munmap((void *)_map, _size);
  59. _map = 0;
  60. _size = 0;
  61. }
  62. }
  63. protected:
  64. int _fd;
  65. int _rw;
  66. int _size;
  67. char _path[256];
  68. volatile char *_map;
  69. char _errmsg[256];
  70. };
  71. enum ESyncStatus
  72. {
  73. E_SYNC_PURE = 0x00,
  74. E_SYNC_ROCKSDB_FULL_SYNC_ING = 0x01,
  75. E_SYNC_ROCKSDB_FULL_SYNC_FINISH = 0x02,
  76. E_SYNC_BINLOG_SYNC_ING = 0x03
  77. };
  78. /*
  79. * Async Controller file struct
  80. */
  81. /* FIXME: 必须8 bytes对齐 */
  82. struct CControl {
  83. JournalID jid;
  84. CReaderPos rpos;
  85. CWriterPos wpos;
  86. uint64_t flag; /* dirty flag: 全量同步是否完成 */
  87. };
  88. /*
  89. * Async File Controller
  90. */
  91. class CAsyncFileController : public CMapBase {
  92. public:
  93. CAsyncFileController():CMapBase() {
  94. } virtual ~CAsyncFileController() {
  95. }
  96. inline int Init(const char *path = ASYNC_FILE_CONTROLLER) {
  97. return Mount(path, O_RDWR, ASYNC_FILE_CONTROLLER_SIZE);
  98. }
  99. inline void SwitchWriterPos() {
  100. WriterPos().Shift();
  101. }
  102. inline void SwitchReaderPos() {
  103. ReaderPos().Shift();
  104. }
  105. inline CReaderPos & ReaderPos() {
  106. CControl *p = (CControl *) _map;
  107. return p->rpos;
  108. }
  109. inline CWriterPos & WriterPos() {
  110. CControl *p = (CControl *) _map;
  111. return p->wpos;
  112. }
  113. inline JournalID & JournalId() {
  114. CControl *p = (CControl *) _map;
  115. return p->jid;
  116. }
  117. inline int IsDirty() {
  118. return (E_SYNC_ROCKSDB_FULL_SYNC_ING == DirtyFlag());
  119. }
  120. inline int GetDirty(){
  121. return DirtyFlag();
  122. }
  123. inline void SetDirty(int iState) {
  124. DirtyFlag() = iState;
  125. }
  126. inline void ClrDirty() {
  127. DirtyFlag() = E_SYNC_PURE;
  128. }
  129. private:
  130. /*
  131. * 当前仅用来表示full-sync是否完成
  132. */
  133. inline uint64_t & DirtyFlag() {
  134. CControl *p = (CControl *) _map;
  135. return p->flag;
  136. }
  137. };
  138. /*
  139. * Async File Implementation
  140. */
  141. class CAsyncFileImpl:public CMapBase {
  142. public:
  143. CAsyncFileImpl():CMapBase(), _pos() {
  144. } ~CAsyncFileImpl() {
  145. }
  146. int OpenForReader(CAsyncFilePos &);
  147. int OpenForWriter(CAsyncFilePos &);
  148. int Input(buffer &);
  149. int Output(buffer &);
  150. CAsyncFilePos & CurrentPos() {
  151. return _pos;
  152. }
  153. private:
  154. inline void WriteEndFlag() {
  155. uint32_t *flag = (uint32_t *) ((char *)_map + _pos.offset);
  156. *flag = ASYNC_FILE_END_FLAG;
  157. _pos.Front(4);
  158. return;
  159. }
  160. inline int IsWriterEnd(int len) {
  161. /*
  162. * 四字节长度 + 四字节的结束标志
  163. */
  164. if (_pos.offset + len + 4 + 4 >= (unsigned)Size())
  165. return 1;
  166. return 0;
  167. }
  168. inline int IsReaderEnd() {
  169. uint32_t *flag = (uint32_t *) ((char *)_map + _pos.offset);
  170. if (*flag == ASYNC_FILE_END_FLAG)
  171. return 1;
  172. return 0;
  173. }
  174. inline void FileName(char *s, int len) {
  175. snprintf(s, len, ASYNC_FILE_NAME "%d", _pos.serial);
  176. }
  177. private:
  178. CAsyncFilePos _pos;
  179. };
  180. /*
  181. * 写者
  182. */
  183. class CAsyncFileWriter {
  184. public:
  185. CAsyncFileWriter(int max = ASYNC_WRITER_MAP_FILES): _max(max) {
  186. bzero(_errmsg, sizeof(_errmsg));
  187. }
  188. ~CAsyncFileWriter() {
  189. std::list < CAsyncFileImpl * >::iterator it, p;
  190. for (it = _asyncfiles.begin(); it != _asyncfiles.end();) {
  191. p = it;
  192. ++it;
  193. DELETE(*p);
  194. }
  195. }
  196. int Open(void);
  197. int Write(buffer &);
  198. JournalID & JournalId(void) {
  199. return _controller.JournalId();
  200. }
  201. const char *ErrorMessage(void) {
  202. return _errmsg;
  203. }
  204. private:
  205. inline void AddToList(CAsyncFileImpl * p) {
  206. //控制map文件在一定数量,否则可能会导致磁盘flush
  207. if (_asyncfiles.size() >= (unsigned)_max)
  208. DropLastOne();
  209. _asyncfiles.push_front(p);
  210. }
  211. /*
  212. * unmap writer持有的最老的一个文件
  213. */
  214. inline void DropLastOne() {
  215. CAsyncFileImpl *p = _asyncfiles.back();
  216. DELETE(p);
  217. _asyncfiles.pop_back();
  218. }
  219. private:
  220. std::list < CAsyncFileImpl * >_asyncfiles;
  221. CAsyncFileController _controller;
  222. int _max;
  223. char _errmsg[256];
  224. };
  225. /*
  226. * 读者
  227. */
  228. class CAsyncFileReader {
  229. public:
  230. CAsyncFileReader(): _asyncfile(0), _processing(0) {
  231. bzero(_errmsg, sizeof(_errmsg));
  232. } ~CAsyncFileReader() {
  233. }
  234. int Open();
  235. int Read(buffer &);
  236. void Commit();
  237. const char *ErrorMessage(void) {
  238. return _errmsg;
  239. }
  240. private:
  241. CAsyncFileImpl * _asyncfile;
  242. CAsyncFileController _controller;
  243. char _errmsg[256];
  244. int _processing;
  245. };
  246. /*
  247. * 检查日志合法性
  248. */
  249. class CAsyncFileChecker {
  250. public:
  251. CAsyncFileChecker():_asyncfile(0) {
  252. bzero(_errmsg, sizeof(_errmsg));
  253. } ~CAsyncFileChecker() {
  254. }
  255. int Check();
  256. const char *ErrorMessage() {
  257. return _errmsg;
  258. }
  259. private:
  260. CAsyncFileImpl * _asyncfile;
  261. CAsyncFileController _controller;
  262. char _errmsg[256];
  263. };
  264. #endif