test_registry.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. 
  2. #include "hello_test.h"
  3. #include "servant/CommunicatorEpoll.h"
  4. #include "servant/ObjectProxy.h"
  5. #include "server/framework/DbHandle.h"
  6. #include "server/FrameworkServer.h"
  7. #include "QueryF.h"
  8. TEST_F(HelloTest, registryQuery)
  9. {
  10. shared_ptr<Communicator> c = getCommunicator();
  11. TC_Config conf = CLIENT_CONFIG();
  12. // conf.parseString(CLIENT_CONFIG);
  13. c->setProperty(conf);
  14. TC_Config fconf = FRAMEWORK_CONFIG();
  15. CDbHandle::cleanEndPoint();
  16. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1);
  17. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1);
  18. FrameworkServer fs;
  19. startServer(fs, FRAMEWORK_CONFIG());
  20. string obj = getObj(fconf, "RegistryAdapter");
  21. QueryFPrx qPrx = c->stringToProxy<QueryFPrx>(obj);
  22. {
  23. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  24. ASSERT_TRUE(endpoints.size() == 2);
  25. ASSERT_TRUE(endpoints[0].port == 9990);
  26. ASSERT_TRUE(endpoints[1].port == 9991);
  27. }
  28. {
  29. LOG_CONSOLE_DEBUG << "add TestApp.RpcServer.HelloObj 9992" << endl;
  30. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  31. TC_Common::sleep(6);
  32. qPrx->getEndpoint();
  33. TC_Common::msleep(100);
  34. {
  35. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  36. ASSERT_TRUE(endpoints.size() == 3);
  37. ASSERT_TRUE(endpoints[0].port == 9990);
  38. ASSERT_TRUE(endpoints[1].port == 9991);
  39. ASSERT_TRUE(endpoints[2].port == 9992);
  40. }
  41. }
  42. {
  43. LOG_CONSOLE_DEBUG << "add TestApp.RpcServer.HelloObj 9993" << endl;
  44. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9993, 1);
  45. TC_Common::sleep(6);
  46. qPrx->getEndpoint();
  47. TC_Common::msleep(100);
  48. {
  49. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  50. ASSERT_TRUE(endpoints.size() == 4);
  51. ASSERT_TRUE(endpoints[0].port == 9990);
  52. ASSERT_TRUE(endpoints[1].port == 9991);
  53. ASSERT_TRUE(endpoints[2].port == 9992);
  54. ASSERT_TRUE(endpoints[3].port == 9993);
  55. }
  56. }
  57. stopServer(fs);
  58. }
  59. #define START_FRAMEWORK_SERVER_1_2 \
  60. CDbHandle::cleanEndPoint(); \
  61. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1); \
  62. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1); \
  63. FrameworkServer fs; \
  64. startServer(fs, FRAMEWORK_CONFIG()); \
  65. RpcServer rpc1Server; \
  66. startServer(rpc1Server, RPC1_CONFIG()); \
  67. RpcServer rpc2Server; \
  68. startServer(rpc2Server, RPC2_CONFIG());
  69. #define START_FRAMEWORK_HTTP_SERVER_1_2 \
  70. CDbHandle::cleanEndPoint(); \
  71. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8180, 1); \
  72. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8181, 1); \
  73. FrameworkServer fs; \
  74. startServer(fs, FRAMEWORK_CONFIG()); \
  75. RpcServer rpc1Server; \
  76. startServer(rpc1Server, RPC1_CONFIG()); \
  77. RpcServer rpc2Server; \
  78. startServer(rpc2Server, RPC2_CONFIG());
  79. #define STOP_FRAMEWORK_SERVER stopServer(fs);
  80. #define HELLO_CALL {\
  81. HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
  82. int ret; \
  83. string out; \
  84. ret = qPrx->testHello(0, _buffer, out); \
  85. ASSERT_TRUE(ret == 0); \
  86. ASSERT_TRUE(out == _buffer); }
  87. #define CHECK_REGISTRY_UPDATE {\
  88. stopServer(rpc1Server); \
  89. CDbHandle::cleanEndPoint(); \
  90. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1); \
  91. RpcServer rpc3Server; \
  92. startServer(rpc3Server, RPC3_CONFIG()); \
  93. wait(6000); \
  94. HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
  95. string out = ""; \
  96. int ret = qPrx->testHello(0, _buffer, out); \
  97. ASSERT_TRUE(ret == 0); \
  98. ASSERT_TRUE(out == _buffer); \
  99. stopServer(rpc2Server); \
  100. wait(100); \
  101. out = ""; \
  102. ret = qPrx->testHello(0, _buffer, out); \
  103. ASSERT_TRUE(ret == 0); \
  104. ASSERT_TRUE(out == _buffer); \
  105. stopServer(rpc3Server); }
  106. #define CREATE_COR \
  107. { \
  108. auto scheduler = TC_CoroutineScheduler::create(); \
  109. ServantProxyThreadData::getData()->_sched = scheduler; \
  110. scheduler->go([&]() \
  111. { \
  112. { HELLO_CALL; wait(6500); HELLO_CALL } \
  113. }); \
  114. scheduler->run(); \
  115. }
  116. #define HTTP_CALL {\
  117. ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
  118. prx->tars_set_protocol(ServantProxy::PROTOCOL_HTTP1, 5); \
  119. string buff = _buffer + "-" + TC_Common::tostr(TC_Thread::CURRENT_THREADID()); \
  120. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>(); \
  121. req->setPostRequest("http://tars.com/hello", buff, true); \
  122. req->setHeader("Connection", "close"); \
  123. shared_ptr<TC_HttpResponse> rsp; \
  124. int count = 10; \
  125. while(count-- > 0) { prx->http_call("hello", req, rsp); } }
  126. #define HTTP_CREATE_COR \
  127. { \
  128. auto scheduler = TC_CoroutineScheduler::create(); \
  129. ServantProxyThreadData::getData()->_sched = scheduler; \
  130. scheduler->go([&]() \
  131. { \
  132. { HTTP_CALL; wait(6200); HTTP_CALL } \
  133. }); \
  134. scheduler->run(); \
  135. }
  136. TEST_F(HelloTest, registryRpc)
  137. {
  138. // shared_ptr<Communicator> c = getCommunicator();
  139. START_FRAMEWORK_SERVER_1_2
  140. shared_ptr<Communicator> c = getCommunicator();
  141. rpcFromRegistry(c.get());
  142. LOG_CONSOLE_DEBUG << endl;
  143. funcInCoroutine([&](){
  144. rpcFromRegistry(c.get());
  145. });
  146. funcInCoroutine([&](){
  147. shared_ptr<Communicator> comm = getCommunicator();
  148. rpcFromRegistry(comm.get());
  149. });
  150. LOG_CONSOLE_DEBUG << endl;
  151. stopServer(rpc1Server);
  152. stopServer(rpc2Server);
  153. stopServer(fs);
  154. }
  155. TEST_F(HelloTest, registryRpcUpdateList)
  156. {
  157. shared_ptr<Communicator> comm = getCommunicator();
  158. comm->setProperty("refresh-endpoint-interval", "5000");
  159. START_FRAMEWORK_SERVER_1_2
  160. HELLO_CALL
  161. CHECK_REGISTRY_UPDATE
  162. STOP_FRAMEWORK_SERVER
  163. }
  164. TEST_F(HelloTest, registryRpcUpdateListInCoroutine1)
  165. {
  166. shared_ptr<Communicator> comm = getCommunicator();
  167. comm->setProperty("refresh-endpoint-interval", "5000");
  168. START_FRAMEWORK_SERVER_1_2
  169. funcInCoroutine([&]()
  170. {
  171. HELLO_CALL
  172. CHECK_REGISTRY_UPDATE
  173. });
  174. STOP_FRAMEWORK_SERVER
  175. }
  176. TEST_F(HelloTest, registryRpcUpdateListInCoroutine2)
  177. {
  178. shared_ptr<Communicator> comm = getCommunicator();
  179. comm->setProperty("refresh-endpoint-interval", "5000");
  180. START_FRAMEWORK_SERVER_1_2
  181. HELLO_CALL
  182. funcInCoroutine([&]()
  183. {
  184. CHECK_REGISTRY_UPDATE
  185. });
  186. STOP_FRAMEWORK_SERVER
  187. }
  188. TEST_F(HelloTest, registryRpcCheckUpdateList)
  189. {
  190. shared_ptr<Communicator> comm = getCommunicator();
  191. comm->setProperty("refresh-endpoint-interval", "5000");
  192. START_FRAMEWORK_SERVER_1_2
  193. //发起网路调用
  194. HELLO_CALL
  195. HELLO_CALL
  196. //构建多个线程, 都完成hello的调用
  197. std::thread cor1([&]()
  198. {
  199. CREATE_COR
  200. });
  201. std::thread cor2([&]()
  202. {
  203. CREATE_COR
  204. });
  205. std::thread cor3([&]()
  206. {
  207. CREATE_COR
  208. });
  209. TC_Common::msleep(200);
  210. vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
  211. //5个网络通信器(2个公有, 3个私有)
  212. ASSERT_TRUE(v.size() == 5);
  213. ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
  214. ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
  215. ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
  216. ASSERT_TRUE(v[3]->isSchedCommunicatorEpoll());
  217. ASSERT_TRUE(v[4]->isSchedCommunicatorEpoll());
  218. vector<ObjectProxy*> vop;
  219. for (auto ce : v)
  220. {
  221. ObjectProxy* op = ce->hasObjectProxy("TestApp.RpcServer.HelloObj");
  222. if (op)
  223. {
  224. vop.push_back(op);
  225. }
  226. }
  227. //网络通信器都有对象
  228. ASSERT_TRUE(vop.size() == 5);
  229. for (auto op : vop)
  230. {
  231. vector<AdapterProxy*> adapters = op->getAdapters();
  232. // LOG_CONSOLE_DEBUG << adapters.size() << endl;
  233. // for(auto a : adapters)
  234. // {
  235. // LOG_CONSOLE_DEBUG << a->trans()->getConnectEndpoint().toString() << endl;
  236. // }
  237. ASSERT_TRUE(adapters.size() == 2);
  238. ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 9990);
  239. ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 9991);
  240. }
  241. stopServer(rpc1Server);
  242. //更新主控ip list
  243. CDbHandle::cleanEndPoint();
  244. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  245. RpcServer rpc3Server;
  246. startServer(rpc3Server, RPC3_CONFIG());
  247. //等待主控更新时间
  248. wait(6000);
  249. //调用两次, 这样两个公有网路通信器都会更新ip list
  250. HELLO_CALL
  251. HELLO_CALL
  252. stopServer(rpc2Server);
  253. wait(1000);
  254. for (auto op : vop)
  255. {
  256. vector<AdapterProxy*> adapters = op->getAdapters();
  257. for(auto adapter : adapters)
  258. {
  259. // LOG_CONSOLE_DEBUG << "isSched:"<< op->getCommunicatorEpoll()->isSchedCommunicatorEpoll()
  260. // << ", netThreadSeq:" << op->getCommunicatorEpoll()->getCommunicatorNetThreadSeq()
  261. // << ", " << adapter->trans()->getConnectEndpoint().toString()
  262. // << ", isActiveInReg:" << adapter->isActiveInReg() << endl;
  263. if(adapter->trans()->getConnectEndpoint().getPort() == 9992)
  264. {
  265. ASSERT_TRUE(adapter->isActiveInReg());
  266. }
  267. else
  268. {
  269. ASSERT_FALSE(adapter->isActiveInReg());
  270. }
  271. }
  272. }
  273. //第三个服务也停掉
  274. stopServer(rpc3Server);
  275. cor1.detach();
  276. cor2.detach();
  277. cor3.detach();
  278. STOP_FRAMEWORK_SERVER
  279. }
  280. TEST_F(HelloTest, registryHttpRpcCheckUpdateList)
  281. {
  282. shared_ptr<Communicator> comm = getCommunicator();
  283. comm->setProperty("refresh-endpoint-interval", "5000");
  284. START_FRAMEWORK_HTTP_SERVER_1_2
  285. //发起网路调用
  286. HTTP_CALL
  287. //构建多个线程, 都完成hello的调用
  288. std::thread cor1([&]()
  289. {
  290. //注意http call里面会wait一下在调用, 等待ip list更新
  291. HTTP_CREATE_COR
  292. });
  293. TC_Common::msleep(200);
  294. vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
  295. // for_each(v.begin(), v.end(), [](const shared_ptr<CommunicatorEpoll> &c){
  296. // LOG_CONSOLE_DEBUG << c->getCommunicatorNetThreadSeq() << ", " << c->isSchedCommunicatorEpoll() << endl;
  297. // });
  298. //3个网络通信器(2个公有, 1个私有)
  299. ASSERT_TRUE(v.size() == 3);
  300. ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
  301. ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
  302. ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
  303. ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
  304. vector<ObjectProxy*> vop = prx->getObjectProxys();
  305. //并行连接数*网络通信器个数
  306. ASSERT_TRUE(vop.size() == 5*v.size());
  307. for (auto op : vop)
  308. {
  309. vector<AdapterProxy*> adapters = op->getAdapters();
  310. // for_each(adapters.begin(), adapters.end(), [](AdapterProxy*ap){
  311. // LOG_CONSOLE_DEBUG << ap->trans()->getConnectEndpoint().toString() << endl;
  312. // });
  313. // LOG_CONSOLE_DEBUG << op << ", -------------------------------------------------" << endl;
  314. ASSERT_TRUE(adapters.size() == 2);
  315. ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 8180);
  316. ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 8181);
  317. }
  318. stopServer(rpc1Server);
  319. //更新主控ip list
  320. CDbHandle::cleanEndPoint();
  321. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8182, 1);
  322. RpcServer rpc3Server;
  323. startServer(rpc3Server, RPC3_CONFIG());
  324. //等待主控更新时间
  325. wait(6000);
  326. //调用, 触发ip list更新
  327. HTTP_CALL
  328. stopServer(rpc2Server);
  329. wait(100);
  330. stopServer(rpc3Server);
  331. for (auto op : vop)
  332. {
  333. vector<AdapterProxy*> adapters = op->getAdapters();
  334. for(auto adapter : adapters)
  335. {
  336. // LOG_CONSOLE_DEBUG << "isSched:"<< op->getCommunicatorEpoll()->isSchedCommunicatorEpoll()
  337. // << ", netThreadSeq:" << op->getCommunicatorEpoll()->getCommunicatorNetThreadSeq()
  338. // << ", " << adapter->trans()->getConnectEndpoint().toString()
  339. // << ", isActiveInReg:" << adapter->isActiveInReg() << endl;
  340. if(adapter->trans()->getConnectEndpoint().getPort() == 8182)
  341. {
  342. ASSERT_TRUE(adapter->isActiveInReg());
  343. }
  344. else
  345. {
  346. ASSERT_FALSE(adapter->isActiveInReg());
  347. }
  348. }
  349. }
  350. cor1.detach();
  351. STOP_FRAMEWORK_SERVER
  352. }