1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285 |
- #include "util/tc_transceiver.h"
- #include "util/tc_logger.h"
- #if TARS_SSL
- #include "util/tc_openssl.h"
- #endif
- #include <sstream>
- namespace tars
- {
- class CloseClourse
- {
- public:
- CloseClourse(TC_Transceiver *trans, TC_Transceiver::CloseReason reason, const string &err) : _trans(trans), _reason(reason), _err(err)
- {}
- ~CloseClourse() {
- _trans->tcpClose(false, _reason, _err);
- }
- protected:
- TC_Transceiver *_trans;
- TC_Transceiver::CloseReason _reason;
- string _err;
- };
- #define THROW_ERROR(x, r, y) { CloseClourse c(this, r, y); THROW_EXCEPTION_SYSCODE(x, y); }
- static const int BUFFER_SIZE = 16 * 1024;
- ///////////////////////////////////////////////////////////////////////
- int TC_Transceiver::createSocket(bool udp, bool isLocal, bool isIpv6)
- {
- #if TARGET_PLATFORM_WINDOWS
- int domain = (isIpv6 ? PF_INET6 : PF_INET);
- #else
- int domain = isLocal ? PF_LOCAL : (isIpv6 ? PF_INET6 : PF_INET);
- #endif
- int type = udp ? SOCK_DGRAM : SOCK_STREAM;
- TC_Socket s;
- s.createSocket(type, domain);
- if(!udp)
- {
- s.setTcpNoDelay();
- s.setKeepAlive();
- s.setNoCloseWait();
- }
- else
- {
- s.setRecvBufferSize(512*1024);
- s.setSendBufferSize(512*1024);
- }
- s.setOwner(false);
- s.setblock(false);
- return s.getfd();
- }
- bool TC_Transceiver::doConnect(int fd, const struct sockaddr *addr, socklen_t len)
- {
- bool bConnected = false;
- int iRet = ::connect(fd, addr, len);
- if (iRet == 0)
- {
- bConnected = true;
- }
- else if (!TC_Socket::isInProgress())
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect error, " + _desc);//, TC_Exception::getSystemCode());
- }
- // LOG_CONSOLE_DEBUG << bConnected << endl;
- return bConnected;
- }
- TC_Transceiver::TC_Transceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
- : _epoller(epoller)
- , _ep(ep)
- , _desc(ep.toString())
- , _fd(-1)
- , _connStatus(eUnconnected)
- , _sendBuffer(this)
- , _recvBuffer(this)
- , _authState(eAuthInit)
- {
- // LOG_CONSOLE_DEBUG << endl;
- if (ep.isUdp())
- {
- _pRecvBuffer = std::make_shared<TC_NetWorkBuffer::Buffer>();
- _nRecvBufferSize = DEFAULT_RECV_BUFFERSIZE;
- _pRecvBuffer->alloc(_nRecvBufferSize);
- }
- _serverAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
- }
- TC_Transceiver::~TC_Transceiver()
- {
- if(!isValid()) return;
- if(_ep.isTcp())
- {
- tcpClose(true, CR_DECONSTRUCTOR, "");
- }
- else
- {
- udpClose();
- }
- }
- void TC_Transceiver::initializeClient(const oncreate_callback &oncreate,
- const onclose_callback &onclose,
- const onconnect_callback &onconnect,
- const onrequest_callback &onrequest,
- const onparser_callback &onparser,
- const onopenssl_callback &onopenssl,
- const oncompletepackage_callback &onfinish)
- {
- _isServer = false;
- _createSocketCallback = oncreate;
- _onConnectCallback = onconnect;
- _onRequestCallback = onrequest;
- _onCloseCallback = onclose;
- _onParserCallback = onparser;
- _onCompletePackageCallback = onfinish;
- _onOpensslCallback = onopenssl;
- }
- void TC_Transceiver::initializeServer(const onclose_callback &onclose,
- const onrequest_callback &onrequest,
- const onparser_callback &onparser,
- const onopenssl_callback &onopenssl,
- const oncompletepackage_callback &onfinish)
- {
- _isServer = true;
- _connStatus = eConnected;
- _onRequestCallback = onrequest;
- _onCloseCallback = onclose;
- _onParserCallback = onparser;
- _onCompletePackageCallback = onfinish;
- _onOpensslCallback = onopenssl;
- #if TARS_SSL
- if (isSSL())
- {
- _openssl = _onOpensslCallback(this);
- if (!_openssl)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error]");
- }
- _openssl->init(true);
- _openssl->recvBuffer()->setConnection(this);
- int ret = _openssl->doHandshake(_sendBuffer);
- if (ret != 0)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL_HANDSHAKE, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error: " + _openssl->getErrMsg() + "]");
- }
- // send the encrypt data from write buffer
- if (!_sendBuffer.empty())
- {
- doRequest();
- }
- }
- #endif
- }
- void TC_Transceiver::setClientAuthCallback(const onclientsendauth_callback &onsendauth, const onclientverifyauth_callback &onverifyauth)
- {
- _onClientSendAuthCallback = onsendauth;
-
- _onClientVerifyAuthCallback = onverifyauth;
- }
- void TC_Transceiver::setServerAuthCallback(const onserververifyauth_callback &onverifyauth)
- {
- _onServerVerifyAuthCallback = onverifyauth;
- }
- void TC_Transceiver::setBindAddr(const char *host)
- {
- if(_isServer)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Type, "setBindAddr(" + string(host) + ") only use in client, " + _desc);
- }
- _bindAddr = TC_Socket::createSockAddr(host);
- }
- void TC_Transceiver::setBindAddr(const TC_Socket::addr_type &bindAddr)
- {
- if(_isServer)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Type,"setBindAddr only use in client, " + _desc);
- }
- _bindAddr = bindAddr;
- }
- shared_ptr<TC_Epoller::EpollInfo> TC_Transceiver::bindFd(int fd)
- {
- if(!_isServer)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Type, "client should not call bindFd, " + _desc);
- }
- _connStatus = eConnected;
- _fd = fd;
- //设置套接口选项
- for(size_t i=0; i< _socketOpts.size(); ++i)
- {
- setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen);
- }
- _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
- getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second);
- _epollInfo = _epoller->createEpollInfo(_fd);
- return _epollInfo;
- }
- void TC_Transceiver::setUdpRecvBuffer(size_t nSize)
- {
- _nRecvBufferSize = nSize;
- _pRecvBuffer->alloc(_nRecvBufferSize);
- }
- void TC_Transceiver::checkConnect()
- {
- //检查连接是否有错误
- if(isConnecting())
- {
- int iVal = 0;
- SOCKET_LEN_TYPE iLen = static_cast<SOCKET_LEN_TYPE>(sizeof(int));
- int ret = ::getsockopt(_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen);
- if (ret < 0 || iVal)
- {
- string err = TC_Exception::parseError(iVal);
- THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect " + _desc + " error:" + err);
- }
- _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
- getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second);
- if(_bindAddr.first)
- {
- //如果服务器终止后,服务器可以第二次快速启动而不用等待一段时间
- int iReuseAddr = 1;
- setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&iReuseAddr, sizeof(int));
- ::bind(_fd, _bindAddr.first.get(), _bindAddr.second);
- }
- setConnected();
- }
- }
- void TC_Transceiver::parseConnectAddress()
- {
- if (isConnectIPv6())
- {
- TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in6*)_serverAddr.first.get());
- }
- else
- {
- TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), *(sockaddr_in*)_serverAddr.first.get());
- }
- }
- bool TC_Transceiver::isSSL() const
- {
- return _ep.isSSL();
- }
- void TC_Transceiver::connect()
- {
- if(_isServer)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call connect, " + _desc);
- }
- if(isValid())
- {
- return;
- }
- if(_connStatus == eConnecting || _connStatus == eConnected)
- {
- return;
- }
- if (_ep.isUdp())
- {
- _fd = createSocket(true, false, isConnectIPv6());
- _connStatus = eConnected;
- _epollInfo = _epoller->createEpollInfo(_fd);
- _proxyInfo = _createSocketCallback(this);
- if(_proxyInfo)
- {
- _desc = _proxyInfo->getEndpoint().toString();
- }
- //每次连接前都重新解析一下地址, 避免dns变了!
- parseConnectAddress();
- }
- else
- {
- _fd = createSocket(false, false, isConnectIPv6());
- _isConnTimeout = false;
- _epollInfo = _epoller->createEpollInfo(_fd);
- _connTimerId = _epoller->postDelayed(_connTimeout, std::bind(&TC_Transceiver::checkConnectTimeout, this));
- _proxyInfo = _createSocketCallback(this);
- if(_proxyInfo)
- {
- _desc = _proxyInfo->getEndpoint().toString();
- }
- //每次连接前都重新解析一下地址, 避免dns变了!
- parseConnectAddress();
- bool bConnected = doConnect(_fd, _serverAddr.first.get(), _serverAddr.second);
- if(bConnected)
- {
- setConnected();
- }
- else
- {
- _connStatus = TC_Transceiver::eConnecting;
- }
- }
- //设置套接口选项
- for(size_t i=0; i< _socketOpts.size(); ++i)
- {
- setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen);
- }
- }
- void TC_Transceiver::checkConnectTimeout()
- {
- if(_connStatus != eConnected)
- {
- _isConnTimeout = true;
- THROW_ERROR(TC_Transceiver_Exception, CR_ConnectTimeout, "connect timeout, " + _desc);
- }
- }
- void TC_Transceiver::setConnected()
- {
- if(_isServer)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call setConnected, " + _desc);
- }
- _connStatus = eConnected;
- if(_proxyInfo)
- {
- connectProxy();
- }
- else
- {
- onSetConnected();
- }
- }
- void TC_Transceiver::onSetConnected()
- {
- if(_isServer)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onSetConnected, " + _desc);
- }
- onConnect();
- _onConnectCallback(this);
- if (!isSSL())
- {
- doAuthReq();
- }
- }
- void TC_Transceiver::onConnect()
- {
- if(_isServer)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onConnect, " + _desc);
- }
- _epoller->erase(_connTimerId);
- _connTimerId = 0;
- #if TARS_SSL
- if (isSSL())
- {
- _openssl = _onOpensslCallback(this);
- if (!_openssl)
- {
- close();
- return;
- }
- _openssl->init(false);
- _openssl->setReadBufferSize(1024 * 8);
- _openssl->setWriteBufferSize(1024 * 8);
- _openssl->recvBuffer()->setConnection(this);
- int ret = _openssl->doHandshake(_sendBuffer);
- if (ret != 0)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl hande shake failed, " + _desc + ", error:" + _openssl->getErrMsg());
- }
- // send the encrypt data from write buffer
- if (!_sendBuffer.empty())
- {
- doRequest();
- }
- return;
- }
- #endif
- }
- void TC_Transceiver::doAuthReq()
- {
- if (_ep.getAuthType() == TC_Endpoint::AUTH_TYPENONE)
- {
- _authState = eAuthSucc;
- _onRequestCallback(this);
- }
- else
- {
- //如果是客户端, 则主动发起鉴权请求
- shared_ptr<TC_NetWorkBuffer::Buffer> buff = _onClientSendAuthCallback(this);
- #if TARS_SSL
- if(this->isSSL())
- {
- int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer);
- if(ret != 0)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl write failed, " + _desc + ", error:" + _openssl->getErrMsg());
- return ;
- }
- }
- else
- {
- _sendBuffer.addBuffer(buff);
- }
- #else
- _sendBuffer.addBuffer(buff);
- #endif
- doRequest();
- }
- }
- void TC_Transceiver::connectProxy()
- {
- assert(_proxyInfo);
- vector<char> buff;
- bool succ = _proxyInfo->sendProxyPacket(buff, _ep);
- if(!succ)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_SEND, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg());
- }
- _sendBuffer.addBuffer(buff);
- doRequest();
- }
- int TC_Transceiver::doCheckProxy(const char *buff, size_t length)
- {
- if(!_proxyInfo || _proxyInfo->isSuccess())
- return 0;
- bool succ = _proxyInfo->recvProxyPacket(buff, length);
- if(!succ)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_RECV, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg());
- }
- if(!_proxyInfo->isSuccess())
- {
- connectProxy();
- }
- else
- {
- onSetConnected();
- }
- return 1;
- }
- void TC_Transceiver::udpClose()
- {
- if (_ep.isUdp())
- {
- _epoller->releaseEpollInfo(_epollInfo);
- _epollInfo.reset();
- TC_Port::closeSocket(_fd);
- _fd = -1;
- _connStatus = eUnconnected;
- _sendBuffer.clearBuffers();
- _recvBuffer.clearBuffers();
- }
- }
- void TC_Transceiver::close()
- {
- // LOG_CONSOLE_DEBUG << this << endl;
- if(!isValid()) return;
- if(_ep.isTcp())
- {
- tcpClose(false, CR_ACTIVE, "active call");
- }
- else
- {
- udpClose();
- }
- }
- void TC_Transceiver::tcpClose(bool deconstructor, CloseReason reason, const string &err)
- {
- if(_ep.isTcp() && isValid())
- {
- #if TARS_SSL
- if (_openssl)
- {
- _openssl->release();
- _openssl.reset();
- }
- #endif
- //LOG_CONSOLE_DEBUG << this << ", " << _fd << ", " << reason << ", " << err << ", " << deconstructor << endl;
- _epoller->releaseEpollInfo(_epollInfo);
- _epollInfo.reset();
- TC_Port::closeSocket(_fd);
- _fd = -1;
- _connStatus = eUnconnected;
- _sendBuffer.clearBuffers();
- _recvBuffer.clearBuffers();
- _authState = eAuthInit;
- if(!deconstructor)
- {
- //注意必须放在最后, 主要避免_onCloseCallback里面析构了链接, 从而导致又进入tcpClose
- //放在最后就不会有问题了, 因为不会再进入这个函数
- _onCloseCallback(this, reason, err);
- }
- }
- }
- void TC_Transceiver::doRequest()
- {
- if(!isValid()) return ;
- checkConnect();
- //buf不为空,先发送buffer的内容
- while(!_sendBuffer.empty())
- {
- auto data = _sendBuffer.getBufferPointer();
- assert(data.first != NULL && data.second != 0);
- int iRet = this->send(data.first, (uint32_t) data.second, 0);
- if (iRet <= 0)
- {
- return;
- }
- _sendBuffer.moveHeader(iRet);
- }
- if(_sendBuffer.empty())
- {
- _onRequestCallback(this);
- }
- }
- TC_Transceiver::ReturnStatus TC_Transceiver::sendRequest(const shared_ptr<TC_NetWorkBuffer::Buffer> &buff, const TC_Socket::addr_type& addr)
- {
- // LOG_CONSOLE_DEBUG << buff->length() << endl;
- //空数据 直接返回成功
- if(buff->empty()) {
- return eRetOk;
- }
- // assert(_sendBuffer.empty());
- //buf不为空, 表示之前的数据还没发送完, 直接返回失败, 等buffer可写了,epoll会通知写事件
- if(!_sendBuffer.empty()) {
- //不应该运行到这里
- return eRetNotSend;
- }
- if(eConnected != _connStatus)
- {
- return eRetNotSend;
- }
- if(_proxyInfo && !_proxyInfo->isSuccess()) {
- return eRetNotSend;
- }
- if (_ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc)
- {
- #if TARS_SSL
- if (isSSL() && !_openssl)
- {
- return eRetNotSend;
- }
- #endif
- return eRetNotSend; // 需要鉴权但还没通过,不能发送非认证消息
- }
- #if TARS_SSL
- // 握手数据已加密,直接发送,会话数据需加密
- if (isSSL())
- {
- if(!_openssl->isHandshaked()) {
- return eRetNotSend;
- }
- int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer);
- if(ret != 0)
- {
- close();
- return eRetError;
- }
- buff->clear();
- }
- else
- {
- _sendBuffer.addBuffer(buff);
- }
- #else
- _sendBuffer.addBuffer(buff);
- #endif
- // LOG_CONSOLE_DEBUG << _sendBuffer.getBufferLength() << endl;
- _lastAddr = addr;
- do
- {
- auto data = _sendBuffer.getBufferPointer();
- int iRet = this->send(data.first, (uint32_t) data.second, 0);
- if(iRet < 0)
- {
- if(!isValid())
- {
- _sendBuffer.clearBuffers();
- return eRetError;
- }
- else
- {
- return eRetFull;
- }
- }
- _sendBuffer.moveHeader(iRet);
- // assert(iRet != 0);
- }
- while(!_sendBuffer.empty());
- return eRetOk;
- }
- void TC_Transceiver::doAuthCheck(TC_NetWorkBuffer *buff)
- {
- if (!buff->empty() && _ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc)
- {
- TC_NetWorkBuffer::PACKET_TYPE type;
- if(_isServer)
- {
- //验证鉴权
- auto ret = _onServerVerifyAuthCallback(*buff, this);
- type = ret.first;
- if(type == TC_NetWorkBuffer::PACKET_FULL)
- {
- _authState = eAuthSucc;
- //服务器端, 鉴权通过, 可以响应包
- sendRequest(ret.second, _serverAddr);
- }
- }
- else
- {
- type = _onClientVerifyAuthCallback(*buff, this);
- if(type == TC_NetWorkBuffer::PACKET_FULL)
- {
- _authState = eAuthSucc;
- //客户端, 鉴权通过可以发送业务包了
- _onRequestCallback(this);
- }
- }
-
- if(type == TC_NetWorkBuffer::PACKET_ERR)
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "[TC_Transceiver::doProtocolAnalysis, auth error]");
- }
- }
- }
- int TC_Transceiver::doProtocolAnalysis(TC_NetWorkBuffer *buff)
- {
- doAuthCheck(buff);
- TC_NetWorkBuffer::PACKET_TYPE ret;
- int packetCount = 0;
- int ioriginal = 0;
- int isurplus = 0;
- try
- {
- do
- {
- ioriginal = buff->getBuffers().size();
- ret = _onParserCallback(*buff, this);
- isurplus = buff->getBuffers().size();
- if(ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE)
- {
- ++packetCount;
- }
- if(ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) {
- //full close模式下, 需要关闭连接
- tcpClose(false, CR_PROTOCOL, "protocol full close");
- }
- if(_onCompletePackageCallback) {
- //收到一个完整的包
- _onCompletePackageCallback(this);
- }
- // 当收到完整包时,解析完包后,buffer没movehead,则报错
- if (ret == TC_NetWorkBuffer::PACKET_FULL && ioriginal == isurplus)
- {
- ret = TC_NetWorkBuffer::PACKET_FULL_CLOSE;
- string err = "parser buffer movehead error, " + _desc;
- tcpClose(false, CR_PROTOCOL, err); // 这个地方会将连接关闭,为了方便后期问题定位
- throw TC_Transceiver_Exception(err);
- }
- }
- while (ret == TC_NetWorkBuffer::PACKET_FULL);
- }
- catch (exception & ex) {
- THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error:" + string(ex.what()) + "]");
- }
- catch (...) {
- THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error");
- }
- if (ret == TC_NetWorkBuffer::PACKET_ERR)
- {
- string err = "parser decode error, " + _desc;
- tcpClose(false, CR_PROTOCOL, err);
- throw TC_Transceiver_Exception(err);
- }
- return packetCount;
- }
- //////////////////////////////////////////////////////////
- TC_TCPTransceiver::TC_TCPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
- : TC_Transceiver(epoller, ep)
- {
- assert(epoller);
- }
- //不同的内存分配机制
- #if 0
- void TC_TCPTransceiver::doResponse()
- {
- checkConnect();
- int iRet = 0;
- int packetCount = 0;
- do
- {
- char buff[BUFFER_SIZE];
- if ((iRet = this->recv((void*)buff, BUFFER_SIZE, 0)) > 0)
- {
- int check = doCheckProxy(buff, iRet);
- if(check != 0)
- {
- _recvBuffer.clearBuffers();
- return;
- }
- _recvBuffer.addBuffer(buff, iRet);
- //解析协议
- packetCount += doProtocolAnalysis(&_recvBuffer);
- //收包太多了, 中断一下, 释放线程给send等
- if (packetCount >= 2000 && isValid())
- {
- _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
- break;
- }
- //接收的数据小于buffer大小, 内核会再次通知你
- if(iRet < BUFFER_SIZE)
- {
- break;
- }
- }
- }
- while (iRet>0);
- }
- #else
- void TC_TCPTransceiver::doResponse()
- {
- checkConnect();
- int iRet = 0;
- int packetCount = 0;
- do
- {
- auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE);
- uint32_t left = (uint32_t)data->left();
- if ((iRet = this->recv((void*)data->free(), left, 0)) > 0)
- {
- int check = doCheckProxy(data->free(), iRet);
- if(check != 0)
- {
- _recvBuffer.clearBuffers();
- return;
- }
- data->addWriteIdx(iRet);
- _recvBuffer.addLength(iRet);
- //解析协议
- packetCount += doProtocolAnalysis(&_recvBuffer);
- //收包太多了, 中断一下, 释放线程给send等
- if (packetCount >= 2000 && isValid())
- {
- _epollInfo->mod(EPOLLIN | EPOLLOUT);
- // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
- break;
- }
- //接收的数据小于buffer大小, 内核会再次通知你
- if(iRet < (int)left)
- {
- break;
- }
- }
- }
- while (iRet>0);
- }
- #endif
- int TC_TCPTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
- {
- //只有是连接状态才能收发数据
- if(eConnected != _connStatus)
- {
- return -1;
- }
- int iRet = ::send(_fd, (const char*)buf, len, flag);
- // LOG_CONSOLE_DEBUG << this << ", send, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl;
- if (iRet < 0 && !TC_Socket::isPending())
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SEND, "TC_TCPTransceiver::send, " + _desc + ", fd:" + TC_Common::tostr(_fd));
- }
- #if TARGET_PLATFORM_WINDOWS
- if(iRet < 0 && TC_Socket::isPending())
- {
- _epollInfo->mod(EPOLLIN | EPOLLOUT);
- }
- #endif
- return iRet;
- }
- int TC_TCPTransceiver::recv(void* buf, uint32_t len, uint32_t flag)
- {
- //只有是连接状态才能收发数据
- if(eConnected != _connStatus)
- return -1;
- int iRet = ::recv(_fd, (char*)buf, len, flag);
- // LOG_CONSOLE_DEBUG << this << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << endl;
- if (iRet == 0 || (iRet < 0 && !TC_Socket::isPending()))
- {
- int nerr = TC_Exception::getSystemCode();
- string err = "recv error, errno:" + TC_Common::tostr(nerr) + "," + TC_Exception::parseError(nerr);
- THROW_ERROR(TC_Transceiver_Exception, CR_RECV, err + ", " + _desc + ", fd:" + TC_Common::tostr(_fd));
- }
- #if TARGET_PLATFORM_WINDOWS
- if(iRet < 0 && TC_Socket::isPending())
- {
- _epollInfo->mod(EPOLLIN | EPOLLOUT);
- }
- #endif
- return iRet;
- }
- /////////////////////////////////////////////////////////////////
- #if TARS_SSL
- TC_SSLTransceiver::TC_SSLTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
- : TC_TCPTransceiver(epoller, ep)
- {
- }
- #if 0
- void TC_SSLTransceiver::doResponse()
- {
- checkConnect();
- int iRet = 0;
- int packetCount = 0;
- do
- {
- char buff[BUFFER_SIZE] = {0x00};
- if ((iRet = this->recv(buff, BUFFER_SIZE, 0)) > 0)
- {
- int check = doCheckProxy(buff, iRet);
- if(check != 0)
- {
- return;
- }
- const bool preHandshake = _openssl->isHandshaked();
- int ret = _openssl->read(buff, iRet, _sendBuffer);
- if (ret != 0)
- {
- // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl;
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
- }
- else if(!_sendBuffer.empty())
- {
- // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse SSL_read prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << ", send handshake len:" << _sendBuffer.getBufferLength() << endl;
- int ret = doRequest();
- if(ret < 0)
- {
- // doRequest失败 close fd
- if (!isValid())
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, ssl doRequest failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
- }
- else
- {
- return;
- }
- }
- }
- // LOG_CONSOLE_DEBUG << "recv length:" << iRet << ", preHandshake:" << preHandshake << endl;
- if (!_openssl->isHandshaked())
- {
- // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
- return;
- }
- if (!preHandshake)
- {
- if(_isServer)
- {
- _onRequestCallback(this);
- }
- else
- {
- //握手完毕, 客户端直接发送鉴权请求
- doAuthReq();
- // doAuthReq失败,会close fd, 这里判断下是否还有效
- if (!isValid())
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL,
- "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " +
- _openssl->getErrMsg() + "]");
- }
- else
- {
- // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
- }
- }
- }
- TC_NetWorkBuffer *rbuf = _openssl->recvBuffer();
- //解析协议
- packetCount += doProtocolAnalysis(rbuf);
- //收包太多了, 中断一下, 释放线程给send等
- if (packetCount >= 1000 && isValid())
- {
- _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
- break;
- }
- //接收的数据小于buffer大小, 内核会再次通知你
- if(iRet < BUFFER_SIZE)
- {
- break;
- }
- }
- }
- while (iRet>0);
- }
- #else
- void TC_SSLTransceiver::doResponse()
- {
- checkConnect();
- int iRet = 0;
- int packetCount = 0;
- do
- {
- auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE);
- uint32_t left = (uint32_t)data->left();
- if ((iRet = this->recv((void*)data->free(), left, 0)) > 0)
- {
- int check = doCheckProxy(data->free(), iRet);
- if(check != 0)
- {
- return;
- }
- const bool preHandshake = _openssl->isHandshaked();
- int ret = _openssl->read(data->free(), iRet, _sendBuffer);
- if (ret != 0)
- {
- // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl;
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]");
- }
- else if(!_sendBuffer.empty())
- {
- doRequest();
- }
- if (!_openssl->isHandshaked())
- {
- // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse not handshake, prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
- return;
- }
- if (!preHandshake)
- {
- if(_isServer)
- {
- _onRequestCallback(this);
- }
- else
- {
- //握手完毕, 客户端直接发送鉴权请求
- doAuthReq();
- // doAuthReq失败,会close fd, 这里判断下是否还有效
- if (!isValid())
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_SSL,
- "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " +
- _openssl->getErrMsg() + "]");
- }
- else
- {
- // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl;
- }
- }
- }
- TC_NetWorkBuffer *rbuf = _openssl->recvBuffer();
- //解析协议
- packetCount += doProtocolAnalysis(rbuf);
- //收包太多了, 中断一下, 释放线程给send等
- if (packetCount >= 1000 && isValid())
- {
- _epollInfo->mod(EPOLLIN | EPOLLOUT);
- // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
- break;
- }
- //接收的数据小于buffer大小, 内核会再次通知你
- if(iRet < left)
- {
- break;
- }
- }
- }
- while (iRet>0);
- }
- #endif
- #endif
- /////////////////////////////////////////////////////////////////
- TC_UDPTransceiver::TC_UDPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep)
- : TC_Transceiver(epoller, ep)
- {
- }
- TC_UDPTransceiver::~TC_UDPTransceiver()
- {
- }
- void TC_UDPTransceiver::doResponse()
- {
- checkConnect();
- int iRet = 0;
- int packetCount = 0;
- do
- {
- _recvBuffer.clearBuffers();
- auto data = _recvBuffer.getOrCreateBuffer(_nRecvBufferSize, _nRecvBufferSize);
- uint32_t left = (uint32_t)data->left();
- if ((iRet = this->recv((void *)data->free(), left, 0)) > 0)
- {
- data->addWriteIdx(iRet);
- _recvBuffer.addLength(iRet);
- //解析协议
- packetCount += doProtocolAnalysis(&_recvBuffer);
- // LOG_CONSOLE_DEBUG << iRet << ", " << packetCount << endl;
- //收包太多了, 中断一下, 释放线程给send等
- if (packetCount >= 1000 && isValid())
- {
- _epollInfo->mod(EPOLLIN | EPOLLOUT);
- // _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT);
- break;
- }
- }
- }
- while (iRet > 0);
- }
- int TC_UDPTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
- {
- if(!isValid()) return -1;
- int iRet = 0;
- if(_isServer)
- {
- iRet=::sendto(_fd, (const char*)buf, len, flag, _lastAddr.first.get(), _lastAddr.second);
- }
- else
- {
- iRet=::sendto(_fd, (const char*)buf, len, flag, _serverAddr.first.get(), _serverAddr.second);
- }
- if(iRet > 0)
- {
- //udp只发一次 发送一半也算全部发送成功
- return len;
- }
- if (iRet < 0 && TC_Socket::isPending())
- {
- //EAGAIN, 认为没有发送
- return 0;
- }
- return iRet;
- }
- int TC_UDPTransceiver::recv(void* buf, uint32_t len, uint32_t flag)
- {
- if(!isValid()) return -1;
- _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str());
- int iRet = ::recvfrom(_fd, (char*)buf, len, flag, _clientAddr.first.get(), &_clientAddr.second); //need check from_ip & port
- // cout << "recv :" << iRet << endl;
- // if(iRet < 0)
- // {
- // LOG_CONSOLE_DEBUG << this << ", " << TC_Socket::isPending() << ", " << _isServer << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl;
- // }
- if(!_isServer)
- {
- //客户端才会关闭连接, 会重建socket, 服务端不会
- if (iRet < 0 && !TC_Socket::isPending())
- {
- THROW_ERROR(TC_Transceiver_Exception, CR_RECV, "TC_UDPTransceiver::udp recv, " + _desc + ", fd:" + TC_Common::tostr(_fd));
- return 0;
- }
- }
- return iRet;
- }
- /////////////////////////////////////////////////////////////////
- }
|