test_registry.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. 
  2. #include "hello_test.h"
  3. #include "servant/CommunicatorEpoll.h"
  4. #include "servant/ObjectProxy.h"
  5. #include "server/framework/DbHandle.h"
  6. #include "server/FrameworkServer.h"
  7. #include "QueryF.h"
  8. TEST_F(HelloTest, registryQuery)
  9. {
  10. shared_ptr<Communicator> c = getCommunicator();
  11. TC_Config conf = CLIENT_CONFIG();
  12. c->setProperty(conf);
  13. TC_Config fconf = FRAMEWORK_CONFIG();
  14. CDbHandle::cleanEndPoint();
  15. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1);
  16. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1);
  17. FrameworkServer fs;
  18. startServer(fs, FRAMEWORK_CONFIG());
  19. string obj = getObj(fconf, "RegistryAdapter");
  20. QueryFPrx qPrx = c->stringToProxy<QueryFPrx>(obj);
  21. {
  22. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  23. ASSERT_TRUE(endpoints.size() == 2);
  24. ASSERT_TRUE(endpoints[0].port == 9990);
  25. ASSERT_TRUE(endpoints[1].port == 9991);
  26. }
  27. {
  28. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  29. TC_Common::sleep(6);
  30. qPrx->getEndpoint();
  31. TC_Common::msleep(100);
  32. {
  33. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  34. ASSERT_TRUE(endpoints.size() == 3);
  35. ASSERT_TRUE(endpoints[0].port == 9990);
  36. ASSERT_TRUE(endpoints[1].port == 9991);
  37. ASSERT_TRUE(endpoints[2].port == 9992);
  38. }
  39. }
  40. {
  41. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9993, 1);
  42. TC_Common::sleep(6);
  43. qPrx->getEndpoint();
  44. TC_Common::msleep(100);
  45. {
  46. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  47. ASSERT_TRUE(endpoints.size() == 4);
  48. ASSERT_TRUE(endpoints[0].port == 9990);
  49. ASSERT_TRUE(endpoints[1].port == 9991);
  50. ASSERT_TRUE(endpoints[2].port == 9992);
  51. ASSERT_TRUE(endpoints[3].port == 9993);
  52. }
  53. }
  54. stopServer(fs);
  55. }
  56. #define START_FRAMEWORK_SERVER_1_2 \
  57. CDbHandle::cleanEndPoint(); \
  58. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1); \
  59. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1); \
  60. FrameworkServer fs; \
  61. startServer(fs, FRAMEWORK_CONFIG()); \
  62. RpcServer rpc1Server; \
  63. startServer(rpc1Server, RPC1_CONFIG()); \
  64. RpcServer rpc2Server; \
  65. startServer(rpc2Server, RPC2_CONFIG());
  66. #define START_FRAMEWORK_HTTP_SERVER_1_2 \
  67. CDbHandle::cleanEndPoint(); \
  68. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8180, 1); \
  69. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8181, 1); \
  70. FrameworkServer fs; \
  71. startServer(fs, FRAMEWORK_CONFIG()); \
  72. RpcServer rpc1Server; \
  73. startServer(rpc1Server, RPC1_CONFIG()); \
  74. RpcServer rpc2Server; \
  75. startServer(rpc2Server, RPC2_CONFIG());
  76. #define STOP_FRAMEWORK_SERVER stopServer(fs);
  77. #define HELLO_CALL {\
  78. HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
  79. int ret; \
  80. string out; \
  81. ret = qPrx->testHello(0, _buffer, out); \
  82. ASSERT_TRUE(ret == 0); \
  83. ASSERT_TRUE(out == _buffer); }
  84. #define CHECK_REGISTRY_UPDATE {\
  85. stopServer(rpc1Server); \
  86. CDbHandle::cleanEndPoint(); \
  87. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1); \
  88. RpcServer rpc3Server; \
  89. startServer(rpc3Server, RPC3_CONFIG()); \
  90. wait(6000); \
  91. HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
  92. string out = ""; \
  93. int ret = qPrx->testHello(0, _buffer, out); \
  94. ASSERT_TRUE(ret == 0); \
  95. ASSERT_TRUE(out == _buffer); \
  96. stopServer(rpc2Server); \
  97. wait(100); \
  98. out = ""; \
  99. ret = qPrx->testHello(0, _buffer, out); \
  100. ASSERT_TRUE(ret == 0); \
  101. ASSERT_TRUE(out == _buffer); \
  102. stopServer(rpc3Server); }
  103. #define CREATE_COR \
  104. { \
  105. auto scheduler = TC_CoroutineScheduler::create(); \
  106. ServantProxyThreadData::getData()->_sched = scheduler; \
  107. scheduler->go([&]() \
  108. { \
  109. { HELLO_CALL; wait(6500); HELLO_CALL } \
  110. }); \
  111. scheduler->run(); \
  112. }
  113. #define HTTP_CALL {\
  114. ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
  115. prx->tars_set_protocol(ServantProxy::PROTOCOL_HTTP1, 5); \
  116. string buff = _buffer + "-" + TC_Common::tostr(TC_Thread::CURRENT_THREADID()); \
  117. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>(); \
  118. req->setPostRequest("http://tars.com/hello", buff, true); \
  119. req->setHeader("Connection", "close"); \
  120. shared_ptr<TC_HttpResponse> rsp; \
  121. int count = 10; \
  122. while(count-- > 0) { prx->http_call("hello", req, rsp); } }
  123. #define HTTP_CREATE_COR \
  124. { \
  125. auto scheduler = TC_CoroutineScheduler::create(); \
  126. ServantProxyThreadData::getData()->_sched = scheduler; \
  127. scheduler->go([&]() \
  128. { \
  129. { HTTP_CALL; wait(6200); HTTP_CALL } \
  130. }); \
  131. scheduler->run(); \
  132. }
  133. TEST_F(HelloTest, registryRpc)
  134. {
  135. // shared_ptr<Communicator> c = getCommunicator();
  136. START_FRAMEWORK_SERVER_1_2
  137. shared_ptr<Communicator> c = getCommunicator();
  138. rpcFromRegistry(c.get());
  139. LOG_CONSOLE_DEBUG << endl;
  140. funcInCoroutine([&](){
  141. rpcFromRegistry(c.get());
  142. });
  143. funcInCoroutine([&](){
  144. shared_ptr<Communicator> comm = getCommunicator();
  145. rpcFromRegistry(comm.get());
  146. });
  147. LOG_CONSOLE_DEBUG << endl;
  148. stopServer(rpc1Server);
  149. stopServer(rpc2Server);
  150. stopServer(fs);
  151. }
  152. //一致hash测试
  153. TEST_F(HelloTest, registryRpcConHashInvoke)
  154. {
  155. START_FRAMEWORK_SERVER_1_2
  156. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9989, 1);
  157. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  158. shared_ptr<Communicator> c = getCommunicator();
  159. rpcConHashFromRegistry(c.get());
  160. stopServer(rpc1Server);
  161. stopServer(rpc2Server);
  162. STOP_FRAMEWORK_SERVER;
  163. }
  164. TEST_F(HelloTest, registryRpcUpdateList)
  165. {
  166. shared_ptr<Communicator> comm = getCommunicator();
  167. comm->setProperty("refresh-endpoint-interval", "5000");
  168. START_FRAMEWORK_SERVER_1_2
  169. HELLO_CALL
  170. CHECK_REGISTRY_UPDATE
  171. STOP_FRAMEWORK_SERVER
  172. }
  173. TEST_F(HelloTest, registryRpcUpdateListInCoroutine1)
  174. {
  175. shared_ptr<Communicator> comm = getCommunicator();
  176. comm->setProperty("refresh-endpoint-interval", "5000");
  177. START_FRAMEWORK_SERVER_1_2
  178. funcInCoroutine([&]()
  179. {
  180. HELLO_CALL
  181. CHECK_REGISTRY_UPDATE
  182. });
  183. STOP_FRAMEWORK_SERVER
  184. }
  185. TEST_F(HelloTest, registryRpcUpdateListInCoroutine2)
  186. {
  187. shared_ptr<Communicator> comm = getCommunicator();
  188. comm->setProperty("refresh-endpoint-interval", "5000");
  189. START_FRAMEWORK_SERVER_1_2
  190. HELLO_CALL
  191. funcInCoroutine([&]()
  192. {
  193. CHECK_REGISTRY_UPDATE
  194. });
  195. STOP_FRAMEWORK_SERVER
  196. }
  197. TEST_F(HelloTest, registryRpcCheckUpdateList)
  198. {
  199. shared_ptr<Communicator> comm = getCommunicator();
  200. comm->setProperty("refresh-endpoint-interval", "5000");
  201. START_FRAMEWORK_SERVER_1_2
  202. //发起网路调用
  203. HELLO_CALL
  204. HELLO_CALL
  205. //构建多个线程, 都完成hello的调用
  206. std::thread cor1([&]()
  207. {
  208. CREATE_COR
  209. });
  210. std::thread cor2([&]()
  211. {
  212. CREATE_COR
  213. });
  214. std::thread cor3([&]()
  215. {
  216. CREATE_COR
  217. });
  218. TC_Common::msleep(200);
  219. vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
  220. //5个网络通信器(2个公有, 3个私有)
  221. ASSERT_TRUE(v.size() == 5);
  222. ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
  223. ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
  224. ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
  225. ASSERT_TRUE(v[3]->isSchedCommunicatorEpoll());
  226. ASSERT_TRUE(v[4]->isSchedCommunicatorEpoll());
  227. vector<ObjectProxy*> vop;
  228. for (auto ce : v)
  229. {
  230. ObjectProxy* op = ce->hasObjectProxy("TestApp.RpcServer.HelloObj");
  231. if (op)
  232. {
  233. vop.push_back(op);
  234. }
  235. }
  236. //网络通信器都有对象
  237. ASSERT_TRUE(vop.size() == 5);
  238. for (auto op : vop)
  239. {
  240. vector<AdapterProxy*> adapters = op->getAdapters();
  241. // LOG_CONSOLE_DEBUG << adapters.size() << endl;
  242. // for(auto a : adapters)
  243. // {
  244. // LOG_CONSOLE_DEBUG << a->trans()->getConnectEndpoint().toString() << endl;
  245. // }
  246. ASSERT_TRUE(adapters.size() == 2);
  247. ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 9990);
  248. ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 9991);
  249. }
  250. stopServer(rpc1Server);
  251. //更新主控ip list
  252. CDbHandle::cleanEndPoint();
  253. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  254. RpcServer rpc3Server;
  255. startServer(rpc3Server, RPC3_CONFIG());
  256. //等待主控更新时间
  257. wait(6000);
  258. //调用两次, 这样两个公有网路通信器都会更新ip list
  259. HELLO_CALL
  260. HELLO_CALL
  261. stopServer(rpc2Server);
  262. wait(1000);
  263. for (auto op : vop)
  264. {
  265. vector<AdapterProxy*> adapters = op->getAdapters();
  266. for(auto adapter : adapters)
  267. {
  268. // LOG_CONSOLE_DEBUG << "isSched:"<< op->getCommunicatorEpoll()->isSchedCommunicatorEpoll()
  269. // << ", netThreadSeq:" << op->getCommunicatorEpoll()->getCommunicatorNetThreadSeq()
  270. // << ", " << adapter->trans()->getConnectEndpoint().toString()
  271. // << ", isActiveInReg:" << adapter->isActiveInReg() << endl;
  272. if(adapter->trans()->getConnectEndpoint().getPort() == 9992)
  273. {
  274. ASSERT_TRUE(adapter->isActiveInReg());
  275. }
  276. else
  277. {
  278. ASSERT_FALSE(adapter->isActiveInReg());
  279. }
  280. }
  281. }
  282. //第三个服务也停掉
  283. stopServer(rpc3Server);
  284. cor1.detach();
  285. cor2.detach();
  286. cor3.detach();
  287. STOP_FRAMEWORK_SERVER
  288. }
  289. TEST_F(HelloTest, registryHttpRpcCheckUpdateList)
  290. {
  291. shared_ptr<Communicator> comm = getCommunicator();
  292. comm->setProperty("refresh-endpoint-interval", "5000");
  293. START_FRAMEWORK_HTTP_SERVER_1_2
  294. //发起网路调用
  295. HTTP_CALL
  296. //构建多个线程, 都完成hello的调用
  297. std::thread cor1([&]()
  298. {
  299. //注意http call里面会wait一下在调用, 等待ip list更新
  300. HTTP_CREATE_COR
  301. });
  302. TC_Common::msleep(200);
  303. vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
  304. // for_each(v.begin(), v.end(), [](const shared_ptr<CommunicatorEpoll> &c){
  305. // LOG_CONSOLE_DEBUG << c->getCommunicatorNetThreadSeq() << ", " << c->isSchedCommunicatorEpoll() << endl;
  306. // });
  307. //3个网络通信器(2个公有, 1个私有)
  308. ASSERT_TRUE(v.size() == 3);
  309. ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
  310. ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
  311. ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
  312. ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
  313. vector<ObjectProxy*> vop = prx->getObjectProxys();
  314. //并行连接数*网络通信器个数
  315. ASSERT_TRUE(vop.size() == 5*v.size());
  316. for (auto op : vop)
  317. {
  318. vector<AdapterProxy*> adapters = op->getAdapters();
  319. // for_each(adapters.begin(), adapters.end(), [](AdapterProxy*ap){
  320. // LOG_CONSOLE_DEBUG << ap->trans()->getConnectEndpoint().toString() << endl;
  321. // });
  322. // LOG_CONSOLE_DEBUG << op << ", -------------------------------------------------" << endl;
  323. ASSERT_TRUE(adapters.size() == 2);
  324. ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 8180);
  325. ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 8181);
  326. }
  327. stopServer(rpc1Server);
  328. //更新主控ip list
  329. CDbHandle::cleanEndPoint();
  330. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8182, 1);
  331. RpcServer rpc3Server;
  332. startServer(rpc3Server, RPC3_CONFIG());
  333. //等待主控更新时间
  334. wait(6000);
  335. //调用, 触发ip list更新
  336. HTTP_CALL
  337. stopServer(rpc2Server);
  338. wait(100);
  339. stopServer(rpc3Server);
  340. for (auto op : vop)
  341. {
  342. vector<AdapterProxy*> adapters = op->getAdapters();
  343. for(auto adapter : adapters)
  344. {
  345. // LOG_CONSOLE_DEBUG << "isSched:"<< op->getCommunicatorEpoll()->isSchedCommunicatorEpoll()
  346. // << ", netThreadSeq:" << op->getCommunicatorEpoll()->getCommunicatorNetThreadSeq()
  347. // << ", " << adapter->trans()->getConnectEndpoint().toString()
  348. // << ", isActiveInReg:" << adapter->isActiveInReg() << endl;
  349. if(adapter->trans()->getConnectEndpoint().getPort() == 8182)
  350. {
  351. ASSERT_TRUE(adapter->isActiveInReg());
  352. }
  353. else
  354. {
  355. ASSERT_FALSE(adapter->isActiveInReg());
  356. }
  357. }
  358. }
  359. cor1.detach();
  360. STOP_FRAMEWORK_SERVER
  361. }
  362. TEST_F(HelloTest, registryRpcHashTagInvoke)
  363. {
  364. START_FRAMEWORK_SERVER_1_2
  365. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9989, 1);
  366. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  367. shared_ptr<Communicator> c = getCommunicator();
  368. string obj = "TestApp.RpcServer.HelloObj#9999";
  369. HelloPrx prx = c->stringToProxy<HelloPrx>(obj);
  370. string out;
  371. int co = prx->testHello(0, _buffer, out);
  372. ASSERT_TRUE(co == 0);
  373. stopServer(rpc1Server);
  374. stopServer(rpc2Server);
  375. STOP_FRAMEWORK_SERVER;
  376. }
  377. TEST_F(HelloTest, registryRpcMultiHashTagInvoke)
  378. {
  379. START_FRAMEWORK_SERVER_1_2
  380. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9989, 1);
  381. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  382. shared_ptr<Communicator> c = getCommunicator();
  383. string obj = "TestApp.RpcServer.HelloObj";
  384. HelloPrx prx = c->stringToProxy<HelloPrx>(obj);
  385. string out;
  386. int co = prx->testHello(0, _buffer, out);
  387. ASSERT_TRUE(co == 0);
  388. stopServer(rpc1Server);
  389. stopServer(rpc2Server);
  390. STOP_FRAMEWORK_SERVER;
  391. }