acceptor.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <inttypes.h>
  18. #include <gflags/gflags.h>
  19. #include "butil/fd_guard.h" // fd_guard
  20. #include "butil/fd_utility.h" // make_close_on_exec
  21. #include "butil/time.h" // gettimeofday_us
  22. #include "brpc/acceptor.h"
  23. namespace brpc {
  // Capacity passed to `_socket_map.init()' by Acceptor::Initialize().
  // 64 * 1024 == 65536.
  static const int INITIAL_CONNECTION_CAP = 64 * 1024;
  25. Acceptor::Acceptor(bthread_keytable_pool_t* pool)
  26. : InputMessenger()
  27. , _keytable_pool(pool)
  28. , _status(UNINITIALIZED)
  29. , _idle_timeout_sec(-1)
  30. , _close_idle_tid(INVALID_BTHREAD)
  31. , _listened_fd(-1)
  32. , _acception_id(0)
  33. , _empty_cond(&_map_mutex)
  34. , _ssl_ctx(NULL) {
  35. }
  36. Acceptor::~Acceptor() {
  37. StopAccept(0);
  38. Join();
  39. }
  40. int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
  41. const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
  42. if (listened_fd < 0) {
  43. LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
  44. return -1;
  45. }
  46. BAIDU_SCOPED_LOCK(_map_mutex);
  47. if (_status == UNINITIALIZED) {
  48. if (Initialize() != 0) {
  49. LOG(FATAL) << "Fail to initialize Acceptor";
  50. return -1;
  51. }
  52. _status = READY;
  53. }
  54. if (_status != READY) {
  55. LOG(FATAL) << "Acceptor hasn't stopped yet: status=" << status();
  56. return -1;
  57. }
  58. if (idle_timeout_sec > 0) {
  59. if (bthread_start_background(&_close_idle_tid, NULL,
  60. CloseIdleConnections, this) != 0) {
  61. LOG(FATAL) << "Fail to start bthread";
  62. return -1;
  63. }
  64. }
  65. _idle_timeout_sec = idle_timeout_sec;
  66. _ssl_ctx = ssl_ctx;
  67. // Creation of _acception_id is inside lock so that OnNewConnections
  68. // (which may run immediately) should see sane fields set below.
  69. SocketOptions options;
  70. options.fd = listened_fd;
  71. options.user = this;
  72. options.on_edge_triggered_events = OnNewConnections;
  73. if (Socket::Create(options, &_acception_id) != 0) {
  74. // Close-idle-socket thread will be stopped inside destructor
  75. LOG(FATAL) << "Fail to create _acception_id";
  76. return -1;
  77. }
  78. _listened_fd = listened_fd;
  79. _status = RUNNING;
  80. return 0;
  81. }
  82. void* Acceptor::CloseIdleConnections(void* arg) {
  83. Acceptor* am = static_cast<Acceptor*>(arg);
  84. std::vector<SocketId> checking_fds;
  85. const uint64_t CHECK_INTERVAL_US = 1000000UL;
  86. while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
  87. // TODO: this is not efficient for a lot of connections(>100K)
  88. am->ListConnections(&checking_fds);
  89. for (size_t i = 0; i < checking_fds.size(); ++i) {
  90. SocketUniquePtr s;
  91. if (Socket::Address(checking_fds[i], &s) == 0) {
  92. s->ReleaseReferenceIfIdle(am->_idle_timeout_sec);
  93. }
  94. }
  95. }
  96. return NULL;
  97. }
  98. void Acceptor::StopAccept(int /*closewait_ms*/) {
  99. // Currently `closewait_ms' is useless since we have to wait until
  100. // existing requests are finished. Otherwise, contexts depended by
  101. // the requests may be deleted and invalid.
  102. {
  103. BAIDU_SCOPED_LOCK(_map_mutex);
  104. if (_status != RUNNING) {
  105. return;
  106. }
  107. _status = STOPPING;
  108. }
  109. // Don't set _acception_id to 0 because BeforeRecycle needs it.
  110. Socket::SetFailed(_acception_id);
  111. // SetFailed all existing connections. Connections added after this piece
  112. // of code will be SetFailed directly in OnNewConnectionsUntilEAGAIN
  113. std::vector<SocketId> erasing_ids;
  114. ListConnections(&erasing_ids);
  115. for (size_t i = 0; i < erasing_ids.size(); ++i) {
  116. SocketUniquePtr socket;
  117. if (Socket::Address(erasing_ids[i], &socket) == 0) {
  118. if (socket->shall_fail_me_at_server_stop()) {
  119. // Mainly streaming connections, should be SetFailed() to
  120. // trigger callbacks to NotifyOnFailed() to remove references,
  121. // otherwise the sockets are often referenced by corresponding
  122. // objects and delay server's stopping which requires all
  123. // existing sockets to be recycled.
  124. socket->SetFailed(ELOGOFF, "Server is stopping");
  125. } else {
  126. // Message-oriented RPC connections. Just release the addtional
  127. // reference in the socket, which will be recycled when current
  128. // requests have been processed.
  129. socket->ReleaseAdditionalReference();
  130. }
  131. } // else: This socket already called `SetFailed' before
  132. }
  133. }
  134. int Acceptor::Initialize() {
  135. if (_socket_map.init(INITIAL_CONNECTION_CAP) != 0) {
  136. LOG(FATAL) << "Fail to initialize FlatMap, size="
  137. << INITIAL_CONNECTION_CAP;
  138. return -1;
  139. }
  140. return 0;
  141. }
  142. // NOTE: Join() can happen before StopAccept()
  143. void Acceptor::Join() {
  144. std::unique_lock<butil::Mutex> mu(_map_mutex);
  145. if (_status != STOPPING && _status != RUNNING) { // no need to join.
  146. return;
  147. }
  148. // `_listened_fd' will be set to -1 once it has been recycled
  149. while (_listened_fd > 0 || !_socket_map.empty()) {
  150. _empty_cond.Wait();
  151. }
  152. const int saved_idle_timeout_sec = _idle_timeout_sec;
  153. _idle_timeout_sec = 0;
  154. const bthread_t saved_close_idle_tid = _close_idle_tid;
  155. mu.unlock();
  156. // Join the bthread outside lock.
  157. if (saved_idle_timeout_sec > 0) {
  158. bthread_stop(saved_close_idle_tid);
  159. bthread_join(saved_close_idle_tid, NULL);
  160. }
  161. {
  162. BAIDU_SCOPED_LOCK(_map_mutex);
  163. _status = READY;
  164. }
  165. }
  166. size_t Acceptor::ConnectionCount() const {
  167. // Notice that _socket_map may be modified concurrently. This actually
  168. // assumes that size() is safe to call concurrently.
  169. return _socket_map.size();
  170. }
  171. void Acceptor::ListConnections(std::vector<SocketId>* conn_list,
  172. size_t max_copied) {
  173. if (conn_list == NULL) {
  174. LOG(FATAL) << "Param[conn_list] is NULL";
  175. return;
  176. }
  177. conn_list->clear();
  178. // Add additional 10(randomly small number) so that even if
  179. // ConnectionCount is inaccurate, enough space is reserved
  180. conn_list->reserve(ConnectionCount() + 10);
  181. std::unique_lock<butil::Mutex> mu(_map_mutex);
  182. if (!_socket_map.initialized()) {
  183. // Optional. Uninitialized FlatMap should be iteratable.
  184. return;
  185. }
  186. // Copy all the SocketId (protected by mutex) into a temporary
  187. // container to avoid dealing with sockets inside the mutex.
  188. size_t ntotal = 0;
  189. size_t n = 0;
  190. for (SocketMap::const_iterator it = _socket_map.begin();
  191. it != _socket_map.end(); ++it, ++ntotal) {
  192. if (ntotal >= max_copied) {
  193. return;
  194. }
  195. if (++n >= 256/*max iterated one pass*/) {
  196. SocketMap::PositionHint hint;
  197. _socket_map.save_iterator(it, &hint);
  198. n = 0;
  199. mu.unlock(); // yield
  200. mu.lock();
  201. it = _socket_map.restore_iterator(hint);
  202. if (it == _socket_map.begin()) { // resized
  203. conn_list->clear();
  204. }
  205. if (it == _socket_map.end()) {
  206. break;
  207. }
  208. }
  209. conn_list->push_back(it->first);
  210. }
  211. }
  212. void Acceptor::ListConnections(std::vector<SocketId>* conn_list) {
  213. return ListConnections(conn_list, std::numeric_limits<size_t>::max());
  214. }
  215. void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
  216. while (1) {
  217. struct sockaddr in_addr;
  218. socklen_t in_len = sizeof(in_addr);
  219. butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
  220. if (in_fd < 0) {
  221. // no EINTR because listened fd is non-blocking.
  222. if (errno == EAGAIN) {
  223. return;
  224. }
  225. // Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
  226. // will be closed. Continue to consume all the events until EAGAIN
  227. // instead.
  228. // If the accept was failed, the error may repeat constantly,
  229. // limit frequency of logging.
  230. PLOG_EVERY_SECOND(ERROR)
  231. << "Fail to accept from listened_fd=" << acception->fd();
  232. continue;
  233. }
  234. Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
  235. if (NULL == am) {
  236. LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
  237. acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
  238. return;
  239. }
  240. SocketId socket_id;
  241. SocketOptions options;
  242. options.keytable_pool = am->_keytable_pool;
  243. options.fd = in_fd;
  244. options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
  245. options.user = acception->user();
  246. options.on_edge_triggered_events = InputMessenger::OnNewMessages;
  247. options.initial_ssl_ctx = am->_ssl_ctx;
  248. if (Socket::Create(options, &socket_id) != 0) {
  249. LOG(ERROR) << "Fail to create Socket";
  250. continue;
  251. }
  252. in_fd.release(); // transfer ownership to socket_id
  253. // There's a funny race condition here. After Socket::Create, messages
  254. // from the socket are already handled and a RPC is possibly done
  255. // before the socket is added into _socket_map below. This is found in
  256. // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
  257. // on machines with few cores) where the _messenger.ConnectionCount()
  258. // may surprisingly be 0 even if the RPC is already done.
  259. SocketUniquePtr sock;
  260. if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
  261. bool is_running = true;
  262. {
  263. BAIDU_SCOPED_LOCK(am->_map_mutex);
  264. is_running = (am->status() == RUNNING);
  265. // Always add this socket into `_socket_map' whether it
  266. // has been `SetFailed' or not, whether `Acceptor' is
  267. // running or not. Otherwise, `Acceptor::BeforeRecycle'
  268. // may be called (inside Socket::OnRecycle) after `Acceptor'
  269. // has been destroyed
  270. am->_socket_map.insert(socket_id, ConnectStatistics());
  271. }
  272. if (!is_running) {
  273. LOG(WARNING) << "Acceptor on fd=" << acception->fd()
  274. << " has been stopped, discard newly created " << *sock;
  275. sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
  276. "discard newly created %s", acception->fd(),
  277. sock->description().c_str());
  278. return;
  279. }
  280. } // else: The socket has already been destroyed, Don't add its id
  281. // into _socket_map
  282. }
  283. }
  284. void Acceptor::OnNewConnections(Socket* acception) {
  285. int progress = Socket::PROGRESS_INIT;
  286. do {
  287. OnNewConnectionsUntilEAGAIN(acception);
  288. if (acception->Failed()) {
  289. return;
  290. }
  291. } while (acception->MoreReadEvents(&progress));
  292. }
  293. void Acceptor::BeforeRecycle(Socket* sock) {
  294. BAIDU_SCOPED_LOCK(_map_mutex);
  295. if (sock->id() == _acception_id) {
  296. // Set _listened_fd to -1 when acception socket has been recycled
  297. // so that we are ensured no more events will arrive (and `Join'
  298. // will return to its caller)
  299. _listened_fd = -1;
  300. _empty_cond.Broadcast();
  301. return;
  302. }
  303. // If a Socket could not be addressed shortly after its creation, it
  304. // was not added into `_socket_map'.
  305. _socket_map.erase(sock->id());
  306. if (_socket_map.empty()) {
  307. _empty_cond.Broadcast();
  308. }
  309. }
  310. } // namespace brpc