/home/users/khuck/src/hpx-lsu/hpx/runtime/threads/detail/scheduling_loop.hpp

Line | % of fetches | Source
1  
//  Copyright (c) 2007-2016 Hartmut Kaiser
2  
//
3  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
6  
#if !defined(HPX_RUNTIME_THREADS_DETAIL_SCHEDULING_LOOP_JAN_11_2013_0838PM)
7  
#define HPX_RUNTIME_THREADS_DETAIL_SCHEDULING_LOOP_JAN_11_2013_0838PM
8  
9  
#include <hpx/config.hpp>
10  
#include <hpx/runtime/agas/interface.hpp>
11  
#include <hpx/runtime/config_entry.hpp>
12  
#include <hpx/runtime/get_thread_name.hpp>
13  
#include <hpx/runtime/threads/detail/periodic_maintenance.hpp>
14  
#include <hpx/runtime/threads/thread_data.hpp>
15  
#include <hpx/state.hpp>
16  
#include <hpx/util/assert.hpp>
17  
#include <hpx/util/function.hpp>
18  
#include <hpx/util/hardware/timestamp.hpp>
19  
#include <hpx/util/itt_notify.hpp>
20  
#include <hpx/util/safe_lexical_cast.hpp>
21  
22  
#include <boost/atomic.hpp>
23  
24  
#if defined(HPX_HAVE_APEX)
25  
#include <hpx/util/apex.hpp>
26  
#endif
27  
28  
#include <cstddef>
29  
#include <cstdint>
30  
#include <limits>
31  
#include <utility>
32  
33  
namespace hpx { namespace threads { namespace detail
34  
{
35  
    ///////////////////////////////////////////////////////////////////////
36  
    inline void write_new_state_log_debug(std::size_t num_thread,
37  
        thread_data* thrd, thread_state_enum state, char const* info)
38  
    {
39  
        LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
40  
            << "thread(" << thrd->get_thread_id().get() << "), "
41  
            << "description(" << thrd->get_description() << "), "
42  
            << "new state(" << get_thread_state_name(state) << "), "
43  
            << info;
44  
    }
45  
    inline void write_new_state_log_warning(std::size_t num_thread,
46  
        thread_data* thrd, thread_state_enum state, char const* info)
47  
    {
48  
        // log this in any case
49  
        LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
50  
            << "thread(" << thrd->get_thread_id().get() << "), "
51  
            << "description(" << thrd->get_description() << "), "
52  
            << "new state(" << get_thread_state_name(state) << "), "
53  
            << info;
54  
    }
55  
    inline void write_old_state_log(std::size_t num_thread,
56  
        thread_data* thrd, thread_state_enum state)
57  
    {
58  
        LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
59  
                    << "thread(" << thrd->get_thread_id().get() << "), "
60  
                    << "description(" << thrd->get_description() << "), "
61  
                    << "old state(" << get_thread_state_name(state) << ")";
62  
    }
63  
64  
    ///////////////////////////////////////////////////////////////////////
65  
    // helper class for switching thread state in and out during execution
66  
    class switch_status
67  
    {
68  
    public:
69  
        switch_status (thread_data* t, thread_state prev_state)
70  
          : thread_(t), prev_state_(prev_state),
71  
            next_thread_id_(nullptr),
72  
            need_restore_state_(t->set_state_tagged(active, prev_state_, orig_state_))
73  
        {}
74  
75  
        ~switch_status ()
76  
        {
77  
            if (need_restore_state_)
78  
                store_state(prev_state_);
79  
        }
80  
81  
        bool is_valid() const { return need_restore_state_; }
82  
83  
        // allow to change the state the thread will be switched to after
84  
        // execution
85  
        thread_state operator=(thread_result_type && new_state)
86  
        {
87  
            prev_state_ = thread_state(new_state.first,
88  
                prev_state_.state_ex(), prev_state_.tag() + 1);
89  
            next_thread_id_ = std::move(new_state.second);
90  
            return prev_state_;
91  
        }
92  
93  
        // Get the state this thread was in before execution (usually pending),
94  
        // this helps making sure no other worker-thread is started to execute this
95  
        // HPX-thread in the meantime.
96  
        thread_state_enum get_previous() const
97  
        {
98  
            return prev_state_.state();
99  
        }
100  
101  
        // This restores the previous state, while making sure that the
102  
        // original state has not been changed since we started executing this
103  
        // thread. The function returns true if the state has been set, false
104  
        // otherwise.
105  
        bool store_state(thread_state& newstate)
106  
        {
107  
            disable_restore();
108  
            if (thread_->restore_state(prev_state_, orig_state_)) {
109  
                newstate = prev_state_;
110  
                return true;
111  
            }
112  
            return false;
113  
        }
114  
115  
        // disable default handling in destructor
116  
        void disable_restore() { need_restore_state_ = false; }
117  
118  
        thread_data* get_next_thread() const
119  
        {
120  
            // we know that the thread-id is just the pointer to the thread_data
121  
            return reinterpret_cast<thread_data*>(next_thread_id_.get());
122  
        }
123  
124  
    private:
125  
        thread_data* thread_;
126  
        thread_state prev_state_;
127  
        thread_state orig_state_;
128  
        thread_id_type next_thread_id_;
129  
        bool need_restore_state_;
130  
    };
131  
132  
#ifdef HPX_HAVE_THREAD_IDLE_RATES
    // Collects the two timing counters used for idle-rate computation. Both
    // counters live outside of this object and are updated through the
    // references handed to the constructor.
    struct idle_collect_rate
    {
        idle_collect_rate(std::uint64_t& tfunc_time, std::uint64_t& exec_time)
          : start_timestamp_(util::hardware::timestamp())
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
        {}

        // add the time elapsed since 'timestamp' to the execution time
        void collect_exec_time(std::uint64_t timestamp)
        {
            exec_time_ += util::hardware::timestamp() - timestamp;
        }

        // A tfunc_time of uint64_t(-1) is treated as a reset request: the
        // start time is re-taken and both counters are zeroed. Otherwise
        // tfunc_time becomes the time elapsed since the start timestamp.
        void take_snapshot()
        {
            if (tfunc_time_ == std::uint64_t(-1))
            {
                start_timestamp_ = util::hardware::timestamp();
                tfunc_time_ = 0;
                exec_time_ = 0;
            }
            else
            {
                tfunc_time_ = util::hardware::timestamp() - start_timestamp_;
            }
        }

        std::uint64_t start_timestamp_;

        std::uint64_t& tfunc_time_;
        std::uint64_t& exec_time_;
    };

    // Scope guard: records a timestamp on construction and adds the elapsed
    // time to the execution time on destruction. Non-copyable so that one
    // interval is never accounted twice.
    struct exec_time_wrapper
    {
        explicit exec_time_wrapper(idle_collect_rate& idle_rate)
          : timestamp_(util::hardware::timestamp())
          , idle_rate_(idle_rate)
        {}
        ~exec_time_wrapper()
        {
            idle_rate_.collect_exec_time(timestamp_);
        }

        exec_time_wrapper(exec_time_wrapper const&) = delete;
        exec_time_wrapper& operator=(exec_time_wrapper const&) = delete;

        std::uint64_t timestamp_;
        idle_collect_rate& idle_rate_;
    };

    // Scope guard: takes a snapshot of the overall (tfunc) time on
    // destruction. Non-copyable for the same reason as exec_time_wrapper.
    struct tfunc_time_wrapper
    {
        explicit tfunc_time_wrapper(idle_collect_rate& idle_rate)
          : idle_rate_(idle_rate)
        {
        }
        ~tfunc_time_wrapper()
        {
            idle_rate_.take_snapshot();
        }

        tfunc_time_wrapper(tfunc_time_wrapper const&) = delete;
        tfunc_time_wrapper& operator=(tfunc_time_wrapper const&) = delete;

        idle_collect_rate& idle_rate_;
    };
#else
    // Idle-rate collection is disabled: the types below are empty stand-ins
    // exposing the same constructors (and the same copy restrictions) as the
    // real implementations, so that code using them compiles identically in
    // both configurations.
    struct idle_collect_rate
    {
        idle_collect_rate(std::uint64_t&, std::uint64_t&) {}
    };

    struct exec_time_wrapper
    {
        explicit exec_time_wrapper(idle_collect_rate&) {}

        exec_time_wrapper(exec_time_wrapper const&) = delete;
        exec_time_wrapper& operator=(exec_time_wrapper const&) = delete;
    };

    struct tfunc_time_wrapper
    {
        explicit tfunc_time_wrapper(idle_collect_rate&) {}

        tfunc_time_wrapper(tfunc_time_wrapper const&) = delete;
        tfunc_time_wrapper& operator=(tfunc_time_wrapper const&) = delete;
    };
#endif
209  
210  
    ///////////////////////////////////////////////////////////////////////////
211  
    // Bundle of references to the counters updated by the scheduling loop.
    // The struct owns nothing: every member refers to a variable supplied by
    // the caller, which therefore has to outlive this object.
    struct scheduling_counters
    {
        scheduling_counters(std::int64_t& executed_threads,
                std::int64_t& executed_thread_phases,
                std::uint64_t& tfunc_time, std::uint64_t& exec_time)
          : executed_threads_(executed_threads),
            executed_thread_phases_(executed_thread_phases),
            tfunc_time_(tfunc_time),
            exec_time_(exec_time)
        {}

        std::int64_t& executed_threads_;        // incremented per destroyed thread
        std::int64_t& executed_thread_phases_;  // incremented per thread invocation
        std::uint64_t& tfunc_time_;             // handed to idle_collect_rate
        std::uint64_t& exec_time_;              // handed to idle_collect_rate
    };
227  
228  
    struct scheduling_callbacks
229  
    {
230  
        typedef util::function_nonser<void()> callback_type;
231  
        typedef util::function_nonser<bool()> background_callback_type;
232  
233  
        explicit scheduling_callbacks(
234  
                callback_type && outer,
235  
                callback_type && inner = callback_type(),
236  
                background_callback_type && background =
237  
                    background_callback_type(),
238  
                std::size_t max_background_threads =
239  
                    hpx::util::safe_lexical_cast<std::size_t>(
240  
                        hpx::get_config_entry("hpx.max_background_threads",
241  
                            (std::numeric_limits<std::size_t>::max)())))
242  
          : outer_(std::move(outer)),
243  
            inner_(std::move(inner)),
244  
            background_(std::move(background)),
245  
            max_background_threads_(max_background_threads)
246  
        {}
247  
248  
        callback_type outer_;
249  
        callback_type inner_;
250  
        background_callback_type background_;
251  
        std::size_t max_background_threads_;
252  
    };
253  
254  
    template <typename SchedulingPolicy>
255  
    void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
256  
        scheduling_counters& counters, scheduling_callbacks& callbacks)
257  
    {
258  
        boost::atomic<hpx::state>& this_state = scheduler.get_state(num_thread);
259  
260  
        util::itt::stack_context ctx;        // helper for itt support
261  
        util::itt::domain domain(get_thread_name().data());
262  
//         util::itt::id threadid(domain, this);
263  
        util::itt::frame_context fctx(domain);
264  
265  
        std::int64_t idle_loop_count = 0;
266  
        std::int64_t busy_loop_count = 0;
267  
268  
        idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
269  
        tfunc_time_wrapper tfunc_time_collector(idle_rate);
270  
271  
        scheduler.SchedulingPolicy::start_periodic_maintenance(this_state);
272  
273  
        // spin for some time after queues have become empty
274  
        bool may_exit = false;
275  
        thread_data* thrd = nullptr;
276  
        thread_data* next_thrd = nullptr;
277  
278  
        while (true) {
279  
            // Get the next HPX thread from the queue
280  
            thrd = next_thrd;
281  
            if (HPX_LIKELY(thrd ||
282  
                    scheduler.SchedulingPolicy::get_next_thread(num_thread,
283  
                        idle_loop_count, thrd)))
284  
            {
285  
                tfunc_time_wrapper tfunc_time_collector(idle_rate);
286  
287  
                idle_loop_count = 0;
288  
                ++busy_loop_count;
289  
290  
                may_exit = false;
291  
292  
                // Only pending HPX threads will be executed.
293  
                // Any non-pending HPX threads are leftovers from a set_state()
294  
                // call for a previously pending HPX thread (see comments above).
295  
                thread_state state = thrd->get_state();
296  
                thread_state_enum state_val = state.state();
297  
298  
                detail::write_old_state_log(num_thread, thrd, state_val);
299  
300  
                if (HPX_LIKELY(pending == state_val)) {
301  
                    // switch the state of the thread to active and back to
302  
                    // what the thread reports as its return value
303  
304  
                    {
305  
                        // tries to set state to active (only if state is still
306  
                        // the same as 'state')
307  
                        detail::switch_status thrd_stat (thrd, state);
308  
                        if (HPX_LIKELY(thrd_stat.is_valid() &&
309  
                                thrd_stat.get_previous() == pending))
310  
                        {
311  
                            tfunc_time_wrapper tfunc_time_collector(idle_rate);
312  
313  
                            // thread returns new required state
314  
                            // store the returned state in the thread
315  
                            {
316  
#ifdef HPX_HAVE_ITTNOTIFY
317  
                                util::itt::caller_context cctx(ctx);
318  
                                util::itt::undo_frame_context undoframe(fctx);
319  
                                util::itt::task task(domain, thrd->get_description());
320  
#endif
321  
                                // Record time elapsed in thread changing state
322  
                                // and add to aggregate execution time.
323  
                                exec_time_wrapper exec_time_collector(idle_rate);
324  
325  
#if defined(HPX_HAVE_APEX)
326  
                                util::apex_wrapper apex_profiler(
327  
                                    thrd->get_description());
328  
329  
                                thrd_stat = (*thrd)();
330  
331  
                                if (thrd_stat.get_previous() == terminated)
332  
                                {
333  
                                    apex_profiler.stop();
334  
                                }
335  
                                else
336  
                                {
337  
                                    apex_profiler.yield();
338  
                                }
339  
#else
340  
                                thrd_stat = (*thrd)();
341  
#endif
342  
                            }
343  
344  
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
345  
                            ++counters.executed_thread_phases_;
346  
#endif
347  
                        }
348  
                        else {
349  
                            // some other worker-thread got in between and started
350  
                            // executing this HPX-thread, we just continue with
351  
                            // the next one
352  
                            thrd_stat.disable_restore();
353  
                            detail::write_new_state_log_warning(
354  
                                num_thread, thrd, state_val, "no execution");
355  
                            continue;
356  
                        }
357  
358  
                        // store and retrieve the new state in the thread
359  
                        if (HPX_UNLIKELY(!thrd_stat.store_state(state))) {
360  
                            // some other worker-thread got in between and changed
361  
                            // the state of this thread, we just continue with
362  
                            // the next one
363  
                            detail::write_new_state_log_warning(
364  
                                num_thread, thrd, state_val, "no state change");
365  
                            continue;
366  
                        }
367  
368  
                        state_val = state.state();
369  
370  
                        // any exception thrown from the thread will reset its
371  
                        // state at this point
372  
373  
                        // handle next thread id if given (switch directly to
374  
                        // this thread)
375  
                        next_thrd = thrd_stat.get_next_thread();
376  
                    }
377  
378  
                    //detail::write_new_state_log_debug(num_thread, thrd,
379  
                    //    state_val, "normal");
380  
381  
                    // Re-add this work item to our list of work items if the HPX
382  
                    // thread should be re-scheduled. If the HPX thread is suspended
383  
                    // now we just keep it in the map of threads.
384  
                    if (HPX_UNLIKELY(state_val == pending)) {
385  
                        if (HPX_LIKELY(next_thrd == nullptr)) {
386  
                            // schedule other work
387  
                            scheduler.SchedulingPolicy::wait_or_add_new(
388  
                                num_thread, is_running_state(this_state.load()),
389  
                                idle_loop_count);
390  
                        }
391  
392  
                        // schedule this thread again, make sure it ends up at
393  
                        // the end of the queue
394  
                        scheduler.SchedulingPolicy::schedule_thread_last(thrd,
395  
                            num_thread);
396  
                        scheduler.SchedulingPolicy::do_some_work(num_thread);
397  
                    }
398  
                }
399  
                else if (HPX_UNLIKELY(active == state_val)) {
400  
                    LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
401  
                        "thread(" << thrd->get_thread_id().get() << "), "
402  
                        "description(" << thrd->get_description() << "), "
403  
                        "rescheduling";
404  
405  
                    // re-schedule thread, if it is still marked as active
406  
                    // this might happen, if some thread has been added to the
407  
                    // scheduler queue already but the state has not been reset
408  
                    // yet
409  
                    //
410  
                    // REVIEW: Passing a specific target thread may set off
411  
                    // the round robin queuing.
412  
                    scheduler.SchedulingPolicy::schedule_thread(thrd, num_thread);
413  
                }
414  
415  
                // Remove the mapping from thread_map_ if HPX thread is depleted
416  
                // or terminated, this will delete the HPX thread as all
417  
                // references go out of scope.
418  
                // REVIEW: what has to be done with depleted HPX threads?
419  
                if (HPX_LIKELY(state_val == depleted || state_val == terminated))
420  
                {
421  
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
422  
                    ++counters.executed_threads_;
423  
#endif
424  
                    scheduler.SchedulingPolicy::destroy_thread(thrd, busy_loop_count);
425  
                }
426  
            }
427  
428  
            // if nothing else has to be done either wait or terminate
429  
            else {
430  
                ++idle_loop_count;
431  
432  
                if (scheduler.SchedulingPolicy::wait_or_add_new(num_thread,
433  
                        is_running_state(this_state.load()), idle_loop_count))
434  
                {
435  
                    // clean up terminated threads one more time before existing
436  
                    if (scheduler.SchedulingPolicy::cleanup_terminated(true))
437  
                    {
438  
                        // if this is an inner scheduler, exit immediately
439  
                        if (!(scheduler.get_scheduler_mode() & policies::delay_exit))
440  
                        {
441  
                            this_state.store(state_stopped);
442  
                            break;
443  
                        }
444  
445  
                        // otherwise, keep idling for some time
446  
                        if (!may_exit)
447  
                            idle_loop_count = 0;
448  
                        may_exit = true;
449  
                    }
450  
                }
451  
452  
                // do background work in parcel layer and in agas
453  
                if ((scheduler.get_scheduler_mode() & policies::do_background_work) &&
454  
                    num_thread < callbacks.max_background_threads_ &&
455  
                    !callbacks.background_.empty())
456  
                {
457  
                    if (callbacks.background_())
458  
                        idle_loop_count = 0;
459  
                }
460  
461  
                // call back into invoking context
462  
                if (!callbacks.inner_.empty())
463  
                    callbacks.inner_();
464  
            }
465  
466  
            // something went badly wrong, give up
467  
            if (HPX_UNLIKELY(this_state.load() == state_terminating))
468  
                break;
469  
470  
            if (busy_loop_count > HPX_BUSY_LOOP_COUNT_MAX)
471  
            {
472  
                busy_loop_count = 0;
473  
474  
                // do background work in parcel layer and in agas
475  
                if ((scheduler.get_scheduler_mode() & policies::do_background_work) &&
476  
                    num_thread < callbacks.max_background_threads_ &&
477  
                    !callbacks.background_.empty())
478  
                {
479  
                    if (callbacks.background_())
480  
                        idle_loop_count = 0;
481  
                }
482  
            }
483  
            else if ((scheduler.get_scheduler_mode() & policies::fast_idle_mode) ||
484  
                idle_loop_count > HPX_IDLE_LOOP_COUNT_MAX)
485  
            {
486  
                // clean up terminated threads
487  
                if (idle_loop_count > HPX_IDLE_LOOP_COUNT_MAX)
488  
                    idle_loop_count = 0;
489  
490  
                // call back into invoking context
491  
                if (!callbacks.outer_.empty())
492  
                    callbacks.outer_();
493  
494  
                // break if we were idling after 'may_exit'
495  
                if (may_exit)
496  
                {
497  
                    if (scheduler.SchedulingPolicy::cleanup_terminated(true))
498  
                    {
499  
                        this_state.store(state_stopped);
500  
                        break;
501  
                    }
502  
                    may_exit = false;
503  
                }
504  
                else
505  
                {
506  
                    scheduler.SchedulingPolicy::cleanup_terminated(true);
507  
                }
508  
            }
509  
        }
510  
    }
511  
}}}
512  
513  
#endif
514  
515  
516  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.