liyunfeng 2 år sedan
förälder
incheckning
c566ac0512
28 ändrade filer med 390 tillägg och 580 borttagningar
  1. 293 0
      Dashboard/src/main/java/com/jd/platform/jlog/dashboard/utils/CharTest.java
  2. 4 4
      client/pom.xml
  3. 5 2
      client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java
  4. 0 2
      client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java
  5. 1 1
      client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java
  6. 0 6
      common/pom.xml
  7. 4 10
      common/src/main/java/com/jd/platform/jlog/common/handler/CompressHandler.java
  8. 1 10
      common/src/main/java/com/jd/platform/jlog/common/handler/ExtractHandler.java
  9. 8 4
      common/src/main/java/com/jd/platform/jlog/common/utils/IdWorker.java
  10. 0 15
      config/pom.xml
  11. 0 18
      example/pom.xml
  12. 4 4
      example/src/main/java/com/jd/platform/jlog/clientdemo/web/TestController.java
  13. 19 1
      example/src/main/resources/application.properties
  14. 0 67
      example/src/test/java/com/jd/platform/jlog/test/Common.java
  15. 0 85
      example/src/test/java/com/jd/platform/jlog/test/EtcdConfiguratorTest.java
  16. 0 69
      example/src/test/java/com/jd/platform/jlog/test/FileConfiguratorTest.java
  17. 0 81
      example/src/test/java/com/jd/platform/jlog/test/NacosConfiguratorTest.java
  18. 0 79
      example/src/test/java/com/jd/platform/jlog/test/TracerPacketTest.java
  19. 0 79
      example/src/test/java/com/jd/platform/jlog/test/ZKConfiguratorTest.java
  20. 1 1
      worker/pom.xml
  21. 5 1
      worker/src/main/java/com/jd/platform/jlog/worker/WorkerApplication.java
  22. 1 18
      worker/src/main/java/com/jd/platform/jlog/worker/config/CenterStarter.java
  23. 9 1
      worker/src/main/java/com/jd/platform/jlog/worker/db/Db.java
  24. 2 2
      worker/src/main/java/com/jd/platform/jlog/worker/disruptor/Producer.java
  25. 19 8
      worker/src/main/java/com/jd/platform/jlog/worker/disruptor/TracerConsumer.java
  26. 2 2
      worker/src/main/java/com/jd/platform/jlog/worker/store/TracerModelToDbStore.java
  27. 1 2
      worker/src/main/java/com/jd/platform/jlog/worker/udp/UdpServer.java
  28. 11 8
      worker/src/main/resources/application.yml

+ 293 - 0
Dashboard/src/main/java/com/jd/platform/jlog/dashboard/utils/CharTest.java

