test_communicator.cpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. 
  2. #include "hello_test.h"
  3. #include "servant/CommunicatorEpoll.h"
  4. TEST_F(HelloTest, testNotifyCommunicatorEpoll)
  5. {
  6. shared_ptr<Communicator> c = getCommunicator();
  7. int netThread = TC_Common::strto<int>(c->getProperty("netthread"));
  8. HelloServer server;
  9. startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) 0);
  10. auto v1 = c->getAllCommunicatorEpoll();
  11. ASSERT_TRUE(v1.size() == 0);
  12. HelloPrx prx = getObj<HelloPrx>(c.get(), "HelloAdapter");
  13. std::thread th1([&]()
  14. { checkSyncOnce(prx); });
  15. TC_Common::msleep(100);
  16. auto v2 = c->getAllCommunicatorEpoll();
  17. //两个公有通信器
  18. ASSERT_TRUE(v2.size() == (size_t)netThread);
  19. std::thread th2([&]()
  20. { checkSyncOnce(prx); });
  21. TC_Common::msleep(100);
  22. auto v3 = c->getAllCommunicatorEpoll();
  23. //通信器个数不变
  24. ASSERT_TRUE(v3.size() == (size_t)netThread);
  25. funcInCoroutine([&]()
  26. {
  27. checkSyncOnce(prx);
  28. auto v4 = c->getAllCommunicatorEpoll();
  29. //为设置协程的情况下, 仍然是两个通信器
  30. ASSERT_TRUE(v4.size() == (size_t)netThread);
  31. },
  32. false);
  33. funcInCoroutine([&]()
  34. {
  35. checkSyncOnce(prx);
  36. auto v5 = c->getAllCommunicatorEpoll();
  37. //设置了协程, 会增加一个私有协程网络通信器
  38. ASSERT_TRUE(v5.size() == (size_t)(netThread + 1));
  39. },
  40. true);
  41. auto v6 = c->getAllCommunicatorEpoll();
  42. //之前的业务线程释放了, 会减少一个私有协程网络通信器
  43. ASSERT_TRUE(v6.size() == (size_t)netThread);
  44. funcInCoroutine([&]()
  45. {
  46. checkASyncOnce(prx);
  47. //设置协程, 会增加一个私有网路通信器, 同时异步回调在私有网络通信器中
  48. auto v7 = c->getAllCommunicatorEpoll();
  49. ASSERT_TRUE(v7.size() == (size_t)(netThread + 1));
  50. });
  51. th1.join();
  52. th2.join();
  53. stopServer(server);
  54. }
  55. int getNotify(shared_ptr<Communicator> c)
  56. {
  57. if(c->getCommunicatorEpollNum() <= 0)
  58. {
  59. return 0;
  60. }
  61. auto n = c->getCommunicatorEpoll(0)->getNotify();
  62. int sum = 0;
  63. for(size_t i = 0; i< MAX_CLIENT_NOTIFYEVENT_NUM; i++)
  64. {
  65. if(n[i] != NULL)
  66. {
  67. ++sum;
  68. }
  69. }
  70. return sum;
  71. }
  72. //测试notify句柄不泄露!
  73. TEST_F(HelloTest, testNotifyBussThreadQuitOnce)
  74. {
  75. shared_ptr<Communicator> c = getCommunicator();
  76. HelloPrx prx = getObj<HelloPrx>(c.get(), "HelloAdapter");
  77. HelloServer server;
  78. startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) 0);
  79. //先把服务器端的强制都创建好
  80. for (int i = 0; i < 20; i++)
  81. {
  82. checkSyncOnce(prx);
  83. }
  84. for (int i = 0; i < 3; i++)
  85. {
  86. size_t sp_count = ServantProxyThreadData::g_immortal->getList().size();
  87. size_t notify_count = getNotify(c);
  88. {
  89. std::thread th1([&]()
  90. { checkSyncOnce(prx); });
  91. th1.join();
  92. }
  93. TC_Common::msleep(10);
  94. //私有线程数据释放了, 通知句柄数不增加
  95. ASSERT_TRUE(ServantProxyThreadData::g_immortal->getList().size() == sp_count);
  96. ASSERT_TRUE(getNotify(c) == (int)notify_count);
  97. }
  98. stopServer(server);
  99. }
  100. //测试notify句柄不泄露!
  101. TEST_F(HelloTest, testNotifyBussThreadQuit)
  102. {
  103. shared_ptr<Communicator> c = getCommunicator();
  104. HelloPrx prx = getObj<HelloPrx>(c.get(), "HelloAdapter");
  105. HelloServer server;
  106. startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) 0);
  107. // for(int i = 0; i < 5000; i++)
  108. for(int i = 0; i < 1000; i++)
  109. {
  110. {
  111. std::thread th1([&]()
  112. {
  113. checkSyncOnce(prx);
  114. });
  115. th1.join();
  116. }
  117. TC_Common::msleep(10);
  118. int num = getNotify(c);
  119. ASSERT_TRUE(num < 5);
  120. }
  121. stopServer(server);
  122. }
  123. //
  124. //TEST_F(HelloTest, testNotifyCommunicatorQuit)
  125. //{
  126. // HelloServer server;
  127. // startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) 0);
  128. //
  129. // for(int i = 0; i < 1000; i++)
  130. // {
  131. // shared_ptr<Communicator> c = getCommunicator();
  132. // HelloPrx prx = getObj<HelloPrx>(c.get(), "HelloAdapter");
  133. //
  134. // {
  135. //
  136. // std::thread th1([&]()
  137. // {
  138. // try
  139. // {
  140. // checkSyncOnce(prx);
  141. // }
  142. // catch(exception &ex)
  143. // {
  144. // LOG_CONSOLE_DEBUG << i << ", " << ex.what() << endl;
  145. // }
  146. //
  147. // c.reset();
  148. // });
  149. //
  150. // th1.join();
  151. // }
  152. //
  153. // }
  154. //
  155. // stopServer(server);
  156. //
  157. //}
  158. TEST_F(HelloTest, testNotifyProxyQuit)
  159. {
  160. HelloServer server;
  161. startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE)0);
  162. shared_ptr<Communicator> c = getCommunicator();
  163. HelloPrx prx = getObj<HelloPrx>(c.get(), "HelloAdapter");
  164. int count = 10;
  165. while (count-- > 0)
  166. {
  167. vector<std::thread*> vt;
  168. for (int i = 0; i < 1; i++)
  169. {
  170. vt.push_back(new std::thread([&]()
  171. {
  172. checkSyncOnce(prx);
  173. }));
  174. }
  175. for (auto v: vt)
  176. {
  177. v->join();
  178. delete v;
  179. }
  180. // TC_Common::sleep(3);
  181. }
  182. stopServer(server);
  183. }
  184. TEST_F(HelloTest, testCommunicatorGetResourcesInfo)
  185. {
  186. HelloServer server;
  187. startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE)0);
  188. shared_ptr<Communicator> c = getCommunicator();
  189. HelloPrx prx = getObj<HelloPrx>(c.get(), "HelloAdapter");
  190. int count = 10;
  191. while (count-- > 0)
  192. {
  193. vector<std::thread*> vt;
  194. for (int i = 0; i < 1; i++)
  195. {
  196. vt.push_back(new std::thread([&]()
  197. {
  198. checkSyncOnce(prx);
  199. }));
  200. }
  201. for (auto v: vt)
  202. {
  203. v->join();
  204. delete v;
  205. }
  206. }
  207. string buf = c->getResourcesInfo();
  208. LOG_CONSOLE_DEBUG << buf << endl;
  209. stopServer(server);
  210. }
  211. #if 0
  212. TEST_F(HelloTest, testNotifyCtrlC)
  213. {
  214. std::thread th([](){
  215. TC_Common::sleep(2);
  216. kill(getpid(), SIGINT);
  217. });
  218. HelloServer server;
  219. startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) 0);
  220. HelloPrx prx = getObj<HelloPrx>(server.getCommunicator(), "HelloAdapter");
  221. for(int i = 0; i < 10000; i++)
  222. {
  223. checkSyncOnce(prx);
  224. }
  225. stopServer(server);
  226. }
  227. TEST_F(HelloTest, testNotifyCtrlCGlobalCommunicator)
  228. {
  229. std::thread th([](){
  230. TC_Common::sleep(2);
  231. kill(getpid(), SIGINT);
  232. });
  233. shared_ptr<Communicator> c = std::make_shared<Communicator>();
  234. HelloServer server;
  235. startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) 0);
  236. for(int i = 0; i < 10000; i++)
  237. {
  238. checkSync(c.get());
  239. }
  240. stopServer(server);
  241. }
  242. #endif