ObjectProxy.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/ObjectProxy.h"
  17. #include "servant/Communicator.h"
  18. #include "servant/CommunicatorEpoll.h"
  19. #include "servant/Global.h"
  20. #include "servant/EndpointManager.h"
  21. #include "servant/AppCache.h"
  22. #include "util/tc_common.h"
  23. #include "util/tc_clientsocket.h"
  24. #include "servant/RemoteLogger.h"
  25. namespace tars
  26. {
  27. ///////////////////////////////////////////////////////////////////////////////////
  28. ///////////////////////////////////////////////////////////////////////////////////
  29. ObjectProxy::ObjectProxy(CommunicatorEpoll *pCommunicatorEpoll, ServantProxy *servantProxy, const string & sObjectProxyName,const string& setName)
  30. : _communicatorEpoll(pCommunicatorEpoll)
  31. , _sObjectProxyName(sObjectProxyName)
  32. , _invokeSetId(setName)
  33. , _isInvokeBySet(false)
  34. , _servantProxy(servantProxy)
  35. {
  36. string::size_type pos = sObjectProxyName.find_first_of('@');
  37. if(pos != string::npos)
  38. {
  39. _name = sObjectProxyName.substr(0,pos);
  40. _address = sObjectProxyName.substr(pos+1);
  41. }
  42. else
  43. {
  44. _name = sObjectProxyName;
  45. //启用set或者指定set调用
  46. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  47. {
  48. //指定set调用时,指定set的优先级最高
  49. _invokeSetId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  50. _isInvokeBySet = true;
  51. }
  52. }
  53. pos = _name.find_first_of('#');
  54. if(pos != string::npos)
  55. {
  56. _hash = _name.substr(pos+1);
  57. _name = _name.substr(0,pos);
  58. }
  59. }
  60. ObjectProxy::~ObjectProxy()
  61. {
  62. }
  63. void ObjectProxy::initialize(bool rootServant)
  64. {
  65. _endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(), _communicatorEpoll->isFirstNetThread()));
  66. _endpointManger->init(_sObjectProxyName, _invokeSetId, rootServant);
  67. _hasInitialize = true;
  68. }
  69. const vector<AdapterProxy*> & ObjectProxy::getAdapters()
  70. {
  71. return _endpointManger->getAdapters();
  72. }
  73. int ObjectProxy::loadLocator()
  74. {
  75. if(_endpointManger->getDirectProxy())
  76. {
  77. //直接连接
  78. return 0;
  79. }
  80. string locator = _communicatorEpoll->getCommunicator()->getProperty("locator");
  81. if (locator.find_first_not_of('@') == string::npos)
  82. {
  83. TLOGERROR("[Locator is not valid:" << locator << "]" << endl);
  84. return -1;
  85. }
  86. QueryFPrx prx = _communicatorEpoll->getCommunicator()->stringToProxy<QueryFPrx>(locator);
  87. _endpointManger->setLocatorPrx(prx);
  88. return 0;
  89. }
  90. void ObjectProxy::invoke(ReqMessage * msg)
  91. {
  92. TLOGTARS("[ObjectProxy::invoke, objname:" << _name << ", begin...]" << endl);
  93. //选择一个远程服务的Adapter来调用
  94. AdapterProxy * pAdapterProxy = NULL;
  95. bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy);
  96. if(bFirst)
  97. {
  98. //判断是否请求过主控
  99. bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime);
  100. assert(bRet);
  101. //把数据缓存在obj里面
  102. TLOGTARS("[ObjectProxy::invoke, objname:" << _name << ", select adapter proxy not valid (have not invoke reg)]" << endl);
  103. return;
  104. }
  105. if(!pAdapterProxy)
  106. {
  107. TLOGERROR("[ObjectProxy::invoke, objname:"<< _name << ", selectAdapterProxy is null]"<<endl);
  108. msg->response->iRet = TARSADAPTERNULL;
  109. doInvokeException(msg);
  110. return ;
  111. }
  112. msg->adapter = pAdapterProxy;
  113. //连接还没有建立, 暂时先放队列里面
  114. if(!msg->adapter->trans()->hasConnected())
  115. {
  116. bool bRet = _reqTimeoutQueue.push(msg, this->_servantProxy->tars_connect_timeout() + msg->iBeginTime);
  117. assert(bRet);
  118. //把数据缓存在obj里面
  119. TLOGTARS("[ObjectProxy::invoke, " << _name << ", select adapter proxy not connected (have not invoke reg)]" << endl);
  120. return;
  121. }
  122. pAdapterProxy->invoke(msg);
  123. }
  124. void ObjectProxy::prepareConnection(AdapterProxy *adapterProxy)
  125. {
  126. while(!_reqTimeoutQueue.empty())
  127. {
  128. TLOGTARS("[ObjectProxy::doInvoke, " << _name << ", conection queue pop size:" << _reqTimeoutQueue.size() << "]" << endl);
  129. ReqMessage * msg = NULL;
  130. _reqTimeoutQueue.pop(msg);
  131. assert(msg != NULL);
  132. //第一个请求包,adapter必然为NULL,如果需要hash,则重新选择一次
  133. if (msg->adapter == NULL && msg->data._hash)
  134. {
  135. //选取的adapter和之前的不一样(hash的原因), 需要重新选择一个远程服务的Adapter来调用
  136. _endpointManger->selectAdapterProxy(msg, adapterProxy);
  137. if (!adapterProxy)
  138. {
  139. //这里肯定是请求过主控
  140. TLOGERROR("[ObjectProxy::doInvoke, " << _name << ", selectAdapterProxy is null]" << endl);
  141. msg->response->iRet = TARSADAPTERNULL;
  142. doInvokeException(msg);
  143. return;
  144. }
  145. msg->adapter = adapterProxy;
  146. }
  147. else
  148. {
  149. msg->adapter = adapterProxy;
  150. }
  151. adapterProxy->invoke(msg);
  152. }
  153. }
  154. void ObjectProxy::onConnect(AdapterProxy *adapterProxy)
  155. {
  156. prepareConnection(adapterProxy);
  157. }
  158. void ObjectProxy::onNotifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive)
  159. {
  160. if(this->getRootServantProxy()) {
  161. this->getRootServantProxy()->onNotifyEndpoints(_communicatorEpoll, active, inactive);
  162. }
  163. }
  164. //主控查询到地址后过来的
  165. void ObjectProxy::doInvoke()
  166. {
  167. assert(this->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
  168. TLOGTARS("[ObjectProxy::doInvoke, objname:" << _name << ", begin...]" << endl);
  169. for(auto it = _reqTimeoutQueue.begin(); it != _reqTimeoutQueue.end(); ++it)
  170. {
  171. ReqMessage * msg = (*it).ptr;
  172. AdapterProxy* adapterProxy;
  173. //选择一个远程服务的Adapter来调用, selectAdapterProxy会发起连接
  174. _endpointManger->selectAdapterProxy(msg, adapterProxy);
  175. }
  176. }
  177. void ObjectProxy::doInvokeException(ReqMessage * msg)
  178. {
  179. //单向调用出现异常直接删除请求
  180. if(msg->eType == ReqMessage::ONE_WAY)
  181. {
  182. delete msg;
  183. return;
  184. }
  185. //标识请求异常
  186. msg->eStatus = ReqMessage::REQ_EXC;
  187. if(msg->eType == ReqMessage::SYNC_CALL)
  188. {
  189. if(!msg->sched)
  190. {
  191. assert(msg->pMonitor);
  192. msg->pMonitor->notify();
  193. }
  194. else
  195. {
  196. msg->sched->put(msg->iCoroId);
  197. }
  198. return;
  199. }
  200. if(msg->callback)
  201. {
  202. if(!msg->sched)
  203. {
  204. if(msg->callback->getNetThreadProcess())
  205. {
  206. ReqMessagePtr msgPtr = msg;
  207. //如果是本线程的回调,直接本线程处理
  208. //比如获取endpoint
  209. try
  210. {
  211. msg->callback->dispatch(msgPtr);
  212. }
  213. catch(exception & e)
  214. {
  215. TLOGERROR("[ObjectProxy::doInvokeException exp:"<<e.what()<<" ,line:"<<__LINE__<<endl);
  216. }
  217. catch(...)
  218. {
  219. TLOGERROR("[ObjectProxy::doInvokeException exp:unknown line:|"<<__LINE__<<endl);
  220. }
  221. }
  222. else
  223. {
  224. //先确保adapter 非null
  225. if (msg->adapter)
  226. {
  227. //异步回调,放入回调处理线程中
  228. _communicatorEpoll->pushAsyncThreadQueue(msg);
  229. }
  230. else
  231. {
  232. TLOGERROR("[ObjectProxy::doInvokeException push adapter is null|" << __LINE__ << endl);
  233. }
  234. }
  235. }
  236. else
  237. {
  238. CoroParallelBasePtr ptr = msg->callback->getCoroParallelBasePtr();
  239. if(ptr)
  240. {
  241. ptr->insert(msg);
  242. if(ptr->checkAllReqReturn())
  243. {
  244. msg->sched->put(msg->iCoroId);
  245. }
  246. }
  247. else
  248. {
  249. TLOGERROR("[ObjectProxy::doInvokeException coro parallel callback error,obj:" << _name << endl);
  250. delete msg;
  251. msg = NULL;
  252. }
  253. }
  254. }
  255. }
  256. void ObjectProxy::doTimeout()
  257. {
  258. if(!_hasInitialize)
  259. {
  260. return;
  261. }
  262. assert(this->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
  263. const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
  264. for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
  265. {
  266. if(vAdapterProxy[iAdapter] != NULL)
  267. {
  268. vAdapterProxy[iAdapter]->doTimeout();
  269. }
  270. }
  271. ReqMessage * reqInfo = NULL;
  272. while(_reqTimeoutQueue.timeout(reqInfo))
  273. {
  274. reqInfo->response->iRet = TARSINVOKETIMEOUT;
  275. doInvokeException(reqInfo);
  276. }
  277. }
  278. void ObjectProxy::doKeepAlive()
  279. {
  280. if(!_hasInitialize)
  281. {
  282. return;
  283. }
  284. assert(this->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
  285. const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
  286. for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
  287. {
  288. if(vAdapterProxy[iAdapter] != NULL)
  289. {
  290. vAdapterProxy[iAdapter]->doKeepAlive();
  291. }
  292. }
  293. }
  294. void ObjectProxy::mergeStat(map<StatMicMsgHead, StatMicMsgBody> & mStatMicMsg)
  295. {
  296. if(!_hasInitialize)
  297. {
  298. return;
  299. }
  300. assert(this->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
  301. const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
  302. for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
  303. {
  304. if(vAdapterProxy[iAdapter] != NULL)
  305. {
  306. vAdapterProxy[iAdapter]->mergeStat(mStatMicMsg);
  307. }
  308. }
  309. }
  310. void ObjectProxy::onSetInactive(const EndpointInfo& ep)
  311. {
  312. if(!_hasInitialize)
  313. {
  314. return;
  315. }
  316. assert(this->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
  317. const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
  318. for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
  319. {
  320. if(vAdapterProxy[iAdapter] != NULL)
  321. {
  322. if (vAdapterProxy[iAdapter]->endpoint() == ep)
  323. {
  324. vAdapterProxy[iAdapter]->onSetInactive();
  325. }
  326. }
  327. }
  328. }
  329. //////////////////////////////////////////////////////////////////////////////////
  330. }