#include "util/tc_transceiver.h"
#include "util/tc_logger.h"
#if TARS_SSL
#include "util/tc_openssl.h"
#endif
// NOTE(review): the header name of the last #include was lost in this copy of
// the file. <cassert> (assert() is used below) and <sstream> are assumed - confirm.
#include <cassert>
#include <sstream>

namespace tars
{

/**
 * Scope guard used by THROW_ERROR.
 *
 * When the guard is destroyed (which, inside THROW_ERROR, happens while the
 * exception thrown by the macro leaves the enclosing block) it calls
 * tcpClose(false, reason, err) on the transceiver.
 */
class CloseClourse
{
public:
    /**
     * @param trans  transceiver to close when the guard is destroyed
     * @param reason close reason forwarded to tcpClose()
     * @param err    error text forwarded to tcpClose()
     */
    CloseClourse(TC_Transceiver *trans, TC_Transceiver::CloseReason reason, const string &err)
    : _trans(trans), _reason(reason), _err(err)
    {
    }

    // A guard must close exactly once, so it is not copyable.
    CloseClourse(const CloseClourse &) = delete;
    CloseClourse &operator=(const CloseClourse &) = delete;

    ~CloseClourse()
    {
        // tcpClose() invokes the user supplied close callback. Destructors are
        // implicitly noexcept and this one normally runs while another exception
        // is in flight, so an exception escaping here would terminate the process.
        try
        {
            _trans->tcpClose(false, _reason, _err);
        }
        catch (...)
        {
        }
    }

protected:
    TC_Transceiver              *_trans;
    TC_Transceiver::CloseReason _reason;
    string                      _err;
};

// Close the transceiver (through a CloseClourse guard) and throw exception type
// `x` with message `y`; `r` is the close reason handed to the close callback.
#define THROW_ERROR(x, r, y) { CloseClourse c(this, r, y); THROW_EXCEPTION_SYSCODE(x, y); }

// Upper bound of a single receive buffer used by the TCP/SSL receive loops.
static const int BUFFER_SIZE = 16 * 1024;

///////////////////////////////////////////////////////////////////////

/**
 * Create a non-blocking socket and return its file descriptor.
 *
 * @param udp     true for SOCK_DGRAM, false for SOCK_STREAM
 * @param isLocal request a PF_LOCAL socket (ignored on Windows)
 * @param isIpv6  use PF_INET6 instead of PF_INET
 * @return the descriptor; the temporary TC_Socket gives up ownership, so the
 *         caller is responsible for closing it
 */
int TC_Transceiver::createSocket(bool udp, bool isLocal, bool isIpv6)
{
#if TARGET_PLATFORM_WINDOWS
    int domain = (isIpv6 ? PF_INET6 : PF_INET);
#else
    int domain = isLocal ? PF_LOCAL : (isIpv6 ? PF_INET6 : PF_INET);
#endif

    int type = udp ? SOCK_DGRAM : SOCK_STREAM;

    TC_Socket s;
    s.createSocket(type, domain);

    if (udp)
    {
        s.setRecvBufferSize(512 * 1024);
        s.setSendBufferSize(512 * 1024);
    }
    else
    {
        s.setTcpNoDelay();
        s.setKeepAlive();
        s.setNoCloseWait();
    }

    // Detach the descriptor from `s` so it survives the TC_Socket object.
    s.setOwner(false);
    s.setblock(false);

    return s.getfd();
}

/**
 * Start connecting `fd` to `addr`.
 *
 * @return true when ::connect() completed immediately, false when the
 *         connection is still in progress
 * @throws TC_Transceiver_Exception (after closing the transceiver) when
 *         ::connect() failed for any reason other than "in progress"
 */
bool TC_Transceiver::doConnect(int fd, const struct sockaddr *addr, socklen_t len)
{
    int iRet = ::connect(fd, addr, len);

    if (iRet == 0)
    {
        return true;
    }

    if (!TC_Socket::isInProgress())
    {
        THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect error, " + _desc);
    }

    return false;
}

/**
 * Build an unconnected transceiver for endpoint `ep`, driven by `epoller`.
 * For UDP endpoints a receive buffer of DEFAULT_RECV_BUFFERSIZE bytes is
 * allocated up front.
 */
TC_Transceiver::TC_Transceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
: _epoller(epoller)
, _ep(ep)
, _desc(ep.toString())
, _fd(-1)
, _connStatus(eUnconnected)
, _sendBuffer(this)
, _recvBuffer(this)
, _authState(eAuthInit)
{
    if (ep.isUdp())
    {
        _pRecvBuffer = std::make_shared<TC_NetWorkBuffer::Buffer>();
        _nRecvBufferSize = DEFAULT_RECV_BUFFERSIZE;
        _pRecvBuffer->alloc(_nRecvBufferSize);
    }

    _serverAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
}

/**
 * Close the socket if it is still open. The TCP path passes
 * deconstructor=true so that the close callback is NOT invoked.
 */
TC_Transceiver::~TC_Transceiver()
{
    if (!isValid())
    {
        return;
    }

    if (_ep.isTcp())
    {
        tcpClose(true, CR_DECONSTRUCTOR, "");
    }
    else
    {
        udpClose();
    }
}

/**
 * Configure this transceiver as a client and install its callbacks.
 */
void TC_Transceiver::initializeClient(const oncreate_callback &oncreate,
                                      const onclose_callback &onclose,
                                      const onconnect_callback &onconnect,
                                      const onrequest_callback &onrequest,
                                      const onparser_callback &onparser,
                                      const onopenssl_callback &onopenssl,
                                      const oncompletepackage_callback &onfinish)
{
    _isServer = false;

    _createSocketCallback      = oncreate;
    _onConnectCallback         = onconnect;
    _onRequestCallback         = onrequest;
    _onCloseCallback           = onclose;
    _onParserCallback          = onparser;
    _onCompletePackageCallback = onfinish;
    _onOpensslCallback         = onopenssl;
}

/**
 * Configure this transceiver as the server side of an accepted connection and
 * install its callbacks. The connection state is set to eConnected. For SSL
 * endpoints the SSL object is created and the handshake is started.
 *
 * @throws TC_Transceiver_Exception when the SSL object cannot be created or
 *         the handshake step fails (SSL builds only)
 */
void TC_Transceiver::initializeServer(const onclose_callback &onclose,
                                      const onrequest_callback &onrequest,
                                      const onparser_callback &onparser,
                                      const onopenssl_callback &onopenssl,
                                      const oncompletepackage_callback &onfinish)
{
    _isServer = true;
    _connStatus = eConnected;

    _onRequestCallback         = onrequest;
    _onCloseCallback           = onclose;
    _onParserCallback          = onparser;
    _onCompletePackageCallback = onfinish;
    _onOpensslCallback         = onopenssl;

#if TARS_SSL
    if (isSSL())
    {
        _openssl = _onOpensslCallback(this);
        if (!_openssl)
        {
            THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error]");
        }

        _openssl->init(true);
        _openssl->recvBuffer()->setConnection(this);

        int ret = _openssl->doHandshake(_sendBuffer);
        if (ret != 0)
        {
            THROW_ERROR(TC_Transceiver_Exception, CR_SSL_HANDSHAKE, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error: " + _openssl->getErrMsg() + "]");
        }

        // The handshake may have queued encrypted data in the send buffer: flush it.
        if (!_sendBuffer.empty())
        {
            doRequest();
        }
    }
#endif
}

