|
@@ -758,6 +758,8 @@ void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const se
|
|
|
|
|
|
_activeProxys.clear();
|
|
|
_regProxys.clear();
|
|
|
+ _indexActiveProxys.clear();
|
|
|
+ _sortActivProxys.clear();
|
|
|
|
|
|
if(!active.empty())
|
|
|
{
|
|
@@ -797,6 +799,10 @@ void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const se
|
|
|
|
|
|
_regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
|
|
|
|
|
|
+ const string &host = iterAdapter->second->endpoint().host();
|
|
|
+ _indexActiveProxys.insert(make_pair(host, iterAdapter->second));
|
|
|
+ _sortActivProxys.insert(make_pair(host, iterAdapter->second));
|
|
|
+
|
|
|
//设置该节点的静态权重值
|
|
|
iterAdapter->second->setWeight(iter->weight());
|
|
|
}
|
|
@@ -1010,6 +1016,7 @@ AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bSta
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl);
|
|
|
if(_activeProxys.empty())
|
|
|
{
|
|
|
TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
|
|
@@ -1096,7 +1103,47 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b
|
|
|
TLOGTARS("[EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
|
|
|
}
|
|
|
|
|
|
- if(_consistentHashWeight.size() > 0)
|
|
|
+ while(_consistentHashWeight.size() > 0)
|
|
|
+ {
|
|
|
+ string sNode;
|
|
|
+
|
|
|
+ // 通过一致性hash取到对应的节点
|
|
|
+ _consistentHashWeight.getNodeName(hashCode, sNode);
|
|
|
+
|
|
|
+ auto it = _indexActiveProxys.find(sNode);
|
|
|
+ // 节点不存在,可能是下线或者服务不可用
|
|
|
+ if (it == _indexActiveProxys.end())
|
|
|
+ {
|
|
|
+ updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ //被hash到的节点在主控是active的才走在流程
|
|
|
+ if (it->second->isActiveInReg() && it->second->checkActive(true))
|
|
|
+ {
|
|
|
+ return it->second;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
|
|
|
+ // 剔除节点再次hash
|
|
|
+ if (!it->second->isActiveInReg())
|
|
|
+ {
|
|
|
+ // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
|
|
|
+ _indexActiveProxys.erase(sNode);
|
|
|
+ }
|
|
|
+ // checkConHashChange里重新加回到_sortActivProxys重试
|
|
|
+ _sortActivProxys.erase(sNode);
|
|
|
+ updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
|
|
|
+
|
|
|
+ if (_indexActiveProxys.empty())
|
|
|
+ {
|
|
|
+ TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /*if(_consistentHashWeight.size() > 0)
|
|
|
{
|
|
|
unsigned int iIndex = 0;
|
|
|
|
|
@@ -1115,6 +1162,7 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
|
|
|
if(_activeProxys.empty())
|
|
|
{
|
|
|
TLOGERROR("[EndpointManager::getConHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
|
|
@@ -1176,7 +1224,7 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b
|
|
|
|
|
|
return adapterProxy;
|
|
|
}
|
|
|
- }
|
|
|
+ }*/
|
|
|
|
|
|
return getHashProxyForNormal(hashCode);
|
|
|
}
|
|
@@ -1203,8 +1251,38 @@ bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
-bool EndpointManager::checkConHashChange(bool bStatic, const vector<AdapterProxy*> &vLastConHashProxys)
|
|
|
+bool EndpointManager::checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys)
|
|
|
{
|
|
|
+ // 将之前故障临时剔除的节点重新加回来重试
|
|
|
+ if (_indexActiveProxys.size() != _sortActivProxys.size())
|
|
|
+ {
|
|
|
+ for (auto &it : _indexActiveProxys)
|
|
|
+ {
|
|
|
+ _sortActivProxys[it.first] = it.second;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(mLastConHashProxys.size() != _sortActivProxys.size())
|
|
|
+ {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ auto itLast = mLastConHashProxys.begin();
|
|
|
+ auto itSort = _sortActivProxys.begin();
|
|
|
+ for (; itLast!=mLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
|
|
|
+ {
|
|
|
+ if (itLast->first != itSort->first)
|
|
|
+ {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ //解决服务权重更新时一致性哈希环不更新的问题
|
|
|
+ if(bStatic && itSort->second->checkWeightChanged(true))
|
|
|
+ {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+/*
|
|
|
if(vLastConHashProxys.size() != _vRegProxys.size())
|
|
|
{
|
|
|
return true;
|
|
@@ -1223,6 +1301,7 @@ bool EndpointManager::checkConHashChange(bool bStatic, const vector<AdapterProxy
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
+*/
|
|
|
|
|
|
return false;
|
|
|
}
|
|
@@ -1320,7 +1399,7 @@ void EndpointManager::updateHashProxyWeighted(bool bStatic)
|
|
|
_hashStaticRouterCache.push_back(vIndex[i]);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
TLOGTARS("EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
|
|
|
}
|
|
|
|
|
@@ -1354,10 +1433,39 @@ void EndpointManager::updateHashProxyWeighted(bool bStatic)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void EndpointManager::updateConHashProxyWeighted(bool bStatic, vector<AdapterProxy*> &vLastConHashProxys, TC_ConsistentHashNew &conHash)
|
|
|
+void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash)
|
|
|
{
|
|
|
- if(_vRegProxys.size() <= 0)
|
|
|
+ conHash.clear();
|
|
|
+ if(_sortActivProxys.empty())
|
|
|
{
|
|
|
+ TLOGERROR("[EndpointManager::updateHashProxyWeighted _indexActiveProxys is empty], bStatic:" << bStatic << endl);
|
|
|
+ return ;
|
|
|
+ }
|
|
|
+
|
|
|
+ mLastConHashProxys = _sortActivProxys;
|
|
|
+
|
|
|
+ for (auto it = _sortActivProxys.begin(); it != _sortActivProxys.end(); ++it)
|
|
|
+ {
|
|
|
+ int iWeight = (bStatic ? (it->second->getWeight()) : 100);
|
|
|
+ if(iWeight > 0)
|
|
|
+ {
|
|
|
+ iWeight = iWeight / 4;
|
|
|
+ if(iWeight <= 0)
|
|
|
+ {
|
|
|
+ iWeight = 1;
|
|
|
+ }
|
|
|
+ // 同一服务有多个obj的情况
|
|
|
+ // 同一hash值调用不同的obj会hash到不同的服务器
|
|
|
+ // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
|
|
|
+ // 一致性hash用host进行索引,不使用index,这里传0
|
|
|
+ conHash.addNode(it->second->endpoint().host(), 0, iWeight);
|
|
|
+ }
|
|
|
+ //防止多个服务节点权重同时更新时一致性哈希环多次更新
|
|
|
+ it->second->resetWeightChanged();
|
|
|
+ }
|
|
|
+/*
|
|
|
+ if(_vRegProxys.size() <= 0)
|
|
|
+ {
|
|
|
TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
|
|
|
return ;
|
|
|
}
|
|
@@ -1383,7 +1491,7 @@ void EndpointManager::updateConHashProxyWeighted(bool bStatic, vector<AdapterPro
|
|
|
//防止多个服务节点权重同时更新时一致性哈希环多次更新
|
|
|
_vRegProxys[i]->resetWeightChanged();
|
|
|
}
|
|
|
-
|
|
|
+*/
|
|
|
conHash.sortNode();
|
|
|
}
|
|
|
|
|
@@ -1413,6 +1521,7 @@ AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl);
|
|
|
if(_activeProxys.empty())
|
|
|
{
|
|
|
TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
|
|
@@ -1494,6 +1603,47 @@ AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
|
|
|
TLOGTARS("[EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
|
|
|
}
|
|
|
|
|
|
+ while(_consistentHash.size() > 0)
|
|
|
+ {
|
|
|
+ string sNode;
|
|
|
+
|
|
|
+ // 通过一致性hash取到对应的节点
|
|
|
+ _consistentHash.getNodeName(hashCode, sNode);
|
|
|
+
|
|
|
+ auto it = _indexActiveProxys.find(sNode);
|
|
|
+ // 节点不存在,可能是下线或者服务不可用
|
|
|
+ if (it == _indexActiveProxys.end())
|
|
|
+ {
|
|
|
+ updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ //被hash到的节点在主控是active的才走在流程
|
|
|
+ if (it->second->isActiveInReg() && it->second->checkActive(true))
|
|
|
+ {
|
|
|
+ return it->second;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
|
|
|
+ // 剔除节点再次hash
|
|
|
+ if (!it->second->isActiveInReg())
|
|
|
+ {
|
|
|
+ // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
|
|
|
+ _indexActiveProxys.erase(sNode);
|
|
|
+ }
|
|
|
+ // checkConHashChange里重新加回到_sortActivProxys重试
|
|
|
+ _sortActivProxys.erase(sNode);
|
|
|
+ updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
|
|
|
+
|
|
|
+ if (_indexActiveProxys.empty())
|
|
|
+ {
|
|
|
+ TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+/*
|
|
|
if(_consistentHash.size() > 0)
|
|
|
{
|
|
|
unsigned int iIndex = 0;
|
|
@@ -1513,6 +1663,7 @@ AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
|
|
|
if(_activeProxys.empty())
|
|
|
{
|
|
|
TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
|
|
@@ -1575,7 +1726,7 @@ AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
|
|
|
return adapterProxy;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+*/
|
|
|
return getHashProxyForNormal(hashCode);
|
|
|
}
|
|
|
|
|
@@ -1998,14 +2149,14 @@ void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoin
|
|
|
pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
|
|
|
}
|
|
|
|
|
|
-void EndpointManagerThread::getEndpointBySet(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
|
|
|
+void EndpointManagerThread::getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
|
|
|
{
|
|
|
EndpointThread * pThread = getEndpointThread(E_SET,sName);
|
|
|
|
|
|
pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
|
|
|
}
|
|
|
|
|
|
-void EndpointManagerThread::getEndpointByStation(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
|
|
|
+void EndpointManagerThread::getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
|
|
|
{
|
|
|
EndpointThread * pThread = getEndpointThread(E_STATION,sName);
|
|
|
|
|
@@ -2027,14 +2178,14 @@ void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoi
|
|
|
pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
|
|
|
}
|
|
|
|
|
|
-void EndpointManagerThread::getTCEndpointBySet(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
|
|
|
+void EndpointManagerThread::getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
|
|
|
{
|
|
|
EndpointThread * pThread = getEndpointThread(E_SET,sName);
|
|
|
|
|
|
pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
|
|
|
}
|
|
|
|
|
|
-void EndpointManagerThread::getTCEndpointByStation(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
|
|
|
+void EndpointManagerThread::getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
|
|
|
{
|
|
|
|
|
|
EndpointThread * pThread = getEndpointThread(E_STATION,sName);
|