rpc_task.inl 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695
  1. /*
  2. Copyright (c) 2020 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #include <stdlib.h>
  14. #include <string.h>
  15. #include <string>
  16. #include <functional>
  17. #include <workflow/WFGlobal.h>
  18. #include <workflow/WFTask.h>
  19. #include <workflow/WFTaskFactory.h>
  20. #include <workflow/WFFuture.h>
  21. #include "rpc_basic.h"
  22. #include "rpc_message.h"
  23. #include "rpc_options.h"
  24. #include "rpc_global.h"
  25. namespace srpc
  26. {
  27. class RPCWorker
  28. {
  29. public:
  30. RPCWorker(RPCContext *ctx, RPCMessage *req, RPCMessage *resp)
  31. {
  32. this->ctx = ctx;
  33. this->req = req;
  34. this->resp = resp;
  35. this->__server_serialize = NULL;
  36. }
  37. ~RPCWorker()
  38. {
  39. delete this->ctx;
  40. delete this->pb_input;
  41. delete this->pb_output;
  42. delete this->thrift_intput;
  43. delete this->thrift_output;
  44. }
  45. void set_server_input(ProtobufIDLMessage *input)
  46. {
  47. this->pb_input = input;
  48. }
  49. void set_server_input(ThriftIDLMessage *input)
  50. {
  51. this->thrift_intput = input;
  52. }
  53. void set_server_output(ProtobufIDLMessage *output)
  54. {
  55. this->pb_output = output;
  56. this->__server_serialize = &RPCWorker::resp_serialize_pb;
  57. }
  58. void set_server_output(ThriftIDLMessage *output)
  59. {
  60. this->thrift_output = output;
  61. this->__server_serialize = &RPCWorker::resp_serialize_thrift;
  62. }
  63. int server_serialize()
  64. {
  65. if (!this->__server_serialize)
  66. return RPCStatusOK;
  67. return (this->*__server_serialize)();
  68. }
  69. public:
  70. RPCContext *ctx;
  71. RPCMessage *req;
  72. RPCMessage *resp;
  73. private:
  74. int resp_serialize_pb()
  75. {
  76. return this->resp->serialize(this->pb_output);
  77. }
  78. int resp_serialize_thrift()
  79. {
  80. return this->resp->serialize(this->thrift_output);
  81. }
  82. int (RPCWorker::*__server_serialize)();
  83. ProtobufIDLMessage *pb_input = NULL;
  84. ProtobufIDLMessage *pb_output = NULL;
  85. ThriftIDLMessage *thrift_intput = NULL;
  86. ThriftIDLMessage *thrift_output = NULL;
  87. };
  88. template<class RPCREQ, class RPCRESP>
  89. class RPCClientTask : public WFComplexClientTask<RPCREQ, RPCRESP>
  90. {
  91. public:
  92. // before rpc call
  93. void set_data_type(RPCDataType type);
  94. void set_compress_type(RPCCompressType type);
  95. void set_retry_max(int retry_max);
  96. void set_attachment_nocopy(const char *attachment, size_t len);
  97. int set_uri_fragment(const std::string& fragment);
  98. int serialize_input(const ProtobufIDLMessage *in);
  99. int serialize_input(const ThriftIDLMessage *in);
  100. // similar to opentracing: log({{"event", "error"}, {"message", "application log"}});
  101. void log(const RPCLogVector& fields);
  102. // Baggage Items, which are just key:value pairs that cross process boundaries
  103. void add_baggage(const std::string& key, const std::string& value);
  104. bool get_baggage(const std::string& key, std::string& value);
  105. bool set_http_header(const std::string& name, const std::string& value);
  106. bool add_http_header(const std::string& name, const std::string& value);
  107. // JsonPrintOptions
  108. void set_json_add_whitespace(bool on);
  109. void set_json_always_print_enums_as_ints(bool on);
  110. void set_json_preserve_proto_field_names(bool on);
  111. void set_json_always_print_primitive_fields(bool on);
  112. protected:
  113. using user_done_t = std::function<int (int, RPCWorker&)>;
  114. using WFComplexClientTask<RPCREQ, RPCRESP>::set_callback;
  115. void init_failed() override;
  116. bool check_request() override;
  117. CommMessageOut *message_out() override;
  118. bool finish_once() override;
  119. void rpc_callback(WFNetworkTask<RPCREQ, RPCRESP> *task);
  120. int first_timeout() override { return watch_timeout_; }
  121. public:
  122. RPCClientTask(const std::string& service_name,
  123. const std::string& method_name,
  124. const RPCTaskParams *params,
  125. std::list<RPCModule *>&& modules,
  126. user_done_t&& user_done);
  127. bool get_remote(std::string& ip, unsigned short *port) const;
  128. RPCModuleData *mutable_module_data() { return &module_data_; }
  129. void set_module_data(RPCModuleData data) { module_data_ = std::move(data); }
  130. private:
  131. template<class IDL>
  132. int __serialize_input(const IDL *in);
  133. user_done_t user_done_;
  134. bool init_failed_;
  135. int watch_timeout_;
  136. RPCModuleData module_data_;
  137. std::list<RPCModule *> modules_;
  138. };
  139. template<class RPCREQ, class RPCRESP>
  140. class RPCServerTask : public WFServerTask<RPCREQ, RPCRESP>
  141. {
  142. public:
  143. RPCServerTask(CommService *service,
  144. std::function<void (WFNetworkTask<RPCREQ, RPCRESP> *)>& process,
  145. std::list<RPCModule *>&& modules) :
  146. WFServerTask<RPCREQ, RPCRESP>(service, WFGlobal::get_scheduler(), process),
  147. worker(new RPCContextImpl<RPCREQ, RPCRESP>(this, &module_data_),
  148. &this->req, &this->resp),
  149. modules_(std::move(modules))
  150. {
  151. }
  152. public:
  153. class RPCSeries : public WFServerTask<RPCREQ, RPCRESP>::Series
  154. {
  155. public:
  156. RPCSeries(WFServerTask<RPCREQ, RPCRESP> *task) :
  157. WFServerTask<RPCREQ, RPCRESP>::Series(task),
  158. module_data(NULL)
  159. {}
  160. RPCModuleData *get_module_data() { return this->module_data; }
  161. void set_module_data(RPCModuleData *data) { this->module_data = data; }
  162. virtual void *get_specific(const char *key)
  163. {
  164. if (strcmp(key, SRPC_MODULE_DATA) == 0)
  165. return this->module_data;
  166. else
  167. return NULL;
  168. }
  169. private:
  170. RPCModuleData *module_data;
  171. };
  172. protected:
  173. CommMessageOut *message_out() override;
  174. void handle(int state, int error) override;
  175. public:
  176. bool get_remote(std::string& ip, unsigned short *port) const;
  177. RPCModuleData *mutable_module_data() { return &module_data_; }
  178. void set_module_data(RPCModuleData data) { module_data_ = std::move(data); }
  179. public:
  180. RPCWorker worker;
  181. private:
  182. RPCModuleData module_data_;
  183. std::list<RPCModule *> modules_;
  184. };
  185. template<class OUTPUT>
  186. static void RPCAsyncFutureCallback(OUTPUT *output, srpc::RPCContext *ctx)
  187. {
  188. using RESULT = std::pair<OUTPUT, srpc::RPCSyncContext>;
  189. auto *pr = static_cast<WFPromise<RESULT> *>(ctx->get_user_data());
  190. RESULT res;
  191. res.second.seqid = ctx->get_seqid();
  192. res.second.errmsg = ctx->get_errmsg();
  193. res.second.remote_ip = ctx->get_remote_ip();
  194. res.second.status_code = ctx->get_status_code();
  195. res.second.error = ctx->get_error();
  196. res.second.success = ctx->success();
  197. res.second.timeout_reason = ctx->get_timeout_reason();
  198. if (res.second.success)
  199. res.first = std::move(*output);
  200. pr->set_value(std::move(res));
  201. delete pr;
  202. }
  203. template<class OUTPUT>
  204. static void ThriftSendCallback(OUTPUT *output, srpc::RPCContext *ctx)
  205. {
  206. auto *receiver = static_cast<ThriftReceiver<OUTPUT> *>(ctx->get_user_data());
  207. receiver->mutex.lock();
  208. receiver->ctx.seqid = ctx->get_seqid();
  209. receiver->ctx.errmsg = ctx->get_errmsg();
  210. receiver->ctx.remote_ip = ctx->get_remote_ip();
  211. receiver->ctx.status_code = ctx->get_status_code();
  212. receiver->ctx.error = ctx->get_error();
  213. receiver->ctx.success = ctx->success();
  214. if (receiver->ctx.success)
  215. receiver->output = std::move(*output);
  216. receiver->is_done = true;
  217. receiver->cond.notify_one();
  218. receiver->mutex.unlock();
  219. }
  220. template<class OUTPUT>
  221. static inline int
  222. ClientRPCDoneImpl(int status_code,
  223. RPCWorker& worker,
  224. const std::function<void (OUTPUT *, RPCContext *)>& rpc_done)
  225. {
  226. if (status_code == RPCStatusOK)
  227. {
  228. OUTPUT out;
  229. status_code = worker.resp->deserialize(&out);
  230. if (status_code == RPCStatusOK)
  231. rpc_done(&out, worker.ctx);
  232. return status_code;
  233. }
  234. rpc_done(NULL, worker.ctx);
  235. return status_code;
  236. }
  237. template<class RPCREQ, class RPCRESP>
  238. CommMessageOut *RPCServerTask<RPCREQ, RPCRESP>::message_out()
  239. {
  240. int status_code = this->worker.server_serialize();
  241. if (status_code == RPCStatusOK)
  242. status_code = this->resp.compress();
  243. if (status_code == RPCStatusOK)
  244. {
  245. if (!this->resp.serialize_meta())
  246. status_code = RPCStatusMetaError;
  247. }
  248. if (this->resp.get_status_code() == RPCStatusOK)
  249. this->resp.set_status_code(status_code);
  250. // for server, this is the where series->module_data stored
  251. RPCModuleData *data = this->mutable_module_data();
  252. for (auto *module : modules_)
  253. module->server_task_end(this, *data);
  254. this->resp.set_meta_module_data(*data);
  255. if (status_code == RPCStatusOK)
  256. return this->WFServerTask<RPCREQ, RPCRESP>::message_out();
  257. errno = EBADMSG;
  258. return NULL;
  259. }
  260. template<class RPCREQ, class RPCRESP>
  261. void RPCServerTask<RPCREQ, RPCRESP>::handle(int state, int error)
  262. {
  263. if (state != WFT_STATE_TOREPLY)
  264. return WFServerTask<RPCREQ, RPCRESP>::handle(state, error);
  265. this->state = WFT_STATE_TOREPLY;
  266. this->target = this->get_target();
  267. RPCSeries *series = new RPCSeries(this);
  268. series->start();
  269. }
  270. template<class RPCREQ, class RPCRESP>
  271. inline void RPCClientTask<RPCREQ, RPCRESP>::set_data_type(RPCDataType type)
  272. {
  273. this->req.set_data_type(type);
  274. }
  275. template<class RPCREQ, class RPCRESP>
  276. inline void RPCClientTask<RPCREQ, RPCRESP>::set_compress_type(RPCCompressType type)
  277. {
  278. this->req.set_compress_type(type);
  279. }
  280. template<class RPCREQ, class RPCRESP>
  281. inline void RPCClientTask<RPCREQ, RPCRESP>::set_attachment_nocopy(const char *attachment,
  282. size_t len)
  283. {
  284. this->req.set_attachment_nocopy(attachment, len);
  285. }
  286. template<class RPCREQ, class RPCRESP>
  287. int RPCClientTask<RPCREQ, RPCRESP>::set_uri_fragment(const std::string& fragment)
  288. {
  289. char *str = strdup(fragment.c_str());
  290. if (str)
  291. {
  292. free(this->uri_.fragment);
  293. this->uri_.fragment = str;
  294. return 0;
  295. }
  296. return -1;
  297. }
  298. template<class RPCREQ, class RPCRESP>
  299. inline void RPCClientTask<RPCREQ, RPCRESP>::set_retry_max(int retry_max)
  300. {
  301. this->retry_max_ = retry_max;
  302. }
  303. template<class RPCREQ, class RPCRESP>
  304. inline int RPCClientTask<RPCREQ, RPCRESP>::serialize_input(const ProtobufIDLMessage *in)
  305. {
  306. return __serialize_input<ProtobufIDLMessage>(in);
  307. }
  308. template<class RPCREQ, class RPCRESP>
  309. inline int RPCClientTask<RPCREQ, RPCRESP>::serialize_input(const ThriftIDLMessage *in)
  310. {
  311. return __serialize_input<ThriftIDLMessage>(in);
  312. }
  313. template<class RPCREQ, class RPCRESP>
  314. template<class IDL>
  315. inline int RPCClientTask<RPCREQ, RPCRESP>::__serialize_input(const IDL *in)
  316. {
  317. if (init_failed_ == false)
  318. {
  319. int status_code = this->req.serialize(in);
  320. this->resp.set_status_code(status_code);
  321. if (status_code == RPCStatusOK)
  322. return 0;
  323. }
  324. return -1;
  325. }
  326. template<class RPCREQ, class RPCRESP>
  327. inline RPCClientTask<RPCREQ, RPCRESP>::RPCClientTask(
  328. const std::string& service_name,
  329. const std::string& method_name,
  330. const RPCTaskParams *params,
  331. std::list<RPCModule *>&& modules,
  332. user_done_t&& user_done):
  333. WFComplexClientTask<RPCREQ, RPCRESP>(0, nullptr),
  334. user_done_(std::move(user_done)),
  335. init_failed_(false),
  336. modules_(std::move(modules))
  337. {
  338. if (user_done_)
  339. this->set_callback(std::bind(&RPCClientTask::rpc_callback,
  340. this, std::placeholders::_1));
  341. this->set_send_timeout(params->send_timeout);
  342. this->set_receive_timeout(params->receive_timeout);
  343. watch_timeout_ = params->watch_timeout;
  344. this->set_keep_alive(params->keep_alive_timeout);
  345. this->set_retry_max(params->retry_max);
  346. if (params->compress_type != RPCCompressNone)
  347. this->req.set_compress_type(params->compress_type);
  348. if (params->data_type != RPCDataUndefined)
  349. this->req.set_data_type(params->data_type);
  350. this->req.set_service_name(service_name);
  351. this->req.set_method_name(method_name);
  352. }
  353. template<class RPCREQ, class RPCRESP>
  354. void RPCClientTask<RPCREQ, RPCRESP>::init_failed()
  355. {
  356. init_failed_ = true;
  357. }
  358. template<class RPCREQ, class RPCRESP>
  359. bool RPCClientTask<RPCREQ, RPCRESP>::check_request()
  360. {
  361. int status_code = this->resp.get_status_code();
  362. return status_code == RPCStatusOK || status_code == RPCStatusUndefined;
  363. }
  364. template<class RPCREQ, class RPCRESP>
  365. CommMessageOut *RPCClientTask<RPCREQ, RPCRESP>::message_out()
  366. {
  367. this->req.set_seqid(this->get_task_seq());
  368. int status_code = this->req.compress();
  369. void *series_data = series_of(this)->get_specific(SRPC_MODULE_DATA);
  370. RPCModuleData *data = (RPCModuleData *)series_data;
  371. if (data)
  372. this->set_module_data(*data);
  373. data = this->mutable_module_data();
  374. for (auto *module : modules_)
  375. module->client_task_begin(this, *data);
  376. this->req.set_meta_module_data(*data);
  377. if (status_code == RPCStatusOK)
  378. {
  379. if (!this->req.serialize_meta())
  380. status_code = RPCStatusMetaError;
  381. }
  382. if (status_code == RPCStatusOK)
  383. return this->WFClientTask<RPCREQ, RPCRESP>::message_out();
  384. this->disable_retry();
  385. this->resp.set_status_code(status_code);
  386. errno = EBADMSG;
  387. return NULL;
  388. }
  389. template<class RPCREQ, class RPCRESP>
  390. bool RPCClientTask<RPCREQ, RPCRESP>::finish_once()
  391. {
  392. int status_code = this->resp.get_status_code();
  393. if (this->state == WFT_STATE_SUCCESS &&
  394. (status_code == RPCStatusOK || status_code == RPCStatusUndefined))
  395. {
  396. if (this->resp.deserialize_meta() == false)
  397. this->resp.set_status_code(RPCStatusMetaError);
  398. }
  399. return true;
  400. }
  401. template<class RPCREQ, class RPCRESP>
  402. void RPCClientTask<RPCREQ, RPCRESP>::rpc_callback(WFNetworkTask<RPCREQ, RPCRESP> *task)
  403. {
  404. RPCWorker worker(new RPCContextImpl<RPCREQ, RPCRESP>(this, &module_data_),
  405. &this->req, &this->resp);
  406. int status_code = this->resp.get_status_code();
  407. if (status_code != RPCStatusOK && status_code != RPCStatusUndefined)
  408. {
  409. this->state = WFT_STATE_TASK_ERROR;
  410. this->error = status_code;
  411. }
  412. else if (this->state == WFT_STATE_SUCCESS)
  413. {
  414. status_code = this->resp.decompress();
  415. if (status_code == RPCStatusOK)
  416. {
  417. this->resp.set_status_code(RPCStatusOK);
  418. status_code = user_done_(status_code, worker);
  419. }
  420. if (status_code != RPCStatusOK)
  421. {
  422. this->state = WFT_STATE_TASK_ERROR;
  423. this->error = status_code;
  424. }
  425. }
  426. if (this->state == WFT_STATE_TASK_ERROR)
  427. {
  428. switch (this->error)
  429. {
  430. case WFT_ERR_URI_PARSE_FAILED:
  431. case WFT_ERR_URI_SCHEME_INVALID:
  432. case WFT_ERR_URI_PORT_INVALID:
  433. status_code = RPCStatusURIInvalid;
  434. break;
  435. case WFT_ERR_UPSTREAM_UNAVAILABLE:
  436. status_code = RPCStatusUpstreamFailed;
  437. break;
  438. default:
  439. break;
  440. }
  441. }
  442. else if (this->state != WFT_STATE_SUCCESS)
  443. {
  444. switch (this->state)
  445. {
  446. case WFT_STATE_SYS_ERROR:
  447. status_code = RPCStatusSystemError;
  448. break;
  449. case WFT_STATE_SSL_ERROR:
  450. status_code = RPCStatusSSLError;
  451. break;
  452. case WFT_STATE_DNS_ERROR:
  453. status_code = RPCStatusDNSError;
  454. break;
  455. case WFT_STATE_ABORTED:
  456. status_code = RPCStatusProcessTerminated;
  457. break;
  458. default:
  459. status_code = RPCStatusUndefined;
  460. break;
  461. }
  462. }
  463. this->resp.set_status_code(status_code);
  464. this->resp.set_error(this->error);
  465. if (!modules_.empty())
  466. {
  467. void *series_data;
  468. RPCModuleData *resp_data = this->mutable_module_data();
  469. if (resp_data->empty()) // get series module data failed previously
  470. {
  471. series_data = series_of(this)->get_specific(SRPC_MODULE_DATA);
  472. if (series_data)
  473. resp_data = (RPCModuleData *)series_data;
  474. }
  475. // else
  476. // this->resp.get_meta_module_data(resp_data);
  477. for (auto *module : modules_)
  478. module->client_task_end(this, *resp_data);
  479. }
  480. if (status_code != RPCStatusOK)
  481. user_done_(status_code, worker);
  482. }
  483. template<class RPCREQ, class RPCRESP>
  484. bool RPCClientTask<RPCREQ, RPCRESP>::get_remote(std::string& ip,
  485. unsigned short *port) const
  486. {
  487. struct sockaddr_storage addr;
  488. socklen_t addrlen = sizeof (addr);
  489. char buf[INET6_ADDRSTRLEN + 1] = { 0 };
  490. if (this->get_peer_addr((struct sockaddr *)&addr, &addrlen) == 0 &&
  491. RPCCommon::addr_to_string((struct sockaddr *)&addr, buf,
  492. INET6_ADDRSTRLEN + 1, port) == true)
  493. {
  494. ip = buf;
  495. return true;
  496. }
  497. return false;
  498. }
  499. template<class RPCREQ, class RPCRESP>
  500. bool RPCServerTask<RPCREQ, RPCRESP>::get_remote(std::string& ip,
  501. unsigned short *port) const
  502. {
  503. struct sockaddr_storage addr;
  504. socklen_t addrlen = sizeof (addr);
  505. char buf[INET6_ADDRSTRLEN + 1] = { 0 };
  506. if (this->get_peer_addr((struct sockaddr *)&addr, &addrlen) == 0 &&
  507. RPCCommon::addr_to_string((struct sockaddr *)&addr, buf,
  508. INET6_ADDRSTRLEN + 1, port) == true)
  509. {
  510. ip = buf;
  511. return true;
  512. }
  513. return false;
  514. }
  515. template<class RPCREQ, class RPCRESP>
  516. void RPCClientTask<RPCREQ, RPCRESP>::log(const RPCLogVector& fields)
  517. {
  518. std::string key;
  519. std::string value;
  520. RPCCommon::log_format(key, value, fields);
  521. module_data_.insert(std::make_pair(std::move(key), std::move(value)));
  522. }
  523. template<class RPCREQ, class RPCRESP>
  524. void RPCClientTask<RPCREQ, RPCRESP>::add_baggage(const std::string& key,
  525. const std::string& value)
  526. {
  527. module_data_[key] = value;
  528. }
  529. template<class RPCREQ, class RPCRESP>
  530. bool RPCClientTask<RPCREQ, RPCRESP>::get_baggage(const std::string& key,
  531. std::string& value)
  532. {
  533. const auto it = module_data_.find(key);
  534. if (it != module_data_.cend())
  535. {
  536. value = it->second;
  537. return true;
  538. }
  539. return false;
  540. }
  541. template<class RPCREQ, class RPCRESP>
  542. inline void RPCClientTask<RPCREQ, RPCRESP>::set_json_add_whitespace(bool on)
  543. {
  544. this->req.set_json_add_whitespace(on);
  545. }
  546. template<class RPCREQ, class RPCRESP>
  547. inline void RPCClientTask<RPCREQ, RPCRESP>::set_json_always_print_enums_as_ints(bool on)
  548. {
  549. this->req.set_json_enums_as_ints(on);
  550. }
  551. template<class RPCREQ, class RPCRESP>
  552. inline void RPCClientTask<RPCREQ, RPCRESP>::set_json_preserve_proto_field_names(bool on)
  553. {
  554. this->req.set_json_preserve_names(on);
  555. }
  556. template<class RPCREQ, class RPCRESP>
  557. inline void RPCClientTask<RPCREQ, RPCRESP>::set_json_always_print_primitive_fields(bool on)
  558. {
  559. this->req.set_json_print_primitive(on);
  560. }
  561. template<class RPCREQ, class RPCRESP>
  562. inline bool RPCClientTask<RPCREQ, RPCRESP>::set_http_header(const std::string& name,
  563. const std::string& value)
  564. {
  565. return this->req.set_http_header(name, value);
  566. }
  567. template<class RPCREQ, class RPCRESP>
  568. inline bool RPCClientTask<RPCREQ, RPCRESP>::add_http_header(const std::string& name,
  569. const std::string& value)
  570. {
  571. return this->req.add_http_header(name, value);
  572. }
  573. } // namespace srpc