123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #ifndef _TARS_SERVANT_PROXY_H_
- #define _TARS_SERVANT_PROXY_H_
- #include "util/tc_common.h"
- #include "util/tc_uuid_generator.h"
- #include "util/tc_monitor.h"
- #include "util/tc_autoptr.h"
- #include "util/tc_proxy_info.h"
- #include "util/tc_singleton.h"
- #include "servant/Message.h"
- #include "servant/AppProtocol.h"
- #include "servant/Current.h"
- #include "servant/EndpointInfo.h"
- namespace tars
- {
- class CommunicatorEpoll;
- class EndpointInfo;
- /////////////////////////////////////////////////////////////////////////
- //seq 管理的类
- class SeqManager
- {
- public:
- const static uint16_t MAX_UNSIGN_SHORT = 0xffff;
- struct SeqInfo
- {
- uint16_t next;
- bool free;
- };
- /**
- * 构造函数
- */
- SeqManager(uint16_t iNum);
- /**
- * 析构
- */
- ~SeqManager();
- /**
- * 获取seq
- */
- uint16_t get();
- /**
- * 删除seq
- */
- void del(uint16_t iSeq);
- private:
- uint16_t _num;
- uint16_t _free;
- uint16_t _freeTail;
- SeqInfo * _p;
- TC_SpinLock _mutex;
- };
- /////////////////////////////////////////////////////////////////////////
- /*
- * 线程私有数据
- */
- class ServantProxyThreadData : public std::enable_shared_from_this<ServantProxyThreadData>
- {
- public:
- /**
- * 全局不死的数据, 私用指针, 且不delete
- * 业务不需要主动使用该对象!
- */
- class Immortal
- {
- public:
- Immortal();
- ~Immortal();
- void add(ServantProxyThreadData *data);
- void erase(ServantProxyThreadData* data);
- void erase(Communicator * comm);
- unordered_set<ServantProxyThreadData *> getList();
- SeqManager *getSeqManager() { return _pSeq.get(); }
- protected:
- unordered_set<ServantProxyThreadData*> _sp_list;
- TC_ThreadMutex _mutex;
- unique_ptr<SeqManager> _pSeq;
-
- };
- static shared_ptr<Immortal> g_immortal;
- public:
- static thread_local shared_ptr<ServantProxyThreadData> g_sp;
- /**
- * global Immortal ptr, 避免Immortal提前被释放掉
- */
- shared_ptr<Immortal> _sp_immortal;
- /**
- * 构造函数
- */
- ServantProxyThreadData();
- /**
- * 析构函数
- */
- ~ServantProxyThreadData();
- /**
- * 获取线程数据,没有的话会自动创建
- * @return ServantProxyThreadData*
- */
- static ServantProxyThreadData * getData();
- /**
- * 析构通信器时调用
- * @param communicator
- */
- static void deconstructor(Communicator *communicator);
- /**
- * move掉
- */
- ThreadPrivateData move();
- /**
- * 业务发起调用的线程和网络通信器间都有一个队列
- */
- struct CommunicatorEpollReqQueueInfo
- {
- weak_ptr<ReqInfoQueue> _reqQueue;
- weak_ptr<CommunicatorEpoll> _communicatorEpoll;
- };
- //每发起调用的线程 记录的 公有网络通信器数据
- //此时业务线程和
- struct CommunicatorEpollInfo
- {
- /*
- * 每个线程跟客户端网络线程通信的队列
- * <网络线程序号, 网络通信信息>
- */
- vector<CommunicatorEpollReqQueueInfo> _info;
- size_t _netSeq = 0; //轮训选择网络线程的偏移量
- Communicator *_communicator = NULL;
- };
- /**
- * 业务线程处于协程模式下, 记录当前网络通信器信息
- * 此时业务线程和网络通信器是一对一的, 即用自身线程对应的私有网络通信器即可
- */
- struct SchedCommunicatorEpollInfo
- {
- CommunicatorEpollReqQueueInfo _info;
- Communicator *_communicator = NULL;
- };
- /**
- * 初始化当前业务线程和网络通信器之间的关系(构建发送队列)
- */
- shared_ptr<ServantProxyThreadData::CommunicatorEpollInfo> addCommunicatorEpoll(const shared_ptr<CommunicatorEpoll> &ce);
- /**
- * 通信器析构时调用
- * @param communicator
- */
- void erase(Communicator *communicator);
- /**
- * 获取公有通信器对应的网络通信器等基本信息
- * @param communicator
- * @return
- */
- shared_ptr<CommunicatorEpollInfo> getCommunicatorEpollInfo(Communicator *communicator);
- /**
- * 获取私有通信器对应的网络通信器等基本信息
- * @param communicator
- * @return
- */
- shared_ptr<SchedCommunicatorEpollInfo> getSchedCommunicatorEpollInfo(Communicator *communicator);
- protected:
- /**
- * communicator对应的公用网路通信器
- */
- unordered_map<Communicator*, shared_ptr<CommunicatorEpollInfo>> _communicatorEpollInfo;
- /**
- * 私有的网络通信器, 每个业务线程都对应一个, 业务线程是协程模式下使用
- */
- unordered_map<Communicator*, shared_ptr<SchedCommunicatorEpollInfo>> _schedCommunicatorEpollInfo;
- public:
- //lock
- TC_ThreadMutex _mutex;
- //业务线程的序号, 通知网络线程时, 知道用哪个notify来唤醒网路线程
- uint16_t _reqQNo;
- /**
- * 协程调度
- */
- shared_ptr<TC_CoroutineScheduler> _sched;
- /**
- * 线程私有数据
- */
- ThreadPrivateData _data;
- /**
- * 当前线程是否关联了网络通信器, 如果关联了, 则表示当前线程处于网络线程中!
- */
- CommunicatorEpoll *_communicatorEpoll = NULL;
- ///////////////////////////////////////////////////////////////////////////////////////
- /**
- * 调用链追踪信息
- */
- struct TraceContext
- {
- int traceType; // 0 不用打参数, 1 只打客户端调用参数, 2 客户端服务端参数都打印
- string traceID; // traceID
- string spanID; // spanID
- string parentSpanID; // 父spanID
- enum E_SpanType
- {
- EST_CS = 1,
- EST_CR = 2,
- EST_SR = 4,
- EST_SS = 8,
- EST_TS,
- EST_TE,
- };
- // key 分两种情况,1.rpc调用; 2.异步回调
- bool init(const string& k)
- {
- vector<string> vs = TC_Common::sepstr<string>(k, "|");
- if (vs.size() == 2)
- {
- traceID = vs[0];
- parentSpanID = vs[1];
- spanID = "";
- traceType =initType(traceID);
- return true;
- }
- else if (vs.size() == 3)
- {
- traceID = vs[0];
- spanID = vs[1];
- parentSpanID = vs[2];
- traceType = initType(traceID);
- return true;
- }
- else
- {
- reset();
- }
- return false;
- }
- static int initType(const string& tid)
- {
- string::size_type pos = tid.find("-");
- int type = 0;
- if (pos != string::npos)
- {
- type = strtol(tid.substr(0, pos).c_str(), NULL, 16);
- }
- if (type < 0 || type > 15)
- {
- type = 0;
- }
- return type;
- }
- void reset()
- {
- traceID = "";
- spanID = "";
- parentSpanID = "";
- traceType = 0;
- }
- TraceContext()
- {
- }
- TraceContext(const string& k)
- {
- init(k);
- }
- void newSpan()
- {
- spanID = TC_UUIDGenerator::getInstance()->genID();
- }
- string getKey(E_SpanType es) const
- {
- switch (es)
- {
- case EST_CS:
- case EST_CR:
- case EST_TS:
- case EST_TE:
- return traceID + "|" + spanID + "|" + parentSpanID;
- break;
- case EST_SR:
- case EST_SS:
- return traceID + "|" + parentSpanID + "|*";
- break;
- default:
- break;
- }
- return "";
- }
- string getKey(bool full) const
- {
- return full ? (traceID + "|" + spanID + "|" + parentSpanID) : (traceID + "|" + spanID);
- }
- static bool needParam(E_SpanType es, int type)
- {
- if (es == EST_TS)
- {
- es = EST_CS;
- }
- else if (es == EST_TE)
- {
- es = EST_CR;
- }
- return (bool)((int)es & type);
- }
- };
- bool _traceCall; //标识当前线程是否需要调用链追踪
- TraceContext _traceContext; //调用链追踪信息
- string getTraceKey(TraceContext::E_SpanType es) const
- {
- return _traceContext.getKey(es);
- }
- string getTraceKey(bool full = false) const
- {
- return _traceContext.getKey(full);
- }
- void newSpan()
- {
- _traceContext.newSpan();
- }
- bool initTrace(const string& k)
- {
- return _traceContext.init(k);
- }
- int getTraceType() const
- {
- return _traceContext.traceType;
- }
- bool needTraceParam(TraceContext::E_SpanType es)
- {
- return _traceContext.needParam(es, _traceContext.traceType);
- }
- static bool needTraceParam(TraceContext::E_SpanType es, const string& k)
- {
- int type = TraceContext::initType(k);
- return TraceContext::needParam(es, type);
- }
- ////////////////////////////////////////////////////////////////////////////////////调用链追踪 end/////
- };
- //////////////////////////////////////////////////////////////////////////
- // 协程并行请求的基类
- class CoroParallelBase : virtual public TC_HandleBase
- {
- public:
- /**
- * 构造
- */
- CoroParallelBase(int iNum)
- : _num(iNum)
- , _count(iNum)
- , _req_count(0)
- {}
- /**
- * 析构函数
- */
- virtual ~CoroParallelBase() {}
- /**
- * 增加调用协程接口请求的数目
- */
- int incReqCount() { return (++_req_count); }
- /**
- * 判断协程并行请求数目是否都发送了
- */
- bool checkAllReqSend() { return _num == _req_count; }
- /**
- * 判断协程并行请求是否都回来了
- */
- bool checkAllReqReturn() { return (--_count) == 0; }
- /**
- * 获取所有请求回来的响应
- */
- vector<ReqMessage*> getAllReqMessage()
- {
- vector<ReqMessage*> vRet;
- {
- TC_LockT<TC_SpinLock> lock(_mutex);
- vRet.swap(_vReqMessage);
- _vReqMessage.clear();
- }
- return vRet;
- }
- /**
- * 插入请求回来的响应
- */
- void insert(ReqMessage* msg)
- {
- TC_LockT<TC_SpinLock> lock(_mutex);
- _vReqMessage.push_back(msg);
- }
- protected:
- /**
- * 并行请求的数目
- */
- int _num;
- /**
- * 并行请求的响应还未回来的数目
- */
- std::atomic<int> _count;
- /**
- * 并行请求的已发送的数目
- */
- std::atomic<int> _req_count;
- /**
- * 互斥锁
- */
- TC_SpinLock _mutex;
- /**
- * 请求的响应的容器
- */
- vector<ReqMessage*> _vReqMessage;
- };
- typedef TC_AutoPtr<CoroParallelBase> CoroParallelBasePtr;
- //等待所有的请求回来
- void coroWhenAll(const CoroParallelBasePtr &ptr);
- //////////////////////////////////////////////////////////////////////////
- // 异步回调对象的基类
- class ServantProxyCallback : virtual public TC_HandleBase
- {
- public:
- /**
- * 构造
- */
- ServantProxyCallback();
- /**
- * 析构函数
- */
- virtual ~ServantProxyCallback() {}
- /**
- * 设置发起调用的servant
- * @param prx
- */
- void setServantPrx(const ServantPrx &prx) { _servantPrx = prx; }
- /**
- * 获取类型
- * @return const string&
- */
- virtual const string& getType() { return _type; }
- /**
- * 设置类型
- * @return const string&
- */
- virtual void setType(const string& type) { _type = type; }
- /**
- * 设置coro并行请求的共享智能指针
- */
- virtual void setCoroParallelBasePtr(CoroParallelBasePtr pPtr) { _pPtr = pPtr; }
- /**
- * 获取coro并行请求的共享智能指针
- */
- virtual const CoroParallelBasePtr& getCoroParallelBasePtr() { return _pPtr; }
- /**
- * 异步请求是否在网络线程处理
- * 内部用的到 业务不能设置这个值
- * */
- inline void setNetThreadProcess(bool bNetThreadProcess)
- {
- _bNetThreadProcess = bNetThreadProcess;
- }
- inline bool getNetThreadProcess()
- {
- return _bNetThreadProcess;
- }
- public:
- /**
- * dispatch, call onDispatch
- * @param msg
- * @return
- */
- int dispatch(ReqMessagePtr msg);
- protected:
- /**
- * 异步回调对象实现该方法,进行业务逻辑处理
- * @param msg
- * @return int
- */
- virtual int onDispatch(ReqMessagePtr msg) = 0;
- /**
- * 连接关闭掉了(push callback 才有效),老版本的onClose不带ep,为了兼容并且带上ep
- */
- virtual void onClose() {};
- virtual void onClose(const TC_Endpoint& ep) {onClose();};
- /**
- * 连接已建立(push callback 才有效)
- */
- virtual void onConnect(const TC_Endpoint& ep) {};
- friend class AdapterProxy;
- protected:
- /**
- * 同一链路多个cb的时候可以用来区分class类型
- */
- string _type;
- /**
- * 异步请求是否在网络线程处理
- * 内部用的到 业务不能设置这个值
- * */
- bool _bNetThreadProcess;
- /**
- * 协程并行请求的共享智能指针
- */
- CoroParallelBasePtr _pPtr;
- /**
- * servant prx
- */
- ServantPrx _servantPrx;
- };
- ///////////////////////////////////////////////////////////////////////////////////////////////
- // for http
- class HttpCallback : public TC_HandleBase
- {
- public:
- virtual int onHttpResponse(const shared_ptr<TC_HttpResponse> &rsp) = 0;
- virtual int onHttpResponseException(int expCode) = 0;
- };
- typedef TC_AutoPtr<HttpCallback> HttpCallbackPtr;
- class HttpServantProxyCallback : virtual public ServantProxyCallback
- {
- public:
- explicit HttpServantProxyCallback(const HttpCallbackPtr& cb);
- /**
- * 异步回调对象实现该方法,进行业务逻辑处理
- * @param msg
- * @return int
- */
- virtual int onDispatch(ReqMessagePtr ptr);
- /**
- * 异步回调对象实现该方法,进行业务逻辑处理
- * @param msg
- * @return void
- */
- virtual int onDispatchResponse(const RequestPacket &req, const ResponsePacket &rsp);
- /**
- * 异步回调对象实现该方法(异常),进行业务逻辑处理
- * @param msg
- * @return void
- */
- virtual int onDispatchException(const RequestPacket &req, const ResponsePacket &rsp);
- private:
- HttpCallbackPtr _httpCb;
- };
- //////////////////////////////////////////////////////////////////////////
- /**
- * 1:远程对象的本地代理
- * 2:同名servant在一个通信器中最多只有一个实例
- * 3:防止和用户在Tars中定义的函数名冲突,接口以tars_开头
- */
- class EndpointManagerThread;
- class ServantProxy : public TC_HandleBase, public TC_ThreadMutex
- {
- public:
- /**
- * 通过status传递数据时用到的缺省字符串
- */
- static string STATUS_DYED_KEY; //需要染色的用户ID
- static string STATUS_GRID_KEY; //需要灰度染色的用户ID
- static string STATUS_RESULT_CODE; //处理结果码,tup使用
- static string STATUS_RESULT_DESC; //处理结果描述,tup使用
- static string STATUS_SETNAME_VALUE; //set调用
- static string STATUS_TRACE_KEY; //trace信息
- ///////////////////////////////////////////////////////////////////
- /**
- * socket选项
- */
- struct SocketOpt
- {
- int level;
- int optname;
- const void *optval;
- SOCKET_LEN_TYPE optlen;
- };
- /**
- * 缺省的同步调用超时时间
- * 超时后不保证消息不会被服务端处理
- */
- enum { DEFAULT_SYNCTIMEOUT = 3000, DEFAULT_ASYNCTIMEOUT=5000};
- /**
- * default connection serial num
- */
- const static int DEFAULT_CONNECTION_SERIAL = 10;
- //自定义回调
- typedef std::function<void(ReqMessagePtr)> custom_callback;
- /**
- * 内置四种协议支持
- */
- enum SERVANT_PROTOCOL
- {
- PROTOCOL_TARS, //默认tars服务的协议
- PROTOCOL_HTTP1, //http协议
- PROTOCOL_HTTP2, //http2协议
- PROTOCOL_GRPC, //grpc协议
- };
- /**
- * 代理设置
- */
- enum SERVANT_PROXY
- {
- PROXY_SOCK4, //支持sock4代理
- PROXY_SOCK5, //支持sock5代理
- PROXY_HTTP, //支持http代理
- };
- /**
- * 构造函数
- * @param op
- */
- ServantProxy(Communicator * pCommunicator, const string& name,const string& setName);
- /**
- * 析构函数
- */
- virtual ~ServantProxy();
- public:
- /**
- * 获取Object可用服务列表 如果启用set则获取本set的,如果启用分组,只返回同分组的服务端ip
- * @return void
- */
- void tars_endpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /**
- * 获取Object可用服务列表 所有的列表
- * @return void
- */
- void tars_endpointsAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /**
- * 获取Object可用服务列表 根据set名字获取
- * @return void
- */
- void tars_endpointsBySet(const string & sName,vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /**
- * 获取Object可用服务列表 根据地区名字获取
- * @return void
- */
- void tars_endpointsByStation(const string & sName,vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /**
- * 获取Object可用服务列表 包括指定归属地
- * @return vector<TC_Endpoint>
- **/
- vector<TC_Endpoint> tars_endpoints(const std::string & sStation);
- /**
- * 获取Object可用服务列表 包括指定归属地
- * @return void
- */
- void tars_endpoints(const std::string & sStation, vector<TC_Endpoint> & vecActive, vector<TC_Endpoint> & vecInactive);
- /**
- * 获取Object可用服务列表 如果启用分组,只返回同分组的服务端ip
- * @return vector<TC_Endpoint>
- */
- vector<TC_Endpoint> getEndpoint();
- /**
- * 获取Object可用服务列表 包括所有IDC
- * @return vector<TC_Endpoint>
- */
- vector<TC_Endpoint> getEndpoint4All();
- /**
- * 获取通信器
- *
- * @return Communicator*
- */
- Communicator* tars_communicator() { return _communicator; }
- /**
- * 发送测试消息到服务器
- */
- void tars_ping();
- /**
- * 异步ping, 不等回包
- */
- void tars_async_ping();
- /**
- * 设置代理
- * @param type
- * @param ep
- * @param user
- * @param pass
- */
- void tars_set_proxy(SERVANT_PROXY type, const TC_Endpoint &ep, const string &user, const string &pass);
- /**
- * 设置同步调用超时时间,对该proxy上所有方法都有效
- * @param msecond
- */
- void tars_timeout(int msecond);
- /**
- * 获取同步调用超时时间,对该proxy上所有方法都有效
- * @return int
- */
- int tars_timeout() const;
- /**
- * 获取连接超时时间
- * @return int
- */
- int tars_connect_timeout() const;
- /**
- * 设置连接超时时间
- * @return int
- */
- void tars_connect_timeout(int conTimeout);
- /**
- * set auto reconnect time
- * @return int, second
- */
- void tars_reconnect(int second);
- /**
- * 获取所属的Object名称
- * @return string
- */
- const string &tars_name() const;
- /**
- * set name
- * @return
- */
- const string &tars_setName() const;
- /**
- * 获取所属的Object名称#hash@address(即传入stringToProxy中的地址)
- * @return string
- */
- string tars_full_name() const;
- /**
- * 获取最近一次调用的IP地址和端口
- * @return string
- */
- static TC_Endpoint tars_invoke_endpoint();
- /**
- * 设置连接为多连接, 串行模式
- * @param connectionSerial, <=0: 连接复用模式(一个连接上同时跑多个请求, 响应包), >0: 连接串行模式(连接个数), 同一个连接上并行只能跑一个包(http协议)
- */
- void tars_connection_serial(int connectionSerial);
- /**
- * 获取连接并发模式
- * @return int
- */
- int tars_connection_serial() const;
- /**
- * 直接设置内置支持的协议
- */
- void tars_set_protocol(SERVANT_PROTOCOL protocol, int connectionSerial = 0);
- /**
- * 设置用户自定义协议
- * @param protocol
- */
- void tars_set_protocol(const ProxyProtocol& protocol, int connectionSerial = 0);
- /**
- * get protocol
- * @return
- */
- const ProxyProtocol &tars_get_protocol() const;
- /**
- *设置套接字选项
- */
- void tars_set_sockopt(int level, int optname, const void *optval, SOCKET_LEN_TYPE optlen);
- /**
- * 获取套接字选项
- */
- vector<SocketOpt> tars_get_sockopt() const;
- /**
- * 设置超时检查参数
- */
- void tars_set_check_timeout(const CheckTimeoutInfo& checkTimeoutInfo);
- /**
- * 获取超时检查参数
- */
- CheckTimeoutInfo tars_get_check_timeout();
- /**
- * hash方法,为保证一段时间内同一个key的消息发送
- * 到相同的服务端,由于服务列表动态变化,所以
- * 不严格保证
- * @param key
- * @return ServantProxy*
- */
- virtual ServantProxy* tars_hash(int64_t key);
- /**
- * 一致性hash方法
- */
- virtual ServantProxy* tars_consistent_hash(int64_t key);
- /**
- * 清除当前的Hash数据
- * 空函数 为了兼容以前的
- * @param key
- * @return ServantProxy*
- */
- void tars_clear_hash();
- /**
- * 针对客户端调用接口级别的超时时间设置,包括同步和异步调用
- * 每次接口调用都必须设置,否则取系统默认超时时间
- *
- * @param msecond 单位毫秒
- * @return ServantProxy*
- * 示例: prxy->tars_set_timeout(3000)->sayHello();
- */
- virtual ServantProxy* tars_set_timeout(int msecond);
- /**
- * 设置异步调用超时时间,对该proxy上所有方法都有效
- * @param msecond
- */
- void tars_async_timeout(int msecond);
- /**
- * 获取异步调用超时时间,对该proxy上所有方法都有效
- * @return int
- */
- int tars_async_timeout() const;
- /**
- * 主动更新端口
- * @param active
- * @param inactive
- */
- void tars_update_endpoints(const set<EndpointInfo> &active, const set<EndpointInfo> &inactive);
- /**
- * 设置自定义回调(注意不在异步回调线程执行, 而是在网络线程中回调, 注意不要阻塞)
- * (这种模式下callback hash无效)
- * @param callback
- */
- void tars_set_custom_callback(custom_callback callback);
- /**
- * callback启用hash模式, 根据到服务端连接hash, 即同一个服务端连接过来的请求落入到一个异步回调线程中
- */
- void tars_enable_callback_hash();
- /*
- * 用proxy产生一个该object上的序列号
- * @return uint32_t
- */
- virtual uint32_t tars_gen_requestid();
- /**
- * 设置PUSH类消息的响应callback
- * @param cb
- */
- virtual void tars_set_push_callback(const ServantProxyCallbackPtr& cb);
- /**
- * 获取PUSH类消息的callback对象
- */
- ServantProxyCallbackPtr tars_get_push_callback();
- /**
- * 超时策略获取和设置
- * @return CheckTimeoutInfo&
- */
- inline const CheckTimeoutInfo& tars_check_timeout_info() const { return _checkTimeoutInfo; }
- /**
- * 普通协议同步远程调用
- */
- virtual void rpc_call(uint32_t requestId, const string& sFuncName,
- const char* buff, uint32_t len, ResponsePacket &rsp);
- /**
- * 普通协议异步调用
- */
- virtual void rpc_call_async(uint32_t requestId, const string& sFuncName,
- const char* buff, uint32_t len,
- const ServantProxyCallbackPtr& callback,
- bool bCoro = false);
- /**
- * http1/2协议同步远程调用
- * @param funcName: 调用名称, 这里只是做统计用
- */
- void http_call(const string &funcName, shared_ptr<TC_HttpRequest> &request, shared_ptr<TC_HttpResponse> &response);
- /**
- * http1/2协议异步远程调用
- * @param funcName: 调用名称, 这里只是做统计用
- */
- void http_call_async(const string &funcName, shared_ptr<TC_HttpRequest> &request, const HttpCallbackPtr &cb, bool bCoro = false);
- /**
- * TARS协议同步方法调用
- */
- shared_ptr<ResponsePacket> tars_invoke(char cPacketType,
- const string& sFuncName,
- tars::TarsOutputStream<tars::BufferWriterVector>& buf,
- const map<string, string>& context,
- const map<string, string>& status);
- /**
- * TARS协议同步方法调用
- */
- shared_ptr<ResponsePacket> tars_invoke(char cPacketType,
- const string& sFuncName,
- const vector<char>& buf,
- const map<string, string>& context,
- const map<string, string>& status);
- /**
- * TARS协议异步方法调用
- */
- void tars_invoke_async(char cPacketType,
- const string& sFuncName,
- tars::TarsOutputStream<tars::BufferWriterVector> &buf,
- const map<string, string>& context,
- const map<string, string>& status,
- const ServantProxyCallbackPtr& callback,
- bool bCoro = false);
- /**
- * TARS协议异步方法调用
- */
- void tars_invoke_async(char cPacketType,
- const string& sFuncName,
- const vector<char> &buf,
- const map<string, string>& context,
- const map<string, string>& status,
- const ServantProxyCallbackPtr& callback,
- bool bCoro = false);
- /**
- * 获取所有objectproxy(包括子servant), 该函数主要给自动测试使用!
- * @return
- */
- vector<ObjectProxy*> getObjectProxys();
- protected:
- /**
- * 获得可以复用的servant
- * @return
- */
- ServantPrx getServantPrx(ReqMessage *msg);
- /**
- * get proxy info
- */
- inline const std::shared_ptr<TC_ProxyInfo::ProxyBaseInfo>& getProxyInfo() { return _proxyBaseInfo; }
- /**
- *
- */
- void tars_initialize();
- /**
- *
- * @param prx
- * @param f
- */
- void travelObjectProxys(ServantProxy *prx, function<void(ObjectProxy*)> f);
- friend class ServantProxyCallback;
- friend class Communicator;
- friend class ServantProxyFactory;
- private:
- /**
- * 获取第一个ObjectProxy
- * @return
- */
- ObjectProxy *getObjectProxy(size_t netThreadSeq = 0);
- /**
- *
- * @param func
- */
- void forEachObject(std::function<void(ObjectProxy*)> func);
- private:
- /**
- * 远程方法调用
- * @param req
- * @return int
- */
- void invoke(ReqMessage *msg, bool bCoroAsync = false);
- /**
- * 选择某个servant来发送
- * @param msg
- * @param bCoroAsync
- */
- int servant_invoke(ReqMessage *msg, bool bCoroAsync);
- /**
- * 选取一个网络线程对应的信息
- * @param pSptd
- * @return void
- */
- void selectNetThreadInfo(ServantProxyThreadData *pSptd, ObjectProxy *&pObjProxy, shared_ptr<ReqInfoQueue> &pReqQ);
- /**
- * 检查是否需要设置染色消息
- * @param req
- */
- void checkDye(RequestPacket& req);
- /**
- * 检查是否需要设置调用链追踪
- * @param req
- */
- void checkTrace(RequestPacket &req);
- /**
- * 更新endpoint
- * @param active
- * @param inactive
- */
- void onNotifyEndpoints(CommunicatorEpoll *communicatorEpoll, const set<EndpointInfo> & active,const set<EndpointInfo> & inactive);
- /**
- * 端口不活跃
- */
- void onSetInactive(const EndpointInfo &ep);
- /**
- * 检查是否需要设置cookie
- * @param req
- */
- void checkCookie(RequestPacket &req);
- private:
- friend class ObjectProxy;
- friend class AdapterProxy;
- friend class CommunicatorEpoll;
- /**
- * 通信器
- */
- Communicator * _communicator;
- /**
- * 保存ObjectProxy对象的指针数组
- */
- ObjectProxy * _objectProxy; //保存ObjectProxy对象的指针数组
- /**
- * 同步调用超时(毫秒)
- */
- int _syncTimeout;
- /**
- * 同步调用超时(毫秒)
- */
- int _asyncTimeout;
- /**
- * 唯一id
- */
- std::atomic<uint32_t> _id;
- /**
- * 获取endpoint对象
- */
- std::unique_ptr<EndpointManagerThread> _endpointInfo;
-
- /**
- * 是否在RequestPacket中的context设置主调信息
- */
- bool _masterFlag;
- /*
- *最小的超时时间
- */
- int64_t _minTimeout;
- /**
- * 最大连接串行数(默认0, 表示连接并行请求)
- */
- int _connectionSerial = 0;
- /**
- * 短连接使用http使用
- */
- ServantPrx _rootPrx;
- /**
- *
- */
- std::atomic<int> _servantId{0};
- /**
- *
- */
- std::mutex _servantMutex;
- /**
- *
- */
- vector<ServantPrx> _servantList;
- /**
- * 代理的基本信息
- */
- std::shared_ptr<TC_ProxyInfo::ProxyBaseInfo> _proxyBaseInfo;
- /**
- * custom callback
- */
- custom_callback _callback;
- /**
- * callback hash
- */
- bool _callbackHash = false;
- /**
- * 链接超时
- */
- int _connTimeout = DEFAULT_ASYNCTIMEOUT;
- /*
- * 请求和响应的协议解析器
- */
- ProxyProtocol _proxyProtocol;
- /*
- * push消息 callback
- */
- ServantProxyCallbackPtr _pushCallback;
- /*
- * 超时控制策略信息
- */
- CheckTimeoutInfo _checkTimeoutInfo;
- /*
- * socket选项
- */
- vector<SocketOpt> _socketOpts;
- };
- }
- #endif
|