1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #include "servant/CoroutineScheduler.h"
- #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
- #include <fcntl.h>
- #include <signal.h>
- #include <sys/mman.h>
- #include <sys/resource.h>
- #include <sys/stat.h>
- #include <sys/time.h>
- #include <sys/types.h>
- #include <unistd.h>
- #endif
- #include <algorithm>
- #include <cmath>
- #include <cstring>
- #include <stdexcept>
- #include <assert.h>
- #include "util/tc_timeprovider.h"
- #include "servant/RemoteLogger.h"
- #include "servant/ServantHandle.h"
- namespace tars
- {
- #if TARGET_PLATFORM_WINDOWS
- // x86_64
- // test x86_64 before i386 because icc might
- // define __i686__ for x86_64 too
- #if defined(__x86_64__) || defined(__x86_64) \
- || defined(__amd64__) || defined(__amd64) \
- || defined(_M_X64) || defined(_M_AMD64)
- // Windows seams not to provide a constant or function
- // telling the minimal stacksize
- # define MIN_STACKSIZE 8 * 1024
- #else
- # define MIN_STACKSIZE 4 * 1024
- #endif
- void system_info_( SYSTEM_INFO * si) {
- ::GetSystemInfo( si);
- }
- SYSTEM_INFO system_info() {
- static SYSTEM_INFO si;
- static std::once_flag flag;
- std::call_once( flag, static_cast< void(*)( SYSTEM_INFO *) >( system_info_), & si);
- return si;
- }
- std::size_t pagesize() {
- return static_cast< std::size_t >( system_info().dwPageSize);
- }
- // Windows seams not to provide a limit for the stacksize
- // libcoco uses 32k+4k bytes as minimum
- bool stack_traits::is_unbounded() {
- return true;
- }
- std::size_t stack_traits::page_size() {
- return pagesize();
- }
- std::size_t stack_traits::default_size() {
- return 128 * 1024;
- }
- // because Windows seams not to provide a limit for minimum stacksize
- std::size_t stack_traits::minimum_size() {
- return MIN_STACKSIZE;
- }
- // because Windows seams not to provide a limit for maximum stacksize
- // maximum_size() can never be called (pre-condition ! is_unbounded() )
- std::size_t stack_traits::maximum_size() {
- assert( ! is_unbounded() );
- return 1 * 1024 * 1024 * 1024; // 1GB
- }
- stack_context stack_traits::allocate(size_t size_) {
- // calculate how many pages are required
- const std::size_t pages(static_cast< std::size_t >( std::ceil( static_cast< float >( size_) / stack_traits::page_size() ) ) );
- // add one page at bottom that will be used as guard-page
- const std::size_t size__ = ( pages + 1) * stack_traits::page_size();
- void * vp = ::VirtualAlloc( 0, size__, MEM_COMMIT, PAGE_READWRITE);
- if ( ! vp) throw std::bad_alloc();
- DWORD old_options;
- const BOOL result = ::VirtualProtect(
- vp, stack_traits::page_size(), PAGE_READWRITE | PAGE_GUARD /*PAGE_NOACCESS*/, & old_options);
- assert( FALSE != result);
- stack_context sctx;
- sctx.size = size__;
- sctx.sp = static_cast< char * >( vp) + sctx.size;
- return sctx;
- }
- void stack_traits::deallocate( stack_context & sctx) {
- assert( sctx.sp);
- void * vp = static_cast< char * >( sctx.sp) - sctx.size;
- ::VirtualFree( vp, 0, MEM_RELEASE);
- }
- #else
- // 128kb recommended stack size
- // # define MINSIGSTKSZ (131072)
- void pagesize_( std::size_t * size) {
- // conform to POSIX.1-2001
- * size = ::sysconf( _SC_PAGESIZE);
- }
- void stacksize_limit_( rlimit * limit) {
- // conforming to POSIX.1-2001
- ::getrlimit( RLIMIT_STACK, limit);
- }
- std::size_t pagesize() {
- static std::size_t size = 0;
- static std::once_flag flag;
- std::call_once( flag, pagesize_, & size);
- return size;
- }
- rlimit stacksize_limit() {
- static rlimit limit;
- static std::once_flag flag;
- std::call_once( flag, stacksize_limit_, & limit);
- return limit;
- }
- bool stack_traits::is_unbounded() {
- return RLIM_INFINITY == stacksize_limit().rlim_max;
- }
- std::size_t stack_traits::page_size() {
- return pagesize();
- }
- std::size_t stack_traits::default_size() {
- return 128 * 1024;
- }
- std::size_t stack_traits::minimum_size() {
- return MINSIGSTKSZ;
- }
- std::size_t stack_traits::maximum_size() {
- assert( ! is_unbounded() );
- return static_cast< std::size_t >( stacksize_limit().rlim_max);
- }
- stack_context stack_traits::allocate(std::size_t size_) {
- // calculate how many pages are required
- const std::size_t pages(static_cast< std::size_t >( std::ceil( static_cast< float >( size_) / stack_traits::page_size() ) ) );
- // add one page at bottom that will be used as guard-page
- const std::size_t size__ = ( pages + 1) * stack_traits::page_size();
- // conform to POSIX.4 (POSIX.1b-1993, _POSIX_C_SOURCE=199309L)
- #if defined(MAP_ANON)
- void * vp = ::mmap( 0, size__, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
- #else
- void * vp = ::mmap( 0, size__, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
- #endif
- if ( MAP_FAILED == vp) throw std::bad_alloc();
- // conforming to POSIX.1-2001
- const int result( ::mprotect( vp, stack_traits::page_size(), PROT_NONE) );
- assert( 0 == result);
- stack_context sctx;
- sctx.size = size__;
- sctx.sp = static_cast< char * >( vp) + sctx.size;
- return sctx;
- }
- void stack_traits::deallocate(stack_context & sctx) {
- assert( sctx.sp);
- void * vp = static_cast< char * >( sctx.sp) - sctx.size;
- // conform to POSIX.4 (POSIX.1b-1993, _POSIX_C_SOURCE=199309L)
- ::munmap( vp, sctx.size);
- }
- #endif
- ////////////////////////////////////////////////////////
- CoroutineInfo::CoroutineInfo()
- : _prev(NULL)
- , _next(NULL)
- // , _main(true)
- , _scheduler(NULL)
- , _uid(0)
- , _eStatus(CORO_FREE)
- // , _ctx_to(NULL)
- {
- }
- CoroutineInfo::CoroutineInfo(CoroutineScheduler* scheduler)
- : _prev(NULL)
- , _next(NULL)
- // , _main(false)
- , _scheduler(scheduler)
- , _uid(0)
- , _eStatus(CORO_FREE)
- // , _ctx_to(NULL)
- {
- }
- CoroutineInfo::CoroutineInfo(CoroutineScheduler* scheduler, uint32_t iUid, stack_context stack_ctx)
- : _prev(NULL)
- , _next(NULL)
- // , _main(false)
- , _scheduler(scheduler)
- , _uid(iUid)
- , _eStatus(CORO_FREE)
- , _stack_ctx(stack_ctx)
- // , _ctx_to(NULL)
- {
- }
- CoroutineInfo::~CoroutineInfo()
- {
- }
- void CoroutineInfo::registerFunc(const std::function<void ()>& callback)
- {
- _callback = callback;
- _init_func.coroFunc = CoroutineInfo::corotineProc;
- _init_func.args = this;
- // _ctx_to = make_fcontext(_stack_ctx.sp, _stack_ctx.size, CoroutineInfo::corotineEntry);
- // jump_fcontext(&_ctx_from, _ctx_to, (intptr_t)this, false);
- fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size, CoroutineInfo::corotineEntry);
- transfer_t tf = jump_fcontext(ctx, this);
- //实际的ctx
- this->setCtx(tf.fctx);
- }
- void CoroutineInfo::setStackContext(stack_context stack_ctx)
- {
- _stack_ctx = stack_ctx;
- }
- void CoroutineInfo::corotineEntry(transfer_t tf)
- {
- CoroutineInfo * coro = static_cast< CoroutineInfo * >(tf.data);
- Func func = coro->_init_func.coroFunc;
- void* args = coro->_init_func.args;
- // jump_fcontext(coro->_ctx_to, &(coro->_ctx_from), 0, false);
- transfer_t t = jump_fcontext(tf.fctx, NULL);
- coro->_scheduler->setMainCtx(t.fctx);
- try
- {
- func(args, t);
- }
- catch(std::exception &ex)
- {
- TLOGERROR("[TARS][CoroutineInfo::corotineEntry exception:" << ex.what() << endl);
- }
- catch(...)
- {
- TLOGERROR("[TARS][CoroutineInfo::corotineEntry unknown exception." << endl);
- }
- }
- void CoroutineInfo::corotineProc(void * args, transfer_t t)
- {
- CoroutineInfo *coro = (CoroutineInfo*)args;
- try
- {
- std::function<void ()> cb = coro->_callback;
- cb();
- }
- catch(std::exception &ex)
- {
- TLOGERROR("[TARS][CoroutineInfo::corotineProc exception:" << ex.what() << endl);
- }
- catch(...)
- {
- TLOGERROR("[TARS][CoroutineInfo::corotineProc unknown exception." << endl);
- }
- CoroutineScheduler* scheduler = coro->getScheduler();
- scheduler->decUsedSize();
- scheduler->moveToFreeList(coro);
- scheduler->switchCoro(&(scheduler->getMainCoroutine()));
- // scheduler->switchCoro(coro, &(scheduler->getMainCoroutine()));
- TLOGERROR("[TARS][CoroutineInfo::corotineProc no come." << endl);
- }
- //////////////////////////////////////////////////////////////
- CoroutineScheduler::CoroutineScheduler()
- : _terminal(false)
- , _poolSize(1000)
- , _stackSize(128*1024)
- , _currentSize(0)
- , _usedSize(0)
- , _uniqId(0)
- , _handle(NULL)
- , _currentCoro(NULL)
- , _all_coro(NULL)
- {
- _all_coro = new CoroutineInfo*[_poolSize+1];
- for(size_t i = 0; i <= _poolSize; ++i)
- {
- _all_coro[i] = NULL;
- }
- CoroutineInfo::CoroutineHeadInit(&_active);
- CoroutineInfo::CoroutineHeadInit(&_avail);
- CoroutineInfo::CoroutineHeadInit(&_inactive);
- CoroutineInfo::CoroutineHeadInit(&_timeout);
- CoroutineInfo::CoroutineHeadInit(&_free);
- }
- CoroutineScheduler::~CoroutineScheduler()
- {}
- void CoroutineScheduler::init(uint32_t iPoolSize, size_t iStackSize)
- {
- if(iPoolSize <= 0)
- {
- TLOGERROR("[TARS][[CoroutineScheduler::init iPoolSize <= 0." << endl);
- return ;
- }
- _terminal = false;
- _poolSize = iPoolSize;
- _stackSize = iStackSize;
- if(_poolSize <= 100)
- {
- _currentSize = _poolSize;
- }
- else
- {
- _currentSize = 100;
- }
- if(_all_coro != NULL)
- {
- delete [] _all_coro;
- _all_coro = new CoroutineInfo*[_poolSize+1];
- for(size_t i = 0; i <= _poolSize; ++i)
- {
- _all_coro[i] = NULL;
- }
- }
- _usedSize = 0;
- _uniqId = 0;
- int iSucc = 0;
- for(size_t i = 0; i < _currentSize; ++i)
- {
- CoroutineInfo *coro = new CoroutineInfo(this);
- // stack_context s_ctx;
- // int ret = _alloc.allocate(s_ctx, iStackSize);
- // if(ret != 0)
- // {
- // TLOGERROR("[TARS][CoroutineScheduler::init iPoolSize:" << iPoolSize << "|iStackSize:" << iStackSize << "|i:" << i << endl);
- // delete coro;
- // coro = NULL;
- // break;
- // }
- stack_context s_ctx = stack_traits::allocate(iStackSize);
- coro->setStackContext(s_ctx);
- uint32_t iId = generateId();
- coro->setUid(iId);
- coro->setStatus(CORO_FREE);
- //_free.push_front(coro);
- //_all.insert(make_pair(coro->getUid(), coro));
- _all_coro[iId] = coro;
- CoroutineInfo::CoroutineAddTail(coro, &_free);
- ++iSucc;
- }
- _currentSize = iSucc;
- _mainCoro.setUid(0);
- _mainCoro.setStatus(CORO_FREE);
- _currentCoro = &_mainCoro;
- TLOGDEBUG("[TARS][CoroutineScheduler::init iPoolSize:" << _poolSize << "|iCurrentSize:" << _currentSize << "|iStackSize:" << _stackSize << endl);
- }
- int CoroutineScheduler::increaseCoroPoolSize()
- {
- int iInc = ((_poolSize - _currentSize) > 100) ? 100 : (_poolSize - _currentSize);
- if(iInc <= 0)
- {
- TLOGERROR("[TARS][CoroutineScheduler::increaseCoroPoolSize full iPoolSize:" << _poolSize << "|iCurrentSize:" << _currentSize << endl);
- return -1;
- }
- int iSucc = 0;
- for(int i = 0; i < iInc; ++i)
- {
- CoroutineInfo *coro = new CoroutineInfo(this);
- uint32_t iId = generateId();
- coro->setUid(iId);
- coro->setStatus(CORO_FREE);
- // stack_context s_ctx;
- // int ret = _alloc.allocate(s_ctx, _stackSize);
- // if(ret != 0)
- // {
- // TLOGERROR("[TARS][CoroutineScheduler::increaseCoroPoolSize iPoolSize:" << _poolSize << "|iStackSize:" << _stackSize << "|i:" << i << endl);
- // delete coro;
- // coro = NULL;
- // break;
- // }
- stack_context s_ctx = stack_traits::allocate(_stackSize);
- coro->setStackContext(s_ctx);
- _all_coro[iId] = coro;
- CoroutineInfo::CoroutineAddTail(coro, &_free);
- ++iSucc;
- }
- if(iSucc == 0)
- {
- TLOGERROR("[TARS][CoroutineScheduler::increaseCoroPoolSize cannot create iInc:" << iInc << "|iPoolSize:" << _poolSize << endl);
- return -1;
- }
- _currentSize += iSucc;
- return 0;
- }
- uint32_t CoroutineScheduler::createCoroutine(const std::function<void ()> &callback)
- {
- if(_usedSize >= _currentSize || CoroutineInfo::CoroutineHeadEmpty(&_free))
- {
- int iRet = increaseCoroPoolSize();
- if(iRet != 0)
- return 0;
- }
- CoroutineInfo *coro = _free._next;
- assert(coro != NULL);
- CoroutineInfo::CoroutineDel(coro);
- _usedSize++;
- coro->setStatus(CORO_AVAIL);
- CoroutineInfo::CoroutineAddTail(coro, &_avail);
- coro->registerFunc(callback);
- return coro->getUid();
- }
- void CoroutineScheduler::run()
- {
- while(!_terminal)
- {
- if(CoroutineInfo::CoroutineHeadEmpty(&_avail) && CoroutineInfo::CoroutineHeadEmpty(&_active))
- {
- TC_ThreadLock::Lock lock(_monitor);
- if(_activeCoroQueue.size() <= 0)
- {
- _monitor.timedWait(1000);
- }
- }
- wakeupbytimeout();
- wakeupbyself();
- wakeup();
- if(!CoroutineInfo::CoroutineHeadEmpty(&_active))
- {
- int iLoop = 100;
- while(iLoop > 0 && !CoroutineInfo::CoroutineHeadEmpty(&_active))
- {
- CoroutineInfo *coro = _active._next;
- assert(coro != NULL);
- // switchCoro(&_mainCoro, coro);
- switchCoro(coro);
- --iLoop;
- }
- }
- if(!CoroutineInfo::CoroutineHeadEmpty(&_avail))
- {
- CoroutineInfo *coro = _avail._next;
- assert(coro != NULL);
- // switchCoro(&_mainCoro, coro);
- switchCoro(coro);
- }
- if(_usedSize == 0)
- break;
- }
- destroy();
- }
- void CoroutineScheduler::tars_run()
- {
- if(!_terminal)
- {
- wakeupbytimeout();
- wakeupbyself();
- wakeup();
- if(!CoroutineInfo::CoroutineHeadEmpty(&_active))
- {
- int iLoop = 100;
- while(iLoop > 0 && !CoroutineInfo::CoroutineHeadEmpty(&_active))
- {
- CoroutineInfo *coro = _active._next;
- assert(coro != NULL);
- // switchCoro(&_mainCoro, coro);
- switchCoro(coro);
- --iLoop;
- }
- }
- if(!CoroutineInfo::CoroutineHeadEmpty(&_avail))
- {
- int iLoop = 100;
- while(iLoop > 0 && !CoroutineInfo::CoroutineHeadEmpty(&_avail))
- {
- CoroutineInfo *coro = _avail._next;
- assert(coro != NULL);
- // switchCoro(&_mainCoro, coro);
- switchCoro(coro);
- --iLoop;
- }
- }
- }
- }
- void CoroutineScheduler::yield(bool bFlag)
- {
- if(bFlag)
- {
- putbyself(_currentCoro->getUid());
- }
- moveToInactive(_currentCoro);
- // switchCoro(_currentCoro, &_mainCoro);
- switchCoro(&_mainCoro);
- }
- void CoroutineScheduler::sleep(int iSleepTime)
- {
- int64_t iNow = TNOWMS;
- int64_t iTimeout = iNow + (iSleepTime >= 0 ? iSleepTime : -iSleepTime);
- _timeoutCoroId.insert(make_pair(iTimeout, _currentCoro->getUid()));
- moveToTimeout(_currentCoro);
- // switchCoro(_currentCoro, &_mainCoro);
- switchCoro(&_mainCoro);
- }
- void CoroutineScheduler::putbyself(uint32_t iCoroId)
- {
- if(!_terminal)
- {
- _needActiveCoroId.push_back(iCoroId);
- }
- }
- void CoroutineScheduler::wakeupbyself()
- {
- if(!_terminal)
- {
- if(_needActiveCoroId.size() > 0)
- {
- list<uint32_t>::iterator it = _needActiveCoroId.begin();
- while(it != _needActiveCoroId.end())
- {
- CoroutineInfo *coro = _all_coro[*it];
- assert(coro != NULL);
- moveToAvail(coro);
- ++it;
- }
- _needActiveCoroId.clear();
- }
- }
- }
- void CoroutineScheduler::put(uint32_t iCoroId)
- {
- if(!_terminal)
- {
- _activeCoroQueue.push_back(iCoroId);
- if(_handle)
- {
- _handle->notifyFilter();
- }
- else
- {
- TC_ThreadLock::Lock lock(_monitor);
- _monitor.notifyAll();
- }
- }
- }
- void CoroutineScheduler::wakeup()
- {
- if(!_terminal)
- {
- if(_activeCoroQueue.size() > 0)
- {
- TC_ThreadQueue<uint32_t, deque<uint32_t> >::queue_type coroIds;
- _activeCoroQueue.swap(coroIds);
- TC_ThreadQueue<uint32_t, deque<uint32_t> >::queue_type::iterator it = coroIds.begin();
- TC_ThreadQueue<uint32_t, deque<uint32_t> >::queue_type::iterator itEnd = coroIds.end();
- while(it != itEnd)
- {
- CoroutineInfo *coro = _all_coro[*it];
- assert(coro != NULL);
- moveToActive(coro);
- ++it;
- }
- }
- }
- }
- void CoroutineScheduler::wakeupbytimeout()
- {
- if(!_terminal)
- {
- if(_timeoutCoroId.size() > 0)
- {
- int64_t iNow = TNOWMS;
- while(true)
- {
- multimap<int64_t, uint32_t>::iterator it = _timeoutCoroId.begin();
- if(it == _timeoutCoroId.end() || it->first > iNow)
- break;
- CoroutineInfo *coro = _all_coro[it->second];
- assert(coro != NULL);
- moveToActive(coro);
- _timeoutCoroId.erase(it);
- }
- }
- }
- }
- void CoroutineScheduler::terminate()
- {
- _terminal = true;
- if(_handle)
- {
- _handle->notifyFilter();
- }
- else
- {
- TC_ThreadLock::Lock lock(_monitor);
- _monitor.notifyAll();
- }
- }
- uint32_t CoroutineScheduler::generateId()
- {
- uint32_t i = ++_uniqId;
- if(i == 0) {
- i = ++_uniqId;
- }
- assert(i <= _poolSize);
- return i;
- }
- // void CoroutineScheduler::switchCoro(CoroutineInfo *from, CoroutineInfo *to)
- void CoroutineScheduler::switchCoro(CoroutineInfo *to)
- {
- _currentCoro = to;
- // jump_fcontext(from->getCtx(), to->getCtx(), 0, false);
- transfer_t t = jump_fcontext(to->getCtx(), NULL);
- to->setCtx(t.fctx);
- }
- void CoroutineScheduler::moveToActive(CoroutineInfo *coro, bool bFlag)
- {
- if(coro->getStatus() == CORO_INACTIVE)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_ACTIVE);
- CoroutineInfo::CoroutineAddTail(coro, &_active);
- }
- else if(coro->getStatus() == CORO_TIMEOUT)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_ACTIVE);
- CoroutineInfo::CoroutineAddTail(coro, &_active);
- }
- else
- {
- TLOGERROR("[TARS][CoroutineScheduler::moveToActive ERROR|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
- }
- }
- void CoroutineScheduler::moveToAvail(CoroutineInfo *coro)
- {
- if(coro->getStatus() == CORO_INACTIVE)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_AVAIL);
- CoroutineInfo::CoroutineAddTail(coro, &_avail);
- }
- else
- {
- TLOGERROR("[TARS][CoroutineScheduler::moveToAvail ERROR:|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
- }
- }
- void CoroutineScheduler::moveToInactive(CoroutineInfo *coro)
- {
- if(coro->getStatus() == CORO_ACTIVE)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_INACTIVE);
- CoroutineInfo::CoroutineAddTail(coro, &_inactive);
- }
- else if(coro->getStatus() == CORO_AVAIL)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_INACTIVE);
- CoroutineInfo::CoroutineAddTail(coro, &_inactive);
- }
- else
- {
- TLOGERROR("[TARS][CoroutineScheduler::moveToInactive ERROR|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
- }
- }
- void CoroutineScheduler::moveToTimeout(CoroutineInfo *coro)
- {
- if(coro->getStatus() == CORO_ACTIVE)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_TIMEOUT);
- CoroutineInfo::CoroutineAddTail(coro, &_timeout);
- }
- else if(coro->getStatus() == CORO_AVAIL)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_TIMEOUT);
- CoroutineInfo::CoroutineAddTail(coro, &_timeout);
- }
- else
- {
- TLOGERROR("[TARS][CoroutineScheduler::moveToTimeout ERROR|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
- }
- }
- void CoroutineScheduler::moveToFreeList(CoroutineInfo *coro)
- {
- if(coro->getStatus() == CORO_ACTIVE)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_FREE);
- CoroutineInfo::CoroutineAddTail(coro, &_free);
- }
- else if(coro->getStatus() == CORO_AVAIL)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_FREE);
- CoroutineInfo::CoroutineAddTail(coro, &_free);
- }
- else if(coro->getStatus() == CORO_INACTIVE)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_FREE);
- CoroutineInfo::CoroutineAddTail(coro, &_free);
- }
- else if(coro->getStatus() == CORO_TIMEOUT)
- {
- CoroutineInfo::CoroutineDel(coro);
- coro->setStatus(CORO_FREE);
- CoroutineInfo::CoroutineAddTail(coro, &_free);
- }
- else
- {
- TLOGERROR("[TARS][CoroutineScheduler::moveToFreeList ERROR: already free|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
- }
- }
- void CoroutineScheduler::destroy()
- {
- if(_all_coro)
- {
- for(size_t i = 1; i <= _poolSize; i++)
- {
- if(_all_coro[i])
- {
- stack_traits::deallocate(_all_coro[i]->getStackContext());
- }
- }
- delete [] _all_coro;
- }
- }
- /////////////////////////////////////////////////////////
- Coroutine::Coroutine()
- : _coroSched(NULL)
- , _num(1)
- , _maxNum(128)
- , _stackSize(128*1024)
- {
- }
- Coroutine::~Coroutine()
- {
- if(isAlive())
- {
- terminate();
- getThreadControl().join();
- }
- }
- void Coroutine::setCoroInfo(uint32_t iNum, uint32_t iMaxNum, size_t iStackSize)
- {
- _maxNum = (iMaxNum > 0 ? iMaxNum : 1);
- _num = (iNum > 0 ? (iNum <= _maxNum ? iNum : _maxNum) : 1);
- _stackSize = (iStackSize >= pagesize() ? iStackSize : pagesize());
- }
- void Coroutine::run()
- {
- initialize();
- handleCoro();
- destroy();
- }
- void Coroutine::terminate()
- {
- if(_coroSched)
- {
- _coroSched->terminate();
- }
- }
- void Coroutine::handleCoro()
- {
- _coroSched = new CoroutineScheduler();
- _coroSched->init(_maxNum, _stackSize);
- ServantProxyThreadData * pSptd = ServantProxyThreadData::getData();
- assert(pSptd != NULL);
- pSptd->_sched = _coroSched;
- for(uint32_t i = 0; i < _num; ++i)
- {
- _coroSched->createCoroutine(std::bind(&Coroutine::coroEntry, this));
- }
- _coroSched->run();
- delete _coroSched;
- _coroSched = NULL;
- }
- void Coroutine::coroEntry(Coroutine *pCoro)
- {
- try
- {
- pCoro->handle();
- }
- catch(exception &ex)
- {
- TLOGERROR("[TARS][[Coroutine::coroEntry exception:" << ex.what() << "]" << endl);
- }
- catch(...)
- {
- TLOGERROR("[TARS][[Coroutine::coroEntry unknown exception]" << endl);
- }
- }
- uint32_t Coroutine::createCoroutine(const std::function<void ()> &coroFunc)
- {
- if(_coroSched)
- {
- return _coroSched->createCoroutine(coroFunc);
- }
- else
- {
- TLOGERROR("[TARS][[Coroutine::createCoroutine coro sched no init]" << endl);
- }
- return -1;
- }
- void Coroutine::yield()
- {
- if(_coroSched)
- {
- _coroSched->yield();
- }
- else
- {
- throw CoroutineException("[TARS][[Coroutine::yield coro sched no init]");
- }
- }
- void Coroutine::Sleep(int iSleepTime)
- {
- if(_coroSched)
- {
- _coroSched->sleep(iSleepTime);
- }
- else
- {
- throw CoroutineException("[TARS][[Coroutine::yield coro sched no init]");
- }
- }
- }
|