// Copyright (c) 2007-2016 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RUNTIME_THREADS_DETAIL_SCHEDULING_LOOP_JAN_11_2013_0838PM)
#define HPX_RUNTIME_THREADS_DETAIL_SCHEDULING_LOOP_JAN_11_2013_0838PM

#include <hpx/config.hpp>
#include <hpx/runtime/agas/interface.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/get_thread_name.hpp>
#include <hpx/runtime/threads/detail/periodic_maintenance.hpp>
#include <hpx/runtime/threads/thread_data.hpp>
#include <hpx/state.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/function.hpp>
#include <hpx/util/hardware/timestamp.hpp>
#include <hpx/util/itt_notify.hpp>
#include <hpx/util/safe_lexical_cast.hpp>

#include <boost/atomic.hpp>

#if defined(HPX_HAVE_APEX)
#include <hpx/util/apex.hpp>
#endif

#include <cstddef>
#include <cstdint>
#include <limits>
#include <utility>

namespace hpx { namespace threads { namespace detail
{
    ///////////////////////////////////////////////////////////////////////
    inline void write_new_state_log_debug(std::size_t num_thread,
        thread_data* thrd, thread_state_enum state, char const* info)
    {
        LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
                    << "thread(" << thrd->get_thread_id().get() << "), "
                    << "description(" << thrd->get_description() << "), "
                    << "new state(" << get_thread_state_name(state) << "), "
                    << info;
    }

    inline void write_new_state_log_warning(std::size_t num_thread,
        thread_data* thrd, thread_state_enum state, char const* info)
    {
        // log this in any case
        LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
                      << "thread(" << thrd->get_thread_id().get() << "), "
                      << "description(" << thrd->get_description() << "), "
                      << "new state(" << get_thread_state_name(state) << "), "
                      << info;
    }

    inline void write_old_state_log(std::size_t num_thread,
        thread_data* thrd, thread_state_enum state)
    {
        LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
                    << "thread(" << thrd->get_thread_id().get() << "), "
                    << "description(" << thrd->get_description() << "), "
                    << "old state(" << get_thread_state_name(state) << ")";
    }

    ///////////////////////////////////////////////////////////////////////
    // helper class for switching thread state in and out during execution
    class switch_status
    {
    public:
        switch_status(thread_data* t, thread_state prev_state)
          : thread_(t), prev_state_(prev_state),
            next_thread_id_(nullptr),
            need_restore_state_(
                t->set_state_tagged(active, prev_state_, orig_state_))
        {}

        ~switch_status()
        {
            if (need_restore_state_)
                store_state(prev_state_);
        }

        bool is_valid() const { return need_restore_state_; }

        // allows changing the state the thread will be switched to after
        // execution
        thread_state operator=(thread_result_type && new_state)
        {
            prev_state_ = thread_state(new_state.first,
                prev_state_.state_ex(), prev_state_.tag() + 1);
            next_thread_id_ = std::move(new_state.second);
            return prev_state_;
        }

        // Get the state this thread was in before execution (usually
        // pending); this helps to make sure that no other worker-thread
        // starts executing this HPX-thread in the meantime.
        thread_state_enum get_previous() const
        {
            return prev_state_.state();
        }

        // This restores the previous state, while making sure that the
        // original state has not been changed since we started executing this
        // thread. The function returns true if the state has been set,
        // false otherwise.
        bool store_state(thread_state& newstate)
        {
            disable_restore();
            if (thread_->restore_state(prev_state_, orig_state_)) {
                newstate = prev_state_;
                return true;
            }
            return false;
        }

        // disable default handling in destructor
        void disable_restore() { need_restore_state_ = false; }

        thread_data* get_next_thread() const
        {
            // we know that the thread-id is just the pointer to the
            // thread_data
            return reinterpret_cast<thread_data*>(next_thread_id_.get());
        }

    private:
        thread_data* thread_;
        thread_state prev_state_;
        thread_state orig_state_;
        thread_id_type next_thread_id_;
        bool need_restore_state_;
    };
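
    // Typical use of switch_status (a sketch mirroring the scheduling loop
    // below): the switcher atomically marks the thread 'active' for the
    // duration of its execution and afterwards restores whatever state the
    // thread reported on return.
    //
    //     detail::switch_status thrd_stat(thrd, state);
    //     if (thrd_stat.is_valid() && thrd_stat.get_previous() == pending)
    //         thrd_stat = (*thrd)();        // run the HPX thread
    //     thrd_stat.store_state(state);     // publish the reported state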

#ifdef HPX_HAVE_THREAD_IDLE_RATES
    struct idle_collect_rate
    {
        idle_collect_rate(std::uint64_t& tfunc_time, std::uint64_t& exec_time)
          : start_timestamp_(util::hardware::timestamp())
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
        {}

        void collect_exec_time(std::uint64_t timestamp)
        {
            exec_time_ += util::hardware::timestamp() - timestamp;
        }

        void take_snapshot()
        {
            if (tfunc_time_ == std::uint64_t(-1))
            {
                start_timestamp_ = util::hardware::timestamp();
                tfunc_time_ = 0;
                exec_time_ = 0;
            }
            else
            {
                tfunc_time_ = util::hardware::timestamp() - start_timestamp_;
            }
        }

        std::uint64_t start_timestamp_;

        std::uint64_t& tfunc_time_;
        std::uint64_t& exec_time_;
    };

    struct exec_time_wrapper
    {
        exec_time_wrapper(idle_collect_rate& idle_rate)
          : timestamp_(util::hardware::timestamp())
          , idle_rate_(idle_rate)
        {}

        ~exec_time_wrapper()
        {
            idle_rate_.collect_exec_time(timestamp_);
        }

        std::uint64_t timestamp_;
        idle_collect_rate& idle_rate_;
    };

    struct tfunc_time_wrapper
    {
        tfunc_time_wrapper(idle_collect_rate& idle_rate)
          : idle_rate_(idle_rate)
        {}

        ~tfunc_time_wrapper()
        {
            idle_rate_.take_snapshot();
        }

        idle_collect_rate& idle_rate_;
    };
#else
    struct idle_collect_rate
    {
        idle_collect_rate(std::uint64_t&, std::uint64_t&) {}
    };

    struct exec_time_wrapper
    {
        exec_time_wrapper(idle_collect_rate&) {}
    };

    struct tfunc_time_wrapper
    {
        tfunc_time_wrapper(idle_collect_rate&) {}
    };
#endif
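
    // The wrappers above feed the idle-rate accounting: tfunc_time_
    // accumulates the total time spent inside the scheduling loop, while
    // exec_time_ accumulates only the portion spent actually executing HPX
    // threads. The idle-rate performance counter is then derived from these
    // two values, roughly as
    //
    //     idle_rate = (tfunc_time_ - exec_time_) / tfunc_time_
    //
    // (sketch for orientation only; the counter itself is computed elsewhere
    // in the runtime).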

    ///////////////////////////////////////////////////////////////////////////
    struct scheduling_counters
    {
        scheduling_counters(std::int64_t& executed_threads,
                std::int64_t& executed_thread_phases,
                std::uint64_t& tfunc_time, std::uint64_t& exec_time)
          : executed_threads_(executed_threads),
            executed_thread_phases_(executed_thread_phases),
            tfunc_time_(tfunc_time),
            exec_time_(exec_time)
        {}

        std::int64_t& executed_threads_;
        std::int64_t& executed_thread_phases_;
        std::uint64_t& tfunc_time_;
        std::uint64_t& exec_time_;
    };
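
    // Note: the references are expected to be bound to counters owned by the
    // calling worker thread; the scheduling loop only ever increments them,
    // any aggregation across worker threads is left to the caller.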

    struct scheduling_callbacks
    {
        typedef util::function_nonser<void()> callback_type;
        typedef util::function_nonser<bool()> background_callback_type;

        explicit scheduling_callbacks(
                callback_type && outer,
                callback_type && inner = callback_type(),
                background_callback_type && background =
                    background_callback_type(),
                std::size_t max_background_threads =
                    hpx::util::safe_lexical_cast<std::size_t>(
                        hpx::get_config_entry("hpx.max_background_threads",
                            (std::numeric_limits<std::size_t>::max)())))
          : outer_(std::move(outer)),
            inner_(std::move(inner)),
            background_(std::move(background)),
            max_background_threads_(max_background_threads)
        {}

        callback_type outer_;
        callback_type inner_;
        background_callback_type background_;
        std::size_t max_background_threads_;
    };
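
    // How the callbacks are used by the loop below (sketch with hypothetical
    // lambdas): the outer callback runs once the loop has been idling for a
    // while, the inner callback runs on every idle iteration, and the
    // background callback performs parcel-layer/AGAS work, returning true if
    // it did something (which resets the idle counter).
    //
    //     scheduling_callbacks callbacks(
    //         [] { /* e.g. yield to the OS */ },
    //         [] { /* e.g. drain an external queue */ },
    //         [] { return do_background_work(); });   // hypothetical helper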

    template <typename SchedulingPolicy>
    void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
        scheduling_counters& counters, scheduling_callbacks& callbacks)
    {
        boost::atomic<hpx::state>& this_state = scheduler.get_state(num_thread);

        util::itt::stack_context ctx;        // helper for itt support
        util::itt::domain domain(get_thread_name().data());
        // util::itt::id threadid(domain, this);
        util::itt::frame_context fctx(domain);

        std::int64_t idle_loop_count = 0;
        std::int64_t busy_loop_count = 0;

        idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
        tfunc_time_wrapper tfunc_time_collector(idle_rate);

        scheduler.SchedulingPolicy::start_periodic_maintenance(this_state);

        // spin for some time after queues have become empty
        bool may_exit = false;
        thread_data* thrd = nullptr;
        thread_data* next_thrd = nullptr;

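        // Each iteration of the loop below either executes one HPX thread or,
        // if no work is available, performs idle work: waiting for new
        // threads, running the background/inner/outer callbacks, cleaning up
        // terminated threads, and eventually exiting once the scheduler is
        // stopped.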
        while (true) {
            // Get the next HPX thread from the queue
            thrd = next_thrd;

            // Reset the cached next thread so that an early 'continue' below
            // cannot re-run a stale HPX thread on the next iteration.
            next_thrd = nullptr;

            if (HPX_LIKELY(thrd ||
                scheduler.SchedulingPolicy::get_next_thread(num_thread,
                    idle_loop_count, thrd)))
            {
                tfunc_time_wrapper tfunc_time_collector(idle_rate);

                idle_loop_count = 0;
                ++busy_loop_count;

                may_exit = false;

                // Only pending HPX threads will be executed.
                // Any non-pending HPX threads are leftovers from a set_state()
                // call for a previously pending HPX thread (see comments above).
                thread_state state = thrd->get_state();
                thread_state_enum state_val = state.state();

                detail::write_old_state_log(num_thread, thrd, state_val);

                if (HPX_LIKELY(pending == state_val)) {
                    // switch the state of the thread to active and back to
                    // what the thread reports as its return value

                    {
                        // tries to set state to active (only if state is still
                        // the same as 'state')
                        detail::switch_status thrd_stat(thrd, state);
                        if (HPX_LIKELY(thrd_stat.is_valid() &&
                            thrd_stat.get_previous() == pending))
                        {
                            tfunc_time_wrapper tfunc_time_collector(idle_rate);

                            // thread returns new required state
                            // store the returned state in the thread
                            {
#ifdef HPX_HAVE_ITTNOTIFY
                                util::itt::caller_context cctx(ctx);
                                util::itt::undo_frame_context undoframe(fctx);
                                util::itt::task task(domain,
                                    thrd->get_description());
#endif
                                // Record time elapsed in thread changing state
                                // and add to aggregate execution time.
                                exec_time_wrapper exec_time_collector(idle_rate);

#if defined(HPX_HAVE_APEX)
                                util::apex_wrapper apex_profiler(
                                    thrd->get_description());

                                thrd_stat = (*thrd)();

                                if (thrd_stat.get_previous() == terminated)
                                {
                                    apex_profiler.stop();
                                }
                                else
                                {
                                    apex_profiler.yield();
                                }
#else
                                thrd_stat = (*thrd)();
#endif
                            }

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                            ++counters.executed_thread_phases_;
#endif
                        }
                        else {
                            // Some other worker-thread got in between and
                            // started executing this HPX-thread, so we just
                            // continue with the next one.
                            thrd_stat.disable_restore();
                            detail::write_new_state_log_warning(
                                num_thread, thrd, state_val, "no execution");
                            continue;
                        }

                        // store and retrieve the new state in the thread
                        if (HPX_UNLIKELY(!thrd_stat.store_state(state))) {
                            // Some other worker-thread got in between and
                            // changed the state of this thread, so we just
                            // continue with the next one.
                            detail::write_new_state_log_warning(
                                num_thread, thrd, state_val, "no state change");
                            continue;
                        }

                        state_val = state.state();

                        // any exception thrown from the thread will reset its
                        // state at this point

                        // handle next thread id if given (switch directly to
                        // this thread)
                        next_thrd = thrd_stat.get_next_thread();
                    }

                    // detail::write_new_state_log_debug(num_thread, thrd,
                    //     state_val, "normal");

                    // Re-add this work item to our list of work items if the
                    // HPX thread should be re-scheduled. If the HPX thread is
                    // suspended now we just keep it in the map of threads.
                    if (HPX_UNLIKELY(state_val == pending)) {
                        if (HPX_LIKELY(next_thrd == nullptr)) {
                            // schedule other work
                            scheduler.SchedulingPolicy::wait_or_add_new(
                                num_thread, is_running_state(this_state.load()),
                                idle_loop_count);
                        }

                        // schedule this thread again, make sure it ends up at
                        // the end of the queue
                        scheduler.SchedulingPolicy::schedule_thread_last(thrd,
                            num_thread);
                        scheduler.SchedulingPolicy::do_some_work(num_thread);
                    }
                }
                else if (HPX_UNLIKELY(active == state_val)) {
                    LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
                        "thread(" << thrd->get_thread_id().get() << "), "
                        "description(" << thrd->get_description() << "), "
                        "rescheduling";

                    // Re-schedule the thread if it is still marked as active.
                    // This might happen if the thread has already been added
                    // to the scheduler queue, but its state has not been
                    // reset yet.
                    //
                    // REVIEW: Passing a specific target thread may upset the
                    // round-robin queuing.
                    scheduler.SchedulingPolicy::schedule_thread(thrd, num_thread);
                }

                // Remove the mapping from thread_map_ if the HPX thread is
                // depleted or terminated; this will delete the HPX thread as
                // all references go out of scope.
                // REVIEW: what has to be done with depleted HPX threads?
                if (HPX_LIKELY(state_val == depleted || state_val == terminated))
                {
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                    ++counters.executed_threads_;
#endif
                    scheduler.SchedulingPolicy::destroy_thread(thrd,
                        busy_loop_count);
                }
            }

            // if nothing else has to be done, either wait or terminate
            else {
                ++idle_loop_count;

                if (scheduler.SchedulingPolicy::wait_or_add_new(num_thread,
                        is_running_state(this_state.load()), idle_loop_count))
                {
                    // clean up terminated threads one more time before exiting
                    if (scheduler.SchedulingPolicy::cleanup_terminated(true))
                    {
                        // if this is an inner scheduler, exit immediately
                        if (!(scheduler.get_scheduler_mode() & policies::delay_exit))
                        {
                            this_state.store(state_stopped);
                            break;
                        }

                        // otherwise, keep idling for some time
                        if (!may_exit)
                            idle_loop_count = 0;
                        may_exit = true;
                    }
                }

                // do background work in the parcel layer and in AGAS
                if ((scheduler.get_scheduler_mode() & policies::do_background_work) &&
                    num_thread < callbacks.max_background_threads_ &&
                    !callbacks.background_.empty())
                {
                    if (callbacks.background_())
                        idle_loop_count = 0;
                }

                // call back into the invoking context
                if (!callbacks.inner_.empty())
                    callbacks.inner_();
            }

            // something went badly wrong, give up
            if (HPX_UNLIKELY(this_state.load() == state_terminating))
                break;

            if (busy_loop_count > HPX_BUSY_LOOP_COUNT_MAX)
            {
                busy_loop_count = 0;

                // do background work in the parcel layer and in AGAS
                if ((scheduler.get_scheduler_mode() & policies::do_background_work) &&
                    num_thread < callbacks.max_background_threads_ &&
                    !callbacks.background_.empty())
                {
                    if (callbacks.background_())
                        idle_loop_count = 0;
                }
            }
            else if ((scheduler.get_scheduler_mode() & policies::fast_idle_mode) ||
                idle_loop_count > HPX_IDLE_LOOP_COUNT_MAX)
            {
                // clean up terminated threads
                if (idle_loop_count > HPX_IDLE_LOOP_COUNT_MAX)
                    idle_loop_count = 0;

                // call back into the invoking context
                if (!callbacks.outer_.empty())
                    callbacks.outer_();

                // break if we were idling after 'may_exit'
                if (may_exit)
                {
                    if (scheduler.SchedulingPolicy::cleanup_terminated(true))
                    {
                        this_state.store(state_stopped);
                        break;
                    }
                    may_exit = false;
                }
                else
                {
                    scheduler.SchedulingPolicy::cleanup_terminated(true);
                }
            }
        }
    }
}}}

#endif