poller.cc 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. #include <sys/types.h>
  2. #include <sys/socket.h>
  3. #include <errno.h>
  4. #include <string.h>
  5. #include <stdio.h>
  6. #include <fcntl.h>
  7. #include <unistd.h>
  8. #include <fcntl.h>
  9. #include "memcheck.h"
  10. #include "poll_thread.h"
  11. #include "poller.h"
  12. #include "log.h"
  13. CPollerObject::~CPollerObject ()
  14. {
  15. if (ownerUnit && epslot)
  16. ownerUnit->FreeEpollSlot (epslot);
  17. if (netfd > 0) {
  18. log4cplus_debug("%d fd been closed!",netfd);
  19. close (netfd);
  20. netfd = 0;
  21. }
  22. if(eventSlot)
  23. {
  24. eventSlot->poller = NULL;
  25. eventSlot = NULL;
  26. }
  27. }
  28. int CPollerObject::AttachPoller (CPollerUnit *unit)
  29. {
  30. if(unit) {
  31. if( ownerUnit==NULL)
  32. ownerUnit = unit;
  33. else
  34. return -1;
  35. }
  36. if(netfd < 0)
  37. return -1;
  38. if(epslot <= 0) {
  39. if (!(epslot = ownerUnit->AllocEpollSlot ()))
  40. return -1;
  41. struct CEpollSlot *slot = ownerUnit->GetSlot(epslot);
  42. slot->poller = this;
  43. int flag = fcntl (netfd, F_GETFL);
  44. fcntl (netfd, F_SETFL, O_NONBLOCK | flag);
  45. struct epoll_event ev;
  46. memset(&ev,0x0,sizeof(ev));
  47. ev.events = newEvents;
  48. slot->seq++;
  49. ev.data.u64 = ((unsigned long long)slot->seq << 32) + epslot;
  50. if (ownerUnit->Epctl (EPOLL_CTL_ADD, netfd, &ev) == 0)
  51. oldEvents = newEvents;
  52. else {
  53. ownerUnit->FreeEpollSlot(epslot);
  54. log4cplus_warning("Epctl: %m");
  55. return -1;
  56. }
  57. return 0;
  58. }
  59. return ApplyEvents ();
  60. }
  61. int CPollerObject::DetachPoller() {
  62. if(epslot) {
  63. struct epoll_event ev;
  64. memset(&ev,0x0,sizeof(ev));
  65. if (ownerUnit->Epctl (EPOLL_CTL_DEL, netfd, &ev) == 0)
  66. oldEvents = newEvents;
  67. else {
  68. log4cplus_warning("Epctl: %m");
  69. return -1;
  70. }
  71. ownerUnit->FreeEpollSlot(epslot);
  72. epslot = 0;
  73. }
  74. return 0;
  75. }
  76. int CPollerObject::ApplyEvents ()
  77. {
  78. if (epslot <= 0 || oldEvents == newEvents)
  79. return 0;
  80. struct epoll_event ev;
  81. memset(&ev,0x0,sizeof(ev));
  82. ev.events = newEvents;
  83. struct CEpollSlot *slot = ownerUnit->GetSlot(epslot);
  84. slot->seq++;
  85. ev.data.u64 = ((unsigned long long)slot->seq << 32) + epslot;
  86. if (ownerUnit->Epctl (EPOLL_CTL_MOD, netfd, &ev) == 0)
  87. oldEvents = newEvents;
  88. else {
  89. log4cplus_warning("Epctl: %m");
  90. return -1;
  91. }
  92. return 0;
  93. }
  94. int CPollerObject::DelayApplyEvents ()
  95. {
  96. if (epslot <= 0 || oldEvents == newEvents)
  97. return 0;
  98. if(eventSlot)
  99. return 0;
  100. eventSlot = ownerUnit->AddDelayEventPoller(this);
  101. if(eventSlot == NULL)
  102. {
  103. log4cplus_error("max events!!!!!!");
  104. struct epoll_event ev;
  105. ev.events = newEvents;
  106. struct CEpollSlot *slot = ownerUnit->GetSlot(epslot);
  107. slot->seq++;
  108. ev.data.u64 = ((unsigned long long)slot->seq << 32) + epslot;
  109. if (ownerUnit->Epctl (EPOLL_CTL_MOD, netfd, &ev) == 0)
  110. oldEvents = newEvents;
  111. else {
  112. log4cplus_warning("Epctl: %m");
  113. return -1;
  114. }
  115. }
  116. return 0;
  117. }
  118. int CPollerObject::CheckLinkStatus(void)
  119. {
  120. char msg[1] = {0};
  121. int err = 0;
  122. err = recv(netfd, msg, sizeof(msg), MSG_DONTWAIT|MSG_PEEK);
  123. /* client already close connection. */
  124. if(err == 0 || (err < 0 && errno != EAGAIN))
  125. return -1;
  126. return 0;
  127. }
  128. void CPollerObject::InitPollFd(struct pollfd *pfd)
  129. {
  130. pfd->fd = netfd;
  131. pfd->events = newEvents;
  132. pfd->revents = 0;
  133. }
  134. void CPollerObject::InputNotify(void) {
  135. EnableInput(false);
  136. }
  137. void CPollerObject::OutputNotify(void) {
  138. EnableOutput(false);
  139. }
  140. int CPollerUnit::totalEventSlot = 40960;
  141. void CPollerObject::HangupNotify(void) {
  142. delete this;
  143. }
  144. CPollerUnit::CPollerUnit(int mp)
  145. {
  146. maxPollers = mp;
  147. eeSize = maxPollers > 1024 ? 1024 : maxPollers;
  148. epfd = -1;
  149. ep_events = NULL;
  150. pollerTable = NULL;
  151. freeSlotList = 0;
  152. usedPollers = 0;
  153. //not initailize eventCnt variable may crash, fix crash bug by linjinming 2014-05-18
  154. eventCnt = 0;
  155. }
  156. CPollerUnit::~CPollerUnit() {
  157. // skip first one
  158. for (int i = 1; i < maxPollers; i++)
  159. {
  160. if (pollerTable[i].freeList)
  161. continue;
  162. //delete pollerTable[i].poller;
  163. }
  164. FREE_CLEAR(pollerTable);
  165. if (epfd != -1)
  166. {
  167. close (epfd);
  168. epfd = -1;
  169. }
  170. FREE_CLEAR(ep_events);
  171. }
  172. int CPollerUnit::SetMaxPollers(int mp)
  173. {
  174. if(epfd >= 0)
  175. return -1;
  176. maxPollers = mp;
  177. return 0;
  178. }
  179. int CPollerUnit::InitializePollerUnit(void)
  180. {
  181. pollerTable = (struct CEpollSlot *)CALLOC(maxPollers, sizeof (*pollerTable));
  182. if (!pollerTable)
  183. {
  184. log4cplus_error("calloc failed, num=%d, %m", maxPollers);
  185. return -1;
  186. }
  187. // already zero-ed
  188. for (int i = 1; i < maxPollers - 1; i++)
  189. {
  190. pollerTable[i].freeList = i+1;
  191. }
  192. pollerTable[maxPollers - 1].freeList = 0;
  193. freeSlotList = 1;
  194. ep_events = (struct epoll_event *)CALLOC(eeSize, sizeof (struct epoll_event));
  195. if (!ep_events)
  196. {
  197. log4cplus_error("malloc failed, %m");
  198. return -1;
  199. }
  200. if ((epfd = epoll_create (maxPollers)) == -1)
  201. {
  202. log4cplus_warning("epoll_create failed, %m");
  203. return -1;
  204. }
  205. fcntl(epfd, F_SETFD, FD_CLOEXEC);
  206. return 0;
  207. }
  208. inline int CPollerUnit::VerifyEvents (struct epoll_event *ev)
  209. {
  210. int idx = EPOLL_DATA_SLOT (ev);
  211. if ((idx >= maxPollers) || (EPOLL_DATA_SEQ (ev) != pollerTable[idx].seq))
  212. {
  213. return -1;
  214. }
  215. if(pollerTable[idx].poller == NULL || pollerTable[idx].freeList != 0)
  216. {
  217. log4cplus_info("receive invalid epoll event. idx=%d seq=%d poller=%p freelist=%d event=%x",
  218. idx, (int)EPOLL_DATA_SEQ(ev), pollerTable[idx].poller,
  219. pollerTable[idx].freeList, ev->events);
  220. return -1;
  221. }
  222. return 0;
  223. }
  224. void CPollerUnit::FreeEpollSlot (int n)
  225. {
  226. if(n <= 0) return;
  227. pollerTable[n].freeList = freeSlotList;
  228. freeSlotList = n;
  229. usedPollers--;
  230. pollerTable[n].seq++;
  231. pollerTable[n].poller = NULL;
  232. }
  233. int CPollerUnit::AllocEpollSlot ()
  234. {
  235. if (0 == freeSlotList)
  236. {
  237. log4cplus_error("no free epoll slot, usedPollers = %d", usedPollers);
  238. return -1;
  239. }
  240. int n = freeSlotList;
  241. usedPollers++;
  242. freeSlotList = pollerTable[n].freeList;
  243. pollerTable[n].freeList = 0;
  244. return n;
  245. }
  246. int CPollerUnit::Epctl (int op, int fd, struct epoll_event *events)
  247. {
  248. if (epoll_ctl (epfd, op, fd, events) == -1)
  249. {
  250. log4cplus_warning("epoll_ctl error, epfd=%d, fd=%d", epfd, fd);
  251. return -1;
  252. }
  253. return 0;
  254. }
  255. int CPollerUnit::WaitPollerEvents(int timeout) {
  256. nrEvents = epoll_wait (epfd, ep_events, eeSize, timeout);
  257. return nrEvents;
  258. }
  259. void CPollerUnit::ProcessPollerEvents(void) {
  260. for (int i = 0; i < nrEvents; i++)
  261. {
  262. if(VerifyEvents (ep_events+i) == -1)
  263. {
  264. log4cplus_info("VerifyEvents failed, ep_events[%d].data.u64 = %llu", i, (unsigned long long)ep_events[i].data.u64);
  265. continue;
  266. }
  267. CEpollSlot *s = &pollerTable[EPOLL_DATA_SLOT(ep_events+i)];
  268. CPollerObject *p = s->poller;
  269. p->newEvents = p->oldEvents;
  270. if(ep_events[i].events & (EPOLLHUP | EPOLLERR))
  271. {
  272. p->HangupNotify();
  273. continue;
  274. }
  275. if(ep_events[i].events & EPOLLIN)
  276. p->InputNotify();
  277. s = &pollerTable[EPOLL_DATA_SLOT(ep_events+i)];
  278. if(s->poller==p && ep_events[i].events & EPOLLOUT)
  279. p->OutputNotify();
  280. s = &pollerTable[EPOLL_DATA_SLOT(ep_events+i)];
  281. if(s->poller==p)
  282. p->DelayApplyEvents();
  283. }
  284. }
  285. int CPollerUnit::DelayApplyEvents()
  286. {
  287. for(int i = 0; i < eventCnt; i++)
  288. {
  289. CPollerObject * p = eventSlot[i].poller;
  290. if(p)
  291. {
  292. p->ApplyEvents();
  293. eventSlot[i].poller = NULL;
  294. p->ClearDelayApplyEvents();
  295. }
  296. }
  297. eventCnt = 0;
  298. return 0;
  299. }