TracerLogToDbStore.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package com.jd.platform.jlog.worker.store;
  2. import com.jd.platform.jlog.common.utils.AsyncPool;
  3. import com.jd.platform.jlog.common.utils.AsyncWorker;
  4. import com.jd.platform.jlog.worker.db.Db;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. import java.util.Map;
  13. import java.util.Random;
  14. import java.util.concurrent.LinkedBlockingQueue;
  15. import java.util.concurrent.TimeUnit;
  16. import java.util.concurrent.atomic.LongAdder;
  17. /**
  18. * 解析好的数据暂存和入库
  19. * @author wuweifeng
  20. * @version 1.0
  21. * @date 2021-08-21
  22. */
  23. @Component
  24. public class TracerLogToDbStore {
  25. /**
  26. * 待入库的数据
  27. */
  28. private LinkedBlockingQueue<Map<String, Object>> logQueue;
  29. /**
  30. * logger
  31. */
  32. private Logger logger = LoggerFactory.getLogger(getClass());
  33. /**
  34. * db
  35. */
  36. @Resource
  37. private Db db;
  38. /**
  39. * 已入库总数量
  40. */
  41. private final LongAdder totalInsertCount = new LongAdder();
  42. /**
  43. * 每批往ck写多少条
  44. */
  45. @Value("${log.batchSize}")
  46. private String batchSize;
  47. /**
  48. * 几个线程去入库
  49. */
  50. @Value("${log.poolSize}")
  51. private String poolSize;
  52. /**
  53. * 间隔几秒入库
  54. */
  55. @Value("${log.insertInterval}")
  56. private int interval;
  57. /**
  58. * 待入库队列长度
  59. */
  60. @Value("${log.preDbSize}")
  61. private int preDbSize;
  62. /**
  63. * 写入队列
  64. */
  65. public void offer(Map<String, Object> map) {
  66. boolean success = logQueue.offer(map);
  67. //如果队列已满,则做其他处理
  68. if (!success) {
  69. }
  70. }
  71. /**
  72. * 入库
  73. */
  74. public void beginIntoDb() {
  75. logger.info("logQueue init success");
  76. //初始化队列长度
  77. logQueue = new LinkedBlockingQueue<>(preDbSize);
  78. int pool = Integer.parseInt(poolSize);
  79. for (int i = 0; i < pool; i++) {
  80. AsyncPool.asyncDo(() -> {
  81. try {
  82. Thread.sleep(new Random().nextInt(3000));
  83. } catch (InterruptedException e) {
  84. e.printStackTrace();
  85. }
  86. while (true) {
  87. try {
  88. List<Map<String, Object>> tempModels = new ArrayList<>();
  89. //每1s入库一次
  90. AsyncWorker.drain(logQueue, tempModels, Integer.valueOf(batchSize), interval, TimeUnit.SECONDS);
  91. if (tempModels.size() == 0) {
  92. continue;
  93. }
  94. //批量插入
  95. int successCount = db.insertAll("tracer_log", tempModels);
  96. totalInsertCount.add(successCount);
  97. logger.info("log成功入库 " + tempModels.size() + " 条, 累计已入库 " + totalInsertCount.longValue() + ", 待入库队列size " + logQueue.size());
  98. } catch (Exception e) {
  99. e.printStackTrace();
  100. }
  101. }
  102. });
  103. }
  104. }
  105. }