TracerModelToDbStore.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package com.jd.platform.jlog.worker.store;
  2. import com.jd.platform.jlog.common.utils.AsyncPool;
  3. import com.jd.platform.jlog.common.utils.AsyncWorker;
  4. import com.jd.platform.jlog.common.utils.CollectionUtil;
  5. import com.jd.platform.jlog.worker.db.Db;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. import javax.annotation.Resource;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.Random;
  15. import java.util.concurrent.LinkedBlockingQueue;
  16. import java.util.concurrent.TimeUnit;
  17. import java.util.concurrent.atomic.LongAdder;
  18. /**
  19. * 解析好的数据暂存和入库
  20. * @author wuweifeng
  21. * @version 1.0
  22. * @date 2021-08-21
  23. */
  24. @Component
  25. public class TracerModelToDbStore {
  26. /**
  27. * 待入库的数据
  28. */
  29. private LinkedBlockingQueue<Map<String, Object>> modelQueue;
  30. /**
  31. * logger
  32. */
  33. private Logger logger = LoggerFactory.getLogger(getClass());
  34. /**
  35. * db
  36. */
  37. @Resource
  38. private Db db;
  39. /**
  40. * 已入库总数量
  41. */
  42. private final LongAdder totalInsertCount = new LongAdder();
  43. /**
  44. * 每批往ck写多少条
  45. */
  46. @Value("${clickhouse.batchSize}")
  47. private String batchSize;
  48. /**
  49. * 几个线程去入库
  50. */
  51. @Value("${clickhouse.poolSize}")
  52. private String poolSize;
  53. /**
  54. * 间隔几秒入库
  55. */
  56. @Value("${clickhouse.insertInterval}")
  57. private int interval;
  58. /**
  59. * 待入库队列长度
  60. */
  61. @Value("${queue.preDbSize}")
  62. private int preDbSize;
  63. /**
  64. * 写入队列
  65. */
  66. public void offer(Map<String, Object> map) {
  67. boolean success = modelQueue.offer(map);
  68. //如果队列已满,则做其他处理
  69. if (!success) {
  70. }
  71. }
  72. /**
  73. * 入库
  74. */
  75. public void beginIntoDb() {
  76. //初始化队列长度
  77. modelQueue = new LinkedBlockingQueue<>(preDbSize);
  78. int pool = Integer.parseInt(poolSize);
  79. for (int i = 0; i < pool; i++) {
  80. AsyncPool.asyncDo(() -> {
  81. try {
  82. Thread.sleep(new Random().nextInt(8000));
  83. } catch (InterruptedException e) {
  84. e.printStackTrace();
  85. }
  86. while (true) {
  87. try {
  88. List<Map<String, Object>> tempModels = new ArrayList<>();
  89. //每1s入库一次
  90. AsyncWorker.drain(modelQueue, tempModels, Integer.parseInt(batchSize), interval, TimeUnit.SECONDS);
  91. if (CollectionUtil.isEmpty(tempModels)) {
  92. continue;
  93. }
  94. //批量插入
  95. int successCount = db.insertAll("tracer_model", tempModels);
  96. totalInsertCount.add(successCount);
  97. logger.info("model成功入库 " + tempModels.size() + " 条, 累计已入库 " + totalInsertCount.longValue() + ", 待入库队列size " + modelQueue.size());
  98. } catch (Exception e) {
  99. e.printStackTrace();
  100. }
  101. }
  102. });
  103. }
  104. }
  105. }