test_registry.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. 
  2. #include "hello_test.h"
  3. #include "servant/CommunicatorEpoll.h"
  4. #include "servant/ObjectProxy.h"
  5. #include "server/framework/DbHandle.h"
  6. #include "server/FrameworkServer.h"
  7. #include "QueryF.h"
  8. TEST_F(HelloTest, registryQuery)
  9. {
  10. shared_ptr<Communicator> c = getCommunicator();
  11. TC_Config conf = CLIENT_CONFIG();
  12. c->setProperty(conf);
  13. TC_Config fconf = FRAMEWORK_CONFIG();
  14. CDbHandle::cleanEndPoint();
  15. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1);
  16. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1);
  17. FrameworkServer fs;
  18. startServer(fs, FRAMEWORK_CONFIG());
  19. string obj = getObj(fconf, "RegistryAdapter");
  20. QueryFPrx qPrx = c->stringToProxy<QueryFPrx>(obj);
  21. {
  22. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  23. ASSERT_TRUE(endpoints.size() == 2);
  24. ASSERT_TRUE(endpoints[0].port == 9990);
  25. ASSERT_TRUE(endpoints[1].port == 9991);
  26. }
  27. {
  28. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  29. TC_Common::sleep(6);
  30. qPrx->getEndpoint();
  31. TC_Common::msleep(100);
  32. {
  33. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  34. ASSERT_TRUE(endpoints.size() == 3);
  35. ASSERT_TRUE(endpoints[0].port == 9990);
  36. ASSERT_TRUE(endpoints[1].port == 9991);
  37. ASSERT_TRUE(endpoints[2].port == 9992);
  38. }
  39. }
  40. {
  41. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9993, 1);
  42. TC_Common::sleep(6);
  43. qPrx->getEndpoint();
  44. TC_Common::msleep(100);
  45. {
  46. std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
  47. ASSERT_TRUE(endpoints.size() == 4);
  48. ASSERT_TRUE(endpoints[0].port == 9990);
  49. ASSERT_TRUE(endpoints[1].port == 9991);
  50. ASSERT_TRUE(endpoints[2].port == 9992);
  51. ASSERT_TRUE(endpoints[3].port == 9993);
  52. }
  53. }
  54. stopServer(fs);
  55. }
  56. #define START_FRAMEWORK_SERVER_1_2 \
  57. CDbHandle::cleanEndPoint(); \
  58. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1); \
  59. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1); \
  60. FrameworkServer fs; \
  61. startServer(fs, FRAMEWORK_CONFIG()); \
  62. RpcServer rpc1Server; \
  63. startServer(rpc1Server, RPC1_CONFIG()); \
  64. RpcServer rpc2Server; \
  65. startServer(rpc2Server, RPC2_CONFIG());
  66. #define START_FRAMEWORK_HTTP_SERVER_1_2 \
  67. CDbHandle::cleanEndPoint(); \
  68. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8180, 1); \
  69. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8181, 1); \
  70. FrameworkServer fs; \
  71. startServer(fs, FRAMEWORK_CONFIG()); \
  72. RpcServer rpc1Server; \
  73. startServer(rpc1Server, RPC1_CONFIG()); \
  74. RpcServer rpc2Server; \
  75. startServer(rpc2Server, RPC2_CONFIG());
  76. #define STOP_FRAMEWORK_SERVER stopServer(fs);
  77. #define HELLO_CALL {\
  78. HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
  79. int ret; \
  80. string out; \
  81. ret = qPrx->testHello(0, _buffer, out); \
  82. ASSERT_TRUE(ret == 0); \
  83. ASSERT_TRUE(out == _buffer); }
  84. #define CHECK_REGISTRY_UPDATE { \
  85. LOG_CONSOLE_DEBUG << endl; \
  86. stopServer(rpc1Server); \
  87. LOG_CONSOLE_DEBUG << endl; \
  88. CDbHandle::cleanEndPoint(); \
  89. CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1); \
  90. RpcServer rpc3Server; \
  91. startServer(rpc3Server, RPC3_CONFIG()); \
  92. LOG_CONSOLE_DEBUG << endl; \
  93. wait(6000); \
  94. LOG_CONSOLE_DEBUG << endl; \
  95. HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
  96. string out = ""; \
  97. LOG_CONSOLE_DEBUG << endl; \
  98. int ret = qPrx->testHello(0, _buffer, out); \
  99. LOG_CONSOLE_DEBUG << endl; \
  100. ASSERT_TRUE(ret == 0); \
  101. ASSERT_TRUE(out == _buffer); \
  102. stopServer(rpc2Server); \
  103. LOG_CONSOLE_DEBUG << endl; \
  104. wait(100); \
  105. LOG_CONSOLE_DEBUG << endl; \
  106. out = ""; \
  107. ret = qPrx->testHello(0, _buffer, out); \
  108. LOG_CONSOLE_DEBUG << endl; \
  109. ASSERT_TRUE(ret == 0); \
  110. ASSERT_TRUE(out == _buffer); \
  111. stopServer(rpc3Server); }
  112. #define CREATE_COR \
  113. { \
  114. auto scheduler = TC_CoroutineScheduler::create(); \
  115. ServantProxyThreadData::getData()->_sched = scheduler; \
  116. scheduler->go([&]() \
  117. { \
  118. { HELLO_CALL; wait(6500); HELLO_CALL } \
  119. }); \
  120. scheduler->run(); \
  121. }
  122. #define HTTP_CALL {\
  123. ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
  124. prx->tars_set_protocol(ServantProxy::PROTOCOL_HTTP1, 5); \
  125. string buff = _buffer + "-" + TC_Common::tostr(TC_Thread::CURRENT_THREADID()); \
  126. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>(); \
  127. req->setPostRequest("http://tars.com/hello", buff, true); \
  128. req->setHeader("Connection", "close"); \
  129. shared_ptr<TC_HttpResponse> rsp; \
  130. int count = 10; \
  131. while(count-- > 0) { prx->http_call("hello", req, rsp); } }
  132. #define HTTP_CREATE_COR \
  133. { \
  134. auto scheduler = TC_CoroutineScheduler::create(); \
  135. ServantProxyThreadData::getData()->_sched = scheduler; \
  136. scheduler->go([&]() \
  137. { \
  138. { HTTP_CALL; wait(6200); HTTP_CALL } \
  139. }); \
  140. scheduler->run(); \
  141. }
  142. TEST_F(HelloTest, registryRpc)
  143. {
  144. // shared_ptr<Communicator> c = getCommunicator();
  145. START_FRAMEWORK_SERVER_1_2
  146. shared_ptr<Communicator> c = getCommunicator();
  147. rpcFromRegistry(c.get());
  148. LOG_CONSOLE_DEBUG << endl;
  149. funcInCoroutine([&](){
  150. rpcFromRegistry(c.get());
  151. });
  152. funcInCoroutine([&](){
  153. shared_ptr<Communicator> comm = getCommunicator();
  154. rpcFromRegistry(comm.get());
  155. });
  156. LOG_CONSOLE_DEBUG << endl;
  157. stopServer(rpc1Server);
  158. stopServer(rpc2Server);
  159. stopServer(fs);
  160. }
  161. //一致hash测试
  162. TEST_F(HelloTest, registryRpcConHashInvoke)
  163. {
  164. START_FRAMEWORK_SERVER_1_2
  165. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9989, 1);
  166. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  167. shared_ptr<Communicator> c = getCommunicator();
  168. rpcConHashFromRegistry(c.get());
  169. stopServer(rpc1Server);
  170. stopServer(rpc2Server);
  171. STOP_FRAMEWORK_SERVER;
  172. }
  173. TEST_F(HelloTest, registryRpcUpdateList)
  174. {
  175. shared_ptr<Communicator> comm = getCommunicator();
  176. comm->setProperty("refresh-endpoint-interval", "5000");
  177. START_FRAMEWORK_SERVER_1_2
  178. HELLO_CALL
  179. CHECK_REGISTRY_UPDATE
  180. STOP_FRAMEWORK_SERVER
  181. }
  182. TEST_F(HelloTest, registryRpcUpdateListInCoroutine1)
  183. {
  184. shared_ptr<Communicator> comm = getCommunicator();
  185. comm->setProperty("refresh-endpoint-interval", "5000");
  186. START_FRAMEWORK_SERVER_1_2
  187. funcInCoroutine([&]()
  188. {
  189. HELLO_CALL
  190. CHECK_REGISTRY_UPDATE
  191. }
  192. );
  193. STOP_FRAMEWORK_SERVER
  194. }
  195. TEST_F(HelloTest, registryRpcUpdateListInCoroutine2)
  196. {
  197. shared_ptr<Communicator> comm = getCommunicator();
  198. comm->setProperty("refresh-endpoint-interval", "5000");
  199. START_FRAMEWORK_SERVER_1_2
  200. HELLO_CALL
  201. funcInCoroutine([&]()
  202. {
  203. CHECK_REGISTRY_UPDATE
  204. });
  205. STOP_FRAMEWORK_SERVER
  206. }
  207. //
  208. //TEST_F(HelloTest, registryRpcCheckUpdateList)
  209. //{
  210. // shared_ptr<Communicator> comm = getCommunicator();
  211. // comm->setProperty("refresh-endpoint-interval", "5000");
  212. //
  213. // START_FRAMEWORK_SERVER_1_2
  214. //
  215. // //发起网路调用
  216. // HELLO_CALL
  217. //
  218. // HELLO_CALL
  219. //
  220. // //构建多个线程, 都完成hello的调用
  221. // std::thread cor1([&]()
  222. // {
  223. // CREATE_COR
  224. // });
  225. //
  226. // std::thread cor2([&]()
  227. // {
  228. // CREATE_COR
  229. // });
  230. //
  231. // std::thread cor3([&]()
  232. // {
  233. // CREATE_COR
  234. // });
  235. //
  236. // TC_Common::msleep(200);
  237. //
  238. // vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
  239. //
  240. // //5个网络通信器(2个公有, 3个私有)
  241. // ASSERT_TRUE(v.size() == 5);
  242. // ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
  243. // ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
  244. // ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
  245. // ASSERT_TRUE(v[3]->isSchedCommunicatorEpoll());
  246. // ASSERT_TRUE(v[4]->isSchedCommunicatorEpoll());
  247. //
  248. // vector<ObjectProxy*> vop;
  249. //
  250. // for (auto ce : v)
  251. // {
  252. // ObjectProxy* op = ce->hasObjectProxy("TestApp.RpcServer.HelloObj");
  253. //
  254. // if (op)
  255. // {
  256. // vop.push_back(op);
  257. // }
  258. // }
  259. //
  260. // //网络通信器都有对象
  261. // ASSERT_TRUE(vop.size() == 5);
  262. //
  263. // for (auto op : vop)
  264. // {
  265. // vector<AdapterProxy*> adapters = op->getAdapters();
  266. //
  267. // ASSERT_TRUE(adapters.size() == 2);
  268. // ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 9990);
  269. // ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 9991);
  270. // }
  271. //
  272. // stopServer(rpc1Server);
  273. //
  274. // //更新主控ip list
  275. // CDbHandle::cleanEndPoint();
  276. // CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  277. //
  278. // RpcServer rpc3Server;
  279. // startServer(rpc3Server, RPC3_CONFIG());
  280. // //等待主控更新时间
  281. // wait(6000);
  282. //
  283. // //调用两次, 这样两个公有网路通信器都会更新ip list
  284. // HELLO_CALL
  285. //
  286. // HELLO_CALL
  287. //
  288. // stopServer(rpc2Server);
  289. //
  290. // wait(1000);
  291. //
  292. // for (auto op : vop)
  293. // {
  294. // vector<AdapterProxy*> adapters = op->getAdapters();
  295. //
  296. // for(auto adapter : adapters)
  297. // {
  298. // if(adapter->trans()->getConnectEndpoint().getPort() == 9992)
  299. // {
  300. // ASSERT_TRUE(adapter->isActiveInReg());
  301. // }
  302. // else
  303. // {
  304. // ASSERT_FALSE(adapter->isActiveInReg());
  305. // }
  306. // }
  307. // }
  308. //
  309. // //第三个服务也停掉
  310. // stopServer(rpc3Server);
  311. // cor1.detach();
  312. // cor2.detach();
  313. // cor3.detach();
  314. // STOP_FRAMEWORK_SERVER
  315. //}
  316. //
  317. //
  318. //TEST_F(HelloTest, registryHttpRpcCheckUpdateList)
  319. //{
  320. // shared_ptr<Communicator> comm = getCommunicator();
  321. // comm->setProperty("refresh-endpoint-interval", "5000");
  322. //
  323. // START_FRAMEWORK_HTTP_SERVER_1_2
  324. //
  325. // //发起网路调用
  326. // HTTP_CALL
  327. //
  328. // //构建多个线程, 都完成hello的调用
  329. // std::thread cor1([&]()
  330. // {
  331. // //注意http call里面会wait一下在调用, 等待ip list更新
  332. // HTTP_CREATE_COR
  333. // });
  334. //
  335. // TC_Common::msleep(200);
  336. //
  337. // vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
  338. //
  339. // //3个网络通信器(2个公有, 1个私有)
  340. // ASSERT_TRUE(v.size() == 3);
  341. // ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
  342. // ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
  343. // ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
  344. //
  345. // ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
  346. //
  347. // vector<ObjectProxy*> vop = prx->getObjectProxys();
  348. // //并行连接数*网络通信器个数
  349. // ASSERT_TRUE(vop.size() == 5*v.size());
  350. //
  351. // for (auto op : vop)
  352. // {
  353. // vector<AdapterProxy*> adapters = op->getAdapters();
  354. //
  355. // ASSERT_TRUE(adapters.size() == 2);
  356. // ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 8180);
  357. // ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 8181);
  358. // }
  359. //
  360. // stopServer(rpc1Server);
  361. //
  362. // //更新主控ip list
  363. // CDbHandle::cleanEndPoint();
  364. // CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8182, 1);
  365. //
  366. // RpcServer rpc3Server;
  367. // startServer(rpc3Server, RPC3_CONFIG());
  368. // //等待主控更新时间
  369. // wait(6000);
  370. // //调用, 触发ip list更新
  371. // HTTP_CALL
  372. // stopServer(rpc2Server);
  373. // wait(100);
  374. // stopServer(rpc3Server);
  375. //
  376. // for (auto op : vop)
  377. // {
  378. // vector<AdapterProxy*> adapters = op->getAdapters();
  379. //
  380. // for(auto adapter : adapters)
  381. // {
  382. // if(adapter->trans()->getConnectEndpoint().getPort() == 8182)
  383. // {
  384. // ASSERT_TRUE(adapter->isActiveInReg());
  385. // }
  386. // else
  387. // {
  388. // ASSERT_FALSE(adapter->isActiveInReg());
  389. // }
  390. // }
  391. // }
  392. //
  393. // cor1.detach();
  394. // STOP_FRAMEWORK_SERVER
  395. //}
  396. TEST_F(HelloTest, registryRpcHashTagInvoke)
  397. {
  398. START_FRAMEWORK_SERVER_1_2
  399. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9989, 1);
  400. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  401. shared_ptr<Communicator> c = getCommunicator();
  402. string obj = "TestApp.RpcServer.HelloObj#9999";
  403. HelloPrx prx = c->stringToProxy<HelloPrx>(obj);
  404. string out;
  405. int co = prx->testHello(0, _buffer, out);
  406. ASSERT_TRUE(co == 0);
  407. stopServer(rpc1Server);
  408. stopServer(rpc2Server);
  409. STOP_FRAMEWORK_SERVER;
  410. }
  411. TEST_F(HelloTest, registryRpcMultiHashTagInvoke)
  412. {
  413. START_FRAMEWORK_SERVER_1_2
  414. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9989, 1);
  415. CDbHandle::addInactiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
  416. shared_ptr<Communicator> c = getCommunicator();
  417. string obj = "TestApp.RpcServer.HelloObj";
  418. HelloPrx prx = c->stringToProxy<HelloPrx>(obj);
  419. string out;
  420. int co = prx->testHello(0, _buffer, out);
  421. ASSERT_TRUE(co == 0);
  422. stopServer(rpc1Server);
  423. stopServer(rpc2Server);
  424. STOP_FRAMEWORK_SERVER;
  425. }
  426. TEST_F(HelloTest, registryPush)
  427. {
  428. string obj = "TestApp.RpcServer.HelloObj";
  429. START_FRAMEWORK_SERVER_1_2
  430. shared_ptr<Communicator> c = getCommunicator();
  431. HelloPrx prx = c->stringToProxy<HelloPrx>(obj);
  432. vector<EndpointInfo> activeEndPoint;
  433. vector<EndpointInfo> inactiveEndPoint;
  434. int co;
  435. int i = 20;
  436. while(--i)
  437. {
  438. co = prx->testPort();
  439. ASSERT_TRUE(co == 9990 || co == 9991);
  440. }
  441. CDbHandle::cleanEndPoint();
  442. CDbHandle::addActiveEndPoint(obj, 9990, 1);
  443. CDbHandle::addInactiveEndPoint(obj, 9991, 1);
  444. CDbHandle::push();
  445. TC_Common::sleep(1);
  446. co = prx->testPort();
  447. prx->tars_endpoints(activeEndPoint, inactiveEndPoint);
  448. for(auto e : activeEndPoint)
  449. {
  450. LOG_CONSOLE_DEBUG << e.getEndpoint().toString() << endl;
  451. }
  452. TC_Common::sleep(1);
  453. i = 10;
  454. while(--i>0)
  455. {
  456. co = prx->testPort();
  457. LOG_CONSOLE_DEBUG << co << endl;
  458. ASSERT_TRUE(co == 9990);
  459. }
  460. stopServer(rpc1Server);
  461. stopServer(rpc2Server);
  462. STOP_FRAMEWORK_SERVER;
  463. }