async_file.cc 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. #include "async_file.h"
  2. #include <sys/types.h>
  3. #include <sys/stat.h>
  4. #include <fcntl.h>
  5. #include <errno.h>
  6. ////////////////CMapBase
  7. CMapBase::CMapBase():
  8. _fd(-1), _rw(0), _size(0), _map(0)
  9. {
  10. bzero(_path, sizeof(_path));
  11. bzero(_errmsg, sizeof(_errmsg));
  12. }
  13. CMapBase::~CMapBase()
  14. {
  15. unmount();
  16. }
  17. void CMapBase::unmount()
  18. {
  19. close_fd();
  20. unmap();
  21. }
  22. void CMapBase::Unlink()
  23. {
  24. /*
  25. * 读完后,truncate文件为0并删除,防止启动磁盘flush
  26. */
  27. if (_rw == O_RDONLY) {
  28. //int unused;
  29. //unused = ftruncate(_fd, 0);
  30. //unused = unlink(_path);
  31. }
  32. }
  33. int CMapBase::Mount(const char *path, int rw, int size)
  34. {
  35. mkdir(ASYNC_FILE_PATH, 0777);
  36. if (access(ASYNC_FILE_PATH, W_OK | X_OK) < 0) {
  37. snprintf(_errmsg, sizeof(_errmsg), "dir(%s) Not writable",
  38. ASYNC_FILE_PATH);
  39. return -1;
  40. }
  41. int flag = rw;
  42. int prot = 0;
  43. switch (flag) {
  44. case O_WRONLY:
  45. case O_RDWR:
  46. default:
  47. flag = O_CREAT | O_RDWR;
  48. prot = PROT_WRITE | PROT_READ;
  49. break;
  50. case O_RDONLY:
  51. prot = PROT_READ;
  52. break;
  53. }
  54. if ((_fd = open(path, flag, 0644)) < 0) {
  55. snprintf(_errmsg, sizeof(_errmsg),
  56. "open failed[path:%s], errno:%d %m", path, errno);
  57. return -1;
  58. }
  59. struct stat st;
  60. fstat(_fd, &st);
  61. if (O_RDONLY == rw) {
  62. size = st.st_size;
  63. } else if (st.st_size != size) {
  64. if (ftruncate(_fd, size) != 0) {
  65. return -1;
  66. }
  67. }
  68. _map = (char *)mmap(0, size, prot, MAP_SHARED, _fd, 0);
  69. if (MAP_FAILED == _map) {
  70. snprintf(_errmsg, sizeof(_errmsg),
  71. "map failed[path:%s, size:%d, _fd:%d], errno:%d %m\n",
  72. path, size, _fd, errno);
  73. return -1;
  74. }
  75. _rw = rw;
  76. _size = size;
  77. snprintf(_path, sizeof(_path), "%s", path);
  78. return 0;
  79. }
  80. //////////////////FileImpl
  81. int CAsyncFileImpl::OpenForWriter(CAsyncFilePos & pos)
  82. {
  83. _pos = pos;
  84. char path[256] = { 0 };
  85. FileName(path, sizeof(path));
  86. return Mount(path, O_WRONLY, MAX_ASYNC_FILE_SIZE);
  87. }
  88. int CAsyncFileImpl::OpenForReader(CAsyncFilePos & pos)
  89. {
  90. _pos = pos;
  91. char path[256] = { 0 };
  92. FileName(path, sizeof(path));
  93. return Mount(path, O_RDONLY);
  94. }
  95. int CAsyncFileImpl::Input(buffer & buff)
  96. {
  97. if (IsWriterEnd(buff.size())) {
  98. WriteEndFlag();
  99. return CHBGlobal::ASYNC_NEED_SWTICH_FILE;
  100. }
  101. //len
  102. uint32_t *len = (uint32_t *) ((char *)_map + _pos.offset);
  103. *len = buff.size();
  104. _pos.Front(4);
  105. //buff
  106. memcpy((char *)_map + _pos.offset, buff.c_str(), *len);
  107. _pos.Front(*len);
  108. return CHBGlobal::ASYNC_PROCESS_OK;
  109. }
  110. int CAsyncFileImpl::Output(buffer & buff)
  111. {
  112. if (IsReaderEnd())
  113. return CHBGlobal::ASYNC_NEED_SWTICH_FILE;
  114. //len
  115. uint32_t *len = (uint32_t *) ((char *)_map + _pos.offset);
  116. _pos.Front(4);
  117. //buff
  118. buff.append((char *)_map + _pos.offset, *len);
  119. _pos.Front(*len);
  120. return CHBGlobal::ASYNC_PROCESS_OK;
  121. }
  122. //////////////////Writer /////////////////////////
  123. int CAsyncFileWriter::Open()
  124. {
  125. if (_controller.Init()) {
  126. snprintf(_errmsg, sizeof(_errmsg), "controller init failed, %s",
  127. _controller.ErrorMessage());
  128. return CHBGlobal::ASYNC_PROCESS_ERR;
  129. }
  130. CAsyncFileImpl *p = new CAsyncFileImpl();
  131. if (p->OpenForWriter(_controller.WriterPos())) {
  132. snprintf(_errmsg, sizeof(_errmsg), "open for writer failed, %s",
  133. p->ErrorMessage());
  134. return CHBGlobal::ASYNC_PROCESS_ERR;
  135. }
  136. /*
  137. * 加入writer的map文件列表
  138. */
  139. AddToList(p);
  140. return CHBGlobal::ASYNC_PROCESS_OK;
  141. }
  142. int CAsyncFileWriter::Write(buffer & buf)
  143. {
  144. if (_asyncfiles.size() <= 0) {
  145. snprintf(_errmsg, sizeof(_errmsg),
  146. "__BUG__, writer maps is zero");
  147. return CHBGlobal::ERR_ASYNC_WRITER_LOGIC;
  148. }
  149. //从缓冲中取一个文件来写
  150. int ret = _asyncfiles.front()->Input(buf);
  151. switch (ret) {
  152. case CHBGlobal::ASYNC_PROCESS_OK:
  153. {
  154. //成功写入 更新控制文件写指针
  155. _controller.WriterPos() =
  156. _asyncfiles.front()->CurrentPos();
  157. break;
  158. }
  159. case CHBGlobal::ASYNC_NEED_SWTICH_FILE:
  160. {
  161. //已经写满,需要切换, 先准备下一个文件,再切换。
  162. CAsyncFilePos pos = _controller.WriterPos();
  163. pos.Shift();
  164. CAsyncFileImpl *p = new CAsyncFileImpl();
  165. if (p->OpenForWriter(pos)) {
  166. snprintf(_errmsg, sizeof(_errmsg),
  167. "create CAsyncFileImpl failed, %s",
  168. p->ErrorMessage());
  169. DELETE(p);
  170. return CHBGlobal::ERR_ASYNC_SWTICH_FILE_ERR;
  171. }
  172. //加入缓冲
  173. AddToList(p);
  174. //更新控制文件
  175. _controller.SwitchWriterPos();
  176. //继续写
  177. return Write(buf);
  178. break;
  179. }
  180. //no default.
  181. }
  182. return CHBGlobal::ASYNC_PROCESS_OK;
  183. }
  184. /////////////////////////Reader ////////////////////
  185. int CAsyncFileReader::Open()
  186. {
  187. if (_controller.Init()) {
  188. snprintf(_errmsg, sizeof(_errmsg), "controller init failed, %s",
  189. _controller.ErrorMessage());
  190. return CHBGlobal::ASYNC_PROCESS_ERR;
  191. }
  192. _asyncfile = new CAsyncFileImpl();
  193. if (_asyncfile->OpenForReader(_controller.ReaderPos())) {
  194. snprintf(_errmsg, sizeof(_errmsg), "open for reader failed, %s",
  195. _asyncfile->ErrorMessage());
  196. return CHBGlobal::ASYNC_PROCESS_ERR;
  197. }
  198. return CHBGlobal::ASYNC_PROCESS_OK;
  199. }
  200. inline void CAsyncFileReader::Commit(void) {
  201. if(_processing) {
  202. _controller.ReaderPos() = _asyncfile->CurrentPos();
  203. _processing = 0;
  204. }
  205. }
  206. int CAsyncFileReader::Read(buffer & buff)
  207. {
  208. if(!_asyncfile){
  209. snprintf(_errmsg, sizeof(_errmsg), "reader encounter logic error");
  210. return CHBGlobal::ERR_ASYNC_READER_LOGIC;
  211. }
  212. Commit();
  213. /* 判断是否出现切文件暂态,如果出现则sespend读者 */
  214. if(_controller.ReaderPos().IsTransient(_controller.WriterPos())){
  215. return CHBGlobal::ASYNC_READER_WAIT_DATA;
  216. }
  217. /* 暂时没有更多数据 */
  218. if (_controller.ReaderPos().EQ(_controller.WriterPos()))
  219. return CHBGlobal::ASYNC_READER_WAIT_DATA;
  220. /* ERROR */
  221. if( _controller.ReaderPos().GT(_controller.WriterPos())) {
  222. /* 害怕再次遇到切换暂态 */
  223. usleep(1000);
  224. if( _controller.ReaderPos().GT(_controller.WriterPos())) {
  225. snprintf(_errmsg, sizeof(_errmsg), "reader pos is overflow");
  226. return CHBGlobal::ERR_ASYNC_READER_OVERFLOW;
  227. }
  228. }
  229. int ret = _asyncfile->Output(buff);
  230. switch (ret) {
  231. case CHBGlobal::ASYNC_PROCESS_OK:
  232. {
  233. // mark as processing, delay commit
  234. _processing = 1;
  235. //更新控制文件读指针
  236. //_controller.ReaderPos() = _asyncfile->CurrentPos();
  237. break;
  238. }
  239. case CHBGlobal::ASYNC_NEED_SWTICH_FILE:
  240. {
  241. CAsyncFilePos pos = _controller.ReaderPos();
  242. pos.Shift();
  243. //delete file
  244. _asyncfile->Unlink();
  245. DELETE(_asyncfile);
  246. _asyncfile = new CAsyncFileImpl();
  247. if (_asyncfile->OpenForReader(pos)) {
  248. snprintf(_errmsg, sizeof(_errmsg),
  249. "create CAsyncFileImpl failed, %s",
  250. _asyncfile->ErrorMessage());
  251. DELETE(_asyncfile);
  252. return CHBGlobal::ERR_ASYNC_SWTICH_FILE_ERR;
  253. }
  254. _controller.SwitchReaderPos();
  255. return Read(buff);
  256. break;
  257. }
  258. }
  259. return CHBGlobal::ASYNC_PROCESS_OK;
  260. }
  261. int CAsyncFileChecker::Check()
  262. {
  263. if (_controller.Init()) {
  264. snprintf(_errmsg, sizeof(_errmsg), "controller init failed");
  265. return CHBGlobal::ERR_ASYNC_CONTROLLER_ERR;
  266. }
  267. if (_controller.IsDirty()) {
  268. snprintf(_errmsg, sizeof(_errmsg),
  269. "full sync is not complete, status is dirty");
  270. return CHBGlobal::ERR_FULL_SYNC_NOT_COMPLETE;
  271. }
  272. /* 检查reader是否比write快 */
  273. if (_controller.ReaderPos().GT(_controller.WriterPos())) {
  274. snprintf(_errmsg, sizeof(_errmsg), "reader pos is overflow");
  275. return CHBGlobal::ERR_ASYNC_READER_OVERFLOW;
  276. }
  277. /*检查reader的有效性 */
  278. if (!_controller.ReaderPos().Zero()) {
  279. _asyncfile = new CAsyncFileImpl;
  280. if (_asyncfile->OpenForReader(_controller.ReaderPos())) {
  281. snprintf(_errmsg, sizeof(_errmsg),
  282. "reader pos is error");
  283. return CHBGlobal::ERR_ASYNC_READER_POS_ERR;
  284. }
  285. if (_controller.ReaderPos().offset >
  286. (unsigned)_asyncfile->Size() + 4) {
  287. snprintf(_errmsg, sizeof(_errmsg),
  288. "reader pos is error");
  289. return CHBGlobal::ERR_ASYNC_READER_POS_ERR;
  290. }
  291. }
  292. DELETE(_asyncfile);
  293. /*检查writer有效性 */
  294. if (!_controller.WriterPos().Zero()) {
  295. _asyncfile = new CAsyncFileImpl;
  296. if (_asyncfile->OpenForWriter(_controller.WriterPos())) {
  297. snprintf(_errmsg, sizeof(_errmsg),
  298. "writer pos is error");
  299. return CHBGlobal::ERR_ASYNC_WRITER_POS_ERR;
  300. }
  301. if (_controller.WriterPos().offset >
  302. (unsigned)_asyncfile->Size() + 4) {
  303. snprintf(_errmsg, sizeof(_errmsg),
  304. "writer pos is error");
  305. return CHBGlobal::ERR_ASYNC_WRITER_POS_ERR;
  306. }
  307. }
  308. DELETE(_asyncfile);
  309. return 0;
  310. }
