da_server.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <inttypes.h>
  17. #include "da_server.h"
  18. #include "da_hashkit.h"
  19. #include "da_errno.h"
  20. #include "da_log.h"
  21. #include "da_event.h"
  22. #include "da_request.h"
  23. #include "da_response.h"
  24. #include "da_core.h"
  25. #include "da_conf.h"
  26. #include "da_stats.h"
  27. #include "da_time.h"
  28. static int keep_alive = 1; // SO_KEEPALIVE switch (system default: 0 = off); instance_connect() overwrites it from pool->auto_remove_replica
  29. static int keep_idle = 5; // TCP_KEEPIDLE: start probing after 5 seconds without traffic (system default: 7200 s)
  30. static int keep_interval = 5; // TCP_KEEPINTVL: 5 seconds between keepalive probes (system default: 75 s)
  31. static int keep_count = 1; // TCP_KEEPCNT: number of probes; the connection is considered dead once all of them time out (system default: 9)
  32. void instance_ref(struct conn *conn, void *owner) {
  33. struct cache_instance *ins = owner;
  34. ASSERT(conn->type & BACKWORK);
  35. ASSERT(conn->owner == NULL);
  36. conn->family = ins->family;
  37. conn->addr = ins->addr;
  38. conn->addrlen = ins->addrlen;
  39. conn->owner = owner;
  40. ins->ns_conn_q++;
  41. TAILQ_INSERT_TAIL(&ins->s_conn_q, conn, conn_tqe);
  42. log_debug("ref conn %p owner %p into '%.*s", conn, ins, ins->pname.len,ins->pname.data);
  43. }
  44. void instance_unref(struct conn *conn) {
  45. struct cache_instance *ins = conn->owner;
  46. ASSERT(conn->type & BACKWORK);
  47. ASSERT(conn->owner == NULL);
  48. conn->owner = NULL;
  49. ASSERT(server->ns_conn_q != 0);
  50. ins->ns_conn_q--;
  51. TAILQ_REMOVE(&ins->s_conn_q, conn, conn_tqe);
  52. log_debug("unref conn %p owner %p from '%.*s'", conn, ins, ins->pname.len, ins->pname.data);
  53. }
  54. int server_active(struct conn *conn) {
  55. ASSERT(conn->type & BACKWORK);
  56. if (!TAILQ_EMPTY(&conn->imsg_q)) {
  57. log_debug("s %d is active", conn->fd);
  58. return true;
  59. }
  60. if (rbtree_min(&conn->msg_tree) != NULL) {
  61. log_debug("s %d is active", conn->fd);
  62. return true;
  63. }
  64. if (conn->rmsg != NULL) {
  65. log_debug("s %d is active", conn->fd);
  66. return true;
  67. }
  68. if (conn->smsg != NULL) {
  69. log_debug("s %d is active", conn->fd);
  70. return true;
  71. }
  72. log_debug("s %d is inactive", conn->fd);
  73. return false;
  74. }
  75. static uint32_t server_pool_hash(struct server_pool *pool, uint8_t *key,
  76. uint32_t keylen) {
  77. ASSERT(array_n(&pool->server) != 0);
  78. if (array_n(&pool->server) == 1) {
  79. return 0;
  80. }
  81. ASSERT(key != NULL && keylen != 0);
  82. return pool->key_hash((char *) key, keylen);
  83. }
  84. uint32_t server_pool_idx(struct server_pool *pool, uint8_t *key,
  85. uint32_t keylen) {
  86. ASSERT(array_n(&pool->server) != 0);
  87. //ASSERT(key != NULL && keylen != 0);
  88. uint32_t hash, idx;
  89. if(key == NULL && keylen == 0)
  90. {
  91. log_debug("server_pool_idx 2\n");
  92. return array_n(&pool->server) - 1;
  93. hash = server_pool_hash(pool, key, keylen);
  94. idx = ketama_dispatch(pool->continuum, pool->ncontinuum, hash, 2);
  95. }
  96. else
  97. {
  98. log_debug("server_pool_idx 1\n");
  99. return 0;
  100. hash = server_pool_hash(pool, key, keylen);
  101. idx = ketama_dispatch(pool->continuum, pool->ncontinuum, hash, 1);
  102. }
  103. log_debug("server_pool_idx keylen:%d, key:%p, server num: %d, idx: %d\n", keylen, key, array_n(&pool->server), idx);
  104. ASSERT(idx < array_n(&pool->server));
  105. return idx;
  106. }
  107. static struct server *server_pool_server(struct server_pool *pool,
  108. struct msg *msg) {
  109. struct server *server;
  110. uint32_t idx;
  111. idx = msg->idx;
  112. server = array_get(&pool->server, idx);
  113. return server;
  114. }
  115. static int instance_connect(struct context *ctx, struct cache_instance *instance,
  116. struct conn *conn) {
  117. ASSERT(ctx != NULL);
  118. ASSERT(server != NULL);
  119. ASSERT(conn != NULL);
  120. int status;
  121. if (conn->fd > 0) {
  122. return 0;
  123. }
  124. log_debug("connect to server '%.*s' family:%d", instance->pname.len, instance->pname.data, conn->family);
  125. conn->fd = socket(conn->family, SOCK_STREAM, 0);
  126. if (conn->fd < 0) {
  127. log_error("socket for server '%s' failed: %s", instance->addr->sa_data, strerror(errno));
  128. status = CONN_CREATSOCK_ERR;
  129. goto conn_error;
  130. }
  131. status = fcntl(conn->fd, F_SETFD, FD_CLOEXEC);
  132. if (status < 0) {
  133. log_error("fcntl FD_CLOEXEC on s %d for server '%.*s' failed: %s",
  134. conn->fd, instance->pname.len,instance->pname.data, strerror(errno));
  135. goto conn_error;
  136. }
  137. status = set_nonblocking(conn->fd);
  138. if (status < 0) {
  139. log_error("set nonblock on s %d for server '%.*s' failed: %s", conn->fd,
  140. instance->pname.len, instance->pname.data, strerror(errno));
  141. status = CONN_SETNOBLOCK_ERR;
  142. goto conn_error;
  143. }
  144. //if (instance->pname.data[0] != '/') {
  145. status = set_tcpnodelay(conn->fd);
  146. if (status != 0) {
  147. log_warning(
  148. "set tcpnodelay on s %d for server '%.*s' failed, ignored: %s",
  149. conn->fd, instance->pname.len, instance->pname.data,
  150. strerror(errno));
  151. }
  152. //}
  153. status = event_add_conn(ctx->evb, conn);
  154. if (status < 0) {
  155. log_error("event add conn s %d for server '%.*s' failed: %s", conn->fd,
  156. instance->pname.len, instance->pname.data, strerror(errno));
  157. status = CONN_EPOLLADD_ERR;
  158. goto conn_error;
  159. }
  160. status = connect(conn->fd, conn->addr, conn->addrlen);
  161. if (status != 0) {
  162. if (errno == EINPROGRESS) {
  163. conn->connecting = 1;
  164. log_debug("connecting on s %d to server '%.*s'", conn->fd,
  165. instance->pname.len, instance->pname.data);
  166. return 0;
  167. }
  168. log_error("connect on s %d to server '%.*s' failed: %s", conn->fd,
  169. instance->pname.len, instance->pname.data, strerror(errno));
  170. status = CONN_CONNECT_ERR;
  171. goto conn_error;
  172. }
  173. struct server *server;
  174. struct server_pool *pool;
  175. server = instance->owner;
  176. pool = server->owner;
  177. keep_alive = pool->auto_remove_replica ? 1 : 0;
  178. status = setsockopt(conn->fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&keep_alive, sizeof(keep_alive));
  179. status = setsockopt(conn->fd, SOL_TCP, TCP_KEEPIDLE, (void*)&keep_idle, sizeof(keep_idle));
  180. status = setsockopt(conn->fd, SOL_TCP, TCP_KEEPINTVL, (void*)&keep_interval, sizeof(keep_interval));
  181. status = setsockopt(conn->fd, SOL_TCP, TCP_KEEPCNT, (void*)&keep_count, sizeof(keep_count));
  182. if (status < 0)
  183. {
  184. goto conn_error;
  185. }
  186. ASSERT(conn->connecting ==0 && conn->connected ==0);
  187. conn->connected = 1;
  188. log_debug("connected on s %d to server '%.*s'", conn->fd, instance->pname.len, instance->pname.data);
  189. return 0;
  190. conn_error:
  191. conn->error = 1;
  192. conn->err = status;
  193. return -1;
  194. }
  195. static struct conn *instance_conn(struct cache_instance *ins){
  196. struct server_pool *pool;
  197. struct server *server;
  198. struct conn *conn;
  199. server = ins->owner;
  200. pool = server->owner;
  201. /*
  202. * FIXME: handle multiple server connections per server and do load
  203. * balancing on it. Support multiple algorithms for
  204. * 'server_connections:' > 0 key
  205. */
  206. if (ins->ns_conn_q < pool->server_connections) {
  207. conn = get_instance_conn(ins);
  208. return conn;
  209. }
  210. ASSERT(server->ns_conn_q == pool->server_connections);
  211. /*
  212. * Pick a server connection from the head of the queue and insert
  213. * it back into the tail of queue to maintain the lru order
  214. */
  215. conn = TAILQ_FIRST(&ins->s_conn_q);
  216. ASSERT(!conn->client && !conn->proxy);
  217. TAILQ_REMOVE(&ins->s_conn_q, conn, conn_tqe);
  218. TAILQ_INSERT_TAIL(&ins->s_conn_q, conn, conn_tqe);
  219. return conn;
  220. }
  221. struct cache_instance *get_instance_from_array(struct array replica_array, uint16_t *array_idx, uint16_t *cnt)
  222. {
  223. int i, idx = 0,w,t;
  224. int nreplica = array_n(&replica_array);
  225. struct cache_instance *ci = NULL;
  226. if (nreplica != 0)
  227. {
  228. for (i = 0; i < nreplica + 1; i++) {
  229. idx = (i + (*array_idx) + nreplica) % nreplica;
  230. ci = array_get(&replica_array, idx);
  231. w = ci->weight;
  232. if ((*cnt) < w && ci->nerr < ci->ns_conn_q) {
  233. (*cnt)++;
  234. *array_idx = idx;
  235. //printf("FFF i :%.*s nerr%d failtime%d \n", ci->pname.len, ci->pname.data, ci->nerr, ci->failure_num);
  236. return ci;
  237. }
  238. else if (ci->nerr >= ci->ns_conn_q && ci->failure_num < FAIL_TIME_LIMIT) {
  239. t = 1;
  240. t = t << ci->failure_num;
  241. t = t * 1000;
  242. printf("i :%.*s now_ms%"PRIu64" failtime%"PRIu64" t%"PRIu64" \n", ci->pname.len, ci->pname.data, now_ms, ci->last_failure_ms, t);
  243. if ((now_ms - ci->last_failure_ms) > t) {
  244. (*cnt) = 0;
  245. (*array_idx) = idx + 1;
  246. ci->nerr = 0;
  247. return ci;
  248. }
  249. else {
  250. (*cnt) = 0;
  251. continue;
  252. }
  253. }
  254. else {
  255. (*cnt) = 0;
  256. continue;
  257. }
  258. }
  259. (*array_idx) = idx;
  260. }
  261. return ci;
  262. }
  263. static struct cache_instance *get_instance_from_server(struct server *server) {
  264. struct cache_instance *ci = NULL;
  265. ci = get_instance_from_array(server->high_ptry_ins,&server->high_prty_idx, &server->high_prty_cnt);
  266. if (ci == NULL)
  267. {
  268. ci = get_instance_from_array(server->low_prty_ins, &server->low_prty_idx, &server->low_prty_cnt);
  269. }
  270. return ci;
  271. }
  272. //static struct cache_instance *get_instance_from_server(struct server *server){
  273. //
  274. // struct cache_instance *ci;
  275. // int nreplica_q1, nreplica_q2,w,i,idx;
  276. // uint64_t t ;
  277. // nreplica_q1 = array_n(&server->high_ptry_ins);
  278. // if (nreplica_q1 != 0)
  279. // {
  280. // for (i = 0; i < nreplica_q1 + 1; i++) {
  281. // idx = (i + server->high_prty_idx + nreplica_q1) % nreplica_q1;
  282. // ci = array_get(&server->high_ptry_ins, idx);
  283. // w = ci->weight;
  284. // if (server->high_prty_cnt < w && ci->nerr < ci->ns_conn_q) {
  285. // server->high_prty_cnt++;
  286. // server->high_prty_idx = idx;
  287. // printf("FFF i :%.*s nerr%d failtime%d \n", ci->pname.len, ci->pname.data, ci->nerr, ci->failure_num);
  288. // return ci;
  289. // }
  290. // else if (ci->nerr >= ci->ns_conn_q && ci->failure_num < FAIL_TIME_LIMIT) {
  291. // t = 1;
  292. // t = t << ci->failure_num;
  293. // t = t * 1000;
  294. // printf("i :%.*s now_ms%"PRIu64" failtime%"PRIu64" t%"PRIu64" \n", ci->pname.len, ci->pname.data, now_ms, ci->last_failure_ms, t);
  295. // if ((now_ms - ci->last_failure_ms) > t){
  296. // server->high_prty_cnt = 1;
  297. // server->high_prty_idx = idx;
  298. // ci->nerr = 0;
  299. // return ci;
  300. // }
  301. // else{
  302. // server->high_prty_cnt = 0;
  303. // continue;
  304. // }
  305. //
  306. // }
  307. // else {
  308. // server->high_prty_cnt = 0;
  309. // continue;
  310. // }
  311. //
  312. // }
  313. // server->high_prty_idx = idx;
  314. // }
  315. // nreplica_q2 = array_n(&server->low_prty_ins);
  316. // if (nreplica_q2 != 0)
  317. // {
  318. // for (i = 0; i < nreplica_q2 + 1; i++) {
  319. // idx = (i + server->low_prty_idx + nreplica_q2) % nreplica_q2;
  320. // ci = array_get(&server->low_prty_ins, idx);
  321. // w = ci->weight;
  322. // if (server->low_prty_cnt < w && ci->nerr < ci->ns_conn_q) {
  323. // server->low_prty_cnt++;
  324. // server->low_prty_idx = idx;
  325. // return ci;
  326. // }
  327. // else if (ci->nerr >= ci->ns_conn_q && ci->failure_num < FAIL_TIME_LIMIT) {
  328. // t = 1;
  329. // t = t << ci->failure_num;
  330. // t = t * 1000;
  331. // //printf("i :%.*s now_ms%"PRIu64" failtime%"PRIu64" t%"PRIu64" \n", ci->pname.len, ci->pname.data, now_ms, ci->last_failure_ms, t);
  332. // if ((now_ms - ci->last_failure_ms) > t) {
  333. // server->low_prty_cnt = 1;
  334. // server->low_prty_idx = idx;
  335. // ci->nerr = 0;
  336. // return ci;
  337. // }
  338. // else {
  339. // server->low_prty_cnt = 0;
  340. // continue;
  341. // }
  342. //
  343. // }
  344. // else {
  345. // server->low_prty_cnt = 0;
  346. // continue;
  347. // }
  348. // }
  349. //
  350. // server->low_prty_idx = idx;
  351. // }
  352. // return NULL;
  353. //}
  354. struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool,
  355. struct msg *msg) {
  356. int status;
  357. struct server *server;
  358. struct conn *conn = NULL;
  359. struct cache_instance *ci;
  360. /* from a given {key, keylen} pick a server from pool */
  361. server = server_pool_server(pool, msg);
  362. if (server == NULL) {
  363. return NULL;
  364. }
  365. // if write cmd always send to master or replica is disable in sys
  366. // always forward to master instance
  367. if (msg->cmd != MSG_REQ_GET || pool->replica_enable == 0)
  368. {
  369. conn = instance_conn(server->master);
  370. if (conn == NULL) {
  371. return NULL;
  372. }
  373. status = instance_connect(ctx, server->master, conn);
  374. if (status != 0) {
  375. log_error("instance connect failed, close server.");
  376. server_close(ctx, conn);
  377. return NULL;
  378. }
  379. ci = conn->owner;
  380. return conn;
  381. }
  382. else
  383. {
  384. ci = get_instance_from_server(server);
  385. if (ci == NULL){
  386. log_error("No machine is normal");
  387. return NULL;
  388. }
  389. ci->num++;
  390. conn = instance_conn(ci);
  391. if (conn == NULL) {
  392. return NULL;
  393. }
  394. status = instance_connect(ctx, ci, conn);
  395. if (status != 0) {
  396. log_error("instance connect failed, close server.");
  397. server_close(ctx, conn);
  398. return NULL;
  399. }
  400. return conn;
  401. }
  402. return conn;
  403. }
  404. void server_connected(struct context *ctx, struct conn *conn) {
  405. struct cache_instance *ins = conn->owner;
  406. ASSERT(conn->type & BACKWORK);
  407. ASSERT(conn->connecting && !conn->connected);
  408. stats_server_incr(ctx, ins, server_connections);
  409. conn->connecting = 0;
  410. conn->connected = 1;
  411. log_debug("connected on s %d to server '%.*s'", conn->fd, ins->pname.len, ins->pname.data);
  412. }
  413. static void
  414. server_close_stats(struct context *ctx, struct cache_instance *ci, int err,
  415. unsigned eof, unsigned connected)
  416. {
  417. if (connected) {
  418. stats_server_decr(ctx, ci, server_connections);
  419. }
  420. if (eof) {
  421. stats_server_incr(ctx, ci, server_eof);
  422. return;
  423. }
  424. if(err)
  425. {
  426. stats_server_incr(ctx, ci, server_err);
  427. }
  428. return;
  429. }
  430. void server_close(struct context *ctx, struct conn *conn) {
  431. int status;
  432. struct msg *msg, *nmsg;
  433. struct conn *c_conn; /* peer client connection */
  434. struct rbnode *node, *nnode;
  435. struct cache_instance *ci;
  436. //TODO
  437. ci = conn->owner;
  438. server_close_stats(ctx, ci, conn->err, conn->eof,
  439. conn->connected);
  440. if (conn->fd < 0) { /*has been closed*/
  441. conn->unref(conn);
  442. conn_put(conn);
  443. return;
  444. }
  445. struct server *server;
  446. struct server_pool *pool;
  447. server = ci->owner;
  448. pool = server->owner;
  449. if (pool->auto_remove_replica){
  450. incr_instance_failure_time(ci);
  451. }
  452. for (msg = TAILQ_FIRST(&conn->imsg_q); msg != NULL; msg = nmsg) {
  453. nmsg = TAILQ_NEXT(msg, s_i_tqe);
  454. conn->dequeue_inq(ctx, conn, msg);
  455. /*
  456. * client close connection,msg can be swallow
  457. */
  458. if (msg->swallow) {
  459. log_debug("close s %d swallow req %"PRIu64" len %"PRIu32 " type %d",
  460. conn->fd, msg->id, msg->mlen, msg->cmd);
  461. msg->done = 1;
  462. req_put(msg);
  463. } else {
  464. c_conn = msg->owner;
  465. msg->done = 1;
  466. msg->error = 1;
  467. msg->err = MSG_BACKWORKER_ERR;
  468. ci = conn->owner;
  469. stats_server_incr(ctx, ci, server_request_error);
  470. if (msg->frag_owner != NULL) {
  471. msg->frag_owner->nfrag_done++;
  472. }
  473. //if req is done , call req_forward
  474. if (req_done(c_conn, msg)) {
  475. rsp_forward(ctx, c_conn, msg);
  476. }
  477. }
  478. }
  479. ASSERT(TAILQ_EMPTY(&conn->imsg_q));
  480. for (node = rbtree_min(&conn->msg_tree); node != NULL; node = nnode) {
  481. msg = (struct msg *) node->data;
  482. conn->de_msgtree(ctx, conn, msg);
  483. nnode = rbtree_min(&conn->msg_tree);
  484. if (msg->swallow) {
  485. log_debug("close s %d swallow req %"PRIu64" len %"PRIu32 " type %d",
  486. conn->fd, msg->id, msg->mlen, msg->cmd);
  487. req_put(msg);
  488. } else {
  489. c_conn = msg->owner;
  490. msg->done = 1;
  491. msg->error = 1;
  492. msg->err = MSG_BACKWORKER_ERR;
  493. //TODO
  494. ci = conn->owner;
  495. stats_server_incr(ctx, ci, server_request_error);
  496. if (msg->frag_owner != NULL) {
  497. msg->frag_owner->nfrag_done++;
  498. }
  499. //if req is done , call req_forward
  500. if (req_done(c_conn, msg)) {
  501. rsp_forward(ctx, c_conn, msg);
  502. }
  503. }
  504. }
  505. msg = conn->rmsg;
  506. if (msg != NULL) {
  507. conn->rmsg = NULL;
  508. ASSERT(!msg->request);ASSERT(msg->peer == NULL);
  509. rsp_put(msg);
  510. log_debug(
  511. "close s %d discarding rsp %"PRIu64" len %"PRIu32" " "in error",
  512. conn->fd, msg->id, msg->mlen);
  513. }
  514. conn->unref(conn);
  515. status = close(conn->fd);
  516. if (status < 0) {
  517. log_error("close s %d failed, ignored: %s", conn->fd, strerror(errno));
  518. }
  519. conn->fd = -1;
  520. conn_put(conn);
  521. return;
  522. }
  523. static int server_each_set_owner(void *elem, void *data) {
  524. struct server *s = elem;
  525. struct server_pool *sp = data;
  526. s->owner = sp;
  527. return 0;
  528. }
  529. /*
  530. * init server
  531. */
  532. int server_init(struct array *server, struct array *conf_server,
  533. struct server_pool *sp) {
  534. int status;
  535. uint32_t nserver;
  536. nserver = array_n(conf_server);
  537. ASSERT(nserver != 0);
  538. ASSERT(array_n(server) == 0);
  539. status = array_init(server, nserver, sizeof(struct server));
  540. if (status != 0) {
  541. return status;
  542. }
  543. status = array_each(conf_server, conf_server_each_transform, server);
  544. if (status != 0) {
  545. //server_deinit(server);
  546. return status;
  547. }
  548. ASSERT(array_n(server) == nserver);
  549. status = array_each(server, server_each_set_owner, sp);
  550. if (status != 0) {
  551. //server_deinit(server);
  552. return status;
  553. }
  554. log_debug("init %"PRIu32" servers in pool %"PRIu32" '%.*s'", nserver,
  555. sp->idx, sp->name.len, sp->name.data);
  556. return 0;
  557. }
  558. void instance_deinit(struct array *instance) {
  559. uint32_t i, nserver;
  560. for (i = 0, nserver = array_n(instance); i < nserver; i++) {
  561. struct cache_instance *ci;
  562. ci = array_pop(instance);
  563. printf("ip : %.*s, num : %d, fail_time : %d\n", ci->pname.len, ci->pname.data, ci->num, ci->last_failure_ms);
  564. string_deinit(&ci->pname);
  565. ASSERT(TAILQ_EMPTY(&ci->s_conn_q) && ci->ns_conn_q == 0);
  566. }
  567. array_deinit(instance);
  568. }
  569. void server_deinit(struct array *server) {
  570. uint32_t i, nserver;
  571. for (i = 0, nserver = array_n(server); i < nserver; i++) {
  572. struct server *s;
  573. s = array_pop(server);
  574. string_deinit(&s->name);
  575. printf("\n first r\n");
  576. instance_deinit(&s->high_ptry_ins);
  577. printf("\n second r\n");
  578. instance_deinit(&s->low_prty_ins);
  579. }
  580. array_deinit(server);
  581. }
  582. static int server_each_preconnect(void *elem, void *data) {
  583. int status;
  584. struct server *server;
  585. struct server_pool *pool;
  586. struct conn *conn;
  587. int ninstance,i;
  588. struct cache_instance *ins;
  589. server = elem;
  590. pool = server->owner;
  591. conn = instance_conn(server->master);
  592. if (conn == NULL) {
  593. return -1;
  594. }
  595. status = instance_connect(pool->ctx, server->master, conn);
  596. if (status != 0) {
  597. log_warning("connect to server '%.*s' failed, ignored: %s",
  598. server->master->pname.len, server->master->pname.data, strerror(errno));
  599. server_close(pool->ctx, conn);
  600. }
  601. if (pool->replica_enable) // if enable replica, connect all instance
  602. {
  603. ninstance = array_n(&server->high_ptry_ins);
  604. for (i = 0; i < ninstance; i++)
  605. {
  606. ins = array_get(&server->high_ptry_ins, i);
  607. if (ins == server->master) continue;
  608. conn = instance_conn(ins);
  609. if (conn == NULL) {
  610. return -1;
  611. }
  612. status = instance_connect(pool->ctx, ins, conn);
  613. if (status != 0) {
  614. log_warning("connect to server '%.*s' failed, ignored: %s",
  615. ins->pname.len,ins->pname.data, strerror(errno));
  616. server_close(pool->ctx, conn);
  617. }
  618. }
  619. ninstance = array_n(&server->low_prty_ins);
  620. for (i = 0; i < ninstance; i++)
  621. {
  622. ins = array_get(&server->low_prty_ins, i);
  623. conn = instance_conn(ins);
  624. if (conn == NULL) {
  625. return -1;
  626. }
  627. status = instance_connect(pool->ctx, ins, conn);
  628. if (status != 0) {
  629. log_warning("connect to server '%.*s' failed, ignored: %s",
  630. ins->pname.len, ins->pname.data, strerror(errno));
  631. server_close(pool->ctx, conn);
  632. }
  633. }
  634. }
  635. //else// only connect master
  636. //{
  637. // conn = instance_conn(server->master);
  638. // if (conn == NULL) {
  639. // return -1;
  640. // }
  641. // status = instance_connect(pool->ctx, server->master, conn);
  642. // if (status != 0) {
  643. // log_warning("connect to server '%.*s' failed, ignored: %s",
  644. // server->master->pname.len, server->master->pname.data, strerror(errno));
  645. // server_close(pool->ctx, conn);
  646. // }
  647. //}
  648. return 0;
  649. }
  650. static int instance_disconnect(struct array *elem, void *data)
  651. {
  652. struct cache_instance *ins;
  653. struct server_pool *pool = data;
  654. int i, nrepilca;
  655. nrepilca = array_n(elem);
  656. for ( i = 0; i < nrepilca; i++) {
  657. ins = array_get(elem, i);
  658. while (!TAILQ_EMPTY(&ins->s_conn_q)) {
  659. struct conn *conn;
  660. conn = TAILQ_FIRST(&ins->s_conn_q);
  661. ins = conn->owner;
  662. conn->close(pool->ctx, conn);
  663. log_debug("close conn : %d in server '%.*s'", conn->fd,
  664. ins->pname.len, ins->pname.data);
  665. }
  666. }
  667. return 0;
  668. }
  669. static int server_each_disconnect(void *elem, void *data) {
  670. struct server *server;
  671. struct server_pool *pool;
  672. //struct cache_instance *ins;
  673. server = elem;
  674. pool = server->owner;
  675. /*while (!TAILQ_EMPTY(&server->master.s_conn_q)) {
  676. struct conn *conn;
  677. conn = TAILQ_FIRST(&server->master.s_conn_q);
  678. ins = conn->owner;
  679. conn->close(pool->ctx, conn);
  680. log_debug("close conn : %d in server '%.*s'", conn->fd,
  681. server->master.pname.len, server->master.pname.data);
  682. }*/
  683. instance_disconnect(&server->high_ptry_ins,pool);
  684. instance_disconnect(&server->low_prty_ins, pool);
  685. return 0;
  686. }
  687. static int server_pool_each_preconnect(void *elem, void *data) {
  688. int status;
  689. struct server_pool *sp;
  690. sp = (struct server_pool *) elem;
  691. if (!sp->preconnect) {
  692. return 0;
  693. }
  694. status = array_each(&sp->server, server_each_preconnect, NULL);
  695. if (status != 0) {
  696. return status;
  697. }
  698. return 0;
  699. }
  700. int server_pool_preconnect(struct context *ctx) {
  701. int status;
  702. status = array_each(&ctx->pool, server_pool_each_preconnect, NULL);
  703. if (status != 0) {
  704. return status;
  705. }
  706. return 0;
  707. }
  708. static int server_pool_each_disconnect(void *elem, void *data) {
  709. int status;
  710. struct server_pool *sp;
  711. sp = (struct server_pool *)elem;
  712. status = array_each(&sp->server, server_each_disconnect, NULL);
  713. if (status != 0) {
  714. return status;
  715. }
  716. return 0;
  717. }
  718. void server_pool_disconnect(struct context *ctx) {
  719. array_each(&ctx->pool, server_pool_each_disconnect, NULL);
  720. }
  721. static int server_pool_each_set_owner(void *elem, void *data) {
  722. struct server_pool *sp = elem;
  723. struct context *ctx = data;
  724. sp->ctx = ctx;
  725. return 0;
  726. }
  727. /*
  728. * calc the total server connection numbers
  729. */
  730. static int server_pool_each_calc_connections(void *elem, void *data) {
  731. struct server_pool *sp = elem;
  732. struct server *server;
  733. int ninstance,i;
  734. struct context *ctx = data;
  735. ninstance = 0;
  736. for (i = 0; i < array_n(&sp->server); i++){
  737. server = array_get(&sp->server, i);
  738. if (sp->replica_enable) {
  739. ninstance += array_n(&server->high_ptry_ins);
  740. ninstance += array_n(&server->low_prty_ins);
  741. }
  742. }
  743. ctx->max_nsconn += sp->server_connections * ninstance;
  744. ctx->max_nsconn += 1; /* pool listening socket */
  745. ctx->sum_nconn += sp->client_connections;
  746. ctx->sum_nconn += sp->server_connections * ninstance;
  747. ctx->sum_nconn += 1; /* pool listening socket */
  748. return 0;
  749. }
  750. /*
  751. * init the ketama structure
  752. */
  753. int server_pool_run(struct server_pool *pool) {
  754. ASSERT(array_n(&pool->server) != 0);
  755. return ketama_update(pool);
  756. }
  /*
   * array_each() callback: for each server pool, build the backend server
   * structure and its hash ring by running server_pool_run() on it.
   */
  static int server_pool_each_run(void *elem, void *data) {
      struct server_pool *pool = elem;

      return server_pool_run(pool);
  }
  763. /*
  764. * 初始化server pool
  765. */
  766. int server_pool_init(struct array *server_pool, struct array *conf_pool,
  767. struct context *ctx) {
  768. int status;
  769. uint32_t npool;
  770. npool = array_n(conf_pool);
  771. ASSERT(npool != 0);
  772. ASSERT(array_n(server_pool) == 0);
  773. status = array_init(server_pool, npool, sizeof(struct server_pool));
  774. if (status != 0) {
  775. return status;
  776. }
  777. /* transform conf pool to server pool */
  778. // 对于conf_pool中的每个对象调用conf_pool_each_transform函数
  779. status = array_each(conf_pool, conf_pool_each_transform, server_pool);
  780. if (status != 0) {
  781. server_pool_deinit(server_pool);
  782. return status;
  783. }ASSERT(array_n(server_pool) == npool);
  784. /* set ctx as the server pool owner */
  785. status = array_each(server_pool, server_pool_each_set_owner, ctx);
  786. if (status != 0) {
  787. server_pool_deinit(server_pool);
  788. return status;
  789. }
  790. /* compute max server connections */
  791. ctx->max_nsconn = 0;
  792. status = array_each(server_pool, server_pool_each_calc_connections, ctx);
  793. if (status != 0) {
  794. server_pool_deinit(server_pool);
  795. return status;
  796. }
  797. /* update server pool continuum */
  798. status = array_each(server_pool, server_pool_each_run, NULL);
  799. if (status != 0) {
  800. server_pool_deinit(server_pool);
  801. return status;
  802. }
  803. log_debug("init %"PRIu32" pools", npool);
  804. return 0;
  805. }
  806. static void server_pool_disconnect_client(struct server_pool *pool)
  807. {
  808. log_debug("disconnect %d clients on pool %"PRIu32" '%.*s'",
  809. pool->c_conn_count, pool->idx, pool->name.len, pool->name.data);
  810. while (!TAILQ_EMPTY(&pool->c_conn_q)) {
  811. struct conn *c = TAILQ_FIRST(&pool->c_conn_q);
  812. c->close(pool->ctx, c);
  813. }
  814. }
  815. /*
  816. * destory server pool
  817. */
  818. void server_pool_deinit(struct array *server_pool) {
  819. uint32_t i, npool;
  820. for (i = 0, npool = array_n(server_pool); i < npool; i++) {
  821. struct server_pool *sp;
  822. sp = array_pop(server_pool);
  823. ASSERT(sp->p_conn == NULL);
  824. ASSERT(TAILQ_EMPTY(&sp->c_conn_q) && sp->nc_conn_q == 0);
  825. server_pool_disconnect_client(sp);
  826. if (sp->continuum != NULL) {
  827. free(sp->continuum);
  828. sp->ncontinuum = 0;
  829. sp->nserver_continuum = 0;
  830. }
  831. string_deinit(&sp->accesskey);
  832. string_deinit(&sp->addrstr);
  833. string_deinit(&sp->accesskey);
  834. string_deinit(&sp->module_idc);
  835. server_deinit(&sp->server);
  836. if(sp->top_percentile_param)
  837. free(sp->top_percentile_param);
  838. log_debug("deinit pool %"PRIu32" '%.*s'", sp->idx, sp->name.len,
  839. sp->name.data);
  840. }
  841. array_deinit(server_pool);
  842. }
  843. /*
  844. * server timeout
  845. */
  846. int server_timeout(struct conn *conn) {
  847. struct cache_instance *ins;
  848. struct server *server;
  849. struct server_pool *pool;
  850. ASSERT(!conn->client && !conn->proxy);
  851. ins = conn->owner;
  852. server = ins->owner;
  853. pool = server->owner;
  854. return pool->timeout;
  855. }
  856. /*decr instance failure time*/
  857. int decr_instance_failure_time(struct msg *msg)
  858. {
  859. struct cache_instance *ci;
  860. if (msg->peer_conn == NULL){
  861. return -1;
  862. }
  863. if (msg->peer_conn->type & BACKWORK){
  864. ci = msg->peer_conn->owner;
  865. if (ci != NULL){
  866. ci->failure_num = 0;
  867. ci->nerr = 0;
  868. return 0;
  869. }
  870. }
  871. return -1;
  872. }
  873. int incr_instance_failure_time(struct cache_instance *ci)
  874. {
  875. ci->nerr++;
  876. if (ci->nerr >= ci->ns_conn_q) {
  877. if (ci->failure_num < FAIL_TIME_LIMIT) {
  878. ci->failure_num++;
  879. ci->last_failure_ms = now_ms;
  880. }
  881. }
  882. return 0;
  883. }