da_ketama.c 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <stdio.h>
  17. #include <stdlib.h>
  18. #include <math.h>
  19. #include <inttypes.h>
  20. #include "da_hashkit.h"
  21. #include "../da_time.h"
  22. #include "../da_core.h"
  23. #include "../da_log.h"
  /*
   * Sizing constants for the consistent-hash (ketama) continuum.
   */
  #define KETAMA_CONTINUUM_ADDITION 10 /* # extra server slots to build into continuum */
  #define KETAMA_POINTS_PER_SERVER 160 /* points per server: 40 hashes x 4 points per hash */
  #define KETAMA_MAX_HOSTLEN 86 /* size of the buffer holding each per-point key string */
  /*
   * Derive one 32-bit continuum point from a key.
   *
   * The key is passed to md5_signature(), which fills a 16-byte buffer
   * (presumably the MD5 digest of the key -- confirm against da_hashkit).
   * `alignment` selects which 4-byte group of that buffer to use; the group
   * is assembled with its first byte as the least significant byte.
   *
   * Callers in this file only pass alignment values 0..3; larger values
   * would index past the 16-byte buffer.
   */
  static uint32_t ketama_hash(const char *key, size_t key_length,
          uint32_t alignment) {
      unsigned char digest[16];
      const unsigned char *group;
      uint32_t point = 0;
      int byte;

      md5_signature((unsigned char*) key, key_length, digest);

      /* Fold bytes 3, 2, 1, 0 of the selected group, most significant first. */
      group = digest + alignment * 4;
      for (byte = 3; byte >= 0; byte--) {
          point = (point << 8) | (uint32_t) (group[byte] & 0xFF);
      }

      return point;
  }
  39. static int ketama_item_cmp(const void *t1, const void *t2) {
  40. const struct continuum *ct1 = t1, *ct2 = t2;
  41. if (ct1->value == ct2->value) {
  42. return 0;
  43. } else if (ct1->value > ct2->value) {
  44. return 1;
  45. } else {
  46. return -1;
  47. }
  48. }
  49. #if defined DA_COMPATIBLE_MODE && DA_COMPATIBLE_MODE == 1
  50. int ketama_update(struct server_pool *pool) {
  51. uint32_t nserver; /* # server - live and dead */
  52. uint32_t pointer_counter; /* # pointers on continuum */
  53. int pointer_index; /* pointer index */
  54. uint32_t points_per_server; /* points per server */
  55. uint32_t continuum_index; /* continuum index */
  56. uint32_t continuum_addition; /* extra space in the continuum */
  57. uint32_t server_index; /* server index */
  58. uint32_t value; /* continuum value */
  59. ASSERT(array_n(&pool->server) > 0);
  60. nserver = array_n(&pool->server);
  61. continuum_addition = KETAMA_CONTINUUM_ADDITION;
  62. points_per_server = 100;
  63. /*
  64. * Allocate the continuum for the pool, the first time, and every time we
  65. * add a new server to the pool
  66. */
  67. if (nserver > pool->nserver_continuum) {
  68. struct continuum *continuum;
  69. uint32_t nserver_continuum = nserver + continuum_addition;
  70. uint32_t ncontinuum = nserver_continuum * points_per_server;
  71. continuum = realloc(pool->continuum, sizeof(*continuum) * ncontinuum);
  72. if (continuum == NULL) {
  73. return -1;
  74. }
  75. //设置pool的一致性hash环
  76. pool->continuum = continuum;
  77. pool->nserver_continuum = nserver_continuum;
  78. /* pool->ncontinuum is initialized later as it could be <= ncontinuum */
  79. }
  80. /*
  81. * Build a continuum with the servers that are live and points from
  82. * these servers that are proportial to their weight
  83. */
  84. continuum_index = 0;
  85. pointer_counter = 0;
  86. for (server_index = 0; server_index < nserver; server_index++) {
  87. struct server *server;
  88. server = array_get(&pool->server, server_index);
  89. for (pointer_index = 0;
  90. pointer_index < points_per_server;
  91. pointer_index++) {
  92. char host[KETAMA_MAX_HOSTLEN] = "";
  93. size_t hostlen;
  94. hostlen = snprintf(host, KETAMA_MAX_HOSTLEN, "%.*s#%u",
  95. server->name.len, server->name.data, pointer_index);
  96. value = hash_chash(host,hostlen);
  97. pool->continuum[continuum_index].index = server_index;
  98. pool->continuum[continuum_index++].value = value;
  99. }
  100. pointer_counter += points_per_server;
  101. }
  102. pool->ncontinuum = pointer_counter;
  103. //对continum进行排序
  104. qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum),
  105. ketama_item_cmp);
  106. for (pointer_index = 0;
  107. pointer_index < ((nserver * points_per_server) - 1);
  108. pointer_index++) {
  109. if (pointer_index + 1 >= pointer_counter) {
  110. break;
  111. }ASSERT(
  112. pool->continuum[pointer_index].value
  113. <= pool->continuum[pointer_index + 1].value);
  114. }
  115. return 0;
  116. }
  117. #else
  118. int ketama_update(struct server_pool *pool) {
  119. uint32_t nserver; /* # server - live and dead */
  120. uint32_t pointer_per_server; /* pointers per server proportional to weight */
  121. uint32_t pointer_per_hash; /* pointers per hash */
  122. uint32_t pointer_counter; /* # pointers on continuum */
  123. uint32_t pointer_index; /* pointer index */
  124. uint32_t points_per_server; /* points per server */
  125. uint32_t continuum_index; /* continuum index */
  126. uint32_t continuum_addition; /* extra space in the continuum */
  127. uint32_t server_index; /* server index */
  128. uint32_t value; /* continuum value */
  129. uint32_t total_weight; /* total live server weight */
  130. ASSERT(array_n(&pool->server) > 0);
  131. /*
  132. * Count live servers and total weight, and also update the next time to
  133. * rebuild the distribution
  134. */
  135. nserver = array_n(&pool->server);
  136. total_weight = 0;
  137. for (server_index = 0; server_index < nserver; server_index++) {
  138. struct server *server = array_get(&pool->server, server_index);
  139. //‘host:port:weight’ or ‘host:port:weight name’
  140. ASSERT(server->weight > 0);
  141. total_weight += server->weight;
  142. }
  143. continuum_addition = KETAMA_CONTINUUM_ADDITION;
  144. points_per_server = KETAMA_POINTS_PER_SERVER;
  145. /*
  146. * Allocate the continuum for the pool, the first time, and every time we
  147. * add a new server to the pool
  148. */
  149. if (nserver > pool->nserver_continuum) {
  150. struct continuum *continuum;
  151. uint32_t nserver_continuum = nserver + continuum_addition;
  152. uint32_t ncontinuum = nserver_continuum * points_per_server;
  153. continuum = realloc(pool->continuum, sizeof(*continuum) * ncontinuum);
  154. if (continuum == NULL) {
  155. return -1;
  156. }
  157. //设置pool的一致性hash环
  158. pool->continuum = continuum;
  159. pool->nserver_continuum = nserver_continuum;
  160. /* pool->ncontinuum is initialized later as it could be <= ncontinuum */
  161. }
  162. /*
  163. * Build a continuum with the servers that are live and points from
  164. * these servers that are proportial to their weight
  165. */
  166. continuum_index = 0;
  167. pointer_counter = 0;
  168. for (server_index = 0; server_index < nserver; server_index++) {
  169. struct server *server;
  170. float pct;
  171. server = array_get(&pool->server, server_index);
  172. pct = (float) server->weight / (float) total_weight; //权重计算
  173. pointer_per_server = (uint32_t) ((floorf(
  174. (float) (pct * KETAMA_POINTS_PER_SERVER / 4 * (float) nserver
  175. + 0.0000000001))) * 4);
  176. pointer_per_hash = 4;
  177. for (pointer_index = 1;
  178. pointer_index <= pointer_per_server / pointer_per_hash;
  179. pointer_index++) {
  180. char host[KETAMA_MAX_HOSTLEN] = "";
  181. size_t hostlen;
  182. uint32_t x;
  183. hostlen = snprintf(host, KETAMA_MAX_HOSTLEN, "%.*s-%u",
  184. server->name.len, server->name.data, pointer_index - 1);
  185. for (x = 0; x < pointer_per_hash; x++) {
  186. value = ketama_hash(host, hostlen, x);
  187. pool->continuum[continuum_index].index = server_index;
  188. pool->continuum[continuum_index++].value = value;
  189. }
  190. }
  191. pointer_counter += pointer_per_server;
  192. }
  193. pool->ncontinuum = pointer_counter;
  194. //对continum进行排序
  195. qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum),
  196. ketama_item_cmp);
  197. for (pointer_index = 0;
  198. pointer_index < ((nserver * KETAMA_POINTS_PER_SERVER) - 1);
  199. pointer_index++) {
  200. if (pointer_index + 1 >= pointer_counter) {
  201. break;
  202. }ASSERT(
  203. pool->continuum[pointer_index].value
  204. <= pool->continuum[pointer_index + 1].value);
  205. }
  206. return 0;
  207. }
  208. #endif
  209. uint32_t ketama_dispatch(struct continuum *continuum, uint32_t ncontinuum,
  210. uint32_t hash) {
  211. struct continuum *begin, *end, *left, *right, *middle;
  212. ASSERT(continuum != NULL);
  213. ASSERT(ncontinuum != 0);
  214. begin = left = continuum;
  215. end = right = continuum + ncontinuum;
  216. while (left < right) {
  217. middle = left + (right - left) / 2;
  218. if (middle->value < hash) {
  219. left = middle + 1;
  220. } else {
  221. right = middle;
  222. }
  223. }
  224. if (right == end) {
  225. right = begin;
  226. }
  227. log_debug("ncontinuum: %d, idx: %d\n", ncontinuum, right->index);
  228. return right->index;
  229. }