AsyncProcThread.cpp 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/AsyncProcThread.h"
  17. #include "servant/Communicator.h"
  18. #include "servant/StatReport.h"
  19. #include "servant/RemoteLogger.h"
  20. namespace tars
  21. {
  22. AsyncProcThread::AsyncProcThread(size_t iQueueCap, bool merge)
  23. : _terminate(false), _iQueueCap(iQueueCap), _merge(merge)
  24. {
  25. _msgQueue = new TC_CasQueue<ReqMessage*>();
  26. if(!_merge)
  27. {
  28. start();
  29. }
  30. }
  31. AsyncProcThread::~AsyncProcThread()
  32. {
  33. terminate();
  34. if(_msgQueue)
  35. {
  36. delete _msgQueue;
  37. _msgQueue = NULL;
  38. }
  39. }
  40. void AsyncProcThread::terminate()
  41. {
  42. if(!_merge) {
  43. TC_ThreadLock::Lock lock(*this);
  44. _terminate = true;
  45. notifyAll();
  46. }
  47. }
  48. void AsyncProcThread::push_back(ReqMessage * msg)
  49. {
  50. if(_merge) {
  51. //合并了, 直接回调
  52. callback(msg);
  53. }
  54. else {
  55. if(_msgQueue->size() >= _iQueueCap)
  56. {
  57. TLOGERROR("[TARS][AsyncProcThread::push_back] async_queue full:" << _msgQueue->size() << ">=" << _iQueueCap << endl);
  58. delete msg;
  59. }
  60. else
  61. {
  62. _msgQueue->push_back(msg);
  63. TC_ThreadLock::Lock lock(*this);
  64. notify();
  65. }
  66. }
  67. }
  68. void AsyncProcThread::run()
  69. {
  70. while (!_terminate)
  71. {
  72. ReqMessage * msg;
  73. if (_msgQueue->pop_front(msg))
  74. {
  75. callback(msg);
  76. }
  77. else
  78. {
  79. TC_ThreadLock::Lock lock(*this);
  80. timedWait(1000);
  81. }
  82. }
  83. }
  84. void AsyncProcThread::callback(ReqMessage * msg)
  85. {
  86. TLOGTARS("[TARS][AsyncProcThread::run] get one msg." << endl);
  87. //从回调对象把线程私有数据传递到回调线程中
  88. ServantProxyThreadData * pServantProxyThreadData = ServantProxyThreadData::getData();
  89. assert(pServantProxyThreadData != NULL);
  90. //把染色的消息设置在线程私有数据里面
  91. pServantProxyThreadData->_dyeing = msg->bDyeing;
  92. pServantProxyThreadData->_dyeingKey = msg->sDyeingKey;
  93. pServantProxyThreadData->_cookie = msg->cookie;
  94. if(msg->adapter)
  95. {
  96. pServantProxyThreadData->_szHost = msg->adapter->endpoint().desc();
  97. }
  98. try
  99. {
  100. ReqMessagePtr msgPtr = msg;
  101. msg->callback->dispatch(msgPtr);
  102. }
  103. catch (exception& e)
  104. {
  105. TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);
  106. }
  107. catch (...)
  108. {
  109. TLOGERROR("[TARS][AsyncProcThread exception.]" << endl);
  110. }
  111. }
  112. /////////////////////////////////////////////////////////////////////////
  113. }