Monitor.java 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package com.jd.platform.jlog.client.task;
  2. import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
  3. import com.jd.platform.jlog.core.Configurator;
  4. import com.jd.platform.jlog.core.ConfiguratorFactory;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.util.List;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.ScheduledExecutorService;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * @author tangbohu
  13. * @version 1.0.0
  14. * @ClassName Watchdog.java
  15. * @createTime 2022年02月12日 10:20:00
  16. */
  17. public class Monitor {
  18. private final static Logger LOGGER = LoggerFactory.getLogger(Monitor.class);
  19. /**
  20. * 开始获取workerIp地址并保存</>
  21. * 监听workerIp地址变化
  22. */
  23. public void start() {
  24. fetchWorkerInfo();
  25. }
  26. /**
  27. * 每隔30秒拉取worker信息
  28. */
  29. private void fetchWorkerInfo() {
  30. ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
  31. //开启拉取etcd的worker信息,如果拉取失败,则定时继续拉取
  32. scheduledExecutorService.scheduleAtFixedRate(() -> {
  33. LOGGER.info("trying to connect to config center and fetch worker info");
  34. try {
  35. fetch();
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }, 0, 30, TimeUnit.SECONDS);
  40. }
  41. /**
  42. * 从配置中心获取worker的ip集合
  43. */
  44. private void fetch() throws Exception {
  45. Configurator configurator = ConfiguratorFactory.getInstance();
  46. //获取所有worker的ip
  47. List<String> addresses;
  48. try {
  49. //如果设置了机房属性,则拉取同机房的worker。如果同机房没worker,则拉取所有
  50. addresses = configurator.getList("workers");
  51. //全是空,给个警告
  52. if (addresses == null || addresses.isEmpty()) {
  53. LOGGER.warn("very important warn !!! workers ip info is null!!!");
  54. return;
  55. }
  56. //将对应的worker保存下来
  57. WorkerInfoHolder.mergeAndConnectNew(addresses);
  58. } catch (Exception ex) {
  59. LOGGER.error("config server connected fail. Check the config address!!!");
  60. }
  61. }
  62. }