123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #include "servant/ObjectProxy.h"
- #include "servant/Communicator.h"
- #include "servant/Global.h"
- #include "servant/EndpointManager.h"
- #include "servant/AppCache.h"
- #include "util/tc_common.h"
- #include "util/tc_clientsocket.h"
- #include "servant/RemoteLogger.h"
- namespace tars
- {
- ////////////////////////////////////////////////////////////////////////////////////////////
- ObjectProxy::ObjectProxy(CommunicatorEpoll * pCommunicatorEpoll, const string & sObjectProxyName,const string& setName)
- : _communicatorEpoll(pCommunicatorEpoll)
- , _invokeSetId(setName)
- , _isInvokeBySet(false)
- // , _id(0)
- , _hasSetProtocol(false)
- , _conTimeout(1000)
- , _servantProxy(NULL)
- {
- string::size_type pos = sObjectProxyName.find_first_of('@');
- if(pos != string::npos)
- {
- _name = sObjectProxyName.substr(0,pos);
- _address = sObjectProxyName.substr(pos+1);
- }
- else
- {
- _name = sObjectProxyName;
- //启用set或者指定set调用
- if(ClientConfig::SetOpen || !_invokeSetId.empty())
- {
- //指定set调用时,指定set的优先级最高
- _invokeSetId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
- _isInvokeBySet = true;
- }
- }
- pos = _name.find_first_of('#');
- if(pos != string::npos)
- {
- _hash = _name.substr(pos+1);
- _name = _name.substr(0,pos);
- }
- _proxyProtocol.requestFunc = ProxyProtocol::tarsRequest;
- _proxyProtocol.responseFunc = ProxyProtocol::tarsResponse;
- _endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(), sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));
- }
- ObjectProxy::~ObjectProxy()
- {
- }
- void ObjectProxy::initialize()
- {
- }
- const vector<AdapterProxy*> & ObjectProxy::getAdapters()
- {
- return _endpointManger->getAdapters();
- }
- int ObjectProxy::loadLocator()
- {
- if(_endpointManger->getDirectProxy())
- {
- //直接连接
- return 0;
- }
- string locator = _communicatorEpoll->getCommunicator()->getProperty("locator");
- if (locator.find_first_not_of('@') == string::npos)
- {
- TLOGERROR("[Locator is not valid:" << locator << "]" << endl);
- return -1;
- }
- QueryFPrx prx = _communicatorEpoll->getCommunicator()->stringToProxy<QueryFPrx>(locator);
- _endpointManger->setLocatorPrx(prx);
- return 0;
- }
- void ObjectProxy::setPushCallbacks(const ServantProxyCallbackPtr& cb)
- {
- _pushCallback = cb;
- }
- ServantProxyCallbackPtr ObjectProxy::getPushCallback()
- {
- return _pushCallback;
- }
- void ObjectProxy::setProxyProtocol(const ProxyProtocol& protocol)
- {
- if(_hasSetProtocol)
- {
- return ;
- }
- _hasSetProtocol = true;
- _proxyProtocol = protocol;
- }
- ProxyProtocol& ObjectProxy::getProxyProtocol()
- {
- return _proxyProtocol;
- }
- void ObjectProxy::setSocketOpt(int level, int optname, const void *optval, SOCKET_LEN_TYPE optlen)
- {
- SocketOpt socketOpt;
- socketOpt.level = level;
- socketOpt.optname = optname;
- socketOpt.optval = optval;
- socketOpt.optlen = optlen;
- _socketOpts.push_back(socketOpt);
- }
- vector<SocketOpt>& ObjectProxy::getSocketOpt()
- {
- return _socketOpts;
- }
- //
- //bool ObjectProxy::invoke_sync(ReqMessage * msg)
- //{
- // TLOGTARS("[TARS][ObjectProxy::invoke_sync, " << _name << ", begin]" << endl);
- //
- // //选择一个远程服务的Adapter来调用
- // AdapterProxy * pAdapterProxy = NULL;
- // //选一个活的
- // _endpointManger->selectAdapterProxy(msg, pAdapterProxy, true);
- //
- // if(!pAdapterProxy)
- // {
- // return false;
- // }
- //
- // msg->adapter = pAdapterProxy;
- // return pAdapterProxy->invoke_sync(msg);
- //}
- void ObjectProxy::invoke(ReqMessage * msg)
- {
- TLOGTARS("[TARS][ObjectProxy::invoke, objname:" << _name << ", begin...]" << endl);
- //选择一个远程服务的Adapter来调用
- AdapterProxy * pAdapterProxy = NULL;
- bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy, false);
- if(bFirst)
- {
- //判断是否请求过主控
- bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime);
- assert(bRet);
- //把数据缓存在obj里面
- TLOGTARS("[TARS][ObjectProxy::invoke, objname:" << _name << ", select adapter proxy not valid (have not inovoke reg)]" << endl);
- return;
- }
- if(!pAdapterProxy)
- {
- TLOGERROR("[TARS][ObjectProxy::invoke, objname:"<< _name << ", selectAdapterProxy is null]"<<endl);
- msg->response->iRet = TARSADAPTERNULL;
- doInvokeException(msg);
- return ;
- }
- msg->adapter = pAdapterProxy;
- //连接还没有建立, 暂时先放队列里面
- if(!msg->adapter->getTransceiver()->hasConnected())
- {
- bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime);
- assert(bRet);
- //把数据缓存在obj里面
- TLOGTARS("[TARS][ObjectProxy::invoke, " << _name << ", select adapter proxy not connected (have not inovoke reg)]" << endl);
- return;
- }
- pAdapterProxy->invoke(msg);
- }
- void ObjectProxy::onConnect(AdapterProxy *adapterProxy)
- {
- while(!_reqTimeoutQueue.empty())
- {
- TLOGTARS("[TARS][ObjectProxy::doInvoke, " << _name << ", pop...]" << endl);
- ReqMessage * msg = NULL;
- _reqTimeoutQueue.pop(msg);
- assert(msg != NULL);
- if(msg->adapter != NULL && msg->adapter != adapterProxy)
- {
- //选择一个远程服务的Adapter来调用
- _endpointManger->selectAdapterProxy(msg, adapterProxy, false);
- if (!adapterProxy)
- {
- //这里肯定是请求过主控
- TLOGERROR("[TARS][ObjectProxy::doInvoke, " << _name << ", selectAdapterProxy is null]" << endl);
- msg->response->iRet = TARSADAPTERNULL;
- doInvokeException(msg);
- return;
- }
- msg->adapter = adapterProxy;
- }
- else
- {
- msg->adapter = adapterProxy;
- }
- adapterProxy->invoke(msg);
- }
- }
- void ObjectProxy::onNotifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive)
- {
- if(_servantProxy) {
- _servantProxy->onNotifyEndpoints(active, inactive);
- }
- }
- void ObjectProxy::doInvoke()
- {
- TLOGTARS("[TARS][ObjectProxy::doInvoke, objname:" << _name << ", begin...]" << endl);
- for(auto it = _reqTimeoutQueue.begin(); it != _reqTimeoutQueue.end(); ++it)
- {
- ReqMessage * msg = (*it).ptr;
- AdapterProxy* adapterProxy;
- //选择一个远程服务的Adapter来调用, selectAdapterProxy会发起连接
- _endpointManger->selectAdapterProxy(msg, adapterProxy, false);
- }
- //
- // while(!_reqTimeoutQueue.empty())
- // {
- // TLOGTARS("[TARS][ObjectProxy::doInvoke, " << _name << ", pop...]" << endl);
- //
- // ReqMessage * msg = NULL;
- // _reqTimeoutQueue.pop(msg);
- //
- // assert(msg != NULL);
- //
- // AdapterProxy* adapterProxy;
- //
- // //选择一个远程服务的Adapter来调用
- // _endpointManger->selectAdapterProxy(msg, adapterProxy, false);
- //
- // if (!adapterProxy) {
- // //这里肯定是请求过主控
- // TLOGERROR("[TARS][ObjectProxy::doInvoke, " << _name << ", selectAdapterProxy is null]" << endl);
- // msg->response->iRet = JCEADAPTERNULL;
- // doInvokeException(msg);
- // return;
- // }
- //
- // msg->adapter = adapterProxy;
- //
- // adapterProxy->invoke(msg);
- // }
- }
- void ObjectProxy::doInvokeException(ReqMessage * msg)
- {
- // TLOGTARS("[TARS][ObjectProxy::doInvokeException, objname:" << _name << "]" << endl);
- //单向调用出现异常直接删除请求
- if(msg->eType == ReqMessage::ONE_WAY)
- {
- delete msg;
- return;
- }
- //标识请求异常
- msg->eStatus = ReqMessage::REQ_EXC;
- if(msg->eType == ReqMessage::SYNC_CALL)
- {
- if(!msg->bCoroFlag)
- {
- assert(msg->pMonitor);
- TC_ThreadLock::Lock sync(*(msg->pMonitor));
- msg->pMonitor->notify();
- msg->bMonitorFin = true;
- }
- else
- {
- msg->sched->put(msg->iCoroId);
- }
- return;
- }
- if(msg->callback)
- {
- if(!msg->bCoroFlag)
- {
- if(msg->callback->getNetThreadProcess())
- {
- ReqMessagePtr msgPtr = msg;
- //如果是本线程的回调,直接本线程处理
- //比如获取endpoint
- try
- {
- msg->callback->dispatch(msgPtr);
- }
- catch(exception & e)
- {
- TLOGERROR("[TARS]ObjectProxy::doInvokeException exp:" << e.what() << " ,line:" << __LINE__ << endl);
- }
- catch(...)
- {
- TLOGERROR("[TARS]ObjectProxy::doInvokeException exp:unknown line:|" << __LINE__ << endl);
- }
- }
- else
- {
- //异步回调,放入回调处理线程中
- _communicatorEpoll->pushAsyncThreadQueue(msg);
- }
- }
- else
- {
- CoroParallelBasePtr ptr = msg->callback->getCoroParallelBasePtr();
- if(ptr)
- {
- ptr->insert(msg);
- if(ptr->checkAllReqReturn())
- {
- msg->sched->put(msg->iCoroId);
- }
- }
- else
- {
- TLOGERROR("[TARS]ObjectProxy::doInvokeException coro parallel callback error, objname:" << _name << endl);
- delete msg;
- }
- }
- }
- }
- void ObjectProxy::doTimeout()
- {
- const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
- for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
- {
- vAdapterProxy[iAdapter]->doTimeout();
- }
- ReqMessage * reqInfo = NULL;
- while(_reqTimeoutQueue.timeout(reqInfo))
- {
- TLOGERROR("[TARS][ObjectProxy::doTimeout, objname:" << _name << ", queue timeout error]" << endl);
- reqInfo->response->iRet = TARSINVOKETIMEOUT;
- doInvokeException(reqInfo);
- }
- }
- void ObjectProxy::mergeStat(map<StatMicMsgHead, StatMicMsgBody> & mStatMicMsg)
- {
- const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
- for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
- {
- vAdapterProxy[iAdapter]->mergeStat(mStatMicMsg);
- }
- }
- void ObjectProxy::onSetInactive(const EndpointInfo& ep)
- {
- const vector<AdapterProxy*> & vAdapterProxy = _endpointManger->getAdapters();
- for(size_t iAdapter=0; iAdapter< vAdapterProxy.size();++iAdapter)
- {
- if(vAdapterProxy[iAdapter]->endpoint() == ep)
- {
- vAdapterProxy[iAdapter]->onSetInactive();
- }
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////
- }
|