server.cpp 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // A server to receive EchoRequest and send back EchoResponse asynchronously.
  18. #include <gflags/gflags.h>
  19. #include <butil/logging.h>
  20. #include <brpc/server.h>
  21. #include "echo.pb.h"
  22. DEFINE_bool(echo_attachment, true, "Echo attachment as well");
  23. DEFINE_int32(port, 8002, "TCP Port of this server");
  24. DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
  25. "read/write operations during the last `idle_timeout_s'");
  26. DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state "
  27. "(waiting for client to close connection before server stops)");
  28. DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
  29. butil::atomic<int> nsd(0);
  30. struct MySessionLocalData {
  31. MySessionLocalData() : x(123) {
  32. nsd.fetch_add(1, butil::memory_order_relaxed);
  33. }
  34. ~MySessionLocalData() {
  35. nsd.fetch_sub(1, butil::memory_order_relaxed);
  36. }
  37. int x;
  38. };
  39. class MySessionLocalDataFactory : public brpc::DataFactory {
  40. public:
  41. void* CreateData() const {
  42. return new MySessionLocalData;
  43. }
  44. void DestroyData(void* d) const {
  45. delete static_cast<MySessionLocalData*>(d);
  46. }
  47. };
  48. butil::atomic<int> ntls(0);
  49. struct MyThreadLocalData {
  50. MyThreadLocalData() : y(0) {
  51. ntls.fetch_add(1, butil::memory_order_relaxed);
  52. }
  53. ~MyThreadLocalData() {
  54. ntls.fetch_sub(1, butil::memory_order_relaxed);
  55. }
  56. static void deleter(void* d) {
  57. delete static_cast<MyThreadLocalData*>(d);
  58. }
  59. int y;
  60. };
  61. class MyThreadLocalDataFactory : public brpc::DataFactory {
  62. public:
  63. void* CreateData() const {
  64. return new MyThreadLocalData;
  65. }
  66. void DestroyData(void* d) const {
  67. MyThreadLocalData::deleter(d);
  68. }
  69. };
  70. struct AsyncJob {
  71. MySessionLocalData* expected_session_local_data;
  72. int expected_session_value;
  73. brpc::Controller* cntl;
  74. const example::EchoRequest* request;
  75. example::EchoResponse* response;
  76. google::protobuf::Closure* done;
  77. void run();
  78. void run_and_delete() {
  79. run();
  80. delete this;
  81. }
  82. };
  83. static void* process_thread(void* args) {
  84. AsyncJob* job = static_cast<AsyncJob*>(args);
  85. job->run_and_delete();
  86. return NULL;
  87. }
  88. // Your implementation of example::EchoService
  89. class EchoServiceWithThreadAndSessionLocal : public example::EchoService {
  90. public:
  91. EchoServiceWithThreadAndSessionLocal() {
  92. CHECK_EQ(0, bthread_key_create(&_tls2_key, MyThreadLocalData::deleter));
  93. }
  94. ~EchoServiceWithThreadAndSessionLocal() {
  95. CHECK_EQ(0, bthread_key_delete(_tls2_key));
  96. };
  97. void Echo(google::protobuf::RpcController* cntl_base,
  98. const example::EchoRequest* request,
  99. example::EchoResponse* response,
  100. google::protobuf::Closure* done) {
  101. brpc::ClosureGuard done_guard(done);
  102. brpc::Controller* cntl =
  103. static_cast<brpc::Controller*>(cntl_base);
  104. // Get the session-local data which is created by ServerOptions.session_local_data_factory
  105. // and reused between different RPC. All session-local data are
  106. // destroyed upon server destruction.
  107. MySessionLocalData* sd = static_cast<MySessionLocalData*>(cntl->session_local_data());
  108. if (sd == NULL) {
  109. cntl->SetFailed("Require ServerOptions.session_local_data_factory to be"
  110. " set with a correctly implemented instance");
  111. LOG(ERROR) << cntl->ErrorText();
  112. return;
  113. }
  114. const int expected_value = sd->x + (((uintptr_t)cntl) & 0xFFFFFFFF);
  115. sd->x = expected_value;
  116. // Get the thread-local data which is created by ServerOptions.thread_local_data_factory
  117. // and reused between different threads. All thread-local data are
  118. // destroyed upon server destruction.
  119. // "tls" is short for "thread local storage".
  120. MyThreadLocalData* tls =
  121. static_cast<MyThreadLocalData*>(brpc::thread_local_data());
  122. if (tls == NULL) {
  123. cntl->SetFailed("Require ServerOptions.thread_local_data_factory "
  124. "to be set with a correctly implemented instance");
  125. LOG(ERROR) << cntl->ErrorText();
  126. return;
  127. }
  128. tls->y = expected_value;
  129. // You can create bthread-local data for your own.
  130. // The interfaces are similar with pthread equivalence:
  131. // pthread_key_create -> bthread_key_create
  132. // pthread_key_delete -> bthread_key_delete
  133. // pthread_getspecific -> bthread_getspecific
  134. // pthread_setspecific -> bthread_setspecific
  135. MyThreadLocalData* tls2 =
  136. static_cast<MyThreadLocalData*>(bthread_getspecific(_tls2_key));
  137. if (tls2 == NULL) {
  138. tls2 = new MyThreadLocalData;
  139. CHECK_EQ(0, bthread_setspecific(_tls2_key, tls2));
  140. }
  141. tls2->y = expected_value + 1;
  142. // sleep awhile to force context switching.
  143. bthread_usleep(10000);
  144. // tls is unchanged after context switching.
  145. CHECK_EQ(tls, brpc::thread_local_data());
  146. CHECK_EQ(expected_value, tls->y);
  147. CHECK_EQ(tls2, bthread_getspecific(_tls2_key));
  148. CHECK_EQ(expected_value + 1, tls2->y);
  149. // Process the request asynchronously.
  150. AsyncJob* job = new AsyncJob;
  151. job->expected_session_local_data = sd;
  152. job->expected_session_value = expected_value;
  153. job->cntl = cntl;
  154. job->request = request;
  155. job->response = response;
  156. job->done = done;
  157. bthread_t th;
  158. CHECK_EQ(0, bthread_start_background(&th, NULL, process_thread, job));
  159. // We don't want to call done->Run() here, release the guard.
  160. done_guard.release();
  161. LOG_EVERY_SECOND(INFO) << "ntls=" << ntls.load(butil::memory_order_relaxed)
  162. << " nsd=" << nsd.load(butil::memory_order_relaxed);
  163. }
  164. private:
  165. bthread_key_t _tls2_key;
  166. };
  167. void AsyncJob::run() {
  168. brpc::ClosureGuard done_guard(done);
  169. // Sleep some time to make sure that Echo() exits.
  170. bthread_usleep(10000);
  171. // Still the session-local data that we saw in Echo().
  172. // This is the major difference between session-local data and thread-local
  173. // data which was already destroyed upon Echo() exit.
  174. MySessionLocalData* sd = static_cast<MySessionLocalData*>(cntl->session_local_data());
  175. CHECK_EQ(expected_session_local_data, sd);
  176. CHECK_EQ(expected_session_value, sd->x);
  177. // Echo request and its attachment
  178. response->set_message(request->message());
  179. if (FLAGS_echo_attachment) {
  180. cntl->response_attachment().append(cntl->request_attachment());
  181. }
  182. }
  183. int main(int argc, char* argv[]) {
  184. // Parse gflags. We recommend you to use gflags as well.
  185. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  186. // The factory to create MySessionLocalData. Must be valid when server is running.
  187. MySessionLocalDataFactory session_local_data_factory;
  188. MyThreadLocalDataFactory thread_local_data_factory;
  189. // Generally you only need one Server.
  190. brpc::Server server;
  191. // For more options see `brpc/server.h'.
  192. brpc::ServerOptions options;
  193. options.idle_timeout_sec = FLAGS_idle_timeout_s;
  194. options.max_concurrency = FLAGS_max_concurrency;
  195. options.session_local_data_factory = &session_local_data_factory;
  196. options.thread_local_data_factory = &thread_local_data_factory;
  197. // Instance of your service.
  198. EchoServiceWithThreadAndSessionLocal echo_service_impl;
  199. // Add the service into server. Notice the second parameter, because the
  200. // service is put on stack, we don't want server to delete it, otherwise
  201. // use brpc::SERVER_OWNS_SERVICE.
  202. if (server.AddService(&echo_service_impl,
  203. brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
  204. LOG(ERROR) << "Fail to add service";
  205. return -1;
  206. }
  207. // Start the server.
  208. if (server.Start(FLAGS_port, &options) != 0) {
  209. LOG(ERROR) << "Fail to start EchoServer";
  210. return -1;
  211. }
  212. // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
  213. server.RunUntilAskedToQuit();
  214. CHECK_EQ(ntls, 0);
  215. CHECK_EQ(nsd, 0);
  216. return 0;
  217. }