plugin_dgram.cc 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  /*
   * Copyright [2021] JD.com, Inc.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. #include <stdio.h>
  17. #include <sys/un.h>
  18. #include <netinet/in.h>
  19. #include <sys/socket.h>
  20. #include <sys/ioctl.h>
  21. #include <linux/sockios.h>
  22. #include <string.h>
  23. #include <assert.h>
  24. #include <stdlib.h>
  25. #include <fcntl.h>
  26. #include <errno.h>
  27. #include "plugin_agent_mgr.h"
  28. #include "plugin_dgram.h"
  29. #include "plugin_unit.h"
  30. #include "../poll/poller_base.h"
  31. #include "mem_check.h"
  32. #include "../log/log.h"
  /* C-linkage declaration; do_attach() stores the returned value in _local_ip. */
  extern "C" unsigned int get_local_ip();
  /*
   * Return the address family that getsockname() reports for fd
   * (AF_UNIX, AF_INET, AF_INET6, ...).
   *
   * Returns AF_UNSPEC when getsockname() fails, e.g. because fd is not a
   * valid socket descriptor. The failure is checked explicitly instead of
   * relying on the pre-zeroed buffer being left untouched.
   */
  static int GetSocketFamily(int fd)
  {
      struct sockaddr addr;
      socklen_t alen = sizeof(addr);

      memset(&addr, 0, sizeof(addr));

      /*
       * Only sa_family is read afterwards, so a generic sockaddr is enough
       * even when the real address is longer and gets truncated.
       */
      if (getsockname(fd, &addr, &alen) != 0)
          return AF_UNSPEC;

      return addr.sa_family;
  }
  44. PluginDgram::PluginDgram(PluginDecoderUnit *plugin_decoder, int fd)
  45. : EpollBase(plugin_decoder->owner_thread(), fd), mtu(0), _addr_len(0),
  46. _owner(plugin_decoder), _worker_notifier(NULL),
  47. _plugin_receiver(fd, PluginAgentManager::instance()->get_dll()),
  48. _plugin_sender(fd, PluginAgentManager::instance()->get_dll()),
  49. _local_ip(0)
  50. {
  51. }
  52. PluginDgram::~PluginDgram()
  53. {
  54. }
  55. int PluginDgram::do_attach()
  56. {
  57. /* init local ip */
  58. _local_ip = get_local_ip();
  59. switch (GetSocketFamily(netfd)) {
  60. default:
  61. case AF_UNIX:
  62. mtu = 16 << 20;
  63. _addr_len = sizeof(struct sockaddr_un);
  64. break;
  65. case AF_INET:
  66. mtu = 65535;
  67. _addr_len = sizeof(struct sockaddr_in);
  68. break;
  69. case AF_INET6:
  70. mtu = 65535;
  71. _addr_len = sizeof(struct sockaddr_in6);
  72. break;
  73. }
  74. //get worker notifier
  75. _worker_notifier =
  76. PluginAgentManager::instance()->get_worker_notifier();
  77. if (NULL == _worker_notifier) {
  78. log4cplus_error("worker notifier is invalid.");
  79. return -1;
  80. }
  81. enable_input();
  82. return attach_poller();
  83. }
  84. //server peer
  85. int PluginDgram::recv_request(void)
  86. {
  87. //create dgram request
  88. PluginDatagram *dgram_request = NULL;
  89. NEW(PluginDatagram(this, PluginAgentManager::instance()->get_dll()),
  90. dgram_request);
  91. if (NULL == dgram_request) {
  92. log4cplus_error("create PluginRequest for dgram failed, msg:%s",
  93. strerror(errno));
  94. return -1;
  95. }
  96. //set request info
  97. dgram_request->_skinfo.sockfd = netfd;
  98. dgram_request->_skinfo.type = SOCK_DGRAM;
  99. dgram_request->_skinfo.local_ip = _local_ip;
  100. dgram_request->_skinfo.local_port = 0;
  101. dgram_request->_incoming_notifier = _owner->get_incoming_notifier();
  102. dgram_request->_addr_len = _addr_len;
  103. dgram_request->_addr = MALLOC(_addr_len);
  104. if (NULL == dgram_request->_addr) {
  105. log4cplus_error("malloc failed, msg:%m");
  106. DELETE(dgram_request);
  107. return -1;
  108. }
  109. if (_plugin_receiver.recvfrom(dgram_request, mtu) != 0) {
  110. DELETE(dgram_request);
  111. return -1;
  112. }
  113. dgram_request->set_time_info();
  114. if (_worker_notifier->Push(dgram_request) != 0) {
  115. log4cplus_error("push plugin request failed, fd[%d]", netfd);
  116. DELETE(dgram_request);
  117. return -1;
  118. }
  119. return 0;
  120. }
  121. void PluginDgram::input_notify(void)
  122. {
  123. if (recv_request() < 0)
  124. /* */;
  125. }