socket_inl.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // This file contains inlined implementation of socket.h
  18. #ifndef BRPC_SOCKET_INL_H
  19. #define BRPC_SOCKET_INL_H
  20. namespace brpc {
  21. // Utility functions to combine and extract SocketId.
  22. BUTIL_FORCE_INLINE SocketId
  23. MakeSocketId(uint32_t version, butil::ResourceId<Socket> slot) {
  24. return SocketId((((uint64_t)version) << 32) | slot.value);
  25. }
  26. BUTIL_FORCE_INLINE butil::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
  27. butil::ResourceId<Socket> id = { (sid & 0xFFFFFFFFul) };
  28. return id;
  29. }
  30. BUTIL_FORCE_INLINE uint32_t VersionOfSocketId(SocketId sid) {
  31. return (uint32_t)(sid >> 32);
  32. }
  33. // Utility functions to combine and extract Socket::_versioned_ref
  34. BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
  35. return (uint32_t)(vref >> 32);
  36. }
  37. BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
  38. return (int32_t)(vref & 0xFFFFFFFFul);
  39. }
  40. BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) {
  41. // 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF
  42. return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref;
  43. }
  44. inline SocketOptions::SocketOptions()
  45. : fd(-1)
  46. , user(NULL)
  47. , on_edge_triggered_events(NULL)
  48. , health_check_interval_s(-1)
  49. , keytable_pool(NULL)
  50. , conn(NULL)
  51. , app_connect(NULL)
  52. , initial_parsing_context(NULL)
  53. {}
  54. inline int Socket::Dereference() {
  55. const SocketId id = _this_id;
  56. const uint64_t vref = _versioned_ref.fetch_sub(
  57. 1, butil::memory_order_release);
  58. const int32_t nref = NRefOfVRef(vref);
  59. if (nref > 1) {
  60. return 0;
  61. }
  62. if (__builtin_expect(nref == 1, 1)) {
  63. const uint32_t ver = VersionOfVRef(vref);
  64. const uint32_t id_ver = VersionOfSocketId(id);
  65. // Besides first successful SetFailed() adds 1 to version, one of
  66. // those dereferencing nref from 1->0 adds another 1 to version.
  67. // Notice "one of those": The wait-free Address() may make ref of a
  68. // version-unmatched slot change from 1 to 0 for mutiple times, we
  69. // have to use version as a guard variable to prevent returning the
  70. // Socket to pool more than once.
  71. //
  72. // Note: `ver == id_ver' means this socket has been `SetRecycle'
  73. // before rather than `SetFailed'; `ver == ide_ver+1' means we
  74. // had `SetFailed' this socket before. We should destroy the
  75. // socket under both situation
  76. if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
  77. // sees nref:1->0, try to set version=id_ver+2,--nref.
  78. // No retry: if version changes, the slot is already returned by
  79. // another one who sees nref:1->0 concurrently; if nref changes,
  80. // which must be non-zero, the slot will be returned when
  81. // nref changes from 1->0 again.
  82. // Example:
  83. // SetFailed(): --nref, sees nref:1->0 (1)
  84. // try to set version=id_ver+2 (2)
  85. // Address(): ++nref, unmatched version (3)
  86. // --nref, sees nref:1->0 (4)
  87. // try to set version=id_ver+2 (5)
  88. // 1,2,3,4,5 or 1,3,4,2,5:
  89. // SetFailed() succeeds, Address() fails at (5).
  90. // 1,3,2,4,5: SetFailed() fails with (2), the slot will be
  91. // returned by (5) of Address()
  92. // 1,3,4,5,2: SetFailed() fails with (2), the slot is already
  93. // returned by (5) of Address().
  94. uint64_t expected_vref = vref - 1;
  95. if (_versioned_ref.compare_exchange_strong(
  96. expected_vref, MakeVRef(id_ver + 2, 0),
  97. butil::memory_order_acquire,
  98. butil::memory_order_relaxed)) {
  99. OnRecycle();
  100. return_resource(SlotOfSocketId(id));
  101. return 1;
  102. }
  103. return 0;
  104. }
  105. LOG(FATAL) << "Invalid SocketId=" << id;
  106. return -1;
  107. }
  108. LOG(FATAL) << "Over dereferenced SocketId=" << id;
  109. return -1;
  110. }
  111. inline int Socket::Address(SocketId id, SocketUniquePtr* ptr) {
  112. const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
  113. Socket* const m = address_resource(slot);
  114. if (__builtin_expect(m != NULL, 1)) {
  115. // acquire fence makes sure this thread sees latest changes before
  116. // Dereference() or Revive().
  117. const uint64_t vref1 = m->_versioned_ref.fetch_add(
  118. 1, butil::memory_order_acquire);
  119. const uint32_t ver1 = VersionOfVRef(vref1);
  120. if (ver1 == VersionOfSocketId(id)) {
  121. ptr->reset(m);
  122. return 0;
  123. }
  124. const uint64_t vref2 = m->_versioned_ref.fetch_sub(
  125. 1, butil::memory_order_release);
  126. const int32_t nref = NRefOfVRef(vref2);
  127. if (nref > 1) {
  128. return -1;
  129. } else if (__builtin_expect(nref == 1, 1)) {
  130. const uint32_t ver2 = VersionOfVRef(vref2);
  131. if ((ver2 & 1)) {
  132. if (ver1 == ver2 || ver1 + 1 == ver2) {
  133. uint64_t expected_vref = vref2 - 1;
  134. if (m->_versioned_ref.compare_exchange_strong(
  135. expected_vref, MakeVRef(ver2 + 1, 0),
  136. butil::memory_order_acquire,
  137. butil::memory_order_relaxed)) {
  138. m->OnRecycle();
  139. return_resource(SlotOfSocketId(id));
  140. }
  141. } else {
  142. CHECK(false) << "ref-version=" << ver1
  143. << " unref-version=" << ver2;
  144. }
  145. } else {
  146. CHECK_EQ(ver1, ver2);
  147. // Addressed a free slot.
  148. }
  149. } else {
  150. CHECK(false) << "Over dereferenced SocketId=" << id;
  151. }
  152. }
  153. return -1;
  154. }
  155. inline void Socket::ReAddress(SocketUniquePtr* ptr) {
  156. _versioned_ref.fetch_add(1, butil::memory_order_acquire);
  157. ptr->reset(this);
  158. }
  159. inline int Socket::AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr) {
  160. const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
  161. Socket* const m = address_resource(slot);
  162. if (__builtin_expect(m != NULL, 1)) {
  163. const uint64_t vref1 = m->_versioned_ref.fetch_add(
  164. 1, butil::memory_order_acquire);
  165. const uint32_t ver1 = VersionOfVRef(vref1);
  166. if (ver1 == VersionOfSocketId(id)) {
  167. ptr->reset(m);
  168. return 0;
  169. }
  170. if (ver1 == VersionOfSocketId(id) + 1) {
  171. ptr->reset(m);
  172. return 1;
  173. }
  174. const uint64_t vref2 = m->_versioned_ref.fetch_sub(
  175. 1, butil::memory_order_release);
  176. const int32_t nref = NRefOfVRef(vref2);
  177. if (nref > 1) {
  178. return -1;
  179. } else if (__builtin_expect(nref == 1, 1)) {
  180. const uint32_t ver2 = VersionOfVRef(vref2);
  181. if ((ver2 & 1)) {
  182. if (ver1 == ver2 || ver1 + 1 == ver2) {
  183. uint64_t expected_vref = vref2 - 1;
  184. if (m->_versioned_ref.compare_exchange_strong(
  185. expected_vref, MakeVRef(ver2 + 1, 0),
  186. butil::memory_order_acquire,
  187. butil::memory_order_relaxed)) {
  188. m->OnRecycle();
  189. return_resource(slot);
  190. }
  191. } else {
  192. CHECK(false) << "ref-version=" << ver1
  193. << " unref-version=" << ver2;
  194. }
  195. } else {
  196. // Addressed a free slot.
  197. }
  198. } else {
  199. CHECK(false) << "Over dereferenced SocketId=" << id;
  200. }
  201. }
  202. return -1;
  203. }
  204. inline bool Socket::Failed() const {
  205. return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed))
  206. != VersionOfSocketId(_this_id);
  207. }
  208. inline bool Socket::MoreReadEvents(int* progress) {
  209. // Fail to CAS means that new events arrived.
  210. return !_nevent.compare_exchange_strong(
  211. *progress, 0, butil::memory_order_release,
  212. butil::memory_order_acquire);
  213. }
  214. inline void Socket::SetLogOff() {
  215. if (!_logoff_flag.exchange(true, butil::memory_order_relaxed)) {
  216. if (fd() < 0) {
  217. // This socket hasn't been connected before (such as
  218. // short connection), so it won't receive any epoll
  219. // events. We need to `SetFailed' it to trigger health
  220. // checking, otherwise it may be blocked forever
  221. SetFailed(ELOGOFF, "The server at %s is stopping",
  222. butil::endpoint2str(remote_side()).c_str());
  223. }
  224. }
  225. }
  226. inline bool Socket::IsAvailable() const {
  227. return !_logoff_flag.load(butil::memory_order_relaxed) &&
  228. (_ninflight_app_health_check.load(butil::memory_order_relaxed) == 0);
  229. }
  230. static const uint32_t EOF_FLAG = (1 << 31);
  231. inline void Socket::PostponeEOF() {
  232. if (CreatedByConnect()) { // not needed at server-side
  233. _ninprocess.fetch_add(1, butil::memory_order_relaxed);
  234. }
  235. }
  236. inline void Socket::CheckEOF() {
  237. if (CreatedByConnect()) { // not needed at server-side
  238. CheckEOFInternal();
  239. }
  240. }
  241. inline void Socket::CheckEOFInternal() {
  242. uint32_t nref = _ninprocess.fetch_sub(1, butil::memory_order_release);
  243. if ((nref & ~EOF_FLAG) == 1) {
  244. butil::atomic_thread_fence(butil::memory_order_acquire);
  245. // It's safe to call `SetFailed' each time `_ninprocess' hits 0
  246. SetFailed(EEOF, "Got EOF of %s", description().c_str());
  247. }
  248. }
  249. inline void Socket::SetEOF() {
  250. uint32_t nref = _ninprocess.fetch_or(EOF_FLAG, butil::memory_order_relaxed);
  251. if ((nref & EOF_FLAG) == 0) {
  252. // Release the additional reference in `_ninprocess'
  253. CheckEOFInternal();
  254. }
  255. }
  256. inline void Socket::reset_parsing_context(Destroyable* new_context) {
  257. Destroyable* old_ctx = _parsing_context.exchange(
  258. new_context, butil::memory_order_acq_rel);
  259. if (old_ctx) {
  260. old_ctx->Destroy();
  261. }
  262. }
  263. inline Destroyable* Socket::release_parsing_context() {
  264. return _parsing_context.exchange(NULL, butil::memory_order_acquire);
  265. }
  266. template <typename T>
  267. bool Socket::initialize_parsing_context(T** ctx) {
  268. Destroyable* expected = NULL;
  269. if (_parsing_context.compare_exchange_strong(
  270. expected, *ctx, butil::memory_order_acq_rel,
  271. butil::memory_order_acquire)) {
  272. return true;
  273. } else {
  274. (*ctx)->Destroy();
  275. *ctx = static_cast<T*>(expected);
  276. return false;
  277. }
  278. }
  279. // NOTE: Push/Pop may be called from different threads simultaneously.
  280. inline void Socket::PushPipelinedInfo(const PipelinedInfo& pi) {
  281. BAIDU_SCOPED_LOCK(_pipeline_mutex);
  282. if (_pipeline_q == NULL) {
  283. _pipeline_q = new std::deque<PipelinedInfo>;
  284. }
  285. _pipeline_q->push_back(pi);
  286. }
  287. inline bool Socket::PopPipelinedInfo(PipelinedInfo* info) {
  288. BAIDU_SCOPED_LOCK(_pipeline_mutex);
  289. if (_pipeline_q != NULL && !_pipeline_q->empty()) {
  290. *info = _pipeline_q->front();
  291. _pipeline_q->pop_front();
  292. return true;
  293. }
  294. return false;
  295. }
  296. inline void Socket::GivebackPipelinedInfo(const PipelinedInfo& pi) {
  297. BAIDU_SCOPED_LOCK(_pipeline_mutex);
  298. if (_pipeline_q != NULL) {
  299. _pipeline_q->push_front(pi);
  300. }
  301. }
  302. inline bool Socket::ValidFileDescriptor(int fd) {
  303. return fd >= 0 && fd != STREAM_FAKE_FD;
  304. }
  305. inline Socket::SharedPart* Socket::GetSharedPart() const {
  306. return _shared_part.load(butil::memory_order_consume);
  307. }
  308. inline Socket::SharedPart* Socket::GetOrNewSharedPart() {
  309. SharedPart* shared_part = GetSharedPart();
  310. if (shared_part != NULL) { // most cases
  311. return shared_part;
  312. }
  313. return GetOrNewSharedPartSlower();
  314. }
  315. } // namespace brpc
  316. #endif // BRPC_SOCKET_INL_H