Browse Source

add MultiCommunicator, fix multi communicator bug

ruanshudong 3 years ago
parent
commit
12e9fbf8e5

+ 1 - 0
examples/QuickStartDemo/CMakeLists.txt

@@ -2,4 +2,5 @@
 include_directories(HelloServer/Server)
 add_subdirectory(HelloServer)
 add_subdirectory(ProxyServer)
+add_subdirectory(MultiCommunicator)
 

+ 2 - 0
examples/QuickStartDemo/MultiCommunicator/CMakeLists.txt

@@ -0,0 +1,2 @@
+build_tars_server("MultiCommunicator" "")
+

+ 314 - 0
examples/QuickStartDemo/MultiCommunicator/Hello.h

@@ -0,0 +1,314 @@
+// **********************************************************************
+// This file was generated by a TARS parser!
+// TARS version 2.4.13.
+// **********************************************************************
+
+#ifndef __HELLO_H_
+#define __HELLO_H_
+
+#include <map>
+#include <string>
+#include <vector>
+#include "tup/Tars.h"
+#include "tup/TarsJson.h"
+using namespace std;
+#include "servant/ServantProxy.h"
+#include "servant/Servant.h"
+
+
+namespace TestApp
+{
+
+    /* callback of async proxy for client */
+    class HelloPrxCallback: public tars::ServantProxyCallback
+    {
+    public:
+        virtual ~HelloPrxCallback(){}
+        virtual void callback_test(tars::Int32 ret)
+        { throw std::runtime_error("callback_test() override incorrect."); }
+        virtual void callback_test_exception(tars::Int32 ret)
+        { throw std::runtime_error("callback_test_exception() override incorrect."); }
+
+    public:
+        virtual const map<std::string, std::string> & getResponseContext() const
+        {
+            CallbackThreadData * pCbtd = CallbackThreadData::getData();
+            assert(pCbtd != NULL);
+
+            if(!pCbtd->getContextValid())
+            {
+                throw TC_Exception("cann't get response context");
+            }
+            return pCbtd->getResponseContext();
+        }
+
+    public:
+        virtual int onDispatch(tars::ReqMessagePtr msg)
+        {
+            static ::std::string __Hello_all[]=
+            {
+                "test"
+            };
+            pair<string*, string*> r = equal_range(__Hello_all, __Hello_all+1, string(msg->request.sFuncName));
+            if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
+            switch(r.first - __Hello_all)
+            {
+                case 0:
+                {
+                    if (msg->response->iRet != tars::TARSSERVERSUCCESS)
+                    {
+                        callback_test_exception(msg->response->iRet);
+
+                        return msg->response->iRet;
+                    }
+                    tars::TarsInputStream<tars::BufferReader> _is;
+
+                    _is.setBuffer(msg->response->sBuffer);
+                    tars::Int32 _ret;
+                    _is.read(_ret, 0, true);
+
+                    CallbackThreadData * pCbtd = CallbackThreadData::getData();
+                    assert(pCbtd != NULL);
+
+                    pCbtd->setResponseContext(msg->response->context);
+
+                    callback_test(_ret);
+
+                    pCbtd->delResponseContext();
+
+                    return tars::TARSSERVERSUCCESS;
+
+                }
+            }
+            return tars::TARSSERVERNOFUNCERR;
+        }
+
+    };
+    typedef tars::TC_AutoPtr<HelloPrxCallback> HelloPrxCallbackPtr;
+
+    /* callback of coroutine async proxy for client */
+    class HelloCoroPrxCallback: public HelloPrxCallback
+    {
+    public:
+        virtual ~HelloCoroPrxCallback(){}
+    public:
+        virtual const map<std::string, std::string> & getResponseContext() const { return _mRspContext; }
+
+        virtual void setResponseContext(const map<std::string, std::string> &mContext) { _mRspContext = mContext; }
+
+    public:
+        int onDispatch(tars::ReqMessagePtr msg)
+        {
+            static ::std::string __Hello_all[]=
+            {
+                "test"
+            };
+
+            pair<string*, string*> r = equal_range(__Hello_all, __Hello_all+1, string(msg->request.sFuncName));
+            if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
+            switch(r.first - __Hello_all)
+            {
+                case 0:
+                {
+                    if (msg->response->iRet != tars::TARSSERVERSUCCESS)
+                    {
+                        callback_test_exception(msg->response->iRet);
+
+                        return msg->response->iRet;
+                    }
+                    tars::TarsInputStream<tars::BufferReader> _is;
+
+                    _is.setBuffer(msg->response->sBuffer);
+                    try
+                    {
+                        tars::Int32 _ret;
+                        _is.read(_ret, 0, true);
+
+                        setResponseContext(msg->response->context);
+
+                        callback_test(_ret);
+
+                    }
+                    catch(std::exception &ex)
+                    {
+                        callback_test_exception(tars::TARSCLIENTDECODEERR);
+
+                        return tars::TARSCLIENTDECODEERR;
+                    }
+                    catch(...)
+                    {
+                        callback_test_exception(tars::TARSCLIENTDECODEERR);
+
+                        return tars::TARSCLIENTDECODEERR;
+                    }
+
+                    return tars::TARSSERVERSUCCESS;
+
+                }
+            }
+            return tars::TARSSERVERNOFUNCERR;
+        }
+
+    protected:
+        map<std::string, std::string> _mRspContext;
+    };
+    typedef tars::TC_AutoPtr<HelloCoroPrxCallback> HelloCoroPrxCallbackPtr;
+
+    /* proxy for client */
+    class HelloProxy : public tars::ServantProxy
+    {
+    public:
+        typedef map<string, string> TARS_CONTEXT;
+        tars::Int32 test(const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
+        {
+            tars::TarsOutputStream<tars::BufferWriterVector> _os;
+            std::map<string, string> _mStatus;
+            shared_ptr<tars::ResponsePacket> rep = tars_invoke(tars::TARSNORMAL,"test", _os, context, _mStatus);
+            if(pResponseContext)
+            {
+                pResponseContext->swap(rep->context);
+            }
+
+            tars::TarsInputStream<tars::BufferReader> _is;
+            _is.setBuffer(rep->sBuffer);
+            tars::Int32 _ret;
+            _is.read(_ret, 0, true);
+            return _ret;
+        }
+
+        void async_test(HelloPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
+        {
+            tars::TarsOutputStream<tars::BufferWriterVector> _os;
+            std::map<string, string> _mStatus;
+            tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback);
+        }
+        
+        void coro_test(HelloCoroPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
+        {
+            tars::TarsOutputStream<tars::BufferWriterVector> _os;
+            std::map<string, string> _mStatus;
+            tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback, true);
+        }
+
+        HelloProxy* tars_hash(int64_t key)
+        {
+            return (HelloProxy*)ServantProxy::tars_hash(key);
+        }
+
+        HelloProxy* tars_consistent_hash(int64_t key)
+        {
+            return (HelloProxy*)ServantProxy::tars_consistent_hash(key);
+        }
+
+        HelloProxy* tars_set_timeout(int msecond)
+        {
+            return (HelloProxy*)ServantProxy::tars_set_timeout(msecond);
+        }
+
+        static const char* tars_prxname() { return "HelloProxy"; }
+    };
+    typedef tars::TC_AutoPtr<HelloProxy> HelloPrx;
+
+    /* servant for server */
+    class Hello : public tars::Servant
+    {
+    public:
+        virtual ~Hello(){}
+        virtual tars::Int32 test(tars::TarsCurrentPtr current) = 0;
+        static void async_response_test(tars::TarsCurrentPtr current, tars::Int32 _ret)
+        {
+            if (current->getRequestVersion() == TUPVERSION )
+            {
+                UniAttribute<tars::BufferWriterVector, tars::BufferReader>  tarsAttr;
+                tarsAttr.setVersion(current->getRequestVersion());
+                tarsAttr.put("", _ret);
+                tarsAttr.put("tars_ret", _ret);
+
+                vector<char> sTupResponseBuffer;
+                tarsAttr.encode(sTupResponseBuffer);
+                current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer);
+            }
+            else if (current->getRequestVersion() == JSONVERSION)
+            {
+                tars::JsonValueObjPtr _p = new tars::JsonValueObj();
+                _p->value["tars_ret"] = tars::JsonOutput::writeJson(_ret);
+                vector<char> sJsonResponseBuffer;
+                tars::TC_Json::writeValue(_p, sJsonResponseBuffer);
+                current->sendResponse(tars::TARSSERVERSUCCESS, sJsonResponseBuffer);
+            }
+            else
+            {
+                tars::TarsOutputStream<tars::BufferWriterVector> _os;
+                _os.write(_ret, 0);
+
+                current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer());
+            }
+        }
+
+    public:
+        int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer)
+        {
+            static ::std::string __TestApp__Hello_all[]=
+            {
+                "test"
+            };
+
+            pair<string*, string*> r = equal_range(__TestApp__Hello_all, __TestApp__Hello_all+1, _current->getFuncName());
+            if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
+            switch(r.first - __TestApp__Hello_all)
+            {
+                case 0:
+                {
+                    tars::TarsInputStream<tars::BufferReader> _is;
+                    _is.setBuffer(_current->getRequestBuffer());
+                    if (_current->getRequestVersion() == TUPVERSION)
+                    {
+                        UniAttribute<tars::BufferWriterVector, tars::BufferReader>  tarsAttr;
+                        tarsAttr.setVersion(_current->getRequestVersion());
+                        tarsAttr.decode(_current->getRequestBuffer());
+                    }
+                    else if (_current->getRequestVersion() == JSONVERSION)
+                    {
+                        tars::JsonValueObjPtr _jsonPtr = tars::JsonValueObjPtr::dynamicCast(tars::TC_Json::getValue(_current->getRequestBuffer()));
+                    }
+                    else
+                    {
+                    }
+                    tars::Int32 _ret = test(_current);
+                    if(_current->isResponse())
+                    {
+                        if (_current->getRequestVersion() == TUPVERSION)
+                        {
+                            UniAttribute<tars::BufferWriterVector, tars::BufferReader>  tarsAttr;
+                            tarsAttr.setVersion(_current->getRequestVersion());
+                            tarsAttr.put("", _ret);
+                            tarsAttr.put("tars_ret", _ret);
+                            tarsAttr.encode(_sResponseBuffer);
+                        }
+                        else if (_current->getRequestVersion() == JSONVERSION)
+                        {
+                            tars::JsonValueObjPtr _p = new tars::JsonValueObj();
+                            _p->value["tars_ret"] = tars::JsonOutput::writeJson(_ret);
+                            tars::TC_Json::writeValue(_p, _sResponseBuffer);
+                        }
+                        else
+                        {
+                            tars::TarsOutputStream<tars::BufferWriterVector> _os;
+                            _os.write(_ret, 0);
+                            _os.swap(_sResponseBuffer);
+                        }
+                    }
+                    return tars::TARSSERVERSUCCESS;
+
+                }
+            }
+            return tars::TARSSERVERNOFUNCERR;
+        }
+    };
+
+
+}
+
+
+
+#endif

