tc_transceiver.cpp 31 KB


  1. #include "util/tc_transceiver.h"
  2. #include "util/tc_logger.h"
  3. #if TARS_SSL
  4. #include "util/tc_openssl.h"
  5. #endif
  6. #include <sstream>
  7. namespace tars
  8. {
  9. class CloseClourse
  10. {
  11. public:
  12. CloseClourse(TC_Transceiver *trans, TC_Transceiver::CloseReason reason, const string &err) : _trans(trans), _reason(reason), _err(err)
  13. {}
  14. ~CloseClourse() {
  15. _trans->tcpClose(false, _reason, _err);
  16. }
  17. protected:
  18. TC_Transceiver *_trans;
  19. TC_Transceiver::CloseReason _reason;
  20. string _err;
  21. };
  // Close the connection with reason `r` and throw exception type `x` carrying message `y`.
  // - Wrapped in do { } while (0) so the macro expands to exactly one statement and is
  //   safe inside unbraced if/else.
  // - `y` is evaluated once into a local, so message expressions are not computed twice.
  // - The CloseClourse guard is destroyed while the exception leaves the block, which is
  //   what triggers tcpClose(false, r, message).
  #define THROW_ERROR(x, r, y) \
      do \
      { \
          const string tcThrowErrorMsg = (y); \
          CloseClourse tcThrowErrorGuard(this, r, tcThrowErrorMsg); \
          THROW_EXCEPTION_SYSCODE(x, tcThrowErrorMsg); \
      } while (0)

  // Upper bound, in bytes, passed to getOrCreateBuffer() by the TCP/SSL receive loops.
  static const int BUFFER_SIZE = 16 * 1024;
  24. ///////////////////////////////////////////////////////////////////////
  25. int TC_Transceiver::createSocket(bool udp, bool isLocal, bool isIpv6)
  26. {
  27. #if TARGET_PLATFORM_WINDOWS
  28. int domain = (isIpv6 ? PF_INET6 : PF_INET);
  29. #else
  30. int domain = isLocal ? PF_LOCAL : (isIpv6 ? PF_INET6 : PF_INET);
  31. #endif
  32. int type = udp ? SOCK_DGRAM : SOCK_STREAM;
  33. TC_Socket s;
  34. s.createSocket(type, domain);
  35. if(!udp)
  36. {
  37. s.setTcpNoDelay();
  38. s.setKeepAlive();
  39. s.setNoCloseWait();
  40. }
  41. else
  42. {
  43. s.setRecvBufferSize(512*1024);
  44. s.setSendBufferSize(512*1024);
  45. }
  46. s.setOwner(false);
  47. s.setblock(false);
  48. return s.getfd();
  49. }
  50. bool TC_Transceiver::doConnect(int fd, const struct sockaddr *addr, socklen_t len)
  51. {
  52. bool bConnected = false;
  53. int iRet = ::connect(fd, addr, len);
  54. if (iRet == 0)
  55. {
  56. bConnected = true;
  57. }
  58. else if (!TC_Socket::isInProgress())
  59. {
  60. THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect error, " + _desc);//, TC_Exception::getSystemCode());
  61. }
  62. // LOG_CONSOLE_DEBUG << bConnected << endl;
  63. return bConnected;
  64. }
  65. TC_Transceiver::TC_Transceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  66. : _epoller(epoller)
  67. , _ep(ep)
  68. , _desc(ep.toString())
  69. , _fd(-1)
  70. , _connStatus(eUnconnected)
  71. , _sendBuffer(this)
  72. , _recvBuffer(this)
  73. , _authState(eAuthInit)
  74. {
  75. // LOG_CONSOLE_DEBUG << endl;
  76. if (ep.isUdp())
  77. {
  78. _pRecvBuffer = std::make_shared<TC_NetWorkBuffer::Buffer>();
  79. _nRecvBufferSize = DEFAULT_RECV_BUFFERSIZE;
  80. _pRecvBuffer->alloc(_nRecvBufferSize);
  81. }
  82. _serverAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  83. }
  84. TC_Transceiver::~TC_Transceiver()
  85. {
  86. if(!isValid()) return;
  87. if(_ep.isTcp())
  88. {
  89. tcpClose(true, CR_DECONSTRUCTOR, "");
  90. }
  91. else
  92. {
  93. udpClose();
  94. }
  95. }
  96. void TC_Transceiver::initializeClient(const oncreate_callback &oncreate,
  97. const onclose_callback &onclose,
  98. const onconnect_callback &onconnect,
  99. const onrequest_callback &onrequest,
  100. const onparser_callback &onparser,
  101. const onopenssl_callback &onopenssl,
  102. const oncompletepackage_callback &onfinish)
  103. {
  104. _isServer = false;
  105. _createSocketCallback = oncreate;
  106. _onConnectCallback = onconnect;
  107. _onRequestCallback = onrequest;
  108. _onCloseCallback = onclose;
  109. _onParserCallback = onparser;
  110. _onCompletePackageCallback = onfinish;
  111. _onOpensslCallback = onopenssl;
  112. }
  113. void TC_Transceiver::initializeServer(const onclose_callback &onclose,
  114. const onrequest_callback &onrequest,
  115. const onparser_callback &onparser,
  116. const onopenssl_callback &onopenssl,
  117. const oncompletepackage_callback &onfinish)
  118. {
  119. _isServer = true;
  120. _connStatus = eConnected;
  121. _onRequestCallback = onrequest;
  122. _onCloseCallback = onclose;
  123. _onParserCallback = onparser;
  124. _onCompletePackageCallback = onfinish;
  125. _onOpensslCallback = onopenssl;
  126. #if TARS_SSL
  127. if (isSSL())
  128. {
  129. _openssl = _onOpensslCallback(this);
  130. if (!_openssl)
  131. {
  132. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error]");
  133. }
  134. _openssl->init(true);
  135. _openssl->recvBuffer()->setConnection(this);
  136. int ret = _openssl->doHandshake(_sendBuffer);
  137. if (ret != 0)
  138. {
  139. THROW_ERROR(TC_Transceiver_Exception, CR_SSL_HANDSHAKE, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error: " + _openssl->getErrMsg() + "]");
  140. }
  141. // send the encrypt data from write buffer
  142. if (!_sendBuffer.empty())
  143. {
  144. doRequest();
  145. }
  146. }
  147. #endif
  148. }
  149. void TC_Transceiver::setClientAuthCallback(const onclientsendauth_callback &onsendauth, const onclientverifyauth_callback &onverifyauth)
  150. {
  151. _onClientSendAuthCallback = onsendauth;
  152. _onClientVerifyAuthCallback = onverifyauth;
  153. }
  154. void TC_Transceiver::setServerAuthCallback(const onserververifyauth_callback &onverifyauth)
  155. {
  156. _onServerVerifyAuthCallback = onverifyauth;
  157. }
  158. void TC_Transceiver::setBindAddr(const char *host)
  159. {
  160. if(_isServer)
  161. {
  162. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "setBindAddr(" + string(host) + ") only use in client, " + _desc);
  163. }
  164. _bindAddr = TC_Socket::createSockAddr(host);
  165. }
  166. void TC_Transceiver::setBindAddr(const TC_Socket::addr_type &bindAddr)
  167. {
  168. if(_isServer)
  169. {
  170. THROW_ERROR(TC_Transceiver_Exception, CR_Type,"setBindAddr only use in client, " + _desc);
  171. }
  172. _bindAddr = bindAddr;
  173. }
  174. shared_ptr<TC_Epoller::EpollInfo> TC_Transceiver::bindFd(int fd)
  175. {
  176. if(!_isServer)
  177. {
  178. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "client should not call bindFd, " + _desc);
  179. }
  180. _connStatus = eConnected;
  181. _fd = fd;
  182. //设置套接口选项
  183. for(size_t i=0; i< _socketOpts.size(); ++i)
  184. {
  185. setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen);
  186. }
  187. _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  188. getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second);
  189. _epollInfo = _epoller->createEpollInfo(_fd);
  190. return _epollInfo;
  191. }
  192. void TC_Transceiver::setUdpRecvBuffer(size_t nSize)
  193. {
  194. _nRecvBufferSize = nSize;
  195. _pRecvBuffer->alloc(_nRecvBufferSize);
  196. }
  197. void TC_Transceiver::checkConnect()
  198. {
  199. //检查连接是否有错误
  200. if(isConnecting())
  201. {
  202. int iVal = 0;
  203. SOCKET_LEN_TYPE iLen = static_cast<SOCKET_LEN_TYPE>(sizeof(int));
  204. int ret = ::getsockopt(_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen);
  205. if (ret < 0 || iVal)
  206. {
  207. string err = TC_Exception::parseError(iVal);
  208. THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect " + _desc + " error:" + err);
  209. }
  210. _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  211. getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second);
  212. if(_bindAddr.first)
  213. {
  214. //如果服务器终止后,服务器可以第二次快速启动而不用等待一段时间
  215. int iReuseAddr = 1;
  216. setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&iReuseAddr, sizeof(int));
  217. ::bind(_fd, _bindAddr.first.get(), _bindAddr.second);
  218. }
  219. setConnected();
  220. }
  221. }
  222. void TC_Transceiver::parseConnectAddress()
  223. {
  224. if (isConnectIPv6())
  225. {
  226. TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in6*)_serverAddr.first.get());
  227. }
  228. else
  229. {
  230. TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in*)_serverAddr.first.get());
  231. }
  232. }
  233. bool TC_Transceiver::isSSL() const
  234. {
  235. return _ep.isSSL();
  236. }
  237. void TC_Transceiver::connect()
  238. {
  239. if(_isServer)
  240. {
  241. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call connect, " + _desc);
  242. }
  243. if(isValid())
  244. {
  245. return;
  246. }
  247. if(_connStatus == eConnecting || _connStatus == eConnected)
  248. {
  249. return;
  250. }
  251. if (_ep.isUdp())
  252. {
  253. _fd = createSocket(true, false, isConnectIPv6());
  254. _connStatus = eConnected;
  255. _epollInfo = _epoller->createEpollInfo(_fd);
  256. _proxyInfo = _createSocketCallback(this);
  257. if(_proxyInfo)
  258. {
  259. _desc = _proxyInfo->getEndpoint().toString();
  260. }
  261. //每次连接前都重新解析一下地址, 避免dns变了!
  262. parseConnectAddress();
  263. }
  264. else
  265. {
  266. _fd = createSocket(false, false, isConnectIPv6());
  267. _isConnTimeout = false;
  268. _epollInfo = _epoller->createEpollInfo(_fd);
  269. _connTimerId = _epoller->postDelayed(_connTimeout, std::bind(&TC_Transceiver::checkConnectTimeout, this));
  270. _proxyInfo = _createSocketCallback(this);
  271. if(_proxyInfo)
  272. {
  273. _desc = _proxyInfo->getEndpoint().toString();
  274. }
  275. //每次连接前都重新解析一下地址, 避免dns变了!
  276. parseConnectAddress();
  277. bool bConnected = doConnect(_fd, _serverAddr.first.get(), _serverAddr.second);
  278. if(bConnected)
  279. {
  280. setConnected();
  281. }
  282. else
  283. {
  284. _connStatus = TC_Transceiver::eConnecting;
  285. }
  286. }
  287. //设置套接口选项
  288. for(size_t i=0; i< _socketOpts.size(); ++i)
  289. {
  290. setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen);
  291. }
  292. }
  293. void TC_Transceiver::checkConnectTimeout()
  294. {
  295. if(_connStatus != eConnected)
  296. {
  297. _isConnTimeout = true;
  298. THROW_ERROR(TC_Transceiver_Exception, CR_ConnectTimeout, "connect timeout, " + _desc);
  299. }
  300. }
  301. void TC_Transceiver::setConnected()
  302. {
  303. if(_isServer)
  304. {
  305. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call setConnected, " + _desc);
  306. }
  307. _connStatus = eConnected;
  308. if(_proxyInfo)
  309. {
  310. connectProxy();
  311. }
  312. else
  313. {
  314. onSetConnected();
  315. }
  316. }
  317. void TC_Transceiver::onSetConnected()
  318. {
  319. if(_isServer)
  320. {
  321. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onSetConnected, " + _desc);
  322. }
  323. onConnect();
  324. _onConnectCallback(this);
  325. if (!isSSL())
  326. {
  327. doAuthReq();
  328. }
  329. }
  330. void TC_Transceiver::onConnect()
  331. {
  332. if(_isServer)
  333. {
  334. THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onConnect, " + _desc);
  335. }
  336. _epoller->erase(_connTimerId);
  337. _connTimerId = 0;
  338. #if TARS_SSL
  339. if (isSSL())
  340. {
  341. _openssl = _onOpensslCallback(this);
  342. if (!_openssl)
  343. {
  344. close();
  345. return;
  346. }
  347. _openssl->init(false);
  348. _openssl->setReadBufferSize(1024 * 8);
  349. _openssl->setWriteBufferSize(1024 * 8);
  350. _openssl->recvBuffer()->setConnection(this);
  351. int ret = _openssl->doHandshake(_sendBuffer);
  352. if (ret != 0)
  353. {
  354. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl hande shake failed, " + _desc + ", error:" + _openssl->getErrMsg());
  355. }
  356. // send the encrypt data from write buffer
  357. if (!_sendBuffer.empty())
  358. {
  359. doRequest();
  360. }
  361. return;
  362. }
  363. #endif
  364. }
  365. void TC_Transceiver::doAuthReq()
  366. {
  367. if (_ep.getAuthType() == TC_Endpoint::AUTH_TYPENONE)
  368. {
  369. _authState = eAuthSucc;
  370. _onRequestCallback(this);
  371. }
  372. else
  373. {
  374. //如果是客户端, 则主动发起鉴权请求
  375. shared_ptr<TC_NetWorkBuffer::Buffer> buff = _onClientSendAuthCallback(this);
  376. #if TARS_SSL
  377. if(this->isSSL())
  378. {
  379. int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer);
  380. if(ret != 0)
  381. {
  382. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl write failed, " + _desc + ", error:" + _openssl->getErrMsg());
  383. return ;
  384. }
  385. }
  386. else
  387. {
  388. _sendBuffer.addBuffer(buff);
  389. }
  390. #else
  391. _sendBuffer.addBuffer(buff);
  392. #endif
  393. doRequest();
  394. }
  395. }
  396. void TC_Transceiver::connectProxy()
  397. {
  398. assert(_proxyInfo);
  399. vector<char> buff;
  400. bool succ = _proxyInfo->sendProxyPacket(buff, _ep);
  401. if(!succ)
  402. {
  403. THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_SEND, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg());
  404. }
  405. _sendBuffer.addBuffer(buff);
  406. doRequest();
  407. }
  408. int TC_Transceiver::doCheckProxy(const char *buff, size_t length)
  409. {
  410. if(!_proxyInfo || _proxyInfo->isSuccess())
  411. return 0;
  412. bool succ = _proxyInfo->recvProxyPacket(buff, length);
  413. if(!succ)
  414. {
  415. THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_RECV, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg());
  416. }
  417. if(!_proxyInfo->isSuccess())
  418. {
  419. connectProxy();
  420. }
  421. else
  422. {
  423. onSetConnected();
  424. }
  425. return 1;
  426. }
  427. void TC_Transceiver::udpClose()
  428. {
  429. if (_ep.isUdp())
  430. {
  431. _epoller->releaseEpollInfo(_epollInfo);
  432. _epollInfo.reset();
  433. TC_Port::closeSocket(_fd);
  434. _fd = -1;
  435. _connStatus = eUnconnected;
  436. _sendBuffer.clearBuffers();
  437. _recvBuffer.clearBuffers();
  438. }
  439. }
  440. void TC_Transceiver::close()
  441. {
  442. // LOG_CONSOLE_DEBUG << this << endl;
  443. if(!isValid()) return;
  444. if(_ep.isTcp())
  445. {
  446. tcpClose(false, CR_ACTIVE, "active call");
  447. }
  448. else
  449. {
  450. udpClose();
  451. }
  452. }
  453. void TC_Transceiver::tcpClose(bool deconstructor, CloseReason reason, const string &err)
  454. {
  455. if(_ep.isTcp() && isValid())
  456. {
  457. #if TARS_SSL
  458. if (_openssl)
  459. {
  460. _openssl->release();
  461. _openssl.reset();
  462. }
  463. #endif
  464. //LOG_CONSOLE_DEBUG << this << ", " << _fd << ", " << reason << ", " << err << ", " << deconstructor << endl;
  465. _epoller->releaseEpollInfo(_epollInfo);
  466. _epollInfo.reset();
  467. TC_Port::closeSocket(_fd);
  468. _fd = -1;
  469. _connStatus = eUnconnected;
  470. _sendBuffer.clearBuffers();
  471. _recvBuffer.clearBuffers();
  472. _authState = eAuthInit;
  473. if(!deconstructor)
  474. {
  475. //注意必须放在最后, 主要避免_onCloseCallback里面析构了链接, 从而导致又进入tcpClose
  476. //放在最后就不会有问题了, 因为不会再进入这个函数
  477. _onCloseCallback(this, reason, err);
  478. }
  479. }
  480. }
  481. void TC_Transceiver::doRequest()
  482. {
  483. if(!isValid()) return ;
  484. checkConnect();
  485. //buf不为空,先发送buffer的内容
  486. while(!_sendBuffer.empty())
  487. {
  488. auto data = _sendBuffer.getBufferPointer();
  489. assert(data.first != NULL && data.second != 0);
  490. int iRet = this->send(data.first, (uint32_t) data.second, 0);
  491. if (iRet <= 0)
  492. {
  493. return;
  494. }
  495. _sendBuffer.moveHeader(iRet);
  496. }
  497. if(_sendBuffer.empty())
  498. {
  499. _onRequestCallback(this);
  500. }
  501. }
  502. TC_Transceiver::ReturnStatus TC_Transceiver::sendRequest(const shared_ptr<TC_NetWorkBuffer::Buffer> &buff, const TC_Socket::addr_type& addr)
  503. {
  504. // LOG_CONSOLE_DEBUG << buff->length() << endl;
  505. //空数据 直接返回成功
  506. if(buff->empty()) {
  507. return eRetOk;
  508. }
  509. // assert(_sendBuffer.empty());
  510. //buf不为空, 表示之前的数据还没发送完, 直接返回失败, 等buffer可写了,epoll会通知写事件
  511. if(!_sendBuffer.empty()) {
  512. //不应该运行到这里
  513. return eRetNotSend;
  514. }
  515. if(eConnected != _connStatus)
  516. {
  517. return eRetNotSend;
  518. }
  519. if(_proxyInfo && !_proxyInfo->isSuccess()) {
  520. return eRetNotSend;
  521. }
  522. if (_ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc)
  523. {
  524. #if TARS_SSL
  525. if (isSSL() && !_openssl)
  526. {
  527. return eRetNotSend;
  528. }
  529. #endif
  530. return eRetNotSend; // 需要鉴权但还没通过,不能发送非认证消息
  531. }
  532. #if TARS_SSL
  533. // 握手数据已加密,直接发送,会话数据需加密
  534. if (isSSL())
  535. {
  536. if(!_openssl->isHandshaked()) {
  537. return eRetNotSend;
  538. }
  539. int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer);
  540. if(ret != 0)
  541. {
  542. close();
  543. return eRetError;
  544. }
  545. buff->clear();
  546. }
  547. else
  548. {
  549. _sendBuffer.addBuffer(buff);
  550. }
  551. #else
  552. _sendBuffer.addBuffer(buff);
  553. #endif
  554. // LOG_CONSOLE_DEBUG << _sendBuffer.getBufferLength() << endl;
  555. _lastAddr = addr;
  556. do
  557. {
  558. auto data = _sendBuffer.getBufferPointer();
  559. int iRet = this->send(data.first, (uint32_t) data.second, 0);
  560. if(iRet < 0)
  561. {
  562. if(!isValid())
  563. {
  564. _sendBuffer.clearBuffers();
  565. return eRetError;
  566. }
  567. else
  568. {
  569. return eRetFull;
  570. }
  571. }
  572. _sendBuffer.moveHeader(iRet);
  573. // assert(iRet != 0);
  574. }
  575. while(!_sendBuffer.empty());
  576. return eRetOk;
  577. }
  578. void TC_Transceiver::doAuthCheck(TC_NetWorkBuffer *buff)
  579. {
  580. if (!buff->empty() && _ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc)
  581. {
  582. TC_NetWorkBuffer::PACKET_TYPE type;
  583. if(_isServer)
  584. {
  585. //验证鉴权
  586. auto ret = _onServerVerifyAuthCallback(*buff, this);
  587. type = ret.first;
  588. if(type == TC_NetWorkBuffer::PACKET_FULL)
  589. {
  590. _authState = eAuthSucc;
  591. //服务器端, 鉴权通过, 可以响应包
  592. sendRequest(ret.second, _serverAddr);
  593. }
  594. }
  595. else
  596. {
  597. type = _onClientVerifyAuthCallback(*buff, this);
  598. if(type == TC_NetWorkBuffer::PACKET_FULL)
  599. {
  600. _authState = eAuthSucc;
  601. //客户端, 鉴权通过可以发送业务包了
  602. _onRequestCallback(this);
  603. }
  604. }
  605. if(type == TC_NetWorkBuffer::PACKET_ERR)
  606. {
  607. THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "[TC_Transceiver::doProtocolAnalysis, auth error]");
  608. }
  609. }
  610. }
  611. int TC_Transceiver::doProtocolAnalysis(TC_NetWorkBuffer *buff)
  612. {
  613. doAuthCheck(buff);
  614. TC_NetWorkBuffer::PACKET_TYPE ret;
  615. int packetCount = 0;
  616. int ioriginal = 0;
  617. int isurplus = 0;
  618. try
  619. {
  620. do
  621. {
  622. ioriginal = buff->getBuffers().size();
  623. ret = _onParserCallback(*buff, this);
  624. isurplus = buff->getBuffers().size();
  625. if(ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE)
  626. {
  627. ++packetCount;
  628. }
  629. if(ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) {
  630. //full close模式下, 需要关闭连接
  631. tcpClose(false, CR_PROTOCOL, "protocol full close");
  632. }
  633. if(_onCompletePackageCallback) {
  634. //收到一个完整的包
  635. _onCompletePackageCallback(this);
  636. }
  637. // 当收到完整包时,解析完包后,buffer没movehead,则报错
  638. if (ret == TC_NetWorkBuffer::PACKET_FULL && ioriginal == isurplus)
  639. {
  640. ret = TC_NetWorkBuffer::PACKET_FULL_CLOSE;
  641. string err = "parser buffer movehead error, " + _desc;
  642. tcpClose(false, CR_PROTOCOL, err); // 这个地方会将连接关闭,为了方便后期问题定位
  643. throw TC_Transceiver_Exception(err);
  644. }
  645. }
  646. while (ret == TC_NetWorkBuffer::PACKET_FULL);
  647. }
  648. catch (exception & ex) {
  649. THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error:" + string(ex.what()) + "]");
  650. }
  651. catch (...) {
  652. THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error");
  653. }
  654. if (ret == TC_NetWorkBuffer::PACKET_ERR)
  655. {
  656. string err = "parser decode error, " + _desc;
  657. tcpClose(false, CR_PROTOCOL, err);
  658. throw TC_Transceiver_Exception(err);
  659. }
  660. return packetCount;
  661. }
  662. //////////////////////////////////////////////////////////
  663. TC_TCPTransceiver::TC_TCPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  664. : TC_Transceiver(epoller, ep)
  665. {
  666. assert(epoller);
  667. }
  668. //不同的内存分配机制
  669. #if 0
  670. void TC_TCPTransceiver::doResponse()
  671. {
  672. checkConnect();
  673. int iRet = 0;
  674. int packetCount = 0;
  675. do
  676. {
  677. char buff[BUFFER_SIZE];
  678. if ((iRet = this->recv((void*)buff, BUFFER_SIZE, 0)) > 0)
  679. {
  680. int check = doCheckProxy(buff, iRet);
  681. if(check != 0)
  682. {
  683. _recvBuffer.clearBuffers();
  684. return;
  685. }
  686. _recvBuffer.addBuffer(buff, iRet);
  687. //解析协议
  688. packetCount += doProtocolAnalysis(&_recvBuffer);
  689. //收包太多了, 中断一下, 释放线程给send等
  690. if (packetCount >= 2000 && isValid())
  691. {
  692. _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  693. break;
  694. }
  695. //接收的数据小于buffer大小, 内核会再次通知你
  696. if(iRet < BUFFER_SIZE)
  697. {
  698. break;
  699. }
  700. }
  701. }
  702. while (iRet>0);
  703. }
  704. #else
  705. void TC_TCPTransceiver::doResponse()
  706. {
  707. checkConnect();
  708. int iRet = 0;
  709. int packetCount = 0;
  710. do
  711. {
  712. auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE);
  713. uint32_t left = (uint32_t)data->left();
  714. if ((iRet = this->recv((void*)data->free(), left, 0)) > 0)
  715. {
  716. int check = doCheckProxy(data->free(), iRet);
  717. if(check != 0)
  718. {
  719. _recvBuffer.clearBuffers();
  720. return;
  721. }
  722. data->addWriteIdx(iRet);
  723. _recvBuffer.addLength(iRet);
  724. //解析协议
  725. packetCount += doProtocolAnalysis(&_recvBuffer);
  726. //收包太多了, 中断一下, 释放线程给send等
  727. if (packetCount >= 2000 && isValid())
  728. {
  729. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  730. // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  731. break;
  732. }
  733. //接收的数据小于buffer大小, 内核会再次通知你
  734. if(iRet < (int)left)
  735. {
  736. break;
  737. }
  738. }
  739. }
  740. while (iRet>0);
  741. }
  742. #endif
  743. int TC_TCPTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
  744. {
  745. //只有是连接状态才能收发数据
  746. if(eConnected != _connStatus)
  747. {
  748. return -1;
  749. }
  750. int iRet = ::send(_fd, (const char*)buf, len, flag);
  751. // LOG_CONSOLE_DEBUG << this << ", send, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl;
  752. if (iRet < 0 && !TC_Socket::isPending())
  753. {
  754. THROW_ERROR(TC_Transceiver_Exception, CR_SEND, "TC_TCPTransceiver::send, " + _desc + ", fd:" + TC_Common::tostr(_fd));
  755. }
  756. #if TARGET_PLATFORM_WINDOWS
  757. if(iRet < 0 && TC_Socket::isPending())
  758. {
  759. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  760. }
  761. #endif
  762. return iRet;
  763. }
  764. int TC_TCPTransceiver::recv(void* buf, uint32_t len, uint32_t flag)
  765. {
  766. //只有是连接状态才能收发数据
  767. if(eConnected != _connStatus)
  768. return -1;
  769. int iRet = ::recv(_fd, (char*)buf, len, flag);
  770. // LOG_CONSOLE_DEBUG << this << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << endl;
  771. if (iRet == 0 || (iRet < 0 && !TC_Socket::isPending()))
  772. {
  773. int nerr = TC_Exception::getSystemCode();
  774. string err = "recv error, errno:" + TC_Common::tostr(nerr) + "," + TC_Exception::parseError(nerr);
  775. THROW_ERROR(TC_Transceiver_Exception, CR_RECV, err + ", " + _desc + ", fd:" + TC_Common::tostr(_fd));
  776. }
  777. #if TARGET_PLATFORM_WINDOWS
  778. if(iRet < 0 && TC_Socket::isPending())
  779. {
  780. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  781. }
  782. #endif
  783. return iRet;
  784. }
  785. /////////////////////////////////////////////////////////////////
  786. #if TARS_SSL
  787. TC_SSLTransceiver::TC_SSLTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  788. : TC_TCPTransceiver(epoller, ep)
  789. {
  790. }
  791. #if 0
  792. void TC_SSLTransceiver::doResponse()
  793. {
  794. checkConnect();
  795. int iRet = 0;
  796. int packetCount = 0;
  797. do
  798. {
  799. char buff[BUFFER_SIZE] = {0x00};
  800. if ((iRet = this->recv(buff, BUFFER_SIZE, 0)) > 0)
  801. {
  802. int check = doCheckProxy(buff, iRet);
  803. if(check != 0)
  804. {
  805. return;
  806. }
  807. const bool preHandshake = _openssl->isHandshaked();
  808. int ret = _openssl->read(buff, iRet, _sendBuffer);
  809. if (ret != 0)
  810. {
  811. // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl;
  812. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
  813. }
  814. else if(!_sendBuffer.empty())
  815. {
  816. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse SSL_read prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << ", send handshake len:" << _sendBuffer.getBufferLength() << endl;
  817. int ret = doRequest();
  818. if(ret < 0)
  819. {
  820. // doRequest失败 close fd
  821. if (!isValid())
  822. {
  823. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, ssl doRequest failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
  824. }
  825. else
  826. {
  827. return;
  828. }
  829. }
  830. }
  831. // LOG_CONSOLE_DEBUG << "recv length:" << iRet << ", preHandshake:" << preHandshake << endl;
  832. if (!_openssl->isHandshaked())
  833. {
  834. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  835. return;
  836. }
  837. if (!preHandshake)
  838. {
  839. if(_isServer)
  840. {
  841. _onRequestCallback(this);
  842. }
  843. else
  844. {
  845. //握手完毕, 客户端直接发送鉴权请求
  846. doAuthReq();
  847. // doAuthReq失败,会close fd, 这里判断下是否还有效
  848. if (!isValid())
  849. {
  850. THROW_ERROR(TC_Transceiver_Exception, CR_SSL,
  851. "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " +
  852. _openssl->getErrMsg() + "]");
  853. }
  854. else
  855. {
  856. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  857. }
  858. }
  859. }
  860. TC_NetWorkBuffer *rbuf = _openssl->recvBuffer();
  861. //解析协议
  862. packetCount += doProtocolAnalysis(rbuf);
  863. //收包太多了, 中断一下, 释放线程给send等
  864. if (packetCount >= 1000 && isValid())
  865. {
  866. _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  867. break;
  868. }
  869. //接收的数据小于buffer大小, 内核会再次通知你
  870. if(iRet < BUFFER_SIZE)
  871. {
  872. break;
  873. }
  874. }
  875. }
  876. while (iRet>0);
  877. }
  878. #else
  879. void TC_SSLTransceiver::doResponse()
  880. {
  881. checkConnect();
  882. int iRet = 0;
  883. int packetCount = 0;
  884. do
  885. {
  886. auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE);
  887. uint32_t left = (uint32_t)data->left();
  888. if ((iRet = this->recv((void*)data->free(), left, 0)) > 0)
  889. {
  890. int check = doCheckProxy(data->free(), iRet);
  891. if(check != 0)
  892. {
  893. return;
  894. }
  895. const bool preHandshake = _openssl->isHandshaked();
  896. int ret = _openssl->read(data->free(), iRet, _sendBuffer);
  897. if (ret != 0)
  898. {
  899. // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl;
  900. THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
  901. }
  902. else if(!_sendBuffer.empty())
  903. {
  904. doRequest();
  905. }
  906. if (!_openssl->isHandshaked())
  907. {
  908. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  909. return;
  910. }
  911. if (!preHandshake)
  912. {
  913. if(_isServer)
  914. {
  915. _onRequestCallback(this);
  916. }
  917. else
  918. {
  919. //握手完毕, 客户端直接发送鉴权请求
  920. doAuthReq();
  921. // doAuthReq失败,会close fd, 这里判断下是否还有效
  922. if (!isValid())
  923. {
  924. THROW_ERROR(TC_Transceiver_Exception, CR_SSL,
  925. "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " +
  926. _openssl->getErrMsg() + "]");
  927. }
  928. else
  929. {
  930. // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
  931. }
  932. }
  933. }
  934. TC_NetWorkBuffer *rbuf = _openssl->recvBuffer();
  935. //解析协议
  936. packetCount += doProtocolAnalysis(rbuf);
  937. //收包太多了, 中断一下, 释放线程给send等
  938. if (packetCount >= 1000 && isValid())
  939. {
  940. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  941. // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  942. break;
  943. }
  944. //接收的数据小于buffer大小, 内核会再次通知你
  945. if(iRet < left)
  946. {
  947. break;
  948. }
  949. }
  950. }
  951. while (iRet>0);
  952. }
  953. #endif
  954. #endif
  955. /////////////////////////////////////////////////////////////////
  956. TC_UDPTransceiver::TC_UDPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
  957. : TC_Transceiver(epoller, ep)
  958. {
  959. }
  960. TC_UDPTransceiver::~TC_UDPTransceiver()
  961. {
  962. }
  963. void TC_UDPTransceiver::doResponse()
  964. {
  965. checkConnect();
  966. int iRet = 0;
  967. int packetCount = 0;
  968. do
  969. {
  970. _recvBuffer.clearBuffers();
  971. auto data = _recvBuffer.getOrCreateBuffer(_nRecvBufferSize, _nRecvBufferSize);
  972. uint32_t left = (uint32_t)data->left();
  973. if ((iRet = this->recv((void *)data->free(), left, 0)) > 0)
  974. {
  975. data->addWriteIdx(iRet);
  976. _recvBuffer.addLength(iRet);
  977. //解析协议
  978. packetCount += doProtocolAnalysis(&_recvBuffer);
  979. // LOG_CONSOLE_DEBUG << iRet << ", " << packetCount << endl;
  980. //收包太多了, 中断一下, 释放线程给send等
  981. if (packetCount >= 1000 && isValid())
  982. {
  983. _epollInfo->mod(EPOLLIN | EPOLLOUT);
  984. // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
  985. break;
  986. }
  987. }
  988. }
  989. while (iRet > 0);
  990. }
  991. int TC_UDPTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
  992. {
  993. if(!isValid()) return -1;
  994. int iRet = 0;
  995. if(_isServer)
  996. {
  997. iRet=::sendto(_fd, (const char*)buf, len, flag, _lastAddr.first.get(), _lastAddr.second);
  998. }
  999. else
  1000. {
  1001. iRet=::sendto(_fd, (const char*)buf, len, flag, _serverAddr.first.get(), _serverAddr.second);
  1002. }
  1003. if(iRet > 0)
  1004. {
  1005. //udp只发一次 发送一半也算全部发送成功
  1006. return len;
  1007. }
  1008. if (iRet < 0 && TC_Socket::isPending())
  1009. {
  1010. //EAGAIN, 认为没有发送
  1011. return 0;
  1012. }
  1013. return iRet;
  1014. }
  1015. int TC_UDPTransceiver::recv(void* buf, uint32_t len, uint32_t flag)
  1016. {
  1017. if(!isValid()) return -1;
  1018. _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
  1019. int iRet = ::recvfrom(_fd, (char*)buf, len, flag, _clientAddr.first.get(), &_clientAddr.second); //need check from_ip & port
  1020. // cout << "recv :" << iRet << endl;
  1021. // if(iRet < 0)
  1022. // {
  1023. // LOG_CONSOLE_DEBUG << this << ", " << TC_Socket::isPending() << ", " << _isServer << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl;
  1024. // }
  1025. if(!_isServer)
  1026. {
  1027. //客户端才会关闭连接, 会重建socket, 服务端不会
  1028. if (iRet < 0 && !TC_Socket::isPending())
  1029. {
  1030. THROW_ERROR(TC_Transceiver_Exception, CR_RECV, "TC_UDPTransceiver::udp recv, " + _desc + ", fd:" + TC_Common::tostr(_fd));
  1031. return 0;
  1032. }
  1033. }
  1034. return iRet;
  1035. }
  1036. /////////////////////////////////////////////////////////////////
  1037. }