/*
 * Test code
 */
  314. #ifdef __UNIT_TEST__
  315. #include <stdio.h>
  316. #include <sys/types.h>
  317. #include <sys/wait.h>
  318. int W()
  319. {
  320. buffer buff;
  321. buff.append("just for test");
  322. CAsyncFileWriter writer(3);
  323. if (writer.Open()) {
  324. printf("writer Open failed\n");
  325. return -1;
  326. }
  327. while (1) {
  328. writer.Write(buff);
  329. //usleep(1000);
  330. }
  331. return 0;
  332. }
  333. int R()
  334. {
  335. CAsyncFileReader reader;
  336. if (reader.Open()) {
  337. printf("reader Open failed\n");
  338. return -1;
  339. }
  340. while (1) {
  341. buffer buff;
  342. int ret = reader.Read(buff);
  343. if (ret == CHBGlobal::ASYNC_READER_WAIT_DATA) {
  344. printf("no data. sleep 1 \n");
  345. sleep(1);
  346. } else {
  347. printf("error, \n");
  348. exit(0);
  349. }
  350. // printf("buff.len=%d, buff.content=%9s\n", buff.size(), buff.c_str());
  351. }
  352. return 0;
  353. }
  354. int main()
  355. {
  356. int ret = 0;
  357. CAsyncFileChecker check;
  358. if (ret = check.Check()) {
  359. printf("check not pass, ret=%d\n", ret);
  360. return -1;
  361. }
  362. int pid = fork();
  363. if (pid < 0) {
  364. printf("fork child failed: %m\n");
  365. return -1;
  366. } else if (pid > 0) {
  367. nice(5);
  368. W();
  369. } else {
  370. //先让writer跑起来, 否则没有文件,reader会出错
  371. sleep(3);
  372. R();
  373. }
  374. return 0;
  375. }
  376. #endif