da_listener.c 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "da_listener.h"
  17. #include "da_array.h"
  18. #include "da_server.h"
  19. #include "da_conn.h"
  20. #include "da_log.h"
  21. #include "da_core.h"
  22. #include "da_event.h"
  23. #include "da_stats.h"
  24. #include "da_msg.h"
  25. void listener_ref(struct conn *l, void *owner)
  26. {
  27. struct server_pool *pool = owner;
  28. ASSERT(owner == NULL);
  29. l->owner = pool;
  30. l->family = pool->family;
  31. l->addrlen = pool->addrlen;
  32. l->addr = pool->addr;
  33. pool->listener = l;
  34. }
  35. void listener_unref(struct conn *l)
  36. {
  37. ASSERT(l->type & LISTENER);
  38. ASSERT(l->owner != NULL);
  39. struct server_pool *pool;
  40. pool = l->owner;
  41. l->owner = NULL;
  42. pool->listener = NULL;
  43. }
  44. void listener_close(struct context *ctx, struct conn *l)
  45. {
  46. ASSERT(ctx != NULL);
  47. ASSERT(l->flag & LISTENER);
  48. int status;
  49. if (l->fd < 0) {
  50. l->unref(l);
  51. conn_put(l);
  52. return;
  53. }
  54. ASSERT(l->rmsg == NULL);
  55. ASSERT(l->smsg == NULL);
  56. l->unref(l);
  57. status = event_del_conn(ctx->evb, l);
  58. if (status < 0) {
  59. log_error("event del conn p %d failed, ignored: %s", l->fd,
  60. strerror(errno));
  61. }
  62. status = close(l->fd);
  63. if (status < 0) {
  64. log_error("close p %d failed, ignored: %s", l->fd,
  65. strerror(errno));
  66. }
  67. l->fd = -1;
  68. conn_put(l);
  69. }
  70. static int listener_reuse(struct conn *l)
  71. {
  72. int status;
  73. struct sockaddr_un *un;
  74. switch (l->family) {
  75. case AF_INET:
  76. case AF_INET6:
  77. set_reuseaddr(l->fd);
  78. break;
  79. case AF_UNIX:
  80. un = (struct sockaddr_un *)l->addr;
  81. unlink(un->sun_path);
  82. status = 0;
  83. break;
  84. default:
  85. status = -1;
  86. }
  87. return status;
  88. }
  89. static int listener_listen(struct context *ctx, struct conn *l)
  90. {
  91. int status;
  92. struct server_pool *pool = l->owner;
  93. ASSERT(l->type & LISTENER);
  94. l->fd = socket(l->family, SOCK_STREAM, 0);
  95. if (l->fd < 0) {
  96. log_error("socket failed: %s", strerror(errno));
  97. return -1;
  98. }
  99. status = listener_reuse(l);
  100. if (status < 0) {
  101. log_error(
  102. "reuse of addr '%.*s' for listening on p %d failed: %s",
  103. pool->addrstr.len, pool->addrstr.data, l->fd,
  104. strerror(errno));
  105. return status;
  106. }
  107. status = bind(l->fd, pool->addr, pool->addrlen);
  108. if (status < 0) {
  109. log_error("bind on p %d to addr '%.*s' failed: %s", l->fd,
  110. pool->addrstr.len, pool->addrstr.data,
  111. strerror(errno));
  112. return -1;
  113. }
  114. status = listen(l->fd, pool->backlog);
  115. if (status < 0) {
  116. log_error("listen on p %d on addr '%.*s' failed: %s", l->fd,
  117. pool->addrstr.len, pool->addrstr.data,
  118. strerror(errno));
  119. return -1;
  120. }
  121. status = set_nonblocking(l->fd);
  122. if (status < 0) {
  123. log_error("set nonblock on p %d on addr '%.*s' failed: %s",
  124. l->fd, pool->addrstr.len, pool->addrstr.data,
  125. strerror(errno));
  126. return -1;
  127. }
  128. return 0;
  129. }
  130. static int listener_inherited_listen(struct context *ctx, struct conn *l)
  131. {
  132. int status;
  133. int fd;
  134. struct server_pool *pool = l->owner;
  135. ASSERT(p->proxy);
  136. fd = core_inherited_socket(da_unresolve_addr(l->addr, l->addrlen));
  137. if (fd > 0) {
  138. l->fd = fd;
  139. } else {
  140. status = listener_listen(ctx, l);
  141. if (status != 0) {
  142. return status;
  143. }
  144. }
  145. status = event_add_conn(ctx->evb, l);
  146. if (status < 0) {
  147. log_error("event add conn p %d on addr '%.*s' failed: %s",
  148. l->fd, pool->addrstr.len, pool->addrstr.data,
  149. strerror(errno));
  150. return -1;
  151. }
  152. status = event_del_out(ctx->evb, l);
  153. if (status < 0) {
  154. log_error("event del out p %d on addr '%.*s' failed: %s", l->fd,
  155. pool->addrstr.len, pool->addrstr.data,
  156. strerror(errno));
  157. return -1;
  158. }
  159. return 0;
  160. }
  161. int listener_each_init(void *elem, void *data)
  162. {
  163. int status;
  164. struct server_pool *pool = elem;
  165. struct conn *l;
  166. l = get_listener(pool);
  167. if (l == NULL) {
  168. log_error("init listener fail,get conn fail!");
  169. return -1;
  170. }
  171. //status = listener_listen(pool->ctx, l);
  172. status = listener_inherited_listen(pool->ctx, l);
  173. if (status < 0) {
  174. l->close(pool->ctx, l);
  175. log_error("listener %d listen fail!", l->fd);
  176. return -1;
  177. }
  178. log_debug("pool:%p addr '%.*s' ,listen success", pool,
  179. pool->addrstr.len, pool->addrstr.data);
  180. return 0;
  181. }
  182. int listener_each_deinit(void *elem, void *data)
  183. {
  184. struct server_pool *pool = elem;
  185. struct conn *l;
  186. l = pool->listener;
  187. if (l != NULL) {
  188. l->close(pool->ctx, l);
  189. }
  190. log_debug(" addr '%.*s' ,deinit listener success", pool->addrstr.len,
  191. pool->addrstr.data);
  192. return 0;
  193. }
  194. int listener_init(struct context *ctx)
  195. {
  196. int status;
  197. ASSERT(array_n(&ctx->pool) != 0);
  198. status = array_each(&ctx->pool, listener_each_init, NULL);
  199. if (status != 0) {
  200. listener_deinit(ctx);
  201. return status;
  202. }
  203. log_debug("init all pool's listener");
  204. return 0;
  205. }
  206. void listener_deinit(struct context *ctx)
  207. {
  208. int status;
  209. ASSERT(array_n(&ctx->pool) != 0);
  210. status = array_each(&ctx->pool, listener_each_deinit, NULL);
  211. if (status != 0) {
  212. return;
  213. }
  214. log_debug("deinit all listener success");
  215. return;
  216. }
  217. static int listener_accept(struct context *ctx, struct conn *l)
  218. {
  219. int status;
  220. int fd;
  221. struct conn *c;
  222. ASSERT(l->type & LISTENER);
  223. ASSERT(l->fd > 0);
  224. ASSERT((l->flag & RECV_ACTIVE) && (l->flag & RECV_READY));
  225. for (;;) {
  226. fd = accept(l->fd, NULL, NULL);
  227. if (fd < 0) {
  228. if (errno == EINTR) {
  229. log_debug(
  230. "accept on listener %d not ready - eintr",
  231. l->fd);
  232. continue;
  233. }
  234. /*
  235. * 多进程情况下,同时accept会出现错误,errno=11,程序吞掉这个错误
  236. */
  237. if (errno == EAGAIN || errno == EWOULDBLOCK ||
  238. errno == ECONNABORTED) {
  239. log_debug("accept on l %d not ready - eagain",
  240. l->fd);
  241. l->flag &= ~RECV_READY;
  242. return 0;
  243. }
  244. if (errno == EMFILE || errno == ENFILE) {
  245. log_debug(
  246. "accept on listener :%d fail :no enough fd for use",
  247. l->fd);
  248. l->flag &= ~RECV_READY;
  249. return 0;
  250. }
  251. log_error("accept on listener %d failed: %s", l->fd,
  252. strerror(errno));
  253. return -1;
  254. }
  255. break;
  256. }
  257. /*
  258. * 对全局的FD资源进行限制,每个进程单独资源
  259. */
  260. if (get_ncurr_cconn() >= ctx->max_ncconn ||
  261. get_ncurr_cconn() >
  262. ((struct server_pool *)(l->owner))->client_connections) {
  263. log_error(
  264. "current conn:%d is biger than max client connection for ctx:%d",
  265. get_ncurr_cconn(), ctx->max_ncconn);
  266. status = close(fd);
  267. if (status < 0) {
  268. log_error("close client %d failed, ignored: %s", fd,
  269. strerror(errno));
  270. }
  271. return 0;
  272. }
  273. c = get_client_conn(l->owner);
  274. if (c == NULL) {
  275. log_error("get conn for client %d from p %d failed: %s", fd,
  276. l->fd, strerror(errno));
  277. status = close(fd);
  278. if (status < 0) {
  279. log_error("close client %d failed, ignored: %s", fd,
  280. strerror(errno));
  281. }
  282. return -1;
  283. }
  284. c->fd = fd;
  285. status = set_nonblocking(c->fd);
  286. if (status < 0) {
  287. log_error("set nonblock on client %d from p %d failed: %s",
  288. c->fd, l->fd, strerror(errno));
  289. c->close(ctx, c);
  290. return status;
  291. }
  292. /*所有的客户端连接在exec之后全部关闭*/
  293. status = fcntl(c->fd, F_SETFD, FD_CLOEXEC);
  294. if (status < 0) {
  295. log_error("fcntl FD_CLOEXEC on c %d from p %d failed: %s",
  296. c->fd, l->fd, strerror(errno));
  297. c->close(ctx, c);
  298. return status;
  299. }
  300. /*
  301. * 对于ipv4与ipv6协议关闭negale算法
  302. */
  303. if (l->family == AF_INET || l->family == AF_INET6) {
  304. status = set_tcpnodelay(c->fd);
  305. if (status < 0) {
  306. log_error(
  307. "set tcpnodelay on client %d from listener %d failed, ignored: %s",
  308. c->fd, l->fd, strerror(errno));
  309. }
  310. }
  311. stats_pool_incr(ctx, c->owner, client_connections);
  312. status = event_add_conn(ctx->evb, c);
  313. if (status < 0) {
  314. log_error("event add conn from client %d failed: %s", c->fd,
  315. strerror(errno));
  316. c->close(ctx, c);
  317. return status;
  318. }
  319. // set connected tag for client conn
  320. c->connected = 1;
  321. log_debug("accepted client %d on listener %d", c->fd, l->fd);
  322. /* send mysql server welcome info after tcp connected. */
  323. struct msg *smsg;
  324. if (c->writecached == 0 && c->connected == 1) {
  325. c->stage = CONN_STAGE_LOGGING_IN;
  326. smsg = msg_get(c, true);
  327. if (smsg == NULL) {
  328. c->error = 1;
  329. c->err = CONN_MSG_GET_ERR;
  330. return -1;
  331. }
  332. status = net_send_server_greeting(c, smsg);
  333. if (status < 0) {
  334. log_error("server greeting info build error:%d",
  335. status);
  336. c->error = 1;
  337. c->err = CONN_MSG_GET_ERR;
  338. msg_put(smsg);
  339. return -1;
  340. }
  341. if (c->writecached == 0 && c->connected == 1) {
  342. log_debug("writecached & connected");
  343. cache_send_event(c);
  344. }
  345. }
  346. c->enqueue_outq(ctx, c, smsg);
  347. return 0;
  348. }
  349. int listener_recv(struct context *ctx, struct conn *conn)
  350. {
  351. int status;
  352. ASSERT(conn->type & LISTENER);
  353. ASSERT(conn->flag & RECV_ACTIVE);
  354. conn->flag |= RECV_READY;
  355. do {
  356. status = listener_accept(ctx, conn);
  357. if (status != 0) {
  358. return status;
  359. }
  360. } while (conn->flag & RECV_READY);
  361. return 0;
  362. }