// input_messenger.cpp
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
  17. #include <gflags/gflags.h>
  18. #include "butil/fd_guard.h" // fd_guard
  19. #include "butil/logging.h" // CHECK
  20. #include "butil/time.h" // cpuwide_time_us
  21. #include "butil/fd_utility.h" // make_non_blocking
  22. #include "bthread/bthread.h" // bthread_start_background
  23. #include "bthread/unstable.h" // bthread_flush
  24. #include "bvar/bvar.h" // bvar::Adder
  25. #include "brpc/options.pb.h" // ProtocolType
  26. #include "brpc/reloadable_flags.h" // BRPC_VALIDATE_GFLAG
  27. #include "brpc/protocol.h" // ListProtocols
  28. #include "brpc/input_messenger.h"
  29. namespace brpc {
  30. InputMessenger* g_messenger = NULL;
  31. static pthread_once_t g_messenger_init = PTHREAD_ONCE_INIT;
  32. static void InitClientSideMessenger() {
  33. g_messenger = new InputMessenger;
  34. }
  35. InputMessenger* get_or_new_client_side_messenger() {
  36. pthread_once(&g_messenger_init, InitClientSideMessenger);
  37. return g_messenger;
  38. }
  39. static ProtocolType FindProtocolOfHandler(const InputMessageHandler& h);
  40. // NOTE: This flag was true by default before r31206. But since we have
  41. // /connections to view all the active connections, logging closing does not
  42. // help much to locate problems, and crashings are unlikely to be associated
  43. // with an EOF.
  44. DEFINE_bool(log_connection_close, false,
  45. "Print log when remote side closes the connection");
  46. BRPC_VALIDATE_GFLAG(log_connection_close, PassValidate);
  47. DECLARE_bool(usercode_in_pthread);
  48. DECLARE_uint64(max_body_size);
  49. const size_t MSG_SIZE_WINDOW = 10; // Take last so many message into stat.
  50. const size_t MIN_ONCE_READ = 4096;
  51. const size_t MAX_ONCE_READ = 524288;
  52. ParseResult InputMessenger::CutInputMessage(
  53. Socket* m, size_t* index, bool read_eof) {
  54. const int preferred = m->preferred_index();
  55. const int max_index = (int)_max_index.load(butil::memory_order_acquire);
  56. // Try preferred handler first. The preferred_index is set on last
  57. // selection or by client.
  58. if (preferred >= 0 && preferred <= max_index
  59. && _handlers[preferred].parse != NULL) {
  60. ParseResult result =
  61. _handlers[preferred].parse(&m->_read_buf, m, read_eof, _handlers[preferred].arg);
  62. if (result.is_ok() ||
  63. result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
  64. *index = preferred;
  65. return result;
  66. } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
  67. // Critical error, return directly.
  68. LOG_IF(ERROR, result.error() == PARSE_ERROR_TOO_BIG_DATA)
  69. << "A message from " << m->remote_side()
  70. << "(protocol=" << _handlers[preferred].name
  71. << ") is bigger than " << FLAGS_max_body_size
  72. << " bytes, the connection will be closed."
  73. " Set max_body_size to allow bigger messages";
  74. return result;
  75. }
  76. if (m->CreatedByConnect() &&
  77. // baidu_std may fall to streaming_rpc
  78. (ProtocolType)preferred != PROTOCOL_BAIDU_STD) {
  79. // The protocol is fixed at client-side, no need to try others.
  80. LOG(ERROR) << "Fail to parse response from " << m->remote_side()
  81. << " by " << _handlers[preferred].name
  82. << " at client-side";
  83. return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
  84. }
  85. // Clear context before trying next protocol which probably has
  86. // an incompatible context with the current one.
  87. if (m->parsing_context()) {
  88. m->reset_parsing_context(NULL);
  89. }
  90. m->set_preferred_index(-1);
  91. }
  92. for (int i = 0; i <= max_index; ++i) {
  93. if (i == preferred || _handlers[i].parse == NULL) {
  94. // Don't try preferred handler(already tried) or invalid handler
  95. continue;
  96. }
  97. ParseResult result = _handlers[i].parse(&m->_read_buf, m, read_eof, _handlers[i].arg);
  98. if (result.is_ok() ||
  99. result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
  100. m->set_preferred_index(i);
  101. *index = i;
  102. return result;
  103. } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
  104. // Critical error, return directly.
  105. LOG_IF(ERROR, result.error() == PARSE_ERROR_TOO_BIG_DATA)
  106. << "A message from " << m->remote_side()
  107. << "(protocol=" << _handlers[i].name
  108. << ") is bigger than " << FLAGS_max_body_size
  109. << " bytes, the connection will be closed."
  110. " Set max_body_size to allow bigger messages";
  111. return result;
  112. }
  113. // Clear context before trying next protocol which definitely has
  114. // an incompatible context with the current one.
  115. if (m->parsing_context()) {
  116. m->reset_parsing_context(NULL);
  117. }
  118. // Try other protocols.
  119. }
  120. return MakeParseError(PARSE_ERROR_TRY_OTHERS);
  121. }
  122. void* ProcessInputMessage(void* void_arg) {
  123. InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
  124. msg->_process(msg);
  125. return NULL;
  126. }
  127. struct RunLastMessage {
  128. inline void operator()(InputMessageBase* last_msg) {
  129. ProcessInputMessage(last_msg);
  130. }
  131. };
  132. static void QueueMessage(InputMessageBase* to_run_msg,
  133. int* num_bthread_created,
  134. bthread_keytable_pool_t* keytable_pool) {
  135. if (!to_run_msg) {
  136. return;
  137. }
  138. // Create bthread for last_msg. The bthread is not scheduled
  139. // until bthread_flush() is called (in the worse case).
  140. // TODO(gejun): Join threads.
  141. bthread_t th;
  142. bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
  143. BTHREAD_ATTR_PTHREAD :
  144. BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
  145. tmp.keytable_pool = keytable_pool;
  146. if (bthread_start_background(
  147. &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
  148. ++*num_bthread_created;
  149. } else {
  150. ProcessInputMessage(to_run_msg);
  151. }
  152. }
  153. void InputMessenger::OnNewMessages(Socket* m) {
  154. // Notes:
  155. // - If the socket has only one message, the message will be parsed and
  156. // processed in this bthread. nova-pbrpc and http works in this way.
  157. // - If the socket has several messages, all messages will be parsed (
  158. // meaning cutting from butil::IOBuf. serializing from protobuf is part of
  159. // "process") in this bthread. All messages except the last one will be
  160. // processed in separate bthreads. To minimize the overhead, scheduling
  161. // is batched(notice the BTHREAD_NOSIGNAL and bthread_flush).
  162. // - Verify will always be called in this bthread at most once and before
  163. // any process.
  164. InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
  165. const InputMessageHandler* handlers = messenger->_handlers;
  166. int progress = Socket::PROGRESS_INIT;
  167. // Notice that all *return* no matter successful or not will run last
  168. // message, even if the socket is about to be closed. This should be
  169. // OK in most cases.
  170. std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
  171. bool read_eof = false;
  172. while (!read_eof) {
  173. const int64_t received_us = butil::cpuwide_time_us();
  174. const int64_t base_realtime = butil::gettimeofday_us() - received_us;
  175. // Calculate bytes to be read.
  176. size_t once_read = m->_avg_msg_size * 16;
  177. if (once_read < MIN_ONCE_READ) {
  178. once_read = MIN_ONCE_READ;
  179. } else if (once_read > MAX_ONCE_READ) {
  180. once_read = MAX_ONCE_READ;
  181. }
  182. // Read.
  183. const ssize_t nr = m->DoRead(once_read);
  184. if (nr <= 0) {
  185. if (0 == nr) {
  186. // Set `read_eof' flag and proceed to feed EOF into `Protocol'
  187. // (implied by m->_read_buf.empty), which may produce a new
  188. // `InputMessageBase' under some protocols such as HTTP
  189. LOG_IF(WARNING, FLAGS_log_connection_close) << *m << " was closed by remote side";
  190. read_eof = true;
  191. } else if (errno != EAGAIN) {
  192. if (errno == EINTR) {
  193. continue; // just retry
  194. }
  195. const int saved_errno = errno;
  196. PLOG(WARNING) << "Fail to read from " << *m;
  197. m->SetFailed(saved_errno, "Fail to read from %s: %s",
  198. m->description().c_str(), berror(saved_errno));
  199. return;
  200. } else if (!m->MoreReadEvents(&progress)) {
  201. return;
  202. } else { // new events during processing
  203. continue;
  204. }
  205. }
  206. m->AddInputBytes(nr);
  207. // Avoid this socket to be closed due to idle_timeout_s
  208. m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);
  209. size_t last_size = m->_read_buf.length();
  210. int num_bthread_created = 0;
  211. while (1) {
  212. size_t index = 8888;
  213. ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
  214. if (!pr.is_ok()) {
  215. if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
  216. // incomplete message, re-read.
  217. // However, some buffer may have been consumed
  218. // under protocols like HTTP. Record this size
  219. m->_last_msg_size += (last_size - m->_read_buf.length());
  220. break;
  221. } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
  222. LOG(WARNING)
  223. << "Close " << *m << " due to unknown message: "
  224. << butil::ToPrintable(m->_read_buf);
  225. m->SetFailed(EINVAL, "Close %s due to unknown message",
  226. m->description().c_str());
  227. return;
  228. } else {
  229. LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
  230. m->SetFailed(EINVAL, "Close %s: %s",
  231. m->description().c_str(), pr.error_str());
  232. return;
  233. }
  234. }
  235. m->AddInputMessages(1);
  236. // Calculate average size of messages
  237. const size_t cur_size = m->_read_buf.length();
  238. if (cur_size == 0) {
  239. // _read_buf is consumed, it's good timing to return blocks
  240. // cached internally back to TLS, otherwise the memory is not
  241. // reused until next message arrives which is quite uncertain
  242. // in situations that most connections are idle.
  243. m->_read_buf.return_cached_blocks();
  244. }
  245. m->_last_msg_size += (last_size - cur_size);
  246. last_size = cur_size;
  247. const size_t old_avg = m->_avg_msg_size;
  248. if (old_avg != 0) {
  249. m->_avg_msg_size = (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
  250. / MSG_SIZE_WINDOW;
  251. } else {
  252. m->_avg_msg_size = m->_last_msg_size;
  253. }
  254. m->_last_msg_size = 0;
  255. if (pr.message() == NULL) { // the Process() step can be skipped.
  256. continue;
  257. }
  258. pr.message()->_received_us = received_us;
  259. pr.message()->_base_real_us = base_realtime;
  260. // This unique_ptr prevents msg to be lost before transfering
  261. // ownership to last_msg
  262. DestroyingPtr<InputMessageBase> msg(pr.message());
  263. QueueMessage(last_msg.release(), &num_bthread_created,
  264. m->_keytable_pool);
  265. if (handlers[index].process == NULL) {
  266. LOG(ERROR) << "process of index=" << index << " is NULL";
  267. continue;
  268. }
  269. m->ReAddress(&msg->_socket);
  270. m->PostponeEOF();
  271. msg->_process = handlers[index].process;
  272. msg->_arg = handlers[index].arg;
  273. if (handlers[index].verify != NULL) {
  274. int auth_error = 0;
  275. if (0 == m->FightAuthentication(&auth_error)) {
  276. // Get the right to authenticate
  277. if (handlers[index].verify(msg.get())) {
  278. m->SetAuthentication(0);
  279. } else {
  280. m->SetAuthentication(ERPCAUTH);
  281. LOG(WARNING) << "Fail to authenticate " << *m;
  282. m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
  283. m->description().c_str());
  284. return;
  285. }
  286. } else {
  287. LOG_IF(FATAL, auth_error != 0) <<
  288. "Impossible! Socket should have been "
  289. "destroyed when authentication failed";
  290. }
  291. }
  292. if (!m->is_read_progressive()) {
  293. // Transfer ownership to last_msg
  294. last_msg.reset(msg.release());
  295. } else {
  296. QueueMessage(msg.release(), &num_bthread_created,
  297. m->_keytable_pool);
  298. bthread_flush();
  299. num_bthread_created = 0;
  300. }
  301. }
  302. if (num_bthread_created) {
  303. bthread_flush();
  304. }
  305. }
  306. if (read_eof) {
  307. m->SetEOF();
  308. }
  309. }
  310. InputMessenger::InputMessenger(size_t capacity)
  311. : _handlers(NULL)
  312. , _max_index(-1)
  313. , _non_protocol(false)
  314. , _capacity(capacity) {
  315. }
  316. InputMessenger::~InputMessenger() {
  317. delete[] _handlers;
  318. _handlers = NULL;
  319. _max_index.store(-1, butil::memory_order_relaxed);
  320. _capacity = 0;
  321. }
  322. int InputMessenger::AddHandler(const InputMessageHandler& handler) {
  323. if (handler.parse == NULL || handler.process == NULL
  324. || handler.name == NULL) {
  325. CHECK(false) << "Invalid argument";
  326. return -1;
  327. }
  328. BAIDU_SCOPED_LOCK(_add_handler_mutex);
  329. if (NULL == _handlers) {
  330. _handlers = new (std::nothrow) InputMessageHandler[_capacity];
  331. if (NULL == _handlers) {
  332. LOG(FATAL) << "Fail to new array of InputMessageHandler";
  333. return -1;
  334. }
  335. memset(_handlers, 0, sizeof(*_handlers) * _capacity);
  336. _non_protocol = false;
  337. }
  338. if (_non_protocol) {
  339. CHECK(false) << "AddNonProtocolHandler was invoked";
  340. return -1;
  341. }
  342. ProtocolType type = FindProtocolOfHandler(handler);
  343. if (type == PROTOCOL_UNKNOWN) {
  344. CHECK(false) << "Adding a handler which doesn't belong to any protocol";
  345. return -1;
  346. }
  347. const int index = type;
  348. if (index >= (int)_capacity) {
  349. LOG(FATAL) << "Can't add more handlers than " << _capacity;
  350. return -1;
  351. }
  352. if (_handlers[index].parse == NULL) {
  353. // The same protocol might be added more than twice
  354. _handlers[index] = handler;
  355. } else if (_handlers[index].parse != handler.parse
  356. || _handlers[index].process != handler.process) {
  357. CHECK(_handlers[index].parse == handler.parse);
  358. CHECK(_handlers[index].process == handler.process);
  359. return -1;
  360. }
  361. if (index > _max_index.load(butil::memory_order_relaxed)) {
  362. _max_index.store(index, butil::memory_order_release);
  363. }
  364. return 0;
  365. }
  366. int InputMessenger::AddNonProtocolHandler(const InputMessageHandler& handler) {
  367. if (handler.parse == NULL || handler.process == NULL
  368. || handler.name == NULL) {
  369. CHECK(false) << "Invalid argument";
  370. return -1;
  371. }
  372. BAIDU_SCOPED_LOCK(_add_handler_mutex);
  373. if (NULL == _handlers) {
  374. _handlers = new (std::nothrow) InputMessageHandler[_capacity];
  375. if (NULL == _handlers) {
  376. LOG(FATAL) << "Fail to new array of InputMessageHandler";
  377. return -1;
  378. }
  379. memset(_handlers, 0, sizeof(*_handlers) * _capacity);
  380. _non_protocol = true;
  381. }
  382. if (!_non_protocol) {
  383. CHECK(false) << "AddHandler was invoked";
  384. return -1;
  385. }
  386. const int index = _max_index.load(butil::memory_order_relaxed) + 1;
  387. _handlers[index] = handler;
  388. _max_index.store(index, butil::memory_order_release);
  389. return 0;
  390. }
  391. int InputMessenger::Create(const butil::EndPoint& remote_side,
  392. time_t health_check_interval_s,
  393. SocketId* id) {
  394. SocketOptions options;
  395. options.remote_side = remote_side;
  396. options.user = this;
  397. options.on_edge_triggered_events = OnNewMessages;
  398. options.health_check_interval_s = health_check_interval_s;
  399. return Socket::Create(options, id);
  400. }
  401. int InputMessenger::Create(SocketOptions options, SocketId* id) {
  402. options.user = this;
  403. options.on_edge_triggered_events = OnNewMessages;
  404. return Socket::Create(options, id);
  405. }
  406. int InputMessenger::FindProtocolIndex(const char* name) const {
  407. for (size_t i = 0; i < _capacity; ++i) {
  408. if (_handlers[i].parse != NULL
  409. && strcmp(name, _handlers[i].name) == 0) {
  410. return i;
  411. }
  412. }
  413. return -1;
  414. }
  415. int InputMessenger::FindProtocolIndex(ProtocolType type) const {
  416. const Protocol* proto = FindProtocol(type);
  417. if (NULL == proto) {
  418. return -1;
  419. }
  420. return FindProtocolIndex(proto->name);
  421. }
  422. const char* InputMessenger::NameOfProtocol(int n) const {
  423. if (n < 0 || (size_t)n >= _capacity || _handlers[n].parse == NULL) {
  424. return "unknown"; // use lowercase to be consistent with valid names.
  425. }
  426. return _handlers[n].name;
  427. }
  428. static ProtocolType FindProtocolOfHandler(const InputMessageHandler& h) {
  429. std::vector<std::pair<ProtocolType, Protocol> > vec;
  430. ListProtocols(&vec);
  431. for (size_t i = 0; i < vec.size(); ++i) {
  432. if (vec[i].second.parse == h.parse &&
  433. ((vec[i].second.process_request == h.process)
  434. // ^^ server side
  435. || (vec[i].second.process_response == h.process))
  436. // ^^ client side
  437. && strcmp(vec[i].second.name, h.name) == 0) {
  438. return vec[i].first;
  439. }
  440. }
  441. return PROTOCOL_UNKNOWN;
  442. }
  443. void InputMessageBase::Destroy() {
  444. // Release base-specific resources.
  445. if (_socket) {
  446. _socket->CheckEOF();
  447. _socket.reset();
  448. }
  449. DestroyImpl();
  450. // This object may be destroyed, don't touch fields anymore.
  451. }
  452. Socket* InputMessageBase::ReleaseSocket() {
  453. Socket* s = _socket.release();
  454. if (s) {
  455. s->CheckEOF();
  456. }
  457. return s;
  458. }
  459. InputMessageBase::~InputMessageBase() {}
  460. } // namespace brpc