config_full.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. #include <fstream>
  2. #include "config.h"
  3. #include "workflow/WFGlobal.h"
  4. #include "workflow/UpstreamManager.h"
  5. #include "workflow/UpstreamPolicies.h"
  6. #include "srpc/rpc_metrics_filter.h"
  7. #include "srpc/rpc_trace_filter.h"
  8. using namespace srpc;
  9. // default upstream_route_t
  10. static unsigned int default_consistent_hash(const char *path, const char *query,
  11. const char *fragment)
  12. {
  13. return 0;
  14. }
  15. static unsigned int default_select_route(const char *path, const char *query,
  16. const char *fragment)
  17. {
  18. return 0;
  19. }
  20. static void set_endpoint_params(const wfrest::Json& data,
  21. struct EndpointParams *params)
  22. {
  23. *params = ENDPOINT_PARAMS_DEFAULT;
  24. for (const auto& it : data)
  25. {
  26. if (it.key() == "max_connections")
  27. params->max_connections = data["max_connections"];
  28. else if (it.key() == "connect_timeout")
  29. params->connect_timeout = data["connect_timeout"];
  30. else if (it.key() == "response_timeout")
  31. params->response_timeout = data["response_timeout"];
  32. else if (it.key() == "ssl_connect_timeout")
  33. params->ssl_connect_timeout = data["ssl_connect_timeout"];
  34. else if (it.key() == "use_tls_sni")
  35. params->use_tls_sni = data["use_tls_sni"];
  36. else
  37. {
  38. printf("[INFO][set_endpoint_params] Unknown key : %s\n",
  39. it.key().c_str());
  40. }
  41. }
  42. }
  43. static void load_global(const wfrest::Json& data)
  44. {
  45. struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
  46. std::string resolv_conf_path;
  47. std::string hosts_path;
  48. for (const auto& it : data)
  49. {
  50. if (it.key() == "endpoint_params")
  51. {
  52. set_endpoint_params(data["endpoint_params"],
  53. &settings.endpoint_params);
  54. }
  55. else if (it.key() == "dns_server_params")
  56. {
  57. set_endpoint_params(data["dns_server_params"],
  58. &settings.dns_server_params);
  59. }
  60. else if (it.key() == "dns_ttl_default")
  61. settings.dns_ttl_default = data["dns_ttl_default"];
  62. else if (it.key() == "dns_ttl_min")
  63. settings.dns_ttl_min = data["dns_ttl_min"];
  64. else if (it.key() == "dns_threads")
  65. settings.dns_threads = data["dns_threads"];
  66. else if (it.key() == "poller_threads")
  67. settings.poller_threads = data["poller_threads"];
  68. else if (it.key() == "handler_threads")
  69. settings.handler_threads = data["handler_threads"];
  70. else if (it.key() == "compute_threads")
  71. settings.compute_threads = data["compute_threads"];
  72. else if (it.key() == "resolv_conf_path")
  73. {
  74. resolv_conf_path = data["resolv_conf_path"].get<std::string>();
  75. settings.resolv_conf_path = resolv_conf_path.c_str();
  76. }
  77. else if (it.key() == "hosts_path")
  78. {
  79. hosts_path = data["hosts_path"].get<std::string>();
  80. settings.hosts_path = hosts_path.c_str();
  81. }
  82. else
  83. printf("[INFO][load_global] Unknown key : %s\n", it.key().c_str());
  84. }
  85. WORKFLOW_library_init(&settings);
  86. }
  87. static bool load_upstream_server(const wfrest::Json& data,
  88. std::vector<std::string>& hosts,
  89. std::vector<AddressParams>& params)
  90. {
  91. AddressParams param;
  92. hosts.clear();
  93. params.clear();
  94. for (const auto& server : data)
  95. {
  96. if (server.has("host") == false)
  97. {
  98. printf("[ERROR][load_upstream] Invalid upstream server\n");
  99. continue;
  100. }
  101. param = ADDRESS_PARAMS_DEFAULT;
  102. if (server.has("params"))
  103. {
  104. for (const auto& p : server["params"])
  105. {
  106. if (p.key() == "endpoint_params")
  107. set_endpoint_params(p.value(), &param.endpoint_params);
  108. else if (p.key() == "weight")
  109. param.weight = p.value().get<unsigned short>();
  110. else if (p.key() == "max_fails")
  111. param.max_fails = p.value().get<unsigned int>();
  112. else if (p.key() == "dns_ttl_default")
  113. param.dns_ttl_default = p.value().get<unsigned int>();
  114. else if (p.key() == "dns_ttl_min")
  115. param.dns_ttl_min = p.value().get<unsigned int>();
  116. else if (p.key() == "server_type")
  117. param.server_type = p.value().get<int>();
  118. else if (p.key() == "group_id")
  119. param.group_id = p.value().get<int>();
  120. else
  121. printf("[ERROR][load_upstream] Invalid params: %s\n",
  122. p.key().c_str());
  123. }
  124. }
  125. hosts.push_back(server["host"]);
  126. params.push_back(param);
  127. }
  128. if (hosts.size() == 0)
  129. return false;
  130. else
  131. return true;
  132. }
  133. static void load_upstream(const wfrest::Json& data)
  134. {
  135. std::string name;
  136. std::string type;
  137. bool try_another;
  138. std::vector<std::string> hosts;
  139. std::vector<AddressParams> params;
  140. for (const auto& it : data)
  141. {
  142. if (it.has("name") == false ||
  143. it.has("type") == false ||
  144. it.has("server") == false ||
  145. load_upstream_server(it["server"], hosts, params) == false)
  146. {
  147. printf("[ERROR][load_upstream] Invalid upstream\n");
  148. continue;
  149. }
  150. name = it["name"].get<std::string>();
  151. type = it["type"].get<std::string>();
  152. if (it.has("try_another"))
  153. try_another = it["try_another"];
  154. else
  155. try_another = false;
  156. if (type == "weighted_random")
  157. {
  158. UpstreamManager::upstream_create_weighted_random(name, try_another);
  159. }
  160. else if (type == "consistent_hash")
  161. {
  162. UpstreamManager::upstream_create_consistent_hash(name,
  163. default_consistent_hash);
  164. }
  165. else if (type == "round_robin")
  166. {
  167. UpstreamManager::upstream_create_round_robin(name, try_another);
  168. }
  169. else if (type == "manual")
  170. {
  171. UpstreamManager::upstream_create_manual(name,
  172. default_select_route,
  173. try_another,
  174. default_consistent_hash);
  175. }
  176. else if (type == "vnswrr")
  177. {
  178. UpstreamManager::upstream_create_vnswrr(name);
  179. }
  180. else
  181. {
  182. printf("[INFO][load_upstream] Unknown type : %s\n", type.c_str());
  183. continue;
  184. }
  185. for (size_t i = 0; i < hosts.size(); i++)
  186. UpstreamManager::upstream_add_server(name, hosts[i], &params[i]);
  187. }
  188. }
  189. void RPCConfig::load_server()
  190. {
  191. if (this->data["server"].has("port"))
  192. this->s_port = this->data["server"]["port"];
  193. if (this->data["server"].has("root"))
  194. this->root_path = this->data["server"]["root"].get<std::string>();
  195. if (this->data["server"].has("cert_file"))
  196. this->s_cert_file = this->data["server"]["cert_file"].get<std::string>();
  197. if (this->data["server"].has("file_key"))
  198. this->s_file_key = this->data["server"]["file_key"].get<std::string>();
  199. if (this->data["server"].has("error_page"))
  200. {
  201. for (const auto& it : this->data["server"]["error_page"])
  202. {
  203. std::string page;
  204. if (it.has("error") == true && it.has("error") == true)
  205. {
  206. page = it["page"].get<std::string>();
  207. for (const auto& e : it["error"])
  208. this->error_page.insert(std::make_pair(e.get<int>(), page));
  209. }
  210. else
  211. {
  212. printf("[ERROR][load_file_service] Invalid error_page\n");
  213. continue;
  214. }
  215. }
  216. }
  217. }
  218. void RPCConfig::load_client()
  219. {
  220. if (this->data["client"].has("remote_host"))
  221. this->c_host = this->data["client"]["remote_host"].get<std::string>();
  222. if (this->data["client"].has("remote_port"))
  223. this->c_port = this->data["client"]["remote_port"];
  224. if (this->data["client"].has("redirect_max"))
  225. this->c_redirect_max = this->data["client"]["redirect_max"];
  226. if (this->data["client"].has("retry_max"))
  227. this->c_retry_max = this->data["client"]["retry_max"];
  228. if (this->data["client"].has("user_name"))
  229. this->c_user_name = this->data["client"]["user_name"].get<std::string>();
  230. if (this->data["client"].has("password"))
  231. this->c_password = this->data["client"]["password"].get<std::string>();
  232. }
  233. bool RPCConfig::load(const char *file)
  234. {
  235. FILE *fp = fopen(file, "r");
  236. if (!fp)
  237. return false;
  238. this->data = wfrest::Json::parse(fp);
  239. fclose(fp);
  240. if (this->data.is_valid() == false)
  241. return false;
  242. for (const auto& it : this->data)
  243. {
  244. if (it.key() == "server")
  245. this->load_server();
  246. else if (it.key() == "client")
  247. this->load_client();
  248. else if (it.key() == "global")
  249. load_global(it.value());
  250. else if (it.key() == "upstream")
  251. load_upstream(it.value());
  252. else if (it.key() == "metrics")
  253. this->load_metrics();
  254. else if (it.key() == "trace")
  255. this->load_trace();
  256. else
  257. printf("[INFO][RPCConfig::load] Unknown key: %s\n", it.key().c_str());
  258. }
  259. return true;
  260. };
  261. void RPCConfig::load_metrics()
  262. {
  263. for (const auto& it : this->data["metrics"])
  264. {
  265. if (it.has("filter") == false)
  266. continue;
  267. std::string filter_name = it["filter"];
  268. if (filter_name.compare("prometheus") == 0)
  269. {
  270. if (it.has("port") == false)
  271. continue;
  272. RPCMetricsPull *filter = new RPCMetricsPull();
  273. unsigned short port = it["port"];
  274. filter->init(port);
  275. this->filters.push_back(filter);
  276. }
  277. else if (filter_name.compare("opentelemetry") == 0)
  278. {
  279. if (it.has("address") == false)
  280. continue;
  281. std::string name = it["filter_name"];
  282. std::string url = it["address"];
  283. unsigned int redirect_max = OTLP_HTTP_REDIRECT_MAX;
  284. unsigned int retry_max = OTLP_HTTP_RETRY_MAX;
  285. size_t report_threshold = RPC_REPORT_THREHOLD_DEFAULT;
  286. size_t report_interval = RPC_REPORT_INTERVAL_DEFAULT;
  287. if (it.has("redirect_max"))
  288. redirect_max = it["redirect_max"];
  289. if (it.has("retry_max"))
  290. retry_max = it["retry_max"];
  291. if (it.has("report_threshold"))
  292. report_threshold = it["report_threshold"];
  293. if (it.has("report_interval_ms"))
  294. report_interval = it["report_interval_ms"];
  295. RPCMetricsOTel *filter = new RPCMetricsOTel(name,
  296. url,
  297. redirect_max,
  298. retry_max,
  299. report_threshold,
  300. report_interval);
  301. if (it.has("attributes"))
  302. {
  303. for (const auto& kv : it["attributes"])
  304. {
  305. if (kv.has("key") == false || kv.has("value") == false)
  306. continue;
  307. filter->add_attributes(kv["key"], kv["value"]);
  308. }
  309. }
  310. this->filters.push_back(filter);
  311. }
  312. else
  313. {
  314. printf("[ERROR][RPCConfig::load_metrics] Unknown metrics: %s\n",
  315. filter_name.c_str());
  316. }
  317. }
  318. }
  319. void RPCConfig::load_trace()
  320. {
  321. for (const auto& it : this->data["trace"])
  322. {
  323. if (it.has("filter") == false)
  324. continue;
  325. std::string filter_name = it["filter"];
  326. size_t spans_per_second = SPANS_PER_SECOND_DEFAULT;
  327. if (filter_name.compare("default") == 0)
  328. {
  329. if (it.has("spans_per_second"))
  330. spans_per_second = it["spans_per_second"];
  331. auto *filter = new RPCTraceDefault(spans_per_second);
  332. this->filters.push_back(filter);
  333. }
  334. else if (filter_name.compare("opentelemetry") == 0)
  335. {
  336. if (it.has("address") == false)
  337. continue;
  338. std::string url = it["address"];
  339. unsigned int redirect_max = OTLP_HTTP_REDIRECT_MAX;
  340. unsigned int retry_max = OTLP_HTTP_RETRY_MAX;
  341. size_t report_threshold = RPC_REPORT_THREHOLD_DEFAULT;
  342. size_t report_interval = RPC_REPORT_INTERVAL_DEFAULT;
  343. if (it.has("redirect_max"))
  344. redirect_max = it["redirect_max"];
  345. if (it.has("retry_max"))
  346. retry_max = it["retry_max"];
  347. if (it.has("report_threshold"))
  348. report_threshold = it["report_threshold"];
  349. if (it.has("report_interval_ms"))
  350. report_interval = it["report_interval_ms"];
  351. auto *filter = new RPCTraceOpenTelemetry(url,
  352. redirect_max,
  353. retry_max,
  354. spans_per_second,
  355. report_threshold,
  356. report_interval);
  357. if (it.has("attributes"))
  358. {
  359. for (const auto& kv : it["attributes"])
  360. {
  361. if (kv.has("key") == false || kv.has("value") == false)
  362. continue;
  363. filter->add_attributes(kv["key"], kv["value"]);
  364. }
  365. }
  366. this->filters.push_back(filter);
  367. }
  368. else
  369. {
  370. printf("[ERROR][RPCConfig::load_metrics] Unknown metrics: %s\n",
  371. filter_name.c_str());
  372. }
  373. }
  374. }
  375. void RPCConfig::load_filter(SRPCServer& server)
  376. {
  377. for (auto *filter : this->filters)
  378. server.add_filter(filter);
  379. }
  380. void RPCConfig::load_filter(SRPCClient& client)
  381. {
  382. for (auto *filter : this->filters)
  383. client.add_filter(filter);
  384. }
  385. void RPCConfig::load_filter(SRPCHttpServer& server)
  386. {
  387. for (auto *filter : this->filters)
  388. server.add_filter(filter);
  389. }
  390. void RPCConfig::load_filter(SRPCHttpClient& client)
  391. {
  392. for (auto *filter : this->filters)
  393. client.add_filter(filter);
  394. }
  395. void RPCConfig::load_filter(BRPCServer& server)
  396. {
  397. for (auto *filter : this->filters)
  398. server.add_filter(filter);
  399. }
  400. void RPCConfig::load_filter(BRPCClient& client)
  401. {
  402. for (auto *filter : this->filters)
  403. client.add_filter(filter);
  404. }
  405. void RPCConfig::load_filter(ThriftServer& server)
  406. {
  407. for (auto *filter : this->filters)
  408. server.add_filter(filter);
  409. }
  410. void RPCConfig::load_filter(ThriftClient& client)
  411. {
  412. for (auto *filter : this->filters)
  413. client.add_filter(filter);
  414. }
  415. void RPCConfig::load_filter(ThriftHttpServer& server)
  416. {
  417. for (auto *filter : this->filters)
  418. server.add_filter(filter);
  419. }
  420. void RPCConfig::load_filter(ThriftHttpClient& client)
  421. {
  422. for (auto *filter : this->filters)
  423. client.add_filter(filter);
  424. }
  425. void RPCConfig::load_filter(TRPCServer& server)
  426. {
  427. for (auto *filter : this->filters)
  428. server.add_filter(filter);
  429. }
  430. void RPCConfig::load_filter(TRPCClient& client)
  431. {
  432. for (auto *filter : this->filters)
  433. client.add_filter(filter);
  434. }
  435. void RPCConfig::load_filter(TRPCHttpServer& server)
  436. {
  437. for (auto *filter : this->filters)
  438. server.add_filter(filter);
  439. }
  440. void RPCConfig::load_filter(TRPCHttpClient& client)
  441. {
  442. for (auto *filter : this->filters)
  443. client.add_filter(filter);
  444. }
  445. RPCConfig::~RPCConfig()
  446. {
  447. for (size_t i = 0; i < this->filters.size(); i++)
  448. delete this->filters[i];
  449. }