tc_transceiver.cpp 30 KB


  1. #include "util/tc_transceiver.h"
  2. #include "util/tc_logger.h"
  3. #if TARS_SSL
  4. #include "util/tc_openssl.h"
  5. #endif
  6. #include <sstream>
  7. namespace tars
  8. {
  9. class CloseClourse
  10. {
  11. public:
  12. CloseClourse(TC_Transceiver *trans, TC_Transceiver::CloseReason reason, const string &err) : _trans(trans), _reason(reason), _err(err)
  13. {}
  14. ~CloseClourse() {
  15. _trans->tcpClose(false, _reason, _err);
  16. }
  17. protected:
  18. TC_Transceiver *_trans;
  19. TC_Transceiver::CloseReason _reason;
  20. string _err;
  21. };
  // THROW_ERROR(x, r, y): close the connection with reason `r` and message `y`,
  // then throw exception type `x` carrying `y` via THROW_EXCEPTION_SYSCODE.
  // Must be used inside a TC_Transceiver member function (it references `this`).
  //
  // The message expression is evaluated exactly once (it often concatenates
  // strings or calls getErrMsg()), and the body is wrapped in do/while(0) so the
  // macro behaves like a single statement in unbraced if/else.
  #define THROW_ERROR(x, r, y) \
      do { \
          const std::string tcThrowErrorMsg = (y); \
          CloseClourse c(this, r, tcThrowErrorMsg); \
          THROW_EXCEPTION_SYSCODE(x, tcThrowErrorMsg); \
      } while (0)

  // Upper bound, in bytes, of one receive buffer requested by the stream transceivers.
  static const int BUFFER_SIZE = 16 * 1024;
  24. ///////////////////////////////////////////////////////////////////////
  25. int TC_Transceiver::createSocket(bool udp, bool isLocal, bool isIpv6)
  26. {
  27. #if TARGET_PLATFORM_WINDOWS
  28. int domain = (isIpv6 ? PF_INET6 : PF_INET);
  29. #else
  30. int domain = isLocal ? PF_LOCAL : (isIpv6 ? PF_INET6 : PF_INET);
  31. #endif
  32. int type = udp ? SOCK_DGRAM : SOCK_STREAM;
  33. TC_Socket s;
  34. s.createSocket(type, domain);
  35. if(!udp)
  36. {
  37. s.setTcpNoDelay();
  38. s.setKeepAlive();
  39. s.setNoCloseWait();
  40. }
  41. else
  42. {
  43. s.setRecvBufferSize(512*1024);
  44. s.setSendBufferSize(512*1024);
  45. }
  46. s.setOwner(false);
  47. s.setblock(false);
  48. return s.getfd();
  49. }
  50. bool TC_Transceiver::doConnect(int fd, const struct sockaddr *addr, socklen_t len)
  51. {
  52. bool bConnected = false;
  53. int iRet = ::connect(fd, addr, len);
  54. if (iRet == 0)
  55. {
  56. bConnected = true;
  57. }
  58. else if (!TC_Socket::isInProgress())
  59. {
  60. THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect error, " + _desc);//, TC_Exception::getSystemCode());
  61. }
  62. // LOG_CONSOLE_DEBUG << bConnected << endl;
  63. return bConnected;
  64. }
  65. TC_Transceiver::TC_Transceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  66. : _epoller(epoller)
  67. , _ep(ep)
  68. , _desc(ep.toString())
  69. , _fd(-1)
  70. , _connStatus(eUnconnected)
  71. , _sendBuffer(this)
  72. , _recvBuffer(this)
  73. , _authState(eAuthInit)
  74. {
  75. // LOG_CONSOLE_DEBUG << endl;
  76. if (ep.isUdp())
  77. {
  78. _pRecvBuffer = std::make_shared<TC_NetWorkBuffer::Buffer>();
  79. _nRecvBufferSize = DEFAULT_RECV_BUFFERSIZE;
  80. _pRecvBuffer->alloc(_nRecvBufferSize);
  81. }
  82. _serverAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  83. }
  84. TC_Transceiver::~TC_Transceiver()
  85. {
  86. if(!isValid()) return;
  87. if(_ep.isTcp())
  88. {
  89. tcpClose(true, CR_DECONSTRUCTOR, "");
  90. }
  91. else
  92. {
  93. udpClose();
  94. }
  95. }
  96. void TC_Transceiver::initializeClient(const oncreate_callback &oncreate,
  97. const onclose_callback &onclose,
  98. const onconnect_callback &onconnect,
  99. const onrequest_callback &onrequest,
  100. const onparser_callback &onparser,
  101. const onopenssl_callback &onopenssl,
  102. const oncompletepackage_callback &onfinish)
  103. {
  104. _isServer = false;
  105. _createSocketCallback = oncreate;
  106. _onConnectCallback = onconnect;
  107. _onRequestCallback = onrequest;
  108. _onCloseCallback = onclose;
  109. _onParserCallback = onparser;
  110. _onCompletePackageCallback = onfinish;
  111. _onOpensslCallback = onopenssl;
  112. }
  113. void TC_Transceiver::initializeServer(const onclose_callback &onclose,
  114. const onrequest_callback &onrequest,
  115. const onparser_callback &onparser,
  116. const onopenssl_callback &onopenssl,
  117. const oncompletepackage_callback &onfinish)
  118. {
  119. _isServer = true;
  120. _connStatus = eConnected;
  121. _onRequestCallback = onrequest;
  122. _onCloseCallback = onclose;
  123. _onParserCallback = onparser;
  124. _onCompletePackageCallback = onfinish;
  125. _onOpensslCallback = onopenssl;
  126. #if TARS_SSL
  127. if (isSSL())
  128. {
  129. _openssl = _onOpensslCallback(this);
  130. if (!_openssl)
  131. {
  132. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error]");
  133. }
  134. _openssl->init(true);
  135. _openssl->recvBuffer()->setConnection(this);
  136. int ret = _openssl->doHandshake(_sendBuffer);
  137. if (ret != 0)
  138. {
  139. THROW_ERROR(TC_Transceiver_Exception, CR_SSL_HANDSHAKE, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error: " + _openssl->getErrMsg() + "]");
  140. }
  141. // send the encrypt data from write buffer
  142. if (!_sendBuffer.empty())
  143. {
  144. doRequest();
  145. }
  146. }
  147. #endif
  148. }
  149. void TC_Transceiver::setClientAuthCallback(const onclientsendauth_callback &onsendauth, const onclientverifyauth_callback &onverifyauth)
  150. {
  151. _onClientSendAuthCallback = onsendauth;
  152. _onClientVerifyAuthCallback = onverifyauth;
  153. }
  154. void TC_Transceiver::setServerAuthCallback(const onserververifyauth_callback &onverifyauth)
  155. {
  156. _onServerVerifyAuthCallback = onverifyauth;
  157. }
  158. void TC_Transceiver::setBindAddr(const char *host)
  159. {
  160. if(_isServer)
  161. {
  162. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "setBindAddr(" + string(host) + ") only use in client, " + _desc);
  163. }
  164. _bindAddr = TC_Socket::createSockAddr(host);
  165. }
  166. void TC_Transceiver::setBindAddr(const TC_Socket::addr_type &bindAddr)
  167. {
  168. if(_isServer)
  169. {
  170. THROW_ERROR(TC_Transceiver_Exception, CR_Type,"setBindAddr only use in client, " + _desc);
  171. }
  172. _bindAddr = bindAddr;
  173. }
  174. shared_ptr<TC_Epoller::EpollInfo> TC_Transceiver::bindFd(int fd)
  175. {
  176. if(!_isServer)
  177. {
  178. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "client should not call bindFd, " + _desc);
  179. }
  180. _connStatus = eConnected;
  181. _fd = fd;
  182. //设置套接口选项
  183. for(size_t i=0; i< _socketOpts.size(); ++i)
  184. {
  185. setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen);
  186. }
  187. _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  188. getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second);
  189. _epollInfo = _epoller->createEpollInfo(_fd);
  190. return _epollInfo;
  191. }
  192. void TC_Transceiver::setUdpRecvBuffer(size_t nSize)
  193. {
  194. _nRecvBufferSize = nSize;
  195. _pRecvBuffer->alloc(_nRecvBufferSize);
  196. }
  197. void TC_Transceiver::checkConnect()
  198. {
  199. //检查连接是否有错误
  200. if(isConnecting())
  201. {
  202. int iVal = 0;
  203. SOCKET_LEN_TYPE iLen = static_cast<SOCKET_LEN_TYPE>(sizeof(int));
  204. int ret = ::getsockopt(_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen);
  205. if (ret < 0 || iVal)
  206. {
  207. string err = TC_Exception::parseError(iVal);
  208. THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect " + _desc + " error:" + err);
  209. }
  210. _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  211. getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second);
  212. if(_bindAddr.first)
  213. {
  214. //如果服务器终止后,服务器可以第二次快速启动而不用等待一段时间
  215. int iReuseAddr = 1;
  216. setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&iReuseAddr, sizeof(int));
  217. ::bind(_fd, _bindAddr.first.get(), _bindAddr.second);
  218. }
  219. setConnected();
  220. }
  221. }
  222. void TC_Transceiver::parseConnectAddress()
  223. {
  224. if (isConnectIPv6())
  225. {
  226. TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in6*)_serverAddr.first.get());
  227. }
  228. else
  229. {
  230. TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in*)_serverAddr.first.get());
  231. }
  232. }
  233. bool TC_Transceiver::isSSL() const
  234. {
  235. return _ep.isSSL();
  236. }
  237. void TC_Transceiver::connect()
  238. {
  239. if(_isServer)
  240. {
  241. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call connect, " + _desc);
  242. }
  243. if(isValid())
  244. {
  245. return;
  246. }
  247. if(_connStatus == eConnecting || _connStatus == eConnected)
  248. {
  249. return;
  250. }
  251. if (_ep.isUdp())
  252. {
  253. _fd = createSocket(true, false, isConnectIPv6());
  254. _connStatus = eConnected;
  255. _epollInfo = _epoller->createEpollInfo(_fd);
  256. _proxyInfo = _createSocketCallback(this);
  257. if(_proxyInfo)
  258. {
  259. _desc = _proxyInfo->getEndpoint().toString();
  260. }
  261. //每次连接前都重新解析一下地址, 避免dns变了!
  262. parseConnectAddress();
  263. }
  264. else
  265. {
  266. _fd = createSocket(false, false, isConnectIPv6());
  267. _isConnTimeout = false;
  268. _epollInfo = _epoller->createEpollInfo(_fd);
  269. _connTimerId = _epoller->postDelayed(_connTimeout, std::bind(&TC_Transceiver::checkConnectTimeout, this));
  270. _proxyInfo = _createSocketCallback(this);
  271. if(_proxyInfo)
  272. {
  273. _desc = _proxyInfo->getEndpoint().toString();
  274. }
  275. //每次连接前都重新解析一下地址, 避免dns变了!
  276. parseConnectAddress();
  277. bool bConnected = doConnect(_fd, _serverAddr.first.get(), _serverAddr.second);
  278. if(bConnected)
  279. {
  280. setConnected();
  281. }
  282. else
  283. {
  284. _connStatus = TC_Transceiver::eConnecting;
  285. }
  286. }
  287. //设置套接口选项
  288. for(size_t i=0; i< _socketOpts.size(); ++i)
  289. {
  290. setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen);
  291. }
  292. }
  293. void TC_Transceiver::checkConnectTimeout()
  294. {
  295. if(_connStatus != eConnected)
  296. {
  297. _isConnTimeout = true;
  298. THROW_ERROR(TC_Transceiver_Exception, CR_ConnectTimeout, "connect timeout, " + _desc);
  299. }
  300. }
  301. void TC_Transceiver::setConnected()
  302. {
  303. if(_isServer)
  304. {
  305. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call setConnected, " + _desc);
  306. }
  307. _connStatus = eConnected;
  308. if(_proxyInfo)
  309. {
  310. connectProxy();
  311. }
  312. else
  313. {
  314. onSetConnected();
  315. }
  316. }
  317. void TC_Transceiver::onSetConnected()
  318. {
  319. if(_isServer)
  320. {
  321. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onSetConnected, " + _desc);
  322. }
  323. onConnect();
  324. _onConnectCallback(this);
  325. if (!isSSL())
  326. {
  327. doAuthReq();
  328. }
  329. }
  330. void TC_Transceiver::onConnect()
  331. {
  332. if(_isServer)
  333. {
  334. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onConnect, " + _desc);
  335. }
  336. _epoller->erase(_connTimerId);
  337. _connTimerId = 0;
  338. #if TARS_SSL
  339. if (isSSL())
  340. {
  341. _openssl = _onOpensslCallback(this);
  342. if (!_openssl)
  343. {
  344. close();
  345. return;
  346. }
  347. _openssl->init(false);
  348. _openssl->setReadBufferSize(1024 * 8);
  349. _openssl->setWriteBufferSize(1024 * 8);
  350. _openssl->recvBuffer()->setConnection(this);
  351. int ret = _openssl->doHandshake(_sendBuffer);
  352. if (ret != 0)
  353. {
  354. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl hande shake failed, " + _desc + ", error:" + _openssl->getErrMsg());
  355. }
  356. // send the encrypt data from write buffer
  357. if (!_sendBuffer.empty())
  358. {
  359. doRequest();
  360. }
  361. return;
  362. }
  363. #endif
  364. }
  365. void TC_Transceiver::doAuthReq()
  366. {
  367. if (_ep.getAuthType() == TC_Endpoint::AUTH_TYPENONE)
  368. {
  369. _authState = eAuthSucc;
  370. _onRequestCallback(this);
  371. }
  372. else
  373. {
  374. //如果是客户端, 则主动发起鉴权请求
  375. shared_ptr<TC_NetWorkBuffer::Buffer> buff = _onClientSendAuthCallback(this);
  376. #if TARS_SSL
  377. if(this->isSSL())
  378. {
  379. int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer);
  380. if(ret != 0)
  381. {
  382. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl write failed, " + _desc + ", error:" + _openssl->getErrMsg());
  383. return ;
  384. }
  385. }
  386. else
  387. {
  388. _sendBuffer.addBuffer(buff);
  389. }
  390. #else
  391. _sendBuffer.addBuffer(buff);
  392. #endif
  393. doRequest();
  394. }
  395. }
  396. void TC_Transceiver::connectProxy()
  397. {
  398. assert(_proxyInfo);
  399. vector<char> buff;
  400. bool succ = _proxyInfo->sendProxyPacket(buff, _ep);
  401. if(!succ)
  402. {
  403. THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_SEND, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg());
  404. }
  405. _sendBuffer.addBuffer(buff);
  406. doRequest();
  407. }
  408. int TC_Transceiver::doCheckProxy(const char *buff, size_t length)
  409. {
  410. if(!_proxyInfo || _proxyInfo->isSuccess())
  411. return 0;
  412. bool succ = _proxyInfo->recvProxyPacket(buff, length);
  413. if(!succ)
  414. {
  415. THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_RECV, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg());
  416. }
  417. if(!_proxyInfo->isSuccess())
  418. {
  419. connectProxy();
  420. }
  421. else
  422. {
  423. onSetConnected();
  424. }
  425. return 1;
  426. }
  427. void TC_Transceiver::udpClose()
  428. {
  429. if (_ep.isUdp())
  430. {
  431. _epoller->releaseEpollInfo(_epollInfo);
  432. _epollInfo.reset();
  433. TC_Port::closeSocket(_fd);
  434. _fd = -1;
  435. _connStatus = eUnconnected;
  436. _sendBuffer.clearBuffers();
  437. _recvBuffer.clearBuffers();
  438. }
  439. }
  440. void TC_Transceiver::close()
  441. {
  442. // LOG_CONSOLE_DEBUG << this << endl;
  443. if(!isValid()) return;
  444. if(_ep.isTcp())
  445. {
  446. tcpClose(false, CR_ACTIVE, "active call");
  447. }
  448. else
  449. {
  450. udpClose();
  451. }
  452. }
  453. void TC_Transceiver::tcpClose(bool deconstructor, CloseReason reason, const string &err)
  454. {
  455. if(_ep.isTcp() && isValid())
  456. {
  457. #if TARS_SSL
  458. if (_openssl)
  459. {
  460. _openssl->release();
  461. _openssl.reset();
  462. }
  463. #endif
  464. //LOG_CONSOLE_DEBUG << this << ", " << _fd << ", " << reason << ", " << err << ", " << deconstructor << endl;
  465. _epoller->releaseEpollInfo(_epollInfo);
  466. _epollInfo.reset();
  467. TC_Port::closeSocket(_fd);
  468. _fd = -1;
  469. _connStatus = eUnconnected;
  470. _sendBuffer.clearBuffers();
  471. _recvBuffer.clearBuffers();
  472. _authState = eAuthInit;
  473. if(!deconstructor)
  474. {
  475. //注意必须放在最后, 主要避免_onCloseCallback里面析构了链接, 从而导致又进入tcpClose
  476. //放在最后就不会有问题了, 因为不会再进入这个函数
  477. _onCloseCallback(this, reason, err);
  478. }
  479. }
  480. }
  481. void TC_Transceiver::doRequest()
  482. {
  483. if(!isValid()) return ;
  484. checkConnect();
  485. //buf不为空,先发送buffer的内容
  486. while(!_sendBuffer.empty())
  487. {
  488. auto data = _sendBuffer.getBufferPointer();
  489. assert(data.first != NULL && data.second != 0);
  490. int iRet = this->send(data.first, (uint32_t) data.second, 0);
  491. if (iRet <= 0)
  492. {
  493. return;
  494. }
  495. _sendBuffer.moveHeader(iRet);
  496. }
  497. if(_sendBuffer.empty())
  498. {
  499. _onRequestCallback(this);
  500. }
  501. }
  502. TC_Transceiver::ReturnStatus TC_Transceiver::sendRequest(const shared_ptr<TC_NetWorkBuffer::Buffer> &buff, const TC_Socket::addr_type& addr)
  503. {
  504. // LOG_CONSOLE_DEBUG << buff->length() << endl;
  505. //空数据 直接返回成功
  506. if(buff->empty()) {
  507. return eRetOk;
  508. }
  509. // assert(_sendBuffer.empty());
  510. //buf不为空, 表示之前的数据还没发送完, 直接返回失败, 等buffer可写了,epoll会通知写事件
  511. if(!_sendBuffer.empty()) {
  512. //不应该运行到这里
  513. return eRetNotSend;
  514. }
  515. if(eConnected != _connStatus)
  516. {
  517. return eRetNotSend;
  518. }
  519. if(_proxyInfo && !_proxyInfo->isSuccess()) {
  520. return eRetNotSend;
  521. }
  522. if (_ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc)
  523. {
  524. #if TARS_SSL
  525. if (isSSL() && !_openssl)
  526. {
  527. return eRetNotSend;
  528. }
  529. #endif
  530. return eRetNotSend; // 需要鉴权但还没通过,不能发送非认证消息
  531. }
  532. #if TARS_SSL
  533. // 握手数据已加密,直接发送,会话数据需加密
  534. if (isSSL())
  535. {
  536. if(!_openssl->isHandshaked()) {
  537. return eRetNotSend;
  538. }
  539. int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer);
  540. if(ret != 0)
  541. {
  542. close();
  543. return eRetError;
  544. }
  545. buff->clear();
  546. }
  547. else
  548. {
  549. _sendBuffer.addBuffer(buff);
  550. }
  551. #else
  552. _sendBuffer.addBuffer(buff);
  553. #endif
  554. // LOG_CONSOLE_DEBUG << _sendBuffer.getBufferLength() << endl;
  555. _lastAddr = addr;
  556. do
  557. {
  558. auto data = _sendBuffer.getBufferPointer();
  559. int iRet = this->send(data.first, (uint32_t) data.second, 0);
  560. if(iRet < 0)
  561. {
  562. if(!isValid())
  563. {
  564. _sendBuffer.clearBuffers();
  565. return eRetError;
  566. }
  567. else
  568. {
  569. return eRetFull;
  570. }
  571. }
  572. _sendBuffer.moveHeader(iRet);
  573. // assert(iRet != 0);
  574. }
  575. while(!_sendBuffer.empty());
  576. return eRetOk;
  577. }
  578. void TC_Transceiver::doAuthCheck(TC_NetWorkBuffer *buff)
  579. {
  580. if (!buff->empty() && _ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc)
  581. {
  582. TC_NetWorkBuffer::PACKET_TYPE type;
  583. if(_isServer)
  584. {
  585. //验证鉴权
  586. auto ret = _onServerVerifyAuthCallback(*buff, this);
  587. type = ret.first;
  588. if(type == TC_NetWorkBuffer::PACKET_FULL)
  589. {
  590. _authState = eAuthSucc;
  591. //服务器端, 鉴权通过, 可以响应包
  592. sendRequest(ret.second, _serverAddr);
  593. }
  594. }
  595. else
  596. {
  597. type = _onClientVerifyAuthCallback(*buff, this);
  598. if(type == TC_NetWorkBuffer::PACKET_FULL)
  599. {
  600. _authState = eAuthSucc;
  601. //客户端, 鉴权通过可以发送业务包了
  602. _onRequestCallback(this);
  603. }
  604. }
  605. if(type == TC_NetWorkBuffer::PACKET_ERR)
  606. {
  607. THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "[TC_Transceiver::doProtocolAnalysis, auth error]");
  608. }
  609. }
  610. }
  611. int TC_Transceiver::doProtocolAnalysis(TC_NetWorkBuffer *buff)
  612. {
  613. doAuthCheck(buff);
  614. TC_NetWorkBuffer::PACKET_TYPE ret;
  615. int packetCount = 0;
  616. try
  617. {
  618. do
  619. {
  620. ret = _onParserCallback(*buff, this);
  621. if(ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE)
  622. {
  623. ++packetCount;
  624. }
  625. if(ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) {
  626. //full close模式下, 需要关闭连接
  627. tcpClose(false, CR_PROTOCOL, "protocol full close");
  628. }
  629. if(_onCompletePackageCallback) {
  630. //收到一个完整的包
  631. _onCompletePackageCallback(this);
  632. }
  633. }
  634. while (ret == TC_NetWorkBuffer::PACKET_FULL);
  635. }
  636. catch (exception & ex) {
  637. THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error:" + string(ex.what()) + "]");
  638. }
  639. catch (...) {
  640. THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error");
  641. }
  642. if (ret == TC_NetWorkBuffer::PACKET_ERR)
  643. {
  644. string err = "parser decode error, " + _desc;
  645. tcpClose(false, CR_PROTOCOL, err);
  646. throw TC_Transceiver_Exception(err);
  647. }
  648. return packetCount;
  649. }
  650. //////////////////////////////////////////////////////////
  651. TC_TCPTransceiver::TC_TCPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  652. : TC_Transceiver(epoller, ep)
  653. {
  654. assert(epoller);
  655. }
  656. //不同的内存分配机制
  657. #if 0
  658. void TC_TCPTransceiver::doResponse()
  659. {
  660. checkConnect();
  661. int iRet = 0;
  662. int packetCount = 0;
  663. do
  664. {
  665. char buff[BUFFER_SIZE];
  666. if ((iRet = this->recv((void*)buff, BUFFER_SIZE, 0)) > 0)
  667. {
  668. int check = doCheckProxy(buff, iRet);
  669. if(check != 0)
  670. {
  671. _recvBuffer.clearBuffers();
  672. return;
  673. }
  674. _recvBuffer.addBuffer(buff, iRet);
  675. //解析协议
  676. packetCount += doProtocolAnalysis(&_recvBuffer);
  677. //收包太多了, 中断一下, 释放线程给send等
  678. if (packetCount >= 2000 && isValid())
  679. {
  680. _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  681. break;
  682. }
  683. //接收的数据小于buffer大小, 内核会再次通知你
  684. if(iRet < BUFFER_SIZE)
  685. {
  686. break;
  687. }
  688. }
  689. }
  690. while (iRet>0);
  691. }
  692. #else
  693. void TC_TCPTransceiver::doResponse()
  694. {
  695. checkConnect();
  696. int iRet = 0;
  697. int packetCount = 0;
  698. do
  699. {
  700. auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE);
  701. uint32_t left = (uint32_t)data->left();
  702. if ((iRet = this->recv((void*)data->free(), left, 0)) > 0)
  703. {
  704. int check = doCheckProxy(data->free(), iRet);
  705. if(check != 0)
  706. {
  707. _recvBuffer.clearBuffers();
  708. return;
  709. }
  710. data->addWriteIdx(iRet);
  711. _recvBuffer.addLength(iRet);
  712. //解析协议
  713. packetCount += doProtocolAnalysis(&_recvBuffer);
  714. //收包太多了, 中断一下, 释放线程给send等
  715. if (packetCount >= 2000 && isValid())
  716. {
  717. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  718. // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  719. break;
  720. }
  721. //接收的数据小于buffer大小, 内核会再次通知你
  722. if(iRet < (int)left)
  723. {
  724. break;
  725. }
  726. }
  727. }
  728. while (iRet>0);
  729. }
  730. #endif
  731. int TC_TCPTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
  732. {
  733. //只有是连接状态才能收发数据
  734. if(eConnected != _connStatus)
  735. {
  736. return -1;
  737. }
  738. int iRet = ::send(_fd, (const char*)buf, len, flag);
  739. // LOG_CONSOLE_DEBUG << this << ", send, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl;
  740. if (iRet < 0 && !TC_Socket::isPending())
  741. {
  742. THROW_ERROR(TC_Transceiver_Exception, CR_SEND, "TC_TCPTransceiver::send, " + _desc + ", fd:" + TC_Common::tostr(_fd));
  743. }
  744. #if TARGET_PLATFORM_WINDOWS
  745. if(iRet < 0 && TC_Socket::isPending())
  746. {
  747. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  748. }
  749. #endif
  750. return iRet;
  751. }
  752. int TC_TCPTransceiver::recv(void* buf, uint32_t len, uint32_t flag)
  753. {
  754. //只有是连接状态才能收发数据
  755. if(eConnected != _connStatus)
  756. return -1;
  757. int iRet = ::recv(_fd, (char*)buf, len, flag);
  758. // LOG_CONSOLE_DEBUG << this << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << endl;
  759. if (iRet == 0 || (iRet < 0 && !TC_Socket::isPending()))
  760. {
  761. int nerr = TC_Exception::getSystemCode();
  762. string err = "recv error, errno:" + TC_Common::tostr(nerr) + "," + TC_Exception::parseError(nerr);
  763. THROW_ERROR(TC_Transceiver_Exception, CR_RECV, err + ", " + _desc + ", fd:" + TC_Common::tostr(_fd));
  764. }
  765. #if TARGET_PLATFORM_WINDOWS
  766. if(iRet < 0 && TC_Socket::isPending())
  767. {
  768. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  769. }
  770. #endif
  771. return iRet;
  772. }
  773. /////////////////////////////////////////////////////////////////
  774. #if TARS_SSL
  775. TC_SSLTransceiver::TC_SSLTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  776. : TC_TCPTransceiver(epoller, ep)
  777. {
  778. }
  779. #if 0
  780. void TC_SSLTransceiver::doResponse()
  781. {
  782. checkConnect();
  783. int iRet = 0;
  784. int packetCount = 0;
  785. do
  786. {
  787. char buff[BUFFER_SIZE] = {0x00};
  788. if ((iRet = this->recv(buff, BUFFER_SIZE, 0)) > 0)
  789. {
  790. int check = doCheckProxy(buff, iRet);
  791. if(check != 0)
  792. {
  793. return;
  794. }
  795. const bool preHandshake = _openssl->isHandshaked();
  796. int ret = _openssl->read(buff, iRet, _sendBuffer);
  797. if (ret != 0)
  798. {
  799. // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl;
  800. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
  801. }
  802. else if(!_sendBuffer.empty())
  803. {
  804. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse SSL_read prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << ", send handshake len:" << _sendBuffer.getBufferLength() << endl;
  805. int ret = doRequest();
  806. if(ret < 0)
  807. {
  808. // doRequest失败 close fd
  809. if (!isValid())
  810. {
  811. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, ssl doRequest failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
  812. }
  813. else
  814. {
  815. return;
  816. }
  817. }
  818. }
  819. // LOG_CONSOLE_DEBUG << "recv length:" << iRet << ", preHandshake:" << preHandshake << endl;
  820. if (!_openssl->isHandshaked())
  821. {
  822. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  823. return;
  824. }
  825. if (!preHandshake)
  826. {
  827. if(_isServer)
  828. {
  829. _onRequestCallback(this);
  830. }
  831. else
  832. {
  833. //握手完毕, 客户端直接发送鉴权请求
  834. doAuthReq();
  835. // doAuthReq失败,会close fd, 这里判断下是否还有效
  836. if (!isValid())
  837. {
  838. THROW_ERROR(TC_Transceiver_Exception, CR_SSL,
  839. "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " +
  840. _openssl->getErrMsg() + "]");
  841. }
  842. else
  843. {
  844. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  845. }
  846. }
  847. }
  848. TC_NetWorkBuffer *rbuf = _openssl->recvBuffer();
  849. //解析协议
  850. packetCount += doProtocolAnalysis(rbuf);
  851. //收包太多了, 中断一下, 释放线程给send等
  852. if (packetCount >= 1000 && isValid())
  853. {
  854. _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  855. break;
  856. }
  857. //接收的数据小于buffer大小, 内核会再次通知你
  858. if(iRet < BUFFER_SIZE)
  859. {
  860. break;
  861. }
  862. }
  863. }
  864. while (iRet>0);
  865. }
  866. #else
  867. void TC_SSLTransceiver::doResponse()
  868. {
  869. checkConnect();
  870. int iRet = 0;
  871. int packetCount = 0;
  872. do
  873. {
  874. auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE);
  875. uint32_t left = (uint32_t)data->left();
  876. if ((iRet = this->recv((void*)data->free(), left, 0)) > 0)
  877. {
  878. int check = doCheckProxy(data->free(), iRet);
  879. if(check != 0)
  880. {
  881. return;
  882. }
  883. const bool preHandshake = _openssl->isHandshaked();
  884. int ret = _openssl->read(data->free(), iRet, _sendBuffer);
  885. if (ret != 0)
  886. {
  887. // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl;
  888. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
  889. }
  890. else if(!_sendBuffer.empty())
  891. {
  892. doRequest();
  893. }
  894. if (!_openssl->isHandshaked())
  895. {
  896. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  897. return;
  898. }
  899. if (!preHandshake)
  900. {
  901. if(_isServer)
  902. {
  903. _onRequestCallback(this);
  904. }
  905. else
  906. {
  907. //握手完毕, 客户端直接发送鉴权请求
  908. doAuthReq();
  909. // doAuthReq失败,会close fd, 这里判断下是否还有效
  910. if (!isValid())
  911. {
  912. THROW_ERROR(TC_Transceiver_Exception, CR_SSL,
  913. "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " +
  914. _openssl->getErrMsg() + "]");
  915. }
  916. else
  917. {
  918. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  919. }
  920. }
  921. }
  922. TC_NetWorkBuffer *rbuf = _openssl->recvBuffer();
  923. //解析协议
  924. packetCount += doProtocolAnalysis(rbuf);
  925. //收包太多了, 中断一下, 释放线程给send等
  926. if (packetCount >= 1000 && isValid())
  927. {
  928. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  929. // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  930. break;
  931. }
  932. //接收的数据小于buffer大小, 内核会再次通知你
  933. if(iRet < left)
  934. {
  935. break;
  936. }
  937. }
  938. }
  939. while (iRet>0);
  940. }
  941. #endif
  942. #endif
  943. /////////////////////////////////////////////////////////////////
  944. TC_UDPTransceiver::TC_UDPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  945. : TC_Transceiver(epoller, ep)
  946. {
  947. }
  948. TC_UDPTransceiver::~TC_UDPTransceiver()
  949. {
  950. }
  951. void TC_UDPTransceiver::doResponse()
  952. {
  953. checkConnect();
  954. int iRet = 0;
  955. int packetCount = 0;
  956. do
  957. {
  958. _recvBuffer.clearBuffers();
  959. auto data = _recvBuffer.getOrCreateBuffer(_nRecvBufferSize, _nRecvBufferSize);
  960. uint32_t left = (uint32_t)data->left();
  961. if ((iRet = this->recv((void *)data->free(), left, 0)) > 0)
  962. {
  963. data->addWriteIdx(iRet);
  964. _recvBuffer.addLength(iRet);
  965. //解析协议
  966. packetCount += doProtocolAnalysis(&_recvBuffer);
  967. // LOG_CONSOLE_DEBUG << iRet << ", " << packetCount << endl;
  968. //收包太多了, 中断一下, 释放线程给send等
  969. if (packetCount >= 1000 && isValid())
  970. {
  971. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  972. // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  973. break;
  974. }
  975. }
  976. }
  977. while (iRet > 0);
  978. }
  979. int TC_UDPTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
  980. {
  981. if(!isValid()) return -1;
  982. int iRet = 0;
  983. if(_isServer)
  984. {
  985. iRet=::sendto(_fd, (const char*)buf, len, flag, _lastAddr.first.get(), _lastAddr.second);
  986. }
  987. else
  988. {
  989. iRet=::sendto(_fd, (const char*)buf, len, flag, _serverAddr.first.get(), _serverAddr.second);
  990. }
  991. if(iRet > 0)
  992. {
  993. //udp只发一次 发送一半也算全部发送成功
  994. return len;
  995. }
  996. if (iRet < 0 && TC_Socket::isPending())
  997. {
  998. //EAGAIN, 认为没有发送
  999. return 0;
  1000. }
  1001. return iRet;
  1002. }
  1003. int TC_UDPTransceiver::recv(void* buf, uint32_t len, uint32_t flag)
  1004. {
  1005. if(!isValid()) return -1;
  1006. _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  1007. int iRet = ::recvfrom(_fd, (char*)buf, len, flag, _clientAddr.first.get(), &_clientAddr.second); //need check from_ip & port
  1008. // cout << "recv :" << iRet << endl;
  1009. // if(iRet < 0)
  1010. // {
  1011. // LOG_CONSOLE_DEBUG << this << ", " << TC_Socket::isPending() << ", " << _isServer << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl;
  1012. // }
  1013. if(!_isServer)
  1014. {
  1015. //客户端才会关闭连接, 会重建socket, 服务端不会
  1016. if (iRet < 0 && !TC_Socket::isPending())
  1017. {
  1018. THROW_ERROR(TC_Transceiver_Exception, CR_RECV, "TC_UDPTransceiver::udp recv, " + _desc + ", fd:" + TC_Common::tostr(_fd));
  1019. return 0;
  1020. }
  1021. }
  1022. return iRet;
  1023. }
  1024. /////////////////////////////////////////////////////////////////
  1025. }