@@ -0,0 +1,293 @@
+package com.jd.platform.jlog.dashboard.utils;
+
+import com.alibaba.fastjson.JSON;
+import org.springframework.util.StringUtils;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * 测试类 别用
+ */
+@Deprecated
+public class CharTest {
+    public static void main2(String[] args) throws SQLException, UnsupportedEncodingException, InterruptedException {
+
+        List<Map<String, Object>> list = new ArrayList<>();
+        String sql = "select * from tracer_model";
+
+        Connection connection = getConn();
+        Statement statement = connection.createStatement();
+
+        try {
+            int id = new Random().nextInt(10000);
+            String str = "SOSOSOSOSA1232哈哈哈";
+            byte[] bt = ZstdUtils.compress(str.getBytes());
+
+            PreparedStatement pstmt = connection.prepareStatement("insert into test values(?, ?)");
+            for (int i = 0; i < 1; i++) {
+                pstmt.setInt(1, id);
+                pstmt.setString(2, new String(bt,"ISO8859-1"));
+                pstmt.addBatch();
+            }
+           // pstmt.executeBatch();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+        ResultSet results = statement.executeQuery(sql);
+        ResultSetMetaData rsmd = results.getMetaData();
+
+        while(results.next()){
+            Map<String, Object> row = new HashMap<>();
+            for(int i = 1; i <= rsmd.getColumnCount(); i++){
+                row.put(rsmd.getColumnName(i), results.getObject(rsmd.getColumnName(i)));
+            }
+            list.add(row);
+        }
+
+
+        String str4 = "SOSOSOSOSA1232哈哈哈";
+        byte[] bt4 = ZstdUtils.compress(str4.getBytes());
+        System.out.println("压缩后的 bt4 ==>  "+Arrays.toString(bt4));
+        System.out.println("list => "+list);
+        for (Map<String, Object> objectMap : list) {
+            if("7648".equals((objectMap.get("id").toString()))){
+                System.out.println("bt4 ==> "+Arrays.toString(objectMap.get("name").toString()
+                        .getBytes("ISO8859-1")));
+                String result = ZstdUtils.decompress(objectMap.get("name").toString()
+                        .getBytes("ISO8859-1"));
+                System.out.println("解密之后的结果: "+result);
+            }
+
+        }
+
+
+        System.out.println("==================================");
+        cutString(str4,8);
+
+    }
+
+    public static Connection getConn() {
+
+        String username = "default";
+      //  String password = "123456";
+        String address = "jdbc:clickhouse://101.42.242.201:8123";
+        String db = "default";
+        int socketTimeout = 600000;
+
+        ClickHouseProperties properties = new ClickHouseProperties();
+        properties.setUser(username);
+      //  properties.setPassword(password);
+        properties.setDatabase(db);
+        properties.setSocketTimeout(socketTimeout);
+        ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
+
+        ClickHouseConnection conn = null;
+        try {
+            conn = clickHouseDataSource.getConnection();
+            return conn;
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+
+    private static byte[] format(String dbStr){
+        String[] arr0 = dbStr.split(",");
+
+        byte[] bt = new byte[arr0.length];
+        for (int i = 0; i < arr0.length; i++) {
+            if(i == 0){
+                String first = arr0[0].replace("[", "");
+                System.out.println("first --> "+first);
+                bt[i] = Byte.valueOf(first);
+                continue;
+            }
+            if(i ==  arr0.length-1){
+                String last = arr0[arr0.length-1].replace("]", "");
+                bt[i] = Byte.valueOf(last.trim());
+                continue;
+            }
+            System.out.println(arr0[i]+"   len => "+arr0[i].length());
+            bt[i] = Byte.parseByte(arr0[i].trim());
+        }
+        return bt;
+    }
+
+
+
+
+    public static void cutString(String str, int n) throws UnsupportedEncodingException, InterruptedException {
+
+
+        AtomicInteger fail = new AtomicInteger(0);
+        AtomicInteger ok = new AtomicInteger(0);
+
+        for (int j = 0; j < 2; j++) {
+            new Thread(() -> {
+                for (int i = 0; i < 5; i++) {
+                    int id = new Random().nextInt(99000000);
+                    String ml = String.valueOf(getRandomChar()) +getRandomChar()+getRandomChar()+getRandomChar()+id;
+                    System.out.println("old--> "+ml);
+                    byte[] zipBt= ZstdUtils.compress(ml.getBytes());
+                    String newStr = null;
+
+                    try {
+                        newStr = new String(zipBt,StandardCharsets.ISO_8859_1);
+                    } catch (Exception e) {}
+
+                  //  byte[] resul = newStr.getBytes();
+                  //  System.out.println("新字符串的bt => "+Arrays.toString(resul));
+
+                    try {
+                        byte[] resul = sub(newStr, 4);
+                        if(resul[0]==40 && resul[1]==-62 && resul[2]==-75 && resul[3]==47 && resul[4]==-61 && resul[5]==-67 && resul[6]==32){
+                            ok.incrementAndGet();
+                        }else{
+                            fail.incrementAndGet();
+                        }
+
+                    } catch (UnsupportedEncodingException e) {
+                        e.printStackTrace();
+                    }
+                    try {
+                        String val = ZstdUtils.decompress(newStr.getBytes(StandardCharsets.ISO_8859_1));
+                        System.out.println("result--> "+val);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }).start();
+        }
+
+
+        Thread.sleep(5000);
+
+        System.out.println("ok == > "+ok.get());
+        System.out.println("fail == > "+fail.get());
+
+    }
+
+    private static byte[] sub(String str, int n) throws UnsupportedEncodingException {
+       // System.out.println("str => "+str);
+        StringBuilder sb = new StringBuilder();
+        int count=0;
+        for (int i = 0; i < str.length(); i++) {
+            String c = String.valueOf(str.charAt(i));
+            sb.append(c);
+            count+=c.getBytes("ISO8859-1").length;
+            if(count>n){
+               // System.out.println("count=> "+count);
+                break;
+            }
+        }
+        return sb.toString().getBytes();
+      //  System.out.println(" bs==> "+ Arrays.toString(sb.toString().getBytes()));
+    }
+
+    public static char getRandomChar() {
+        return (char) (0x4e00 + (int) (Math.random() * (0x9fa5 - 0x4e00 + 1)));
+    }
+
+
+    public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException, SQLException {
+        testChar();
+
+
+        String str = "SOSOSOSOSA1232哈哈哈";
+        byte[] bt = ZstdUtils.compress(str.getBytes());
+
+        String newnewStr = new String(bt, "ISO8859-1");
+        System.out.println("bt4 ==> "+Arrays.toString(newnewStr
+                .getBytes("ISO8859-1")));
+        String result = ZstdUtils.decompress(newnewStr
+                .getBytes("ISO8859-1"));
+        System.out.println("解密之后的结果: "+result);
+
+        if(1==1){
+            return;
+        }
+       // insert();
+        List<Map<String, Object>> list = new ArrayList<>();
+        String sql = "select * from tracer_model";
+
+        Connection connection = getConn();
+        Statement statement = connection.createStatement();
+        ResultSet results = statement.executeQuery(sql);
+        ResultSetMetaData rsmd = results.getMetaData();
+
+        while(results.next()){
+            Map<String, Object> row = new HashMap<>();
+            for(int i = 1; i <= rsmd.getColumnCount(); i++){
+                Object obj = results.getObject(rsmd.getColumnName(i));
+                if(isZip(obj.toString())){
+                    System.out.println("%%%%%%%%### is zip"+obj.toString());
+                    obj = obj.toString().getBytes(StandardCharsets.ISO_8859_1);
+                }
+               // System.out.println("obj=> "+obj);
+                row.put(rsmd.getColumnName(i), obj);
+            }
+            list.add(row);
+        }
+        System.out.println(JSON.toJSONString(list));
+    }
+
+    public static boolean isZip(String str) throws UnsupportedEncodingException {
+
+        if(StringUtils.isEmpty(str)){
+            return false;
+        }
+        StringBuilder sb = new StringBuilder();
+        int count=0;
+        for (int i = 0; i < str.length(); i++) {
+            String c = String.valueOf(str.charAt(i));
+            sb.append(c);
+            count += c.getBytes("ISO8859-1").length;
+            if(count > 4){
+                break;
+            }
+        }
+        byte[] bs = str.getBytes("ISO8859-1");
+       // byte[] bs = sb.toString().getBytes("ISO8859-1");
+        System.out.println(str+ " <=== ### ==> "+ Arrays.toString(bs));
+
+        return bs[0] == 40 && bs[1] == -62 && bs[2] == -75 && bs[3] == 47 && bs[4] == -61 && bs[5] == -67 && bs[6] == 32;
+    }
+
+    private static void insert() throws SQLException, UnsupportedEncodingException {
+        int id = new Random().nextInt(10000);
+        String str = "滴滴员工tangbohu的终身代号是什么???是9527";
+        byte[] bt = ZstdUtils.compress(str.getBytes());
+
+        PreparedStatement pstmt = getConn().prepareStatement("insert into tracer_model (responseContent) values(?)");
+        for (int i = 0; i < 1; i++) {
+            pstmt.setString(1, new String(bt,"ISO8859-1"));
+            pstmt.addBatch();
+        }
+         pstmt.executeBatch();
+    }
+
+    //[40, -75, 47, -3, 32,
+    private static void testChar() throws UnsupportedEncodingException {
+        for (int i = 0; i < 10; i++) {
+            String str =String.valueOf(getRandomChar())+ String.valueOf(getRandomChar()) +i;
+            byte[] initBs = ZstdUtils.compress(str.getBytes());
+            String zipStr = new String(initBs, "ISO8859-1");
+            System.out.println("@@@ => "+ Arrays.toString(zipStr.getBytes("ISO8859-1")));
+        }
+
+      /*  String str = "滴滴员工tangbohu的终身代号是什么???是9527";
+        byte[] initBs = ZstdUtils.compress(str.getBytes());
+        String zipStr = new String(initBs, "ISO8859-1");
+        System.out.println("@@@ => "+ Arrays.toString(zipStr.getBytes("ISO8859-1")));
+ */
+    }
+}

+ 4 - 4
client/pom.xml

@@ -34,16 +34,16 @@
             <version>1.4-SNAPSHOT</version>
         </dependency>-->
 
-        <dependency>
+      <!--  <dependency>
             <groupId>com.jd.platfrom.jlog</groupId>
             <artifactId>config-apollo</artifactId>
             <version>1.4-SNAPSHOT</version>
-        </dependency>
-      <!--  <dependency>
+        </dependency>-->
+        <dependency>
             <groupId>com.jd.platfrom.jlog</groupId>
             <artifactId>config-core</artifactId>
             <version>1.4-SNAPSHOT</version>
-        </dependency>-->
+        </dependency>
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>servlet-api</artifactId>

+ 5 - 2
client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java

@@ -4,6 +4,9 @@ package com.jd.platform.jlog.client;
 import com.alibaba.fastjson.JSON;
 import com.jd.platform.jlog.client.mdc.Mdc;
 import com.jd.platform.jlog.client.task.Monitor;
+import com.jd.platform.jlog.client.udp.HttpSender;
+import com.jd.platform.jlog.client.udp.UdpClient;
+import com.jd.platform.jlog.client.udp.UdpSender;
 import com.jd.platform.jlog.common.handler.TagConfig;
 import com.jd.platform.jlog.core.ClientHandlerBuilder;
 import com.jd.platform.jlog.core.Configurator;
@@ -88,14 +91,14 @@ public class TracerClientStarter {
         Monitor starter = new Monitor();
         starter.start();
 
-     /* UdpClient udpClient = new UdpClient();
+        UdpClient udpClient = new UdpClient();
         udpClient.start();
 
         //开启发送
         UdpSender.uploadToWorker();
 
         //开启大对象http发送
-        HttpSender.uploadToWorker();*/
+        HttpSender.uploadToWorker();
     }
 
 

+ 0 - 2
client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java

@@ -68,8 +68,6 @@ public class UdpClient {
             //7.关闭group
             group.shutdownGracefully();
         } catch (InterruptedException e) {
-            System.out.println("NioEventLoopGroup ==>  "+e.toString());
-
             e.printStackTrace();
         }
     }

+ 1 - 1
client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java

@@ -119,7 +119,7 @@ public class UdpSender {
 
                     List<TracerBean> tempTracers = new ArrayList<>();
                     TracerBean tracerBean = new TracerBean();
-                    tracerBean.setTracerId("-99999");
+                    tracerBean.setTracerId("-1");
                     List<Map<String, Object>> tracerObject = new ArrayList<>();
 
                     Map<String, Object> map = new HashMap<>();

+ 0 - 6
common/pom.xml

@@ -8,7 +8,6 @@
         <version>1.4-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-
     <artifactId>common</artifactId>
 
     <properties>
@@ -18,11 +17,6 @@
         <zstd.version>1.5.0-4</zstd.version>
         <fastjson.version>1.2.70</fastjson.version>
         <protostuff.version>1.7.2</protostuff.version>
-
-       <!-- <zookeeper.version>3.4.14</zookeeper.version>
-        <caffeine.version>2.8.0</caffeine.version>
-        <hutool.version>5.1.0</hutool.version>
-        <hp-etcd.version>0.0.16</hp-etcd.version>-->
     </properties>
 
     <dependencies>

+ 4 - 10
common/src/main/java/com/jd/platform/jlog/common/handler/CompressHandler.java

@@ -1,10 +1,6 @@
 package com.jd.platform.jlog.common.handler;
 
-import com.alibaba.fastjson.JSON;
 import com.jd.platform.jlog.common.utils.ZstdUtils;
-import com.sun.istack.internal.NotNull;
-
-import java.util.Base64;
 import java.util.Map;
 
 import static com.jd.platform.jlog.common.constant.Constant.MIN;
@@ -56,7 +52,7 @@ public class CompressHandler {
     }
 
     public static Map<String, Object> compressReq(Map<String, Object> map){
-        if(instance == null || !isMatched(instance.compress, E_REQ)){
+        if(instance == null || !isMatched(instance.compress, C_REQ)){
             return  map;
         }
 
@@ -67,12 +63,12 @@ public class CompressHandler {
     }
 
     public static byte[] compressLog(byte[] contentBytes){
-        if(instance == null || !isMatched(instance.compress, E_LOG)){ return contentBytes; }
+        if(instance == null || !isMatched(instance.compress, C_LOG)){ return contentBytes; }
         return doCompress(contentBytes);
     }
 
     public static byte[] compressResp(byte[] contentBytes){
-        if(instance == null || !isMatched(instance.compress, E_RESP)){ return contentBytes; }
+        if(instance == null || !isMatched(instance.compress, C_RESP)){ return contentBytes; }
         return doCompress(contentBytes);
     }
 
@@ -80,9 +76,7 @@ public class CompressHandler {
         if(contentBytes.length < instance.threshold){
             return contentBytes;
         }
-        //最终的要发往worker的response,经历了base64压缩
-        byte[] bytes = ZstdUtils.compress(contentBytes);
-        return Base64.getEncoder().encode(bytes);
+        return ZstdUtils.compress(contentBytes);
     }
 
 

+ 1 - 10
common/src/main/java/com/jd/platform/jlog/common/handler/ExtractHandler.java

@@ -1,10 +1,8 @@
 package com.jd.platform.jlog.common.handler;
 
 import com.alibaba.fastjson.JSON;
-import com.jd.platform.jlog.common.utils.CollectionUtil;
 import com.jd.platform.jlog.common.utils.ConfigUtil;
 import com.jd.platform.jlog.common.utils.StringUtil;
-import com.sun.istack.internal.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,18 +85,13 @@ public class ExtractHandler {
     public static Map<String, Object> extractReqTag(Map<String, Object> reqMap) {
 
         if(instance == null || !isMatched(instance.extract, E_REQ)){ return null; }
-
-        System.out.println("### REQ INSTANCE :"+instance.toString());
-
         Map<String, Object> tagMap = new HashMap<>(instance.reqTags.size());
         for (String tag : instance.reqTags) {
             Object val = reqMap.get(tag);
             if(val != null){
                 tagMap.put(tag, val);
             }
-        }
-        System.out.println("提取到了请求入参日志标签:"+JSON.toJSONString(tagMap));
-        return tagMap;
+        }return tagMap;
     }
 
 
@@ -129,7 +122,6 @@ public class ExtractHandler {
                 }
             }
         }
-        System.out.println("提取到了请求log日志标签:"+JSON.toJSONString(tagMap));
         return tagMap;
     }
 
@@ -150,7 +142,6 @@ public class ExtractHandler {
                 requestMap.put(tag, val);
             }
         }
-        System.out.println("提取到了请求出参日志标签:"+JSON.toJSONString(requestMap));
         return requestMap;
     }
 

+ 8 - 4
common/src/main/java/com/jd/platform/jlog/common/utils/IdWorker.java

@@ -1,6 +1,5 @@
 package com.jd.platform.jlog.common.utils;
 
-import com.sun.tools.javac.util.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,14 +85,19 @@ public class IdWorker {
      * @param workerId 工作进程Id
      */
     private static void setWorkerId(final Long workerId) {
-        Assert.check(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE);
-        IdWorker.workerId = workerId;
+        if(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE){
+            IdWorker.workerId = workerId;
+        }else{
+            throw new RuntimeException("workerId is illegal");
+        }
     }
 
     //下一个ID生成算法
     public static long nextId() {
         long time = System.currentTimeMillis();
-        Assert.check(lastTime <= time, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds"+lastTime);
+        if(lastTime > time){
+            throw new RuntimeException("Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds"+lastTime);
+        }
         if (lastTime == time) {
             if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
                 time = waitUntilNextTime(time);

+ 0 - 15
config/pom.xml

@@ -1,19 +1,4 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~  Copyright 1999-2019 Seata.io Group.
-  ~
-  ~  Licensed under the Apache License, Version 2.0 (the "License");
-  ~  you may not use this file except in compliance with the License.
-  ~  You may obtain a copy of the License at
-  ~
-  ~       http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~  Unless required by applicable law or agreed to in writing, software
-  ~  distributed under the License is distributed on an "AS IS" BASIS,
-  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~  See the License for the specific language governing permissions and
-  ~  limitations under the License.
-  -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

+ 0 - 18
example/pom.xml

@@ -40,24 +40,6 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-test</artifactId>
-            <version>2.5.5</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.13.2</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-test</artifactId>
-            <version>5.3.10</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>

+ 4 - 4
example/src/main/java/com/jd/platform/jlog/clientdemo/web/TestController.java

@@ -66,11 +66,11 @@ public class TestController {
 
     @PostMapping(value = "/test", consumes = MediaType.APPLICATION_JSON_VALUE)
     public Object test(@RequestParam Integer uid, @RequestParam Integer newKey,@RequestBody TestReq req) {
-        String config = ConfiguratorFactory.getInstance().getString("reqTags");
-      //  System.out.println("tagConfig ===> " + tagConfig.toString());
         RequestLog.info("|errno=val3||node=val4||这是随便的log|");
-
-        return 1;
+        if(newKey == 1){
+            return 1;
+        }
+        return "滴滴员工tangbohu的终身代号是什么???是9527";
     }
 
 

+ 19 - 1
example/src/main/resources/application.properties

@@ -3,4 +3,22 @@ serverAddr=101.42.242.201:2379
 
 apollo.meta=http://127.0.0.1:8080
 apollo.config-service=http://127.0.0.1:8080
-app.id=order
+app.id=order
+
+
+tag-config.reqTags[0]=uid
+tag-config.reqTags[1]=url
+
+tag-config.logTags[0]=node
+tag-config.logTags[1]=bizType
+
+tag-config.respTags[0]=errno
+tag-config.respTags[1]=msg
+tag-config.delimiter=|
+tag-config.join==
+tag-config.extract=41
+
+compress=68
+threshold=10
+
+workers=[1,2,3]

+ 0 - 67
example/src/test/java/com/jd/platform/jlog/test/Common.java

@@ -1,67 +0,0 @@
-package com.jd.platform.jlog.test;
-
-import com.alibaba.fastjson.JSON;
-import com.jd.platform.jlog.common.handler.TagConfig;
-import com.jd.platform.jlog.core.Configurator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.List;
-import java.util.Random;
-
-/**
- * @author tangbohu
- * @version 1.0.0
- * @ClassName Common.java
- * @Description TODO
- * @createTime 2022年03月01日 07:36:00
- */
-public class Common {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(Common.class);
-
-
-    public static void getTest(Configurator configurator){
-
-        LOGGER.info("配置器类型:{}", configurator.getType());
-        String addr = configurator.getString("serverAddr");
-        LOGGER.info("配置器get addr:{}", addr);
-        TagConfig tagConfig = configurator.getObject("tag-config", TagConfig.class);
-        LOGGER.info("配置器get tagConfig:{}", tagConfig.toString());
-        List workers = configurator.getList("workers");
-        LOGGER.info("配置器get workers:{}", JSON.toJSONString(workers));
-    }
-
-
-    public static void modifyFile(String path)throws Exception{
-
-        String temp;
-        File file = new File(path);
-        FileInputStream fis = new FileInputStream(file);
-        InputStreamReader isr = new InputStreamReader(fis);
-        BufferedReader br = new BufferedReader(isr);
-        StringBuffer buf = new StringBuffer();
-
-        int id = new Random().nextInt(1000);
-        int num = 0;
-        // 保存该行前面的内容
-        while ((temp = br.readLine()) != null) {
-            if(num == 0){
-                buf = buf.append("testKey: ").append(id);
-            }else{
-                buf = buf.append(temp);
-            }
-            num++;
-            buf = buf.append(System.getProperty("line.separator"));
-        }
-
-        br.close();
-        FileOutputStream fos = new FileOutputStream(file);
-        PrintWriter pw = new PrintWriter(fos);
-        pw.write(buf.toString().toCharArray());
-        pw.flush();
-        pw.close();
-    }
-
-}

+ 0 - 85
example/src/test/java/com/jd/platform/jlog/test/EtcdConfiguratorTest.java

@@ -1,85 +0,0 @@
-package com.jd.platform.jlog.test;
-
-import com.alibaba.fastjson.JSON;
-import com.jd.platform.jlog.clientdemo.ExampleApplication;
-import com.jd.platform.jlog.core.Configurator;
-import com.jd.platform.jlog.core.ConfiguratorFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.List;
-import java.util.Random;
-
-import static com.jd.platform.jlog.test.Common.getTest;
-
-/**
- * @author tangbohu
- * @version 1.0.0
- * @ClassName EtcdConfiguratorTest.java
- * @Description TODO
- * @createTime 2022年03月03日 07:35:00
- */
-@SpringBootTest(classes = ExampleApplication.class)
-@RunWith(SpringRunner.class)
-public class EtcdConfiguratorTest {
-
-
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(EtcdConfiguratorTest.class);
-
-
-    private Configurator configurator = null;
-
-
-
-    @Before
-    public void init() {
-        configurator = ConfiguratorFactory.getInstance();
-        getTest(configurator);
-    }
-
-   // @Test
-    public void testUpdateCFG() throws Exception {
-        List<String> workers = configurator.getList("workers");
-        LOGGER.info("初始化的workers:{}", JSON.toJSONString(workers));
-        String myIp = "121.1.1.0";
-        if(workers.contains(myIp)){
-            // do nothing
-            LOGGER.info("自己的IP还在配置list里 什么也不做");
-            return;
-        }else{
-            LOGGER.info("自己的IP不在配置list里 添加进去并发布");
-            workers.add(myIp);
-        }
-        configurator.putConfig("workers",JSON.toJSONString(workers));
-        List<String> workers2 = configurator.getList("workers");
-        LOGGER.info("最新的workers:{}", JSON.toJSONString(workers2));
-    }
-
-    @Test
-    public void testAddConfigListener() throws Exception {
-        int i1 = new Random().nextInt(2000);
-        int i2 = new Random().nextInt(2000);
-
-        String val1 = configurator.getString("testKey");
-        LOGGER.info("初始化的testKey的val:{}", val1);
-        LOGGER.info("添加监听器后, 修改配置testKey = {}", i1);
-        configurator.putConfig("testKey",i1 + "");
-        LOGGER.info("修改完毕 准备触发监听器");
-        Thread.sleep(5000);
-        String val2 = configurator.getString("testKey");
-        LOGGER.info("第一次修改后的的val:{}", val2);
-        Thread.sleep(5000);
-     //   LOGGER.info("移除监听器后:修改配置testKey = {}",i2);
-     //   configurator.putConfig("testKey",i2 + "");
-        LOGGER.info("准备验证监听器是否停止  最新testKey={}", configurator.getString("testKey"));
-        LOGGER.info("第二次添加监听器");
-        Thread.sleep(22000);
-
-    }
-}

+ 0 - 69
example/src/test/java/com/jd/platform/jlog/test/FileConfiguratorTest.java

@@ -1,69 +0,0 @@
-package com.jd.platform.jlog.test;
-
-import com.alibaba.fastjson.JSON;
-import com.jd.platform.jlog.clientdemo.ExampleApplication;
-import com.jd.platform.jlog.core.Configurator;
-import com.jd.platform.jlog.core.ConfiguratorFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.*;
-import java.util.Map;
-import java.util.Properties;
-
-import static com.jd.platform.jlog.test.Common.getTest;
-import static com.jd.platform.jlog.test.Common.modifyFile;
-
-
-/**
- * @author tangbohu
- * @version 1.0.0
- * @ClassName FileConfiguratorTest.java
- * @Description TODO
- * @createTime 2022年02月28日 19:45:00
- */
-@SpringBootTest(classes = ExampleApplication.class)
-@RunWith(SpringRunner.class)
-public class FileConfiguratorTest {
-
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(FileConfiguratorTest.class);
-
-
-    private Configurator configurator = null;
-
-
-
-    @Before
-    public void init() {
-        configurator = ConfiguratorFactory.getInstance();
-        getTest(configurator);
-    }
-
-
-
-    @Test
-    public void testAddConfigListener() throws Exception {
-        String path = "/Users/didi/Desktop/jlog/example/target/classes/bakapplication.yml";
-        Properties props = new Properties();
-        FileInputStream fis = new FileInputStream(new File(path));
-        if (path.contains("yml")) {
-            props.putAll(new Yaml().loadAs(fis, Map.class));
-        } else {
-            props.load(fis);
-        }
-        LOGGER.info("读取文件:{} 最新配置:{}", path, JSON.toJSONString(props));
-        modifyFile(path);
-        LOGGER.info("修改文件完毕 准备触发监听器");
-        Thread.sleep(10000);
-    }
-
-
-
-}

+ 0 - 81
example/src/test/java/com/jd/platform/jlog/test/NacosConfiguratorTest.java

@@ -1,81 +0,0 @@
-package com.jd.platform.jlog.test;
-
-import com.alibaba.fastjson.JSON;
-import com.jd.platform.jlog.clientdemo.ExampleApplication;
-import com.jd.platform.jlog.core.Configurator;
-import com.jd.platform.jlog.core.ConfiguratorFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.List;
-import java.util.Random;
-
-import static com.jd.platform.jlog.test.Common.getTest;
-
-/**
- * @author tangbohu
- * @version 1.0.0
- * @ClassName NacosConfiguratorTest.java
- * @Description TODO
- * @createTime 2022年03月01日 07:35:00
- */
-@SpringBootTest(classes = ExampleApplication.class)
-@RunWith(SpringRunner.class)
-public class NacosConfiguratorTest {
-
-
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(NacosConfiguratorTest.class);
-
-
-    private Configurator configurator = null;
-
-
-
-    @Before
-    public void init() {
-        configurator = ConfiguratorFactory.getInstance();
-        getTest(configurator);
-    }
-
-    @Test
-    public void testUpdateCFG() throws Exception {
-        List<String> workers = configurator.getList("workers");
-        LOGGER.info("初始化的workers:{}", JSON.toJSONString(workers));
-        String myIp = "121.1.1.0";
-        if(workers.contains(myIp)){
-            // do nothing
-            LOGGER.info("自己的IP还在配置list里 什么也不做");
-            return;
-        }else{
-            LOGGER.info("自己的IP不在配置list里 添加进去并发布");
-            workers.add(myIp);
-        }
-        configurator.putConfig("workers",JSON.toJSONString(workers));
-        List<String> workers2 = configurator.getList("workers");
-        LOGGER.info("最新的workers:{}", JSON.toJSONString(workers2));
-    }
-
-    @Test
-    public void testAddConfigListener() throws Exception {
-        int i1 = new Random().nextInt(2000);
-        int i2 = new Random().nextInt(2000);
-
-        String val1 = configurator.getString("testKey");
-        LOGGER.info("初始化的testKey的val:{}", val1);
-       // configurator.putConfig("testKey",i1 + "");
-        LOGGER.info("修改完毕 准备触发监听器");
-        Thread.sleep(1000);
-        String val2 = configurator.getString("testKey");
-        LOGGER.info("第一次修改后的的val:{}", val2);
-        configurator.putConfig("testKey",i2 + "");
-        LOGGER.info("准备验证监听器是否停止  最新testKey={}", configurator.getString("testKey"));
-
-        Thread.sleep(35000);
-    }
-}

+ 0 - 79
example/src/test/java/com/jd/platform/jlog/test/TracerPacketTest.java

@@ -1,79 +0,0 @@
-package com.jd.platform.jlog.test;
-
-import com.alibaba.fastjson.JSON;
-import com.jd.platform.jlog.client.udp.UdpSender;
-import com.jd.platform.jlog.clientdemo.ExampleApplication;
-import com.jd.platform.jlog.common.model.TracerBean;
-import com.jd.platform.jlog.common.utils.IpUtils;
-import com.jd.platform.jlog.common.utils.ZstdUtils;
-import org.apache.tomcat.util.codec.binary.Base64;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-
-/**
- * 跳过过滤器,手动发送供worker消费的日志
- *
- * @author shenkaiwen5
- * @version 1.0
- * @date 2021-12-27
- */
-//@SpringBootTest(classes = ExampleApplication.class)
-//@RunWith(SpringRunner.class)
-public class TracerPacketTest {
-
-    //@Test
-    public void testSendUdp() {
-        TracerBean tracerBean = new TracerBean();
-        List<Map<String, Object>> tracerObject = new ArrayList<>();
-        tracerBean.setTracerObject(tracerObject);
-        //将request信息保存
-        Map<String, Object> requestMap = new HashMap<>(16);
-        requestMap.put("tracerId", "1");
-        requestMap.put("pin", UUID.randomUUID());
-        requestMap.put("appName", "myTest");
-        requestMap.put("uuid", "uuid" + UUID.randomUUID());
-        requestMap.put("client", "android");
-        requestMap.put("clientVersion", "10.3.2");
-        requestMap.put("ip", "127.0.0.1");
-        requestMap.put("serverIp", "127.0.0.1");
-        requestMap.put("uri", "test");
-        tracerObject.add(requestMap);
-
-        //设置耗时
-        tracerBean.setCostTime((int) (System.currentTimeMillis() - tracerBean.getCreateTime()));
-
-        try {
-            byte[] contentBytes = testString().getBytes(StandardCharsets.UTF_8);
-
-            //最终的要发往worker的response
-            byte[] bytes = ZstdUtils.compress(contentBytes);
-            byte[] base64Bytes = Base64.encodeBase64(bytes);
-            Map<String, Object> responseMap = new HashMap<>(8);
-            responseMap.put("response", base64Bytes);
-
-            tracerObject.add(responseMap);
-            UdpSender.offerBean(tracerBean);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 生成测试的返回值
-     */
-    private String testString() {
-        return JSON.toJSONString(new MyRes(IpUtils.getIp() + " send a test udp message."));
-    }
-
-    /**
-     * 测试返回值类
-     */
-    class MyRes {
-        String context;
-
-        MyRes(String s) {
-            this.context = s;
-        }
-    }
-}

+ 0 - 79
example/src/test/java/com/jd/platform/jlog/test/ZKConfiguratorTest.java

@@ -1,79 +0,0 @@
-package com.jd.platform.jlog.test;
-
-import com.alibaba.fastjson.JSON;
-import com.jd.platform.jlog.clientdemo.ExampleApplication;
-import com.jd.platform.jlog.core.Configurator;
-import com.jd.platform.jlog.core.ConfiguratorFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.List;
-
-import java.util.Random;
-
-import static com.jd.platform.jlog.test.Common.getTest;
-
-/**
- * @author tangbohu
- * @version 1.0.0
- * @ClassName ZKConfiguratorTest.java
- * @Description TODO
- * @createTime 2022年03月01日 07:35:00
- */
-@SpringBootTest(classes = ExampleApplication.class)
-@RunWith(SpringRunner.class)
-public class ZKConfiguratorTest {
-
-
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ZKConfiguratorTest.class);
-
-
-    private Configurator configurator = null;
-
-
-
-    @Before
-    public void init() {
-        configurator = ConfiguratorFactory.getInstance();
-        getTest(configurator);
-    }
-
-    @Test
-    public void testUpdateCFG() throws Exception {
-        List<String> workers = configurator.getList("workers");
-        LOGGER.info("初始化的workers:{}", JSON.toJSONString(workers));
-        String myIp = "121.0";
-        if(workers.contains(myIp)){
-            LOGGER.info("自己的IP还在配置list里 什么也不做");
-            return;
-        }else{
-            LOGGER.info("自己的IP不在配置list里 添加进去并发布");
-            workers.add(myIp);
-        }
-        configurator.putConfig("workers",JSON.toJSONString(workers));
-        List<String> workers2 = configurator.getList("workers");
-        LOGGER.info("最新的workers:{}", JSON.toJSONString(workers2));
-    }
-
-    @Test
-    public void testAddConfigListener() throws Exception {
-        int i1 = new Random().nextInt(2000);
-        int i2 = new Random().nextInt(2000);
-
-        String val1 = configurator.getString("testKey");
-        LOGGER.info("初始化的testKey的val:{}", val1);
-        LOGGER.info("添加监听器后, 修改配置testKey = {}", i1);
-       // configurator.putConfig("testKey",i1 + "");
-        LOGGER.info("修改完毕 准备触发监听器");
-        Thread.sleep(1000);
-        String val2 = configurator.getString("testKey");
-        LOGGER.info("第一次修改后的的val:{}", val2);
-        Thread.sleep(1000);
-    }
-}

+ 1 - 1
worker/pom.xml

@@ -24,7 +24,7 @@
         </dependency>
         <dependency>
             <groupId>com.jd.platfrom.jlog</groupId>
-            <artifactId>config-zk</artifactId>
+            <artifactId>config-core</artifactId>
             <version>1.4-SNAPSHOT</version>
         </dependency>
 

+ 5 - 1
worker/src/main/java/com/jd/platform/jlog/worker/WorkerApplication.java

@@ -10,7 +10,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 public class WorkerApplication {
 
     public static void main(String[] args) {
-        SpringApplication.run(WorkerApplication.class, args);
+        try {
+            SpringApplication.run(WorkerApplication.class, args);
+        }catch (Exception e){
+            e.printStackTrace();
+        }
     }
 
 }

+ 1 - 18
worker/src/main/java/com/jd/platform/jlog/worker/config/CenterStarter.java

@@ -24,22 +24,6 @@ import java.util.concurrent.TimeUnit;
  */
 @Component
 public class CenterStarter {
-    /**
-     * 该worker为哪个app服务
-     */
-    @Value("${config.workerPath}")
-    private String workerPath;
-    /**
-     * 配置中心地址
-     */
-    @Value("${config.server}")
-    private String configServer;
-    /**
-     * 机房
-     */
-    @Value("${config.mdc}")
-    private String mdc;
-
 
     /**
      * 上报自己的ip到配置中心
@@ -68,8 +52,7 @@ public class CenterStarter {
      * 在配置中心存放的key
      */
     private String buildKey() {
-        String hostName = IpUtils.getHostName();
-        return Constant.WORKER_PATH + workerPath + "/" + hostName;
+        return IpUtils.getHostName();
     }
 
     /**

+ 9 - 1
worker/src/main/java/com/jd/platform/jlog/worker/db/Db.java

@@ -1,10 +1,12 @@
 package com.jd.platform.jlog.worker.db;
 
+import com.alibaba.fastjson.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -13,6 +15,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.nio.charset.StandardCharsets.*;
+
 /**
  * https://blog.csdn.net/linglongxin24/article/details/53769957
  *
@@ -81,7 +85,11 @@ public class Db {
 //            logger.info(sql.toString());
             for (Map<String, Object> data : datas) {
                 for (int k = 0; k < keys.length; k++) {
-                    preparedStatement.setObject(k + 1, data.get(keys[k]));
+                    Object val = data.get(keys[k]);
+                    if(val instanceof byte[]){
+                        val = new String((byte[]) val, ISO_8859_1);
+                    }
+                    preparedStatement.setObject(k + 1, val);
                 }
                 preparedStatement.addBatch();
             }

+ 2 - 2
worker/src/main/java/com/jd/platform/jlog/worker/disruptor/Producer.java

@@ -46,8 +46,8 @@ public class Producer {
             logger.info("生产消费队列,已接收:" + totalReceive);
         }
         try {
-            OneTracer OneTracer = ringBuffer.get(sequence);
-            OneTracer.setBytes(bytes);
+            OneTracer oneTracer = ringBuffer.get(sequence);
+            oneTracer.setBytes(bytes);
         } finally {
             ringBuffer.publish(sequence);
         }

+ 19 - 8
worker/src/main/java/com/jd/platform/jlog/worker/disruptor/TracerConsumer.java

@@ -1,5 +1,6 @@
 package com.jd.platform.jlog.worker.disruptor;
 
+import com.alibaba.fastjson.JSON;
 import com.jd.platform.jlog.common.model.RunLogMessage;
 import com.jd.platform.jlog.common.model.TracerBean;
 import com.jd.platform.jlog.common.model.TracerData;
@@ -13,10 +14,11 @@ import io.netty.util.internal.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
 import java.util.concurrent.atomic.LongAdder;
 
 /**
@@ -44,6 +46,9 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
      */
     private TracerLogToDbStore tracerLogToDbStore;
 
+    private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+
     public TracerConsumer(TracerModelToDbStore tracerModelToDbStore, TracerLogToDbStore tracerLogToDbStore) {
         this.tracerModelToDbStore = tracerModelToDbStore;
         this.tracerLogToDbStore = tracerLogToDbStore;
@@ -59,6 +64,9 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
             byte[] decompressBytes = ZstdUtils.decompressBytes(oneTracer.getBytes());
 
             TracerData tracerData = ProtostuffUtils.deserialize(decompressBytes, TracerData.class);
+
+            System.out.println("从事件中获取并解压的数据="+ JSON.toJSONString(tracerData));
+
             //包含了多个tracer对象
             List<TracerBean> tracerBeanList = tracerData.getTracerBeanList();
             buildTracerModel(tracerBeanList);
@@ -78,9 +86,8 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
         //遍历传过来的
         for (TracerBean tracerBean : tracerBeanList) {
             //普通日志
-            if ("-99999".equals(tracerBean.getTracerId())) {
+            if ("-1".equals(tracerBean.getTracerId())) {
                 dealTracerLog(tracerBean);
-
             } else {
                 dealFilterModel(tracerBean);
             }
@@ -121,7 +128,7 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
 
         Map<String, Object> map = new HashMap<>(requestMap);
 
-        long tracerId = requestMap.get("tracerId") == null ? 0 : Long.valueOf(requestMap.get("tracerId").toString());
+        long tracerId = Long.parseLong(tracerBean.getTracerId());
         //filter的出入参
         Map<String, Object> responseMap = mapList.get(mapList.size() - 1);
 
@@ -129,13 +136,17 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
         if (responseMap.get("response") != null) {
             responseBytes = (byte[]) responseMap.get("response");
         }
-
         map.put("responseContent", responseBytes);
         map.put("costTime", tracerBean.getCostTime());
         map.put("tracerId", tracerId);
+        map.put("createTime", formatLongTime(tracerBean.getCreateTime()));
         responseMap.remove("response");
         map.putAll(responseMap);
         tracerModelToDbStore.offer(map);
     }
 
+    private static String formatLongTime(long time) {
+        return DEFAULT_FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(time),ZoneId.systemDefault()));
+    }
+
 }

+ 2 - 2
worker/src/main/java/com/jd/platform/jlog/worker/store/TracerModelToDbStore.java

@@ -1,7 +1,7 @@
 package com.jd.platform.jlog.worker.store;
 
-import com.google.common.collect.Queues;
 import com.jd.platform.jlog.common.utils.AsyncPool;
+import com.jd.platform.jlog.common.utils.AsyncWorker;
 import com.jd.platform.jlog.common.utils.CollectionUtil;
 import com.jd.platform.jlog.worker.db.Db;
 import org.slf4j.Logger;
@@ -93,7 +93,7 @@ public class TracerModelToDbStore {
                     try {
                         List<Map<String, Object>> tempModels = new ArrayList<>();
                         //每1s入库一次
-                        Queues.drain(modelQueue, tempModels, Integer.valueOf(batchSize), interval, TimeUnit.SECONDS);
+                        AsyncWorker.drain(modelQueue, tempModels, Integer.valueOf(batchSize), interval, TimeUnit.SECONDS);
                         if (CollectionUtil.isEmpty(tempModels)) {
                             continue;
                         }

+ 1 - 2
worker/src/main/java/com/jd/platform/jlog/worker/udp/UdpServer.java

@@ -12,6 +12,7 @@ import io.netty.handler.codec.MessageToMessageDecoder;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -76,8 +77,6 @@ public class UdpServer {
 
             byte[] body = new byte[buf.readableBytes()];
             buf.readBytes(body);
-
-//            tracerBeanStore.offer(body);
             producer.sendData(body);
         }
     }

+ 11 - 8
worker/src/main/resources/application.yml

@@ -5,17 +5,17 @@ thread:
 queue:
   maxSize: ${queueSize:16384}
   preDbSize: ${preDbSize:10000}
-#etcd的地址,如有多个用逗号分隔
-serverAddr: 101.42.242.201:2181
+
+#serverAddr: 101.42.242.201:2181
 
 server:
-  port: 8080
+  port: 8086
 #ck信息,自行修改
 clickhouse:
-  url: jdbc:clickhouse://${MYSQL_HOST:127.0.0.1}:${MYSQL_PORT:8123}
-  db: ${DB_NAME:test}
-  username: ${MYSQL_USER:test}
-  password: ${MYSQL_PASS:test}
+  url: jdbc:clickhouse://101.42.242.201:8123
+  db: default
+  username: default
+  password:
   batchSize: ${BATCH_SIZE:5000}
   poolSize: ${POOL_SIZE:5}
   insertInterval: ${INSERT_INTERVAL:5}
@@ -24,4 +24,7 @@ log:
   batchSize: ${BATCH_SIZE:5000}
   poolSize: ${POOL_SIZE:2}
   insertInterval: ${INSERT_INTERVAL:5}
-  preDbSize: ${preDbSize:10000}
+  preDbSize: ${preDbSize:10000}
+
+
+workers: "['192.268.1.2:8888','192.268.1.3:8888']"