+ 8 - 0
examples/QuickStartDemo/MultiCommunicator/Hello.tars

@@ -0,0 +1,8 @@
+module TestApp
+{
+
+    interface Hello
+    {
+        int test();
+    };
+};

+ 80 - 0
examples/QuickStartDemo/MultiCommunicator/main.cpp

@@ -0,0 +1,80 @@
+#include <iostream>
+#include "servant/Communicator.h"
+#include "servant/Application.h"
+#include "Hello.h"
+using namespace std;
+using namespace TestApp;
+using namespace tars;
+
+
+class Demo : public TC_Singleton<Demo>, public TC_ThreadLock{
+    private:
+        TestApp::HelloPrx _hello1Prx;
+        TestApp::HelloPrx _hello2Prx;
+        CommunicatorPtr _comm1 = CommunicatorFactory::getInstance()->getCommunicator("comm1");
+        CommunicatorPtr _comm2 = CommunicatorFactory::getInstance()->getCommunicator("comm2");
+    public:
+        Demo(){
+            _comm1->setProperty("locator", "taf.tafregistry.QueryObj@tcp -h 123.123.123.123 -p 8080 -t 60000");
+            _comm2->setProperty("locator", "taf.tafregistry.QueryObj@tcp -h 123.123.123.123 -p 8080 -t 60000");
+            _hello1Prx = _comm1->stringToProxy<TestApp::HelloPrx>("Hello.HelloServer.HelloObj");
+            _hello2Prx = _comm2->stringToProxy<TestApp::HelloPrx>("Hello.HelloServer.HelloObj");
+        }
+        void sendOneWay1(){
+            try{
+                _hello1Prx->async_test(NULL);
+                cout<<"sent one one way request in thread:" << std::this_thread::get_id() << "\n" << endl;
+            }catch(...){
+                cout<<"ent one one way request in thread:" << std::this_thread::get_id() << " error" << "\n" << endl;
+            }
+        }
+        void sendOneWay2(){
+            try{
+                _hello2Prx->async_test(NULL);
+                cout<<"sent one one way request in thread:" << std::this_thread::get_id() << "\n" << endl;
+            }catch(...){
+                cout<<"ent one one way request in thread:" << std::this_thread::get_id() << " error" << "\n" << endl;
+            }
+        }
+};
+
+#define DemoMgr (Demo::getInstance())
+
+void callByMixObjSingleton(){
+    while (true)
+    {
+        DemoMgr->sendOneWay1();
+        DemoMgr->sendOneWay2();
+        usleep(10000);
+    }
+};
+//void callByMixObj(){
+//    Demo demo;
+//    while (true)
+//    {
+//        demo.sendOneWay1();
+//        demo.sendOneWay2();
+//        usleep(10000);
+//    }
+//};
+int main(){
+    try
+    {
+        std::thread t1(callByMixObjSingleton);
+//        std::thread t2(callByMixObjSingleton);
+        // std::thread t1(callByMixObj);
+        // std::thread t2(callByMixObj);
+        t1.join();
+//        t2.join();
+    }
+    catch(exception& e)
+    {
+        cerr << "exception:" << e.what() << endl;
+    }
+    catch (...)
+    {
+        cerr << "unknown exception." << endl;
+    }
+
+    return 0;
+}

