UdpClient.java 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package com.jd.platform.jlog.client.udp;
  2. import com.jd.platform.jlog.client.Context;
  3. import com.jd.platform.jlog.client.modeholder.ModeHolder;
  4. import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
  5. import com.jd.platform.jlog.common.constant.Constant;
  6. import com.jd.platform.jlog.common.model.TracerData;
  7. import com.jd.platform.jlog.common.utils.AsyncPool;
  8. import com.jd.platform.jlog.common.utils.ProtostuffUtils;
  9. import com.jd.platform.jlog.common.utils.ZstdUtils;
  10. import io.netty.bootstrap.Bootstrap;
  11. import io.netty.buffer.ByteBuf;
  12. import io.netty.channel.Channel;
  13. import io.netty.channel.ChannelHandlerContext;
  14. import io.netty.channel.ChannelInitializer;
  15. import io.netty.channel.nio.NioEventLoopGroup;
  16. import io.netty.channel.socket.DatagramPacket;
  17. import io.netty.channel.socket.nio.NioDatagramChannel;
  18. import io.netty.handler.codec.MessageToMessageEncoder;
  19. import java.net.InetSocketAddress;
  20. import java.util.List;
  21. /**
  22. * UdpClient
  23. * @author wuweifeng
  24. * @version 1.0
  25. * @date 2021-08-10
  26. */
  27. public class UdpClient {
  28. /**
  29. * 大日志即超过6w行
  30. */
  31. private static final long COMPRESS_BYTES_LEN = 60000L;
  32. /**
  33. * 启动udp客户端
  34. */
  35. public void start() {
  36. AsyncPool.asyncDo(this::startUdp);
  37. }
  38. /**
  39. * startUdp
  40. */
  41. private void startUdp() {
  42. //1.NioEventLoopGroup是执行者
  43. NioEventLoopGroup group = new NioEventLoopGroup();
  44. //2.启动器
  45. Bootstrap bootstrap = new Bootstrap();
  46. //3.配置启动器
  47. bootstrap.group(group) //3.1指定group
  48. .channel(NioDatagramChannel.class) //3.2指定channel
  49. .handler(new ChannelInitializer<NioDatagramChannel>() {
  50. @Override
  51. protected void initChannel(NioDatagramChannel nioDatagramChannel) {
  52. //3.4在pipeline中加入编码器,和解码器(用来处理返回的消息)
  53. nioDatagramChannel.pipeline().addLast(new MyUdpEncoder());
  54. }
  55. });
  56. //4.bind并返回一个channel
  57. try {
  58. Channel channel = bootstrap.bind(8887).sync().channel();
  59. Context.CHANNEL = channel;
  60. //6.等待channel的close
  61. channel.closeFuture().sync();
  62. //7.关闭group
  63. group.shutdownGracefully();
  64. } catch (InterruptedException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. /**
  69. * 编码器,将要发送的消息(这里是一个String)封装到一个DatagramPacket中
  70. */
  71. private static class MyUdpEncoder extends MessageToMessageEncoder<TracerData> {
  72. @Override
  73. protected void encode(ChannelHandlerContext channelHandlerContext, TracerData tracerData, List<Object> list) {
  74. byte[] bytes = ProtostuffUtils.serialize(tracerData);
  75. byte[] compressBytes = ZstdUtils.compress(bytes);
  76. //判断压缩完是否过大,过大走http接口请求worker
  77. if (compressBytes.length >= COMPRESS_BYTES_LEN) {
  78. //放入发okhttp的队列
  79. HttpSender.offerBean(compressBytes,tracerData.getAddress());
  80. return;
  81. }
  82. ByteBuf buf = channelHandlerContext.alloc().buffer(compressBytes.length);
  83. buf.writeBytes(compressBytes);
  84. InetSocketAddress remoteAddress=null;
  85. if(ModeHolder.getSendMode().getUnicast()){
  86. //挑选worker
  87. String workerIpPort = WorkerInfoHolder.chooseWorker();
  88. if (workerIpPort == null) {
  89. return;
  90. }
  91. String[] ipPort = workerIpPort.split(Constant.SPLITER);
  92. //发往worker的ip
  93. remoteAddress= new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
  94. }else{
  95. remoteAddress=tracerData.getAddress();
  96. }
  97. DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
  98. list.add(packet);
  99. }
  100. }
  101. }