hello_test.h 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. #include "server/HelloServer.h"
  2. #include "server/Hello.h"
  3. #include "gtest/gtest.h"
  4. #include "certs.h"
  5. #include <mutex>
  6. #include <iostream>
  7. using namespace tars;
  8. using namespace Test;
  9. class PushCallBack : public ServantProxyCallback
  10. {
  11. public:
  12. int onDispatch(ReqMessagePtr msg)
  13. {
  14. if(msg->request.sFuncName == "printResult")
  15. {
  16. string sRet;
  17. sRet.assign(&(msg->response->sBuffer[0]), msg->response->sBuffer.size());
  18. _onprintresult = true;
  19. // printResult(msg->request.iRequestId, sRet);
  20. return 0;
  21. }
  22. else if(msg->response->iRequestId == 0)
  23. {
  24. string sRet;
  25. sRet.assign(&(msg->response->sBuffer[0]), msg->response->sBuffer.size());
  26. ++_onpushinfo;
  27. // printPushInfo(sRet);
  28. return 0;
  29. }
  30. else
  31. {
  32. LOG_CONSOLE_DEBUG << "no match func!" <<endl;
  33. }
  34. return -3;
  35. }
  36. void onConnect(const TC_Endpoint &ep)
  37. {
  38. _onconnect = true;
  39. // LOG_CONSOLE_DEBUG << "onConnect:" << ep.toString() << ", " << _onconnect << ", " << this << endl;
  40. }
  41. void onClose()
  42. {
  43. _onclose = true;
  44. // LOG_CONSOLE_DEBUG << "onClose:" << _onclose << endl;
  45. }
  46. bool _onclose = false;
  47. bool _onconnect = false;
  48. bool _onprintresult = false;
  49. int _onpushinfo = 0;
  50. };
  51. typedef tars::TC_AutoPtr<PushCallBack> PushCallBackPtr;
  52. struct ClientHelloCallback : public HelloPrxCallback
  53. {
  54. ClientHelloCallback(int64_t t, int i, int c, const string &buff, std::atomic<int> &count) : start(t), cur(i), count(c), buffer(buff), callback_count(count)
  55. {
  56. }
  57. ClientHelloCallback(HelloPrx prx, std::atomic<int> &count) : _prx(prx), callback_count(count)
  58. {
  59. }
  60. //回调函数
  61. virtual void callback_testTrans(int ret, const string &r);
  62. //回调函数
  63. virtual void callback_testHello(int ret, const string &r);
  64. virtual void callback_testSyncTrans(tars::Int32 ret, const std::string& r);
  65. virtual void callback_testHello_exception(tars::Int32 ret)
  66. {
  67. LOG_CONSOLE_DEBUG << "callback exception:" << ret << endl;
  68. exit(-1);
  69. }
  70. virtual void callback_testTrans_exception(tars::Int32 ret)
  71. {
  72. LOG_CONSOLE_DEBUG << "callback exception:" << ret << endl;
  73. exit(-1);
  74. }
  75. virtual void callback_testSyncTrans_exception(tars::Int32 ret)
  76. {
  77. LOG_CONSOLE_DEBUG << "callback exception:" << ret << endl;
  78. exit(-1);
  79. }
  80. virtual void callback_testTimeout(int ret)
  81. {
  82. callback = true;
  83. }
  84. virtual void callback_testTimeout_exception(tars::Int32 ret)
  85. {
  86. callback_exception = true;
  87. }
  88. HelloPrx _prx;
  89. bool callback = false;
  90. bool callback_exception = false;
  91. int64_t start;
  92. int cur;
  93. int count;
  94. string buffer;
  95. std::atomic<int> &callback_count;
  96. };
  97. struct HelloHttpCallback : public HttpCallback
  98. {
  99. HelloHttpCallback(int64_t t, int i, int c, const string &buff, std::atomic<int> &count) : start(t), cur(i), count(c),_buff(buff), callback_count(count)
  100. {
  101. }
  102. virtual int onHttpResponse(const shared_ptr<TC_HttpResponse> &rsp)
  103. {
  104. [&]()
  105. {
  106. ASSERT_TRUE(rsp->getContent() == _buff);
  107. }();
  108. callback_count++;
  109. if(cur == count-1)
  110. {
  111. int64_t cost = TC_Common::now2us() - start;
  112. LOG_CONSOLE_DEBUG << "count:" << count << ", " << cost << " us, avg:" << 1.*cost/count << "us" << endl;
  113. }
  114. return 0;
  115. }
  116. virtual int onHttpResponseException(int expCode)
  117. {
  118. LOG_CONSOLE_DEBUG << "onHttpResponseException expCode:" << expCode << endl;
  119. return 0;
  120. }
  121. int64_t start;
  122. int cur;
  123. int count;
  124. string _buff;
  125. std::atomic<int> &callback_count;
  126. };
  127. class CustomCallBack : public ServantProxyCallback
  128. {
  129. public:
  130. CustomCallBack(std::atomic<int> &count) : callback_count(count) {}
  131. virtual int onDispatch(ReqMessagePtr msg)
  132. {
  133. if(msg->response->iRet != tars::TARSSERVERSUCCESS)
  134. {
  135. cout << "ret error:" << msg->response->iRet << endl;
  136. }
  137. else
  138. {
  139. // cout << "succ:" << string(msg->response->sBuffer.data(), msg->response->sBuffer.size()) << endl;
  140. }
  141. ++callback_count;
  142. return msg->response->iRet;
  143. }
  144. virtual void onClose()
  145. {
  146. }
  147. std::atomic<int> &callback_count;
  148. };
  149. typedef tars::TC_AutoPtr<CustomCallBack> CustomCallBackPtr;
  150. class HelloTest : public testing::Test
  151. {
  152. public:
  153. //添加日志
  154. static void SetUpTestCase()
  155. {
  156. }
  157. static void TearDownTestCase()
  158. {
  159. }
  160. virtual void SetUp()
  161. {
  162. _buffer.assign(100, 'a');
  163. _conf = CONFIG();
  164. TC_File::removeFile("RpcServer.tarsdat", false);
  165. }
  166. virtual void TearDown()
  167. {
  168. LOG_CONSOLE_DEBUG << "fd count:" << getFdCounts() << endl;
  169. }
  170. template<typename T>
  171. void startServer(T &server, TC_Config conf, TC_EpollServer::SERVER_OPEN_COROUTINE openCoroutine = TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
  172. {
  173. conf.set("/tars/application/server<opencoroutine>", TC_Common::tostr(openCoroutine));
  174. ASSERT_TRUE(conf.get("/tars/application/server<opencoroutine>") == TC_Common::tostr(openCoroutine));
  175. server.main(conf.tostr());
  176. server.start();
  177. server.waitForReady();
  178. }
  179. void startServer(HelloServer &server, TC_EpollServer::SERVER_OPEN_COROUTINE openCoroutine);
  180. template<typename T>
  181. void stopServer(T &server)
  182. {
  183. server.terminate();
  184. server.getThreadControl().join();
  185. }
  186. TC_Endpoint getEndpoint(const string &adapter);
  187. string getObj(TC_Config &conf, const string &adapter)
  188. {
  189. return conf.get("/tars/application/server/" + adapter + "<servant>") + "@" + conf.get("/tars/application/server/" + adapter + "<endpoint>");
  190. }
  191. TC_Endpoint getEndpoint(TC_Config &conf, const string &adapter)
  192. {
  193. return TC_Endpoint(conf.get("/tars/application/server/" + adapter + "<endpoint>"));
  194. }
  195. TC_Endpoint getLocalEndpoint(const TC_Config &conf)
  196. {
  197. return TC_Endpoint(conf.get("/tars/application/server<local>"));
  198. }
  199. template<typename T>
  200. T getObj(Communicator *comm, const string &adapter)
  201. {
  202. comm->setProperty("sendqueuelimit", "1000000");
  203. // comm->setProperty("asyncqueuecap", "1000000");
  204. string obj = getObj(_conf, adapter);
  205. T prx = comm->stringToProxy<T>(obj);
  206. prx->tars_timeout(60000);
  207. prx->tars_async_timeout(60000);
  208. return prx;
  209. }
  210. int getFdCounts()
  211. {
  212. //linux下才生效, 其他平台都是返回的0
  213. vector<string> fds;
  214. TC_File::listDirectory("/proc/" + TC_Common::tostr(getpid()) + "/fd", fds, false);
  215. return fds.size();
  216. }
  217. int getSocketFd(int iSocketType = SOCK_DGRAM, int iDomain = AF_INET)
  218. {
  219. TC_Socket s1;
  220. s1.createSocket(iSocketType, AF_INET);
  221. s1.setOwner(false);
  222. return s1.getfd();
  223. }
  224. void waitForFinish(atomic<int> &callback_count, int count);
  225. void wait(int millseconds);
  226. void syncCustom(Communicator *comm, const string &adapter = "CustomAdapter");
  227. void asyncCustom(Communicator *comm, const string &adapter = "CustomAdapter");
  228. void testPush(Communicator *comm, const string &adapter = "PushAdapter");
  229. void testReconnect(Communicator *comm, const string &adapter = "PushAdapter");
  230. void checkWup();
  231. void checkSyncHttp(Communicator *comm, const string &adapter = "HttpAdapter", bool close = false);
  232. void checkASyncHttp(Communicator *comm, const string &adapter = "HelloAdapter", bool close = false);
  233. void checkSync(Communicator *comm, const string &adapter = "HelloAdapter");
  234. void checkSyncOnce(HelloPrx prx);
  235. void checkASyncOnce(HelloPrx prx);//Communicator *comm, const string &adapter = "HelloAdapter");
  236. void checkASync(Communicator *comm, const string &adapter = "HelloAdapter");
  237. void checkTransSyncASync(Communicator *comm);
  238. void checkTransASyncASync(Communicator *comm);
  239. void checkTransSyncSync(Communicator *comm);
  240. void checkTransASyncSync(Communicator *comm);
  241. void checkWupTransSync(Communicator *comm);
  242. void checkSyncTimeout(Communicator *comm);
  243. void checkASyncTimeout(Communicator *comm);
  244. void checkStat(Communicator *comm, int reportCount);
  245. void rpcFromRegistry(Communicator *comm);
  246. void forEach(function<void()> func);
  247. void forEach(function<void(Communicator *comm)> func);
  248. void transGlobalCommunicator(function<void(Communicator *)> func, Communicator *comm);
  249. void transServerCommunicator(function<void(Communicator *)> func);
  250. void transAllocCommunicator(function<void(Communicator *)> func);
  251. void transComplexCommunicator(function<void(Communicator *)> func, Communicator *comm);
  252. void funcInCoroutine(function<void()> func, bool setSched = true);
  253. void forEachInCoroutine(function<void(Communicator *comm)> func);
  254. void transInCoroutineGlobalCommunicator(function<void(Communicator *)> func, Communicator *comm);
  255. void transInCoroutineServerCommunicator(function<void(Communicator *)> func);
  256. void transInCoroutineAllocCommunicator(function<void(Communicator *)> func);
  257. void transInCoroutineComplexCommunicator(function<void(Communicator *)> func, Communicator *comm);
  258. shared_ptr<Communicator> getCommunicator();
  259. protected:
  260. int _count = 1000;
  261. // int _count = 10;
  262. string _buffer;
  263. TC_Config _conf;
  264. };