// Copyright (c) 2007-2016 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RUNTIME_THREADS_DETAIL_SCHEDULING_LOOP_JAN_11_2013_0838PM)
#define HPX_RUNTIME_THREADS_DETAIL_SCHEDULING_LOOP_JAN_11_2013_0838PM
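
// This header implements the scheduling loop executed by every worker (OS)
// thread, together with the helper types the loop relies on: an RAII wrapper
// for switching HPX thread states, helpers for collecting idle-rate timing
// data, and the structures carrying per-worker counters and callbacks.
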
#include <hpx/config.hpp>
#include <hpx/runtime/agas/interface.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/get_thread_name.hpp>
#include <hpx/runtime/threads/detail/periodic_maintenance.hpp>
#include <hpx/runtime/threads/thread_data.hpp>
#include <hpx/state.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/function.hpp>
#include <hpx/util/hardware/timestamp.hpp>
#include <hpx/util/itt_notify.hpp>
#include <hpx/util/safe_lexical_cast.hpp>

#include <boost/atomic.hpp>

#if defined(HPX_HAVE_APEX)
#include <hpx/util/apex.hpp>
#endif

#include <cstddef>
#include <cstdint>
#include <limits>
#include <utility>

namespace hpx { namespace threads { namespace detail
{
    ///////////////////////////////////////////////////////////////////////
    inline void write_new_state_log_debug(std::size_t num_thread,
        thread_data* thrd, thread_state_enum state, char const* info)
    {
        LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
            << "thread(" << thrd->get_thread_id().get() << "), "
            << "description(" << thrd->get_description() << "), "
            << "new state(" << get_thread_state_name(state) << "), "
            << info;
    }
    inline void write_new_state_log_warning(std::size_t num_thread,
        thread_data* thrd, thread_state_enum state, char const* info)
    {
        // log this in any case
        LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
            << "thread(" << thrd->get_thread_id().get() << "), "
            << "description(" << thrd->get_description() << "), "
            << "new state(" << get_thread_state_name(state) << "), "
            << info;
    }
    inline void write_old_state_log(std::size_t num_thread,
        thread_data* thrd, thread_state_enum state)
    {
        LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
            << "thread(" << thrd->get_thread_id().get() << "), "
            << "description(" << thrd->get_description() << "), "
            << "old state(" << get_thread_state_name(state) << ")";
    }

    ///////////////////////////////////////////////////////////////////////
    // helper class for switching thread state in and out during execution
    class switch_status
    {
    public:
        switch_status (thread_data* t, thread_state prev_state)
          : thread_(t), prev_state_(prev_state),
            next_thread_id_(nullptr),
            need_restore_state_(t->set_state_tagged(active, prev_state_, orig_state_))
        {}

        ~switch_status ()
        {
            if (need_restore_state_)
                store_state(prev_state_);
        }

        bool is_valid() const { return need_restore_state_; }

        // allows changing the state the thread will be switched to after
        // execution
        thread_state operator=(thread_result_type && new_state)
        {
            prev_state_ = thread_state(new_state.first,
                prev_state_.state_ex(), prev_state_.tag() + 1);
            next_thread_id_ = std::move(new_state.second);
            return prev_state_;
        }

        // Get the state this thread was in before execution (usually pending);
        // this helps to make sure that no other worker-thread starts executing
        // this HPX-thread in the meantime.
        thread_state_enum get_previous() const
        {
            return prev_state_.state();
        }

        // This restores the previous state, while making sure that the
        // original state has not been changed since we started executing this
        // thread. The function returns true if the state has been set, false
        // otherwise.
        bool store_state(thread_state& newstate)
        {
            disable_restore();
            if (thread_->restore_state(prev_state_, orig_state_)) {
                newstate = prev_state_;
                return true;
            }
            return false;
        }

        // disable default handling in destructor
        void disable_restore() { need_restore_state_ = false; }

        thread_data* get_next_thread() const
        {
            // we know that the thread-id is just the pointer to the thread_data
            return reinterpret_cast<thread_data*>(next_thread_id_.get());
        }

    private:
        thread_data* thread_;
        thread_state prev_state_;
        thread_state orig_state_;
        thread_id_type next_thread_id_;
        bool need_restore_state_;
    };

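    // With HPX_HAVE_THREAD_IDLE_RATES enabled the following helpers accumulate
    // the overall loop time (tfunc_time) and the pure execution time
    // (exec_time) used for the idle-rate statistics; otherwise they collapse
    // to empty stubs.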
#ifdef HPX_HAVE_THREAD_IDLE_RATES
    struct idle_collect_rate
    {
        idle_collect_rate(std::uint64_t& tfunc_time, std::uint64_t& exec_time)
          : start_timestamp_(util::hardware::timestamp())
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
        {}

        void collect_exec_time(std::uint64_t timestamp)
        {
            exec_time_ += util::hardware::timestamp() - timestamp;
        }
        void take_snapshot()
        {
            if (tfunc_time_ == std::uint64_t(-1))
            {
                start_timestamp_ = util::hardware::timestamp();
                tfunc_time_ = 0;
                exec_time_ = 0;
            }
            else
            {
                tfunc_time_ = util::hardware::timestamp() - start_timestamp_;
            }
        }

        std::uint64_t start_timestamp_;

        std::uint64_t& tfunc_time_;
        std::uint64_t& exec_time_;
    };

    struct exec_time_wrapper
    {
        exec_time_wrapper(idle_collect_rate& idle_rate)
          : timestamp_(util::hardware::timestamp())
          , idle_rate_(idle_rate)
        {}
        ~exec_time_wrapper()
        {
            idle_rate_.collect_exec_time(timestamp_);
        }

        std::uint64_t timestamp_;
        idle_collect_rate& idle_rate_;
    };

    struct tfunc_time_wrapper
    {
        tfunc_time_wrapper(idle_collect_rate& idle_rate)
          : idle_rate_(idle_rate)
        {
        }
        ~tfunc_time_wrapper()
        {
            idle_rate_.take_snapshot();
        }

        idle_collect_rate& idle_rate_;
    };
#else
    struct idle_collect_rate
    {
        idle_collect_rate(std::uint64_t&, std::uint64_t&) {}
    };

    struct exec_time_wrapper
    {
        exec_time_wrapper(idle_collect_rate&) {}
    };

    struct tfunc_time_wrapper
    {
        tfunc_time_wrapper(idle_collect_rate&) {}
    };
#endif

    ///////////////////////////////////////////////////////////////////////////
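    // Bundles references to the per-worker counters updated by the scheduling
    // loop: the number of executed threads and thread phases as well as the
    // accumulated overall (tfunc) and pure execution (exec) times.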
    struct scheduling_counters
    {
        scheduling_counters(std::int64_t& executed_threads,
                std::int64_t& executed_thread_phases,
                std::uint64_t& tfunc_time, std::uint64_t& exec_time)
          : executed_threads_(executed_threads),
            executed_thread_phases_(executed_thread_phases),
            tfunc_time_(tfunc_time),
            exec_time_(exec_time)
        {}

        std::int64_t& executed_threads_;
        std::int64_t& executed_thread_phases_;
        std::uint64_t& tfunc_time_;
        std::uint64_t& exec_time_;
    };

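    // Bundles the callbacks invoked from inside the scheduling loop: outer_
    // and inner_ call back into the invoking context, background_ performs
    // background work (parcel layer, AGAS), and max_background_threads_
    // limits which worker threads run the background callback.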
    struct scheduling_callbacks
    {
        typedef util::function_nonser<void()> callback_type;
        typedef util::function_nonser<bool()> background_callback_type;

        explicit scheduling_callbacks(
                callback_type && outer,
                callback_type && inner = callback_type(),
                background_callback_type && background =
                    background_callback_type(),
                std::size_t max_background_threads =
                    hpx::util::safe_lexical_cast<std::size_t>(
                        hpx::get_config_entry("hpx.max_background_threads",
                            (std::numeric_limits<std::size_t>::max)())))
          : outer_(std::move(outer)),
            inner_(std::move(inner)),
            background_(std::move(background)),
            max_background_threads_(max_background_threads)
        {}

        callback_type outer_;
        callback_type inner_;
        background_callback_type background_;
        std::size_t max_background_threads_;
    };

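    // Main scheduling loop run by each worker (OS) thread: it repeatedly asks
    // the scheduler for the next pending HPX thread, executes it, and acts on
    // the state it returns; when no work is available it performs background
    // work, invokes the registered callbacks, and eventually exits once the
    // scheduler is stopped or terminating.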
    template <typename SchedulingPolicy>
    void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
        scheduling_counters& counters, scheduling_callbacks& callbacks)
    {
        boost::atomic<hpx::state>& this_state = scheduler.get_state(num_thread);

        util::itt::stack_context ctx;        // helper for itt support
        util::itt::domain domain(get_thread_name().data());
        // util::itt::id threadid(domain, this);
        util::itt::frame_context fctx(domain);

        std::int64_t idle_loop_count = 0;
        std::int64_t busy_loop_count = 0;

        idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
        tfunc_time_wrapper tfunc_time_collector(idle_rate);

        scheduler.SchedulingPolicy::start_periodic_maintenance(this_state);

        // spin for some time after queues have become empty
        bool may_exit = false;
        thread_data* thrd = nullptr;
        thread_data* next_thrd = nullptr;

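        // Each iteration either executes the next available HPX thread and
        // handles the state it returns, or performs idle handling: background
        // work, callbacks into the invoking context, and the checks which
        // decide when this worker thread may exit the loop.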
        while (true) {
            // Get the next HPX thread from the queue
            thrd = next_thrd;
            if (HPX_LIKELY(thrd ||
                scheduler.SchedulingPolicy::get_next_thread(num_thread,
                    idle_loop_count, thrd)))
            {
                tfunc_time_wrapper tfunc_time_collector(idle_rate);

                idle_loop_count = 0;
                ++busy_loop_count;

                may_exit = false;

                // Only pending HPX threads will be executed.
                // Any non-pending HPX threads are leftovers from a set_state()
                // call for a previously pending HPX thread (see comments above).
                thread_state state = thrd->get_state();
                thread_state_enum state_val = state.state();

                detail::write_old_state_log(num_thread, thrd, state_val);

                if (HPX_LIKELY(pending == state_val)) {
                    // switch the state of the thread to active and back to
                    // what the thread reports as its return value

                    {
                        // tries to set state to active (only if state is still
                        // the same as 'state')
                        detail::switch_status thrd_stat (thrd, state);
                        if (HPX_LIKELY(thrd_stat.is_valid() &&
                            thrd_stat.get_previous() == pending))
                        {
                            tfunc_time_wrapper tfunc_time_collector(idle_rate);

                            // thread returns new required state
                            // store the returned state in the thread
                            {
#ifdef HPX_HAVE_ITTNOTIFY
                                util::itt::caller_context cctx(ctx);
                                util::itt::undo_frame_context undoframe(fctx);
                                util::itt::task task(domain, thrd->get_description());
#endif
                                // Record time elapsed in thread changing state
                                // and add to aggregate execution time.
                                exec_time_wrapper exec_time_collector(idle_rate);

#if defined(HPX_HAVE_APEX)
                                util::apex_wrapper apex_profiler(
                                    thrd->get_description());

                                thrd_stat = (*thrd)();

                                if (thrd_stat.get_previous() == terminated)
                                {
                                    apex_profiler.stop();
                                }
                                else
                                {
                                    apex_profiler.yield();
                                }
#else
                                thrd_stat = (*thrd)();
#endif
                            }

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                            ++counters.executed_thread_phases_;
#endif
                        }
                        else {
                            // some other worker-thread got in between and started
                            // executing this HPX-thread, we just continue with
                            // the next one
                            thrd_stat.disable_restore();
                            detail::write_new_state_log_warning(
                                num_thread, thrd, state_val, "no execution");
                            continue;
                        }

                        // store and retrieve the new state in the thread
                        if (HPX_UNLIKELY(!thrd_stat.store_state(state))) {
                            // some other worker-thread got in between and changed
                            // the state of this thread, we just continue with
                            // the next one
                            detail::write_new_state_log_warning(
                                num_thread, thrd, state_val, "no state change");
                            continue;
                        }

                        state_val = state.state();

                        // any exception thrown from the thread will reset its
                        // state at this point

                        // handle next thread id if given (switch directly to
                        // this thread)
                        next_thrd = thrd_stat.get_next_thread();
                    }

                    //detail::write_new_state_log_debug(num_thread, thrd,
                    //    state_val, "normal");

                    // Re-add this work item to our list of work items if the HPX
                    // thread should be re-scheduled. If the HPX thread is suspended
                    // now we just keep it in the map of threads.
                    if (HPX_UNLIKELY(state_val == pending)) {
                        if (HPX_LIKELY(next_thrd == nullptr)) {
                            // schedule other work
                            scheduler.SchedulingPolicy::wait_or_add_new(
                                num_thread, is_running_state(this_state.load()),
                                idle_loop_count);
                        }

                        // schedule this thread again, make sure it ends up at
                        // the end of the queue
                        scheduler.SchedulingPolicy::schedule_thread_last(thrd,
                            num_thread);
                        scheduler.SchedulingPolicy::do_some_work(num_thread);
                    }
                }
                else if (HPX_UNLIKELY(active == state_val)) {
                    LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
                        "thread(" << thrd->get_thread_id().get() << "), "
                        "description(" << thrd->get_description() << "), "
                        "rescheduling";

                    // re-schedule the thread if it is still marked as active;
                    // this might happen if some thread has been added to the
                    // scheduler queue already but the state has not been reset
                    // yet
                    //
                    // REVIEW: Passing a specific target thread may set off
                    // the round robin queuing.
                    scheduler.SchedulingPolicy::schedule_thread(thrd, num_thread);
                }

                // Remove the mapping from thread_map_ if the HPX thread is
                // depleted or terminated; this will delete the HPX thread as
                // all references go out of scope.
                // REVIEW: what has to be done with depleted HPX threads?
                if (HPX_LIKELY(state_val == depleted || state_val == terminated))
                {
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                    ++counters.executed_threads_;
#endif
                    scheduler.SchedulingPolicy::destroy_thread(thrd, busy_loop_count);
                }
            }

            // if nothing else has to be done, either wait or terminate
            else {
                ++idle_loop_count;

                if (scheduler.SchedulingPolicy::wait_or_add_new(num_thread,
                    is_running_state(this_state.load()), idle_loop_count))
                {
                    // clean up terminated threads one more time before exiting
                    if (scheduler.SchedulingPolicy::cleanup_terminated(true))
                    {
                        // if this is an inner scheduler, exit immediately
                        if (!(scheduler.get_scheduler_mode() & policies::delay_exit))
                        {
                            this_state.store(state_stopped);
                            break;
                        }

                        // otherwise, keep idling for some time
                        if (!may_exit)
                            idle_loop_count = 0;
                        may_exit = true;
                    }
                }

                // do background work in parcel layer and in agas
                if ((scheduler.get_scheduler_mode() & policies::do_background_work) &&
                    num_thread < callbacks.max_background_threads_ &&
                    !callbacks.background_.empty())
                {
                    if (callbacks.background_())
                        idle_loop_count = 0;
                }

                // call back into invoking context
                if (!callbacks.inner_.empty())
                    callbacks.inner_();
            }

            // something went badly wrong, give up
            if (HPX_UNLIKELY(this_state.load() == state_terminating))
                break;

            if (busy_loop_count > HPX_BUSY_LOOP_COUNT_MAX)
            {
                busy_loop_count = 0;

                // do background work in parcel layer and in agas
                if ((scheduler.get_scheduler_mode() & policies::do_background_work) &&
                    num_thread < callbacks.max_background_threads_ &&
                    !callbacks.background_.empty())
                {
                    if (callbacks.background_())
                        idle_loop_count = 0;
                }
            }
            else if ((scheduler.get_scheduler_mode() & policies::fast_idle_mode) ||
                idle_loop_count > HPX_IDLE_LOOP_COUNT_MAX)
            {
                // clean up terminated threads
                if (idle_loop_count > HPX_IDLE_LOOP_COUNT_MAX)
                    idle_loop_count = 0;

                // call back into invoking context
                if (!callbacks.outer_.empty())
                    callbacks.outer_();

                // break if we were idling after 'may_exit'
                if (may_exit)
                {
                    if (scheduler.SchedulingPolicy::cleanup_terminated(true))
                    {
                        this_state.store(state_stopped);
                        break;
                    }
                    may_exit = false;
                }
                else
                {
                    scheduler.SchedulingPolicy::cleanup_terminated(true);
                }
            }
        }
    }
}}}

#endif