TracerConsumer.java 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package com.jd.platform.jlog.worker.disruptor;
  2. import com.alibaba.fastjson.JSON;
  3. import com.jd.platform.jlog.common.model.RunLogMessage;
  4. import com.jd.platform.jlog.common.model.TracerBean;
  5. import com.jd.platform.jlog.common.model.TracerData;
  6. import com.jd.platform.jlog.common.utils.FastJsonUtils;
  7. import com.jd.platform.jlog.common.utils.ProtostuffUtils;
  8. import com.jd.platform.jlog.common.utils.ZstdUtils;
  9. import com.jd.platform.jlog.worker.store.TracerLogToDbStore;
  10. import com.jd.platform.jlog.worker.store.TracerModelToDbStore;
  11. import com.lmax.disruptor.WorkHandler;
  12. import io.netty.util.internal.StringUtil;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import java.time.Instant;
  16. import java.time.LocalDateTime;
  17. import java.time.ZoneId;
  18. import java.time.format.DateTimeFormatter;
  19. import java.util.*;
  20. import java.util.concurrent.atomic.LongAdder;
  21. /**
  22. * TracerConsumer
  23. *
  24. * @author wuweifeng
  25. * @version 1.0
  26. * @date 2021-08-24
  27. */
  28. public class TracerConsumer implements WorkHandler<OneTracer> {
  29. /**
  30. * 已消费完毕的总数量
  31. */
  32. private static final LongAdder totalDealCount = new LongAdder();
  33. /**
  34. * logger
  35. */
  36. private Logger logger = LoggerFactory.getLogger(getClass());
  37. /**
  38. * 待入库队列,出入参model
  39. */
  40. private TracerModelToDbStore tracerModelToDbStore;
  41. /**
  42. * 待入库队列,普通日志
  43. */
  44. private TracerLogToDbStore tracerLogToDbStore;
  45. private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  46. public TracerConsumer(TracerModelToDbStore tracerModelToDbStore, TracerLogToDbStore tracerLogToDbStore) {
  47. this.tracerModelToDbStore = tracerModelToDbStore;
  48. this.tracerLogToDbStore = tracerLogToDbStore;
  49. }
  50. @Override
  51. public void onEvent(OneTracer oneTracer) {
  52. try {
  53. long totalConsume = totalDealCount.longValue();
  54. boolean needInfo = totalConsume % 1000 == 0;
  55. //压缩后的字节数组
  56. byte[] decompressBytes = ZstdUtils.decompressBytes(oneTracer.getBytes());
  57. TracerData tracerData = ProtostuffUtils.deserialize(decompressBytes, TracerData.class);
  58. System.out.println("从事件中获取并解压的数据="+ JSON.toJSONString(tracerData));
  59. //包含了多个tracer对象
  60. List<TracerBean> tracerBeanList = tracerData.getTracerBeanList();
  61. buildTracerModel(tracerBeanList);
  62. //处理完毕,将数量加1
  63. totalDealCount.increment();
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. /**
  69. * 构建要入库的对象
  70. */
  71. private void buildTracerModel(List<TracerBean> tracerBeanList) {
  72. //遍历传过来的
  73. for (TracerBean tracerBean : tracerBeanList) {
  74. //普通日志
  75. if ("-1".equals(tracerBean.getTracerId())) {
  76. dealTracerLog(tracerBean);
  77. } else {
  78. dealFilterModel(tracerBean);
  79. }
  80. }
  81. }
  82. /**
  83. * 处理中途日志
  84. */
  85. private void dealTracerLog(TracerBean tracerBean) {
  86. List<Map<String, Object>> mapList = tracerBean.getTracerObject();
  87. Map<String, Object> objectMap = mapList.get(0);
  88. //遍历value集合,里面每个都是一个RunLogMessage对象
  89. for (Object object :objectMap.values()) {
  90. Map<String, Object> map = new HashMap<>();
  91. RunLogMessage runLogMessage = (RunLogMessage) object;
  92. map.put("tracerId", runLogMessage.getTracerId());
  93. map.put("className", runLogMessage.getClassName());
  94. map.put("threadName", runLogMessage.getThreadName());
  95. map.put("methodName", runLogMessage.getMethodName());
  96. map.put("logLevel", runLogMessage.getLogLevel());
  97. map.put("content", runLogMessage.getContent());
  98. map.putAll(runLogMessage.getTagMap());
  99. tracerLogToDbStore.offer(map);
  100. }
  101. }
  102. /**
  103. * 处理filter里处理的出入参
  104. */
  105. private void dealFilterModel(TracerBean tracerBean) {
  106. List<Map<String, Object>> mapList = tracerBean.getTracerObject();
  107. Map<String, Object> requestMap = mapList.get(0);
  108. Map<String, Object> map = new HashMap<>(requestMap);
  109. long tracerId = Long.parseLong(tracerBean.getTracerId());
  110. //filter的出入参
  111. Map<String, Object> responseMap = mapList.get(mapList.size() - 1);
  112. byte[] responseBytes = "default".getBytes();
  113. if (responseMap.get("response") != null) {
  114. responseBytes = (byte[]) responseMap.get("response");
  115. }
  116. map.put("responseContent", responseBytes);
  117. map.put("costTime", tracerBean.getCostTime());
  118. map.put("tracerId", tracerId);
  119. map.put("createTime", formatLongTime(tracerBean.getCreateTime()));
  120. responseMap.remove("response");
  121. map.putAll(responseMap);
  122. tracerModelToDbStore.offer(map);
  123. }
  124. private static String formatLongTime(long time) {
  125. return DEFAULT_FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(time),ZoneId.systemDefault()));
  126. }
  127. }