UdpSender.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package com.jd.platform.jlog.client.udp;
  2. import com.jd.platform.jlog.client.Context;
  3. import com.jd.platform.jlog.client.modeholder.ModeHolder;
  4. import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
  5. import com.jd.platform.jlog.common.constant.Constant;
  6. import com.jd.platform.jlog.common.constant.LogTypeEnum;
  7. import com.jd.platform.jlog.common.model.TracerBean;
  8. import com.jd.platform.jlog.common.model.RunLogMessage;
  9. import com.jd.platform.jlog.common.model.TracerData;
  10. import com.jd.platform.jlog.common.utils.AsyncPool;
  11. import com.jd.platform.jlog.common.utils.AsyncWorker;
  12. import io.netty.channel.ChannelFuture;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import java.net.InetSocketAddress;
  16. import java.util.*;
  17. import java.util.concurrent.LinkedBlockingQueue;
  18. import java.util.concurrent.TimeUnit;
  19. import java.util.concurrent.atomic.AtomicLong;
  20. /**
  21. * udp对外发消息
  22. *
  23. * @author wuweifeng
  24. * @version 1.0
  25. * @date 2021-08-16
  26. */
  27. public class UdpSender {
  28. /**
  29. * logger
  30. */
  31. private static Logger logger = LoggerFactory.getLogger(UdpSender.class);
  32. /**
  33. * 本地队列满了后丢弃的数量
  34. */
  35. private static final AtomicLong FAIL_OFFER_COUNT = new AtomicLong();
  36. /**
  37. * 本地队列,已写入的总数量
  38. */
  39. private static final AtomicLong SUCCESS_OFFER_COUNT = new AtomicLong();
  40. /**
  41. * 本地logger日志队列,已写入的总数量
  42. */
  43. private static final AtomicLong SUCCESS_LOGGER_OFFER_COUNT = new AtomicLong();
  44. /**
  45. * 出入参集中营,最多积压5万条
  46. */
  47. private static final LinkedBlockingQueue<TracerBean> tracerBeanQueue = new LinkedBlockingQueue<>(50000);
  48. /**
  49. * 日志集中营,最多积压5万条
  50. */
  51. private static final LinkedBlockingQueue<RunLogMessage> logBeanQueue = new LinkedBlockingQueue<>(50000);
  52. /**
  53. * 写入队列
  54. */
  55. public static void offerBean(TracerBean tracerModel) {
  56. //容量是否已满
  57. boolean success = tracerBeanQueue.offer(tracerModel);
  58. if (!success) {
  59. long failCount = FAIL_OFFER_COUNT.incrementAndGet();
  60. if (failCount % 10 == 0) {
  61. logger.info("用户跟踪队列已满,当前丢弃的数量为: " + failCount);
  62. }
  63. } else {
  64. long successCount = SUCCESS_OFFER_COUNT.incrementAndGet();
  65. if (successCount % 1000 == 0) {
  66. logger.info("用户跟踪已产生数量:" + successCount + ",当前队列积压数量:" + tracerBeanQueue.size());
  67. }
  68. }
  69. }
  70. /**
  71. * 写入log队列
  72. */
  73. public static void offerLogger(RunLogMessage runLogMessage) {
  74. //容量是否已满
  75. boolean success = logBeanQueue.offer(runLogMessage);
  76. if (!success) {
  77. long failCount = FAIL_OFFER_COUNT.incrementAndGet();
  78. if (failCount % 10 == 0) {
  79. logger.info("用户Logger队列已满,当前丢弃的数量为: " + failCount);
  80. }
  81. } else {
  82. long successCount = SUCCESS_LOGGER_OFFER_COUNT.incrementAndGet();
  83. if (successCount % 10000 == 0) {
  84. logger.info("用户Logger已产生数量:" + successCount + ",当前队列积压数量:" + logBeanQueue.size());
  85. }
  86. }
  87. }
  88. /**
  89. * 定时向worker发送
  90. */
  91. public static void uploadToWorker() {
  92. //filter拦截到的出入参
  93. AsyncPool.asyncDo(() -> {
  94. while (true) {
  95. try {
  96. List<TracerBean> tempTracers = new ArrayList<>();
  97. TracerBean tracerBean = tracerBeanQueue.take();
  98. tempTracers.add(tracerBean);
  99. TracerData tracerData = new TracerData();
  100. tracerData.setTracerBeanList(tempTracers);
  101. tracerData.setType(LogTypeEnum.SPAN);
  102. send(tracerData);
  103. } catch (Exception e) {
  104. e.printStackTrace();
  105. }
  106. }
  107. });
  108. //用户中途打的各日志
  109. AsyncPool.asyncDo(() -> {
  110. while (true) {
  111. try {
  112. //要么key达到500个,要么达到1秒,就汇总上报给worker一次
  113. List<RunLogMessage> tempLogs = new ArrayList<>();
  114. AsyncWorker.drain(logBeanQueue, tempLogs, 500, 1, TimeUnit.SECONDS);
  115. if (tempLogs.isEmpty()) {
  116. continue;
  117. }
  118. TracerData tracerData = new TracerData();
  119. tracerData.setTempLogs(tempLogs);
  120. tracerData.setType(LogTypeEnum.TRADE);
  121. send(tracerData);
  122. } catch (Exception e) {
  123. e.printStackTrace();
  124. }
  125. }
  126. });
  127. }
  128. /**
  129. * 往worker发traceBean
  130. */
  131. private static void send(TracerData tracerData) throws InterruptedException {
  132. if(!ModeHolder.getSendMode().getUnicast()){
  133. List<String>ips= WorkerInfoHolder.selectWorkers();
  134. for(String ip:ips){
  135. String[] ipPort = ip.split(Constant.SPLITER);
  136. //发往worker的ip
  137. InetSocketAddress remoteAddress = new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
  138. tracerData.setAddress(remoteAddress);
  139. ChannelFuture future = Context.CHANNEL.writeAndFlush(tracerData);
  140. //同步操作,否则会出现bug
  141. future.sync();
  142. }
  143. return;
  144. }else {
  145. Context.CHANNEL.writeAndFlush(tracerData);
  146. }
  147. }
  148. }