hello_test.h 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. 
  2. #include "server/HelloServer.h"
  3. #include "server/Hello.h"
  4. #include "gtest/gtest.h"
  5. #include "certs.h"
  6. #include <mutex>
  7. #include <iostream>
  8. using namespace tars;
  9. using namespace Test;
  10. class PushCallBack : public ServantProxyCallback
  11. {
  12. public:
  13. int onDispatch(ReqMessagePtr msg)
  14. {
  15. if(msg->request.sFuncName == "printResult")
  16. {
  17. string sRet;
  18. sRet.assign(&(msg->response->sBuffer[0]), msg->response->sBuffer.size());
  19. _onprintresult = true;
  20. // printResult(msg->request.iRequestId, sRet);
  21. return 0;
  22. }
  23. else if(msg->response->iRequestId == 0)
  24. {
  25. string sRet;
  26. sRet.assign(&(msg->response->sBuffer[0]), msg->response->sBuffer.size());
  27. ++_onpushinfo;
  28. // printPushInfo(sRet);
  29. return 0;
  30. }
  31. else
  32. {
  33. LOG_CONSOLE_DEBUG << "no match func!" <<endl;
  34. }
  35. return -3;
  36. }
  37. void onConnect(const TC_Endpoint &ep)
  38. {
  39. _onconnect = true;
  40. // LOG_CONSOLE_DEBUG << "onConnect:" << ep.toString() << ", " << _onconnect << ", " << this << endl;
  41. }
  42. void onClose()
  43. {
  44. _onclose = true;
  45. // LOG_CONSOLE_DEBUG << "onClose:" << _onclose << endl;
  46. }
  47. bool _onclose = false;
  48. bool _onconnect = false;
  49. bool _onprintresult = false;
  50. int _onpushinfo = 0;
  51. };
  52. typedef tars::TC_AutoPtr<PushCallBack> PushCallBackPtr;
  53. struct ClientHelloCallback : public HelloPrxCallback
  54. {
  55. ClientHelloCallback(int64_t t, int i, int c, const string &buff, std::atomic<int> &count) : start(t), cur(i), count(c), buffer(buff), callback_count(count)
  56. {
  57. }
  58. ClientHelloCallback(HelloPrx prx, std::atomic<int> &count) : _prx(prx), callback_count(count)
  59. {
  60. }
  61. //回调函数
  62. virtual void callback_testTrans(int ret, const string &r);
  63. //回调函数
  64. virtual void callback_testHello(int ret, const string &r);
  65. virtual void callback_testSyncTrans(tars::Int32 ret, const std::string& r);
  66. virtual void callback_testHello_exception(tars::Int32 ret)
  67. {
  68. LOG_CONSOLE_DEBUG << "callback exception:" << ret << endl;
  69. exit(-1);
  70. }
  71. virtual void callback_testTrans_exception(tars::Int32 ret)
  72. {
  73. LOG_CONSOLE_DEBUG << "callback exception:" << ret << endl;
  74. exit(-1);
  75. }
  76. virtual void callback_testSyncTrans_exception(tars::Int32 ret)
  77. {
  78. LOG_CONSOLE_DEBUG << "callback exception:" << ret << endl;
  79. exit(-1);
  80. }
  81. virtual void callback_testTimeout(int ret)
  82. {
  83. callback = true;
  84. }
  85. virtual void callback_testTimeout_exception(tars::Int32 ret)
  86. {
  87. callback_exception = true;
  88. }
  89. HelloPrx _prx;
  90. bool callback = false;
  91. bool callback_exception = false;
  92. int64_t start;
  93. int cur;
  94. int count;
  95. string buffer;
  96. std::atomic<int> &callback_count;
  97. };
  98. struct HelloHttpCallback : public HttpCallback
  99. {
  100. HelloHttpCallback(int64_t t, int i, int c, const string &buff, std::atomic<int> &count) : start(t), cur(i), count(c),_buff(buff), callback_count(count)
  101. {
  102. }
  103. virtual int onHttpResponse(const shared_ptr<TC_HttpResponse> &rsp)
  104. {
  105. [&]()
  106. {
  107. ASSERT_TRUE(rsp->getContent() == _buff);
  108. }();
  109. callback_count++;
  110. if(cur == count-1)
  111. {
  112. int64_t cost = TC_Common::now2us() - start;
  113. LOG_CONSOLE_DEBUG << "count:" << count << ", " << cost << " us, avg:" << 1.*cost/count << "us" << endl;
  114. }
  115. return 0;
  116. }
  117. virtual int onHttpResponseException(int expCode)
  118. {
  119. LOG_CONSOLE_DEBUG << "onHttpResponseException expCode:" << expCode << endl;
  120. return 0;
  121. }
  122. int64_t start;
  123. int cur;
  124. int count;
  125. string _buff;
  126. std::atomic<int> &callback_count;
  127. };
  128. class CustomCallBack : public ServantProxyCallback
  129. {
  130. public:
  131. CustomCallBack(std::atomic<int> &count) : callback_count(count) {}
  132. virtual int onDispatch(ReqMessagePtr msg)
  133. {
  134. if(msg->response->iRet != tars::TARSSERVERSUCCESS)
  135. {
  136. cout << "ret error:" << msg->response->iRet << endl;
  137. }
  138. else
  139. {
  140. // cout << "succ:" << string(msg->response->sBuffer.data(), msg->response->sBuffer.size()) << endl;
  141. }
  142. ++callback_count;
  143. return msg->response->iRet;
  144. }
  145. virtual void onClose()
  146. {
  147. }
  148. std::atomic<int> &callback_count;
  149. };
  150. typedef tars::TC_AutoPtr<CustomCallBack> CustomCallBackPtr;
  151. class HelloTest : public testing::Test
  152. {
  153. public:
  154. //添加日志
  155. static void SetUpTestCase()
  156. {
  157. }
  158. static void TearDownTestCase()
  159. {
  160. }
  161. virtual void SetUp()
  162. {
  163. _buffer.assign(100, 'a');
  164. _conf = CONFIG();
  165. TC_File::removeFile("RpcServer.tarsdat", false);
  166. }
  167. virtual void TearDown()
  168. {
  169. LOG_CONSOLE_DEBUG << "fd count:" << getFdCounts() << endl;
  170. }
  171. template<typename T>
  172. void startServer(T &server, TC_Config conf, TC_EpollServer::SERVER_OPEN_COROUTINE openCoroutine = TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
  173. {
  174. conf.set("/tars/application/server<opencoroutine>", TC_Common::tostr(openCoroutine));
  175. ASSERT_TRUE(conf.get("/tars/application/server<opencoroutine>") == TC_Common::tostr(openCoroutine));
  176. server.main(conf.tostr());
  177. server.start();
  178. server.waitForReady();
  179. }
  180. void startServer(HelloServer &server, TC_EpollServer::SERVER_OPEN_COROUTINE openCoroutine);
  181. template<typename T>
  182. void stopServer(T &server)
  183. {
  184. server.terminate();
  185. server.getThreadControl().join();
  186. }
  187. TC_Endpoint getEndpoint(const string &adapter);
  188. string getObj(TC_Config &conf, const string &adapter)
  189. {
  190. return conf.get("/tars/application/server/" + adapter + "<servant>") + "@" + conf.get("/tars/application/server/" + adapter + "<endpoint>");
  191. }
  192. TC_Endpoint getEndpoint(TC_Config &conf, const string &adapter)
  193. {
  194. return TC_Endpoint(conf.get("/tars/application/server/" + adapter + "<endpoint>"));
  195. }
  196. TC_Endpoint getLocalEndpoint(const TC_Config &conf)
  197. {
  198. return TC_Endpoint(conf.get("/tars/application/server<local>"));
  199. }
  200. template<typename T>
  201. T getObj(Communicator *comm, const string &adapter)
  202. {
  203. comm->setProperty("sendqueuelimit", "1000000");
  204. // comm->setProperty("asyncqueuecap", "1000000");
  205. string obj = getObj(_conf, adapter);
  206. T prx = comm->stringToProxy<T>(obj);
  207. prx->tars_timeout(60000);
  208. prx->tars_async_timeout(60000);
  209. return prx;
  210. }
  211. int getFdCounts()
  212. {
  213. //linux下才生效, 其他平台都是返回的0
  214. vector<string> fds;
  215. TC_File::listDirectory("/proc/" + TC_Common::tostr(getpid()) + "/fd", fds, false);
  216. return fds.size();
  217. }
  218. int getSocketFd(int iSocketType = SOCK_DGRAM, int iDomain = AF_INET)
  219. {
  220. TC_Socket s1;
  221. s1.createSocket(iSocketType, AF_INET);
  222. s1.setOwner(false);
  223. return s1.getfd();
  224. }
  225. void waitForFinish(atomic<int> &callback_count, int count);
  226. void wait(int millseconds);
  227. void syncCustom(Communicator *comm, const string &adapter = "CustomAdapter");
  228. void asyncCustom(Communicator *comm, const string &adapter = "CustomAdapter");
  229. void testPush(Communicator *comm, const string &adapter = "PushAdapter");
  230. void testReconnect(Communicator *comm, const string &adapter = "PushAdapter");
  231. void checkWup();
  232. void checkSyncHttp(Communicator *comm, const string &adapter = "HttpAdapter", bool close = false);
  233. void checkASyncHttp(Communicator *comm, const string &adapter = "HelloAdapter", bool close = false);
  234. void checkSync(Communicator *comm, const string &adapter = "HelloAdapter");
  235. void checkSyncOnce(HelloPrx prx);
  236. void checkASyncOnce(HelloPrx prx);//Communicator *comm, const string &adapter = "HelloAdapter");
  237. void checkASync(Communicator *comm, const string &adapter = "HelloAdapter");
  238. void checkTransSyncASync(Communicator *comm);
  239. void checkTransASyncASync(Communicator *comm);
  240. void checkTransSyncSync(Communicator *comm);
  241. void checkTransASyncSync(Communicator *comm);
  242. void checkWupTransSync(Communicator *comm);
  243. void checkSyncTimeout(Communicator *comm);
  244. void checkASyncTimeout(Communicator *comm);
  245. void checkStat(Communicator *comm, int reportCount);
  246. void rpcFromRegistry(Communicator *comm);
  247. void rpcConHashFromRegistry(Communicator *comm);
  248. void forEach(function<void()> func);
  249. void forEach(function<void(Communicator *comm)> func);
  250. void transGlobalCommunicator(function<void(Communicator *)> func, Communicator *comm);
  251. void transServerCommunicator(function<void(Communicator *)> func);
  252. void transAllocCommunicator(function<void(Communicator *)> func);
  253. void transComplexCommunicator(function<void(Communicator *)> func, Communicator *comm);
  254. void funcInCoroutine(function<void()> func, bool setSched = true);
  255. void forEachInCoroutine(function<void(Communicator *comm)> func);
  256. void transInCoroutineGlobalCommunicator(function<void(Communicator *)> func, Communicator *comm);
  257. void transInCoroutineServerCommunicator(function<void(Communicator *)> func);
  258. void transInCoroutineAllocCommunicator(function<void(Communicator *)> func);
  259. void transInCoroutineComplexCommunicator(function<void(Communicator *)> func, Communicator *comm);
  260. shared_ptr<Communicator> getCommunicator();
  261. protected:
  262. int _count = 1000;
  263. // int _count = 10;
  264. string _buffer;
  265. TC_Config _conf;
  266. };