ServantProxy.h 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef _TARS_SERVANT_PROXY_H_
  17. #define _TARS_SERVANT_PROXY_H_
  18. #include "util/tc_common.h"
  19. #include "util/tc_uuid_generator.h"
  20. #include "util/tc_monitor.h"
  21. #include "util/tc_autoptr.h"
  22. #include "util/tc_proxy_info.h"
  23. #include "util/tc_singleton.h"
  24. #include "servant/Message.h"
  25. #include "servant/AppProtocol.h"
  26. #include "servant/Current.h"
  27. #include "servant/EndpointInfo.h"
  28. namespace tars
  29. {
  30. class CommunicatorEpoll;
  31. class EndpointInfo;
  32. /////////////////////////////////////////////////////////////////////////
  33. //seq 管理的类
  34. class SeqManager
  35. {
  36. public:
  37. const static uint16_t MAX_UNSIGN_SHORT = 0xffff;
  38. struct SeqInfo
  39. {
  40. uint16_t next;
  41. bool free;
  42. };
  43. /**
  44. * 构造函数
  45. */
  46. SeqManager(uint16_t iNum);
  47. /**
  48. * 析构
  49. */
  50. ~SeqManager();
  51. /**
  52. * 获取seq
  53. */
  54. uint16_t get();
  55. /**
  56. * 删除seq
  57. */
  58. void del(uint16_t iSeq);
  59. private:
  60. uint16_t _num;
  61. uint16_t _free;
  62. uint16_t _freeTail;
  63. SeqInfo * _p;
  64. TC_SpinLock _mutex;
  65. };
  66. /////////////////////////////////////////////////////////////////////////
  67. /*
  68. * 线程私有数据
  69. */
  70. class ServantProxyThreadData : public std::enable_shared_from_this<ServantProxyThreadData>
  71. {
  72. public:
  73. /**
  74. * 全局不死的数据, 私用指针, 且不delete
  75. * 业务不需要主动使用该对象!
  76. */
  77. class Immortal
  78. {
  79. public:
  80. Immortal();
  81. ~Immortal();
  82. void add(ServantProxyThreadData *data);
  83. void erase(ServantProxyThreadData* data);
  84. void erase(Communicator * comm);
  85. unordered_set<ServantProxyThreadData *> getList();
  86. SeqManager *getSeqManager() { return _pSeq.get(); }
  87. protected:
  88. unordered_set<ServantProxyThreadData*> _sp_list;
  89. TC_ThreadMutex _mutex;
  90. unique_ptr<SeqManager> _pSeq;
  91. };
  92. static shared_ptr<Immortal> g_immortal;
  93. public:
  94. static thread_local shared_ptr<ServantProxyThreadData> g_sp;
  95. /**
  96. * global Immortal ptr, 避免Immortal提前被释放掉
  97. */
  98. shared_ptr<Immortal> _sp_immortal;
  99. /**
  100. * 构造函数
  101. */
  102. ServantProxyThreadData();
  103. /**
  104. * 析构函数
  105. */
  106. ~ServantProxyThreadData();
  107. /**
  108. * 获取线程数据,没有的话会自动创建
  109. * @return ServantProxyThreadData*
  110. */
  111. static ServantProxyThreadData * getData();
  112. /**
  113. * 析构通信器时调用
  114. * @param communicator
  115. */
  116. static void deconstructor(Communicator *communicator);
  117. /**
  118. * move掉
  119. */
  120. ThreadPrivateData move();
  121. /**
  122. * 业务发起调用的线程和网络通信器间都有一个队列
  123. */
  124. struct CommunicatorEpollReqQueueInfo
  125. {
  126. weak_ptr<ReqInfoQueue> _reqQueue;
  127. weak_ptr<CommunicatorEpoll> _communicatorEpoll;
  128. };
  129. //每发起调用的线程 记录的 公有网络通信器数据
  130. //此时业务线程和
  131. struct CommunicatorEpollInfo
  132. {
  133. /*
  134. * 每个线程跟客户端网络线程通信的队列
  135. * <网络线程序号, 网络通信信息>
  136. */
  137. vector<CommunicatorEpollReqQueueInfo> _info;
  138. size_t _netSeq = 0; //轮训选择网络线程的偏移量
  139. Communicator *_communicator = NULL;
  140. };
  141. /**
  142. * 业务线程处于协程模式下, 记录当前网络通信器信息
  143. * 此时业务线程和网络通信器是一对一的, 即用自身线程对应的私有网络通信器即可
  144. */
  145. struct SchedCommunicatorEpollInfo
  146. {
  147. CommunicatorEpollReqQueueInfo _info;
  148. Communicator *_communicator = NULL;
  149. };
  150. /**
  151. * 初始化当前业务线程和网络通信器之间的关系(构建发送队列)
  152. */
  153. shared_ptr<ServantProxyThreadData::CommunicatorEpollInfo> addCommunicatorEpoll(const shared_ptr<CommunicatorEpoll> &ce);
  154. /**
  155. * 通信器析构时调用
  156. * @param communicator
  157. */
  158. void erase(Communicator *communicator);
  159. /**
  160. * 获取公有通信器对应的网络通信器等基本信息
  161. * @param communicator
  162. * @return
  163. */
  164. shared_ptr<CommunicatorEpollInfo> getCommunicatorEpollInfo(Communicator *communicator);
  165. /**
  166. * 获取私有通信器对应的网络通信器等基本信息
  167. * @param communicator
  168. * @return
  169. */
  170. shared_ptr<SchedCommunicatorEpollInfo> getSchedCommunicatorEpollInfo(Communicator *communicator);
  171. protected:
  172. /**
  173. * communicator对应的公用网路通信器
  174. */
  175. unordered_map<Communicator*, shared_ptr<CommunicatorEpollInfo>> _communicatorEpollInfo;
  176. /**
  177. * 私有的网络通信器, 每个业务线程都对应一个, 业务线程是协程模式下使用
  178. */
  179. unordered_map<Communicator*, shared_ptr<SchedCommunicatorEpollInfo>> _schedCommunicatorEpollInfo;
  180. public:
  181. //lock
  182. TC_ThreadMutex _mutex;
  183. //业务线程的序号, 通知网络线程时, 知道用哪个notify来唤醒网路线程
  184. uint16_t _reqQNo;
  185. /**
  186. * 协程调度
  187. */
  188. shared_ptr<TC_CoroutineScheduler> _sched;
  189. /**
  190. * 线程私有数据
  191. */
  192. ThreadPrivateData _data;
  193. /**
  194. * 当前线程是否关联了网络通信器, 如果关联了, 则表示当前线程处于网络线程中!
  195. */
  196. CommunicatorEpoll *_communicatorEpoll = NULL;
  197. ///////////////////////////////////////////////////////////////////////////////////////
  198. /**
  199. * 调用链追踪信息
  200. */
  201. struct TraceContext
  202. {
  203. int traceType; // 0 不用打参数, 1 只打客户端调用参数, 2 客户端服务端参数都打印
  204. string traceID; // traceID
  205. string spanID; // spanID
  206. string parentSpanID; // 父spanID
  207. enum E_SpanType
  208. {
  209. EST_CS = 1,
  210. EST_CR = 2,
  211. EST_SR = 4,
  212. EST_SS = 8,
  213. EST_TS,
  214. EST_TE,
  215. };
  216. // key 分两种情况,1.rpc调用; 2.异步回调
  217. bool init(const string& k)
  218. {
  219. vector<string> vs = TC_Common::sepstr<string>(k, "|");
  220. if (vs.size() == 2)
  221. {
  222. traceID = vs[0];
  223. parentSpanID = vs[1];
  224. spanID = "";
  225. traceType =initType(traceID);
  226. return true;
  227. }
  228. else if (vs.size() == 3)
  229. {
  230. traceID = vs[0];
  231. spanID = vs[1];
  232. parentSpanID = vs[2];
  233. traceType = initType(traceID);
  234. return true;
  235. }
  236. else
  237. {
  238. reset();
  239. }
  240. return false;
  241. }
  242. static int initType(const string& tid)
  243. {
  244. string::size_type pos = tid.find("-");
  245. int type = 0;
  246. if (pos != string::npos)
  247. {
  248. type = strtol(tid.substr(0, pos).c_str(), NULL, 16);
  249. }
  250. if (type < 0 || type > 15)
  251. {
  252. type = 0;
  253. }
  254. return type;
  255. }
  256. void reset()
  257. {
  258. traceID = "";
  259. spanID = "";
  260. parentSpanID = "";
  261. traceType = 0;
  262. }
  263. TraceContext()
  264. {
  265. }
  266. TraceContext(const string& k)
  267. {
  268. init(k);
  269. }
  270. void newSpan()
  271. {
  272. spanID = TC_UUIDGenerator::getInstance()->genID();
  273. }
  274. string getKey(E_SpanType es) const
  275. {
  276. switch (es)
  277. {
  278. case EST_CS:
  279. case EST_CR:
  280. case EST_TS:
  281. case EST_TE:
  282. return traceID + "|" + spanID + "|" + parentSpanID;
  283. break;
  284. case EST_SR:
  285. case EST_SS:
  286. return traceID + "|" + parentSpanID + "|*";
  287. break;
  288. default:
  289. break;
  290. }
  291. return "";
  292. }
  293. string getKey(bool full) const
  294. {
  295. return full ? (traceID + "|" + spanID + "|" + parentSpanID) : (traceID + "|" + spanID);
  296. }
  297. static bool needParam(E_SpanType es, int type)
  298. {
  299. if (es == EST_TS)
  300. {
  301. es = EST_CS;
  302. }
  303. else if (es == EST_TE)
  304. {
  305. es = EST_CR;
  306. }
  307. return (bool)((int)es & type);
  308. }
  309. };
  310. bool _traceCall; //标识当前线程是否需要调用链追踪
  311. TraceContext _traceContext; //调用链追踪信息
  312. string getTraceKey(TraceContext::E_SpanType es) const
  313. {
  314. return _traceContext.getKey(es);
  315. }
  316. string getTraceKey(bool full = false) const
  317. {
  318. return _traceContext.getKey(full);
  319. }
  320. void newSpan()
  321. {
  322. _traceContext.newSpan();
  323. }
  324. bool initTrace(const string& k)
  325. {
  326. return _traceContext.init(k);
  327. }
  328. int getTraceType() const
  329. {
  330. return _traceContext.traceType;
  331. }
  332. bool needTraceParam(TraceContext::E_SpanType es)
  333. {
  334. return _traceContext.needParam(es, _traceContext.traceType);
  335. }
  336. static bool needTraceParam(TraceContext::E_SpanType es, const string& k)
  337. {
  338. int type = TraceContext::initType(k);
  339. return TraceContext::needParam(es, type);
  340. }
  341. ////////////////////////////////////////////////////////////////////////////////////调用链追踪 end/////
  342. };
  343. //////////////////////////////////////////////////////////////////////////
  344. // 协程并行请求的基类
  345. class CoroParallelBase : virtual public TC_HandleBase
  346. {
  347. public:
  348. /**
  349. * 构造
  350. */
  351. CoroParallelBase(int iNum)
  352. : _num(iNum)
  353. , _count(iNum)
  354. , _req_count(0)
  355. {}
  356. /**
  357. * 析构函数
  358. */
  359. virtual ~CoroParallelBase() {}
  360. /**
  361. * 增加调用协程接口请求的数目
  362. */
  363. int incReqCount() { return (++_req_count); }
  364. /**
  365. * 判断协程并行请求数目是否都发送了
  366. */
  367. bool checkAllReqSend() { return _num == _req_count; }
  368. /**
  369. * 判断协程并行请求是否都回来了
  370. */
  371. bool checkAllReqReturn() { return (--_count) == 0; }
  372. /**
  373. * 获取所有请求回来的响应
  374. */
  375. vector<ReqMessage*> getAllReqMessage()
  376. {
  377. vector<ReqMessage*> vRet;
  378. {
  379. TC_LockT<TC_SpinLock> lock(_mutex);
  380. vRet.swap(_vReqMessage);
  381. _vReqMessage.clear();
  382. }
  383. return vRet;
  384. }
  385. /**
  386. * 插入请求回来的响应
  387. */
  388. void insert(ReqMessage* msg)
  389. {
  390. TC_LockT<TC_SpinLock> lock(_mutex);
  391. _vReqMessage.push_back(msg);
  392. }
  393. protected:
  394. /**
  395. * 并行请求的数目
  396. */
  397. int _num;
  398. /**
  399. * 并行请求的响应还未回来的数目
  400. */
  401. std::atomic<int> _count;
  402. /**
  403. * 并行请求的已发送的数目
  404. */
  405. std::atomic<int> _req_count;
  406. /**
  407. * 互斥锁
  408. */
  409. TC_SpinLock _mutex;
  410. /**
  411. * 请求的响应的容器
  412. */
  413. vector<ReqMessage*> _vReqMessage;
  414. };
  415. typedef TC_AutoPtr<CoroParallelBase> CoroParallelBasePtr;
  416. //等待所有的请求回来
  417. void coroWhenAll(const CoroParallelBasePtr &ptr);
  418. //////////////////////////////////////////////////////////////////////////
  419. // 异步回调对象的基类
  420. class ServantProxyCallback : virtual public TC_HandleBase
  421. {
  422. public:
  423. /**
  424. * 构造
  425. */
  426. ServantProxyCallback();
  427. /**
  428. * 析构函数
  429. */
  430. virtual ~ServantProxyCallback() {}
  431. /**
  432. * 设置发起调用的servant
  433. * @param prx
  434. */
  435. void setServantPrx(const ServantPrx &prx) { _servantPrx = prx; }
  436. /**
  437. * 获取类型
  438. * @return const string&
  439. */
  440. virtual const string& getType() { return _type; }
  441. /**
  442. * 设置类型
  443. * @return const string&
  444. */
  445. virtual void setType(const string& type) { _type = type; }
  446. /**
  447. * 设置coro并行请求的共享智能指针
  448. */
  449. virtual void setCoroParallelBasePtr(CoroParallelBasePtr pPtr) { _pPtr = pPtr; }
  450. /**
  451. * 获取coro并行请求的共享智能指针
  452. */
  453. virtual const CoroParallelBasePtr& getCoroParallelBasePtr() { return _pPtr; }
  454. /**
  455. * 异步请求是否在网络线程处理
  456. * 内部用的到 业务不能设置这个值
  457. * */
  458. inline void setNetThreadProcess(bool bNetThreadProcess)
  459. {
  460. _bNetThreadProcess = bNetThreadProcess;
  461. }
  462. inline bool getNetThreadProcess()
  463. {
  464. return _bNetThreadProcess;
  465. }
  466. public:
  467. /**
  468. * dispatch, call onDispatch
  469. * @param msg
  470. * @return
  471. */
  472. int dispatch(ReqMessagePtr msg);
  473. protected:
  474. /**
  475. * 异步回调对象实现该方法,进行业务逻辑处理
  476. * @param msg
  477. * @return int
  478. */
  479. virtual int onDispatch(ReqMessagePtr msg) = 0;
  480. /**
  481. * 连接关闭掉了(push callback 才有效),老版本的onClose不带ep,为了兼容并且带上ep
  482. */
  483. virtual void onClose() {};
  484. virtual void onClose(const TC_Endpoint& ep) {onClose();};
  485. /**
  486. * 连接已建立(push callback 才有效)
  487. */
  488. virtual void onConnect(const TC_Endpoint& ep) {};
  489. friend class AdapterProxy;
  490. protected:
  491. /**
  492. * 同一链路多个cb的时候可以用来区分class类型
  493. */
  494. string _type;
  495. /**
  496. * 异步请求是否在网络线程处理
  497. * 内部用的到 业务不能设置这个值
  498. * */
  499. bool _bNetThreadProcess;
  500. /**
  501. * 协程并行请求的共享智能指针
  502. */
  503. CoroParallelBasePtr _pPtr;
  504. /**
  505. * servant prx
  506. */
  507. ServantPrx _servantPrx;
  508. };
  509. ///////////////////////////////////////////////////////////////////////////////////////////////
  510. // for http
  511. class HttpCallback : public TC_HandleBase
  512. {
  513. public:
  514. virtual int onHttpResponse(const shared_ptr<TC_HttpResponse> &rsp) = 0;
  515. virtual int onHttpResponseException(int expCode) = 0;
  516. };
  517. typedef TC_AutoPtr<HttpCallback> HttpCallbackPtr;
  518. class HttpServantProxyCallback : virtual public ServantProxyCallback
  519. {
  520. public:
  521. explicit HttpServantProxyCallback(const HttpCallbackPtr& cb);
  522. /**
  523. * 异步回调对象实现该方法,进行业务逻辑处理
  524. * @param msg
  525. * @return int
  526. */
  527. virtual int onDispatch(ReqMessagePtr ptr);
  528. /**
  529. * 异步回调对象实现该方法,进行业务逻辑处理
  530. * @param msg
  531. * @return void
  532. */
  533. virtual int onDispatchResponse(const RequestPacket &req, const ResponsePacket &rsp);
  534. /**
  535. * 异步回调对象实现该方法(异常),进行业务逻辑处理
  536. * @param msg
  537. * @return void
  538. */
  539. virtual int onDispatchException(const RequestPacket &req, const ResponsePacket &rsp);
  540. private:
  541. HttpCallbackPtr _httpCb;
  542. };
  543. //////////////////////////////////////////////////////////////////////////
  544. /**
  545. * 1:远程对象的本地代理
  546. * 2:同名servant在一个通信器中最多只有一个实例
  547. * 3:防止和用户在Tars中定义的函数名冲突,接口以tars_开头
  548. */
  549. class EndpointManagerThread;
  550. class ServantProxy : public TC_HandleBase, public TC_ThreadMutex
  551. {
  552. public:
  553. /**
  554. * 通过status传递数据时用到的缺省字符串
  555. */
  556. static string STATUS_DYED_KEY; //需要染色的用户ID
  557. static string STATUS_GRID_KEY; //需要灰度染色的用户ID
  558. static string STATUS_RESULT_CODE; //处理结果码,tup使用
  559. static string STATUS_RESULT_DESC; //处理结果描述,tup使用
  560. static string STATUS_SETNAME_VALUE; //set调用
  561. static string STATUS_TRACE_KEY; //trace信息
  562. ///////////////////////////////////////////////////////////////////
  563. /**
  564. * socket选项
  565. */
  566. struct SocketOpt
  567. {
  568. int level;
  569. int optname;
  570. const void *optval;
  571. SOCKET_LEN_TYPE optlen;
  572. };
  573. /**
  574. * 缺省的同步调用超时时间
  575. * 超时后不保证消息不会被服务端处理
  576. */
  577. enum { DEFAULT_SYNCTIMEOUT = 3000, DEFAULT_ASYNCTIMEOUT=5000};
  578. /**
  579. * default connection serial num
  580. */
  581. const static int DEFAULT_CONNECTION_SERIAL = 10;
  582. //自定义回调
  583. typedef std::function<void(ReqMessagePtr)> custom_callback;
  584. /**
  585. * 内置四种协议支持
  586. */
  587. enum SERVANT_PROTOCOL
  588. {
  589. PROTOCOL_TARS, //默认tars服务的协议
  590. PROTOCOL_HTTP1, //http协议
  591. PROTOCOL_HTTP2, //http2协议
  592. PROTOCOL_GRPC, //grpc协议
  593. };
  594. /**
  595. * 代理设置
  596. */
  597. enum SERVANT_PROXY
  598. {
  599. PROXY_SOCK4, //支持sock4代理
  600. PROXY_SOCK5, //支持sock5代理
  601. PROXY_HTTP, //支持http代理
  602. };
  603. /**
  604. * 构造函数
  605. * @param op
  606. */
  607. ServantProxy(Communicator * pCommunicator, const string& name,const string& setName);
  608. /**
  609. * 析构函数
  610. */
  611. virtual ~ServantProxy();
  612. public:
  613. /**
  614. * 获取Object可用服务列表 如果启用set则获取本set的,如果启用分组,只返回同分组的服务端ip
  615. * @return void
  616. */
  617. void tars_endpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  618. /**
  619. * 获取Object可用服务列表 所有的列表
  620. * @return void
  621. */
  622. void tars_endpointsAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  623. /**
  624. * 获取Object可用服务列表 根据set名字获取
  625. * @return void
  626. */
  627. void tars_endpointsBySet(const string & sName,vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  628. /**
  629. * 获取Object可用服务列表 根据地区名字获取
  630. * @return void
  631. */
  632. void tars_endpointsByStation(const string & sName,vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  633. /**
  634. * 获取Object可用服务列表 包括指定归属地
  635. * @return vector<TC_Endpoint>
  636. **/
  637. vector<TC_Endpoint> tars_endpoints(const std::string & sStation);
  638. /**
  639. * 获取Object可用服务列表 包括指定归属地
  640. * @return void
  641. */
  642. void tars_endpoints(const std::string & sStation, vector<TC_Endpoint> & vecActive, vector<TC_Endpoint> & vecInactive);
  643. /**
  644. * 获取Object可用服务列表 如果启用分组,只返回同分组的服务端ip
  645. * @return vector<TC_Endpoint>
  646. */
  647. vector<TC_Endpoint> getEndpoint();
  648. /**
  649. * 获取Object可用服务列表 包括所有IDC
  650. * @return vector<TC_Endpoint>
  651. */
  652. vector<TC_Endpoint> getEndpoint4All();
  653. /**
  654. * 获取通信器
  655. *
  656. * @return Communicator*
  657. */
  658. Communicator* tars_communicator() { return _communicator; }
  659. /**
  660. * 发送测试消息到服务器
  661. */
  662. void tars_ping();
  663. /**
  664. * 异步ping, 不等回包
  665. */
  666. void tars_async_ping();
  667. /**
  668. * 设置代理
  669. * @param type
  670. * @param ep
  671. * @param user
  672. * @param pass
  673. */
  674. void tars_set_proxy(SERVANT_PROXY type, const TC_Endpoint &ep, const string &user, const string &pass);
  675. /**
  676. * 设置同步调用超时时间,对该proxy上所有方法都有效
  677. * @param msecond
  678. */
  679. void tars_timeout(int msecond);
  680. /**
  681. * 获取同步调用超时时间,对该proxy上所有方法都有效
  682. * @return int
  683. */
  684. int tars_timeout() const;
  685. /**
  686. * 获取连接超时时间
  687. * @return int
  688. */
  689. int tars_connect_timeout() const;
  690. /**
  691. * 设置连接超时时间
  692. * @return int
  693. */
  694. void tars_connect_timeout(int conTimeout);
  695. /**
  696. * set auto reconnect time
  697. * @return int, second
  698. */
  699. void tars_reconnect(int second);
  700. /**
  701. * 获取所属的Object名称
  702. * @return string
  703. */
  704. const string &tars_name() const;
  705. /**
  706. * set name
  707. * @return
  708. */
  709. const string &tars_setName() const;
  710. /**
  711. * 获取所属的Object名称#hash@address(即传入stringToProxy中的地址)
  712. * @return string
  713. */
  714. string tars_full_name() const;
  715. /**
  716. * 获取最近一次调用的IP地址和端口
  717. * @return string
  718. */
  719. static TC_Endpoint tars_invoke_endpoint();
  720. /**
  721. * 设置连接为多连接, 串行模式
  722. * @param connectionSerial, <=0: 连接复用模式(一个连接上同时跑多个请求, 响应包), >0: 连接串行模式(连接个数), 同一个连接上并行只能跑一个包(http协议)
  723. */
  724. void tars_connection_serial(int connectionSerial);
  725. /**
  726. * 获取连接并发模式
  727. * @return int
  728. */
  729. int tars_connection_serial() const;
  730. /**
  731. * 直接设置内置支持的协议
  732. */
  733. void tars_set_protocol(SERVANT_PROTOCOL protocol, int connectionSerial = 0);
  734. /**
  735. * 设置用户自定义协议
  736. * @param protocol
  737. */
  738. void tars_set_protocol(const ProxyProtocol& protocol, int connectionSerial = 0);
  739. /**
  740. * get protocol
  741. * @return
  742. */
  743. const ProxyProtocol &tars_get_protocol() const;
  744. /**
  745. *设置套接字选项
  746. */
  747. void tars_set_sockopt(int level, int optname, const void *optval, SOCKET_LEN_TYPE optlen);
  748. /**
  749. * 获取套接字选项
  750. */
  751. vector<SocketOpt> tars_get_sockopt() const;
  752. /**
  753. * 设置超时检查参数
  754. */
  755. void tars_set_check_timeout(const CheckTimeoutInfo& checkTimeoutInfo);
  756. /**
  757. * 获取超时检查参数
  758. */
  759. CheckTimeoutInfo tars_get_check_timeout();
  760. /**
  761. * hash方法,为保证一段时间内同一个key的消息发送
  762. * 到相同的服务端,由于服务列表动态变化,所以
  763. * 不严格保证
  764. * @param key
  765. * @return ServantProxy*
  766. */
  767. virtual ServantProxy* tars_hash(int64_t key);
  768. /**
  769. * 一致性hash方法
  770. */
  771. virtual ServantProxy* tars_consistent_hash(int64_t key);
  772. /**
  773. * 清除当前的Hash数据
  774. * 空函数 为了兼容以前的
  775. * @param key
  776. * @return ServantProxy*
  777. */
  778. void tars_clear_hash();
  779. /**
  780. * 针对客户端调用接口级别的超时时间设置,包括同步和异步调用
  781. * 每次接口调用都必须设置,否则取系统默认超时时间
  782. *
  783. * @param msecond 单位毫秒
  784. * @return ServantProxy*
  785. * 示例: prxy->tars_set_timeout(3000)->sayHello();
  786. */
  787. virtual ServantProxy* tars_set_timeout(int msecond);
  788. /**
  789. * 设置异步调用超时时间,对该proxy上所有方法都有效
  790. * @param msecond
  791. */
  792. void tars_async_timeout(int msecond);
  793. /**
  794. * 获取异步调用超时时间,对该proxy上所有方法都有效
  795. * @return int
  796. */
  797. int tars_async_timeout() const;
  798. /**
  799. * 主动更新端口
  800. * @param active
  801. * @param inactive
  802. */
  803. void tars_update_endpoints(const set<EndpointInfo> &active, const set<EndpointInfo> &inactive);
  804. /**
  805. * 设置自定义回调(注意不在异步回调线程执行, 而是在网络线程中回调, 注意不要阻塞)
  806. * (这种模式下callback hash无效)
  807. * @param callback
  808. */
  809. void tars_set_custom_callback(custom_callback callback);
  810. /**
  811. * callback启用hash模式, 根据到服务端连接hash, 即同一个服务端连接过来的请求落入到一个异步回调线程中
  812. */
  813. void tars_enable_callback_hash();
  814. /*
  815. * 用proxy产生一个该object上的序列号
  816. * @return uint32_t
  817. */
  818. virtual uint32_t tars_gen_requestid();
  819. /**
  820. * 设置PUSH类消息的响应callback
  821. * @param cb
  822. */
  823. virtual void tars_set_push_callback(const ServantProxyCallbackPtr& cb);
  824. /**
  825. * 获取PUSH类消息的callback对象
  826. */
  827. ServantProxyCallbackPtr tars_get_push_callback();
  828. /**
  829. * 超时策略获取和设置
  830. * @return CheckTimeoutInfo&
  831. */
  832. inline const CheckTimeoutInfo& tars_check_timeout_info() const { return _checkTimeoutInfo; }
  833. /**
  834. * 普通协议同步远程调用
  835. */
  836. virtual void rpc_call(uint32_t requestId, const string& sFuncName,
  837. const char* buff, uint32_t len, ResponsePacket &rsp);
  838. /**
  839. * 普通协议异步调用
  840. */
  841. virtual void rpc_call_async(uint32_t requestId, const string& sFuncName,
  842. const char* buff, uint32_t len,
  843. const ServantProxyCallbackPtr& callback,
  844. bool bCoro = false);
  845. /**
  846. * http1/2协议同步远程调用
  847. * @param funcName: 调用名称, 这里只是做统计用
  848. */
  849. void http_call(const string &funcName, shared_ptr<TC_HttpRequest> &request, shared_ptr<TC_HttpResponse> &response);
  850. /**
  851. * http1/2协议异步远程调用
  852. * @param funcName: 调用名称, 这里只是做统计用
  853. */
  854. void http_call_async(const string &funcName, shared_ptr<TC_HttpRequest> &request, const HttpCallbackPtr &cb, bool bCoro = false);
  855. /**
  856. * TARS协议同步方法调用
  857. */
  858. shared_ptr<ResponsePacket> tars_invoke(char cPacketType,
  859. const string& sFuncName,
  860. tars::TarsOutputStream<tars::BufferWriterVector>& buf,
  861. const map<string, string>& context,
  862. const map<string, string>& status);
  863. /**
  864. * TARS协议同步方法调用
  865. */
  866. shared_ptr<ResponsePacket> tars_invoke(char cPacketType,
  867. const string& sFuncName,
  868. const vector<char>& buf,
  869. const map<string, string>& context,
  870. const map<string, string>& status);
  871. /**
  872. * TARS协议异步方法调用
  873. */
  874. void tars_invoke_async(char cPacketType,
  875. const string& sFuncName,
  876. tars::TarsOutputStream<tars::BufferWriterVector> &buf,
  877. const map<string, string>& context,
  878. const map<string, string>& status,
  879. const ServantProxyCallbackPtr& callback,
  880. bool bCoro = false);
  881. /**
  882. * TARS协议异步方法调用
  883. */
  884. void tars_invoke_async(char cPacketType,
  885. const string& sFuncName,
  886. const vector<char> &buf,
  887. const map<string, string>& context,
  888. const map<string, string>& status,
  889. const ServantProxyCallbackPtr& callback,
  890. bool bCoro = false);
  891. /**
  892. * 获取所有objectproxy(包括子servant), 该函数主要给自动测试使用!
  893. * @return
  894. */
  895. vector<ObjectProxy*> getObjectProxys();
  896. protected:
  897. /**
  898. * 获得可以复用的servant
  899. * @return
  900. */
  901. ServantPrx getServantPrx(ReqMessage *msg);
  902. /**
  903. * get proxy info
  904. */
  905. inline const std::shared_ptr<TC_ProxyInfo::ProxyBaseInfo>& getProxyInfo() { return _proxyBaseInfo; }
  906. /**
  907. *
  908. */
  909. void tars_initialize();
  910. /**
  911. *
  912. * @param prx
  913. * @param f
  914. */
  915. void travelObjectProxys(ServantProxy *prx, function<void(ObjectProxy*)> f);
  916. friend class ServantProxyCallback;
  917. friend class Communicator;
  918. friend class ServantProxyFactory;
  919. private:
  920. /**
  921. * 获取第一个ObjectProxy
  922. * @return
  923. */
  924. ObjectProxy *getObjectProxy(size_t netThreadSeq = 0);
  925. /**
  926. *
  927. * @param func
  928. */
  929. void forEachObject(std::function<void(ObjectProxy*)> func);
  930. private:
  931. /**
  932. * 远程方法调用
  933. * @param req
  934. * @return int
  935. */
  936. void invoke(ReqMessage *msg, bool bCoroAsync = false);
  937. /**
  938. * 选择某个servant来发送
  939. * @param msg
  940. * @param bCoroAsync
  941. */
  942. int servant_invoke(ReqMessage *msg, bool bCoroAsync);
  943. /**
  944. * 选取一个网络线程对应的信息
  945. * @param pSptd
  946. * @return void
  947. */
  948. void selectNetThreadInfo(ServantProxyThreadData *pSptd, ObjectProxy *&pObjProxy, shared_ptr<ReqInfoQueue> &pReqQ);
  949. /**
  950. * 检查是否需要设置染色消息
  951. * @param req
  952. */
  953. void checkDye(RequestPacket& req);
  954. /**
  955. * 检查是否需要设置调用链追踪
  956. * @param req
  957. */
  958. void checkTrace(RequestPacket &req);
  959. /**
  960. * 更新endpoint
  961. * @param active
  962. * @param inactive
  963. */
  964. void onNotifyEndpoints(CommunicatorEpoll *communicatorEpoll, const set<EndpointInfo> & active,const set<EndpointInfo> & inactive);
  965. /**
  966. * 端口不活跃
  967. */
  968. void onSetInactive(const EndpointInfo &ep);
  969. /**
  970. * 检查是否需要设置cookie
  971. * @param req
  972. */
  973. void checkCookie(RequestPacket &req);
  974. private:
  975. friend class ObjectProxy;
  976. friend class AdapterProxy;
  977. friend class CommunicatorEpoll;
  978. /**
  979. * 通信器
  980. */
  981. Communicator * _communicator;
  982. /**
  983. * 保存ObjectProxy对象的指针数组
  984. */
  985. ObjectProxy * _objectProxy; //保存ObjectProxy对象的指针数组
  986. /**
  987. * 同步调用超时(毫秒)
  988. */
  989. int _syncTimeout;
  990. /**
  991. * 同步调用超时(毫秒)
  992. */
  993. int _asyncTimeout;
  994. /**
  995. * 唯一id
  996. */
  997. std::atomic<uint32_t> _id;
  998. /**
  999. * 获取endpoint对象
  1000. */
  1001. std::unique_ptr<EndpointManagerThread> _endpointInfo;
  1002. /**
  1003. * 是否在RequestPacket中的context设置主调信息
  1004. */
  1005. bool _masterFlag;
  1006. /*
  1007. *最小的超时时间
  1008. */
  1009. int64_t _minTimeout;
  1010. /**
  1011. * 最大连接串行数(默认0, 表示连接并行请求)
  1012. */
  1013. int _connectionSerial = 0;
  1014. /**
  1015. * 短连接使用http使用
  1016. */
  1017. ServantPrx _rootPrx;
  1018. /**
  1019. *
  1020. */
  1021. std::atomic<int> _servantId{0};
  1022. /**
  1023. *
  1024. */
  1025. std::mutex _servantMutex;
  1026. /**
  1027. *
  1028. */
  1029. vector<ServantPrx> _servantList;
  1030. /**
  1031. * 代理的基本信息
  1032. */
  1033. std::shared_ptr<TC_ProxyInfo::ProxyBaseInfo> _proxyBaseInfo;
  1034. /**
  1035. * custom callback
  1036. */
  1037. custom_callback _callback;
  1038. /**
  1039. * callback hash
  1040. */
  1041. bool _callbackHash = false;
  1042. /**
  1043. * 链接超时
  1044. */
  1045. int _connTimeout = DEFAULT_ASYNCTIMEOUT;
  1046. /*
  1047. * 请求和响应的协议解析器
  1048. */
  1049. ProxyProtocol _proxyProtocol;
  1050. /*
  1051. * push消息 callback
  1052. */
  1053. ServantProxyCallbackPtr _pushCallback;
  1054. /*
  1055. * 超时控制策略信息
  1056. */
  1057. CheckTimeoutInfo _checkTimeoutInfo;
  1058. /*
  1059. * socket选项
  1060. */
  1061. vector<SocketOpt> _socketOpts;
  1062. };
  1063. }
  1064. #endif