ServantProxy.h 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef _TARS_SERVANT_PROXY_H_
  17. #define _TARS_SERVANT_PROXY_H_
  18. #include "util/tc_monitor.h"
  19. #include "util/tc_autoptr.h"
  20. #include "servant/Message.h"
  21. #include "servant/AppProtocol.h"
  22. #include "servant/TarsCurrent.h"
  23. #include "servant/EndpointInfo.h"
  24. #include "servant/CommunicatorEpoll.h"
  25. namespace tars
  26. {
  27. /////////////////////////////////////////////////////////////////////////
  28. /*
  29. * seq 管理的类
  30. */
  31. class SeqManager
  32. {
  33. public:
  34. const static uint16_t MAX_UNSIGN_SHORT = 0xffff;
  35. struct SeqInfo
  36. {
  37. uint16_t next;
  38. bool free;
  39. };
  40. /**
  41. * 构造函数
  42. */
  43. SeqManager(size_t iNum);
  44. /**
  45. * 获取seq
  46. */
  47. uint16_t get();
  48. /**
  49. * 删除seq
  50. */
  51. void del(uint16_t iSeq);
  52. private:
  53. uint16_t _num;
  54. uint16_t _free;
  55. uint16_t _freeTail;
  56. SeqInfo * _p;
  57. TC_SpinLock _mutex;
  58. };
  59. /////////////////////////////////////////////////////////////////////////
  60. /*
  61. * 线程私有数据
  62. */
  63. class ServantProxyThreadData
  64. {
  65. public:
  66. static TC_SpinLock _mutex;
  67. static SeqManager *_pSeq;
  68. static thread_local shared_ptr<ServantProxyThreadData> g_sp;
  69. /**
  70. * 构造函数
  71. */
  72. ServantProxyThreadData();
  73. /**
  74. * 析构函数
  75. */
  76. ~ServantProxyThreadData();
  77. /**
  78. * 获取线程数据,没有的话会自动创建
  79. * @return ServantProxyThreadData*
  80. */
  81. static ServantProxyThreadData * getData();
  82. public:
  83. /*
  84. * 每个线程跟客户端网络线程通信的队列
  85. */
  86. ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]; //队列数组
  87. bool _queueInit; //是否初始化
  88. size_t _reqQNo; //请求事件通知的seq
  89. size_t _netSeq; //轮训选择网络线程的偏移量
  90. int _netThreadSeq; //网络线程发起的请求回到自己的网络线程来处理,其值为网络线程的id
  91. /**
  92. * hash属性,客户端每次调用都进行设置
  93. */
  94. bool _hash; //是否普通取模hash
  95. bool _conHash; //是否一致性hash
  96. int64_t _hashCode; //hash值
  97. /**
  98. * 染色信息
  99. */
  100. bool _dyeing; //标识当前线程是否需要染色
  101. string _dyeingKey; //染色的key值
  102. /**
  103. * 允许客户端设置接口级别的超时时间,包括同步和异步、单向
  104. */
  105. bool _hasTimeout; //是否设置超时间
  106. int _timeout; //超时时间
  107. /**
  108. * 保存调用后端服务的地址信息
  109. */
  110. string _szHost; //调用对端地址
  111. /**
  112. * 协程调度
  113. */
  114. CoroutineScheduler* _sched; //协程调度器
  115. /**
  116. * ObjectProxy
  117. */
  118. size_t _objectProxyNum; //ObjectProxy对象的个数,其个数由客户端的网络线程数决定,每个网络线程有一个ObjectProxy
  119. /**
  120. * objectProxy Pointer
  121. */
  122. shared_ptr<ObjectProxy *> _objectProxyOwn; //保存ObjectProxy对象的指针数组
  123. #ifdef _USE_OPENTRACKING
  124. std::unordered_map<std::string, std::string> _trackInfoMap;
  125. #endif
  126. };
  127. //////////////////////////////////////////////////////////////////////////
  128. // 协程并行请求的基类
  129. class CoroParallelBase : virtual public TC_HandleBase
  130. {
  131. public:
  132. /**
  133. * 构造
  134. */
  135. CoroParallelBase(int iNum)
  136. : _num(iNum)
  137. , _count(iNum)
  138. , _req_count(0)
  139. {}
  140. /**
  141. * 析构函数
  142. */
  143. virtual ~CoroParallelBase() {}
  144. /**
  145. * 增加调用协程接口请求的数目
  146. */
  147. int incReqCount() { return (++_req_count); }
  148. /**
  149. * 判断协程并行请求数目是否都发送了
  150. */
  151. bool checkAllReqSend() { return _num == _req_count; }
  152. /**
  153. * 判断协程并行请求是否都回来了
  154. */
  155. bool checkAllReqReturn() { return (--_count) == 0; }
  156. /**
  157. * 获取所有请求回来的响应
  158. */
  159. vector<ReqMessage*> getAllReqMessage()
  160. {
  161. vector<ReqMessage*> vRet;
  162. {
  163. TC_LockT<TC_SpinLock> lock(_mutex);
  164. vRet.swap(_vReqMessage);
  165. }
  166. return vRet;
  167. }
  168. /**
  169. * 插入请求回来的响应
  170. */
  171. void insert(ReqMessage* msg)
  172. {
  173. TC_LockT<TC_SpinLock> lock(_mutex);
  174. _vReqMessage.push_back(msg);
  175. }
  176. protected:
  177. /**
  178. * 并行请求的数目
  179. */
  180. int _num;
  181. /**
  182. * 并行请求的响应还未回来的数目
  183. */
  184. std::atomic<int> _count;
  185. /**
  186. * 并行请求的已发送的数目
  187. */
  188. std::atomic<int> _req_count;
  189. /**
  190. * 互斥锁
  191. */
  192. TC_SpinLock _mutex;
  193. /**
  194. * 请求的响应的容器
  195. */
  196. vector<ReqMessage*> _vReqMessage;
  197. };
  198. typedef TC_AutoPtr<CoroParallelBase> CoroParallelBasePtr;
  199. //等待所有的请求回来
  200. void coroWhenAll(const CoroParallelBasePtr &ptr);
  201. //////////////////////////////////////////////////////////////////////////
  202. // 异步回调对象的基类
  203. class ServantProxyCallback : virtual public TC_HandleBase
  204. {
  205. public:
  206. /**
  207. * 构造
  208. */
  209. ServantProxyCallback();
  210. /**
  211. * 析构函数
  212. */
  213. virtual ~ServantProxyCallback() {}
  214. /**
  215. * 获取类型
  216. * @return const string&
  217. */
  218. virtual const string& getType() { return _type; }
  219. /**
  220. * 设置类型
  221. * @return const string&
  222. */
  223. virtual void setType(const string& type) { _type = type; }
  224. /**
  225. * 设置coro并行请求的共享智能指针
  226. */
  227. virtual void setCoroParallelBasePtr(tars::CoroParallelBasePtr pPtr) { _pPtr = pPtr; }
  228. /**
  229. * 获取coro并行请求的共享智能指针
  230. */
  231. virtual const tars::CoroParallelBasePtr& getCoroParallelBasePtr() { return _pPtr; }
  232. /**
  233. * 异步请求是否在网络线程处理
  234. * tars内部用的到 业务不能设置这个值
  235. * */
  236. inline void setNetThreadProcess(bool bNetThreadProcess)
  237. {
  238. _bNetThreadProcess = bNetThreadProcess;
  239. }
  240. inline bool getNetThreadProcess()
  241. {
  242. return _bNetThreadProcess;
  243. }
  244. public:
  245. /**
  246. * 异步回调对象实现该方法,进行业务逻辑处理
  247. * @param msg
  248. * @return int
  249. */
  250. virtual int onDispatch(ReqMessagePtr ptr) = 0;
  251. protected:
  252. /**
  253. * 同一链路多个cb的时候可以用来区分class类型
  254. */
  255. string _type;
  256. /**
  257. * 异步请求是否在网络线程处理
  258. * tars内部用的到 业务不能设置这个值
  259. * */
  260. bool _bNetThreadProcess;
  261. /**
  262. * 协程并行请求的共享智能指针
  263. */
  264. tars::CoroParallelBasePtr _pPtr;
  265. };
  266. ///////////////////////////////////////////////////////////////////////////////////////////////
  267. // for http
  268. class HttpCallback : public TC_HandleBase
  269. {
  270. public:
  271. virtual int onHttpResponse(const std::map<std::string, std::string>& requestHeaders ,
  272. const std::map<std::string, std::string>& responseHeaders ,
  273. const std::vector<char>& rspBody) = 0;
  274. virtual int onHttpResponseException(const std::map<std::string, std::string>& requestHeaders,
  275. int expCode) = 0;
  276. };
  277. typedef TC_AutoPtr<HttpCallback> HttpCallbackPtr;
  278. class HttpServantProxyCallback : virtual public ServantProxyCallback
  279. {
  280. public:
  281. explicit HttpServantProxyCallback(const HttpCallbackPtr& cb);
  282. /**
  283. * 异步回调对象实现该方法,进行业务逻辑处理
  284. * @param msg
  285. * @return int
  286. */
  287. virtual int onDispatch(ReqMessagePtr ptr);
  288. /**
  289. * 异步回调对象实现该方法,进行业务逻辑处理
  290. * @param msg
  291. * @return void
  292. */
  293. virtual int onDispatchResponse(const RequestPacket &req, const ResponsePacket &rsp);
  294. /**
  295. * 异步回调对象实现该方法(异常),进行业务逻辑处理
  296. * @param msg
  297. * @return void
  298. */
  299. virtual int onDispatchException(const RequestPacket &req, const ResponsePacket &rsp);
  300. private:
  301. HttpCallbackPtr _httpCb;
  302. };
  303. //////////////////////////////////////////////////////////////////////////
  304. /**
  305. * 1:远程对象的本地代理
  306. * 2:同名servant在一个通信器中最多只有一个实例
  307. * 3:防止和用户在Tars中定义的函数名冲突,接口以tars_开头
  308. */
  309. class EndpointManagerThread;
  310. class ServantProxy : public TC_HandleBase, public TC_ThreadMutex
  311. {
  312. public:
  313. /**
  314. * 通过status传递数据时用到的缺省字符串
  315. */
  316. static string STATUS_DYED_KEY; //需要染色的用户ID
  317. static string STATUS_GRID_KEY; //需要灰度的用户ID
  318. static string STATUS_SAMPLE_KEY; //stat 采样的信息
  319. static string STATUS_RESULT_CODE; //处理结果码,tup使用
  320. static string STATUS_RESULT_DESC; //处理结果描述,tup使用
  321. static string STATUS_SETNAME_VALUE; //set调用
  322. static string TARS_MASTER_KEY; //透传主调名称信息
  323. static string STATUS_TRACK_KEY; //track信息
  324. /**
  325. * 缺省的同步调用超时时间
  326. * 超时后不保证消息不会被服务端处理
  327. */
  328. enum { DEFAULT_SYNCTIMEOUT = 3000, DEFAULT_ASYNCTIMEOUT=5000};
  329. /**
  330. * 构造函数
  331. * @param op
  332. */
  333. ServantProxy(Communicator * pCommunicator, ObjectProxy ** ppObjectProxy, size_t iClientThreadNum);
  334. /**
  335. * 析构函数
  336. */
  337. virtual ~ServantProxy();
  338. public:
  339. /**
  340. * 获取Object可用服务列表 如果启用set则获取本set的,如果启用分组,只返回同分组的服务端ip
  341. * @return void
  342. */
  343. void tars_endpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  344. /**
  345. * 获取Object可用服务列表 所有的列表
  346. * @return void
  347. */
  348. void tars_endpointsAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  349. /**
  350. * 获取Object可用服务列表 根据set名字获取
  351. * @return void
  352. */
  353. void tars_endpointsBySet(const string & sName,vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  354. /**
  355. * 获取Object可用服务列表 根据地区名字获取
  356. * @return void
  357. */
  358. void tars_endpointsByStation(const string & sName,vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  359. /**
  360. * 获取Object可用服务列表 包括指定归属地
  361. * @return vector<TC_Endpoint>
  362. **/
  363. vector<TC_Endpoint> tars_endpoints(const std::string & sStation);
  364. /**
  365. * 获取Object可用服务列表 包括指定归属地
  366. * @return void
  367. */
  368. void tars_endpoints(const std::string & sStation, vector<TC_Endpoint> & vecActive, vector<TC_Endpoint> & vecInactive);
  369. /**
  370. * 获取Object可用服务列表 如果启用分组,只返回同分组的服务端ip
  371. * @return vector<TC_Endpoint>
  372. */
  373. vector<TC_Endpoint> getEndpoint();
  374. /**
  375. * 获取Object可用服务列表 包括所有IDC
  376. * @return vector<TC_Endpoint>
  377. */
  378. vector<TC_Endpoint> getEndpoint4All();
  379. public:
  380. /**
  381. * 获取通信器
  382. *
  383. * @return Communicator*
  384. */
  385. Communicator* tars_communicator() { return _communicator; }
  386. /**
  387. * 发送测试消息到服务器
  388. */
  389. void tars_ping();
  390. /**
  391. * 异步ping, 不等回包
  392. */
  393. void tars_async_ping();
  394. /**
  395. * 设置同步调用超时时间,对该proxy上所有方法都有效
  396. * @param msecond
  397. */
  398. void tars_timeout(int msecond);
  399. /**
  400. * 获取同步调用超时时间,对该proxy上所有方法都有效
  401. * @return int
  402. */
  403. int tars_timeout() const;
  404. /**
  405. * 设置连接超时时间
  406. * @return int
  407. */
  408. void tars_connect_timeout(int conTimeout);
  409. /**
  410. * 获取所属的Object名称
  411. * @return string
  412. */
  413. string tars_name() const;
  414. /**
  415. * 获取最近一次调用的IP地址和端口
  416. * @return string
  417. */
  418. static TC_Endpoint tars_invoke_endpoint();
  419. /**
  420. * 设置用户自定义协议
  421. * @param protocol
  422. */
  423. void tars_set_protocol(const ProxyProtocol& protocol);
  424. /**
  425. *设置套接字选项
  426. */
  427. void tars_set_sockopt(int level, int optname, const void *optval, socklen_t optlen);
  428. /**
  429. * 设置超时检查参数
  430. */
  431. void tars_set_check_timeout(const CheckTimeoutInfo& checkTimeoutInfo);
  432. /**
  433. * 获取超时检查参数
  434. */
  435. CheckTimeoutInfo tars_get_check_timeout();
  436. /**
  437. * hash方法,为保证一段时间内同一个key的消息发送
  438. * 到相同的服务端,由于服务列表动态变化,所以
  439. * 不严格保证
  440. * @param key
  441. * @return ServantProxy*
  442. */
  443. virtual ServantProxy* tars_hash(int64_t key);
  444. /**
  445. * 一致性hash方法
  446. */
  447. virtual ServantProxy* tars_consistent_hash(int64_t key);
  448. /**
  449. * 清除当前的Hash数据
  450. * 空函数 为了兼容以前的
  451. * @param key
  452. * @return ServantProxy*
  453. */
  454. void tars_clear_hash();
  455. /**
  456. * 针对客户端调用接口级别的超时时间设置,包括同步和异步调用
  457. * 每次接口调用都必须设置,否则取系统默认超时时间
  458. *
  459. * @param msecond 单位毫秒
  460. * @return ServantProxy*
  461. * 示例: prxy->tars_set_timeout(3000)->sayHello();
  462. */
  463. virtual ServantProxy* tars_set_timeout(int msecond);
  464. /**
  465. * 设置异步调用超时时间,对该proxy上所有方法都有效
  466. * @param msecond
  467. */
  468. void tars_async_timeout(int msecond);
  469. /**
  470. * 获取异步调用超时时间,对该proxy上所有方法都有效
  471. * @return int
  472. */
  473. int tars_async_timeout() const;
  474. /**
  475. * 用proxy产生一个该object上的序列号
  476. * @return uint32_t
  477. */
  478. virtual uint32_t tars_gen_requestid();
  479. /**
  480. * 设置PUSH类消息的响应callback
  481. * @param cb
  482. */
  483. virtual void tars_set_push_callback(const ServantProxyCallbackPtr& cb);
  484. /**
  485. * TARS协议同步方法调用
  486. */
  487. virtual shared_ptr<ResponsePacket> tars_invoke(char cPacketType,
  488. const string& sFuncName,
  489. tars::TarsOutputStream<tars::BufferWriterVector>& buf,
  490. const map<string, string>& context,
  491. const map<string, string>& status);
  492. /**
  493. * TARS协议异步方法调用
  494. */
  495. virtual void tars_invoke_async(char cPacketType,
  496. const string& sFuncName,
  497. tars::TarsOutputStream<tars::BufferWriterVector> &buf,
  498. const map<string, string>& context,
  499. const map<string, string>& status,
  500. const ServantProxyCallbackPtr& callback,
  501. bool bCoro = false);
  502. /**
  503. * 普通协议同步远程调用
  504. */
  505. virtual void rpc_call(uint32_t requestId, const string& sFuncName,
  506. const char* buff, uint32_t len, ResponsePacket &rsp);
  507. /**
  508. * 普通协议异步调用
  509. */
  510. virtual void rpc_call_async(uint32_t requestId, const string& sFuncName,
  511. const char* buff, uint32_t len,
  512. const ServantProxyCallbackPtr& callback,
  513. bool bCoro = false);
  514. /**
  515. * http1同步远程调用
  516. */
  517. void http1_call(const std::string& method,
  518. const std::string& uri,
  519. const std::map<std::string, std::string>& headers,
  520. const std::string& body,
  521. std::map<std::string, std::string>& rheaders,
  522. std::string& rbody);
  523. /**
  524. * http2协议同步远程调用
  525. */
  526. void http2_call(const std::string& method,
  527. const std::string& uri,
  528. const std::map<std::string, std::string>& headers,
  529. const std::string& body,
  530. std::map<std::string, std::string>& rheaders,
  531. std::string& rbody);
  532. /**
  533. * http2协议异步远程调用
  534. */
  535. void http2_call_async(const std::map<std::string, std::string>& headers,
  536. const std::string& body,
  537. const HttpCallbackPtr &cb);
  538. /**
  539. * 在RequestPacket中的context设置主调信息标识
  540. */
  541. virtual void tars_setMasterFlag(bool bMasterFlag) {_masterFlag = bMasterFlag;}
  542. private:
  543. /**
  544. * 远程方法调用
  545. * @param req
  546. * @return int
  547. */
  548. void invoke(ReqMessage * msg, bool bCoroAsync = false);
  549. /**
  550. * 远程方法调用返回
  551. * @param req
  552. * @return int
  553. */
  554. void finished(ReqMessage * msg);
  555. /**
  556. * 选取一个网络线程对应的信息
  557. * @param pSptd
  558. * @return void
  559. */
  560. void selectNetThreadInfo(ServantProxyThreadData * pSptd, ObjectProxy * & pObjProxy, ReqInfoQueue * & pReqQ);
  561. /**
  562. * 检查是否需要设置染色消息
  563. * @param req
  564. */
  565. void checkDye(RequestPacket& req);
  566. private:
  567. friend class ObjectProxy;
  568. friend class AdapterProxy;
  569. /**
  570. * 通信器
  571. */
  572. Communicator * _communicator;
  573. /**
  574. * 保存ObjectProxy对象的指针数组
  575. */
  576. ObjectProxy ** _objectProxy; //保存ObjectProxy对象的指针数组
  577. shared_ptr<ObjectProxy *> _objectProxyOwn; //保存ObjectProxy对象的指针数组
  578. /**
  579. * ObjectProxy对象的个数,其个数由客户端的网络线程数决定,
  580. * 每个网络线程有一个ObjectProxy
  581. */
  582. size_t _objectProxyNum;
  583. /**
  584. * 同步调用超时(毫秒)
  585. */
  586. int _syncTimeout;
  587. /**
  588. * 同步调用超时(毫秒)
  589. */
  590. int _asyncTimeout;
  591. /**
  592. * 唯一id
  593. */
  594. std::atomic<uint32_t> _id;
  595. /**
  596. * 获取endpoint对象
  597. */
  598. std::unique_ptr<EndpointManagerThread> _endpointInfo;
  599. /**
  600. * 是否在RequestPacket中的context设置主调信息
  601. */
  602. bool _masterFlag;
  603. /*
  604. *最小的超时时间
  605. */
  606. int64_t _minTimeout;
  607. };
  608. }
  609. #endif