input_messenger.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #ifndef BRPC_INPUT_MESSENGER_H
  18. #define BRPC_INPUT_MESSENGER_H
  19. #include "butil/iobuf.h" // butil::IOBuf
  20. #include "brpc/socket.h" // SocketId, SocketUser
  21. #include "brpc/parse_result.h" // ParseResult
  22. #include "brpc/input_message_base.h" // InputMessageBase
  23. namespace brpc {
  24. struct InputMessageHandler {
  25. // The callback to cut a message from `source'.
  26. // Returned message will be passed to process_request or process_response
  27. // later and Destroy()-ed by them.
  28. // Returns:
  29. // MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA):
  30. // `source' does not form a complete message yet.
  31. // MakeParseError(PARSE_ERROR_TRY_OTHERS).
  32. // `source' does not fit the protocol, the data should be tried by
  33. // other protocols. If the data is definitely corrupted (e.g. magic
  34. // header matches but other fields are wrong), pop corrupted part
  35. // from `source' before returning.
  36. // MakeMessage(InputMessageBase*):
  37. // The message is parsed successfully and cut from `source'.
  38. typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
  39. bool read_eof, const void *arg);
  40. Parse parse;
  41. // The callback to handle `msg' created by a successful parse().
  42. // `msg' must be Destroy()-ed when the processing is done. To make sure
  43. // Destroy() is always called, consider using DestroyingPtr<> defined in
  44. // destroyable.h
  45. // May be called in a different thread from parse().
  46. typedef void (*Process)(InputMessageBase* msg);
  47. Process process;
  48. // The callback to verify authentication of this socket. Only called
  49. // on the first message that a socket receives. Can be NULL when
  50. // authentication is not needed or this is the client side.
  51. // Returns true on successful authentication.
  52. typedef bool (*Verify)(const InputMessageBase* msg);
  53. Verify verify;
  54. // An argument associated with the handler.
  55. const void* arg;
  56. // Name of this handler, must be string constant.
  57. const char* name;
  58. };
  59. // Process messages from connections.
  60. // `Message' corresponds to a client's request or a server's response.
  61. class InputMessenger : public SocketUser {
  62. public:
  63. explicit InputMessenger(size_t capacity = 128);
  64. ~InputMessenger();
  65. // [thread-safe] Must be called at least once before Start().
  66. // `handler' contains user-supplied callbacks to cut off and
  67. // process messages from connections.
  68. // Returns 0 on success, -1 otherwise.
  69. int AddHandler(const InputMessageHandler& handler);
  70. // [thread-safe] Create a socket to process input messages.
  71. int Create(const butil::EndPoint& remote_side,
  72. time_t health_check_interval_s,
  73. SocketId* id);
  74. // Overwrite necessary fields in `base_options' and create a socket with
  75. // the modified options.
  76. int Create(SocketOptions base_options, SocketId* id);
  77. // Returns the internal index of `InputMessageHandler' whose name=`name'
  78. // Returns -1 when not found
  79. int FindProtocolIndex(const char* name) const;
  80. int FindProtocolIndex(ProtocolType type) const;
  81. // Get name of the n-th handler
  82. const char* NameOfProtocol(int n) const;
  83. // Add a handler which doesn't belong to any registered protocol.
  84. // Note: Invoking this method indicates that you are using Socket without
  85. // Channel nor Server.
  86. int AddNonProtocolHandler(const InputMessageHandler& handler);
  87. protected:
  88. // Load data from m->fd() into m->read_buf, cut off new messages and
  89. // call callbacks.
  90. static void OnNewMessages(Socket* m);
  91. private:
  92. // Find a valid scissor from `handlers' to cut off `header' and `payload'
  93. // from m->read_buf, save index of the scissor into `index'.
  94. ParseResult CutInputMessage(Socket* m, size_t* index, bool read_eof);
  95. // User-supplied scissors and handlers.
  96. // the index of handler is exactly the same as the protocol
  97. InputMessageHandler* _handlers;
  98. // Max added protocol type
  99. butil::atomic<int> _max_index;
  100. bool _non_protocol;
  101. size_t _capacity;
  102. butil::Mutex _add_handler_mutex;
  103. };
  104. // Get the global InputMessenger at client-side.
  105. BUTIL_FORCE_INLINE InputMessenger* get_client_side_messenger() {
  106. extern InputMessenger* g_messenger;
  107. return g_messenger;
  108. }
  109. InputMessenger* get_or_new_client_side_messenger();
  110. } // namespace brpc
  111. #endif // BRPC_INPUT_MESSENGER_H