123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
-
- #include "hello_test.h"
- #include "servant/CommunicatorEpoll.h"
- #include "servant/ObjectProxy.h"
- #include "server/framework/DbHandle.h"
- #include "server/FrameworkServer.h"
- #include "QueryF.h"
- TEST_F(HelloTest, registryQuery)
- {
- shared_ptr<Communicator> c = getCommunicator();
- TC_Config conf = CLIENT_CONFIG();
- // conf.parseString(CLIENT_CONFIG);
- c->setProperty(conf);
- TC_Config fconf = FRAMEWORK_CONFIG();
- CDbHandle::cleanEndPoint();
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1);
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1);
- FrameworkServer fs;
- startServer(fs, FRAMEWORK_CONFIG());
- string obj = getObj(fconf, "RegistryAdapter");
- QueryFPrx qPrx = c->stringToProxy<QueryFPrx>(obj);
- {
- std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
- ASSERT_TRUE(endpoints.size() == 2);
- ASSERT_TRUE(endpoints[0].port == 9990);
- ASSERT_TRUE(endpoints[1].port == 9991);
- }
- {
- LOG_CONSOLE_DEBUG << "add TestApp.RpcServer.HelloObj 9992" << endl;
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
- TC_Common::sleep(6);
- qPrx->getEndpoint();
- TC_Common::msleep(100);
- {
- std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
- ASSERT_TRUE(endpoints.size() == 3);
- ASSERT_TRUE(endpoints[0].port == 9990);
- ASSERT_TRUE(endpoints[1].port == 9991);
- ASSERT_TRUE(endpoints[2].port == 9992);
- }
- }
- {
- LOG_CONSOLE_DEBUG << "add TestApp.RpcServer.HelloObj 9993" << endl;
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9993, 1);
- TC_Common::sleep(6);
- qPrx->getEndpoint();
- TC_Common::msleep(100);
- {
- std::vector<EndpointF> endpoints = qPrx->findObjectById("TestApp.RpcServer.HelloObj");
- ASSERT_TRUE(endpoints.size() == 4);
- ASSERT_TRUE(endpoints[0].port == 9990);
- ASSERT_TRUE(endpoints[1].port == 9991);
- ASSERT_TRUE(endpoints[2].port == 9992);
- ASSERT_TRUE(endpoints[3].port == 9993);
- }
- }
- stopServer(fs);
- }
- #define START_FRAMEWORK_SERVER_1_2 \
- CDbHandle::cleanEndPoint(); \
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9990, 1); \
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9991, 1); \
- FrameworkServer fs; \
- startServer(fs, FRAMEWORK_CONFIG()); \
- RpcServer rpc1Server; \
- startServer(rpc1Server, RPC1_CONFIG()); \
- RpcServer rpc2Server; \
- startServer(rpc2Server, RPC2_CONFIG());
- #define START_FRAMEWORK_HTTP_SERVER_1_2 \
- CDbHandle::cleanEndPoint(); \
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8180, 1); \
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8181, 1); \
- FrameworkServer fs; \
- startServer(fs, FRAMEWORK_CONFIG()); \
- RpcServer rpc1Server; \
- startServer(rpc1Server, RPC1_CONFIG()); \
- RpcServer rpc2Server; \
- startServer(rpc2Server, RPC2_CONFIG());
- #define STOP_FRAMEWORK_SERVER stopServer(fs);
- #define HELLO_CALL {\
- HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
- int ret; \
- string out; \
- ret = qPrx->testHello(0, _buffer, out); \
- ASSERT_TRUE(ret == 0); \
- ASSERT_TRUE(out == _buffer); }
- #define CHECK_REGISTRY_UPDATE {\
- stopServer(rpc1Server); \
- CDbHandle::cleanEndPoint(); \
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1); \
- RpcServer rpc3Server; \
- startServer(rpc3Server, RPC3_CONFIG()); \
- wait(6000); \
- HelloPrx qPrx = comm->stringToProxy<HelloPrx>("TestApp.RpcServer.HelloObj"); \
- string out = ""; \
- int ret = qPrx->testHello(0, _buffer, out); \
- ASSERT_TRUE(ret == 0); \
- ASSERT_TRUE(out == _buffer); \
- stopServer(rpc2Server); \
- wait(100); \
- out = ""; \
- ret = qPrx->testHello(0, _buffer, out); \
- ASSERT_TRUE(ret == 0); \
- ASSERT_TRUE(out == _buffer); \
- stopServer(rpc3Server); }
- #define CREATE_COR \
- { \
- auto scheduler = TC_CoroutineScheduler::create(); \
- ServantProxyThreadData::getData()->_sched = scheduler; \
- scheduler->go([&]() \
- { \
- { HELLO_CALL; wait(6500); HELLO_CALL } \
- }); \
- scheduler->run(); \
- }
- #define HTTP_CALL {\
- ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
- prx->tars_set_protocol(ServantProxy::PROTOCOL_HTTP1, 5); \
- string buff = _buffer + "-" + TC_Common::tostr(TC_Thread::CURRENT_THREADID()); \
- shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>(); \
- req->setPostRequest("http://tars.com/hello", buff, true); \
- req->setHeader("Connection", "close"); \
- shared_ptr<TC_HttpResponse> rsp; \
- int count = 10; \
- while(count-- > 0) { prx->http_call("hello", req, rsp); } }
- #define HTTP_CREATE_COR \
- { \
- auto scheduler = TC_CoroutineScheduler::create(); \
- ServantProxyThreadData::getData()->_sched = scheduler; \
- scheduler->go([&]() \
- { \
- { HTTP_CALL; wait(6200); HTTP_CALL } \
- }); \
- scheduler->run(); \
- }
- TEST_F(HelloTest, registryRpc)
- {
- // shared_ptr<Communicator> c = getCommunicator();
- START_FRAMEWORK_SERVER_1_2
- shared_ptr<Communicator> c = getCommunicator();
- rpcFromRegistry(c.get());
- LOG_CONSOLE_DEBUG << endl;
- funcInCoroutine([&](){
- rpcFromRegistry(c.get());
- });
- funcInCoroutine([&](){
- shared_ptr<Communicator> comm = getCommunicator();
- rpcFromRegistry(comm.get());
- });
- LOG_CONSOLE_DEBUG << endl;
- stopServer(rpc1Server);
- stopServer(rpc2Server);
- stopServer(fs);
- }
- TEST_F(HelloTest, registryRpcUpdateList)
- {
- shared_ptr<Communicator> comm = getCommunicator();
- comm->setProperty("refresh-endpoint-interval", "5000");
- START_FRAMEWORK_SERVER_1_2
- HELLO_CALL
- CHECK_REGISTRY_UPDATE
- STOP_FRAMEWORK_SERVER
- }
- TEST_F(HelloTest, registryRpcUpdateListInCoroutine1)
- {
- shared_ptr<Communicator> comm = getCommunicator();
- comm->setProperty("refresh-endpoint-interval", "5000");
- START_FRAMEWORK_SERVER_1_2
- funcInCoroutine([&]()
- {
- HELLO_CALL
- CHECK_REGISTRY_UPDATE
- });
- STOP_FRAMEWORK_SERVER
- }
- TEST_F(HelloTest, registryRpcUpdateListInCoroutine2)
- {
- shared_ptr<Communicator> comm = getCommunicator();
- comm->setProperty("refresh-endpoint-interval", "5000");
- START_FRAMEWORK_SERVER_1_2
- HELLO_CALL
- funcInCoroutine([&]()
- {
- CHECK_REGISTRY_UPDATE
- });
- STOP_FRAMEWORK_SERVER
- }
- TEST_F(HelloTest, registryRpcCheckUpdateList)
- {
- shared_ptr<Communicator> comm = getCommunicator();
- comm->setProperty("refresh-endpoint-interval", "5000");
- START_FRAMEWORK_SERVER_1_2
- //发起网路调用
- HELLO_CALL
- HELLO_CALL
- //构建多个线程, 都完成hello的调用
- std::thread cor1([&]()
- {
- CREATE_COR
- });
- std::thread cor2([&]()
- {
- CREATE_COR
- });
- std::thread cor3([&]()
- {
- CREATE_COR
- });
- TC_Common::msleep(200);
- vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
- //5个网络通信器(2个公有, 3个私有)
- ASSERT_TRUE(v.size() == 5);
- ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
- ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
- ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
- ASSERT_TRUE(v[3]->isSchedCommunicatorEpoll());
- ASSERT_TRUE(v[4]->isSchedCommunicatorEpoll());
- vector<ObjectProxy*> vop;
- for (auto ce : v)
- {
- ObjectProxy* op = ce->hasObjectProxy("TestApp.RpcServer.HelloObj");
- if (op)
- {
- vop.push_back(op);
- }
- }
- //网络通信器都有对象
- ASSERT_TRUE(vop.size() == 5);
- for (auto op : vop)
- {
- vector<AdapterProxy*> adapters = op->getAdapters();
- // LOG_CONSOLE_DEBUG << adapters.size() << endl;
- // for(auto a : adapters)
- // {
- // LOG_CONSOLE_DEBUG << a->trans()->getConnectEndpoint().toString() << endl;
- // }
- ASSERT_TRUE(adapters.size() == 2);
- ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 9990);
- ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 9991);
- }
- stopServer(rpc1Server);
- //更新主控ip list
- CDbHandle::cleanEndPoint();
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HelloObj", 9992, 1);
- RpcServer rpc3Server;
- startServer(rpc3Server, RPC3_CONFIG());
- //等待主控更新时间
- wait(6000);
- //调用两次, 这样两个公有网路通信器都会更新ip list
- HELLO_CALL
- HELLO_CALL
- stopServer(rpc2Server);
- wait(1000);
- for (auto op : vop)
- {
- vector<AdapterProxy*> adapters = op->getAdapters();
- for(auto adapter : adapters)
- {
- // LOG_CONSOLE_DEBUG << "isSched:"<< op->getCommunicatorEpoll()->isSchedCommunicatorEpoll()
- // << ", netThreadSeq:" << op->getCommunicatorEpoll()->getCommunicatorNetThreadSeq()
- // << ", " << adapter->trans()->getConnectEndpoint().toString()
- // << ", isActiveInReg:" << adapter->isActiveInReg() << endl;
- if(adapter->trans()->getConnectEndpoint().getPort() == 9992)
- {
- ASSERT_TRUE(adapter->isActiveInReg());
- }
- else
- {
- ASSERT_FALSE(adapter->isActiveInReg());
- }
- }
- }
- //第三个服务也停掉
- stopServer(rpc3Server);
- cor1.detach();
- cor2.detach();
- cor3.detach();
- STOP_FRAMEWORK_SERVER
- }
- TEST_F(HelloTest, registryHttpRpcCheckUpdateList)
- {
- shared_ptr<Communicator> comm = getCommunicator();
- comm->setProperty("refresh-endpoint-interval", "5000");
- START_FRAMEWORK_HTTP_SERVER_1_2
- //发起网路调用
- HTTP_CALL
- //构建多个线程, 都完成hello的调用
- std::thread cor1([&]()
- {
- //注意http call里面会wait一下在调用, 等待ip list更新
- HTTP_CREATE_COR
- });
- TC_Common::msleep(200);
- vector<shared_ptr<CommunicatorEpoll>> v = comm->getAllCommunicatorEpoll();
- // for_each(v.begin(), v.end(), [](const shared_ptr<CommunicatorEpoll> &c){
- // LOG_CONSOLE_DEBUG << c->getCommunicatorNetThreadSeq() << ", " << c->isSchedCommunicatorEpoll() << endl;
- // });
- //3个网络通信器(2个公有, 1个私有)
- ASSERT_TRUE(v.size() == 3);
- ASSERT_FALSE(v[0]->isSchedCommunicatorEpoll());
- ASSERT_FALSE(v[1]->isSchedCommunicatorEpoll());
- ASSERT_TRUE(v[2]->isSchedCommunicatorEpoll());
- ServantPrx prx = comm->stringToProxy<ServantPrx>("TestApp.RpcServer.HttpObj"); \
- vector<ObjectProxy*> vop = prx->getObjectProxys();
- //并行连接数*网络通信器个数
- ASSERT_TRUE(vop.size() == 5*v.size());
- for (auto op : vop)
- {
- vector<AdapterProxy*> adapters = op->getAdapters();
- // for_each(adapters.begin(), adapters.end(), [](AdapterProxy*ap){
- // LOG_CONSOLE_DEBUG << ap->trans()->getConnectEndpoint().toString() << endl;
- // });
- // LOG_CONSOLE_DEBUG << op << ", -------------------------------------------------" << endl;
- ASSERT_TRUE(adapters.size() == 2);
- ASSERT_TRUE(adapters[0]->trans()->getConnectEndpoint().getPort() == 8180);
- ASSERT_TRUE(adapters[1]->trans()->getConnectEndpoint().getPort() == 8181);
- }
- stopServer(rpc1Server);
- //更新主控ip list
- CDbHandle::cleanEndPoint();
- CDbHandle::addActiveEndPoint("TestApp.RpcServer.HttpObj", 8182, 1);
- RpcServer rpc3Server;
- startServer(rpc3Server, RPC3_CONFIG());
- //等待主控更新时间
- wait(6000);
- //调用, 触发ip list更新
- HTTP_CALL
- stopServer(rpc2Server);
- wait(100);
- stopServer(rpc3Server);
- for (auto op : vop)
- {
- vector<AdapterProxy*> adapters = op->getAdapters();
- for(auto adapter : adapters)
- {
- // LOG_CONSOLE_DEBUG << "isSched:"<< op->getCommunicatorEpoll()->isSchedCommunicatorEpoll()
- // << ", netThreadSeq:" << op->getCommunicatorEpoll()->getCommunicatorNetThreadSeq()
- // << ", " << adapter->trans()->getConnectEndpoint().toString()
- // << ", isActiveInReg:" << adapter->isActiveInReg() << endl;
- if(adapter->trans()->getConnectEndpoint().getPort() == 8182)
- {
- ASSERT_TRUE(adapter->isActiveInReg());
- }
- else
- {
- ASSERT_FALSE(adapter->isActiveInReg());
- }
- }
- }
- cor1.detach();
- STOP_FRAMEWORK_SERVER
- }
|