123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- /*
- * Copyright [2021] JD.com, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "da_listener.h"
- #include "da_array.h"
- #include "da_server.h"
- #include "da_conn.h"
- #include "da_log.h"
- #include "da_core.h"
- #include "da_event.h"
- #include "da_stats.h"
- #include "da_msg.h"
- void listener_ref(struct conn *l, void *owner)
- {
- struct server_pool *pool = owner;
- ASSERT(owner == NULL);
- l->owner = pool;
- l->family = pool->family;
- l->addrlen = pool->addrlen;
- l->addr = pool->addr;
- pool->listener = l;
- }
- void listener_unref(struct conn *l)
- {
- ASSERT(l->type & LISTENER);
- ASSERT(l->owner != NULL);
- struct server_pool *pool;
- pool = l->owner;
- l->owner = NULL;
- pool->listener = NULL;
- }
- void listener_close(struct context *ctx, struct conn *l)
- {
- ASSERT(ctx != NULL);
- ASSERT(l->flag & LISTENER);
- int status;
- if (l->fd < 0) {
- l->unref(l);
- conn_put(l);
- return;
- }
- ASSERT(l->rmsg == NULL);
- ASSERT(l->smsg == NULL);
- l->unref(l);
- status = event_del_conn(ctx->evb, l);
- if (status < 0) {
- log_error("event del conn p %d failed, ignored: %s", l->fd,
- strerror(errno));
- }
- status = close(l->fd);
- if (status < 0) {
- log_error("close p %d failed, ignored: %s", l->fd,
- strerror(errno));
- }
- l->fd = -1;
- conn_put(l);
- }
- static int listener_reuse(struct conn *l)
- {
- int status;
- struct sockaddr_un *un;
- switch (l->family) {
- case AF_INET:
- case AF_INET6:
- set_reuseaddr(l->fd);
- break;
- case AF_UNIX:
- un = (struct sockaddr_un *)l->addr;
- unlink(un->sun_path);
- status = 0;
- break;
- default:
- status = -1;
- }
- return status;
- }
- static int listener_listen(struct context *ctx, struct conn *l)
- {
- int status;
- struct server_pool *pool = l->owner;
- ASSERT(l->type & LISTENER);
- l->fd = socket(l->family, SOCK_STREAM, 0);
- if (l->fd < 0) {
- log_error("socket failed: %s", strerror(errno));
- return -1;
- }
- status = listener_reuse(l);
- if (status < 0) {
- log_error(
- "reuse of addr '%.*s' for listening on p %d failed: %s",
- pool->addrstr.len, pool->addrstr.data, l->fd,
- strerror(errno));
- return status;
- }
- status = bind(l->fd, pool->addr, pool->addrlen);
- if (status < 0) {
- log_error("bind on p %d to addr '%.*s' failed: %s", l->fd,
- pool->addrstr.len, pool->addrstr.data,
- strerror(errno));
- return -1;
- }
- status = listen(l->fd, pool->backlog);
- if (status < 0) {
- log_error("listen on p %d on addr '%.*s' failed: %s", l->fd,
- pool->addrstr.len, pool->addrstr.data,
- strerror(errno));
- return -1;
- }
- status = set_nonblocking(l->fd);
- if (status < 0) {
- log_error("set nonblock on p %d on addr '%.*s' failed: %s",
- l->fd, pool->addrstr.len, pool->addrstr.data,
- strerror(errno));
- return -1;
- }
- return 0;
- }
- static int listener_inherited_listen(struct context *ctx, struct conn *l)
- {
- int status;
- int fd;
- struct server_pool *pool = l->owner;
- ASSERT(p->proxy);
- fd = core_inherited_socket(da_unresolve_addr(l->addr, l->addrlen));
- if (fd > 0) {
- l->fd = fd;
- } else {
- status = listener_listen(ctx, l);
- if (status != 0) {
- return status;
- }
- }
- status = event_add_conn(ctx->evb, l);
- if (status < 0) {
- log_error("event add conn p %d on addr '%.*s' failed: %s",
- l->fd, pool->addrstr.len, pool->addrstr.data,
- strerror(errno));
- return -1;
- }
- status = event_del_out(ctx->evb, l);
- if (status < 0) {
- log_error("event del out p %d on addr '%.*s' failed: %s", l->fd,
- pool->addrstr.len, pool->addrstr.data,
- strerror(errno));
- return -1;
- }
- return 0;
- }
- int listener_each_init(void *elem, void *data)
- {
- int status;
- struct server_pool *pool = elem;
- struct conn *l;
- l = get_listener(pool);
- if (l == NULL) {
- log_error("init listener fail,get conn fail!");
- return -1;
- }
- //status = listener_listen(pool->ctx, l);
- status = listener_inherited_listen(pool->ctx, l);
- if (status < 0) {
- l->close(pool->ctx, l);
- log_error("listener %d listen fail!", l->fd);
- return -1;
- }
- log_debug("pool:%p addr '%.*s' ,listen success", pool,
- pool->addrstr.len, pool->addrstr.data);
- return 0;
- }
- int listener_each_deinit(void *elem, void *data)
- {
- struct server_pool *pool = elem;
- struct conn *l;
- l = pool->listener;
- if (l != NULL) {
- l->close(pool->ctx, l);
- }
- log_debug(" addr '%.*s' ,deinit listener success", pool->addrstr.len,
- pool->addrstr.data);
- return 0;
- }
- int listener_init(struct context *ctx)
- {
- int status;
- ASSERT(array_n(&ctx->pool) != 0);
- status = array_each(&ctx->pool, listener_each_init, NULL);
- if (status != 0) {
- listener_deinit(ctx);
- return status;
- }
- log_debug("init all pool's listener");
- return 0;
- }
- void listener_deinit(struct context *ctx)
- {
- int status;
- ASSERT(array_n(&ctx->pool) != 0);
- status = array_each(&ctx->pool, listener_each_deinit, NULL);
- if (status != 0) {
- return;
- }
- log_debug("deinit all listener success");
- return;
- }
- static int listener_accept(struct context *ctx, struct conn *l)
- {
- int status;
- int fd;
- struct conn *c;
- ASSERT(l->type & LISTENER);
- ASSERT(l->fd > 0);
- ASSERT((l->flag & RECV_ACTIVE) && (l->flag & RECV_READY));
- for (;;) {
- fd = accept(l->fd, NULL, NULL);
- if (fd < 0) {
- if (errno == EINTR) {
- log_debug(
- "accept on listener %d not ready - eintr",
- l->fd);
- continue;
- }
- /*
- * 多进程情况下,同时accept会出现错误,errno=11,程序吞掉这个错误
- */
- if (errno == EAGAIN || errno == EWOULDBLOCK ||
- errno == ECONNABORTED) {
- log_debug("accept on l %d not ready - eagain",
- l->fd);
- l->flag &= ~RECV_READY;
- return 0;
- }
- if (errno == EMFILE || errno == ENFILE) {
- log_debug(
- "accept on listener :%d fail :no enough fd for use",
- l->fd);
- l->flag &= ~RECV_READY;
- return 0;
- }
- log_error("accept on listener %d failed: %s", l->fd,
- strerror(errno));
- return -1;
- }
- break;
- }
- /*
- * 对全局的FD资源进行限制,每个进程单独资源
- */
- if (get_ncurr_cconn() >= ctx->max_ncconn ||
- get_ncurr_cconn() >
- ((struct server_pool *)(l->owner))->client_connections) {
- log_error(
- "current conn:%d is biger than max client connection for ctx:%d",
- get_ncurr_cconn(), ctx->max_ncconn);
- status = close(fd);
- if (status < 0) {
- log_error("close client %d failed, ignored: %s", fd,
- strerror(errno));
- }
- return 0;
- }
- c = get_client_conn(l->owner);
- if (c == NULL) {
- log_error("get conn for client %d from p %d failed: %s", fd,
- l->fd, strerror(errno));
- status = close(fd);
- if (status < 0) {
- log_error("close client %d failed, ignored: %s", fd,
- strerror(errno));
- }
- return -1;
- }
- c->fd = fd;
- status = set_nonblocking(c->fd);
- if (status < 0) {
- log_error("set nonblock on client %d from p %d failed: %s",
- c->fd, l->fd, strerror(errno));
- c->close(ctx, c);
- return status;
- }
- /*所有的客户端连接在exec之后全部关闭*/
- status = fcntl(c->fd, F_SETFD, FD_CLOEXEC);
- if (status < 0) {
- log_error("fcntl FD_CLOEXEC on c %d from p %d failed: %s",
- c->fd, l->fd, strerror(errno));
- c->close(ctx, c);
- return status;
- }
- /*
- * 对于ipv4与ipv6协议关闭negale算法
- */
- if (l->family == AF_INET || l->family == AF_INET6) {
- status = set_tcpnodelay(c->fd);
- if (status < 0) {
- log_error(
- "set tcpnodelay on client %d from listener %d failed, ignored: %s",
- c->fd, l->fd, strerror(errno));
- }
- }
- stats_pool_incr(ctx, c->owner, client_connections);
- status = event_add_conn(ctx->evb, c);
- if (status < 0) {
- log_error("event add conn from client %d failed: %s", c->fd,
- strerror(errno));
- c->close(ctx, c);
- return status;
- }
- // set connected tag for client conn
- c->connected = 1;
- log_debug("accepted client %d on listener %d", c->fd, l->fd);
- /* send mysql server welcome info after tcp connected. */
- struct msg *smsg;
- if (c->writecached == 0 && c->connected == 1) {
- c->stage = CONN_STAGE_LOGGING_IN;
- smsg = msg_get(c, true);
- if (smsg == NULL) {
- c->error = 1;
- c->err = CONN_MSG_GET_ERR;
- return -1;
- }
- status = net_send_server_greeting(c, smsg);
- if (status < 0) {
- log_error("server greeting info build error:%d",
- status);
- c->error = 1;
- c->err = CONN_MSG_GET_ERR;
- msg_put(smsg);
- return -1;
- }
- if (c->writecached == 0 && c->connected == 1) {
- log_debug("writecached & connected");
- cache_send_event(c);
- }
- }
- c->enqueue_outq(ctx, c, smsg);
- return 0;
- }
- int listener_recv(struct context *ctx, struct conn *conn)
- {
- int status;
- ASSERT(conn->type & LISTENER);
- ASSERT(conn->flag & RECV_ACTIVE);
- conn->flag |= RECV_READY;
- do {
- status = listener_accept(ctx, conn);
- if (status != 0) {
- return status;
- }
- } while (conn->flag & RECV_READY);
- return 0;
- }
|