ObjectProxy.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/ObjectProxy.h"
  17. #include "servant/Communicator.h"
  18. #include "servant/Global.h"
  19. #include "servant/EndpointManager.h"
  20. #include "servant/AppCache.h"
  21. #include "util/tc_common.h"
  22. #include "util/tc_clientsocket.h"
  23. #include "servant/RemoteLogger.h"
  24. namespace tars
  25. {
  26. ////////////////////////////////////////////////////////////////////////////////////////////
  27. ObjectProxy::ObjectProxy(CommunicatorEpoll * pCommunicatorEpoll, const string & sObjectProxyName,const string& setName)
  28. : _communicatorEpoll(pCommunicatorEpoll)
  29. , _invokeSetId(setName)
  30. , _isInvokeBySet(false)
  31. // , _id(0)
  32. , _hasSetProtocol(false)
  33. , _conTimeout(1000)
  34. , _servantProxy(NULL)
  35. {
  36. string::size_type pos = sObjectProxyName.find_first_of('@');
  37. if(pos != string::npos)
  38. {
  39. _name = sObjectProxyName.substr(0,pos);
  40. _address = sObjectProxyName.substr(pos+1);
  41. }
  42. else
  43. {
  44. _name = sObjectProxyName;
  45. //启用set或者指定set调用
  46. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  47. {
  48. //指定set调用时,指定set的优先级最高
  49. _invokeSetId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  50. _isInvokeBySet = true;
  51. }
  52. }
  53. pos = _name.find_first_of('#');
  54. if(pos != string::npos)
  55. {
  56. _hash = _name.substr(pos+1);
  57. _name = _name.substr(0,pos);
  58. }
  59. _proxyProtocol.requestFunc = ProxyProtocol::tarsRequest;
  60. _proxyProtocol.responseFunc = ProxyProtocol::tarsResponse;
  61. _endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(), sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));
  62. }
  63. ObjectProxy::~ObjectProxy()
  64. {
  65. }
  66. void ObjectProxy::initialize()
  67. {
  68. }
  69. const vector<AdapterProxy*> & ObjectProxy::getAdapters()
  70. {
  71. return _endpointManger->getAdapters();
  72. }
  73. int ObjectProxy::loadLocator()
  74. {
  75. if(_endpointManger->getDirectProxy())
  76. {
  77. //直接连接
  78. return 0;
  79. }
  80. string locator = _communicatorEpoll->getCommunicator()->getProperty("locator");
  81. if (locator.find_first_not_of('@') == string::npos)
  82. {
  83. TLOGERROR("[Locator is not valid:" << locator << "]" << endl);
  84. return -1;
  85. }
  86. QueryFPrx prx = _communicatorEpoll->getCommunicator()->stringToProxy<QueryFPrx>(locator);
  87. _endpointManger->setLocatorPrx(prx);
  88. return 0;
  89. }
  90. void ObjectProxy::setPushCallbacks(const ServantProxyCallbackPtr& cb)
  91. {
  92. _pushCallback = cb;
  93. }
  94. ServantProxyCallbackPtr ObjectProxy::getPushCallback()
  95. {
  96. return _pushCallback;
  97. }
  98. void ObjectProxy::setProxyProtocol(const ProxyProtocol& protocol)
  99. {
  100. if(_hasSetProtocol)
  101. {
  102. return ;
  103. }
  104. _hasSetProtocol = true;
  105. _proxyProtocol = protocol;
  106. }
  107. ProxyProtocol& ObjectProxy::getProxyProtocol()
  108. {
  109. return _proxyProtocol;
  110. }
  111. void ObjectProxy::setSocketOpt(int level, int optname, const void *optval, SOCKET_LEN_TYPE optlen)
  112. {
  113. SocketOpt socketOpt;
  114. socketOpt.level = level;
  115. socketOpt.optname = optname;
  116. socketOpt.optval = optval;
  117. socketOpt.optlen = optlen;
  118. _socketOpts.push_back(socketOpt);
  119. }
  120. vector<SocketOpt>& ObjectProxy::getSocketOpt()
  121. {
  122. return _socketOpts;
  123. }
  124. //
  125. //bool ObjectProxy::invoke_sync(ReqMessage * msg)
  126. //{
  127. // TLOGTARS("[TARS][ObjectProxy::invoke_sync, " << _name << ", begin]" << endl);
  128. //
  129. // //选择一个远程服务的Adapter来调用
  130. // AdapterProxy * pAdapterProxy = NULL;
  131. // //选一个活的
  132. // _endpointManger->selectAdapterProxy(msg, pAdapterProxy, true);
  133. //
  134. // if(!pAdapterProxy)
  135. // {
  136. // return false;
  137. // }
  138. //
  139. // msg->adapter = pAdapterProxy;
  140. // return pAdapterProxy->invoke_sync(msg);
  141. //}
  142. void ObjectProxy::invoke(ReqMessage * msg)
  143. {
  144. TLOGTARS("[TARS][ObjectProxy::invoke, objname:" << _name << ", begin...]" << endl);
  145. //选择一个远程服务的Adapter来调用
  146. AdapterProxy * pAdapterProxy = NULL;
  147. bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy, false);
  148. if(bFirst)
  149. {
  150. //判断是否请求过主控
  151. bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime);
  152. assert(bRet);
  153. //把数据缓存在obj里面
  154. TLOGTARS("[TARS][ObjectProxy::invoke, objname:" << _name << ", select adapter proxy not valid (have not inovoke reg)]" << endl);
  155. return;
  156. }
  157. if(!pAdapterProxy)
  158. {
  159. TLOGERROR("[TARS][ObjectProxy::invoke, objname:"<< _name << ", selectAdapterProxy is null]"<<endl);
  160. msg->response->iRet = TARSADAPTERNULL;
  161. doInvokeException(msg);
  162. return ;
  163. }
  164. msg->adapter = pAdapterProxy;
  165. //连接还没有建立, 暂时先放队列里面
  166. if(!msg->adapter->getTransceiver()->hasConnected())
  167. {
  168. bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime);
  169. assert(bRet);
  170. //把数据缓存在obj里面
  171. TLOGTARS("[TARS][ObjectProxy::invoke, " << _name << ", select adapter proxy not connected (have not inovoke reg)]" << endl);
  172. return;
  173. }
  174. pAdapterProxy->invoke(msg);
  175. }
  176. void ObjectProxy::onConnect(AdapterProxy *adapterProxy)
  177. {
  178. while(!_reqTimeoutQueue.empty())
  179. {
  180. TLOGTARS("[TARS][ObjectProxy::doInvoke, " << _name << ", pop...]" << endl);
  181. ReqMessage * msg = NULL;
  182. _reqTimeoutQueue.pop(msg);
  183. assert(msg != NULL);
  184. if(msg->adapter != NULL && msg->adapter != adapterProxy)
  185. {
  186. //选择一个远程服务的Adapter来调用
  187. _endpointManger->selectAdapterProxy(msg, adapterProxy, false);
  188. if (!adapterProxy)
  189. {
  190. //这里肯定是请求过主控
  191. TLOGERROR("[TARS][ObjectProxy::doInvoke, " << _name << ", selectAdapterProxy is null]" << endl);
  192. msg->response->iRet = TARSADAPTERNULL;
  193. doInvokeException(msg);
  194. return;
  195. }
  196. msg->adapter = adapterProxy;
  197. }
  198. else
  199. {
  200. msg->adapter = adapterProxy;
  201. }
  202. adapterProxy->invoke(msg);
  203. }
  204. }
  205. void ObjectProxy::onNotifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive)
  206. {
  207. if(_servantProxy) {
  208. _servantProxy->onNotifyEndpoints(active, inactive);
  209. }
  210. }
  211. void ObjectProxy::doInvoke()
  212. {
  213. TLOGTARS("[TARS][ObjectProxy::doInvoke, objname:" << _name << ", begin...]" << endl);
  214. for(auto it = _reqTimeoutQueue.begin(); it != _reqTimeoutQueue.end(); ++it)
  215. {
  216. ReqMessage * msg = (*it).ptr;
  217. AdapterProxy* adapterProxy;
  218. //选择一个远程服务的Adapter来调用, selectAdapterProxy会发起连接
  219. _endpointManger->selectAdapterProxy(msg, adapterProxy, false);
  220. }
  221. //
  222. // while(!_reqTimeoutQueue.empty())
  223. // {
  224. // TLOGTARS("[TARS][ObjectProxy::doInvoke, " << _name << ", pop...]" << endl);
  225. //
  226. // ReqMessage * msg = NULL;
  227. // _reqTimeoutQueue.pop(msg);
  228. //
  229. // assert(msg != NULL);
  230. //
  231. // AdapterProxy* adapterProxy;
  232. //
  233. // //选择一个远程服务的Adapter来调用
  234. // _endpointManger->selectAdapterProxy(msg, adapterProxy, false);
  235. //
  236. // if (!adapterProxy) {
  237. // //这里肯定是请求过主控
  238. // TLOGERROR("[TARS][ObjectProxy::doInvoke, " << _name << ", selectAdapterProxy is null]" << endl);
  239. // msg->response->iRet = JCEADAPTERNULL;
  240. // doInvokeException(msg);
  241. // return;
  242. // }
  243. //
  244. // msg->adapter = adapterProxy;
  245. //
  246. // adapterProxy->invoke(msg);
  247. // }
  248. }
  249. void ObjectProxy::doInvokeException(ReqMessage * msg)
  250. {
  251. // TLOGTARS("[TARS][ObjectProxy::doInvokeException, objname:" << _name << "]" << endl);
  252. //单向调用出现异常直接删除请求
  253. if(msg->eType == ReqMessage::ONE_WAY)
  254. {
  255. delete msg;
  256. return;
  257. }
  258. //标识请求异常
  259. msg->eStatus = ReqMessage::REQ_EXC;
  260. if(msg->eType == ReqMessage::SYNC_CALL)
  261. {
  262. if(!msg->bCoroFlag)
  263. {
  264. assert(msg->pMonitor);
  265. TC_ThreadLock::Lock sync(*(msg->pMonitor));
  266. msg->pMonitor->notify();
  267. msg->bMonitorFin = true;
  268. }
  269. else
  270. {
  271. msg->sched->put(msg->iCoroId);
  272. }
  273. return;
  274. }
  275. if(msg->callback)
  276. {
  277. if(!msg->bCoroFlag)
  278. {
  279. if(msg->callback->getNetThreadProcess())
  280. {
  281. ReqMessagePtr msgPtr = msg;
  282. //如果是本线程的回调,直接本线程处理
  283. //比如获取endpoint
  284. try
  285. {
  286. msg->callback->dispatch(msgPtr);
  287. }
  288. catch(exception & e)
  289. {
  290. TLOGERROR("[TARS]ObjectProxy::doInvokeException exp:" << e.what() << " ,line:" << __LINE__ << endl);
  291. }
  292. catch(...)
  293. {
  294. TLOGERROR("[TARS]ObjectProxy::doInvokeException exp:unknown line:|" << __LINE__ << endl);
  295. }
  296. }
  297. else
  298. {
  299. //异步回调,放入回调处理线程中
  300. _communicatorEpoll->pushAsyncThreadQueue(msg);
  301. }
  302. }
  303. else
  304. {
  305. CoroParallelBasePtr ptr = msg->callback->getCoroParallelBasePtr();
  306. if(ptr)
  307. {
  308. ptr->insert(msg);
  309. if(ptr->checkAllReqReturn())
  310. {
  311. msg->sched->put(msg->iCoroId);
  312. }
  313. }
  314. else
  315. {
  316. TLOGERROR("[TARS]ObjectProxy::doInvokeException coro parallel callback error, objname:" << _name << endl);
  317. delete msg;
  318. }
  319. }
  320. }
  321. }
  322. void ObjectProxy::doTimeout()
  323. {
  324. const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
  325. for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
  326. {
  327. vAdapterProxy[iAdapter]->doTimeout();
  328. }
  329. ReqMessage * reqInfo = NULL;
  330. while(_reqTimeoutQueue.timeout(reqInfo))
  331. {
  332. TLOGERROR("[TARS][ObjectProxy::doTimeout, objname:" << _name << ", queue timeout error]" << endl);
  333. reqInfo->response->iRet = TARSINVOKETIMEOUT;
  334. doInvokeException(reqInfo);
  335. }
  336. }
  337. void ObjectProxy::mergeStat(map<StatMicMsgHead, StatMicMsgBody> & mStatMicMsg)
  338. {
  339. const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
  340. for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
  341. {
  342. vAdapterProxy[iAdapter]->mergeStat(mStatMicMsg);
  343. }
  344. }
  345. void ObjectProxy::onSetInactive(const EndpointInfo& ep)
  346. {
  347. const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
  348. for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
  349. {
  350. if(vAdapterProxy[iAdapter]->endpoint() == ep)
  351. {
  352. vAdapterProxy[iAdapter]->onSetInactive();
  353. }
  354. }
  355. }
  356. ////////////////////////////////////////////////////////////////////////////////////////////////
  357. }