rpc_server.h 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. /*
  2. Copyright (c) 2020 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __RPC_SERVER_H__
  14. #define __RPC_SERVER_H__
  #include <errno.h>
  #include <functional>
  #include <list>
  #include <map>
  #include <mutex>
  #include <string>
  #include <workflow/WFServer.h>
  #include <workflow/WFHttpServer.h>
  #include "rpc_types.h"
  #include "rpc_service.h"
  #include "rpc_options.h"
  #include "rpc_trace_module.h"
  #include "rpc_metrics_module.h"
  25. namespace srpc
  26. {
  27. template<class RPCTYPE>
  28. class RPCServer : public WFServer<typename RPCTYPE::REQ,
  29. typename RPCTYPE::RESP>
  30. {
  31. public:
  32. using REQTYPE = typename RPCTYPE::REQ;
  33. using RESPTYPE = typename RPCTYPE::RESP;
  34. using TASK = RPCServerTask<REQTYPE, RESPTYPE>;
  35. using SERIES = typename TASK::RPCSeries;
  36. protected:
  37. using NETWORKTASK = WFNetworkTask<REQTYPE, RESPTYPE>;
  38. public:
  39. RPCServer();
  40. RPCServer(const struct RPCServerParams *params);
  41. int add_service(RPCService *service);
  42. const RPCService* find_service(const std::string& name) const;
  43. void add_filter(RPCFilter *filter);
  44. protected:
  45. RPCServer(const struct RPCServerParams *params,
  46. std::function<void (NETWORKTASK *)>&& process);
  47. CommSession *new_session(long long seq, CommConnection *conn) override;
  48. void server_process(NETWORKTASK *task) const;
  49. private:
  50. std::mutex mutex;
  51. std::map<std::string, RPCService *> service_map;
  52. RPCModule *modules[SRPC_MODULE_MAX] = { NULL };
  53. };
  54. ////////
  55. // inl
  56. template<class RPCTYPE>
  57. inline RPCServer<RPCTYPE>::RPCServer():
  58. WFServer<REQTYPE, RESPTYPE>(&RPC_SERVER_PARAMS_DEFAULT,
  59. std::bind(&RPCServer::server_process,
  60. this, std::placeholders::_1))
  61. {}
  62. template<class RPCTYPE>
  63. inline RPCServer<RPCTYPE>::RPCServer(const struct RPCServerParams *params):
  64. WFServer<REQTYPE, RESPTYPE>(params,
  65. std::bind(&RPCServer::server_process,
  66. this, std::placeholders::_1))
  67. {}
  68. template<class RPCTYPE>
  69. inline RPCServer<RPCTYPE>::RPCServer(const struct RPCServerParams *params,
  70. std::function<void (NETWORKTASK *)>&& process):
  71. WFServer<REQTYPE, RESPTYPE>(&params, std::move(process))
  72. {}
  73. template<class RPCTYPE>
  74. inline int RPCServer<RPCTYPE>::add_service(RPCService* service)
  75. {
  76. const auto it = this->service_map.emplace(service->get_name(), service);
  77. if (!it.second)
  78. {
  79. errno = EEXIST;
  80. return -1;
  81. }
  82. return 0;
  83. }
  84. template<>
  85. inline int RPCServer<RPCTYPESRPC>::add_service(RPCService* service)
  86. {
  87. const std::string &name = service->get_name();
  88. const auto it = this->service_map.emplace(name, service);
  89. if (!it.second)
  90. {
  91. errno = EEXIST;
  92. return -1;
  93. }
  94. auto pos = name.find_last_of('.');
  95. if (pos != std::string::npos)
  96. this->service_map.emplace(name.substr(pos + 1), service);
  97. return 0;
  98. }
  99. template<>
  100. inline int RPCServer<RPCTYPESRPCHttp>::add_service(RPCService* service)
  101. {
  102. const std::string &name = service->get_name();
  103. const auto it = this->service_map.emplace(name, service);
  104. if (!it.second)
  105. {
  106. errno = EEXIST;
  107. return -1;
  108. }
  109. auto pos = name.find_last_of('.');
  110. if (pos != std::string::npos)
  111. this->service_map.emplace(name.substr(pos + 1), service);
  112. return 0;
  113. }
  114. template<class RPCTYPE>
  115. void RPCServer<RPCTYPE>::add_filter(RPCFilter *filter)
  116. {
  117. using CLIENT_TASK = RPCClientTask<typename RPCTYPE::REQ,
  118. typename RPCTYPE::RESP>;
  119. using SERVER_TASK = RPCServerTask<typename RPCTYPE::REQ,
  120. typename RPCTYPE::RESP>;
  121. int type = filter->get_module_type();
  122. this->mutex.lock();
  123. if (type < SRPC_MODULE_MAX && type >= 0)
  124. {
  125. RPCModule *module = this->modules[type];
  126. if (!module)
  127. {
  128. switch (type)
  129. {
  130. case RPCModuleTypeTrace:
  131. module = new RPCTraceModule<SERVER_TASK, CLIENT_TASK>();
  132. break;
  133. case RPCModuleTypeMetrics:
  134. module = new RPCMetricsModule<SERVER_TASK, CLIENT_TASK>();
  135. break;
  136. default:
  137. break;
  138. }
  139. this->modules[type] = module;
  140. }
  141. if (module)
  142. module->add_filter(filter);
  143. }
  144. this->mutex.unlock();
  145. return;
  146. }
  147. template<class RPCTYPE>
  148. inline const RPCService *
  149. RPCServer<RPCTYPE>::find_service(const std::string& name) const
  150. {
  151. const auto it = this->service_map.find(name);
  152. if (it != this->service_map.cend())
  153. return it->second;
  154. return NULL;
  155. }
  156. template<class RPCTYPE>
  157. inline CommSession *RPCServer<RPCTYPE>::new_session(long long seq,
  158. CommConnection *conn)
  159. {
  160. /* TODO: Change to a factory function. */
  161. std::list<RPCModule *> module;
  162. for (int i = 0; i < SRPC_MODULE_MAX; i++)
  163. {
  164. if (this->modules[i])
  165. module.push_back(this->modules[i]);
  166. }
  167. auto *task = new TASK(this, this->process, std::move(module));
  168. task->set_keep_alive(this->params.keep_alive_timeout);
  169. task->get_req()->set_size_limit(this->params.request_size_limit);
  170. return task;
  171. }
  172. template<class RPCTYPE>
  173. void RPCServer<RPCTYPE>::server_process(NETWORKTASK *task) const
  174. {
  175. auto *req = task->get_req();
  176. auto *resp = task->get_resp();
  177. int status_code;
  178. if (!req->deserialize_meta())
  179. status_code = RPCStatusMetaError;
  180. else
  181. {
  182. auto *server_task = static_cast<TASK *>(task);
  183. RPCModuleData *task_data = server_task->mutable_module_data();
  184. req->get_meta_module_data(*task_data);
  185. RPCTYPE::server_reply_init(req, resp);
  186. auto *service = this->find_service(req->get_service_name());
  187. if (!service)
  188. status_code = RPCStatusServiceNotFound;
  189. else
  190. {
  191. auto *rpc = service->find_method(req->get_method_name());
  192. if (!rpc)
  193. status_code = RPCStatusMethodNotFound;
  194. else
  195. {
  196. for (auto *module : this->modules)
  197. {
  198. if (module)
  199. module->server_task_begin(server_task, *task_data);
  200. }
  201. status_code = req->decompress();
  202. if (status_code == RPCStatusOK)
  203. status_code = (*rpc)(server_task->worker);
  204. }
  205. }
  206. SERIES *series = static_cast<SERIES *>(series_of(task));
  207. series->set_module_data(task_data);
  208. }
  209. resp->set_status_code(status_code);
  210. }
  211. template<>
  212. inline const RPCService *
  213. RPCServer<RPCTYPEThrift>::find_service(const std::string& name) const
  214. {
  215. if (this->service_map.empty())
  216. return NULL;
  217. return this->service_map.cbegin()->second;
  218. }
  219. template<>
  220. inline const RPCService *
  221. RPCServer<RPCTYPEThriftHttp>::find_service(const std::string& name) const
  222. {
  223. if (this->service_map.empty())
  224. return NULL;
  225. return this->service_map.cbegin()->second;
  226. }
  227. } // namespace srpc
  228. #endif