EtcdListener.java 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package com.jd.platform.jlog.etcd;
  2. import com.google.protobuf.ByteString;
  3. import com.ibm.etcd.api.Event;
  4. import com.ibm.etcd.api.KeyValue;
  5. import com.ibm.etcd.api.RangeResponse;
  6. import com.ibm.etcd.client.kv.KvClient;
  7. import com.ibm.etcd.client.kv.WatchUpdate;
  8. import com.jd.platform.jlog.common.handler.JcProperties;
  9. import com.jd.platform.jlog.common.utils.CollectionUtil;
  10. import com.jd.platform.jlog.common.utils.StringUtil;
  11. import com.jd.platform.jlog.core.ClientHandlerBuilder;
  12. import com.jd.platform.jlog.core.ConfigChangeEvent;
  13. import com.jd.platform.jlog.core.ConfigChangeListener;
  14. import com.jd.platform.jlog.core.ConfigChangeType;
  15. import java.io.IOException;
  16. import java.io.StringReader;
  17. import java.util.List;
  18. import java.util.Set;
  19. import static com.jd.platform.jlog.etcd.EtcdConfigurator.PROPERTIES;
  20. import static com.jd.platform.jlog.etcd.EtcdConfigurator.PROPERTIES_PATH;
  21. /**
  22. * @author tangbohu
  23. * @version 1.0.0
  24. * @ClassName EtcdListener.java
  25. * @Description TODO
  26. * @createTime 2022年02月21日 23:34:00
  27. */
  28. public class EtcdListener implements ConfigChangeListener {
  29. private KvClient.WatchIterator iterator;
  30. public EtcdListener() {
  31. iterator = EtcdConfigurator.client.getKvClient().watch(ByteString.copyFromUtf8(ROOT)).asPrefix().start();
  32. getExecutorService().submit(() -> {
  33. while (iterator.hasNext()){
  34. try {
  35. WatchUpdate update = iterator.next();
  36. Event eve = update.getEvents().get(0);
  37. KeyValue kv = eve.getKv();
  38. Event.EventType eveType = eve.getType();
  39. ConfigChangeType changeType = eveType.equals(Event.EventType.DELETE) ? ConfigChangeType.MODIFY : ConfigChangeType.DELETE;
  40. ConfigChangeEvent event = new ConfigChangeEvent();
  41. event.setKey(kv.getKey().toStringUtf8()).setNewValue(kv.getValue().toStringUtf8()).setChangeType(changeType);
  42. onChangeEvent(event);
  43. }catch (RuntimeException e){
  44. e.printStackTrace();
  45. }
  46. }
  47. });
  48. }
  49. @Override
  50. public void onShutDown() {
  51. this.iterator.close();
  52. getExecutorService().shutdownNow();
  53. }
  54. @Override
  55. public void onChangeEvent(ConfigChangeEvent event) {
  56. RangeResponse rangeResponse = EtcdConfigurator.client.getKvClient().get(ByteString.copyFromUtf8(PROPERTIES_PATH)).sync();
  57. List<KeyValue> keyValues = rangeResponse.getKvsList();
  58. if (CollectionUtil.isEmpty(keyValues)) {
  59. return;
  60. }
  61. String val = keyValues.get(0).getValue().toStringUtf8();
  62. JcProperties props = new JcProperties();
  63. if(StringUtil.isNotBlank(val)){
  64. try {
  65. props.load(new StringReader(val));
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. }
  69. Set<String> diffKeys = CollectionUtil.diffKeys(props, PROPERTIES);
  70. if(!diffKeys.isEmpty()){
  71. PROPERTIES = props;
  72. for (String diffKey : diffKeys) {
  73. LOGGER.warn("NACOS {} 配置变更 key={} 变更事件:{}", event.getKey(), diffKey,
  74. String.valueOf(props.get(diffKey)),
  75. String.valueOf(PROPERTIES.get(diffKey)));
  76. }
  77. ClientHandlerBuilder.refresh();
  78. }
  79. }
  80. }
  81. }