+ 3 - 1
servant/libservant/CommunicatorEpoll.cpp

@@ -491,7 +491,9 @@ void CommunicatorEpoll::run()
 {
     ServantProxyThreadData * pSptd = ServantProxyThreadData::getData();
     assert(pSptd != NULL);
-    pSptd->_netThreadSeq = (int)_netThreadSeq;
+
+	pSptd->_communicatorEpollInfo[this->getCommunicator()]._netThreadSeq = (int)_netThreadSeq;
+//    pSptd->_netThreadSeq = (int)_netThreadSeq;
 
     while (!_terminate)
     {

+ 55 - 38
servant/libservant/ServantProxy.cpp

@@ -98,10 +98,10 @@ void SeqManager::del(uint16_t iSeq)
 
 ///////////////////////////////////////////////////////////////
 ServantProxyThreadData::ServantProxyThreadData()
-: _queueInit(false)
-, _reqQNo(0)
-, _netSeq(0)
-, _netThreadSeq(-1)
+//: _queueInit(false)
+: _reqQNo(0)
+//, _netSeq(0)
+//, _netThreadSeq(-1)
 , _hash(false)
 , _conHash(false)
 , _hashCode(-1)
@@ -109,7 +109,7 @@ ServantProxyThreadData::ServantProxyThreadData()
 , _hasTimeout(false)
 , _timeout(0)
 , _sched(NULL)
-, _objectProxyNum(0)
+//, _objectProxyNum(0)
 {
 }
 
@@ -117,24 +117,26 @@ ServantProxyThreadData::~ServantProxyThreadData()
 {
     try
     {
-        if(_queueInit)
-        {
-            for(size_t i=0;i<_objectProxyNum;++i)
-            {
-                if(_objectProxyOwn.get()[i])
-                {
-                    ReqMessage * msg = new ReqMessage();
-                    msg->eType = ReqMessage::THREAD_EXIT;
+//        if(_queueInit)
+//        {
+        	for(auto it = _communicatorEpollInfo.begin(); it != _communicatorEpollInfo.end(); ++it) {
+		        for (size_t i = 0; i < it->second._objectProxyNum; ++i) {
+			        if (it->second._objectProxyOwn.get()[i]) {
+				        ReqMessage *msg = new ReqMessage();
+				        msg->eType = ReqMessage::THREAD_EXIT;
 
-                    bool bEmpty = false;
-                    _reqQueue[i]->push_back(msg, bEmpty);
+				        bool bEmpty = false;
+				        it->second._reqQueue[i]->push_back(msg, bEmpty);
 
-                    _objectProxyOwn.get()[i]->getCommunicatorEpoll()->notifyDel(_reqQNo);
-                }
-            }
-            _queueInit = false;
+				        it->second._objectProxyOwn.get()[i]->getCommunicatorEpoll()->notifyDel(_reqQNo);
+			        }
+		        }
+//		        _queueInit = false;
+	        }
 
-        }
+	    _communicatorEpollInfo.clear();
+
+//        }
 
         _pSeq->del(_reqQNo);
     }
@@ -647,6 +649,8 @@ void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
     //选择网络线程
     selectNetThreadInfo(pSptd, pObjProxy, pReqQ);
 
+    assert(pReqQ != NULL);
+
     //调用发起时间
     msg->iBeginTime   = TNOWMS;
     msg->pObjectProxy = pObjProxy;
@@ -718,7 +722,7 @@ void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
 
     if (!pReqQ->push_back(msg, bEmpty))
     {
-        TLOGERROR("[ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);
+        TLOGERROR("[ServantProxy::invoke msgQueue push_back error]" << endl);
 
         delete msg;
         msg = NULL;
@@ -1182,44 +1186,57 @@ void ServantProxy::http_call_async(const string &funcName, shared_ptr<TC_HttpReq
 //选取一个网络线程对应的信息
 void ServantProxy::selectNetThreadInfo(ServantProxyThreadData *pSptd, ObjectProxy *&pObjProxy, ReqInfoQueue *&pReqQ)
 {
+	ServantProxyThreadData::CommunicatorEpollInfo &communicatorEpollInfo = pSptd->_communicatorEpollInfo[this->tars_communicator()];
+
     //指针为空 就new一个
-    if (!pSptd->_queueInit)
+    if (!communicatorEpollInfo._queueInit)
     {
         for (size_t i = 0; i < _objectProxyNum; ++i)
         {
-            pSptd->_reqQueue[i] = new ReqInfoQueue(_objectProxy[0]->getCommunicatorEpoll()->getNoSendQueueLimit());
+//            pSptd->_reqQueue[i] = new ReqInfoQueue(_objectProxy[0]->getCommunicatorEpoll()->getNoSendQueueLimit());
+	        communicatorEpollInfo._reqQueue[i] = new ReqInfoQueue(_objectProxy[0]->getCommunicatorEpoll()->getNoSendQueueLimit());
         }
-        pSptd->_objectProxyNum = _objectProxyNum;
-        pSptd->_objectProxyOwn = _objectProxyOwn;
-        pSptd->_queueInit      = true;
+	    communicatorEpollInfo._objectProxyNum = _objectProxyNum;
+	    communicatorEpollInfo._objectProxyOwn = _objectProxyOwn;
+	    communicatorEpollInfo._queueInit      = true;
     }
 
-    if (_objectProxyNum == 1)
+	if (_objectProxyNum == 1)
     {
         pObjProxy = *_objectProxy;
-        pReqQ     = pSptd->_reqQueue[0];
+//        pReqQ     = pSptd->_reqQueue[0];
+	    pReqQ     = communicatorEpollInfo._reqQueue[0];
     }
     else
     {
-        if (pSptd->_netThreadSeq >= 0)
+//        if (pSptd->_netThreadSeq >= 0)
+	    if (communicatorEpollInfo._netThreadSeq >= 0)
         {
             //网络线程发起的请求
-            assert(pSptd->_netThreadSeq < static_cast<int>(_objectProxyNum));
+//            assert(pSptd->_netThreadSeq < static_cast<int>(_objectProxyNum));
+	        assert(communicatorEpollInfo._netThreadSeq < static_cast<int>(_objectProxyNum));
 
-            pObjProxy = *(_objectProxy + pSptd->_netThreadSeq);
-            pReqQ     = pSptd->_reqQueue[pSptd->_netThreadSeq];
+//	        pObjProxy = *(_objectProxy + pSptd->_netThreadSeq);
+		    pObjProxy = *(_objectProxy + communicatorEpollInfo._netThreadSeq);
+//		    pReqQ     = pSptd->_reqQueue[pSptd->_netThreadSeq];
+		    pReqQ     = communicatorEpollInfo._reqQueue[communicatorEpollInfo._netThreadSeq];
         }
         else
         {
             //用线程的私有数据来保存选到的seq
-            pObjProxy = *(_objectProxy + pSptd->_netSeq);
-            pReqQ     = pSptd->_reqQueue[pSptd->_netSeq];
-            pSptd->_netSeq++;
-
-            if (pSptd->_netSeq == _objectProxyNum)
-                pSptd->_netSeq = 0;
+//            pObjProxy = *(_objectProxy + pSptd->_netSeq);
+	        pObjProxy = *(_objectProxy + communicatorEpollInfo._netSeq);
+//	        pReqQ     = pSptd->_reqQueue[pSptd->_netSeq];
+		    pReqQ     = communicatorEpollInfo._reqQueue[communicatorEpollInfo._netSeq];
+//		    pSptd->_netSeq++;
+		    communicatorEpollInfo._netSeq++;
+
+            if (communicatorEpollInfo._netSeq == _objectProxyNum)
+	            communicatorEpollInfo._netSeq = 0;
         }
     }
+
+    assert(pReqQ != NULL);
 }
 
 void ServantProxy::checkDye(RequestPacket& req)

+ 41 - 13
servant/servant/ServantProxy.h

@@ -105,15 +105,43 @@ public:
      */
 	static void reset();
 
+//	, _reqQNo(0)
+//	, _netSeq(0)
+//	, _netThreadSeq(-1)
+
+	//每发起调用的线程 记录的 针对某通信器的 数据,
+	struct CommunicatorEpollInfo
+	{
+		bool           _queueInit = false;                       //是否初始化
+
+		/*
+		 * 每个线程跟客户端网络线程通信的队列
+		 */
+		ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]; //队列数组
+		size_t         _netSeq = 0;                          //轮训选择网络线程的偏移量
+		int            _netThreadSeq = -1;                    //网络线程发起的请求回到自己的网络线程来处理,其值为网络线程的id
+
+		/**
+		 * ObjectProxy
+		 */
+		size_t         _objectProxyNum = 0;                  //ObjectProxy对象的个数,其个数由客户端的网络线程数决定,每个网络线程有一个ObjectProxy
+
+		/**
+		 *  objectProxy Pointer
+		 */
+		shared_ptr<ObjectProxy *> _objectProxyOwn;                    //保存ObjectProxy对象的指针数组
+	};
 public:
+
+	unordered_map<Communicator*, CommunicatorEpollInfo>    _communicatorEpollInfo;
     /*
      * 每个线程跟客户端网络线程通信的队列
      */
-    ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]; //队列数组
-    bool           _queueInit;                       //是否初始化
+//    ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]; //队列数组
+//    bool           _queueInit;                       //是否初始化
     uint16_t       _reqQNo;                          //请求事件通知的seq
-    size_t         _netSeq;                          //轮训选择网络线程的偏移量
-    int            _netThreadSeq;                     //网络线程发起的请求回到自己的网络线程来处理,其值为网络线程的id
+//    size_t         _netSeq;                          //轮训选择网络线程的偏移量
+//    int            _netThreadSeq;                    //网络线程发起的请求回到自己的网络线程来处理,其值为网络线程的id
 
     /**
      * hash属性,客户端每次调用都进行设置
@@ -144,15 +172,15 @@ public:
      */
     CoroutineScheduler*        _sched;                   //协程调度器
 
-    /**
-     * ObjectProxy
-     */
-    size_t         _objectProxyNum;                  //ObjectProxy对象的个数,其个数由客户端的网络线程数决定,每个网络线程有一个ObjectProxy
-
-    /**
-     *  objectProxy Pointer
-     */
-    shared_ptr<ObjectProxy *> _objectProxyOwn;                    //保存ObjectProxy对象的指针数组
+//    /**
+//     * ObjectProxy
+//     */
+//    size_t         _objectProxyNum;                  //ObjectProxy对象的个数,其个数由客户端的网络线程数决定,每个网络线程有一个ObjectProxy
+//
+//    /**
+//     *  objectProxy Pointer
+//     */
+//    shared_ptr<ObjectProxy *> _objectProxyOwn;                    //保存ObjectProxy对象的指针数组
 #ifdef TARS_OPENTRACKING
     std::unordered_map<std::string, std::string> _trackInfoMap;
 #endif

File diff suppressed because it is too large
+ 132 - 132
tools/tarsgrammar/tars.tab.cpp


+ 18 - 18
tools/tarsparse/tars.lex.cpp

@@ -513,7 +513,7 @@ int yy_flex_debug = 0;
 #define YY_MORE_ADJ 0
 #define YY_RESTORE_YY_MORE_OFFSET
 char *yytext;
-#line 1 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 1 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 /**
  * Tencent is pleased to support the open source community by making Tars available.
  *
@@ -529,7 +529,7 @@ char *yytext;
  * CONDITIONS OF ANY KIND, either express or implied. See the License for the 
  * specific language governing permissions and limitations under the License.
  */
-#line 20 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 20 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 #include <map>
 #include <string>
 #include <sstream>
@@ -742,7 +742,7 @@ YY_DECL
 	register char *yy_cp, *yy_bp;
 	register int yy_act;
     
-#line 67 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 67 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 
 
 #line 749 "tars.lex.cpp"
@@ -840,12 +840,12 @@ do_action:	/* This label is used only to access EOF actions. */
 
 case 1:
 YY_RULE_SETUP
-#line 69 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 69 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 { BEGIN(INCL); }
 	YY_BREAK
 case 2:
 YY_RULE_SETUP
-#line 71 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 71 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     if ( include_file_stack_ptr >= MAX_INCLUDE_DEPTH )
     {
@@ -878,7 +878,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case YY_STATE_EOF(INITIAL):
 case YY_STATE_EOF(INCL):
-#line 101 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 101 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     --include_file_stack_ptr;
     if ( include_file_stack_ptr < 0 )
@@ -897,14 +897,14 @@ case YY_STATE_EOF(INCL):
 	YY_BREAK
 case 3:
 YY_RULE_SETUP
-#line 117 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 117 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     return TARS_SCOPE_DELIMITER;
 }
 	YY_BREAK
 case 4:
 YY_RULE_SETUP
-#line 121 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 121 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     // C++ comment
     bool e = false;
@@ -925,7 +925,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case 5:
 YY_RULE_SETUP
-#line 139 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 139 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     // C comment
     bool e = false;
@@ -976,7 +976,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case 6:
 YY_RULE_SETUP
-#line 187 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 187 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     StringGrammarPtr ident  = new StringGrammar;
     ident->v            = yytext;
@@ -987,7 +987,7 @@ YY_RULE_SETUP
 case 7:
 /* rule 7 can match eol */
 YY_RULE_SETUP
-#line 194 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 194 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     StringGrammarPtr ident  = new StringGrammar;
     ident->v            = yytext;
@@ -1000,7 +1000,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case 8:
 YY_RULE_SETUP
-#line 204 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 204 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     StringGrammarPtr str = new StringGrammar;
     bool e = false;
@@ -1115,7 +1115,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case 9:
 YY_RULE_SETUP
-#line 316 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 316 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     errno = 0;
     IntergerGrammarPtr ptr = new IntergerGrammar;
@@ -1140,7 +1140,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case 10:
 YY_RULE_SETUP
-#line 338 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 338 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     errno = 0;
     FloatGrammarPtr ptr = new FloatGrammar;
@@ -1175,7 +1175,7 @@ YY_RULE_SETUP
 case 11:
 /* rule 11 can match eol */
 YY_RULE_SETUP
-#line 369 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 369 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     if(yytext[0] == '\n')
     {
@@ -1185,7 +1185,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case 12:
 YY_RULE_SETUP
-#line 376 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 376 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 {
     if(yytext[0] < 32 || yytext[0] > 126)
     {
@@ -1204,7 +1204,7 @@ YY_RULE_SETUP
 	YY_BREAK
 case 13:
 YY_RULE_SETUP
-#line 392 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 392 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 ECHO;
 	YY_BREAK
 #line 1211 "tars.lex.cpp"
@@ -2214,7 +2214,7 @@ void yyfree (void * ptr )
 
 #define YYTABLES_NAME "yytables"
 
-#line 392 "/Volumes/MyData/centos/Tars/framework/tarscpp/tools/tarsgrammar/tars.l"
+#line 392 "/Volumes/MyData/centos/Tars/cpp/tools/tarsgrammar/tars.l"
 
 
 

File diff suppressed because it is too large
+ 132 - 132
tools/tarsparse/tars.tab.cpp


Some files were not shown because too many files changed in this diff