|
@@ -20,13 +20,13 @@
|
|
#include "servant/Application.h"
|
|
#include "servant/Application.h"
|
|
#include "servant/AdminF.h"
|
|
#include "servant/AdminF.h"
|
|
#include "servant/AppCache.h"
|
|
#include "servant/AppCache.h"
|
|
-#include "servant/TarsLogger.h"
|
|
|
|
|
|
+#include "servant/RemoteLogger.h"
|
|
#include "tup/tup.h"
|
|
#include "tup/tup.h"
|
|
#include "servant/StatF.h"
|
|
#include "servant/StatF.h"
|
|
#include "servant/StatReport.h"
|
|
#include "servant/StatReport.h"
|
|
#include "util/tc_http2.h"
|
|
#include "util/tc_http2.h"
|
|
// #include "util/tc_http2clientmgr.h"
|
|
// #include "util/tc_http2clientmgr.h"
|
|
-#ifdef _USE_OPENTRACKING
|
|
|
|
|
|
+#ifdef TARS_OPENTRACKING
|
|
#include "servant/text_map_carrier.h"
|
|
#include "servant/text_map_carrier.h"
|
|
#endif
|
|
#endif
|
|
|
|
|
|
@@ -50,11 +50,11 @@ AdapterProxy::AdapterProxy(ObjectProxy * pObjectProxy,const EndpointInfo &ep,Com
|
|
, _connTimeout(false)
|
|
, _connTimeout(false)
|
|
, _connExc(false)
|
|
, _connExc(false)
|
|
, _connExcCnt(0)
|
|
, _connExcCnt(0)
|
|
-, _staticWeight(0)
|
|
|
|
|
|
+//, _staticWeight(0)
|
|
, _timeoutLogFlag(false)
|
|
, _timeoutLogFlag(false)
|
|
, _noSendQueueLimit(1000)
|
|
, _noSendQueueLimit(1000)
|
|
-, _maxSampleCount(1000)
|
|
|
|
-, _sampleRate(0)
|
|
|
|
|
|
+//, _maxSampleCount(1000)
|
|
|
|
+//, _sampleRate(0)
|
|
, _id((++_idGen))
|
|
, _id((++_idGen))
|
|
{
|
|
{
|
|
_timeoutQueue.reset(new TC_TimeoutQueueNew<ReqMessage*>());
|
|
_timeoutQueue.reset(new TC_TimeoutQueueNew<ReqMessage*>());
|
|
@@ -64,10 +64,10 @@ AdapterProxy::AdapterProxy(ObjectProxy * pObjectProxy,const EndpointInfo &ep,Com
|
|
_noSendQueueLimit = pObjectProxy->getCommunicatorEpoll()->getNoSendQueueLimit();
|
|
_noSendQueueLimit = pObjectProxy->getCommunicatorEpoll()->getNoSendQueueLimit();
|
|
}
|
|
}
|
|
|
|
|
|
- if(_communicator)
|
|
|
|
- {
|
|
|
|
- _timeoutLogFlag = _communicator->getTimeoutLogFlag();
|
|
|
|
- }
|
|
|
|
|
|
+ // if(_communicator)
|
|
|
|
+ // {
|
|
|
|
+ // _timeoutLogFlag = pObjectProxy->getCommunicatorEpoll()->getTimeoutLogFlag();
|
|
|
|
+ // }
|
|
|
|
|
|
if (ep.isTcp())
|
|
if (ep.isTcp())
|
|
{
|
|
{
|
|
@@ -92,7 +92,6 @@ AdapterProxy::~AdapterProxy()
|
|
// adapterProxy->checkActive(true);
|
|
// adapterProxy->checkActive(true);
|
|
// return adapterProxy;
|
|
// return adapterProxy;
|
|
//}
|
|
//}
|
|
-
|
|
|
|
string AdapterProxy::getSlaveName(const string& sSlaveName)
|
|
string AdapterProxy::getSlaveName(const string& sSlaveName)
|
|
{
|
|
{
|
|
string::size_type pos = sSlaveName.find(".");
|
|
string::size_type pos = sSlaveName.find(".");
|
|
@@ -110,66 +109,113 @@ string AdapterProxy::getSlaveName(const string& sSlaveName)
|
|
|
|
|
|
void AdapterProxy::initStatHead()
|
|
void AdapterProxy::initStatHead()
|
|
{
|
|
{
|
|
- vector <string> v;
|
|
|
|
- if(!ClientConfig::SetDivision.empty() &&
|
|
|
|
- StatReport::divison2SetInfo(ClientConfig::SetDivision, v)) //主调(client)启用set
|
|
|
|
- {
|
|
|
|
- _statHead.masterName = StatReport::trimAndLimitStr(ClientConfig::ModuleName + "." + v[0] + v[1] + v[2] + "@" + ClientConfig::TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- _statHead.masterName = StatReport::trimAndLimitStr(ClientConfig::ModuleName + "@" + ClientConfig::TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- const string sSlaveName = getSlaveName(_objectProxy->name());
|
|
|
|
- string sSlaveSet = _endpoint.setDivision();
|
|
|
|
- if(!sSlaveSet.empty() &&
|
|
|
|
- StatReport::divison2SetInfo(sSlaveSet, v)) //被调启用set
|
|
|
|
- {
|
|
|
|
- _statHead.slaveSetName = v[0];
|
|
|
|
- _statHead.slaveSetArea = v[1];
|
|
|
|
- _statHead.slaveSetID = v[2];
|
|
|
|
- _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName + "." + v[0] + v[1] + v[2], StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName, StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
- }
|
|
|
|
|
|
+ vector <string> v;
|
|
|
|
+ if(!ClientConfig::SetDivision.empty() &&
|
|
|
|
+ StatReport::divison2SetInfo(ClientConfig::SetDivision, v)) //主调(client)启用set
|
|
|
|
+ {
|
|
|
|
+ _statHead.masterName = StatReport::trimAndLimitStr(ClientConfig::ModuleName + "." + v[0] + v[1] + v[2] + "@" + ClientConfig::TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ _statHead.masterName = StatReport::trimAndLimitStr(ClientConfig::ModuleName + "@" + ClientConfig::TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ const string sSlaveName = getSlaveName(_objectProxy->name());
|
|
|
|
+ string sSlaveSet = _endpoint.setDivision();
|
|
|
|
+ if(!sSlaveSet.empty() &&
|
|
|
|
+ StatReport::divison2SetInfo(sSlaveSet, v)) //被调启用set
|
|
|
|
+ {
|
|
|
|
+ _statHead.slaveSetName = v[0];
|
|
|
|
+ _statHead.slaveSetArea = v[1];
|
|
|
|
+ _statHead.slaveSetID = v[2];
|
|
|
|
+ _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName + "." + v[0] + v[1] + v[2], StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName, StatReport::MAX_MASTER_NAME_LEN);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ _statHead.slaveIp = StatReport::trimAndLimitStr(_endpoint.host(), StatReport::MAX_MASTER_IP_LEN);
|
|
|
|
+ _statHead.slavePort = _endpoint.port();
|
|
|
|
+ _statHead.returnValue = 0;
|
|
|
|
+}
|
|
|
|
|
|
- _statHead.slaveIp = StatReport::trimAndLimitStr(_endpoint.host(), StatReport::MAX_MASTER_IP_LEN);
|
|
|
|
- _statHead.slavePort = _endpoint.port();
|
|
|
|
- _statHead.returnValue = 0;
|
|
|
|
|
|
+//bool AdapterProxy::invoke_sync(ReqMessage * msg)
|
|
|
|
+//{
|
|
|
|
+// if(!_trans->hasConnected()) {
|
|
|
|
+// TLOGTARS("[TARS][AdapterProxy::invoke_sync " << _objectProxy->name() << ", " << _endpoint.desc() << "]" << endl);
|
|
|
|
+// return false;
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// //生成requestid
|
|
|
|
+// //taf调用 而且 不是单向调用
|
|
|
|
+// if (!msg->bFromRpc)
|
|
|
|
+// {
|
|
|
|
+// // msg->request.iRequestId = _objectProxy->generateId();
|
|
|
|
+// msg->request.iRequestId = _timeoutQueue->generateId();
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans));
|
|
|
|
+//
|
|
|
|
+// msg->response = std::make_shared<ResponsePacket>();
|
|
|
|
+//
|
|
|
|
+// //这里得加锁
|
|
|
|
+// std::lock_guard<std::mutex> lock(_mutex);
|
|
|
|
+//
|
|
|
|
+// _trans->sendRecv(msg);
|
|
|
|
+//
|
|
|
|
+// return true;
|
|
|
|
+//}
|
|
|
|
+
|
|
|
|
+void AdapterProxy::onConnect()
|
|
|
|
+{
|
|
|
|
+ _objectProxy->onConnect(this);
|
|
}
|
|
}
|
|
|
|
|
|
-int AdapterProxy::invoke(ReqMessage * msg)
|
|
|
|
|
|
+int AdapterProxy::invoke_connection_serial(ReqMessage * msg)
|
|
{
|
|
{
|
|
- assert(_trans != NULL);
|
|
|
|
|
|
+ assert(msg->eType != ReqMessage::ONE_WAY);
|
|
|
|
|
|
- TLOGTARS("[TARS][AdapterProxy::invoke objname:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << endl);
|
|
|
|
|
|
+ msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
|
|
|
|
|
|
- //未发链表有长度限制
|
|
|
|
- if(_timeoutQueue->getSendListSize() >= _noSendQueueLimit)
|
|
|
|
- {
|
|
|
|
- TLOGERROR("[TARS][AdapterProxy::invoke fail,ReqInfoQueue.size > " << _noSendQueueLimit << ",objname:" << _objectProxy->name() <<",desc:"<< _endpoint.desc() << endl);
|
|
|
|
- msg->eStatus = ReqMessage::REQ_EXC;
|
|
|
|
|
|
+ msg->request.iRequestId = _timeoutQueue->generateId();
|
|
|
|
|
|
- finishInvoke(msg);
|
|
|
|
|
|
+ if(!_requestMsg && _timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError)
|
|
|
|
+ {
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::invoke push (send) obj: " << _objectProxy->name() << ", desc:" << _endpoint.desc() << ", id: " << msg->request.iRequestId << endl);
|
|
|
|
|
|
- return 0;
|
|
|
|
- }
|
|
|
|
|
|
+ _requestMsg = msg;
|
|
|
|
|
|
- //生成requestid
|
|
|
|
- //tars调用 而且 不是单向调用
|
|
|
|
- if(!msg->bFromRpc)
|
|
|
|
- {
|
|
|
|
- msg->request.iRequestId = _timeoutQueue->generateId();
|
|
|
|
- }
|
|
|
|
|
|
+ bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
|
|
|
|
+ if (!bFlag)
|
|
|
|
+ {
|
|
|
|
+ TLOGERROR("[TARS][AdapterProxy::invoke fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _endpoint.desc() << "]" << endl);
|
|
|
|
+ msg->eStatus = ReqMessage::REQ_EXC;
|
|
|
|
|
|
-#ifdef _USE_OPENTRACKING
|
|
|
|
- startTrack(msg);
|
|
|
|
-#endif
|
|
|
|
|
|
+ finishInvoke(msg);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::invoke push (no send) " << _objectProxy->name() << ", " << _endpoint.desc() << ",id " << msg->request.iRequestId << endl);
|
|
|
|
+
|
|
|
|
+ //之前还没有数据没发送 或者 请求发送失败了, 进队列
|
|
|
|
+ bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime, false);
|
|
|
|
+ if (!bFlag)
|
|
|
|
+ {
|
|
|
|
+ TLOGERROR("[TARS][AdapterProxy::invoke fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ", id: " << msg->request.iRequestId << ", " << _objectProxy->name() << ", " << _endpoint.desc() << "]" << endl);
|
|
|
|
+ msg->eStatus = ReqMessage::REQ_EXC;
|
|
|
|
|
|
- msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
|
|
|
|
|
|
+ finishInvoke(msg);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return 0;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int AdapterProxy::invoke_connection_parallel(ReqMessage * msg)
|
|
|
|
+{
|
|
|
|
+ msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
|
|
|
|
|
|
// TLOGERROR("[TARS][AdapterProxy::invoke insert timeout queue fail, queue size:" << _timeoutQueue->size() << ", id:" << msg->request.iRequestId << ", obj:" <<_objectProxy->name() << ", desc:" << _endpoint.desc() <<endl);
|
|
// TLOGERROR("[TARS][AdapterProxy::invoke insert timeout queue fail, queue size:" << _timeoutQueue->size() << ", id:" << msg->request.iRequestId << ", obj:" <<_objectProxy->name() << ", desc:" << _endpoint.desc() <<endl);
|
|
|
|
|
|
@@ -181,7 +227,7 @@ int AdapterProxy::invoke(ReqMessage * msg)
|
|
//请求发送成功了,单向调用直接返回
|
|
//请求发送成功了,单向调用直接返回
|
|
if(msg->eType == ReqMessage::ONE_WAY)
|
|
if(msg->eType == ReqMessage::ONE_WAY)
|
|
{
|
|
{
|
|
- #ifdef _USE_OPENTRACKING
|
|
|
|
|
|
+ #ifdef TARS_OPENTRACKING
|
|
finishTrack(msg);
|
|
finishTrack(msg);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
@@ -218,24 +264,121 @@ int AdapterProxy::invoke(ReqMessage * msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-#ifdef _USE_OPENTRACKING
|
|
|
|
- if(msg->eType == ReqMessage::ONE_WAY)
|
|
|
|
|
|
+ return 0;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int AdapterProxy::invoke(ReqMessage * msg)
|
|
|
|
+{
|
|
|
|
+ assert(_trans != NULL);
|
|
|
|
+
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::invoke " << _objectProxy->name() << ", " << _endpoint.desc() << "]" << endl);
|
|
|
|
+
|
|
|
|
+ //未发链表有长度限制
|
|
|
|
+ if (_timeoutQueue->getSendListSize() >= _noSendQueueLimit)
|
|
{
|
|
{
|
|
- finishTrack(msg);
|
|
|
|
|
|
+ TLOGERROR("[TARS][AdapterProxy::invoke fail,ReqInfoQueue.size>" << _noSendQueueLimit << "," << _objectProxy->name() << "," << _endpoint.desc() << "]" << endl);
|
|
|
|
+ msg->eStatus = ReqMessage::REQ_EXC;
|
|
|
|
+
|
|
|
|
+ finishInvoke(msg);
|
|
|
|
+ return 0;
|
|
}
|
|
}
|
|
-#endif
|
|
|
|
- return 0;
|
|
|
|
|
|
+
|
|
|
|
+ //生成requestid
|
|
|
|
+ //taf调用 而且 不是单向调用
|
|
|
|
+ if (!msg->bFromRpc)
|
|
|
|
+ {
|
|
|
|
+ msg->request.iRequestId = _timeoutQueue->generateId();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if(_objectProxy->getServantProxy()->tars_connection_serial() > 0)
|
|
|
|
+ {
|
|
|
|
+ invoke_connection_serial(msg);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ invoke_connection_parallel(msg);
|
|
|
|
+ }
|
|
|
|
+//
|
|
|
|
+// msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
|
|
|
|
+//
|
|
|
|
+// //当前队列是空的, 且是连接复用模式, 交给连接发送数据
|
|
|
|
+// //连接连上 buffer不为空 发送数据成功
|
|
|
|
+// if (_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError)
|
|
|
|
+// {
|
|
|
|
+// TLOGTARS("[TARS][AdapterProxy::invoke push (send) obj: " << _objectProxy->name() << ", desc:" << _endpoint.desc() << ", id: " << msg->request.iRequestId << endl);
|
|
|
|
+//
|
|
|
|
+// //请求发送成功了 处理采样
|
|
|
|
+// //这个请求发送成功了。单向调用直接返回
|
|
|
|
+// if (msg->eType == ReqMessage::ONE_WAY)
|
|
|
|
+// {
|
|
|
|
+// delete msg;
|
|
|
|
+// msg = NULL;
|
|
|
|
+//
|
|
|
|
+// return 0;
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
|
|
|
|
+// if (!bFlag)
|
|
|
|
+// {
|
|
|
|
+// TLOGERROR("[TARS][AdapterProxy::invoke fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _endpoint.desc() << "]" << endl);
|
|
|
|
+// msg->eStatus = ReqMessage::REQ_EXC;
|
|
|
|
+//
|
|
|
|
+// finishInvoke(msg);
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// else
|
|
|
|
+// {
|
|
|
|
+// TLOGTARS("[TARS][AdapterProxy::invoke push (no send) " << _objectProxy->name() << ", " << _endpoint.desc() << ",id " << msg->request.iRequestId << endl);
|
|
|
|
+//
|
|
|
|
+// //之前还没有数据没发送 或者 请求发送失败了, 进队列
|
|
|
|
+// bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime, false);
|
|
|
|
+// if (!bFlag)
|
|
|
|
+// {
|
|
|
|
+// TLOGERROR("[TARS][AdapterProxy::invoke fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ", id: " << msg->request.iRequestId << ", " << _objectProxy->name() << ", " << _endpoint.desc() << "]" << endl);
|
|
|
|
+// msg->eStatus = ReqMessage::REQ_EXC;
|
|
|
|
+//
|
|
|
|
+// finishInvoke(msg);
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+ return 0;
|
|
}
|
|
}
|
|
|
|
|
|
-void AdapterProxy::doInvoke()
|
|
|
|
|
|
+void AdapterProxy::doInvoke_serial()
|
|
{
|
|
{
|
|
- while(!_timeoutQueue->sendListEmpty())
|
|
|
|
- {
|
|
|
|
- ReqMessage * msg = NULL;
|
|
|
|
|
|
+ assert(_requestMsg == NULL);
|
|
|
|
+
|
|
|
|
+ if(!_timeoutQueue->sendListEmpty())
|
|
|
|
+ {
|
|
|
|
+ ReqMessage * msg = NULL;
|
|
|
|
+
|
|
|
|
+ _timeoutQueue->getSend(msg);
|
|
|
|
+
|
|
|
|
+ int iRet = _trans->sendRequest(msg->sReqData);
|
|
|
|
+
|
|
|
|
+ //发送失败 返回
|
|
|
|
+ if (iRet == Transceiver::eRetError)
|
|
|
|
+ {
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::doInvoke sendRequest failed, obj:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
- _timeoutQueue->getSend(msg);
|
|
|
|
|
|
+ //送send 队列中清掉, 但是保留在定时队列中
|
|
|
|
+ _timeoutQueue->popSend(false);
|
|
|
|
|
|
- int iRet = _trans->sendRequest(msg->sReqData);
|
|
|
|
|
|
+ _requestMsg = msg;
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void AdapterProxy::doInvoke_parallel()
|
|
|
|
+{
|
|
|
|
+ while(!_timeoutQueue->sendListEmpty())
|
|
|
|
+ {
|
|
|
|
+ ReqMessage * msg = NULL;
|
|
|
|
+
|
|
|
|
+ _timeoutQueue->getSend(msg);
|
|
|
|
+
|
|
|
|
+ int iRet = _trans->sendRequest(msg->sReqData);
|
|
|
|
|
|
//发送失败 返回
|
|
//发送失败 返回
|
|
if(iRet == Transceiver::eRetError)
|
|
if(iRet == Transceiver::eRetError)
|
|
@@ -243,7 +386,6 @@ void AdapterProxy::doInvoke()
|
|
TLOGTARS("[TARS][AdapterProxy::doInvoke sendRequest failed, obj:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
|
|
TLOGTARS("[TARS][AdapterProxy::doInvoke sendRequest failed, obj:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- TLOGTARS("[TARS][AdapterProxy::doInvoke sendRequest obj:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",id:" << msg->request.iRequestId << ",ret:" << iRet << endl);
|
|
|
|
|
|
|
|
//请求发送成功了 处理采样
|
|
//请求发送成功了 处理采样
|
|
//...
|
|
//...
|
|
@@ -264,6 +406,20 @@ void AdapterProxy::doInvoke()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void AdapterProxy::doInvoke(bool initInvoke)
|
|
|
|
+{
|
|
|
|
+ if(_objectProxy->getServantProxy()->tars_connection_serial() > 0)
|
|
|
|
+ {
|
|
|
|
+ if(initInvoke)
|
|
|
|
+ {
|
|
|
|
+ doInvoke_serial();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ doInvoke_parallel();
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
|
|
void AdapterProxy::finishInvoke(bool bFail)
|
|
void AdapterProxy::finishInvoke(bool bFail)
|
|
{
|
|
{
|
|
@@ -375,7 +531,13 @@ int AdapterProxy::getConTimeout()
|
|
|
|
|
|
bool AdapterProxy::checkActive(bool bForceConnect, bool onlyCheck)
|
|
bool AdapterProxy::checkActive(bool bForceConnect, bool onlyCheck)
|
|
{
|
|
{
|
|
- time_t now = TNOW;
|
|
|
|
|
|
+ if(onlyCheck)
|
|
|
|
+ {
|
|
|
|
+ return _trans->hasConnected();
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ time_t now = TNOW;
|
|
|
|
|
|
TLOGTARS("[TARS][AdapterProxy::checkActive objname:" << _objectProxy->name()
|
|
TLOGTARS("[TARS][AdapterProxy::checkActive objname:" << _objectProxy->name()
|
|
<< ",desc:" << _endpoint.desc()
|
|
<< ",desc:" << _endpoint.desc()
|
|
@@ -452,10 +614,11 @@ bool AdapterProxy::checkActive(bool bForceConnect, bool onlyCheck)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- return _trans->hasConnected();
|
|
|
|
|
|
+ //已经建立连接了才返回
|
|
|
|
+ return _trans->hasConnected();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
void AdapterProxy::setConTimeout(bool bConTimeout)
|
|
void AdapterProxy::setConTimeout(bool bConTimeout)
|
|
{
|
|
{
|
|
if(bConTimeout != _connTimeout)
|
|
if(bConTimeout != _connTimeout)
|
|
@@ -469,24 +632,70 @@ void AdapterProxy::setConTimeout(bool bConTimeout)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void AdapterProxy::onSetInactive()
|
|
|
|
+{
|
|
|
|
+ _activeStatus = false;
|
|
|
|
+
|
|
|
|
+ _nextRetryTime = TNOW + _objectProxy->checkTimeoutInfo().tryTimeInterval;
|
|
|
|
+
|
|
|
|
+ //需要关闭连接
|
|
|
|
+ _trans->close();
|
|
|
|
+}
|
|
|
|
+
|
|
//屏蔽结点
|
|
//屏蔽结点
|
|
void AdapterProxy::setInactive()
|
|
void AdapterProxy::setInactive()
|
|
{
|
|
{
|
|
- _activeStatus = false;
|
|
|
|
|
|
+ onSetInactive();
|
|
|
|
+
|
|
|
|
+ _objectProxy->getServantProxy()->onSetInactive(_endpoint);
|
|
|
|
+
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::setInactive, " << _objectProxy->name() << ", " << _endpoint.desc() << ", inactive]" << endl);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void AdapterProxy::finishInvoke_serial(shared_ptr<ResponsePacket> & rsp)
|
|
|
|
+{
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::finishInvoke(ResponsePacket), " << _objectProxy->name() << ", " << _endpoint.desc() << ", id:" << rsp->iRequestId << "]" << endl);
|
|
|
|
+
|
|
|
|
+ if (!_requestMsg)
|
|
|
|
+ {
|
|
|
|
+ if(_timeoutLogFlag)
|
|
|
|
+ {
|
|
|
|
+ TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket),"
|
|
|
|
+ << _objectProxy->name()
|
|
|
|
+ << ",get req-ptr NULL,may be timeout,id:"
|
|
|
|
+ << rsp->iRequestId << ",desc:" << _endpoint.desc() << "]" << endl);
|
|
|
|
+ }
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
- _nextRetryTime = TNOW + _objectProxy->checkTimeoutInfo().tryTimeInterval;
|
|
|
|
|
|
+ ReqMessage * msg = _requestMsg;
|
|
|
|
|
|
- _trans->close();
|
|
|
|
|
|
+ //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了,因此可能会core,在erase中要加判断
|
|
|
|
+ //获取请求信息
|
|
|
|
+ bool retErase = _timeoutQueue->erase(_requestMsg->request.iRequestId, msg);
|
|
|
|
|
|
- TLOGTARS("[TARS][AdapterProxy::setInactive objname:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",inactive" << endl);
|
|
|
|
|
|
+ assert(retErase);
|
|
|
|
+ assert(_requestMsg == msg);
|
|
|
|
+ assert(msg->eType != ReqMessage::ONE_WAY);
|
|
|
|
+ assert(msg->eStatus == ReqMessage::REQ_REQ);
|
|
|
|
+
|
|
|
|
+ _requestMsg = NULL;
|
|
|
|
+
|
|
|
|
+ msg->eStatus = ReqMessage::REQ_RSP;
|
|
|
|
+
|
|
|
|
+ msg->response = rsp;
|
|
|
|
+
|
|
|
|
+ finishInvoke(msg);
|
|
|
|
+
|
|
|
|
+ //检查连接状态
|
|
|
|
+ checkActive();
|
|
}
|
|
}
|
|
|
|
|
|
-void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket> & rsp)
|
|
|
|
|
|
+void AdapterProxy::finishInvoke_parallel(shared_ptr<ResponsePacket> & rsp)
|
|
{
|
|
{
|
|
-// TLOGTARS("[TARS][AdapterProxy::finishInvoke(ResponsePacket) obj:" << _objectProxy->name() << ", desc:" << _endpoint.desc()
|
|
|
|
-// << ", id:" << rsp->iRequestId << endl);
|
|
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::finishInvoke(ResponsePacket), " << _objectProxy->name() << ", " << _endpoint.desc() << ", id:" << rsp->iRequestId << "]" << endl);
|
|
|
|
|
|
- ReqMessage * msg = NULL;
|
|
|
|
|
|
+ ReqMessage * msg = NULL;
|
|
|
|
|
|
//requestid 为0 是push消息
|
|
//requestid 为0 是push消息
|
|
if(rsp->iRequestId == 0)
|
|
if(rsp->iRequestId == 0)
|
|
@@ -534,6 +743,70 @@ void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket> & rsp)
|
|
finishInvoke(msg);
|
|
finishInvoke(msg);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket> & rsp)
|
|
|
|
+{
|
|
|
|
+ TLOGTARS("[TARS][AdapterProxy::finishInvoke(ResponsePacket), " << _objectProxy->name() << ", " << _endpoint.desc() << ", id:" << rsp->iRequestId << "]" << endl);
|
|
|
|
+
|
|
|
|
+ if(_objectProxy->getServantProxy()->tars_connection_serial() > 0)
|
|
|
|
+ {
|
|
|
|
+ finishInvoke_serial(rsp);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ finishInvoke_parallel(rsp);
|
|
|
|
+ }
|
|
|
|
+//
|
|
|
|
+// ReqMessage * msg = NULL;
|
|
|
|
+//
|
|
|
|
+// if (rsp->iRequestId == 0)
|
|
|
|
+// {
|
|
|
|
+// //requestid 为0 是push消息
|
|
|
|
+// //
|
|
|
|
+// //push callback is null
|
|
|
|
+// if (!_objectProxy->getPushCallback())
|
|
|
|
+// {
|
|
|
|
+// TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket), request id is 0, pushcallback is null, " << _objectProxy->name() << ", " << _endpoint.desc() << "]" << endl);
|
|
|
|
+// return;
|
|
|
|
+// }
|
|
|
|
+// msg = new ReqMessage();
|
|
|
|
+// msg->eStatus = ReqMessage::REQ_RSP;
|
|
|
|
+// msg->eType = ReqMessage::ASYNC_CALL;
|
|
|
|
+// msg->bFromRpc = true;
|
|
|
|
+// msg->bPush = true;
|
|
|
|
+// msg->proxy = _objectProxy->getServantProxy();
|
|
|
|
+// msg->pObjectProxy = _objectProxy;
|
|
|
|
+// msg->adapter = this;
|
|
|
|
+// msg->callback = _objectProxy->getPushCallback();
|
|
|
|
+// }
|
|
|
|
+// else
|
|
|
|
+// {
|
|
|
|
+// //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了,因此可能会core,在erase中要加判断
|
|
|
|
+// //获取请求信息
|
|
|
|
+// bool retErase = _timeoutQueue->erase(rsp->iRequestId, msg);
|
|
|
|
+//
|
|
|
|
+// //找不到此id信息
|
|
|
|
+// if (!retErase)
|
|
|
|
+// {
|
|
|
|
+// if (_timeoutLogFlag)
|
|
|
|
+// {
|
|
|
|
+// TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket),"
|
|
|
|
+// << _objectProxy->name()
|
|
|
|
+// << ",get req-ptr NULL,may be timeout,id:"
|
|
|
|
+// << rsp->iRequestId << ",desc:" << _endpoint.desc() << "]" << endl);
|
|
|
|
+// }
|
|
|
|
+// return ;
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// assert(msg->eStatus == ReqMessage::REQ_REQ);
|
|
|
|
+//
|
|
|
|
+// msg->eStatus = ReqMessage::REQ_RSP;
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// msg->response = rsp;
|
|
|
|
+//
|
|
|
|
+// finishInvoke(msg);
|
|
|
|
+}
|
|
|
|
+
|
|
void AdapterProxy::finishInvoke(ReqMessage * msg)
|
|
void AdapterProxy::finishInvoke(ReqMessage * msg)
|
|
{
|
|
{
|
|
assert(msg->eStatus != ReqMessage::REQ_REQ);
|
|
assert(msg->eStatus != ReqMessage::REQ_REQ);
|
|
@@ -543,7 +816,7 @@ void AdapterProxy::finishInvoke(ReqMessage * msg)
|
|
<< ", status:" << msg->eStatus
|
|
<< ", status:" << msg->eStatus
|
|
<< ", ret: " << msg->response->iRet << endl);
|
|
<< ", ret: " << msg->response->iRet << endl);
|
|
|
|
|
|
-#ifdef _USE_OPENTRACKING
|
|
|
|
|
|
+#ifdef TARS_OPENTRACKING
|
|
finishTrack(msg);
|
|
finishTrack(msg);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
@@ -595,16 +868,14 @@ void AdapterProxy::finishInvoke(ReqMessage * msg)
|
|
ReqMessagePtr msgPtr = msg;
|
|
ReqMessagePtr msgPtr = msg;
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- msg->callback->onDispatch(msgPtr);
|
|
|
|
|
|
+ msg->callback->dispatch(msgPtr);
|
|
}
|
|
}
|
|
catch (exception & e)
|
|
catch (exception & e)
|
|
{
|
|
{
|
|
- //FDLOG("taferror")<<"[TAF]AdapterProxy::finishInvoke(ReqMessage) exp:"<<e.what()<<" ,line:"<<__LINE__<<endl;
|
|
|
|
- TLOGERROR("[TAF]AdapterProxy::finishInvoke(ReqMessage) exp:" << e.what() << " ,line:" << __LINE__ << endl);
|
|
|
|
|
|
+ TLOGERROR("[TARS]AdapterProxy::finishInvoke(ReqMessage) exp:" << e.what() << " ,line:" << __LINE__ << endl);
|
|
}
|
|
}
|
|
catch (...)
|
|
catch (...)
|
|
{
|
|
{
|
|
- //FDLOG("taferror")<<"[TAF]AdapterProxy::finishInvoke(ReqMessage) exp:unknown line:"<<__LINE__<<endl;
|
|
|
|
TLOGERROR("[TARS]AdapterProxy::finishInvoke(ReqMessage) exp:unknown line:|" << __LINE__ << endl);
|
|
TLOGERROR("[TARS]AdapterProxy::finishInvoke(ReqMessage) exp:unknown line:|" << __LINE__ << endl);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -641,11 +912,6 @@ void AdapterProxy::finishInvoke(ReqMessage * msg)
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
-//ObjectProxy * AdapterProxy::getObjProxy()
|
|
|
|
-//{
|
|
|
|
-// return _objectProxy;
|
|
|
|
-//}
|
|
|
|
-
|
|
|
|
void AdapterProxy::doTimeout()
|
|
void AdapterProxy::doTimeout()
|
|
{
|
|
{
|
|
ReqMessage * msg;
|
|
ReqMessage * msg;
|
|
@@ -653,7 +919,14 @@ void AdapterProxy::doTimeout()
|
|
{
|
|
{
|
|
TLOGTARS("[TARS][AdapterProxy::doTimeout obj:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",id " << msg->request.iRequestId << endl);
|
|
TLOGTARS("[TARS][AdapterProxy::doTimeout obj:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",id " << msg->request.iRequestId << endl);
|
|
|
|
|
|
- assert(msg->eStatus == ReqMessage::REQ_REQ);
|
|
|
|
|
|
+// assert(msg->eStatus == ReqMessage::REQ_REQ);
|
|
|
|
+
|
|
|
|
+ if(msg == _requestMsg)
|
|
|
|
+ {
|
|
|
|
+ _requestMsg = NULL;
|
|
|
|
+ //timeout, close
|
|
|
|
+ _trans->close();
|
|
|
|
+ }
|
|
|
|
|
|
msg->eStatus = ReqMessage::REQ_TIME;
|
|
msg->eStatus = ReqMessage::REQ_TIME;
|
|
|
|
|
|
@@ -677,7 +950,7 @@ void AdapterProxy::doTimeout()
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
-#ifdef _USE_OPENTRACKING
|
|
|
|
|
|
+#ifdef TARS_OPENTRACKING
|
|
void AdapterProxy::startTrack(ReqMessage * msg)
|
|
void AdapterProxy::startTrack(ReqMessage * msg)
|
|
{
|
|
{
|
|
if(!_communicator->_traceManager)
|
|
if(!_communicator->_traceManager)
|
|
@@ -784,7 +1057,7 @@ void AdapterProxy::stat(ReqMessage * msg)
|
|
_statBody[msg->request.sFuncName] = body;
|
|
_statBody[msg->request.sFuncName] = body;
|
|
}
|
|
}
|
|
|
|
|
|
- if(LOG->isNeedLog(TarsRollLogger::INFO_LOG))
|
|
|
|
|
|
+ if(LOG->isNeedLog(LocalRollLogger::INFO_LOG))
|
|
{
|
|
{
|
|
ostringstream os;
|
|
ostringstream os;
|
|
os.str("");
|
|
os.str("");
|
|
@@ -849,10 +1122,9 @@ void AdapterProxy::addConnExc(bool bExc)
|
|
{
|
|
{
|
|
if(!_connExc && _connExcCnt++ >= _objectProxy->checkTimeoutInfo().maxConnectExc)
|
|
if(!_connExc && _connExcCnt++ >= _objectProxy->checkTimeoutInfo().maxConnectExc)
|
|
{
|
|
{
|
|
- if(!_connExc)
|
|
|
|
- {
|
|
|
|
- TLOGERROR("[TARS][AdapterProxy::addConnExc desc:"<< _endpoint.desc() << ",connect exception status is true! (connect error)"<<endl);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ TLOGERROR("[TARS][AdapterProxy::addConnExc desc:"<< _endpoint.desc() << ",connect exception status is true! (connect error)"<<endl);
|
|
|
|
+
|
|
setInactive();
|
|
setInactive();
|
|
_connExc = true;
|
|
_connExc = true;
|
|
}
|
|
}
|