acceptor.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. // Copyright (c) 2014 Baidu, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Authors: Rujie Jiang(jiangrujie@baidu.com)
  15. // Ge,Jun(gejun@baidu.com)
  16. #include <gflags/gflags.h>
  17. #include "butil/fd_guard.h" // fd_guard
  18. #include "butil/fd_utility.h" // make_close_on_exec
  19. #include "butil/time.h" // gettimeofday_us
  20. #include "brpc/acceptor.h"
  21. namespace brpc {
  22. static const int INITIAL_CONNECTION_CAP = 65536;
  23. Acceptor::Acceptor(bthread_keytable_pool_t* pool)
  24. : InputMessenger()
  25. , _keytable_pool(pool)
  26. , _status(UNINITIALIZED)
  27. , _idle_timeout_sec(-1)
  28. , _close_idle_tid(INVALID_BTHREAD)
  29. , _listened_fd(-1)
  30. , _acception_id(0)
  31. , _empty_cond(&_map_mutex)
  32. , _ssl_ctx(NULL) {
  33. }
  34. Acceptor::~Acceptor() {
  35. StopAccept(0);
  36. Join();
  37. }
  38. int Acceptor::StartAccept(
  39. int listened_fd, int idle_timeout_sec, SSL_CTX* ssl_ctx) {
  40. if (listened_fd < 0) {
  41. LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
  42. return -1;
  43. }
  44. BAIDU_SCOPED_LOCK(_map_mutex);
  45. if (_status == UNINITIALIZED) {
  46. if (Initialize() != 0) {
  47. LOG(FATAL) << "Fail to initialize Acceptor";
  48. return -1;
  49. }
  50. _status = READY;
  51. }
  52. if (_status != READY) {
  53. LOG(FATAL) << "Acceptor hasn't stopped yet: status=" << status();
  54. return -1;
  55. }
  56. if (idle_timeout_sec > 0) {
  57. if (bthread_start_background(&_close_idle_tid, NULL,
  58. CloseIdleConnections, this) != 0) {
  59. LOG(FATAL) << "Fail to start bthread";
  60. return -1;
  61. }
  62. }
  63. _idle_timeout_sec = idle_timeout_sec;
  64. _ssl_ctx = ssl_ctx;
  65. // Creation of _acception_id is inside lock so that OnNewConnections
  66. // (which may run immediately) should see sane fields set below.
  67. SocketOptions options;
  68. options.fd = listened_fd;
  69. options.user = this;
  70. options.on_edge_triggered_events = OnNewConnections;
  71. if (Socket::Create(options, &_acception_id) != 0) {
  72. // Close-idle-socket thread will be stopped inside destructor
  73. LOG(FATAL) << "Fail to create _acception_id";
  74. return -1;
  75. }
  76. _listened_fd = listened_fd;
  77. _status = RUNNING;
  78. return 0;
  79. }
  80. void* Acceptor::CloseIdleConnections(void* arg) {
  81. Acceptor* am = static_cast<Acceptor*>(arg);
  82. std::vector<SocketId> checking_fds;
  83. const uint64_t CHECK_INTERVAL_US = 1000000UL;
  84. while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
  85. // TODO: this is not efficient for a lot of connections(>100K)
  86. am->ListConnections(&checking_fds);
  87. for (size_t i = 0; i < checking_fds.size(); ++i) {
  88. SocketUniquePtr s;
  89. if (Socket::Address(checking_fds[i], &s) == 0) {
  90. s->ReleaseReferenceIfIdle(am->_idle_timeout_sec);
  91. }
  92. }
  93. }
  94. return NULL;
  95. }
  96. void Acceptor::StopAccept(int /*closewait_ms*/) {
  97. // Currently `closewait_ms' is useless since we have to wait until
  98. // existing requests are finished. Otherwise, contexts depended by
  99. // the requests may be deleted and invalid.
  100. {
  101. BAIDU_SCOPED_LOCK(_map_mutex);
  102. if (_status != RUNNING) {
  103. return;
  104. }
  105. _status = STOPPING;
  106. }
  107. // Don't set _acception_id to 0 because BeforeRecycle needs it.
  108. Socket::SetFailed(_acception_id);
  109. // SetFailed all existing connections. Connections added after this piece
  110. // of code will be SetFailed directly in OnNewConnectionsUntilEAGAIN
  111. std::vector<SocketId> erasing_ids;
  112. ListConnections(&erasing_ids);
  113. for (size_t i = 0; i < erasing_ids.size(); ++i) {
  114. SocketUniquePtr socket;
  115. if (Socket::Address(erasing_ids[i], &socket) == 0) {
  116. if (socket->shall_fail_me_at_server_stop()) {
  117. // Mainly streaming connections, should be SetFailed() to
  118. // trigger callbacks to NotifyOnFailed() to remove references,
  119. // otherwise the sockets are often referenced by corresponding
  120. // objects and delay server's stopping which requires all
  121. // existing sockets to be recycled.
  122. socket->SetFailed(ELOGOFF, "Server is stopping");
  123. } else {
  124. // Message-oriented RPC connections. Just release the addtional
  125. // reference in the socket, which will be recycled when current
  126. // requests have been processed.
  127. socket->ReleaseAdditionalReference();
  128. }
  129. } // else: This socket already called `SetFailed' before
  130. }
  131. }
  132. int Acceptor::Initialize() {
  133. if (_socket_map.init(INITIAL_CONNECTION_CAP) != 0) {
  134. LOG(FATAL) << "Fail to initialize FlatMap, size="
  135. << INITIAL_CONNECTION_CAP;
  136. return -1;
  137. }
  138. return 0;
  139. }
  140. // NOTE: Join() can happen before StopAccept()
  141. void Acceptor::Join() {
  142. std::unique_lock<butil::Mutex> mu(_map_mutex);
  143. if (_status != STOPPING && _status != RUNNING) { // no need to join.
  144. return;
  145. }
  146. // `_listened_fd' will be set to -1 once it has been recycled
  147. while (_listened_fd > 0 || !_socket_map.empty()) {
  148. _empty_cond.Wait();
  149. }
  150. const int saved_idle_timeout_sec = _idle_timeout_sec;
  151. _idle_timeout_sec = 0;
  152. const bthread_t saved_close_idle_tid = _close_idle_tid;
  153. mu.unlock();
  154. // Join the bthread outside lock.
  155. if (saved_idle_timeout_sec > 0) {
  156. bthread_stop(saved_close_idle_tid);
  157. bthread_join(saved_close_idle_tid, NULL);
  158. }
  159. {
  160. BAIDU_SCOPED_LOCK(_map_mutex);
  161. _status = READY;
  162. }
  163. }
  164. size_t Acceptor::ConnectionCount() const {
  165. // Notice that _socket_map may be modified concurrently. This actually
  166. // assumes that size() is safe to call concurrently.
  167. return _socket_map.size();
  168. }
  169. void Acceptor::ListConnections(std::vector<SocketId>* conn_list,
  170. size_t max_copied) {
  171. if (conn_list == NULL) {
  172. LOG(FATAL) << "Param[conn_list] is NULL";
  173. return;
  174. }
  175. conn_list->clear();
  176. // Add additional 10(randomly small number) so that even if
  177. // ConnectionCount is inaccurate, enough space is reserved
  178. conn_list->reserve(ConnectionCount() + 10);
  179. std::unique_lock<butil::Mutex> mu(_map_mutex);
  180. if (!_socket_map.initialized()) {
  181. // Optional. Uninitialized FlatMap should be iteratable.
  182. return;
  183. }
  184. // Copy all the SocketId (protected by mutex) into a temporary
  185. // container to avoid dealing with sockets inside the mutex.
  186. size_t ntotal = 0;
  187. size_t n = 0;
  188. for (SocketMap::const_iterator it = _socket_map.begin();
  189. it != _socket_map.end(); ++it, ++ntotal) {
  190. if (ntotal >= max_copied) {
  191. return;
  192. }
  193. if (++n >= 256/*max iterated one pass*/) {
  194. SocketMap::PositionHint hint;
  195. _socket_map.save_iterator(it, &hint);
  196. n = 0;
  197. mu.unlock(); // yield
  198. mu.lock();
  199. it = _socket_map.restore_iterator(hint);
  200. if (it == _socket_map.begin()) { // resized
  201. conn_list->clear();
  202. }
  203. if (it == _socket_map.end()) {
  204. break;
  205. }
  206. }
  207. conn_list->push_back(it->first);
  208. }
  209. }
  210. void Acceptor::ListConnections(std::vector<SocketId>* conn_list) {
  211. return ListConnections(conn_list, std::numeric_limits<size_t>::max());
  212. }
  213. void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
  214. while (1) {
  215. struct sockaddr in_addr;
  216. socklen_t in_len = sizeof(in_addr);
  217. butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
  218. if (in_fd < 0) {
  219. // no EINTR because listened fd is non-blocking.
  220. if (errno == EAGAIN) {
  221. return;
  222. }
  223. // Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
  224. // will be closed. Continue to consume all the events until EAGAIN
  225. // instead.
  226. // If the accept was failed, the error may repeat constantly,
  227. // limit frequency of logging.
  228. PLOG_EVERY_SECOND(ERROR)
  229. << "Fail to accept from listened_fd=" << acception->fd();
  230. continue;
  231. }
  232. Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
  233. if (NULL == am) {
  234. LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
  235. acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
  236. return;
  237. }
  238. SocketId socket_id;
  239. SocketOptions options;
  240. options.keytable_pool = am->_keytable_pool;
  241. options.fd = in_fd;
  242. options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
  243. options.user = acception->user();
  244. options.on_edge_triggered_events = InputMessenger::OnNewMessages;
  245. options.ssl_ctx = am->_ssl_ctx;
  246. if (Socket::Create(options, &socket_id) != 0) {
  247. LOG(ERROR) << "Fail to create Socket";
  248. continue;
  249. }
  250. in_fd.release(); // transfer ownership to socket_id
  251. SocketUniquePtr sock;
  252. if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
  253. bool is_running = true;
  254. {
  255. BAIDU_SCOPED_LOCK(am->_map_mutex);
  256. is_running = (am->status() == RUNNING);
  257. // Always add this socket into `_socket_map' whether it
  258. // has been `SetFailed' or not, whether `Acceptor' is
  259. // running or not. Otherwise, `Acceptor::BeforeRecycle'
  260. // may be called (inside Socket::OnRecycle) after `Acceptor'
  261. // has been destroyed
  262. am->_socket_map.insert(socket_id, ConnectStatistics());
  263. }
  264. if (!is_running) {
  265. LOG(WARNING) << "Acceptor already stopped, discard "
  266. << "new connection, SocketId=" << socket_id;
  267. sock->SetFailed(ELOGOFF, "Acceptor already stopped, discard "
  268. "new connection, SocketId=%" PRIu64, socket_id);
  269. return;
  270. }
  271. } // else: The socket has already been destroyed, Don't add its id
  272. // into _socket_map
  273. }
  274. }
  275. void Acceptor::OnNewConnections(Socket* acception) {
  276. int progress = Socket::PROGRESS_INIT;
  277. do {
  278. OnNewConnectionsUntilEAGAIN(acception);
  279. if (acception->Failed()) {
  280. return;
  281. }
  282. } while (acception->MoreReadEvents(&progress));
  283. }
  284. void Acceptor::BeforeRecycle(Socket* sock) {
  285. BAIDU_SCOPED_LOCK(_map_mutex);
  286. if (sock->id() == _acception_id) {
  287. // Set _listened_fd to -1 when acception socket has been recycled
  288. // so that we are ensured no more events will arrive (and `Join'
  289. // will return to its caller)
  290. _listened_fd = -1;
  291. _empty_cond.Broadcast();
  292. return;
  293. }
  294. // If a Socket could not be addressed shortly after its creation, it
  295. // was not added into `_socket_map'.
  296. _socket_map.erase(sock->id());
  297. if (_socket_map.empty()) {
  298. _empty_cond.Broadcast();
  299. }
  300. }
  301. } // namespace brpc