CoroutineScheduler.cpp 24 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/CoroutineScheduler.h"
  17. #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
  18. #include <fcntl.h>
  19. #include <signal.h>
  20. #include <sys/mman.h>
  21. #include <sys/resource.h>
  22. #include <sys/stat.h>
  23. #include <sys/time.h>
  24. #include <sys/types.h>
  25. #include <unistd.h>
  26. #endif
  27. #include <algorithm>
  28. #include <cmath>
  29. #include <cstring>
  30. #include <stdexcept>
  31. #include <assert.h>
  32. #include "util/tc_timeprovider.h"
  33. #include "servant/RemoteLogger.h"
  34. #include "servant/ServantHandle.h"
  35. namespace tars
  36. {
  37. #if TARGET_PLATFORM_WINDOWS
  38. // x86_64
  39. // test x86_64 before i386 because icc might
  40. // define __i686__ for x86_64 too
  41. #if defined(__x86_64__) || defined(__x86_64) \
  42. || defined(__amd64__) || defined(__amd64) \
  43. || defined(_M_X64) || defined(_M_AMD64)
  44. // Windows seams not to provide a constant or function
  45. // telling the minimal stacksize
  46. # define MIN_STACKSIZE 8 * 1024
  47. #else
  48. # define MIN_STACKSIZE 4 * 1024
  49. #endif
  50. void system_info_( SYSTEM_INFO * si) {
  51. ::GetSystemInfo( si);
  52. }
  53. SYSTEM_INFO system_info() {
  54. static SYSTEM_INFO si;
  55. static std::once_flag flag;
  56. std::call_once( flag, static_cast< void(*)( SYSTEM_INFO *) >( system_info_), & si);
  57. return si;
  58. }
  59. std::size_t pagesize() {
  60. return static_cast< std::size_t >( system_info().dwPageSize);
  61. }
  62. // Windows seams not to provide a limit for the stacksize
  63. // libcoco uses 32k+4k bytes as minimum
  64. bool stack_traits::is_unbounded() {
  65. return true;
  66. }
  67. std::size_t stack_traits::page_size() {
  68. return pagesize();
  69. }
  70. std::size_t stack_traits::default_size() {
  71. return 128 * 1024;
  72. }
  73. // because Windows seams not to provide a limit for minimum stacksize
  74. std::size_t stack_traits::minimum_size() {
  75. return MIN_STACKSIZE;
  76. }
  77. // because Windows seams not to provide a limit for maximum stacksize
  78. // maximum_size() can never be called (pre-condition ! is_unbounded() )
  79. std::size_t stack_traits::maximum_size() {
  80. assert( ! is_unbounded() );
  81. return 1 * 1024 * 1024 * 1024; // 1GB
  82. }
  83. stack_context stack_traits::allocate(size_t size_) {
  84. // calculate how many pages are required
  85. const std::size_t pages(static_cast< std::size_t >( std::ceil( static_cast< float >( size_) / stack_traits::page_size() ) ) );
  86. // add one page at bottom that will be used as guard-page
  87. const std::size_t size__ = ( pages + 1) * stack_traits::page_size();
  88. void * vp = ::VirtualAlloc( 0, size__, MEM_COMMIT, PAGE_READWRITE);
  89. if ( ! vp) throw std::bad_alloc();
  90. DWORD old_options;
  91. const BOOL result = ::VirtualProtect(
  92. vp, stack_traits::page_size(), PAGE_READWRITE | PAGE_GUARD /*PAGE_NOACCESS*/, & old_options);
  93. assert( FALSE != result);
  94. stack_context sctx;
  95. sctx.size = size__;
  96. sctx.sp = static_cast< char * >( vp) + sctx.size;
  97. return sctx;
  98. }
  99. void stack_traits::deallocate( stack_context & sctx) {
  100. assert( sctx.sp);
  101. void * vp = static_cast< char * >( sctx.sp) - sctx.size;
  102. ::VirtualFree( vp, 0, MEM_RELEASE);
  103. }
  104. #else
  105. // 128kb recommended stack size
  106. // # define MINSIGSTKSZ (131072)
  107. void pagesize_( std::size_t * size) {
  108. // conform to POSIX.1-2001
  109. * size = ::sysconf( _SC_PAGESIZE);
  110. }
  111. void stacksize_limit_( rlimit * limit) {
  112. // conforming to POSIX.1-2001
  113. ::getrlimit( RLIMIT_STACK, limit);
  114. }
  115. std::size_t pagesize() {
  116. static std::size_t size = 0;
  117. static std::once_flag flag;
  118. std::call_once( flag, pagesize_, & size);
  119. return size;
  120. }
  121. rlimit stacksize_limit() {
  122. static rlimit limit;
  123. static std::once_flag flag;
  124. std::call_once( flag, stacksize_limit_, & limit);
  125. return limit;
  126. }
  127. bool stack_traits::is_unbounded() {
  128. return RLIM_INFINITY == stacksize_limit().rlim_max;
  129. }
  130. std::size_t stack_traits::page_size() {
  131. return pagesize();
  132. }
  133. std::size_t stack_traits::default_size() {
  134. return 128 * 1024;
  135. }
  136. std::size_t stack_traits::minimum_size() {
  137. return MINSIGSTKSZ;
  138. }
  139. std::size_t stack_traits::maximum_size() {
  140. assert( ! is_unbounded() );
  141. return static_cast< std::size_t >( stacksize_limit().rlim_max);
  142. }
  143. stack_context stack_traits::allocate(std::size_t size_) {
  144. // calculate how many pages are required
  145. const std::size_t pages(static_cast< std::size_t >( std::ceil( static_cast< float >( size_) / stack_traits::page_size() ) ) );
  146. // add one page at bottom that will be used as guard-page
  147. const std::size_t size__ = ( pages + 1) * stack_traits::page_size();
  148. // conform to POSIX.4 (POSIX.1b-1993, _POSIX_C_SOURCE=199309L)
  149. #if defined(MAP_ANON)
  150. void * vp = ::mmap( 0, size__, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
  151. #else
  152. void * vp = ::mmap( 0, size__, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  153. #endif
  154. if ( MAP_FAILED == vp) throw std::bad_alloc();
  155. // conforming to POSIX.1-2001
  156. const int result( ::mprotect( vp, stack_traits::page_size(), PROT_NONE) );
  157. assert( 0 == result);
  158. stack_context sctx;
  159. sctx.size = size__;
  160. sctx.sp = static_cast< char * >( vp) + sctx.size;
  161. return sctx;
  162. }
  163. void stack_traits::deallocate(stack_context & sctx) {
  164. assert( sctx.sp);
  165. void * vp = static_cast< char * >( sctx.sp) - sctx.size;
  166. // conform to POSIX.4 (POSIX.1b-1993, _POSIX_C_SOURCE=199309L)
  167. ::munmap( vp, sctx.size);
  168. }
  169. #endif
  170. ////////////////////////////////////////////////////////
  171. CoroutineInfo::CoroutineInfo()
  172. : _prev(NULL)
  173. , _next(NULL)
  174. // , _main(true)
  175. , _scheduler(NULL)
  176. , _uid(0)
  177. , _eStatus(CORO_FREE)
  178. // , _ctx_to(NULL)
  179. {
  180. }
  181. CoroutineInfo::CoroutineInfo(CoroutineScheduler* scheduler)
  182. : _prev(NULL)
  183. , _next(NULL)
  184. // , _main(false)
  185. , _scheduler(scheduler)
  186. , _uid(0)
  187. , _eStatus(CORO_FREE)
  188. // , _ctx_to(NULL)
  189. {
  190. }
  191. CoroutineInfo::CoroutineInfo(CoroutineScheduler* scheduler, uint32_t iUid, stack_context stack_ctx)
  192. : _prev(NULL)
  193. , _next(NULL)
  194. // , _main(false)
  195. , _scheduler(scheduler)
  196. , _uid(iUid)
  197. , _eStatus(CORO_FREE)
  198. , _stack_ctx(stack_ctx)
  199. // , _ctx_to(NULL)
  200. {
  201. }
  202. CoroutineInfo::~CoroutineInfo()
  203. {
  204. }
  205. void CoroutineInfo::registerFunc(const std::function<void ()>& callback)
  206. {
  207. _callback = callback;
  208. _init_func.coroFunc = CoroutineInfo::corotineProc;
  209. _init_func.args = this;
  210. // _ctx_to = make_fcontext(_stack_ctx.sp, _stack_ctx.size, CoroutineInfo::corotineEntry);
  211. // jump_fcontext(&_ctx_from, _ctx_to, (intptr_t)this, false);
  212. fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size, CoroutineInfo::corotineEntry);
  213. transfer_t tf = jump_fcontext(ctx, this);
  214. //实际的ctx
  215. this->setCtx(tf.fctx);
  216. }
  217. void CoroutineInfo::setStackContext(stack_context stack_ctx)
  218. {
  219. _stack_ctx = stack_ctx;
  220. }
  221. void CoroutineInfo::corotineEntry(transfer_t tf)
  222. {
  223. CoroutineInfo * coro = static_cast< CoroutineInfo * >(tf.data);
  224. Func func = coro->_init_func.coroFunc;
  225. void* args = coro->_init_func.args;
  226. // jump_fcontext(coro->_ctx_to, &(coro->_ctx_from), 0, false);
  227. transfer_t t = jump_fcontext(tf.fctx, NULL);
  228. coro->_scheduler->setMainCtx(t.fctx);
  229. try
  230. {
  231. func(args, t);
  232. }
  233. catch(std::exception &ex)
  234. {
  235. TLOGERROR("[TARS][CoroutineInfo::corotineEntry exception:" << ex.what() << endl);
  236. }
  237. catch(...)
  238. {
  239. TLOGERROR("[TARS][CoroutineInfo::corotineEntry unknown exception." << endl);
  240. }
  241. }
  242. void CoroutineInfo::corotineProc(void * args, transfer_t t)
  243. {
  244. CoroutineInfo *coro = (CoroutineInfo*)args;
  245. try
  246. {
  247. std::function<void ()> cb = coro->_callback;
  248. cb();
  249. }
  250. catch(std::exception &ex)
  251. {
  252. TLOGERROR("[TARS][CoroutineInfo::corotineProc exception:" << ex.what() << endl);
  253. }
  254. catch(...)
  255. {
  256. TLOGERROR("[TARS][CoroutineInfo::corotineProc unknown exception." << endl);
  257. }
  258. CoroutineScheduler* scheduler = coro->getScheduler();
  259. scheduler->decUsedSize();
  260. scheduler->moveToFreeList(coro);
  261. scheduler->switchCoro(&(scheduler->getMainCoroutine()));
  262. // scheduler->switchCoro(coro, &(scheduler->getMainCoroutine()));
  263. TLOGERROR("[TARS][CoroutineInfo::corotineProc no come." << endl);
  264. }
  265. //////////////////////////////////////////////////////////////
  266. CoroutineScheduler::CoroutineScheduler()
  267. : _terminal(false)
  268. , _poolSize(1000)
  269. , _stackSize(128*1024)
  270. , _currentSize(0)
  271. , _usedSize(0)
  272. , _uniqId(0)
  273. , _handle(NULL)
  274. , _currentCoro(NULL)
  275. , _all_coro(NULL)
  276. {
  277. _all_coro = new CoroutineInfo*[_poolSize+1];
  278. for(size_t i = 0; i <= _poolSize; ++i)
  279. {
  280. _all_coro[i] = NULL;
  281. }
  282. CoroutineInfo::CoroutineHeadInit(&_active);
  283. CoroutineInfo::CoroutineHeadInit(&_avail);
  284. CoroutineInfo::CoroutineHeadInit(&_inactive);
  285. CoroutineInfo::CoroutineHeadInit(&_timeout);
  286. CoroutineInfo::CoroutineHeadInit(&_free);
  287. }
  288. CoroutineScheduler::~CoroutineScheduler()
  289. {}
  290. void CoroutineScheduler::init(uint32_t iPoolSize, size_t iStackSize)
  291. {
  292. if(iPoolSize <= 0)
  293. {
  294. TLOGERROR("[TARS][[CoroutineScheduler::init iPoolSize <= 0." << endl);
  295. return ;
  296. }
  297. _terminal = false;
  298. _poolSize = iPoolSize;
  299. _stackSize = iStackSize;
  300. if(_poolSize <= 100)
  301. {
  302. _currentSize = _poolSize;
  303. }
  304. else
  305. {
  306. _currentSize = 100;
  307. }
  308. if(_all_coro != NULL)
  309. {
  310. delete [] _all_coro;
  311. _all_coro = new CoroutineInfo*[_poolSize+1];
  312. for(size_t i = 0; i <= _poolSize; ++i)
  313. {
  314. _all_coro[i] = NULL;
  315. }
  316. }
  317. _usedSize = 0;
  318. _uniqId = 0;
  319. int iSucc = 0;
  320. for(size_t i = 0; i < _currentSize; ++i)
  321. {
  322. CoroutineInfo *coro = new CoroutineInfo(this);
  323. // stack_context s_ctx;
  324. // int ret = _alloc.allocate(s_ctx, iStackSize);
  325. // if(ret != 0)
  326. // {
  327. // TLOGERROR("[TARS][CoroutineScheduler::init iPoolSize:" << iPoolSize << "|iStackSize:" << iStackSize << "|i:" << i << endl);
  328. // delete coro;
  329. // coro = NULL;
  330. // break;
  331. // }
  332. stack_context s_ctx = stack_traits::allocate(iStackSize);
  333. coro->setStackContext(s_ctx);
  334. uint32_t iId = generateId();
  335. coro->setUid(iId);
  336. coro->setStatus(CORO_FREE);
  337. //_free.push_front(coro);
  338. //_all.insert(make_pair(coro->getUid(), coro));
  339. _all_coro[iId] = coro;
  340. CoroutineInfo::CoroutineAddTail(coro, &_free);
  341. ++iSucc;
  342. }
  343. _currentSize = iSucc;
  344. _mainCoro.setUid(0);
  345. _mainCoro.setStatus(CORO_FREE);
  346. _currentCoro = &_mainCoro;
  347. TLOGDEBUG("[TARS][CoroutineScheduler::init iPoolSize:" << _poolSize << "|iCurrentSize:" << _currentSize << "|iStackSize:" << _stackSize << endl);
  348. }
  349. int CoroutineScheduler::increaseCoroPoolSize()
  350. {
  351. int iInc = ((_poolSize - _currentSize) > 100) ? 100 : (_poolSize - _currentSize);
  352. if(iInc <= 0)
  353. {
  354. TLOGERROR("[TARS][CoroutineScheduler::increaseCoroPoolSize full iPoolSize:" << _poolSize << "|iCurrentSize:" << _currentSize << endl);
  355. return -1;
  356. }
  357. int iSucc = 0;
  358. for(int i = 0; i < iInc; ++i)
  359. {
  360. CoroutineInfo *coro = new CoroutineInfo(this);
  361. uint32_t iId = generateId();
  362. coro->setUid(iId);
  363. coro->setStatus(CORO_FREE);
  364. // stack_context s_ctx;
  365. // int ret = _alloc.allocate(s_ctx, _stackSize);
  366. // if(ret != 0)
  367. // {
  368. // TLOGERROR("[TARS][CoroutineScheduler::increaseCoroPoolSize iPoolSize:" << _poolSize << "|iStackSize:" << _stackSize << "|i:" << i << endl);
  369. // delete coro;
  370. // coro = NULL;
  371. // break;
  372. // }
  373. stack_context s_ctx = stack_traits::allocate(_stackSize);
  374. coro->setStackContext(s_ctx);
  375. _all_coro[iId] = coro;
  376. CoroutineInfo::CoroutineAddTail(coro, &_free);
  377. ++iSucc;
  378. }
  379. if(iSucc == 0)
  380. {
  381. TLOGERROR("[TARS][CoroutineScheduler::increaseCoroPoolSize cannot create iInc:" << iInc << "|iPoolSize:" << _poolSize << endl);
  382. return -1;
  383. }
  384. _currentSize += iSucc;
  385. return 0;
  386. }
  387. uint32_t CoroutineScheduler::createCoroutine(const std::function<void ()> &callback)
  388. {
  389. if(_usedSize >= _currentSize || CoroutineInfo::CoroutineHeadEmpty(&_free))
  390. {
  391. int iRet = increaseCoroPoolSize();
  392. if(iRet != 0)
  393. return 0;
  394. }
  395. CoroutineInfo *coro = _free._next;
  396. assert(coro != NULL);
  397. CoroutineInfo::CoroutineDel(coro);
  398. _usedSize++;
  399. coro->setStatus(CORO_AVAIL);
  400. CoroutineInfo::CoroutineAddTail(coro, &_avail);
  401. coro->registerFunc(callback);
  402. return coro->getUid();
  403. }
  404. void CoroutineScheduler::run()
  405. {
  406. while(!_terminal)
  407. {
  408. if(CoroutineInfo::CoroutineHeadEmpty(&_avail) && CoroutineInfo::CoroutineHeadEmpty(&_active))
  409. {
  410. TC_ThreadLock::Lock lock(_monitor);
  411. if(_activeCoroQueue.size() <= 0)
  412. {
  413. _monitor.timedWait(1000);
  414. }
  415. }
  416. wakeupbytimeout();
  417. wakeupbyself();
  418. wakeup();
  419. if(!CoroutineInfo::CoroutineHeadEmpty(&_active))
  420. {
  421. int iLoop = 100;
  422. while(iLoop > 0 && !CoroutineInfo::CoroutineHeadEmpty(&_active))
  423. {
  424. CoroutineInfo *coro = _active._next;
  425. assert(coro != NULL);
  426. // switchCoro(&_mainCoro, coro);
  427. switchCoro(coro);
  428. --iLoop;
  429. }
  430. }
  431. if(!CoroutineInfo::CoroutineHeadEmpty(&_avail))
  432. {
  433. CoroutineInfo *coro = _avail._next;
  434. assert(coro != NULL);
  435. // switchCoro(&_mainCoro, coro);
  436. switchCoro(coro);
  437. }
  438. if(_usedSize == 0)
  439. break;
  440. }
  441. destroy();
  442. }
  443. void CoroutineScheduler::tars_run()
  444. {
  445. if(!_terminal)
  446. {
  447. wakeupbytimeout();
  448. wakeupbyself();
  449. wakeup();
  450. if(!CoroutineInfo::CoroutineHeadEmpty(&_active))
  451. {
  452. int iLoop = 100;
  453. while(iLoop > 0 && !CoroutineInfo::CoroutineHeadEmpty(&_active))
  454. {
  455. CoroutineInfo *coro = _active._next;
  456. assert(coro != NULL);
  457. // switchCoro(&_mainCoro, coro);
  458. switchCoro(coro);
  459. --iLoop;
  460. }
  461. }
  462. if(!CoroutineInfo::CoroutineHeadEmpty(&_avail))
  463. {
  464. int iLoop = 100;
  465. while(iLoop > 0 && !CoroutineInfo::CoroutineHeadEmpty(&_avail))
  466. {
  467. CoroutineInfo *coro = _avail._next;
  468. assert(coro != NULL);
  469. // switchCoro(&_mainCoro, coro);
  470. switchCoro(coro);
  471. --iLoop;
  472. }
  473. }
  474. }
  475. }
  476. void CoroutineScheduler::yield(bool bFlag)
  477. {
  478. if(bFlag)
  479. {
  480. putbyself(_currentCoro->getUid());
  481. }
  482. moveToInactive(_currentCoro);
  483. // switchCoro(_currentCoro, &_mainCoro);
  484. switchCoro(&_mainCoro);
  485. }
  486. void CoroutineScheduler::sleep(int iSleepTime)
  487. {
  488. int64_t iNow = TNOWMS;
  489. int64_t iTimeout = iNow + (iSleepTime >= 0 ? iSleepTime : -iSleepTime);
  490. _timeoutCoroId.insert(make_pair(iTimeout, _currentCoro->getUid()));
  491. moveToTimeout(_currentCoro);
  492. // switchCoro(_currentCoro, &_mainCoro);
  493. switchCoro(&_mainCoro);
  494. }
  495. void CoroutineScheduler::putbyself(uint32_t iCoroId)
  496. {
  497. if(!_terminal)
  498. {
  499. _needActiveCoroId.push_back(iCoroId);
  500. }
  501. }
  502. void CoroutineScheduler::wakeupbyself()
  503. {
  504. if(!_terminal)
  505. {
  506. if(_needActiveCoroId.size() > 0)
  507. {
  508. list<uint32_t>::iterator it = _needActiveCoroId.begin();
  509. while(it != _needActiveCoroId.end())
  510. {
  511. CoroutineInfo *coro = _all_coro[*it];
  512. assert(coro != NULL);
  513. moveToAvail(coro);
  514. ++it;
  515. }
  516. _needActiveCoroId.clear();
  517. }
  518. }
  519. }
  520. void CoroutineScheduler::put(uint32_t iCoroId)
  521. {
  522. if(!_terminal)
  523. {
  524. _activeCoroQueue.push_back(iCoroId);
  525. if(_handle)
  526. {
  527. _handle->notifyFilter();
  528. }
  529. else
  530. {
  531. TC_ThreadLock::Lock lock(_monitor);
  532. _monitor.notifyAll();
  533. }
  534. }
  535. }
  536. void CoroutineScheduler::wakeup()
  537. {
  538. if(!_terminal)
  539. {
  540. if(_activeCoroQueue.size() > 0)
  541. {
  542. TC_ThreadQueue<uint32_t, deque<uint32_t> >::queue_type coroIds;
  543. _activeCoroQueue.swap(coroIds);
  544. TC_ThreadQueue<uint32_t, deque<uint32_t> >::queue_type::iterator it = coroIds.begin();
  545. TC_ThreadQueue<uint32_t, deque<uint32_t> >::queue_type::iterator itEnd = coroIds.end();
  546. while(it != itEnd)
  547. {
  548. CoroutineInfo *coro = _all_coro[*it];
  549. assert(coro != NULL);
  550. moveToActive(coro);
  551. ++it;
  552. }
  553. }
  554. }
  555. }
  556. void CoroutineScheduler::wakeupbytimeout()
  557. {
  558. if(!_terminal)
  559. {
  560. if(_timeoutCoroId.size() > 0)
  561. {
  562. int64_t iNow = TNOWMS;
  563. while(true)
  564. {
  565. multimap<int64_t, uint32_t>::iterator it = _timeoutCoroId.begin();
  566. if(it == _timeoutCoroId.end() || it->first > iNow)
  567. break;
  568. CoroutineInfo *coro = _all_coro[it->second];
  569. assert(coro != NULL);
  570. moveToActive(coro);
  571. _timeoutCoroId.erase(it);
  572. }
  573. }
  574. }
  575. }
  576. void CoroutineScheduler::terminate()
  577. {
  578. _terminal = true;
  579. if(_handle)
  580. {
  581. _handle->notifyFilter();
  582. }
  583. else
  584. {
  585. TC_ThreadLock::Lock lock(_monitor);
  586. _monitor.notifyAll();
  587. }
  588. }
  589. uint32_t CoroutineScheduler::generateId()
  590. {
  591. uint32_t i = ++_uniqId;
  592. if(i == 0) {
  593. i = ++_uniqId;
  594. }
  595. assert(i <= _poolSize);
  596. return i;
  597. }
  598. // void CoroutineScheduler::switchCoro(CoroutineInfo *from, CoroutineInfo *to)
  599. void CoroutineScheduler::switchCoro(CoroutineInfo *to)
  600. {
  601. _currentCoro = to;
  602. // jump_fcontext(from->getCtx(), to->getCtx(), 0, false);
  603. transfer_t t = jump_fcontext(to->getCtx(), NULL);
  604. to->setCtx(t.fctx);
  605. }
  606. void CoroutineScheduler::moveToActive(CoroutineInfo *coro, bool bFlag)
  607. {
  608. if(coro->getStatus() == CORO_INACTIVE)
  609. {
  610. CoroutineInfo::CoroutineDel(coro);
  611. coro->setStatus(CORO_ACTIVE);
  612. CoroutineInfo::CoroutineAddTail(coro, &_active);
  613. }
  614. else if(coro->getStatus() == CORO_TIMEOUT)
  615. {
  616. CoroutineInfo::CoroutineDel(coro);
  617. coro->setStatus(CORO_ACTIVE);
  618. CoroutineInfo::CoroutineAddTail(coro, &_active);
  619. }
  620. else
  621. {
  622. TLOGERROR("[TARS][CoroutineScheduler::moveToActive ERROR|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
  623. }
  624. }
  625. void CoroutineScheduler::moveToAvail(CoroutineInfo *coro)
  626. {
  627. if(coro->getStatus() == CORO_INACTIVE)
  628. {
  629. CoroutineInfo::CoroutineDel(coro);
  630. coro->setStatus(CORO_AVAIL);
  631. CoroutineInfo::CoroutineAddTail(coro, &_avail);
  632. }
  633. else
  634. {
  635. TLOGERROR("[TARS][CoroutineScheduler::moveToAvail ERROR:|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
  636. }
  637. }
  638. void CoroutineScheduler::moveToInactive(CoroutineInfo *coro)
  639. {
  640. if(coro->getStatus() == CORO_ACTIVE)
  641. {
  642. CoroutineInfo::CoroutineDel(coro);
  643. coro->setStatus(CORO_INACTIVE);
  644. CoroutineInfo::CoroutineAddTail(coro, &_inactive);
  645. }
  646. else if(coro->getStatus() == CORO_AVAIL)
  647. {
  648. CoroutineInfo::CoroutineDel(coro);
  649. coro->setStatus(CORO_INACTIVE);
  650. CoroutineInfo::CoroutineAddTail(coro, &_inactive);
  651. }
  652. else
  653. {
  654. TLOGERROR("[TARS][CoroutineScheduler::moveToInactive ERROR|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
  655. }
  656. }
  657. void CoroutineScheduler::moveToTimeout(CoroutineInfo *coro)
  658. {
  659. if(coro->getStatus() == CORO_ACTIVE)
  660. {
  661. CoroutineInfo::CoroutineDel(coro);
  662. coro->setStatus(CORO_TIMEOUT);
  663. CoroutineInfo::CoroutineAddTail(coro, &_timeout);
  664. }
  665. else if(coro->getStatus() == CORO_AVAIL)
  666. {
  667. CoroutineInfo::CoroutineDel(coro);
  668. coro->setStatus(CORO_TIMEOUT);
  669. CoroutineInfo::CoroutineAddTail(coro, &_timeout);
  670. }
  671. else
  672. {
  673. TLOGERROR("[TARS][CoroutineScheduler::moveToTimeout ERROR|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
  674. }
  675. }
  676. void CoroutineScheduler::moveToFreeList(CoroutineInfo *coro)
  677. {
  678. if(coro->getStatus() == CORO_ACTIVE)
  679. {
  680. CoroutineInfo::CoroutineDel(coro);
  681. coro->setStatus(CORO_FREE);
  682. CoroutineInfo::CoroutineAddTail(coro, &_free);
  683. }
  684. else if(coro->getStatus() == CORO_AVAIL)
  685. {
  686. CoroutineInfo::CoroutineDel(coro);
  687. coro->setStatus(CORO_FREE);
  688. CoroutineInfo::CoroutineAddTail(coro, &_free);
  689. }
  690. else if(coro->getStatus() == CORO_INACTIVE)
  691. {
  692. CoroutineInfo::CoroutineDel(coro);
  693. coro->setStatus(CORO_FREE);
  694. CoroutineInfo::CoroutineAddTail(coro, &_free);
  695. }
  696. else if(coro->getStatus() == CORO_TIMEOUT)
  697. {
  698. CoroutineInfo::CoroutineDel(coro);
  699. coro->setStatus(CORO_FREE);
  700. CoroutineInfo::CoroutineAddTail(coro, &_free);
  701. }
  702. else
  703. {
  704. TLOGERROR("[TARS][CoroutineScheduler::moveToFreeList ERROR: already free|iCoroId:" << coro->getUid() << "|tyep:" << coro->getStatus() << endl);
  705. }
  706. }
  707. void CoroutineScheduler::destroy()
  708. {
  709. if(_all_coro)
  710. {
  711. for(size_t i = 1; i <= _poolSize; i++)
  712. {
  713. if(_all_coro[i])
  714. {
  715. stack_traits::deallocate(_all_coro[i]->getStackContext());
  716. }
  717. }
  718. delete [] _all_coro;
  719. }
  720. }
  721. /////////////////////////////////////////////////////////
  722. Coroutine::Coroutine()
  723. : _coroSched(NULL)
  724. , _num(1)
  725. , _maxNum(128)
  726. , _stackSize(128*1024)
  727. {
  728. }
  729. Coroutine::~Coroutine()
  730. {
  731. if(isAlive())
  732. {
  733. terminate();
  734. getThreadControl().join();
  735. }
  736. }
  737. void Coroutine::setCoroInfo(uint32_t iNum, uint32_t iMaxNum, size_t iStackSize)
  738. {
  739. _maxNum = (iMaxNum > 0 ? iMaxNum : 1);
  740. _num = (iNum > 0 ? (iNum <= _maxNum ? iNum : _maxNum) : 1);
  741. _stackSize = (iStackSize >= pagesize() ? iStackSize : pagesize());
  742. }
  743. void Coroutine::run()
  744. {
  745. initialize();
  746. handleCoro();
  747. destroy();
  748. }
  749. void Coroutine::terminate()
  750. {
  751. if(_coroSched)
  752. {
  753. _coroSched->terminate();
  754. }
  755. }
  756. void Coroutine::handleCoro()
  757. {
  758. _coroSched = new CoroutineScheduler();
  759. _coroSched->init(_maxNum, _stackSize);
  760. ServantProxyThreadData * pSptd = ServantProxyThreadData::getData();
  761. assert(pSptd != NULL);
  762. pSptd->_sched = _coroSched;
  763. for(uint32_t i = 0; i < _num; ++i)
  764. {
  765. _coroSched->createCoroutine(std::bind(&Coroutine::coroEntry, this));
  766. }
  767. _coroSched->run();
  768. delete _coroSched;
  769. _coroSched = NULL;
  770. }
  771. void Coroutine::coroEntry(Coroutine *pCoro)
  772. {
  773. try
  774. {
  775. pCoro->handle();
  776. }
  777. catch(exception &ex)
  778. {
  779. TLOGERROR("[TARS][[Coroutine::coroEntry exception:" << ex.what() << "]" << endl);
  780. }
  781. catch(...)
  782. {
  783. TLOGERROR("[TARS][[Coroutine::coroEntry unknown exception]" << endl);
  784. }
  785. }
  786. uint32_t Coroutine::createCoroutine(const std::function<void ()> &coroFunc)
  787. {
  788. if(_coroSched)
  789. {
  790. return _coroSched->createCoroutine(coroFunc);
  791. }
  792. else
  793. {
  794. TLOGERROR("[TARS][[Coroutine::createCoroutine coro sched no init]" << endl);
  795. }
  796. return -1;
  797. }
  798. void Coroutine::yield()
  799. {
  800. if(_coroSched)
  801. {
  802. _coroSched->yield();
  803. }
  804. else
  805. {
  806. throw CoroutineException("[TARS][[Coroutine::yield coro sched no init]");
  807. }
  808. }
  809. void Coroutine::Sleep(int iSleepTime)
  810. {
  811. if(_coroSched)
  812. {
  813. _coroSched->sleep(iSleepTime);
  814. }
  815. else
  816. {
  817. throw CoroutineException("[TARS][[Coroutine::yield coro sched no init]");
  818. }
  819. }
  820. }