socket.h 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #ifndef BRPC_SOCKET_H
  18. #define BRPC_SOCKET_H
  19. #include <iostream> // std::ostream
  20. #include <deque> // std::deque
  21. #include <set> // std::set
  22. #include "butil/atomicops.h" // butil::atomic
  23. #include "bthread/types.h" // bthread_id_t
  24. #include "butil/iobuf.h" // butil::IOBuf, IOPortal
  25. #include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
  26. #include "butil/endpoint.h" // butil::EndPoint
  27. #include "butil/resource_pool.h" // butil::ResourceId
  28. #include "bthread/butex.h" // butex_create_checked
  29. #include "brpc/authenticator.h" // Authenticator
  30. #include "brpc/errno.pb.h" // EFAILEDSOCKET
  31. #include "brpc/details/ssl_helper.h" // SSLState
  32. #include "brpc/stream.h" // StreamId
  33. #include "brpc/destroyable.h" // Destroyable
  34. #include "brpc/options.pb.h" // ConnectionType
  35. #include "brpc/socket_id.h" // SocketId
  36. #include "brpc/socket_message.h" // SocketMessagePtr
  37. #include "bvar/bvar.h"
  38. namespace brpc {
  39. namespace policy {
  40. class ConsistentHashingLoadBalancer;
  41. class RtmpContext;
  42. class H2GlobalStreamCreator;
  43. } // namespace policy
  44. namespace schan {
  45. class ChannelBalancer;
  46. }
  47. class Socket;
  48. class AuthContext;
  49. class EventDispatcher;
  50. class Stream;
  51. // A special closure for processing the about-to-recycle socket. Socket does
  52. // not delete SocketUser, if you want, `delete this' at the end of
  53. // BeforeRecycle().
  54. class SocketUser {
  55. public:
  56. virtual ~SocketUser() {}
  57. virtual void BeforeRecycle(Socket*) {};
  58. // Will be periodically called in a dedicated thread to check the
  59. // health.
  60. // If the return value is 0, the socket is revived.
  61. // If the return value is ESTOP, the health-checking thread quits.
  62. // The default impl is testing health by connection.
  63. virtual int CheckHealth(Socket*);
  64. // Called after revived.
  65. virtual void AfterRevived(Socket*);
  66. };
  67. // TODO: Remove this class which is replace-able with SocketMessage
  68. // A special closure for handling fd related connection. The Socket does
  69. // not delete SocketConnection, if you want, `delete this' at the end of
  70. // BeforeRecycle().
  71. class SocketConnection {
  72. public:
  73. virtual ~SocketConnection() {}
  74. virtual void BeforeRecycle(Socket*) = 0;
  75. // Establish a connection, call on_connect after connection finishes
  76. virtual int Connect(Socket*, const timespec*,
  77. int (*on_connect)(int, int, void*), void*) = 0;
  78. // Cut IOBufs into fd or SSL Channel
  79. virtual ssize_t CutMessageIntoFileDescriptor(int, butil::IOBuf**, size_t) = 0;
  80. virtual ssize_t CutMessageIntoSSLChannel(SSL*, butil::IOBuf**, size_t) = 0;
  81. };
  82. // Application-level connect. After TCP connected, the client sends some
  83. // sort of "connect" message to the server to establish application-level
  84. // connection.
  85. // Instances of AppConnect may be shared by multiple sockets and often
  86. // created by std::make_shared<T>() where T implements AppConnect
  87. class AppConnect {
  88. public:
  89. virtual ~AppConnect() {}
  90. // Called after TCP connected. Call done(error, data) when
  91. // the application-level connection is established.
  92. // Notice that `socket' can only be used for getting information of
  93. // the connection. To write into the socket, write socket->fd() with
  94. // sys_write directly. This is because Socket::Write() does not really
  95. // write out until AppConnect is done.
  96. virtual void StartConnect(const Socket* socket,
  97. void (*done)(int err, void* data),
  98. void* data) = 0;
  99. // Called when the host socket is setfailed or about to be recycled.
  100. // If the AppConnect is still in-progress, it should be canceled properly.
  101. virtual void StopConnect(Socket*) = 0;
  102. };
  // Snapshot of the traffic counters of a socket.
  // Suffixes: _s = per second, _m = per minute.
  // Kept as a plain aggregate (no constructor, no member initializers) so
  // that existing brace/value-initialization by callers keeps working.
  struct SocketStat {
      uint32_t in_size_s;           // input size, per second
      uint32_t out_size_s;          // output size, per second
      uint32_t in_num_messages_s;   // input messages, per second
      uint32_t out_num_messages_s;  // output messages, per second
      uint64_t in_size_m;           // input size, per minute; must be 64-bit
      uint64_t out_size_m;          // output size, per minute
      uint32_t in_num_messages_m;   // input messages, per minute
      uint32_t out_num_messages_m;  // output messages, per minute
  };
  114. struct SocketVarsCollector {
  115. SocketVarsCollector()
  116. : nsocket("rpc_socket_count")
  117. , channel_conn("rpc_channel_connection_count")
  118. , neventthread_second("rpc_event_thread_second", &neventthread)
  119. , nhealthcheck("rpc_health_check_count")
  120. , nkeepwrite_second("rpc_keepwrite_second", &nkeepwrite)
  121. , nwaitepollout("rpc_waitepollout_count")
  122. , nwaitepollout_second("rpc_waitepollout_second", &nwaitepollout)
  123. {}
  124. bvar::Adder<int64_t> nsocket;
  125. bvar::Adder<int64_t> channel_conn;
  126. bvar::Adder<int> neventthread;
  127. bvar::PerSecond<bvar::Adder<int> > neventthread_second;
  128. bvar::Adder<int64_t> nhealthcheck;
  129. bvar::Adder<int64_t> nkeepwrite;
  130. bvar::PerSecond<bvar::Adder<int64_t> > nkeepwrite_second;
  131. bvar::Adder<int64_t> nwaitepollout;
  132. bvar::PerSecond<bvar::Adder<int64_t> > nwaitepollout_second;
  133. };
  134. struct PipelinedInfo {
  135. PipelinedInfo() { reset(); }
  136. void reset() {
  137. count = 0;
  138. with_auth = false;
  139. id_wait = INVALID_BTHREAD_ID;
  140. }
  141. uint32_t count;
  142. bool with_auth;
  143. bthread_id_t id_wait;
  144. };
  145. struct SocketSSLContext {
  146. SocketSSLContext();
  147. ~SocketSSLContext();
  148. SSL_CTX* raw_ctx; // owned
  149. std::string sni_name; // useful for clients
  150. };
  151. // TODO: Comment fields
  152. struct SocketOptions {
  153. SocketOptions();
  154. // If `fd' is non-negative, set `fd' to be non-blocking and take the
  155. // ownership. Socket will close the fd(if needed) and call
  156. // user->BeforeRecycle() before recycling.
  157. int fd;
  158. butil::EndPoint remote_side;
  159. SocketUser* user;
  160. // When *edge-triggered* events happen on the file descriptor, callback
  161. // `on_edge_triggered_events' will be called. Inside the callback, user
  162. // shall read fd() in non-blocking mode until all data has been read
  163. // or EAGAIN is met, otherwise the callback will not be called again
  164. // until new data arrives. The callback will not be called from more than
  165. // one thread at any time.
  166. void (*on_edge_triggered_events)(Socket*);
  167. int health_check_interval_s;
  168. std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
  169. bthread_keytable_pool_t* keytable_pool;
  170. SocketConnection* conn;
  171. std::shared_ptr<AppConnect> app_connect;
  172. // The created socket will set parsing_context with this value.
  173. Destroyable* initial_parsing_context;
  174. };
  175. // Abstractions on reading from and writing into file descriptors.
  176. // NOTE: accessed by multiple threads(frequently), align it by cacheline.
  177. class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket {
  178. friend class EventDispatcher;
  179. friend class InputMessenger;
  180. friend class Acceptor;
  181. friend class ConnectionsService;
  182. friend class SocketUser;
  183. friend class Stream;
  184. friend class Controller;
  185. friend class policy::ConsistentHashingLoadBalancer;
  186. friend class policy::RtmpContext;
  187. friend class schan::ChannelBalancer;
  188. friend class HealthCheckTask;
  189. friend class OnAppHealthCheckDone;
  190. friend class HealthCheckManager;
  191. friend class policy::H2GlobalStreamCreator;
  192. class SharedPart;
  193. struct Forbidden {};
  194. struct WriteRequest;
  195. public:
  196. const static int STREAM_FAKE_FD = INT_MAX;
  197. // NOTE: User cannot create Socket from constructor. Use Create()
  198. // instead. It's public just because of requirement of ResourcePool.
  199. Socket(Forbidden);
  200. ~Socket();
  201. // Write `msg' into this Socket and clear it. The `msg' should be an
  202. // intact request or response. To prevent messages from interleaving
  203. // with other messages, the internal file descriptor is written by one
  204. // thread at any time. Namely when only one thread tries to write, the
  205. // message is written once directly in the calling thread. If the message
  206. // is not completely written, a KeepWrite thread is created to continue
  207. // the writing. When other threads want to write simultaneously (thread
  208. // contention), they append WriteRequests to the KeepWrite thread in a
  209. // wait-free manner rather than writing to the file descriptor directly.
  210. // KeepWrite will not quit until all WriteRequests are complete.
  211. // Key properties:
  212. // - all threads have similar opportunities to write, no one is starved.
  213. // - Write once when uncontended(most cases).
  214. // - Wait-free when contended.
  215. struct WriteOptions {
  216. // `id_wait' is signalled when this Socket is SetFailed. To disable
  217. // the signal, set this field to INVALID_BTHREAD_ID.
  218. // `on_reset' of `id_wait' is NOT called when Write() returns non-zero.
  219. // Default: INVALID_BTHREAD_ID
  220. bthread_id_t id_wait;
  221. // If no connection exists, a connection will be established to
  222. // remote_side() regarding deadline `abstime'. NULL means no timeout.
  223. // Default: NULL
  224. const timespec* abstime;
  225. // Will be queued to implement positional correspondence with responses
  226. // Default: 0
  227. uint32_t pipelined_count;
  228. // [Only effective when pipelined_count is non-zero]
  229. // The request contains authenticating information which will be
  230. // responded by the server and processed specially when dealing
  231. // with the response.
  232. bool with_auth;
  233. // Do not return EOVERCROWDED
  234. // Default: false
  235. bool ignore_eovercrowded;
  236. WriteOptions()
  237. : id_wait(INVALID_BTHREAD_ID), abstime(NULL)
  238. , pipelined_count(0), with_auth(false)
  239. , ignore_eovercrowded(false) {}
  240. };
  241. int Write(butil::IOBuf *msg, const WriteOptions* options = NULL);
  242. // Write an user-defined message. `msg' is released when Write() is
  243. // successful and *may* remain unchanged otherwise.
  244. int Write(SocketMessagePtr<>& msg, const WriteOptions* options = NULL);
  245. // The file descriptor
  246. int fd() const { return _fd.load(butil::memory_order_relaxed); }
  247. // ip/port of the local end of the connection
  248. butil::EndPoint local_side() const { return _local_side; }
  249. // ip/port of the other end of the connection.
  250. butil::EndPoint remote_side() const { return _remote_side; }
  251. // Positive value enables health checking.
  252. // Initialized by SocketOptions.health_check_interval_s.
  253. int health_check_interval() const { return _health_check_interval_s; }
  254. // The unique identifier.
  255. SocketId id() const { return _this_id; }
  256. // `user' parameter passed to Create().
  257. SocketUser* user() const { return _user; }
  258. // `conn' parameter passed to Create()
  259. void set_conn(SocketConnection* conn) { _conn = conn; }
  260. SocketConnection* conn() const { return _conn; }
  261. // Saved contexts for parsing. Reset before trying new protocols and
  262. // recycling of the socket.
  263. void reset_parsing_context(Destroyable*);
  264. Destroyable* release_parsing_context();
  265. Destroyable* parsing_context() const
  266. { return _parsing_context.load(butil::memory_order_consume); }
  267. // Try to set _parsing_context to *ctx when _parsing_context is NULL.
  268. // If _parsing_context is NULL, the set is successful and true is returned.
  269. // Otherwise, *ctx is Destroy()-ed and replaced with the value of
  270. // _parsing_context, and false is returned. This process is thread-safe.
  271. template <typename T> bool initialize_parsing_context(T** ctx);
  272. // Connection-specific result of authentication.
  273. const AuthContext* auth_context() const { return _auth_context; }
  274. AuthContext* mutable_auth_context();
  275. // Create a Socket according to `options', put the identifier into `id'.
  276. // Returns 0 on success, -1 otherwise.
  277. static int Create(const SocketOptions& options, SocketId* id);
  278. // Place the Socket associated with identifier `id' into unique_ptr `ptr',
  279. // which will be released automatically when out of scope (w/o explicit
  280. // std::move). User can still access `ptr' after calling ptr->SetFailed()
  281. // before release of `ptr'.
  282. // This function is wait-free.
  283. // Returns 0 on success, -1 when the Socket was SetFailed().
  284. static int Address(SocketId id, SocketUniquePtr* ptr);
  285. // Re-address current socket into `ptr'.
  286. // Always succeed even if this socket is failed.
  287. void ReAddress(SocketUniquePtr* ptr);
  288. // Returns 0 on success, 1 on failed socket, -1 on recycled.
  289. static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
  290. // Mark this Socket or the Socket associated with `id' as failed.
  291. // Any later Address() of the identifier shall return NULL unless the
  292. // Socket was revived by HealthCheckThread. The Socket is NOT recycled
  293. // after calling this function, instead it will be recycled when no one
  294. // references it. Internal fields of the Socket are still accessible
  295. // after calling this function. Calling SetFailed() of a Socket more
  296. // than once is OK.
  297. // This function is lock-free.
  298. // Returns -1 when the Socket was already SetFailed(), 0 otherwise.
  299. int SetFailed();
  300. int SetFailed(int error_code, const char* error_fmt, ...)
  301. __attribute__ ((__format__ (__printf__, 3, 4)));
  302. static int SetFailed(SocketId id);
  303. void AddRecentError();
  304. int64_t recent_error_count() const;
  305. int isolated_times() const;
  306. void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
  307. bool Failed() const;
  308. bool DidReleaseAdditionalRereference() const
  309. { return _recycle_flag.load(butil::memory_order_relaxed); }
  310. // Notify `id' object (by calling bthread_id_error) when this Socket
  311. // has been `SetFailed'. If it already has, notify `id' immediately
  312. void NotifyOnFailed(bthread_id_t id);
  313. // Release the additional reference which added inside `Create'
  314. // before so that `Socket' will be recycled automatically once
  315. // no one is addressing it.
  316. int ReleaseAdditionalReference();
  317. // `ReleaseAdditionalReference' this Socket iff it has no data
  318. // transmission during the last `idle_seconds'
  319. int ReleaseReferenceIfIdle(int idle_seconds);
  320. // Set ELOGOFF flag to this `Socket' which means further requests
  321. // through this `Socket' will receive an ELOGOFF error. This only
  322. // affects return value of `IsAvailable' and won't close the inner
  323. // fd. Once set, this flag can only be cleared inside `WaitAndReset'.
  324. void SetLogOff();
  325. // Check Whether the socket is available for user requests.
  326. bool IsAvailable() const;
  327. // Start to process edge-triggered events from the fd.
  328. // This function does not block caller.
  329. static int StartInputEvent(SocketId id, uint32_t events,
  330. const bthread_attr_t& thread_attr);
  331. static const int PROGRESS_INIT = 1;
  332. bool MoreReadEvents(int* progress);
  333. // Fight for the right to authenticate this socket. Only one
  334. // fighter will get 0 as return value. Others will wait until
  335. // authentication finishes (succeed or not) and the error code
  336. // will be filled into `auth_error'. The winner MUST call
  338. // `SetAuthentication' (whether authentication succeed or not)
  339. // to wake up waiters.
  340. // Return 0 on success, error code on failure.
  341. int FightAuthentication(int* auth_error);
  342. // Set the authentication result and signal all the waiters.
  343. // This function can only be called after a successful
  344. // `FightAuthentication', otherwise it's regarded as an error
  345. void SetAuthentication(int error_code);
  346. // Since some protocols are not able to store correlation id in their
  347. // headers (such as nova-pbrpc, http), we have to store it here. Note
  348. // that there can only be 1 RPC call on this socket at any time, otherwise
  349. // use PushPipelinedInfo/PopPipelinedInfo instead.
  350. void set_correlation_id(uint64_t correlation_id)
  351. { _correlation_id = correlation_id; }
  352. uint64_t correlation_id() const { return _correlation_id; }
  353. // For protocols that need positional correspondence between responses
  354. // and requests.
  355. void PushPipelinedInfo(const PipelinedInfo&);
  356. bool PopPipelinedInfo(PipelinedInfo* info);
  357. // Undo previous PopPipelinedInfo
  358. void GivebackPipelinedInfo(const PipelinedInfo&);
  359. void set_preferred_index(int index) { _preferred_index = index; }
  360. int preferred_index() const { return _preferred_index; }
  361. void set_type_of_service(int tos) { _tos = tos; }
  362. // Call this method every second (roughly)
  363. void UpdateStatsEverySecond(int64_t now_ms);
  364. // Copy stat into `out'. If UpdateStatsEverySecond was never called, all
  365. // fields will be zero.
  366. void GetStat(SocketStat* out) const;
  367. // Call this when you receive an EOF event. `SetFailed' will be
  368. // called at last if EOF event is no longer postponed
  369. void SetEOF();
  370. // Postpone EOF event until `CheckEOF' has been called
  371. void PostponeEOF();
  372. void CheckEOF();
  373. SSLState ssl_state() const { return _ssl_state; }
  374. bool is_ssl() const { return ssl_state() == SSL_CONNECTED; }
  375. X509* GetPeerCertificate() const;
  376. // Print debugging information of `id' into the ostream.
  377. static void DebugSocket(std::ostream&, SocketId id);
  378. // Number of health checks since last socket failure.
  379. int health_check_count() const { return _hc_count; }
  380. // True if this socket was created by Connect.
  381. bool CreatedByConnect() const;
  382. // Get an UNUSED socket connecting to the same place as this socket
  383. // from the SocketPool of this socket.
  384. int GetPooledSocket(SocketUniquePtr* pooled_socket);
  385. // Return this socket which MUST be got from GetPooledSocket to its
  386. // main_socket's pool.
  387. int ReturnToPool();
  388. // True if this socket has SocketPool
  389. bool HasSocketPool() const;
  390. // Put all sockets in _shared_part->socket_pool into `list'.
  391. void ListPooledSockets(std::vector<SocketId>* list, size_t max_count = 0);
  392. // Return true on success
  393. bool GetPooledSocketStats(int* numfree, int* numinflight);
  394. // Create a socket connecting to the same place as this socket.
  395. int GetShortSocket(SocketUniquePtr* short_socket);
  396. // Get and persist a socket connecting to the same place as this socket.
  397. // If an agent socket was already created and persisted, it's returned
  398. // directly (provided other constraints are satisfied)
  399. // If `checkfn' is not NULL, and the checking result on the socket that
  400. // would be returned is false, the socket is abandoned and the getting
  401. // process is restarted.
  402. // For example, http2 connections may run out of stream_id after long time
  403. // running and a new socket should be created. In order not to affect
  404. // LoadBalancers or NamingServices that may reference the Socket, agent
  405. // socket can be used for the communication and replaced periodically but
  406. // the main socket is unchanged.
  407. int GetAgentSocket(SocketUniquePtr* out, bool (*checkfn)(Socket*));
  408. // Take a peek at existing agent socket (no creation).
  409. // Returns 0 on success.
  410. int PeekAgentSocket(SocketUniquePtr* out) const;
  411. // Where the stats of this socket are accumulated to.
  412. SocketId main_socket_id() const;
  413. // Share the stats with the socket.
  414. void ShareStats(Socket* s);
  415. // Call this method to let the server SetFailed() this socket when the
  416. // socket becomes idle or Server.Stop() is called. Useful for stopping
  417. // streaming connections which are often referenced by many places,
  418. // without SetFailed(), the ref-count may never hit zero.
  419. void fail_me_at_server_stop() { _fail_me_at_server_stop = true; }
  420. bool shall_fail_me_at_server_stop() const { return _fail_me_at_server_stop; }
  421. // Tag the socket so that the response coming back from socket will be
  422. // parsed progressively. For example: in HTTP, the RPC may end w/o reading
  423. // the body part fully.
  424. void read_will_be_progressive(ConnectionType t)
  425. { _connection_type_for_progressive_read = t; }
  426. // True if read_will_be_progressive() was called.
  427. bool is_read_progressive() const
  428. { return _connection_type_for_progressive_read != CONNECTION_TYPE_UNKNOWN; }
  429. // Handle the socket according to its connection_type when the progressive
  430. // reading is finally done.
  431. void OnProgressiveReadCompleted();
  432. // Last cpuwide-time at when this socket was read or write.
  433. int64_t last_active_time_us() const {
  434. return std::max(
  435. _last_readtime_us.load(butil::memory_order_relaxed),
  436. _last_writetime_us.load(butil::memory_order_relaxed));
  437. }
  438. // A brief description of this socket, consistent with os << *this
  439. std::string description() const;
  440. // Returns true if the remote side is overcrowded.
  441. bool is_overcrowded() const { return _overcrowded; }
  442. bthread_keytable_pool_t* keytable_pool() const { return _keytable_pool; }
  443. private:
  444. DISALLOW_COPY_AND_ASSIGN(Socket);
  445. int ConductError(bthread_id_t);
  446. int StartWrite(WriteRequest*, const WriteOptions&);
  447. int Dereference();
  448. friend void DereferenceSocket(Socket*);
  449. static int Status(SocketId, int32_t* nref = NULL); // for unit-test.
  450. // Perform SSL handshake after TCP connection has been established.
  451. // Create SSL session inside and block (in bthread) until handshake
  452. // has completed. Application layer I/O is forbidden during this
  453. // process to avoid concurrent I/O on the underlying fd
  454. // Returns 0 on success, -1 otherwise
  455. int SSLHandshake(int fd, bool server_mode);
  456. // Based upon whether the underlying channel is using SSL (if
  457. // SSLState is SSL_UNKNOWN, try to detect at first), read data
  458. // using the corresponding method into `_read_buf'. Returns read
  459. // bytes on success, 0 on EOF, -1 otherwise and errno is set
  460. ssize_t DoRead(size_t size_hint);
  461. // Based upon whether the underlying channel is using SSL, write
  462. // `req' using the corresponding method. Returns written bytes on
  463. // success, -1 otherwise and errno is set
  464. ssize_t DoWrite(WriteRequest* req);
  465. // Called before returning to pool.
  466. void OnRecycle();
  467. // [Not thread-safe] Wait for EPOLLOUT event on `fd'. If `pollin' is
  468. // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
  469. // be used instead of EPOLL_CTL_ADD. Note that spurious wakeups may
  470. // occur when this function returns, so make sure to check whether fd
  471. // is writable or not even when it returns 0
  472. int WaitEpollOut(int fd, bool pollin, const timespec* abstime);
  473. // [Not thread-safe] Establish a tcp connection to `remote_side()'
  474. // If `on_connect' is NULL, this function blocks current thread
  475. // until connected/timeout. Otherwise, it returns immediately after
  476. // starting a connection request and `on_connect' will be called
  477. // when connecting completes (whether it succeeds or not)
  478. // Returns the socket fd on success, -1 otherwise
  479. int Connect(const timespec* abstime,
  480. int (*on_connect)(int fd, int err, void* data), void* data);
  481. int CheckConnected(int sockfd);
  482. // [Not thread-safe] Only used by `Write'.
  483. // Returns:
  484. // 0 - Already connected
  485. // 1 - Trying to establish connection
  486. // -1 - Failed to connect to remote side
  487. int ConnectIfNot(const timespec* abstime, WriteRequest* req);
  488. int ResetFileDescriptor(int fd);
  489. // Wait until nref hits `expected_nref' and reset some internal resources.
  490. int WaitAndReset(int32_t expected_nref);
  491. // Make this socket addressable again.
  492. void Revive();
  493. static void* ProcessEvent(void*);
  494. static void* KeepWrite(void*);
  495. bool IsWriteComplete(WriteRequest* old_head, bool singular_node,
  496. WriteRequest** new_tail);
  497. void ReturnFailedWriteRequest(
  498. WriteRequest*, int error_code, const std::string& error_text);
  499. void ReturnSuccessfulWriteRequest(WriteRequest*);
  500. WriteRequest* ReleaseWriteRequestsExceptLast(
  501. WriteRequest*, int error_code, const std::string& error_text);
  502. void ReleaseAllFailedWriteRequests(WriteRequest*);
  503. // Generic callback for Socket to handle epollout event
  504. static int HandleEpollOut(SocketId socket_id);
  505. class EpollOutRequest;
  506. // Callback to handle epollout event whose request data
  507. // is `EpollOutRequest'
  508. int HandleEpollOutRequest(int error_code, EpollOutRequest* req);
  509. // Callback when an EpollOutRequest reaches timeout
  510. static void HandleEpollOutTimeout(void* arg);
  511. // Callback when connection event reaches (succeeded or not)
  512. // This callback will be passed to `Connect'
  513. static int KeepWriteIfConnected(int fd, int err, void* data);
  514. static void CheckConnectedAndKeepWrite(int fd, int err, void* data);
  515. static void AfterAppConnected(int err, void* data);
  516. static void CreateVarsOnce();
  517. // Default impl. of health checking.
  518. int CheckHealth();
  519. // Add a stream over this Socket. And |stream_id| would be automatically
  520. // closed when this socket fails.
  521. // Returns 0 on success. -1 otherwise, indicating that this is currently a
  522. // broken socket.
  523. int AddStream(StreamId stream_id);
  524. int RemoveStream(StreamId stream_id);
  525. void ResetAllStreams();
  526. bool ValidFileDescriptor(int fd);
  527. // For stats.
  528. void AddInputBytes(size_t bytes);
  529. void AddInputMessages(size_t count);
  530. void AddOutputBytes(size_t bytes);
  531. void AddOutputMessages(size_t count);
  532. SharedPart* GetSharedPart() const;
  533. SharedPart* GetOrNewSharedPart();
  534. SharedPart* GetOrNewSharedPartSlower();
  535. void CheckEOFInternal();
  536. // _error_code is set after a socket becomes failed, during the time
  537. // gap, _error_code is 0. The race condition is by-design and acceptable.
  538. // To always get a non-zero error_code, readers should call this method
  539. // instead of reading _error_code directly.
  540. int non_zero_error_code() const {
  541. const int tmp = _error_code;
  542. return tmp ? tmp : EFAILEDSOCKET;
  543. }
  544. void CancelUnwrittenBytes(size_t bytes);
  545. private:
  546. // unsigned 32-bit version + signed 32-bit referenced-count.
  547. // Meaning of version:
  548. // * Created version: no SetFailed() is called on the Socket yet. Must be
  549. // same evenness with initial _versioned_ref because during lifetime of
  550. // a Socket on the slot, the version is added with 1 twice. This is
  551. // also the version encoded in SocketId.
  552. // * Failed version: = created version + 1, SetFailed()-ed but returned.
  553. // * Other versions: the socket is already recycled.
  554. butil::atomic<uint64_t> _versioned_ref;
  555. // In/Out bytes/messages, SocketPool etc
  556. // _shared_part is shared by a main socket and all its pooled sockets.
  557. // Can't use intrusive_ptr because the creation is based on optimistic
  558. // locking and relies on atomic CAS. We manage references manually.
  559. butil::atomic<SharedPart*> _shared_part;
  560. // [ Set in dispatcher ]
  561. // To keep the callback in at most one bthread at any time. Read comments
  562. // about ProcessEvent in socket.cpp to understand the tricks.
  563. butil::atomic<int> _nevent;
  564. // May be set by Acceptor to share keytables between reading threads
  565. // on sockets created by the Acceptor.
  566. bthread_keytable_pool_t* _keytable_pool;
  567. // [ Set in ResetFileDescriptor ]
  568. butil::atomic<int> _fd; // -1 when not connected.
  569. int _tos; // Type of service which is actually only 8bits.
  570. int64_t _reset_fd_real_us; // When _fd was reset, in microseconds.
  571. // Address of peer. Initialized by SocketOptions.remote_side.
  572. butil::EndPoint _remote_side;
  573. // Address of self. Initialized in ResetFileDescriptor().
  574. butil::EndPoint _local_side;
  575. // Called when edge-triggered events happened on `_fd'. Read comments
  576. // of EventDispatcher::AddConsumer (event_dispatcher.h)
  577. // carefully before implementing the callback.
  578. void (*_on_edge_triggered_events)(Socket*);
  579. // A set of callbacks to monitor important events of this socket.
  580. // Initialized by SocketOptions.user
  581. SocketUser* _user;
  582. // Customize creation of the connection. Initialized by SocketOptions.conn
  583. SocketConnection* _conn;
  584. // User-level connection after TCP-connected.
  585. // Initialized by SocketOptions.app_connect.
  586. std::shared_ptr<AppConnect> _app_connect;
  587. // Identifier of this Socket in ResourcePool
  588. SocketId _this_id;
  589. // last chosen index of the protocol as a heuristic value to avoid
  590. // iterating all protocol handlers each time.
  591. int _preferred_index;
  592. // Number of HC since the last SetFailed() was called. Set to 0 when the
  593. // socket is revived. Only set in HealthCheckTask::OnTriggeringTask()
  594. int _hc_count;
  595. // Size of current incomplete message, set to 0 on complete.
  596. uint32_t _last_msg_size;
  597. // Average message size of last #MSG_SIZE_WINDOW messages (roughly)
  598. uint32_t _avg_msg_size;
  599. // Stores data read from `_fd' but not cut off yet.
  600. butil::IOPortal _read_buf;
  601. // Set with cpuwide_time_us() at last read operation
  602. butil::atomic<int64_t> _last_readtime_us;
  603. // Saved context for parsing, reset before trying other protocols.
  604. butil::atomic<Destroyable*> _parsing_context;
  605. // Saving the correlation_id of RPC on protocols that cannot put
  606. // correlation_id on-wire and do not send multiple requests on one
  607. // connection simultaneously.
  608. uint64_t _correlation_id;
  609. // Non-zero when health-checking is on.
  610. int _health_check_interval_s;
  611. // +-1 bit-+---31 bit---+
  612. // | flag | counter |
  613. // +-------+------------+
  614. // 1-bit flag to ensure `SetEOF' to be called only once
  615. // 31-bit counter of requests that are currently being processed
  616. butil::atomic<uint32_t> _ninprocess;
  617. // +---32 bit---+---32 bit---+
  618. // | auth flag | auth error |
  619. // +------------+------------+
  620. // Meanings of `auth flag':
  621. // 0 - not authenticated yet
  622. // 1 - authentication completed (whether it succeeded or not
  623. // depends on `auth error')
  624. butil::atomic<uint64_t> _auth_flag_error;
  625. bthread_id_t _auth_id;
  626. // Stores authentication result/context of this socket. This only
  627. // exists in server side
  628. AuthContext* _auth_context;
  629. SSLState _ssl_state;
  630. SSL* _ssl_session; // owner
  631. std::shared_ptr<SocketSSLContext> _ssl_ctx;
  632. // Pass from controller, for progressive reading.
  633. ConnectionType _connection_type_for_progressive_read;
  634. butil::atomic<bool> _controller_released_socket;
  635. // True if the socket is too full to write.
  636. volatile bool _overcrowded;
  637. bool _fail_me_at_server_stop;
  638. // Set by SetLogOff
  639. butil::atomic<bool> _logoff_flag;
  640. // Flag used to mark whether additional reference has been decreased
  641. // by either `SetFailed' or `SetRecycle'
  642. butil::atomic<bool> _recycle_flag;
  643. // Concrete error information from SetFailed()
  644. // Accesses to these 2 fields(especially _error_text) must be protected
  645. // by _id_wait_list_mutex
  646. int _error_code;
  647. std::string _error_text;
  648. butil::atomic<SocketId> _agent_socket_id;
  649. butil::Mutex _pipeline_mutex;
  650. std::deque<PipelinedInfo>* _pipeline_q;
  651. // For storing call-id of in-progress RPC.
  652. pthread_mutex_t _id_wait_list_mutex;
  653. bthread_id_list_t _id_wait_list;
  654. // Set with cpuwide_time_us() at last write operation
  655. butil::atomic<int64_t> _last_writetime_us;
  656. // Bytes queued but not written yet
  657. butil::atomic<int64_t> _unwritten_bytes;
  658. // Butex to wait for EPOLLOUT event
  659. butil::atomic<int>* _epollout_butex;
  660. // Storing data that are not flushed into `fd' yet.
  661. butil::atomic<WriteRequest*> _write_head;
  662. butil::Mutex _stream_mutex;
  663. std::set<StreamId> *_stream_set;
  664. butil::atomic<int64_t> _ninflight_app_health_check;
  665. };
  666. } // namespace brpc
  // Evaluate `write_expr' repeatedly until it either returns a non-negative
  // value or fails with an errno other than EOVERCROWDED. The macro yields the
  // value returned by the last evaluation, as an int64_t.
  // After each EOVERCROWDED failure the sleep time is doubled (starting from
  // 250, so the first sleep is 500) and capped at 2000 before being passed to
  // bthread_usleep(). The number of retries is unbounded; use
  // BRPC_HANDLE_EOVERCROWDED_N to limit it.
  // Implemented as a macro rather than a field of Socket.WriteOptions because
  // the macro works for other functions besides Socket.Write as well.
  // NOTE: relies on the GNU statement-expression extension. Every local of the
  // macro uses a __xxx__ name so that identifiers appearing inside
  // `write_expr' are never captured by the macro's own variables.
  #define BRPC_HANDLE_EOVERCROWDED(write_expr)                              \
      ({                                                                    \
          int64_t __ret_code__;                                             \
          int __sleep_time_us__ = 250;                                      \
          while (true) {                                                    \
              __ret_code__ = (write_expr);                                  \
              if (__ret_code__ >= 0 || errno != ::brpc::EOVERCROWDED) {     \
                  break;                                                    \
              }                                                             \
              __sleep_time_us__ *= 2;                                       \
              if (__sleep_time_us__ > 2000) { __sleep_time_us__ = 2000; }   \
              ::bthread_usleep(__sleep_time_us__);                          \
          }                                                                 \
          __ret_code__;                                                     \
      })
  // Bounded variant of BRPC_HANDLE_EOVERCROWDED: evaluate `write_expr' until
  // it returns a non-negative value, fails with an errno other than
  // EOVERCROWDED, or has been retried `nretry' times. The macro yields the
  // value returned by the last evaluation, as an int64_t.
  // `write_expr' is evaluated at most nretry+1 times and the sleep is done at
  // most `nretry' times: no sleep happens after the final attempt, so the
  // caller gets the failure back immediately with errno still set by
  // `write_expr'.
  // `nretry' is evaluated exactly once and converted to int. If it is
  // negative, `write_expr' is never evaluated and the result is 0.
  // NOTE: relies on the GNU statement-expression extension. Every local of the
  // macro uses a __xxx__ name so that identifiers appearing inside
  // `write_expr' (e.g. a caller's `i') are never captured by the macro's own
  // variables.
  #define BRPC_HANDLE_EOVERCROWDED_N(write_expr, nretry)                    \
      ({                                                                    \
          int64_t __ret_code__ = 0;                                         \
          int __sleep_time_us__ = 250;                                      \
          for (int __nretry_left__ = static_cast<int>(nretry);              \
               __nretry_left__ >= 0; --__nretry_left__) {                   \
              __ret_code__ = (write_expr);                                  \
              if (__ret_code__ >= 0 || errno != ::brpc::EOVERCROWDED) {     \
                  break;                                                    \
              }                                                             \
              if (__nretry_left__ == 0) {                                   \
                  break;                                                    \
              }                                                             \
              __sleep_time_us__ *= 2;                                       \
              if (__sleep_time_us__ > 2000) { __sleep_time_us__ = 2000; }   \
              ::bthread_usleep(__sleep_time_us__);                          \
          }                                                                 \
          __ret_code__;                                                     \
      })
  702. namespace std {
  703. ostream& operator<<(ostream& os, const brpc::Socket& sock);
  704. }
  705. #include "brpc/socket_inl.h"
  706. #endif // BRPC_SOCKET_H