123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #include "servant/AsyncProcThread.h"
- #include "servant/Communicator.h"
- #include "servant/StatReport.h"
- #include "servant/RemoteLogger.h"
- #include "servant/AdapterProxy.h"
- namespace tars
- {
- AsyncProcThread::AsyncProcThread(size_t iQueueCap, bool merge)
- : _terminate(false), _iQueueCap(iQueueCap), _merge(merge)
- {
- _msgQueue = new TC_CasQueue<ReqMessage*>();
- if(!_merge)
- {
- start();
- }
- }
- AsyncProcThread::~AsyncProcThread()
- {
- terminate();
- if(_msgQueue)
- {
- delete _msgQueue;
- _msgQueue = NULL;
- }
- }
- void AsyncProcThread::terminate()
- {
- if(!_merge) {
- Lock lock(*this);
- _terminate = true;
- notifyAll();
- }
- }
- void AsyncProcThread::push_back(ReqMessage * msg)
- {
- if(_merge) {
- //合并了, 直接回调
- callback(msg);
- }
- else {
- if(_msgQueue->size() >= _iQueueCap)
- {
- TLOGERROR("[AsyncProcThread::push_back] async_queue full:" << _msgQueue->size() << ">=" << _iQueueCap << endl);
- delete msg;
- }
- else
- {
- _msgQueue->push_back(msg);
- TC_ThreadLock::Lock lock(*this);
- notify();
- }
- }
- }
- void AsyncProcThread::run()
- {
- while (!_terminate)
- {
- ReqMessage * msg;
- //异步请求回来的响应包处理
- if (_msgQueue->pop_front(msg))
- {
- callback(msg);
- }
- else
- {
- TC_ThreadLock::Lock lock(*this);
- timedWait(1000);
- }
- }
- ReqMessage * msg;
- while(_msgQueue->pop_front(msg))
- {
- delete msg;
- }
- ServantProxyThreadData::g_sp.reset();
- CallbackThreadData::g_sp.reset();
- }
- void AsyncProcThread::callback(ReqMessage * msg)
- {
- TLOGTARS("[AsyncProcThread::run] get one msg." << endl);
- //从回调对象把线程私有数据传递到回调线程中
- ServantProxyThreadData * pServantProxyThreadData = ServantProxyThreadData::getData();
- // assert(pServantProxyThreadData != NULL);
- //把染色的消息设置在线程私有数据里面
- pServantProxyThreadData->_data._dyeing = msg->data._dyeing;
- pServantProxyThreadData->_data._dyeingKey = msg->data._dyeingKey;
- pServantProxyThreadData->_data._cookie = msg->data._cookie;
- //=======
- // pServantProxyThreadData->_dyeing = msg->bDyeing;
- // pServantProxyThreadData->_dyeingKey = msg->sDyeingKey;
- pServantProxyThreadData->_traceCall = msg->bTraceCall;
- pServantProxyThreadData->initTrace(msg->sTraceKey);
- // pServantProxyThreadData->_cookie = msg->cookie;
- //>>>>>>> origin/delay
- if(msg->adapter)
- {
- pServantProxyThreadData->_data._szHost = msg->adapter->endpoint().desc();
- }
- try
- {
- ReqMessagePtr msgPtr = msg;
- msg->callback->dispatch(msgPtr);
- }
- catch (exception& e)
- {
- TLOGERROR("[AsyncProcThread exception]:" << e.what() << endl);
- }
- catch (...)
- {
- TLOGERROR("[AsyncProcThread exception.]" << endl);
- }
- }
- /////////////////////////////////////////////////////////////////////////
- }
|