task_group.cpp

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - A M:N threading library to make applications more concurrent.

// Date: Tue Jul 10 17:40:58 CST 2012

#include <sys/types.h>
#include <stddef.h>                          // size_t
#include <gflags/gflags.h>
#include "butil/compat.h"                    // OS_MACOSX
#include "butil/macros.h"                    // ARRAY_SIZE
#include "butil/scoped_lock.h"               // BAIDU_SCOPED_LOCK
#include "butil/fast_rand.h"
#include "butil/unique_ptr.h"
#include "butil/third_party/murmurhash3/murmurhash3.h" // fmix64
#include "bthread/errno.h"                   // ESTOP
#include "bthread/butex.h"                   // butex_*
#include "bthread/sys_futex.h"               // futex_wake_private
#include "bthread/processor.h"               // cpu_relax
#include "bthread/task_control.h"
#include "bthread/task_group.h"
#include "bthread/timer_thread.h"
#include "bthread/errno.h"

namespace bthread {

static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
    BTHREAD_STACKTYPE_UNKNOWN, 0, NULL };

static bool pass_bool(const char*, bool) { return true; }

DEFINE_bool(show_bthread_creation_in_vars, false, "When this flag is on, the time "
            "from bthread creation to first run will be recorded and shown "
            "in /vars");
const bool ALLOW_UNUSED dummy_show_bthread_creation_in_vars =
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_show_bthread_creation_in_vars,
                                       pass_bool);

DEFINE_bool(show_per_worker_usage_in_vars, false,
            "Show per-worker usage in /vars/bthread_per_worker_usage_<tid>");
const bool ALLOW_UNUSED dummy_show_per_worker_usage_in_vars =
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_show_per_worker_usage_in_vars,
                                       pass_bool);

__thread TaskGroup* tls_task_group = NULL;
// Sync with TaskMeta::local_storage when a bthread is created or destroyed.
// During running, the two fields may be inconsistent; use tls_bls as the
// groundtruth.
thread_local LocalStorage tls_bls = BTHREAD_LOCAL_STORAGE_INITIALIZER;

// defined in bthread/key.cpp
extern void return_keytable(bthread_keytable_pool_t*, KeyTable*);

// [Hacky] This is a special TLS set by bthread-rpc privately... to save the
// overhead of creating a keytable, may be removed later.
BAIDU_THREAD_LOCAL void* tls_unique_user_ptr = NULL;

const TaskStatistics EMPTY_STAT = { 0, 0 };

const size_t OFFSET_TABLE[] = {
#include "bthread/offset_inl.list"
};

int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
    TaskMeta* const m = address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            *out = m->attr;
            return 0;
        }
    }
    errno = EINVAL;
    return -1;
}

void TaskGroup::set_stopped(bthread_t tid) {
    TaskMeta* const m = address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            m->stop = true;
        }
    }
}

bool TaskGroup::is_stopped(bthread_t tid) {
    TaskMeta* const m = address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            return m->stop;
        }
    }
    // If the tid does not exist or version does not match, it's intuitive
    // to treat the thread as "stopped".
    return true;
}

bool TaskGroup::wait_task(bthread_t* tid) {
    do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
#else
        const ParkingLot::State st = _pl->get_state();
        if (st.stopped()) {
            return false;
        }
        if (steal_task(tid)) {
            return true;
        }
        _pl->wait(st);
#endif
    } while (true);
}
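
// Note on wait_task(): the ParkingLot state is sampled before the worker
// decides to sleep (either _last_pl_state, refreshed during the previous
// steal attempt, or st captured right here), and _pl->wait() returns
// immediately if the state has changed since the sample. This closes the
// window where a task signaled between "found nothing to steal" and
// "going to sleep" would be missed.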

static double get_cumulated_cputime_from_this(void* arg) {
    return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
}

void TaskGroup::run_main_task() {
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
        if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
            char name[32];
#if defined(OS_MACOSX)
            snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
                     pthread_numeric_id());
#else
            snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
                     (long)syscall(SYS_gettid));
#endif
            usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
                             (name, &cumulated_cputime, 1));
        }
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

TaskGroup::TaskGroup(TaskControl* c)
    :
#ifndef NDEBUG
    _sched_recursive_guard(0),
#endif
    _cur_meta(NULL)
    , _control(c)
    , _num_nosignal(0)
    , _nsignaled(0)
    , _last_run_ns(butil::cpuwide_time_ns())
    , _cumulated_cputime_ns(0)
    , _nswitch(0)
    , _last_context_remained(NULL)
    , _last_context_remained_arg(NULL)
    , _pl(NULL)
    , _main_stack(NULL)
    , _main_tid(0)
    , _remote_num_nosignal(0)
    , _remote_nsignaled(0)
{
    _steal_seed = butil::fast_rand();
    _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
    _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) % TaskControl::PARKING_LOT_NUM];
    CHECK(c);
}

TaskGroup::~TaskGroup() {
    if (_main_tid) {
        TaskMeta* m = address_meta(_main_tid);
        CHECK(_main_stack == m->stack);
        return_stack(m->release_stack());
        return_resource(get_slot(_main_tid));
        _main_tid = 0;
    }
}

int TaskGroup::init(size_t runqueue_capacity) {
    if (_rq.init(runqueue_capacity) != 0) {
        LOG(FATAL) << "Fail to init _rq";
        return -1;
    }
    if (_remote_rq.init(runqueue_capacity / 2) != 0) {
        LOG(FATAL) << "Fail to init _remote_rq";
        return -1;
    }
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    if (NULL == stk) {
        LOG(FATAL) << "Fail to get main stack container";
        return -1;
    }
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
    if (NULL == m) {
        LOG(FATAL) << "Fail to get TaskMeta";
        return -1;
    }
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = NULL;
    m->arg = NULL;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = EMPTY_STAT;
    m->attr = BTHREAD_ATTR_TASKGROUP;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);

    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}

void TaskGroup::task_runner(intptr_t skip_remained) {
    // NOTE: tls_task_group is volatile since tasks are moved around
    // different groups.
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
#ifndef NDEBUG
        --g->_sched_recursive_guard;
#endif
    }

    do {
        // A task can be stopped before it gets to run, in which case we might
        // skip the user function, but that could confuse the user: most tasks
        // have variables that remember the result of the run, often
        // initialized to values indicating success. If the user function is
        // never called, those variables stay unchanged even though they
        // should reflect a failure, because the task was stopped abnormally.

        // The meta and identifier of the task are persistent across this run.
        TaskMeta* const m = g->_cur_meta;

        if (FLAGS_show_bthread_creation_in_vars) {
            // NOTE: the thread triggering exposure of pending time may spend
            // considerable time because a single bvar::LatencyRecorder
            // contains many bvar.
            g->_control->exposed_pending_time() <<
                (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
        }

        // Don't catch exceptions other than ExitException, which implements
        // bthread_exit(). User code is intended to crash when an exception is
        // not caught explicitly; this is consistent with other threading
        // libraries.
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }

        // Group is probably changed
        g = tls_task_group;

        // TODO: Save thread_return
        (void)thread_return;

        // Logging must be done before returning the keytable, since the
        // logging library uses bthread local storage internally; otherwise it
        // will cause a memory leak.
        // FIXME: the time from quitting fn to here is not counted into cputime
        if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
            LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
                      << m->stat.cputime_ns / 1000000.0 << "ms";
        }

        // Clean TLS variables. This must be done before changing
        // version_butex, otherwise another thread that just joined this one
        // may not see the side effects of destructing the TLS variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }

        // Increase the version and wake up all joiners. If the resulting
        // version is 0, change it to 1 so that a bthread_t is never 0. Any
        // access to or join on the bthread after the version changes will be
        // rejected. The spinlock is for visibility of TaskGroup::get_attr.
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        butex_wake_except(m->version_butex, 0);

        g->_control->_nbthreads << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid);

    // We were called from a pthread and have no BTHREAD_STACKTYPE_PTHREAD
    // tasks left to run; return and wait for more tasks.
}
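
// About the "remained" mechanism used above: set_remained() registers a
// callback that runs right after the next stack switch, on the stack of the
// task being switched to (see the drain loops at the top of task_runner() and
// near the end of sched_to()). A finishing bthread uses it to hand the release
// of its own stack and TaskMeta (_release_last_context) over to the next
// context, since it cannot free the stack it is still running on.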

void TaskGroup::_release_last_context(void* arg) {
    TaskMeta* m = static_cast<TaskMeta*>(arg);
    if (m->stack_type() != STACK_TYPE_PTHREAD) {
        return_stack(m->release_stack()/*may be NULL*/);
    } else {
        // it's _main_stack, don't return.
        m->set_stack(NULL);
    }
    return_resource(get_slot(m->tid));
}

int TaskGroup::start_foreground(TaskGroup** pg,
                                bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }

    TaskGroup* g = *pg;
    g->_control->_nbthreads << 1;
    if (g->is_current_pthread_task()) {
        // never create foreground task in pthread.
        g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        // NOSIGNAL affects current task, not the new task.
        RemainedFn fn = NULL;
        if (g->current_task()->about_to_quit) {
            fn = ready_to_run_in_worker_ignoresignal;
        } else {
            fn = ready_to_run_in_worker;
        }
        ReadyToRunArgs args = {
            g->current_tid(),
            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
        };
        g->set_remained(fn, &args);
        TaskGroup::sched_to(pg, m->tid);
    }
    return 0;
}
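
// When called from a worker bthread, start_foreground() switches to the new
// bthread right away and re-queues the caller through a remained callback, so
// the caller resumes later as an ordinary runnable task; when called from the
// main pthread task it falls back to plain enqueueing. start_background()
// below never switches: it only enqueues the new tid and returns.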

template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }
    _control->_nbthreads << 1;
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

// Explicit instantiations.
template int
TaskGroup::start_background<true>(bthread_t* __restrict th,
                                  const bthread_attr_t* __restrict attr,
                                  void * (*fn)(void*),
                                  void* __restrict arg);
template int
TaskGroup::start_background<false>(bthread_t* __restrict th,
                                   const bthread_attr_t* __restrict attr,
                                   void * (*fn)(void*),
                                   void* __restrict arg);
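
// The REMOTE parameter selects the queue: start_background<false>() pushes
// into this worker's own lock-free _rq (only valid when the caller runs
// inside this TaskGroup), while start_background<true>() goes through the
// mutex-protected _remote_rq via ready_to_run_remote(), which is what callers
// outside the worker (plain pthreads) are expected to use.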

int TaskGroup::join(bthread_t tid, void** return_value) {
    if (__builtin_expect(!tid, 0)) {  // tid of bthread is never 0.
        return EINVAL;
    }
    TaskMeta* m = address_meta(tid);
    if (__builtin_expect(!m, 0)) {
        // The bthread is not created yet, this join is definitely wrong.
        return EINVAL;
    }
    TaskGroup* g = tls_task_group;
    if (g != NULL && g->current_tid() == tid) {
        // joining self causes indefinite waiting.
        return EINVAL;
    }
    const uint32_t expected_version = get_version(tid);
    while (*m->version_butex == expected_version) {
        if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
    if (return_value) {
        *return_value = NULL;
    }
    return 0;
}
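
// join() is purely version-based: task_runner() bumps *version_butex and
// calls butex_wake_except() when a bthread finishes, so waiting until the
// version differs from the one encoded in the tid is equivalent to waiting
// for termination. *return_value is always set to NULL because thread_return
// is not saved yet (see the TODO in task_runner()).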

bool TaskGroup::exists(bthread_t tid) {
    if (tid != 0) {  // tid of bthread is never 0.
        TaskMeta* m = address_meta(tid);
        if (m != NULL) {
            return (*m->version_butex == get_version(tid));
        }
    }
    return false;
}

TaskStatistics TaskGroup::main_stat() const {
    TaskMeta* m = address_meta(_main_tid);
    return m ? m->stat : EMPTY_STAT;
}

void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
    // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
    // to 2.9%
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }

    TaskMeta* const cur_meta = g->_cur_meta;
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            // also works with pthread_task scheduling to pthread_task; the
            // transferred stack is just _main_stack.
            next_meta->set_stack(cur_meta->release_stack());
        } else {
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory.
                // In the latter case attr is forced to BTHREAD_STACKTYPE_PTHREAD,
                // which basically means that if we can't allocate a stack, the
                // task runs directly in the pthread.
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    sched_to(pg, next_meta);
}

void TaskGroup::sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }
    sched_to(pg, next_tid);
}

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;
#ifndef NDEBUG
    if ((++g->_sched_recursive_guard) > 1) {
        LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
                   << ") call sched_to(" << g << ")";
    }
#endif
    // Save errno so that errno is bthread-specific.
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;

    TaskMeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns += elp_ns;
    if (cur_meta->tid != g->main_tid()) {
        g->_cumulated_cputime_ns += elp_ns;
    }
    ++cur_meta->stat.nswitch;
    ++g->_nswitch;
    // Switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta;
        // Switch tls_bls
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;

        // Logging must be done after switching the local storage, since the
        // logging library uses bthread local storage internally; otherwise it
        // will cause a memory leak.
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
            LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }

        if (cur_meta->stack != NULL) {
            if (next_meta->stack != cur_meta->stack) {
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = tls_task_group;
            }
#ifndef NDEBUG
            else {
                // else pthread_task is switching to another pthread_task; the
                // stacks can only be equal when they're both _main_stack.
                CHECK(cur_meta->stack == g->_main_stack);
            }
#endif
        }
        // else because of ending_sched(including pthread_task->pthread_task)
    } else {
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
    }

    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }

    // Restore errno
    errno = saved_errno;
    tls_unique_user_ptr = saved_unique_user_ptr;

#ifndef NDEBUG
    --g->_sched_recursive_guard;
#endif
    *pg = g;
}
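
// sched_to() saves errno and tls_unique_user_ptr before the jump and restores
// them afterwards, so each bthread observes its own errno across suspension
// points even though the underlying pthread may have run other bthreads in
// between. The remained callback installed by the previous context is drained
// here as well, after the stack switch.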

void TaskGroup::destroy_self() {
    if (_control) {
        _control->_destroy_group(this);
        _control = NULL;
    } else {
        CHECK(false);
    }
}

void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
    push_rq(tid);
    if (nosignal) {
        ++_num_nosignal;
    } else {
        const int additional_signal = _num_nosignal;
        _num_nosignal = 0;
        _nsignaled += 1 + additional_signal;
        _control->signal_task(1 + additional_signal);
    }
}

void TaskGroup::flush_nosignal_tasks() {
    const int val = _num_nosignal;
    if (val) {
        _num_nosignal = 0;
        _nsignaled += val;
        _control->signal_task(val);
    }
}

void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    if (nosignal) {
        ++_remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_nsignaled += 1 + additional_signal;
        _remote_rq._mutex.unlock();
        _control->signal_task(1 + additional_signal);
    }
}

void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
    const int val = _remote_num_nosignal;
    if (!val) {
        locked_mutex.unlock();
        return;
    }
    _remote_num_nosignal = 0;
    _remote_nsignaled += val;
    locked_mutex.unlock();
    _control->signal_task(val);
}
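
// The *_num_nosignal counters implement batched wakeups: tids queued with
// BTHREAD_NOSIGNAL only bump a counter, and the next signaling enqueue (or an
// explicit flush) calls TaskControl::signal_task() once for the whole batch,
// amortizing the wakeup cost over many tasks.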

void TaskGroup::ready_to_run_general(bthread_t tid, bool nosignal) {
    if (tls_task_group == this) {
        return ready_to_run(tid, nosignal);
    }
    return ready_to_run_remote(tid, nosignal);
}

void TaskGroup::flush_nosignal_tasks_general() {
    if (tls_task_group == this) {
        return flush_nosignal_tasks();
    }
    return flush_nosignal_tasks_remote();
}

void TaskGroup::ready_to_run_in_worker(void* args_in) {
    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
    return tls_task_group->ready_to_run(args->tid, args->nosignal);
}

void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
    return tls_task_group->push_rq(args->tid);
}

struct SleepArgs {
    uint64_t timeout_us;
    bthread_t tid;
    TaskMeta* meta;
    TaskGroup* group;
};

static void ready_to_run_from_timer_thread(void* arg) {
    CHECK(tls_task_group == NULL);
    const SleepArgs* e = static_cast<const SleepArgs*>(arg);
    e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
}

void TaskGroup::_add_sleep_event(void* void_args) {
    // Must copy SleepArgs. After calling TimerThread::schedule(), the previous
    // thread may be stolen by a worker immediately and the on-stack SleepArgs
    // will be gone.
    SleepArgs e = *static_cast<SleepArgs*>(void_args);
    TaskGroup* g = e.group;

    TimerThread::TaskId sleep_id;
    sleep_id = get_global_timer_thread()->schedule(
        ready_to_run_from_timer_thread, void_args,
        butil::microseconds_from_now(e.timeout_us));

    if (!sleep_id) {
        // Failed to schedule the timer, go back to the previous thread.
        g->ready_to_run(e.tid);
        return;
    }

    // Set TaskMeta::current_sleep which is for interruption.
    const uint32_t given_ver = get_version(e.tid);
    {
        BAIDU_SCOPED_LOCK(e.meta->version_lock);
        if (given_ver == *e.meta->version_butex && !e.meta->interrupted) {
            e.meta->current_sleep = sleep_id;
            return;
        }
    }
    // The thread is stopped or interrupted.
    // interrupt() always sees that current_sleep == 0. It will not schedule
    // the calling thread. The race is between the current thread and the
    // timer thread.
    if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
        // Added to the timer; the previous thread may already have been woken
        // up by the timer and even stopped. It's safe to schedule the previous
        // thread when unschedule() returns 0, which means "the not-yet-run
        // sleep_id was removed". If the sleep_id is running (returns 1),
        // ready_to_run_from_timer_thread() will schedule the previous thread
        // as well. If the sleep_id no longer exists, the previous thread was
        // already scheduled by the timer thread and we don't have to do it
        // again.
        g->ready_to_run(e.tid);
    }
}

// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
    if (0 == timeout_us) {
        yield(pg);
        return 0;
    }
    TaskGroup* g = *pg;
    // We have to schedule the timer after switching to the next bthread,
    // otherwise the timer may wake up (jump to) the current still-running
    // context.
    SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
    g->set_remained(_add_sleep_event, &e);
    sched(pg);
    g = *pg;
    e.meta->current_sleep = 0;
    if (e.meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        e.meta->interrupted = false;
        // NOTE: setting errno to ESTOP is not necessary from bthread's
        // perspective, however much RPC code expects bthread_usleep to set
        // errno to ESTOP when the thread is stopping, and prints FATAL
        // otherwise. To make transitions smooth, ESTOP is still set instead
        // of EINTR when the thread is stopping.
        errno = (e.meta->stop ? ESTOP : EINTR);
        return -1;
    }
    return 0;
}
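
// TaskGroup::usleep() never blocks the worker pthread: the current tid is
// parked in a TimerThread event by the _add_sleep_event remained callback
// (which runs after the stack switch), and another bthread is scheduled
// immediately. When the timer fires, ready_to_run_from_timer_thread() pushes
// the tid back onto some group's remote run queue.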

// Defined in butex.cpp
bool erase_from_butex_because_of_interruption(ButexWaiter* bw);

static int interrupt_and_consume_waiters(
    bthread_t tid, ButexWaiter** pw, uint64_t* sleep_id) {
    TaskMeta* const m = TaskGroup::address_meta(tid);
    if (m == NULL) {
        return EINVAL;
    }
    const uint32_t given_ver = get_version(tid);
    BAIDU_SCOPED_LOCK(m->version_lock);
    if (given_ver == *m->version_butex) {
        *pw = m->current_waiter.exchange(NULL, butil::memory_order_acquire);
        *sleep_id = m->current_sleep;
        m->current_sleep = 0;  // only one stopper gets the sleep_id
        m->interrupted = true;
        return 0;
    }
    return EINVAL;
}

static int set_butex_waiter(bthread_t tid, ButexWaiter* w) {
    TaskMeta* const m = TaskGroup::address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            // Release fence makes m->interrupted visible to butex_wait
            m->current_waiter.store(w, butil::memory_order_release);
            return 0;
        }
    }
    return EINVAL;
}

// The interruption is "persistent" compared to the ones caused by signals:
// if a bthread is interrupted while it's not blocked, the interruption is
// still remembered and will be checked at the next blocking point. This
// design choice simplifies the implementation and reduces notification loss
// caused by race conditions.
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
// can't be interrupted.
int TaskGroup::interrupt(bthread_t tid, TaskControl* c) {
    // Consume current_waiter in the TaskMeta, wake it up, then set it back.
    ButexWaiter* w = NULL;
    uint64_t sleep_id = 0;
    int rc = interrupt_and_consume_waiters(tid, &w, &sleep_id);
    if (rc) {
        return rc;
    }
    // A bthread cannot wait on a butex and be sleeping at the same time.
    CHECK(!sleep_id || !w);
    if (w != NULL) {
        erase_from_butex_because_of_interruption(w);
        // If butex_wait() already woke up before we set current_waiter back,
        // the function will spin until current_waiter becomes non-NULL.
        rc = set_butex_waiter(tid, w);
        if (rc) {
            LOG(FATAL) << "butex_wait should spin until setting back waiter";
            return rc;
        }
    } else if (sleep_id != 0) {
        if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
            bthread::TaskGroup* g = bthread::tls_task_group;
            if (g) {
                g->ready_to_run(tid);
            } else {
                if (!c) {
                    return EINVAL;
                }
                c->choose_one_group()->ready_to_run_remote(tid);
            }
        }
    }
    return 0;
}
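
// interrupt() handles three situations: the target is waiting on a butex
// (remove it from the waiter list and requeue it), it is sleeping on a timer
// (unschedule the timer and requeue it), or it is neither (only
// m->interrupted is set, to be observed at the next blocking call such as
// bthread_usleep()).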

void TaskGroup::yield(TaskGroup** pg) {
    TaskGroup* g = *pg;
    ReadyToRunArgs args = { g->current_tid(), false };
    g->set_remained(ready_to_run_in_worker, &args);
    sched(pg);
}

void print_task(std::ostream& os, bthread_t tid) {
    TaskMeta* const m = TaskGroup::address_meta(tid);
    if (m == NULL) {
        os << "bthread=" << tid << " : never existed";
        return;
    }
    const uint32_t given_ver = get_version(tid);
    bool matched = false;
    bool stop = false;
    bool interrupted = false;
    bool about_to_quit = false;
    void* (*fn)(void*) = NULL;
    void* arg = NULL;
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
    bool has_tls = false;
    int64_t cpuwide_start_ns = 0;
    TaskStatistics stat = {0, 0};
    {
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            matched = true;
            stop = m->stop;
            interrupted = m->interrupted;
            about_to_quit = m->about_to_quit;
            fn = m->fn;
            arg = m->arg;
            attr = m->attr;
            has_tls = m->local_storage.keytable;
            cpuwide_start_ns = m->cpuwide_start_ns;
            stat = m->stat;
        }
    }
    if (!matched) {
        os << "bthread=" << tid << " : not exist now";
    } else {
        os << "bthread=" << tid << " :\nstop=" << stop
           << "\ninterrupted=" << interrupted
           << "\nabout_to_quit=" << about_to_quit
           << "\nfn=" << (void*)fn
           << "\narg=" << (void*)arg
           << "\nattr={stack_type=" << attr.stack_type
           << " flags=" << attr.flags
           << " keytable_pool=" << attr.keytable_pool
           << "}\nhas_tls=" << has_tls
           << "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
           << "\ncputime_ns=" << stat.cputime_ns
           << "\nnswitch=" << stat.nswitch;
    }
}

}  // namespace bthread