da_stats.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <stddef.h>
  17. #include <sys/time.h>
  18. #include <pthread.h>
  19. #include <inttypes.h>
  20. #include <sys/mman.h>
  21. #include "da_core.h"
  22. #include "da_stats.h"
  23. #include "da_server.h"
  24. #include "da_log.h"
  25. #include "da_time.h"
  26. struct string http_rsp;
  27. static char *report_url = "";
  28. static char *report_url_instance = "";
  29. static int httptimeout = 2;
  30. static int report_offset = 0;
  31. struct stats_desc {
  32. char *name; /* stats name */
  33. stats_type_t type;
  34. char *desc; /* stats description */
  35. };
  36. #define DEFINE_ACTION(_name, _type, _desc) { .type = _type, .name = string(#_name) },
  37. static struct stats_metric stats_pool_codec[] = {
  38. STATS_POOL_CODEC( DEFINE_ACTION ) };
  39. static struct stats_metric stats_server_codec[] = {
  40. STATS_SERVER_CODEC( DEFINE_ACTION ) };
  41. #undef DEFINE_ACTION
  42. #define DEFINE_ACTION(_name, _type, _desc) { .type = _type, .name = #_name, .desc = _desc },
  43. static struct stats_desc stats_pool_desc[] = {
  44. STATS_POOL_CODEC( DEFINE_ACTION ) };
  45. static struct stats_desc stats_server_desc[] = {
  46. STATS_SERVER_CODEC( DEFINE_ACTION ) };
  47. #undef DEFINE_ACTION
  48. /* wake lock for lock timeout*/
  49. static pthread_mutex_t wakeLock;
  50. void stats_describe(void) {
  51. uint32_t i;
  52. write_stderr("pool stats:");
  53. for (i = 0; i < NELEMS(stats_pool_desc); i++) {
  54. write_stderr(" %-20s\"%s\"", stats_pool_desc[i].name,
  55. stats_pool_desc[i].desc);
  56. }
  57. write_stderr("");
  58. write_stderr("server stats:");
  59. for (i = 0; i < NELEMS(stats_server_desc); i++) {
  60. write_stderr(" %-20s\"%s\"", stats_server_desc[i].name,
  61. stats_server_desc[i].desc);
  62. }
  63. }
  64. static void stats_metric_init(struct stats_metric *stm) {
  65. switch (stm->type) {
  66. case STATS_COUNTER:
  67. stm->value.counter = 0LL;
  68. break;
  69. case STATS_GAUGE:
  70. stm->value.counter = 0LL;
  71. break;
  72. case STATS_TIMESTAMP:
  73. stm->value.timestamp = 0LL;
  74. break;
  75. default:
  76. log_error("error stats_type");
  77. }
  78. }
  79. static void stats_metric_reset(struct array *stats_metric) {
  80. uint32_t i, nmetric;
  81. nmetric = array_n(stats_metric);
  82. ASSERT(nmetric == STATS_POOL_NFIELD || nmetric == STATS_SERVER_NFIELD);
  83. for (i = 0; i < nmetric; i++) {
  84. struct stats_metric *stm = array_get(stats_metric, i);
  85. stats_metric_init(stm);
  86. }
  87. }
  88. static void stats_pool_reset(struct array *stats_pool) {
  89. uint32_t i, npool;
  90. npool = array_n(stats_pool);
  91. for (i = 0; i < npool; i++) {
  92. struct stats_pool *stp = array_get(stats_pool, i);
  93. uint32_t j, nserver;
  94. stats_metric_reset(&stp->metric);
  95. nserver = array_n(&stp->server);
  96. for (j = 0; j < nserver; j++) {
  97. struct stats_server *sts = array_get(&stp->server, j);
  98. stats_metric_reset(&sts->metric);
  99. }
  100. }
  101. }
  102. static void stats_aggregate_item(struct stats_file_item *item_list,
  103. struct array *shadow_metric, int list_size) {
  104. int i;
  105. struct stats_metric *stm;
  106. for (i = 0; i < list_size; i++) {
  107. stm = array_get(shadow_metric, i);
  108. switch (item_list[i].type) {
  109. case STATS_COUNTER:
  110. item_list[i].stat_once = stm->value.counter;
  111. item_list[i].stat_all += stm->value.counter;
  112. break;
  113. case STATS_GAUGE:
  114. item_list[i].stat_once = stm->value.counter;
  115. item_list[i].stat_all += stm->value.counter;
  116. break;
  117. case STATS_TIMESTAMP:
  118. if (stm->value.timestamp) {
  119. item_list[i].stat_once = stm->value.counter;
  120. item_list[i].stat_all = stm->value.counter;
  121. }
  122. break;
  123. default:
  124. log_error("error stats_type");
  125. }
  126. }
  127. }
  128. static void stats_aggregate_reset(struct stats_file_item *item_list,
  129. int list_size) {
  130. int i;
  131. for (i = 0; i < list_size; i++) {
  132. switch (item_list[i].type) {
  133. case STATS_COUNTER:
  134. item_list[i].stat_once = 0;
  135. break;
  136. case STATS_GAUGE:
  137. item_list[i].stat_once = 0;
  138. break;
  139. case STATS_TIMESTAMP:
  140. item_list[i].stat_once = 0;
  141. item_list[i].stat_all = 0;
  142. break;
  143. default:
  144. log_error("error stats_type");
  145. }
  146. }
  147. }
  148. static void stats_aggregate(struct stats *st) {
  149. uint32_t i;
  150. if (st->aggregate == 0) {
  151. //log_debug("skip aggregate of shadow %p as generator is slow",
  152. // st->shadow.elem);
  153. for (i = 0; i < array_n(&st->aggregator); i++) {
  154. uint32_t j;
  155. struct stats_file_pool *stfp;
  156. stfp = array_get(&st->aggregator, i);
  157. stats_aggregate_reset(stfp->pool_item_list,
  158. stfp->phead->poolfields);
  159. for (j = 0; j < array_n(&stfp->stats_file_servers); j++) {
  160. struct stats_file_server *stfs;
  161. stfs = array_get(&stfp->stats_file_servers, j);
  162. stats_aggregate_reset(stfs->server_item_list,
  163. stfs->shead->serverfields);
  164. }
  165. }
  166. return;
  167. }
  168. for (i = 0; i < array_n(&st->aggregator); i++) {
  169. uint32_t j;
  170. struct stats_file_pool *stfp;
  171. struct stats_pool *stp;
  172. stp = array_get(&st->shadow, i);
  173. stfp = array_get(&st->aggregator, i);
  174. stats_aggregate_item(stfp->pool_item_list, &stp->metric,
  175. stfp->phead->poolfields);
  176. for (j = 0; j < array_n(&stfp->stats_file_servers); j++) {
  177. struct stats_file_server *stfs;
  178. struct stats_server *sts;
  179. sts = array_get(&stp->server, j);
  180. stfs = array_get(&stfp->stats_file_servers, j);
  181. stats_aggregate_item(stfs->server_item_list, &sts->metric,
  182. stfs->shead->serverfields);
  183. }
  184. }
  185. /*
  186. * Reset shadow (b) stats before giving it back to generator to keep
  187. * stats addition idempotent
  188. */
  189. stats_pool_reset(&st->shadow);
  190. st->aggregate = 0;
  191. return;
  192. }
  193. /* report data to monitor center*/
  194. static int stats_report(struct stats *st) {
  195. return 0;
  196. }
  197. static void stats_loop_callback(void *arg1, int arg2) {
  198. struct stats *st = arg1;
  199. int n = arg2;
  200. /* aggregate stats from shadow (b) -> sum (c) */
  201. stats_aggregate(st);
  202. /* report data to monitor center*/
  203. if (n == 1) {
  204. stats_report(st);
  205. }
  206. return;
  207. }
  208. static void event_loop_stats(event_stats_cb_t cb, void *arg) {
  209. struct stats *st = arg;
  210. time_t next = 0;
  211. struct timeval tv;
  212. gettimeofday(&tv, NULL);
  213. next = (tv.tv_sec / 10) * 10 + 10+report_offset;
  214. struct timespec ts;
  215. ts.tv_sec = next;
  216. ts.tv_nsec = 0;
  217. _set_remote_log_fd_();
  218. while (pthread_mutex_timedlock(&wakeLock, &ts) != 0) {
  219. gettimeofday(&tv, NULL);
  220. if (tv.tv_sec >= next) {
  221. //call stats_loop_callback
  222. cb(st, 1);
  223. gettimeofday(&tv, NULL);
  224. //report add an offset
  225. next = (tv.tv_sec / 10) * 10 + 10 + report_offset;
  226. }
  227. ts.tv_sec = next;
  228. ts.tv_nsec = 0;
  229. }
  230. pthread_mutex_unlock(&wakeLock);
  231. pthread_mutex_destroy(&wakeLock);
  232. }
  233. static void *stats_loop(void *arg) {
  234. event_loop_stats(stats_loop_callback, arg);
  235. return NULL;
  236. }
  237. static int stats_start_aggregator(struct stats *st) {
  238. int status;
  239. if (!stats_enabled) {
  240. return 0;
  241. }
  242. report_offset =rand() % 10;
  243. //init the globe mutex
  244. pthread_mutex_init(&wakeLock, NULL);
  245. if (pthread_mutex_trylock(&wakeLock) == 0) {
  246. status = pthread_create(&st->tid, NULL, stats_loop, st);
  247. if (status < 0) {
  248. log_error("stats aggregator create failed: %s", strerror(status));
  249. return -1;
  250. }
  251. }
  252. return 0;
  253. }
  254. static void stats_stop_aggregator(struct stats *st) {
  255. if (!stats_enabled) {
  256. return;
  257. }
  258. if (pthread_mutex_trylock(&wakeLock) == 0) {
  259. pthread_mutex_unlock(&wakeLock);
  260. }
  261. return;
  262. }
  263. static void *mount(const char *filename, int rw, int *size) {
  264. char path[256];
  265. void *_map;
  266. int fd;
  267. mkdir(STATS_DIR, 0755);
  268. if (access(STATS_DIR, W_OK | X_OK) < 0) {
  269. log_error("stats dir(%s): Not writable", STATS_DIR);
  270. return NULL;
  271. }
  272. int flag = rw;
  273. int prot = 0;
  274. switch (flag) {
  275. case O_WRONLY:
  276. case O_RDWR:
  277. default:
  278. flag = O_CREAT | O_RDWR;
  279. prot = PROT_WRITE | PROT_READ;
  280. break;
  281. case O_RDONLY:
  282. prot = PROT_READ;
  283. break;
  284. }
  285. da_strcpy(path, STATS_DIR);
  286. da_strcat(path, filename);
  287. if ((fd = open(path, flag, 0644)) < 0) {
  288. log_error("open failed[path:%s], errno:%d %m", path, errno);
  289. return NULL;
  290. }
  291. struct stat st;
  292. fstat(fd, &st);
  293. if (O_RDONLY == rw) {
  294. *size = st.st_size;
  295. } else if (st.st_size != *size) {
  296. int unused;
  297. unused = ftruncate(fd, *size);
  298. }
  299. _map = (char *) mmap(0, *size, prot, MAP_SHARED, fd, 0);
  300. if (MAP_FAILED == _map) {
  301. log_error("map failed[path:%s, size:%d, _fd:%d], errno:%d %m", path,
  302. *size, fd, errno);
  303. close(fd);
  304. return NULL;
  305. }
  306. close(fd);
  307. log_debug("map success[path:%s, size:%d, _fd:%d]", path, *size, fd);
  308. return _map;
  309. }
  310. static int stats_file_mount(struct array *_map_items, struct array *server_pool) {
  311. int i, status, npool;
  312. pid_t pid;
  313. array_null(_map_items);
  314. pid = getpid();
  315. npool = array_n(server_pool);
  316. status = array_init(_map_items, npool, sizeof(struct _map_item));
  317. if (status < 0) {
  318. log_error("_map_items array init fail,lack of memory");
  319. return -1;
  320. }
  321. for (i = 0; i < npool; i++) {
  322. int nserver, size, j, k, l,ninstance;
  323. uint8_t *pos;
  324. struct _map_item *mi = array_push(_map_items);
  325. struct server_pool *sp = array_get(server_pool, i);
  326. nserver = array_n(&sp->server);
  327. ninstance = 0;
  328. for (j = 0; j < nserver; j++)
  329. {
  330. struct server *s;
  331. s = array_get(&sp->server,j);
  332. ninstance += array_n(&s->high_ptry_ins);
  333. ninstance += array_n(&s->low_prty_ins);
  334. }
  335. size = sizeof(struct stats_file_pool_head)
  336. + STATS_POOL_NFIELD * sizeof(struct stats_file_item)
  337. + ninstance
  338. * (sizeof(struct stats_file_server_head)
  339. + STATS_SERVER_NFIELD
  340. * sizeof(struct stats_file_item));
  341. da_snprintf(mi->filename, 256, "%s_%s_%d", STATS_FILE, sp->name.data,
  342. pid);
  343. mi->_map_size = size;
  344. mi->_map_start = mount(mi->filename, O_RDWR, &size);
  345. if (mi->_map_start == NULL) {
  346. log_error("mmap stats file[name:%s] for %s failed", mi->filename,
  347. sp->name.data);
  348. return -1;
  349. }
  350. //reinit the mmap mem
  351. pos = mi->_map_start;
  352. struct stats_file_pool_head * sfph = (struct stats_file_pool_head *) pos;
  353. da_strcpy(sfph->poolname, sp->name.data);
  354. sfph->mid = sp->mid;
  355. sfph->poolfields = STATS_POOL_NFIELD;
  356. sfph->servernum = ninstance;
  357. pos += sizeof(struct stats_file_pool_head);
  358. for (j = 0; j < STATS_POOL_NFIELD; j++) {
  359. struct stats_file_item *sfi = (struct stats_file_item *) pos;
  360. sfi->type = stats_pool_desc[j].type;
  361. sfi->stat_once = 0;
  362. sfi->stat_all = 0;
  363. pos += sizeof(struct stats_file_item);
  364. }
  365. for (j = 0; j < nserver; j++) {
  366. struct server *s = array_get(&sp->server, j);
  367. ninstance = array_n(&s->high_ptry_ins);
  368. for(l = 0; l < ninstance ; l++)
  369. {
  370. struct cache_instance *ins = array_get(&s->high_ptry_ins,l);
  371. struct stats_file_server_head * sfsh =
  372. (struct stats_file_server_head *) pos;
  373. da_strcpy(sfsh->servername, s->name.data);
  374. sfsh->sid = ins->idx;
  375. sfsh->serverfields = STATS_SERVER_NFIELD;
  376. pos += sizeof(struct stats_file_server_head);
  377. for (k = 0; k < STATS_SERVER_NFIELD; k++) {
  378. struct stats_file_item *sfi = (struct stats_file_item *) pos;
  379. sfi->type = stats_pool_desc[k].type;
  380. sfi->stat_once = 0;
  381. sfi->stat_all = 0;
  382. pos += sizeof(struct stats_file_item);
  383. }
  384. }
  385. ninstance = array_n(&s->low_prty_ins);
  386. for (l = 0; l < ninstance; l++)
  387. {
  388. struct cache_instance *ins = array_get(&s->low_prty_ins, l);
  389. struct stats_file_server_head * sfsh =
  390. (struct stats_file_server_head *) pos;
  391. da_strcpy(sfsh->servername, s->name.data);
  392. sfsh->sid = ins->idx;
  393. sfsh->serverfields = STATS_SERVER_NFIELD;
  394. pos += sizeof(struct stats_file_server_head);
  395. for (k = 0; k < STATS_SERVER_NFIELD; k++) {
  396. struct stats_file_item *sfi = (struct stats_file_item *) pos;
  397. sfi->type = stats_pool_desc[k].type;
  398. sfi->stat_once = 0;
  399. sfi->stat_all = 0;
  400. pos += sizeof(struct stats_file_item);
  401. }
  402. }
  403. }
  404. }
  405. return 0;
  406. }
  407. static int stats_file_unmount(struct array *_map_items) {
  408. int i, status;
  409. char path[256];
  410. for (i = 0; i < array_n(_map_items); i++) {
  411. struct _map_item *mi = array_pop(_map_items);
  412. munmap((void *) mi->_map_start, mi->_map_size);
  413. da_strcpy(path, STATS_DIR);
  414. da_strcat(path, mi->filename);
  415. status = unlink(path);
  416. if (status < 0) {
  417. log_error("unlink of pid file '%s' failed, ignored: %s", path,
  418. strerror(errno));
  419. }
  420. }
  421. array_deinit(_map_items);
  422. return 0;
  423. }
  424. static int stats_aggregator_map(struct array *_map_items,
  425. struct array *aggregator) {
  426. int i, j, npool, status;
  427. array_null(aggregator);
  428. npool = array_n(_map_items);
  429. status = array_init(aggregator, npool, sizeof(struct stats_file_pool));
  430. if (status != 0) {
  431. log_error("init stats_file_pool error, lack of memory");
  432. return status;
  433. }
  434. for (i = 0; i < npool; i++) {
  435. struct _map_item *mi = array_get(_map_items, i);
  436. struct stats_file_pool *stfp = array_push(aggregator);
  437. uint8_t *temp = (uint8_t *) mi->_map_start;
  438. stfp->phead = (struct stats_file_pool_head *) temp;
  439. temp += sizeof(struct stats_file_pool_head);
  440. stfp->pool_item_list = (struct stats_file_item *) temp;
  441. temp += sizeof(struct stats_file_item) * stfp->phead->poolfields;
  442. array_null(&stfp->stats_file_servers);
  443. status = array_init(&stfp->stats_file_servers, stfp->phead->servernum,
  444. sizeof(struct stats_file_server));
  445. for (j = 0; j < stfp->phead->servernum; j++) {
  446. struct stats_file_server *stfs = array_push(
  447. &stfp->stats_file_servers);
  448. stfs->shead = (struct stats_file_server_head *) temp;
  449. temp += sizeof(struct stats_file_server_head);
  450. stfs->server_item_list = (struct stats_file_item *) temp;
  451. temp += sizeof(struct stats_file_item) * stfs->shead->serverfields;
  452. }
  453. }
  454. return 0;
  455. }
  456. static int stats_aggregator_unmap(struct array *aggregator) {
  457. int i;
  458. for (i = 0; i < array_n(aggregator); i++) {
  459. array_pop(aggregator);
  460. }
  461. array_deinit(aggregator);
  462. return 0;
  463. }
  464. static void stats_metric_deinit(struct array *metric) {
  465. uint32_t i, nmetric;
  466. nmetric = array_n(metric);
  467. for (i = 0; i < nmetric; i++) {
  468. array_pop(metric);
  469. }
  470. array_deinit(metric);
  471. }
  472. static int stats_pool_metric_init(struct array *stats_metric) {
  473. int status;
  474. uint32_t i, nfield = STATS_POOL_NFIELD;
  475. status = array_init(stats_metric, nfield, sizeof(struct stats_metric));
  476. if (status != 0) {
  477. return status;
  478. }
  479. for (i = 0; i < nfield; i++) {
  480. struct stats_metric *stm = array_push(stats_metric);
  481. /* initialize from pool codec first */
  482. *stm = stats_pool_codec[i];
  483. /* initialize individual metric */
  484. stats_metric_init(stm);
  485. }
  486. return 0;
  487. }
  488. static int stats_server_metric_init(struct stats_server *sts) {
  489. int status;
  490. uint32_t i, nfield = STATS_SERVER_NFIELD;
  491. status = array_init(&sts->metric, nfield, sizeof(struct stats_metric));
  492. if (status != 0) {
  493. return status;
  494. }
  495. for (i = 0; i < nfield; i++) {
  496. struct stats_metric *stm = array_push(&sts->metric);
  497. /* initialize from server codec first */
  498. *stm = stats_server_codec[i];
  499. /* initialize individual metric */
  500. stats_metric_init(stm);
  501. }
  502. return 0;
  503. }
  504. static int stats_server_init(struct stats_server *sts, struct server *s) {
  505. int status;
  506. sts->name = s->name;
  507. array_null(&sts->metric);
  508. status = stats_server_metric_init(sts);
  509. if (status != 0) {
  510. return status;
  511. }
  512. log_debug("init stats server '%.*s' with %"PRIu32" metric", sts->name.len,
  513. sts->name.data, array_n(&sts->metric));
  514. return 0;
  515. }
  516. static int stats_server_map(struct array *stats_server, struct array *server) {
  517. int status;
  518. uint32_t i, nserver, j, ninstance;;
  519. struct stats_server *sts;
  520. int cnt = 0;
  521. nserver = array_n(server);
  522. ASSERT(nserver != 0);
  523. status = array_init(stats_server, nserver*2, sizeof(struct stats_server));
  524. if (status != 0) {
  525. return status;
  526. }
  527. for (i = 0; i < nserver; i++) {
  528. struct server *s = array_get(server, i);
  529. ninstance = array_n(&s->high_ptry_ins);
  530. for (j = 0; j < ninstance; j++)
  531. {
  532. sts = array_push(stats_server);
  533. status = stats_server_init(sts, s);
  534. cnt++;
  535. }
  536. ninstance = array_n(&s->low_prty_ins);
  537. for (j = 0; j < ninstance; j++)
  538. {
  539. sts = array_push(stats_server);
  540. status = stats_server_init(sts, s);
  541. cnt++;
  542. }
  543. if (status != 0) {
  544. return status;
  545. }
  546. }
  547. log_debug("map %"PRIu32" stats servers %"PRIu32" stats instance", nserver, ninstance);
  548. return 0;
  549. }
  550. static void stats_server_unmap(struct array *stats_server) {
  551. uint32_t i, nserver;
  552. nserver = array_n(stats_server);
  553. for (i = 0; i < nserver; i++) {
  554. struct stats_server *sts = array_pop(stats_server);
  555. stats_metric_deinit(&sts->metric);
  556. }
  557. array_deinit(stats_server);
  558. log_debug("unmap %"PRIu32" stats servers", nserver);
  559. }
  560. static int stats_pool_init(struct stats_pool *stp, struct server_pool *sp) {
  561. int status;
  562. stp->mid = sp->mid;
  563. stp->port = sp->port;
  564. stp->name = sp->name;
  565. stp->main_report = sp->main_report;
  566. stp->instance_report = sp->instance_report;
  567. array_null(&stp->metric);
  568. array_null(&stp->server);
  569. status = stats_pool_metric_init(&stp->metric);
  570. if (status != 0) {
  571. return status;
  572. }
  573. status = stats_server_map(&stp->server, &sp->server);
  574. if (status != 0) {
  575. stats_metric_deinit(&stp->metric);
  576. return status;
  577. }
  578. log_debug(
  579. "init stats pool '%.*s' with %"PRIu32" metric and " "%"PRIu32" server",
  580. stp->name.len, stp->name.data, array_n(&stp->metric),
  581. array_n(&stp->metric));
  582. return 0;
  583. }
  584. static int stats_pool_map(struct array *stats_pool, struct array *server_pool) {
  585. int status;
  586. uint32_t i, npool;
  587. npool = array_n(server_pool);
  588. ASSERT(npool != 0);
  589. status = array_init(stats_pool, npool, sizeof(struct stats_pool));
  590. if (status != 0) {
  591. return status;
  592. }
  593. for (i = 0; i < npool; i++) {
  594. struct server_pool *sp = array_get(server_pool, i);
  595. struct stats_pool *stp = array_push(stats_pool);
  596. status = stats_pool_init(stp, sp);
  597. if (status != 0) {
  598. return status;
  599. }
  600. }
  601. log_debug("map %"PRIu32" stats pools", npool);
  602. return 0;
  603. }
  604. static void stats_pool_unmap(struct array *stats_pool) {
  605. uint32_t i, npool;
  606. npool = array_n(stats_pool);
  607. for (i = 0; i < npool; i++) {
  608. struct stats_pool *stp = array_pop(stats_pool);
  609. stats_metric_deinit(&stp->metric);
  610. stats_server_unmap(&stp->server);
  611. }
  612. array_deinit(stats_pool);
  613. log_debug("unmap %"PRIu32" stats pool", npool);
  614. }
  615. struct stats *stats_create(int stats_interval, char * localip, struct array *server_pool) {
  616. int status;
  617. struct stats *st;
  618. st = malloc(sizeof(*st));
  619. if (st == NULL) {
  620. return NULL;
  621. }
  622. array_null(&st->current);
  623. array_null(&st->shadow);
  624. st->interval = stats_interval;
  625. st->start_ts = now_ms;
  626. st->tid = (pthread_t) -1;
  627. st->updated = 0;
  628. st->aggregate = 0;
  629. strncpy(st->localip, localip, sizeof(st->localip));
  630. status = stats_pool_map(&st->current, server_pool);
  631. if (status != 0) {
  632. goto error;
  633. }
  634. status = stats_pool_map(&st->shadow, server_pool);
  635. if (status != 0) {
  636. goto error;
  637. }
  638. status = stats_file_mount(&st->_map_items, server_pool);
  639. if (status != 0) {
  640. goto error;
  641. }
  642. status = stats_aggregator_map(&st->_map_items, &st->aggregator);
  643. if (status != 0) {
  644. goto error;
  645. }
  646. status = stats_start_aggregator(st);
  647. if (status != 0) {
  648. goto error;
  649. }
  650. return st;
  651. error: stats_destroy(st);
  652. return NULL;
  653. }
  654. void stats_destroy(struct stats *st) {
  655. stats_stop_aggregator(st);
  656. stats_aggregator_unmap(&st->aggregator);
  657. stats_file_unmount(&st->_map_items);
  658. stats_pool_unmap(&st->shadow);
  659. stats_pool_unmap(&st->current);
  660. free(st);
  661. }
  662. void stats_swap(struct stats *st) {
  663. if (!stats_enabled) {
  664. return;
  665. }
  666. if (st->aggregate == 1) {
  667. //log_debug("skip swap of current %p shadow %p as aggregator "
  668. // "is busy", st->current.elem, st->shadow.elem);
  669. return;
  670. }
  671. if (st->updated == 0) {
  672. //log_debug("skip swap of current %p shadow %p as there is "
  673. // "nothing new", st->current.elem, st->shadow.elem);
  674. return;
  675. }
  676. //log_debug("swap stats current %p shadow %p", st->current.elem,
  677. // st->shadow.elem);
  678. //shadow has been reset
  679. array_swap(&st->current, &st->shadow);
  680. st->updated = 0;
  681. st->aggregate = 1;
  682. }
  683. static struct stats_metric *stats_pool_to_metric(struct context *ctx,
  684. struct server_pool *pool, stats_pool_field_t fidx) {
  685. struct stats *st;
  686. struct stats_pool *stp;
  687. struct stats_metric *stm;
  688. uint32_t pidx;
  689. pidx = pool->idx;
  690. st = ctx->stats;
  691. stp = array_get(&st->current, pidx);
  692. stm = array_get(&stp->metric, fidx);
  693. st->updated = 1;
  694. log_debug("metric '%.*s' in pool %"PRIu32"", stm->name.len, stm->name.data,
  695. pidx);
  696. return stm;
  697. }
  698. void _stats_pool_incr(struct context *ctx, struct server_pool *pool,
  699. stats_pool_field_t fidx) {
  700. struct stats_metric *stm;
  701. stm = stats_pool_to_metric(ctx, pool, fidx);
  702. ASSERT(stm->type == STATS_COUNTER || stm->type == STATS_GAUGE);
  703. stm->value.counter++;
  704. log_debug("incr field '%.*s' to %"PRId64"", stm->name.len, stm->name.data,
  705. stm->value.counter);
  706. }
  707. void _stats_pool_decr(struct context *ctx, struct server_pool *pool,
  708. stats_pool_field_t fidx) {
  709. struct stats_metric *stm;
  710. stm = stats_pool_to_metric(ctx, pool, fidx);
  711. ASSERT(stm->type == STATS_GAUGE);
  712. stm->value.counter--;
  713. log_debug("decr field '%.*s' to %"PRId64"", stm->name.len, stm->name.data,
  714. stm->value.counter);
  715. }
  716. void _stats_pool_incr_by(struct context *ctx, struct server_pool *pool,
  717. stats_pool_field_t fidx, int64_t val) {
  718. struct stats_metric *stm;
  719. stm = stats_pool_to_metric(ctx, pool, fidx);
  720. ASSERT(stm->type == STATS_COUNTER || stm->type == STATS_GAUGE);
  721. stm->value.counter += val;
  722. log_debug("incr by field '%.*s' to %"PRId64"", stm->name.len,
  723. stm->name.data, stm->value.counter);
  724. }
  725. void _stats_pool_decr_by(struct context *ctx, struct server_pool *pool,
  726. stats_pool_field_t fidx, int64_t val) {
  727. struct stats_metric *stm;
  728. stm = stats_pool_to_metric(ctx, pool, fidx);
  729. ASSERT(stm->type == STATS_GAUGE);
  730. stm->value.counter -= val;
  731. log_debug("decr by field '%.*s' to %"PRId64"", stm->name.len,
  732. stm->name.data, stm->value.counter);
  733. }
  734. void _stats_pool_set_ts(struct context *ctx, struct server_pool *pool,
  735. stats_pool_field_t fidx, int64_t val) {
  736. struct stats_metric *stm;
  737. stm = stats_pool_to_metric(ctx, pool, fidx);
  738. ASSERT(stm->type == STATS_TIMESTAMP);
  739. stm->value.timestamp = val;
  740. log_debug("set ts field '%.*s' to %"PRId64"", stm->name.len, stm->name.data,
  741. stm->value.timestamp);
  742. }
  743. static struct stats_metric *
  744. stats_server_to_metric(struct context *ctx, struct cache_instance *ins,
  745. stats_server_field_t fidx) {
  746. struct stats *st;
  747. struct stats_pool *stp;
  748. struct stats_server *sts;
  749. struct stats_metric *stm;
  750. struct server_pool *sp;
  751. struct server *server,*tmp_server;
  752. uint32_t pidx, sidx, i, n ;
  753. n = 0;
  754. sidx = ins->idx;
  755. server = ins->owner;
  756. pidx = server->owner->idx;
  757. sp = server->owner;
  758. for (i = 0; i < server->idx; i++)
  759. {
  760. tmp_server = array_get(&sp->server, i);
  761. n += array_n(&tmp_server->high_ptry_ins);
  762. n += array_n(&tmp_server->low_prty_ins);
  763. }
  764. sidx += n;
  765. st = ctx->stats;
  766. stp = array_get(&st->current, pidx);
  767. sts = array_get(&stp->server, sidx);
  768. stm = array_get(&sts->metric, fidx);
  769. st->updated = 1;
  770. log_debug("metric '%.*s' in pool %"PRIu32" server %"PRIu32"", stm->name.len,
  771. stm->name.data, pidx, sidx);
  772. return stm;
  773. }
  774. void _stats_server_incr(struct context *ctx, struct cache_instance *ins,
  775. stats_server_field_t fidx) {
  776. struct stats_metric *stm;
  777. stm = stats_server_to_metric(ctx, ins, fidx);
  778. ASSERT(stm->type == STATS_COUNTER || stm->type == STATS_GAUGE);
  779. stm->value.counter++;
  780. log_debug("incr field '%.*s' to %"PRId64"", stm->name.len, stm->name.data,
  781. stm->value.counter);
  782. }
  783. void _stats_server_decr(struct context *ctx, struct cache_instance *ins,
  784. stats_server_field_t fidx) {
  785. struct stats_metric *stm;
  786. stm = stats_server_to_metric(ctx, ins, fidx);
  787. ASSERT(stm->type == STATS_GAUGE);
  788. stm->value.counter--;
  789. log_debug("decr field '%.*s' to %"PRId64"", stm->name.len, stm->name.data,
  790. stm->value.counter);
  791. }
  792. void _stats_server_incr_by(struct context *ctx, struct cache_instance *ins,
  793. stats_server_field_t fidx, int64_t val) {
  794. struct stats_metric *stm;
  795. stm = stats_server_to_metric(ctx, ins, fidx);
  796. ASSERT(stm->type == STATS_COUNTER || stm->type == STATS_GAUGE);
  797. stm->value.counter += val;
  798. log_debug("incr by field '%.*s' to %"PRId64"", stm->name.len,
  799. stm->name.data, stm->value.counter);
  800. }
  801. void _stats_server_decr_by(struct context *ctx, struct cache_instance *ins,
  802. stats_server_field_t fidx, int64_t val) {
  803. struct stats_metric *stm;
  804. stm = stats_server_to_metric(ctx, ins, fidx);
  805. ASSERT(stm->type == STATS_GAUGE);
  806. stm->value.counter -= val;
  807. log_debug("decr by field '%.*s' to %"PRId64"", stm->name.len,
  808. stm->name.data, stm->value.counter);
  809. }
  810. void _stats_server_set_ts(struct context *ctx, struct cache_instance *ins,
  811. stats_server_field_t fidx, int64_t val) {
  812. struct stats_metric *stm;
  813. stm = stats_server_to_metric(ctx, ins, fidx);
  814. ASSERT(stm->type == STATS_TIMESTAMP);
  815. stm->value.timestamp = val;
  816. log_debug("set ts field '%.*s' to %"PRId64"", stm->name.len, stm->name.data,
  817. stm->value.timestamp);
  818. }