1
0

socket_map.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <gflags/gflags.h>
  18. #include <map>
  19. #include "bthread/bthread.h"
  20. #include "butil/time.h"
  21. #include "butil/scoped_lock.h"
  22. #include "butil/logging.h"
  23. #include "brpc/log.h"
  24. #include "brpc/protocol.h"
  25. #include "brpc/input_messenger.h"
  26. #include "brpc/reloadable_flags.h"
  27. #include "brpc/socket_map.h"
  28. namespace brpc {
  29. DEFINE_int32(health_check_interval, 3,
  30. "seconds between consecutive health-checkings");
  31. // NOTE: Must be limited to positive to guarantee correctness of SocketMapRemove.
  32. BRPC_VALIDATE_GFLAG(health_check_interval, PositiveInteger);
  33. DEFINE_int32(idle_timeout_second, 10,
  34. "Pooled connections without data transmission for so many "
  35. "seconds will be closed. No effect for non-positive values");
  36. BRPC_VALIDATE_GFLAG(idle_timeout_second, PassValidate);
  37. DEFINE_int32(defer_close_second, 0,
  38. "Defer close of connections for so many seconds even if the"
  39. " connection is not used by anyone. Close immediately for "
  40. "non-positive values.");
  41. BRPC_VALIDATE_GFLAG(defer_close_second, PassValidate);
  42. DEFINE_bool(show_socketmap_in_vars, false,
  43. "[DEBUG] Describe SocketMaps in /vars");
  44. BRPC_VALIDATE_GFLAG(show_socketmap_in_vars, PassValidate);
  45. static pthread_once_t g_socket_map_init = PTHREAD_ONCE_INIT;
  46. static butil::static_atomic<SocketMap*> g_socket_map = BUTIL_STATIC_ATOMIC_INIT(NULL);
  47. class GlobalSocketCreator : public SocketCreator {
  48. public:
  49. int CreateSocket(const SocketOptions& opt, SocketId* id) {
  50. SocketOptions sock_opt = opt;
  51. sock_opt.health_check_interval_s = FLAGS_health_check_interval;
  52. return get_client_side_messenger()->Create(sock_opt, id);
  53. }
  54. };
  55. static void CreateClientSideSocketMap() {
  56. SocketMap* socket_map = new SocketMap;
  57. SocketMapOptions options;
  58. options.socket_creator = new GlobalSocketCreator;
  59. options.idle_timeout_second_dynamic = &FLAGS_idle_timeout_second;
  60. options.defer_close_second_dynamic = &FLAGS_defer_close_second;
  61. if (socket_map->Init(options) != 0) {
  62. LOG(FATAL) << "Fail to init SocketMap";
  63. exit(1);
  64. }
  65. g_socket_map.store(socket_map, butil::memory_order_release);
  66. }
  67. SocketMap* get_client_side_socket_map() {
  68. // The consume fence makes sure that we see a NULL or a fully initialized
  69. // SocketMap.
  70. return g_socket_map.load(butil::memory_order_consume);
  71. }
  72. SocketMap* get_or_new_client_side_socket_map() {
  73. get_or_new_client_side_messenger();
  74. pthread_once(&g_socket_map_init, CreateClientSideSocketMap);
  75. return g_socket_map.load(butil::memory_order_consume);
  76. }
  77. int SocketMapInsert(const SocketMapKey& key, SocketId* id,
  78. const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
  79. return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx);
  80. }
  81. int SocketMapFind(const SocketMapKey& key, SocketId* id) {
  82. SocketMap* m = get_client_side_socket_map();
  83. if (m) {
  84. return m->Find(key, id);
  85. }
  86. return -1;
  87. }
  88. void SocketMapRemove(const SocketMapKey& key) {
  89. SocketMap* m = get_client_side_socket_map();
  90. if (m) {
  91. // TODO: We don't have expected_id to pass right now since the callsite
  92. // at NamingServiceThread is hard to be fixed right now. As long as
  93. // FLAGS_health_check_interval is limited to positive, SocketMapInsert
  94. // never replaces the sockets, skipping comparison is still right.
  95. m->Remove(key, INVALID_SOCKET_ID);
  96. }
  97. }
  98. void SocketMapList(std::vector<SocketId>* ids) {
  99. SocketMap* m = get_client_side_socket_map();
  100. if (m) {
  101. m->List(ids);
  102. } else {
  103. ids->clear();
  104. }
  105. }
  106. // ========== SocketMap impl. ============
  107. SocketMapOptions::SocketMapOptions()
  108. : socket_creator(NULL)
  109. , suggested_map_size(1024)
  110. , idle_timeout_second_dynamic(NULL)
  111. , idle_timeout_second(0)
  112. , defer_close_second_dynamic(NULL)
  113. , defer_close_second(0) {
  114. }
  115. SocketMap::SocketMap()
  116. : _exposed_in_bvar(false)
  117. , _this_map_bvar(NULL)
  118. , _has_close_idle_thread(false) {
  119. }
  120. SocketMap::~SocketMap() {
  121. RPC_VLOG << "Destroying SocketMap=" << this;
  122. if (_has_close_idle_thread) {
  123. bthread_stop(_close_idle_thread);
  124. bthread_join(_close_idle_thread, NULL);
  125. }
  126. if (!_map.empty()) {
  127. std::ostringstream err;
  128. int nleft = 0;
  129. for (Map::iterator it = _map.begin(); it != _map.end(); ++it) {
  130. SingleConnection* sc = &it->second;
  131. if ((!sc->socket->Failed() ||
  132. sc->socket->health_check_interval() > 0/*HC enabled*/) &&
  133. sc->ref_count != 0) {
  134. ++nleft;
  135. if (nleft == 0) {
  136. err << "Left in SocketMap(" << this << "):";
  137. }
  138. err << ' ' << *sc->socket;
  139. }
  140. }
  141. if (nleft) {
  142. LOG(ERROR) << err.str();
  143. }
  144. }
  145. delete _this_map_bvar;
  146. _this_map_bvar = NULL;
  147. delete _options.socket_creator;
  148. _options.socket_creator = NULL;
  149. }
  150. int SocketMap::Init(const SocketMapOptions& options) {
  151. if (_options.socket_creator != NULL) {
  152. LOG(ERROR) << "Already initialized";
  153. return -1;
  154. }
  155. _options = options;
  156. if (_options.socket_creator == NULL) {
  157. LOG(ERROR) << "SocketOptions.socket_creator must be set";
  158. return -1;
  159. }
  160. if (_map.init(_options.suggested_map_size, 70) != 0) {
  161. LOG(ERROR) << "Fail to init _map";
  162. return -1;
  163. }
  164. if (_options.idle_timeout_second_dynamic != NULL ||
  165. _options.idle_timeout_second > 0) {
  166. if (bthread_start_background(&_close_idle_thread, NULL,
  167. RunWatchConnections, this) != 0) {
  168. LOG(FATAL) << "Fail to start bthread";
  169. return -1;
  170. }
  171. _has_close_idle_thread = true;
  172. }
  173. return 0;
  174. }
  175. void SocketMap::Print(std::ostream& os) {
  176. // TODO: Elaborate.
  177. size_t count = 0;
  178. {
  179. std::unique_lock<butil::Mutex> mu(_mutex);
  180. count = _map.size();
  181. }
  182. os << "count=" << count;
  183. }
  184. void SocketMap::PrintSocketMap(std::ostream& os, void* arg) {
  185. static_cast<SocketMap*>(arg)->Print(os);
  186. }
  187. int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
  188. const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
  189. std::unique_lock<butil::Mutex> mu(_mutex);
  190. SingleConnection* sc = _map.seek(key);
  191. if (sc) {
  192. if (!sc->socket->Failed() ||
  193. sc->socket->health_check_interval() > 0/*HC enabled*/) {
  194. ++sc->ref_count;
  195. *id = sc->socket->id();
  196. return 0;
  197. }
  198. // A socket w/o HC is failed (permanently), replace it.
  199. SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
  200. _map.erase(key); // in principle, we can override the entry in map w/o
  201. // removing and inserting it again. But this would make error branches
  202. // below have to remove the entry before returning, which is
  203. // error-prone. We prefer code maintainability here.
  204. sc = NULL;
  205. }
  206. SocketId tmp_id;
  207. SocketOptions opt;
  208. opt.remote_side = key.peer.addr;
  209. opt.initial_ssl_ctx = ssl_ctx;
  210. if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) {
  211. PLOG(FATAL) << "Fail to create socket to " << key.peer;
  212. return -1;
  213. }
  214. // Add a reference to make sure that sc->socket is always accessible. Not
  215. // use SocketUniquePtr which cannot put into containers before c++11.
  216. // The ref will be removed at entry's removal.
  217. SocketUniquePtr ptr;
  218. if (Socket::Address(tmp_id, &ptr) != 0) {
  219. LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
  220. return -1;
  221. }
  222. SingleConnection new_sc = { 1, ptr.release(), 0 };
  223. _map[key] = new_sc;
  224. *id = tmp_id;
  225. bool need_to_create_bvar = false;
  226. if (FLAGS_show_socketmap_in_vars && !_exposed_in_bvar) {
  227. _exposed_in_bvar = true;
  228. need_to_create_bvar = true;
  229. }
  230. mu.unlock();
  231. if (need_to_create_bvar) {
  232. char namebuf[32];
  233. int len = snprintf(namebuf, sizeof(namebuf), "rpc_socketmap_%p", this);
  234. _this_map_bvar = new bvar::PassiveStatus<std::string>(
  235. butil::StringPiece(namebuf, len), PrintSocketMap, this);
  236. }
  237. return 0;
  238. }
  239. void SocketMap::Remove(const SocketMapKey& key, SocketId expected_id) {
  240. return RemoveInternal(key, expected_id, false);
  241. }
  242. void SocketMap::RemoveInternal(const SocketMapKey& key,
  243. SocketId expected_id,
  244. bool remove_orphan) {
  245. std::unique_lock<butil::Mutex> mu(_mutex);
  246. SingleConnection* sc = _map.seek(key);
  247. if (!sc) {
  248. return;
  249. }
  250. if (!remove_orphan &&
  251. (expected_id == INVALID_SOCKET_ID || expected_id == sc->socket->id())) {
  252. --sc->ref_count;
  253. }
  254. if (sc->ref_count == 0) {
  255. // NOTE: save the gflag which may be reloaded at any time
  256. const int defer_close_second = _options.defer_close_second_dynamic ?
  257. *_options.defer_close_second_dynamic
  258. : _options.defer_close_second;
  259. if (!remove_orphan && defer_close_second > 0) {
  260. // Start count down on this Socket
  261. sc->no_ref_us = butil::cpuwide_time_us();
  262. } else {
  263. Socket* const s = sc->socket;
  264. _map.erase(key);
  265. bool need_to_create_bvar = false;
  266. if (FLAGS_show_socketmap_in_vars && !_exposed_in_bvar) {
  267. _exposed_in_bvar = true;
  268. need_to_create_bvar = true;
  269. }
  270. mu.unlock();
  271. if (need_to_create_bvar) {
  272. char namebuf[32];
  273. int len = snprintf(namebuf, sizeof(namebuf), "rpc_socketmap_%p", this);
  274. _this_map_bvar = new bvar::PassiveStatus<std::string>(
  275. butil::StringPiece(namebuf, len), PrintSocketMap, this);
  276. }
  277. s->ReleaseAdditionalReference(); // release extra ref
  278. SocketUniquePtr ptr(s); // Dereference
  279. }
  280. }
  281. }
  282. int SocketMap::Find(const SocketMapKey& key, SocketId* id) {
  283. BAIDU_SCOPED_LOCK(_mutex);
  284. SingleConnection* sc = _map.seek(key);
  285. if (sc) {
  286. *id = sc->socket->id();
  287. return 0;
  288. }
  289. return -1;
  290. }
  291. void SocketMap::List(std::vector<SocketId>* ids) {
  292. ids->clear();
  293. BAIDU_SCOPED_LOCK(_mutex);
  294. for (Map::iterator it = _map.begin(); it != _map.end(); ++it) {
  295. ids->push_back(it->second.socket->id());
  296. }
  297. }
  298. void SocketMap::List(std::vector<butil::EndPoint>* pts) {
  299. pts->clear();
  300. BAIDU_SCOPED_LOCK(_mutex);
  301. for (Map::iterator it = _map.begin(); it != _map.end(); ++it) {
  302. pts->push_back(it->second.socket->remote_side());
  303. }
  304. }
  305. void SocketMap::ListOrphans(int64_t defer_us, std::vector<SocketMapKey>* out) {
  306. out->clear();
  307. const int64_t now = butil::cpuwide_time_us();
  308. BAIDU_SCOPED_LOCK(_mutex);
  309. for (Map::iterator it = _map.begin(); it != _map.end(); ++it) {
  310. SingleConnection& sc = it->second;
  311. if (sc.ref_count == 0 && now - sc.no_ref_us >= defer_us) {
  312. out->push_back(it->first);
  313. }
  314. }
  315. }
  316. void* SocketMap::RunWatchConnections(void* arg) {
  317. static_cast<SocketMap*>(arg)->WatchConnections();
  318. return NULL;
  319. }
  320. void SocketMap::WatchConnections() {
  321. std::vector<SocketId> main_sockets;
  322. std::vector<SocketId> pooled_sockets;
  323. std::vector<SocketMapKey> orphan_sockets;
  324. const uint64_t CHECK_INTERVAL_US = 1000000UL;
  325. while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
  326. // NOTE: save the gflag which may be reloaded at any time.
  327. const int idle_seconds = _options.idle_timeout_second_dynamic ?
  328. *_options.idle_timeout_second_dynamic
  329. : _options.idle_timeout_second;
  330. if (idle_seconds > 0) {
  331. // Check idle pooled connections
  332. List(&main_sockets);
  333. for (size_t i = 0; i < main_sockets.size(); ++i) {
  334. SocketUniquePtr s;
  335. if (Socket::Address(main_sockets[i], &s) == 0) {
  336. s->ListPooledSockets(&pooled_sockets);
  337. for (size_t i = 0; i < pooled_sockets.size(); ++i) {
  338. SocketUniquePtr s2;
  339. if (Socket::Address(pooled_sockets[i], &s2) == 0) {
  340. s2->ReleaseReferenceIfIdle(idle_seconds);
  341. }
  342. }
  343. }
  344. }
  345. }
  346. // Check connections without Channel. This works when `defer_seconds'
  347. // <= 0, in which case orphan connections will be closed immediately
  348. // NOTE: save the gflag which may be reloaded at any time
  349. const int defer_seconds = _options.defer_close_second_dynamic ?
  350. *_options.defer_close_second_dynamic :
  351. _options.defer_close_second;
  352. ListOrphans(defer_seconds * 1000000L, &orphan_sockets);
  353. for (size_t i = 0; i < orphan_sockets.size(); ++i) {
  354. RemoveInternal(orphan_sockets[i], INVALID_SOCKET_ID, true);
  355. }
  356. }
  357. }
  358. } // namespace brpc