/**
 * Install the client side authentication callbacks: one producing the auth
 * request, one verifying the server's auth response.
 */
void TC_Transceiver::setClientAuthCallback(const onclientsendauth_callback &onsendauth, const onclientverifyauth_callback &onverifyauth)
{
    _onClientSendAuthCallback = onsendauth;
    _onClientVerifyAuthCallback = onverifyauth;
}
void TC_Transceiver::setServerAuthCallback(const onserververifyauth_callback &onverifyauth) { _onServerVerifyAuthCallback = onverifyauth; } void TC_Transceiver::setBindAddr(const char *host) { if(_isServer) { THROW_ERROR(TC_Transceiver_Exception, CR_Type, "setBindAddr(" + string(host) + ") only use in client, " + _desc); } _bindAddr = TC_Socket::createSockAddr(host); } void TC_Transceiver::setBindAddr(const TC_Socket::addr_type &bindAddr) { if(_isServer) { THROW_ERROR(TC_Transceiver_Exception, CR_Type,"setBindAddr only use in client, " + _desc); } _bindAddr = bindAddr; } shared_ptr TC_Transceiver::bindFd(int fd) { if(!_isServer) { THROW_ERROR(TC_Transceiver_Exception, CR_Type, "client should not call bindFd, " + _desc); } _connStatus = eConnected; _fd = fd; //设置套接口选项 for(size_t i=0; i< _socketOpts.size(); ++i) { setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen); } _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second); _epollInfo = _epoller->createEpollInfo(_fd); return _epollInfo; } void TC_Transceiver::setUdpRecvBuffer(size_t nSize) { _nRecvBufferSize = nSize; _pRecvBuffer->alloc(_nRecvBufferSize); } void TC_Transceiver::checkConnect() { //检查连接是否有错误 if(isConnecting()) { int iVal = 0; SOCKET_LEN_TYPE iLen = static_cast(sizeof(int)); int ret = ::getsockopt(_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast(&iVal), &iLen); if (ret < 0 || iVal) { string err = TC_Exception::parseError(iVal); THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect " + _desc + " error:" + err); } _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second); if(_bindAddr.first) { //如果服务器终止后,服务器可以第二次快速启动而不用等待一段时间 int iReuseAddr = 1; setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&iReuseAddr, sizeof(int)); ::bind(_fd, _bindAddr.first.get(), _bindAddr.second); } setConnected(); } } 
void TC_Transceiver::parseConnectAddress() { if (isConnectIPv6()) { TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in6*)_serverAddr.first.get()); } else { TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in*)_serverAddr.first.get()); } } bool TC_Transceiver::isSSL() const { return _ep.isSSL(); } void TC_Transceiver::connect() { if(_isServer) { THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call connect, " + _desc); } if(isValid()) { return; } if(_connStatus == eConnecting || _connStatus == eConnected) { return; } if (_ep.isUdp()) { _fd = createSocket(true, false, isConnectIPv6()); _connStatus = eConnected; _epollInfo = _epoller->createEpollInfo(_fd); _proxyInfo = _createSocketCallback(this); if(_proxyInfo) { _desc = _proxyInfo->getEndpoint().toString(); } //每次连接前都重新解析一下地址, 避免dns变了! parseConnectAddress(); } else { _fd = createSocket(false, false, isConnectIPv6()); _isConnTimeout = false; _epollInfo = _epoller->createEpollInfo(_fd); _connTimerId = _epoller->postDelayed(_connTimeout, std::bind(&TC_Transceiver::checkConnectTimeout, this)); _proxyInfo = _createSocketCallback(this); if(_proxyInfo) { _desc = _proxyInfo->getEndpoint().toString(); } //每次连接前都重新解析一下地址, 避免dns变了! 
parseConnectAddress(); bool bConnected = doConnect(_fd, _serverAddr.first.get(), _serverAddr.second); if(bConnected) { setConnected(); } else { _connStatus = TC_Transceiver::eConnecting; } } //设置套接口选项 for(size_t i=0; i< _socketOpts.size(); ++i) { setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen); } } void TC_Transceiver::checkConnectTimeout() { if(_connStatus != eConnected) { _isConnTimeout = true; THROW_ERROR(TC_Transceiver_Exception, CR_ConnectTimeout, "connect timeout, " + _desc); } } void TC_Transceiver::setConnected() { if(_isServer) { THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call setConnected, " + _desc); } _connStatus = eConnected; if(_proxyInfo) { connectProxy(); } else { onSetConnected(); } } void TC_Transceiver::onSetConnected() { if(_isServer) { THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onSetConnected, " + _desc); } onConnect(); _onConnectCallback(this); if (!isSSL()) { doAuthReq(); } } void TC_Transceiver::onConnect() { if(_isServer) { THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onConnect, " + _desc); } _epoller->erase(_connTimerId); _connTimerId = 0; #if TARS_SSL if (isSSL()) { _openssl = _onOpensslCallback(this); if (!_openssl) { close(); return; } _openssl->init(false); _openssl->setReadBufferSize(1024 * 8); _openssl->setWriteBufferSize(1024 * 8); _openssl->recvBuffer()->setConnection(this); int ret = _openssl->doHandshake(_sendBuffer); if (ret != 0) { THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl hande shake failed, " + _desc + ", error:" + _openssl->getErrMsg()); } // send the encrypt data from write buffer if (!_sendBuffer.empty()) { doRequest(); } return; } #endif } void TC_Transceiver::doAuthReq() { if (_ep.getAuthType() == TC_Endpoint::AUTH_TYPENONE) { _authState = eAuthSucc; _onRequestCallback(this); } else { //如果是客户端, 则主动发起鉴权请求 shared_ptr buff = _onClientSendAuthCallback(this); #if TARS_SSL 
if(this->isSSL()) { int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer); if(ret != 0) { THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl write failed, " + _desc + ", error:" + _openssl->getErrMsg()); return ; } } else { _sendBuffer.addBuffer(buff); } #else _sendBuffer.addBuffer(buff); #endif doRequest(); } } void TC_Transceiver::connectProxy() { assert(_proxyInfo); vector buff; bool succ = _proxyInfo->sendProxyPacket(buff, _ep); if(!succ) { THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_SEND, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg()); } _sendBuffer.addBuffer(buff); doRequest(); } int TC_Transceiver::doCheckProxy(const char *buff, size_t length) { if(!_proxyInfo || _proxyInfo->isSuccess()) return 0; bool succ = _proxyInfo->recvProxyPacket(buff, length); if(!succ) { THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_RECV, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg()); } if(!_proxyInfo->isSuccess()) { connectProxy(); } else { onSetConnected(); } return 1; } void TC_Transceiver::udpClose() { if (_ep.isUdp()) { _epoller->releaseEpollInfo(_epollInfo); _epollInfo.reset(); TC_Port::closeSocket(_fd); _fd = -1; _connStatus = eUnconnected; _sendBuffer.clearBuffers(); _recvBuffer.clearBuffers(); } } void TC_Transceiver::close() { // LOG_CONSOLE_DEBUG << this << endl; if(!isValid()) return; if(_ep.isTcp()) { tcpClose(false, CR_ACTIVE, "active call"); } else { udpClose(); } } void TC_Transceiver::tcpClose(bool deconstructor, CloseReason reason, const string &err) { if(_ep.isTcp() && isValid()) { #if TARS_SSL if (_openssl) { _openssl->release(); _openssl.reset(); } #endif //LOG_CONSOLE_DEBUG << this << ", " << _fd << ", " << reason << ", " << err << ", " << deconstructor << endl; _epoller->releaseEpollInfo(_epollInfo); _epollInfo.reset(); TC_Port::closeSocket(_fd); _fd = -1; _connStatus = eUnconnected; _sendBuffer.clearBuffers(); _recvBuffer.clearBuffers(); _authState = eAuthInit; if(!deconstructor) { 
//注意必须放在最后, 主要避免_onCloseCallback里面析构了链接, 从而导致又进入tcpClose //放在最后就不会有问题了, 因为不会再进入这个函数 _onCloseCallback(this, reason, err); } } } void TC_Transceiver::doRequest() { if(!isValid()) return ; checkConnect(); //buf不为空,先发送buffer的内容 while(!_sendBuffer.empty()) { auto data = _sendBuffer.getBufferPointer(); assert(data.first != NULL && data.second != 0); int iRet = this->send(data.first, (uint32_t) data.second, 0); if (iRet <= 0) { return; } _sendBuffer.moveHeader(iRet); } if(_sendBuffer.empty()) { _onRequestCallback(this); } } TC_Transceiver::ReturnStatus TC_Transceiver::sendRequest(const shared_ptr &buff, const TC_Socket::addr_type& addr) { // LOG_CONSOLE_DEBUG << buff->length() << endl; //空数据 直接返回成功 if(buff->empty()) { return eRetOk; } // assert(_sendBuffer.empty()); //buf不为空, 表示之前的数据还没发送完, 直接返回失败, 等buffer可写了,epoll会通知写事件 if(!_sendBuffer.empty()) { //不应该运行到这里 return eRetNotSend; } if(eConnected != _connStatus) { return eRetNotSend; } if(_proxyInfo && !_proxyInfo->isSuccess()) { return eRetNotSend; } if (_ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc) { #if TARS_SSL if (isSSL() && !_openssl) { return eRetNotSend; } #endif return eRetNotSend; // 需要鉴权但还没通过,不能发送非认证消息 } #if TARS_SSL // 握手数据已加密,直接发送,会话数据需加密 if (isSSL()) { if(!_openssl->isHandshaked()) { return eRetNotSend; } int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer); if(ret != 0) { close(); return eRetError; } buff->clear(); } else { _sendBuffer.addBuffer(buff); } #else _sendBuffer.addBuffer(buff); #endif // LOG_CONSOLE_DEBUG << _sendBuffer.getBufferLength() << endl; _lastAddr = addr; do { auto data = _sendBuffer.getBufferPointer(); int iRet = this->send(data.first, (uint32_t) data.second, 0); if(iRet < 0) { if(!isValid()) { _sendBuffer.clearBuffers(); return eRetError; } else { return eRetFull; } } _sendBuffer.moveHeader(iRet); // assert(iRet != 0); } while(!_sendBuffer.empty()); return eRetOk; } void TC_Transceiver::doAuthCheck(TC_NetWorkBuffer 
*buff) { if (!buff->empty() && _ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc) { TC_NetWorkBuffer::PACKET_TYPE type; if(_isServer) { //验证鉴权 auto ret = _onServerVerifyAuthCallback(*buff, this); type = ret.first; if(type == TC_NetWorkBuffer::PACKET_FULL) { _authState = eAuthSucc; //服务器端, 鉴权通过, 可以响应包 sendRequest(ret.second, _serverAddr); } } else { type = _onClientVerifyAuthCallback(*buff, this); if(type == TC_NetWorkBuffer::PACKET_FULL) { _authState = eAuthSucc; //客户端, 鉴权通过可以发送业务包了 _onRequestCallback(this); } } if(type == TC_NetWorkBuffer::PACKET_ERR) { THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "[TC_Transceiver::doProtocolAnalysis, auth error]"); } } } int TC_Transceiver::doProtocolAnalysis(TC_NetWorkBuffer *buff) { doAuthCheck(buff); TC_NetWorkBuffer::PACKET_TYPE ret; int packetCount = 0; try { do { ret = _onParserCallback(*buff, this); if(ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) { ++packetCount; } if(ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) { //full close模式下, 需要关闭连接 tcpClose(false, CR_PROTOCOL, "protocol full close"); } if(_onCompletePackageCallback) { //收到一个完整的包 _onCompletePackageCallback(this); } } while (ret == TC_NetWorkBuffer::PACKET_FULL); } catch (exception & ex) { THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error:" + string(ex.what()) + "]"); } catch (...) 
{ THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error"); } if (ret == TC_NetWorkBuffer::PACKET_ERR) { string err = "parser decode error, " + _desc; tcpClose(false, CR_PROTOCOL, err); throw TC_Transceiver_Exception(err); } return packetCount; } ////////////////////////////////////////////////////////// TC_TCPTransceiver::TC_TCPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep) : TC_Transceiver(epoller, ep) { assert(epoller); } //不同的内存分配机制 #if 0 void TC_TCPTransceiver::doResponse() { checkConnect(); int iRet = 0; int packetCount = 0; do { char buff[BUFFER_SIZE]; if ((iRet = this->recv((void*)buff, BUFFER_SIZE, 0)) > 0) { int check = doCheckProxy(buff, iRet); if(check != 0) { _recvBuffer.clearBuffers(); return; } _recvBuffer.addBuffer(buff, iRet); //解析协议 packetCount += doProtocolAnalysis(&_recvBuffer); //收包太多了, 中断一下, 释放线程给send等 if (packetCount >= 2000 && isValid()) { _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT); break; } //接收的数据小于buffer大小, 内核会再次通知你 if(iRet < BUFFER_SIZE) { break; } } } while (iRet>0); } #else void TC_TCPTransceiver::doResponse() { checkConnect(); int iRet = 0; int packetCount = 0; do { auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE); uint32_t left = (uint32_t)data->left(); if ((iRet = this->recv((void*)data->free(), left, 0)) > 0) { int check = doCheckProxy(data->free(), iRet); if(check != 0) { _recvBuffer.clearBuffers(); return; } data->addWriteIdx(iRet); _recvBuffer.addLength(iRet); //解析协议 packetCount += doProtocolAnalysis(&_recvBuffer); //收包太多了, 中断一下, 释放线程给send等 if (packetCount >= 2000 && isValid()) { _epollInfo->mod(EPOLLIN | EPOLLOUT); // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT); break; } //接收的数据小于buffer大小, 内核会再次通知你 if(iRet < (int)left) { break; } } } while (iRet>0); } #endif int TC_TCPTransceiver::send(const void* buf, uint32_t len, uint32_t flag) { //只有是连接状态才能收发数据 if(eConnected != _connStatus) { return -1; } int iRet = ::send(_fd, (const char*)buf, len, flag); // LOG_CONSOLE_DEBUG << this << 
", send, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl; if (iRet < 0 && !TC_Socket::isPending()) { THROW_ERROR(TC_Transceiver_Exception, CR_SEND, "TC_TCPTransceiver::send, " + _desc + ", fd:" + TC_Common::tostr(_fd)); } #if TARGET_PLATFORM_WINDOWS if(iRet < 0 && TC_Socket::isPending()) { _epollInfo->mod(EPOLLIN | EPOLLOUT); } #endif return iRet; } int TC_TCPTransceiver::recv(void* buf, uint32_t len, uint32_t flag) { //只有是连接状态才能收发数据 if(eConnected != _connStatus) return -1; int iRet = ::recv(_fd, (char*)buf, len, flag); // LOG_CONSOLE_DEBUG << this << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << endl; if (iRet == 0 || (iRet < 0 && !TC_Socket::isPending())) { int nerr = TC_Exception::getSystemCode(); string err = "recv error, errno:" + TC_Common::tostr(nerr) + "," + TC_Exception::parseError(nerr); THROW_ERROR(TC_Transceiver_Exception, CR_RECV, err + ", " + _desc + ", fd:" + TC_Common::tostr(_fd)); } #if TARGET_PLATFORM_WINDOWS if(iRet < 0 && TC_Socket::isPending()) { _epollInfo->mod(EPOLLIN | EPOLLOUT); } #endif return iRet; } ///////////////////////////////////////////////////////////////// #if TARS_SSL TC_SSLTransceiver::TC_SSLTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep) : TC_TCPTransceiver(epoller, ep) { } #if 0 void TC_SSLTransceiver::doResponse() { checkConnect(); int iRet = 0; int packetCount = 0; do { char buff[BUFFER_SIZE] = {0x00}; if ((iRet = this->recv(buff, BUFFER_SIZE, 0)) > 0) { int check = doCheckProxy(buff, iRet); if(check != 0) { return; } const bool preHandshake = _openssl->isHandshaked(); int ret = _openssl->read(buff, iRet, _sendBuffer); if (ret != 0) { // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl; THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]"); } else if(!_sendBuffer.empty()) { // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse SSL_read 
prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << ", send handshake len:" << _sendBuffer.getBufferLength() << endl; int ret = doRequest(); if(ret < 0) { // doRequest失败 close fd if (!isValid()) { THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, ssl doRequest failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]"); } else { return; } } } // LOG_CONSOLE_DEBUG << "recv length:" << iRet << ", preHandshake:" << preHandshake << endl; if (!_openssl->isHandshaked()) { // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl; return; } if (!preHandshake) { if(_isServer) { _onRequestCallback(this); } else { //握手完毕, 客户端直接发送鉴权请求 doAuthReq(); // doAuthReq失败,会close fd, 这里判断下是否还有效 if (!isValid()) { THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]"); } else { // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl; } } } TC_NetWorkBuffer *rbuf = _openssl->recvBuffer(); //解析协议 packetCount += doProtocolAnalysis(rbuf); //收包太多了, 中断一下, 释放线程给send等 if (packetCount >= 1000 && isValid()) { _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT); break; } //接收的数据小于buffer大小, 内核会再次通知你 if(iRet < BUFFER_SIZE) { break; } } } while (iRet>0); } #else void TC_SSLTransceiver::doResponse() { checkConnect(); int iRet = 0; int packetCount = 0; do { auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE); uint32_t left = (uint32_t)data->left(); if ((iRet = this->recv((void*)data->free(), left, 0)) > 0) { int check = doCheckProxy(data->free(), iRet); if(check != 0) { return; } const bool preHandshake = _openssl->isHandshaked(); int ret = _openssl->read(data->free(), iRet, _sendBuffer); if (ret != 0) { // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << 
_openssl->getErrMsg() << endl; THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]"); } else if(!_sendBuffer.empty()) { doRequest(); } if (!_openssl->isHandshaked()) { // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl; return; } if (!preHandshake) { if(_isServer) { _onRequestCallback(this); } else { //握手完毕, 客户端直接发送鉴权请求 doAuthReq(); // doAuthReq失败,会close fd, 这里判断下是否还有效 if (!isValid()) { THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]"); } else { // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl; } } } TC_NetWorkBuffer *rbuf = _openssl->recvBuffer(); //解析协议 packetCount += doProtocolAnalysis(rbuf); //收包太多了, 中断一下, 释放线程给send等 if (packetCount >= 1000 && isValid()) { _epollInfo->mod(EPOLLIN | EPOLLOUT); // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT); break; } //接收的数据小于buffer大小, 内核会再次通知你 if(iRet < left) { break; } } } while (iRet>0); } #endif #endif ///////////////////////////////////////////////////////////////// TC_UDPTransceiver::TC_UDPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep) : TC_Transceiver(epoller, ep) { } TC_UDPTransceiver::~TC_UDPTransceiver() { } void TC_UDPTransceiver::doResponse() { checkConnect(); int iRet = 0; int packetCount = 0; do { _recvBuffer.clearBuffers(); auto data = _recvBuffer.getOrCreateBuffer(_nRecvBufferSize, _nRecvBufferSize); uint32_t left = (uint32_t)data->left(); if ((iRet = this->recv((void *)data->free(), left, 0)) > 0) { data->addWriteIdx(iRet); _recvBuffer.addLength(iRet); //解析协议 packetCount += doProtocolAnalysis(&_recvBuffer); // LOG_CONSOLE_DEBUG << iRet << ", " << packetCount << endl; //收包太多了, 中断一下, 释放线程给send等 if (packetCount >= 1000 
&& isValid()) { _epollInfo->mod(EPOLLIN | EPOLLOUT); // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT); break; } } } while (iRet > 0); } int TC_UDPTransceiver::send(const void* buf, uint32_t len, uint32_t flag) { if(!isValid()) return -1; int iRet = 0; if(_isServer) { iRet=::sendto(_fd, (const char*)buf, len, flag, _lastAddr.first.get(), _lastAddr.second); } else { iRet=::sendto(_fd, (const char*)buf, len, flag, _serverAddr.first.get(), _serverAddr.second); } if(iRet > 0) { //udp只发一次 发送一半也算全部发送成功 return len; } if (iRet < 0 && TC_Socket::isPending()) { //EAGAIN, 认为没有发送 return 0; } return iRet; } int TC_UDPTransceiver::recv(void* buf, uint32_t len, uint32_t flag) { if(!isValid()) return -1; _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); int iRet = ::recvfrom(_fd, (char*)buf, len, flag, _clientAddr.first.get(), &_clientAddr.second); //need check from_ip & port // cout << "recv :" << iRet << endl; // if(iRet < 0) // { // LOG_CONSOLE_DEBUG << this << ", " << TC_Socket::isPending() << ", " << _isServer << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl; // } if(!_isServer) { //客户端才会关闭连接, 会重建socket, 服务端不会 if (iRet < 0 && !TC_Socket::isPending()) { THROW_ERROR(TC_Transceiver_Exception, CR_RECV, "TC_UDPTransceiver::udp recv, " + _desc + ", fd:" + TC_Common::tostr(_fd)); return 0; } } return iRet; } ///////////////////////////////////////////////////////////////// }