/home/users/khuck/src/hpx-lsu/src/runtime/threads/detail/thread_pool.cpp

Line | % of fetches | Source
1  
//  Copyright (c) 2007-2015 Hartmut Kaiser
2  
//
3  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
6  
#include <hpx/runtime/threads/detail/thread_pool.hpp>
7  
8  
#include <hpx/error_code.hpp>
9  
#include <hpx/exception.hpp>
10  
#include <hpx/state.hpp>
11  
#include <hpx/throw_exception.hpp>
12  
#include <hpx/lcos/local/no_mutex.hpp>
13  
#include <hpx/runtime/get_worker_thread_num.hpp>
14  
#include <hpx/runtime/threads/detail/create_thread.hpp>
15  
#include <hpx/runtime/threads/detail/create_work.hpp>
16  
#include <hpx/runtime/threads/detail/scheduling_loop.hpp>
17  
#include <hpx/runtime/threads/detail/set_thread_state.hpp>
18  
#include <hpx/runtime/threads/detail/thread_num_tss.hpp>
19  
#include <hpx/runtime/threads/policies/callback_notifier.hpp>
20  
#include <hpx/runtime/threads/topology.hpp>
21  
#include <hpx/util/assert.hpp>
22  
#include <hpx/util/bind.hpp>
23  
#include <hpx/util/logging.hpp>
24  
#include <hpx/util/hardware/timestamp.hpp>
25  
#include <hpx/util/high_resolution_clock.hpp>
26  
#include <hpx/util/unlock_guard.hpp>
27  
28  
#include <boost/atomic.hpp>
29  
#include <boost/exception_ptr.hpp>
30  
#include <boost/system/system_error.hpp>
31  
#include <boost/thread/barrier.hpp>
32  
#include <boost/thread/mutex.hpp>
33  
#include <boost/thread/thread.hpp>
34  
35  
#include <algorithm>
36  
#include <cstddef>
37  
#include <cstdint>
38  
#include <exception>
39  
#include <functional>
40  
#include <iomanip>
41  
#include <mutex>
42  
#include <numeric>
43  
44  
namespace hpx { namespace threads { namespace detail
45  
{
46  
    ///////////////////////////////////////////////////////////////////////////
47  
    template <typename Scheduler>
48  
    void thread_pool<Scheduler>::init_tss(std::size_t num)
49  
    {
50  
        thread_num_tss_.init_tss(num);
51  
    }
52  
53  
    template <typename Scheduler>
54  
    void thread_pool<Scheduler>::deinit_tss()
55  
    {
56  
        thread_num_tss_.deinit_tss();
57  
    }
58  
59  
    ///////////////////////////////////////////////////////////////////////////
60  
    template <typename Scheduler>
61  
    thread_pool<Scheduler>::thread_pool(Scheduler& sched,
62  
            threads::policies::callback_notifier& notifier,
63  
            char const* pool_name, policies::scheduler_mode m)
64  
      : sched_(sched),
65  
        notifier_(notifier),
66  
        pool_name_(pool_name),
67  
        thread_count_(0),
68  
        used_processing_units_(),
69  
        mode_(m)
70  
    {
71  
        timestamp_scale_ = 1.0;
72  
    }
73  
74  
    template <typename Scheduler>
75  
    thread_pool<Scheduler>::~thread_pool()
76  
    {
77  
        if (!threads_.empty()) {
78  
            if (!sched_.has_reached_state(state_suspended))
79  
            {
80  
                // still running
81  
                lcos::local::no_mutex mtx;
82  
                std::unique_lock<lcos::local::no_mutex> l(mtx);
83  
                stop_locked(l);
84  
            }
85  
            threads_.clear();
86  
        }
87  
    }
88  
89  
    ///////////////////////////////////////////////////////////////////////////
90  
    template <typename Scheduler>
91  
    hpx::state thread_pool<Scheduler>::get_state() const
92  
    {
93  
        // get_worker_thread_num returns the global thread number which might
94  
        // be too large. This function might get called from within
95  
        // background_work inside the os executors
96  
        if (thread_count_ != 0)
97  
        {
98  
            std::size_t num_thread = get_worker_thread_num() % thread_count_;
99  
            if (num_thread != std::size_t(-1))
100  
                return get_state(num_thread);
101  
        }
102  
        return sched_.get_minmax_state().second;
103  
    }
104  
105  
    template <typename Scheduler>
106  
    hpx::state thread_pool<Scheduler>::get_state(std::size_t num_thread) const
107  
    {
108  
        HPX_ASSERT(num_thread != std::size_t(-1));
109  
        return sched_.get_state(num_thread).load();
110  
    }
111  
112  
    template <typename Scheduler>
113  
    bool thread_pool<Scheduler>::has_reached_state(hpx::state s) const
114  
    {
115  
        return sched_.has_reached_state(s);
116  
    }
117  
118  
    ///////////////////////////////////////////////////////////////////////////
119  
    template <typename Scheduler>
120  
    std::size_t thread_pool<Scheduler>::init(std::size_t num_threads,
121  
        policies::init_affinity_data const& data)
122  
    {
123  
        topology const& topology_ = get_topology();
124  
        std::size_t cores_used = sched_.Scheduler::init(data, topology_);
125  
126  
        resize(used_processing_units_, threads::hardware_concurrency());
127  
        for (std::size_t i = 0; i != num_threads; ++i)
128  
            used_processing_units_ |= sched_.Scheduler::get_pu_mask(topology_, i);
129  
130  
        return cores_used;
131  
    }
132  
133  
    ///////////////////////////////////////////////////////////////////////////
134  
    template <typename Scheduler>
135  
    std::size_t thread_pool<Scheduler>::get_pu_num(std::size_t num_thread) const
136  
    {
137  
        return sched_.Scheduler::get_pu_num(num_thread);
138  
    }
139  
140  
    template <typename Scheduler>
141  
    mask_cref_type thread_pool<Scheduler>::get_pu_mask(
142  
        topology const& topology, std::size_t num_thread) const
143  
    {
144  
        return sched_.Scheduler::get_pu_mask(topology, num_thread);
145  
    }
146  
147  
    template <typename Scheduler>
148  
    mask_cref_type thread_pool<Scheduler>::get_used_processing_units() const
149  
    {
150  
        return used_processing_units_;
151  
    }
152  
153  
    template <typename Scheduler>
154  
    void thread_pool<Scheduler>::do_some_work(std::size_t num_thread)
155  
    {
156  
        sched_.Scheduler::do_some_work(num_thread);
157  
    }
158  
159  
    template <typename Scheduler>
160  
    void thread_pool<Scheduler>::report_error(std::size_t num,
161  
        boost::exception_ptr const& e)
162  
    {
163  
        sched_.set_all_states(state_terminating);
164  
        notifier_.on_error(num, e);
165  
        sched_.Scheduler::on_error(num, e);
166  
    }
167  
168  
    ///////////////////////////////////////////////////////////////////////////
169  
    template <typename Scheduler>
170  
    void thread_pool<Scheduler>::create_thread(thread_init_data& data,
171  
        thread_id_type& id, thread_state_enum initial_state, bool run_now,
172  
        error_code& ec)
173  
    {
174  
        // verify state
175  
        if (thread_count_ == 0 && !sched_.is_state(state_running))
176  
        {
177  
            // thread-manager is not currently running
178  
            HPX_THROWS_IF(ec, invalid_status,
179  
                "thread_pool<Scheduler>::create_thread",
180  
                "invalid state: thread pool is not running");
181  
            return;
182  
        }
183  
184  
        detail::create_thread(&sched_, data, id, initial_state, run_now, ec); //-V601
185  
    }
186  
187  
    template <typename Scheduler>
188  
    void thread_pool<Scheduler>::create_work(thread_init_data& data,
189  
        thread_state_enum initial_state, error_code& ec)
190  
    {
191  
        // verify state
192  
        if (thread_count_ == 0 && !sched_.is_state(state_running))
193  
        {
194  
            // thread-manager is not currently running
195  
            HPX_THROWS_IF(ec, invalid_status,
196  
                "thread_pool<Scheduler>::create_work",
197  
                "invalid state: thread pool is not running");
198  
            return;
199  
        }
200  
201  
        detail::create_work(&sched_, data, initial_state, ec); //-V601
202  
    }
203  
204  
    template <typename Scheduler>
205  
    thread_state thread_pool<Scheduler>::set_state(
206  
        thread_id_type const& id, thread_state_enum new_state,
207  
        thread_state_ex_enum new_state_ex, thread_priority priority,
208  
        error_code& ec)
209  
    {
210  
        return detail::set_thread_state(id, new_state, //-V107
211  
            new_state_ex, priority, get_worker_thread_num(), ec);
212  
    }
213  
214  
    template <typename Scheduler>
215  
    thread_id_type thread_pool<Scheduler>::set_state(
216  
        util::steady_time_point const& abs_time,
217  
        thread_id_type const& id, thread_state_enum newstate,
218  
        thread_state_ex_enum newstate_ex, thread_priority priority,
219  
        error_code& ec)
220  
    {
221  
        return detail::set_thread_state_timed(sched_, abs_time, id,
222  
            newstate, newstate_ex, priority, get_worker_thread_num(), ec);
223  
    }
224  
225  
    template <typename Scheduler>
226  
    void thread_pool<Scheduler>::abort_all_suspended_threads()
227  
    {
228  
        sched_.Scheduler::abort_all_suspended_threads();
229  
    }
230  
231  
    template <typename Scheduler>
232  
    bool thread_pool<Scheduler>::cleanup_terminated(bool delete_all)
233  
    {
234  
        return sched_.Scheduler::cleanup_terminated(delete_all);
235  
    }
236  
237  
    ///////////////////////////////////////////////////////////////////////////
238  
    template <typename Scheduler>
239  
    std::size_t thread_pool<Scheduler>::get_worker_thread_num() const
240  
    {
241  
        return thread_num_tss_.get_worker_thread_num();
242  
    }
243  
244  
    template <typename Scheduler>
245  
    boost::thread& thread_pool<Scheduler>::get_os_thread_handle(
246  
        std::size_t num_thread)
247  
    {
248  
        HPX_ASSERT(num_thread < threads_.size());
249  
        return threads_[threads_.size() - num_thread - 1];
250  
    }
251  
252  
    template <typename Scheduler>
253  
    std::int64_t thread_pool<Scheduler>::get_thread_count(
254  
        thread_state_enum state, thread_priority priority,
255  
        std::size_t num, bool reset) const
256  
    {
257  
        return sched_.Scheduler::get_thread_count(state, priority, num, reset);
258  
    }
259  
260  
    template <typename Scheduler>
261  
    bool thread_pool<Scheduler>::enumerate_threads(
262  
        util::function_nonser<bool(thread_id_type)> const& f,
263  
        thread_state_enum state) const
264  
    {
265  
        return sched_.Scheduler::enumerate_threads(f, state);
266  
    }
267  
268  
    template <typename Scheduler>
269  
    void thread_pool<Scheduler>::reset_thread_distribution()
270  
    {
271  
        return sched_.Scheduler::reset_thread_distribution();
272  
    }
273  
274  
    template <typename Scheduler>
275  
    void thread_pool<Scheduler>::set_scheduler_mode(
276  
        threads::policies::scheduler_mode mode)
277  
    {
278  
        return sched_.set_scheduler_mode(mode);
279  
    }
280  
281  
    ///////////////////////////////////////////////////////////////////////////
282  
    template <typename Scheduler>
283  
    bool thread_pool<Scheduler>::run(std::unique_lock<boost::mutex>& l,
284  
        std::size_t num_threads)
285  
    {
286  
        HPX_ASSERT(l.owns_lock());
287  
288  
        LTM_(info) //-V128
289  
            << "thread_pool::run: " << pool_name_
290  
            << " number of processing units available: " //-V128
291  
            << threads::hardware_concurrency();
292  
        LTM_(info) //-V128
293  
            << "thread_pool::run: " << pool_name_
294  
            << " creating " << num_threads << " OS thread(s)"; //-V128
295  
296  
        if (0 == num_threads) {
297  
            HPX_THROW_EXCEPTION(bad_parameter,
298  
                "thread_pool::run", "number of threads is zero");
299  
        }
300  
301  
        if (!threads_.empty() || sched_.has_reached_state(state_running))
302  
            return true;    // do nothing if already running
303  
304  
        executed_threads_.resize(num_threads);
305  
        executed_thread_phases_.resize(num_threads);
306  
307  
        tfunc_times_.resize(num_threads);
308  
        exec_times_.resize(num_threads);
309  
310  
        reset_tfunc_times_.resize(num_threads);
311  
312  
        // scale timestamps to nanoseconds
313  
        std::uint64_t base_timestamp = util::hardware::timestamp();
314  
        std::uint64_t base_time = util::high_resolution_clock::now();
315  
        std::uint64_t curr_timestamp = util::hardware::timestamp();
316  
        std::uint64_t curr_time = util::high_resolution_clock::now();
317  
318  
        while ((curr_time - base_time) <= 100000)
319  
        {
320  
            curr_timestamp = util::hardware::timestamp();
321  
            curr_time = util::high_resolution_clock::now();
322  
        }
323  
324  
        if (curr_timestamp - base_timestamp != 0)
325  
        {
326  
            timestamp_scale_ = double(curr_time - base_time) /
327  
                double(curr_timestamp - base_timestamp);
328  
        }
329  
330  
#if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS)
331  
        // timestamps/values of last reset operation for various performance
332  
        // counters
333  
        reset_executed_threads_.resize(num_threads);
334  
        reset_executed_thread_phases_.resize(num_threads);
335  
336  
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
337  
        // timestamps/values of last reset operation for various performance
338  
        // counters
339  
        reset_thread_duration_.resize(num_threads);
340  
        reset_thread_duration_times_.resize(num_threads);
341  
342  
        reset_thread_overhead_.resize(num_threads);
343  
        reset_thread_overhead_times_.resize(num_threads);
344  
        reset_thread_overhead_times_total_.resize(num_threads);
345  
346  
        reset_thread_phase_duration_.resize(num_threads);
347  
        reset_thread_phase_duration_times_.resize(num_threads);
348  
349  
        reset_thread_phase_overhead_.resize(num_threads);
350  
        reset_thread_phase_overhead_times_.resize(num_threads);
351  
        reset_thread_phase_overhead_times_total_.resize(num_threads);
352  
353  
        reset_cumulative_thread_duration_.resize(num_threads);
354  
355  
        reset_cumulative_thread_overhead_.resize(num_threads);
356  
        reset_cumulative_thread_overhead_total_.resize(num_threads);
357  
#endif
358  
#endif
359  
360  
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
361  
        reset_idle_rate_time_.resize(num_threads);
362  
        reset_idle_rate_time_total_.resize(num_threads);
363  
364  
#if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES)
365  
        reset_creation_idle_rate_time_.resize(num_threads);
366  
        reset_creation_idle_rate_time_total_.resize(num_threads);
367  
368  
        reset_cleanup_idle_rate_time_.resize(num_threads);
369  
        reset_cleanup_idle_rate_time_total_.resize(num_threads);
370  
#endif
371  
#endif
372  
373  
        LTM_(info)
374  
            << "thread_pool::run: " << pool_name_
375  
            << " timestamp_scale: " << timestamp_scale_; //-V128
376  
377  
        try {
378  
            HPX_ASSERT(startup_.get() == nullptr);
379  
            startup_.reset(
380  
                new boost::barrier(static_cast<unsigned>(num_threads+1))
381  
            );
382  
383  
            // run threads and wait for initialization to complete
384  
385  
            topology const& topology_ = get_topology();
386  
387  
            std::size_t thread_num = num_threads;
388  
            while (thread_num-- != 0) {
389  
                threads::mask_cref_type mask =
390  
                    sched_.Scheduler::get_pu_mask(topology_, thread_num);
391  
392  
                LTM_(info) //-V128
393  
                    << "thread_pool::run: " << pool_name_
394  
                    << " create OS thread " << thread_num //-V128
395  
                    << ": will run on processing units within this mask: "
396  
#if !defined(HPX_HAVE_MORE_THAN_64_THREADS) || \
397  
    (defined(HPX_HAVE_MAX_CPU_COUNT) && HPX_HAVE_MAX_CPU_COUNT <= 64)
398  
                    << std::hex << "0x" << mask;
399  
#else
400  
                    << "0b" << mask;
401  
#endif
402  
403  
                // create a new thread
404  
                threads_.push_back(boost::thread(
405  
                        &thread_pool::thread_func, this, thread_num,
406  
                        std::ref(topology_), std::ref(*startup_)
407  
                    ));
408  
409  
                // set the new threads affinity (on Windows systems)
410  
                if (any(mask))
411  
                {
412  
                    error_code ec(lightweight);
413  
                    topology_.set_thread_affinity_mask(threads_.back(), mask, ec);
414  
                    if (ec)
415  
                    {
416  
                        LTM_(warning) //-V128
417  
                            << "thread_pool::run: " << pool_name_
418  
                            << " setting thread affinity on OS thread " //-V128
419  
                            << thread_num << " failed with: "
420  
                            << ec.get_message();
421  
                    }
422  
                }
423  
                else
424  
                {
425  
                    LTM_(debug) //-V128
426  
                        << "thread_pool::run: " << pool_name_
427  
                        << " setting thread affinity on OS thread " //-V128
428  
                        << thread_num << " was explicitly disabled.";
429  
                }
430  
            }
431  
432  
            // the main thread needs to have a unique thread_num
433  
            init_tss(num_threads);
434  
            startup_->wait();
435  
436  
            // The scheduler is now running.
437  
            sched_.set_all_states(state_running);
438  
        }
439  
        catch (std::exception const& e) {
440  
            LTM_(always)
441  
                << "thread_pool::run: " << pool_name_
442  
                << " failed with: " << e.what();
443  
444  
            // trigger the barrier
445  
            if (startup_.get() != nullptr)
446  
            {
447  
                while (num_threads-- != 0 && !startup_->wait())
448  
                    ;
449  
            }
450  
451  
            stop(l);
452  
            threads_.clear();
453  
454  
            return false;
455  
        }
456  
457  
        LTM_(info) << "thread_pool::run: " << pool_name_ << " running";
458  
        return true;
459  
    }
460  
461  
    ///////////////////////////////////////////////////////////////////////////
462  
    template <typename Scheduler>
463  
    void thread_pool<Scheduler>::stop (
464  
        std::unique_lock<boost::mutex>& l, bool blocking)
465  
    {
466  
        HPX_ASSERT(l.owns_lock());
467  
468  
        return stop_locked(l, blocking);
469  
    }
470  
471  
    template <typename Scheduler>
472  
    template <typename Lock>
473  
    void thread_pool<Scheduler>::stop_locked(Lock& l, bool blocking)
474  
    {
475  
        LTM_(info)
476  
            << "thread_pool::stop: " << pool_name_
477  
            << " blocking(" << std::boolalpha << blocking << ")";
478  
479  
        deinit_tss();
480  
481  
        if (!threads_.empty()) {
482  
            // set state to stopping
483  
            sched_.set_all_states(state_stopping);
484  
485  
            // make sure we're not waiting
486  
            sched_.Scheduler::do_some_work(std::size_t(-1));
487  
488  
            if (blocking) {
489  
                for (std::size_t i = 0; i != threads_.size(); ++i)
490  
                {
491  
                    // make sure no OS thread is waiting
492  
                    LTM_(info)
493  
                        << "thread_pool::stop: " << pool_name_
494  
                        << " notify_all";
495  
496  
                    sched_.Scheduler::do_some_work(std::size_t(-1));
497  
498  
                    LTM_(info) //-V128
499  
                        << "thread_pool::stop: " << pool_name_
500  
                        << " join:" << i; //-V128
501  
502  
                    // unlock the lock while joining
503  
                    util::unlock_guard<Lock> ul(l);
504  
                    threads_[i].join();
505  
                }
506  
                threads_.clear();
507  
            }
508  
        }
509  
    }
510  
511  
    ///////////////////////////////////////////////////////////////////////////
512  
    struct manage_active_thread_count
513  
    {
514  
        manage_active_thread_count(boost::atomic<long>& counter)
515  
          : counter_(counter)
516  
        {
517  
            ++counter_;
518  
        }
519  
        ~manage_active_thread_count()
520  
        {
521  
            --counter_;
522  
        }
523  
524  
        boost::atomic<long>& counter_;
525  
    };
526  
527  
    template <typename Scheduler>
528  
    struct init_tss_helper
529  
    {
530  
        init_tss_helper(thread_pool<Scheduler>& pool, std::size_t thread_num)
531  
          : pool_(pool), thread_num_(thread_num)
532  
        {
533  
            pool.notifier_.on_start_thread(thread_num);
534  
            pool.init_tss(thread_num);
535  
            pool.sched_.Scheduler::on_start_thread(thread_num);
536  
        }
537  
        ~init_tss_helper()
538  
        {
539  
            pool_.sched_.Scheduler::on_stop_thread(thread_num_);
540  
            pool_.deinit_tss();
541  
            pool_.notifier_.on_stop_thread(thread_num_);
542  
        }
543  
544  
        thread_pool<Scheduler>& pool_;
545  
        std::size_t thread_num_;
546  
    };
547  
548  
    template <typename Scheduler>
549  
    void thread_pool<Scheduler>::thread_func(std::size_t num_thread,
550  
        topology const& topology, boost::barrier& startup)
551  
    {
552  
        // Set the affinity for the current thread.
553  
        threads::mask_cref_type mask =
554  
            sched_.Scheduler::get_pu_mask(topology, num_thread);
555  
556  
        if (LHPX_ENABLED(debug))
557  
            topology.write_to_log();
558  
559  
        error_code ec(lightweight);
560  
        if (any(mask))
561  
        {
562  
            topology.set_thread_affinity_mask(mask, ec);
563  
            if (ec)
564  
            {
565  
                LTM_(warning) //-V128
566  
                    << "thread_pool::thread_func: " << pool_name_
567  
                    << " setting thread affinity on OS thread " //-V128
568  
                    << num_thread << " failed with: " << ec.get_message();
569  
            }
570  
        }
571  
        else
572  
        {
573  
            LTM_(debug) //-V128
574  
                << "thread_pool::thread_func: " << pool_name_
575  
                << " setting thread affinity on OS thread " //-V128
576  
                << num_thread << " was explicitly disabled.";
577  
        }
578  
579  
        // Setting priority of worker threads to a lower priority, this needs to
580  
        // be done in order to give the parcel pool threads higher priority
581  
        if ((mode_ & policies::reduce_thread_priority) &&
582  
            any(mask & used_processing_units_))
583  
        {
584  
            topology.reduce_thread_priority(ec);
585  
            if (ec)
586  
            {
587  
                LTM_(warning) //-V128
588  
                    << "thread_pool::thread_func: " << pool_name_
589  
                    << " reducing thread priority on OS thread " //-V128
590  
                    << num_thread << " failed with: " << ec.get_message();
591  
            }
592  
        }
593  
594  
        // manage the number of this thread in its TSS
595  
        init_tss_helper<Scheduler> tss_helper(*this, num_thread);
596  
597  
        // wait for all threads to start up before before starting HPX work
598  
        startup.wait();
599  
600  
        {
601  
            LTM_(info) //-V128
602  
                << "thread_pool::thread_func: " << pool_name_
603  
                << " starting OS thread: " << num_thread; //-V128
604  
605  
            try {
606  
                try {
607  
                    manage_active_thread_count count(thread_count_);
608  
609  
                    // run the work queue
610  
                    hpx::threads::coroutines::prepare_main_thread main_thread;
611  
612  
                    // run main Scheduler loop until terminated
613  
                    detail::scheduling_counters counters(
614  
                        executed_threads_[num_thread],
615  
                        executed_thread_phases_[num_thread],
616  
                        tfunc_times_[num_thread], exec_times_[num_thread]);
617  
618  
                    detail::scheduling_callbacks callbacks(
619  
                        util::bind( //-V107
620  
                            &policies::scheduler_base::idle_callback,
621  
                            &sched_, num_thread
622  
                        ),
623  
                        detail::scheduling_callbacks::callback_type());
624  
625  
                    if (mode_ & policies::do_background_work)
626  
                    {
627  
                        callbacks.background_ = util::bind( //-V107
628  
                            &policies::scheduler_base::background_callback,
629  
                            &sched_, num_thread);
630  
                    }
631  
632  
                    sched_.set_scheduler_mode(mode_);
633  
                    detail::scheduling_loop(num_thread, sched_, counters,
634  
                        callbacks);
635  
636  
                    // the OS thread is allowed to exit only if no more HPX
637  
                    // threads exist or if some other thread has terminated
638  
                    HPX_ASSERT(!sched_.Scheduler::get_thread_count(
639  
                            unknown, thread_priority_default, num_thread) ||
640  
                        sched_.get_state(num_thread) == state_terminating);
641  
                }
642  
                catch (hpx::exception const& e) {
643  
                    LFATAL_ //-V128
644  
                        << "thread_pool::thread_func: " << pool_name_
645  
                        << " thread_num:" << num_thread //-V128
646  
                        << " : caught hpx::exception: "
647  
                        << e.what() << ", aborted thread execution";
648  
649  
                    report_error(num_thread, boost::current_exception());
650  
                    return;
651  
                }
652  
                catch (boost::system::system_error const& e) {
653  
                    LFATAL_ //-V128
654  
                        << "thread_pool::thread_func: " << pool_name_
655  
                        << " thread_num:" << num_thread //-V128
656  
                        << " : caught boost::system::system_error: "
657  
                        << e.what() << ", aborted thread execution";
658  
659  
                    report_error(num_thread, boost::current_exception());
660  
                    return;
661  
                }
662  
                catch (std::exception const& e) {
663  
                    // Repackage exceptions to avoid slicing.
664  
                    boost::throw_exception(boost::enable_error_info(
665  
                        hpx::exception(unhandled_exception, e.what())));
666  
                }
667  
            }
668  
            catch (...) {
669  
                LFATAL_ //-V128
670  
                    << "thread_pool::thread_func: " << pool_name_
671  
                    << " thread_num:" << num_thread //-V128
672  
                    << " : caught unexpected " //-V128
673  
                       "exception, aborted thread execution";
674  
675  
                report_error(num_thread, boost::current_exception());
676  
                return;
677  
            }
678  
679  
            LTM_(info) //-V128
680  
                << "thread_pool::thread_func: " << pool_name_
681  
                << " thread_num: " << num_thread
682  
                << " : ending OS thread, " //-V128
683  
                   "executed " << executed_threads_[num_thread]
684  
                << " HPX threads";
685  
        }
686  
    }
687  
688  
    ///////////////////////////////////////////////////////////////////////////
689  
    // performance counters
690  
#if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS)
691  
    template <typename Scheduler>
692  
    std::int64_t thread_pool<Scheduler>::
693  
        get_executed_threads(std::size_t num, bool reset)
694  
    {
695  
        std::int64_t executed_threads = 0;
696  
        std::int64_t reset_executed_threads = 0;
697  
698  
        if (num != std::size_t(-1))
699  
        {
700  
            executed_threads = executed_threads_[num];
701  
            reset_executed_threads = reset_executed_threads_[num];
702  
703  
            if (reset)
704  
                reset_executed_threads_[num] = executed_threads;
705  
        }
706  
        else
707  
        {
708  
            executed_threads = std::accumulate(executed_threads_.begin(),
709  
                executed_threads_.end(), std::int64_t(0));
710  
            reset_executed_threads = std::accumulate(
711  
                reset_executed_threads_.begin(),
712  
                reset_executed_threads_.end(), std::int64_t(0));
713  
714  
            if (reset)
715  
            {
716  
                std::copy(executed_threads_.begin(), executed_threads_.end(),
717  
                    reset_executed_threads_.begin());
718  
            }
719  
        }
720  
721  
        HPX_ASSERT(executed_threads >= reset_executed_threads);
722  
723  
        return executed_threads - reset_executed_threads;
724  
    }
725  
726  
    template <typename Scheduler>
727  
    std::int64_t thread_pool<Scheduler>::
728  
        get_executed_thread_phases(std::size_t num, bool reset)
729  
    {
730  
        std::int64_t executed_phases = 0;
731  
        std::int64_t reset_executed_phases = 0;
732  
733  
        if (num != std::size_t(-1))
734  
        {
735  
            executed_phases = executed_thread_phases_[num];
736  
            reset_executed_phases = reset_executed_thread_phases_[num];
737  
738  
            if (reset)
739  
                reset_executed_thread_phases_[num] = executed_phases;
740  
        }
741  
        else
742  
        {
743  
            executed_phases = std::accumulate(executed_thread_phases_.begin(),
744  
                executed_thread_phases_.end(), std::int64_t(0));
745  
            reset_executed_phases = std::accumulate(
746  
                reset_executed_thread_phases_.begin(),
747  
                reset_executed_thread_phases_.end(), std::int64_t(0));
748  
749  
            if (reset)
750  
            {
751  
                std::copy(executed_thread_phases_.begin(),
752  
                    executed_thread_phases_.end(),
753  
                    reset_executed_thread_phases_.begin());
754  
            }
755  
        }
756  
757  
        HPX_ASSERT(executed_phases >= reset_executed_phases);
758  
759  
        return executed_phases - reset_executed_phases;
760  
    }
761  
762  
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
763  
    template <typename Scheduler>
764  
    std::int64_t thread_pool<Scheduler>::
765  
        get_thread_phase_duration(std::size_t num, bool reset)
766  
    {
767  
        std::uint64_t exec_total = 0ul;
768  
        std::int64_t num_phases = 0l;
769  
        std::uint64_t reset_exec_total = 0ul;
770  
        std::int64_t reset_num_phases = 0l;
771  
772  
        if (num != std::size_t(-1))
773  
        {
774  
            exec_total = exec_times_[num];
775  
            num_phases = executed_thread_phases_[num];
776  
777  
            reset_exec_total = reset_thread_phase_duration_times_[num];
778  
            reset_num_phases = reset_thread_phase_duration_[num];
779  
780  
            if (reset)
781  
            {
782  
                reset_thread_phase_duration_[num] = num_phases;
783  
                reset_thread_phase_duration_times_[num] = exec_total;
784  
            }
785  
        }
786  
        else
787  
        {
788  
            exec_total = std::accumulate(exec_times_.begin(),
789  
                exec_times_.end(), std::uint64_t(0));
790  
            num_phases = std::accumulate(executed_thread_phases_.begin(),
791  
                executed_thread_phases_.end(), std::int64_t(0));
792  
793  
            reset_exec_total = std::accumulate(
794  
                reset_thread_phase_duration_times_.begin(),
795  
                reset_thread_phase_duration_times_.end(), std::uint64_t(0));
796  
            reset_num_phases = std::accumulate(
797  
                reset_thread_phase_duration_.begin(),
798  
                reset_thread_phase_duration_.end(), std::int64_t(0));
799  
800  
            if (reset)
801  
            {
802  
                std::copy(exec_times_.begin(), exec_times_.end(),
803  
                    reset_thread_phase_duration_times_.begin());
804  
                std::copy(executed_thread_phases_.begin(),
805  
                    executed_thread_phases_.end(),
806  
                    reset_thread_phase_duration_.begin());
807  
            }
808  
        }
809  
810  
        HPX_ASSERT(exec_total >= reset_exec_total);
811  
        HPX_ASSERT(num_phases >= reset_num_phases);
812  
813  
        exec_total -= reset_exec_total;
814  
        num_phases -= reset_num_phases;
815  
816  
        return std::uint64_t(
817  
                (double(exec_total) * timestamp_scale_) / double(num_phases)
818  
            );
819  
    }
820  
821  
    template <typename Scheduler>
822  
    std::int64_t thread_pool<Scheduler>::
823  
        get_thread_duration(std::size_t num, bool reset)
824  
    {
825  
        std::uint64_t exec_total = 0ul;
826  
        std::int64_t num_threads = 0l;
827  
        std::uint64_t reset_exec_total = 0ul;
828  
        std::int64_t reset_num_threads = 0l;
829  
830  
        if (num != std::size_t(-1))
831  
        {
832  
            exec_total = exec_times_[num];
833  
            num_threads = executed_threads_[num];
834  
835  
            reset_exec_total = reset_thread_duration_times_[num];
836  
            reset_num_threads = reset_thread_duration_[num];
837  
838  
            if (reset)
839  
            {
840  
                reset_thread_duration_[num] = num_threads;
841  
                reset_thread_duration_times_[num] = exec_total;
842  
            }
843  
        }
844  
        else
845  
        {
846  
            exec_total = std::accumulate(exec_times_.begin(),
847  
                exec_times_.end(), std::uint64_t(0));
848  
            num_threads = std::accumulate(executed_threads_.begin(),
849  
                executed_threads_.end(), std::int64_t(0));
850  
851  
            reset_exec_total = std::accumulate(
852  
                reset_thread_duration_times_.begin(),
853  
                reset_thread_duration_times_.end(),
854  
                std::uint64_t(0));
855  
            reset_num_threads = std::accumulate(
856  
                reset_thread_duration_.begin(),
857  
                reset_thread_duration_.end(),
858  
                std::int64_t(0));
859  
860  
            if (reset)
861  
            {
862  
                std::copy(exec_times_.begin(), exec_times_.end(),
863  
                    reset_thread_duration_times_.begin());
864  
                std::copy(executed_threads_.begin(),
865  
                    executed_threads_.end(),
866  
                    reset_thread_duration_.begin());
867  
            }
868  
        }
869  
870  
        HPX_ASSERT(exec_total >= reset_exec_total);
871  
        HPX_ASSERT(num_threads >= reset_num_threads);
872  
873  
        exec_total -= reset_exec_total;
874  
        num_threads -= reset_num_threads;
875  
876  
        return std::uint64_t(
877  
                (double(exec_total) * timestamp_scale_) / double(num_threads)
878  
            );
879  
    }
880  
881  
    template <typename Scheduler>
882  
    std::int64_t thread_pool<Scheduler>::
883  
        get_thread_phase_overhead(std::size_t num, bool reset)
884  
    {
885  
        std::uint64_t exec_total = 0;
886  
        std::uint64_t tfunc_total = 0;
887  
        std::int64_t num_phases = 0;
888  
889  
        std::uint64_t reset_exec_total = 0;
890  
        std::uint64_t reset_tfunc_total = 0;
891  
        std::int64_t reset_num_phases = 0;
892  
893  
        if (num != std::size_t(-1))
894  
        {
895  
            exec_total = exec_times_[num];
896  
            tfunc_total = tfunc_times_[num];
897  
            num_phases = executed_thread_phases_[num];
898  
899  
            reset_exec_total =  reset_thread_phase_overhead_times_[num];
900  
            reset_tfunc_total = reset_thread_phase_overhead_times_total_[num];
901  
            reset_num_phases =  reset_thread_phase_overhead_[num];
902  
903  
            if (reset)
904  
            {
905  
                reset_thread_phase_overhead_times_[num] = exec_total;
906  
                reset_thread_phase_overhead_times_total_[num] = tfunc_total;
907  
                reset_thread_phase_overhead_[num] = num_phases;
908  
            }
909  
        }
910  
        else
911  
        {
912  
            exec_total = std::accumulate(exec_times_.begin(),
913  
                exec_times_.end(), std::uint64_t(0));
914  
            tfunc_total = std::accumulate(tfunc_times_.begin(),
915  
                tfunc_times_.end(), std::uint64_t(0));
916  
            num_phases = std::accumulate(
917  
                executed_thread_phases_.begin(),
918  
                executed_thread_phases_.end(), std::int64_t(0));
919  
920  
            reset_exec_total = std::accumulate(
921  
                reset_thread_phase_overhead_times_.begin(),
922  
                reset_thread_phase_overhead_times_.end(), std::uint64_t(0));
923  
            reset_tfunc_total = std::accumulate(
924  
                reset_thread_phase_overhead_times_total_.begin(),
925  
                reset_thread_phase_overhead_times_total_.end(),
926  
                std::uint64_t(0));
927  
            reset_num_phases = std::accumulate(
928  
                reset_thread_phase_overhead_.begin(),
929  
                reset_thread_phase_overhead_.end(), std::int64_t(0));
930  
931  
            if (reset)
932  
            {
933  
                std::copy(exec_times_.begin(), exec_times_.end(),
934  
                    reset_thread_phase_overhead_times_.begin());
935  
                std::copy(tfunc_times_.begin(), tfunc_times_.end(),
936  
                    reset_thread_phase_overhead_times_total_.begin());
937  
                std::copy(executed_thread_phases_.begin(),
938  
                    executed_thread_phases_.end(),
939  
                    reset_thread_phase_overhead_.begin());
940  
            }
941  
        }
942  
943  
        HPX_ASSERT(exec_total >= reset_exec_total);
944  
        HPX_ASSERT(tfunc_total >= reset_tfunc_total);
945  
        HPX_ASSERT(num_phases >= reset_num_phases);
946  
947  
        exec_total -= reset_exec_total;
948  
        tfunc_total -= reset_tfunc_total;
949  
        num_phases -= reset_num_phases;
950  
951  
        if (num_phases == 0)        // avoid division by zero
952  
            return 0;
953  
954  
        HPX_ASSERT(tfunc_total >= exec_total);
955  
956  
        return std::uint64_t(
957  
                double((tfunc_total - exec_total) * timestamp_scale_) /
958  
                double(num_phases)
959  
            );
960  
    }
961  
962  
    template <typename Scheduler>
963  
    std::int64_t thread_pool<Scheduler>::
964  
        get_thread_overhead(std::size_t num, bool reset)
965  
    {
966  
        std::uint64_t exec_total = 0;
967  
        std::uint64_t tfunc_total = 0;
968  
        std::int64_t num_threads = 0;
969  
970  
        std::uint64_t reset_exec_total = 0;
971  
        std::uint64_t reset_tfunc_total = 0;
972  
        std::int64_t reset_num_threads = 0;
973  
974  
        if (num != std::size_t(-1))
975  
        {
976  
            exec_total = exec_times_[num];
977  
            tfunc_total = tfunc_times_[num];
978  
            num_threads = executed_threads_[num];
979  
980  
            reset_exec_total =  reset_thread_overhead_times_[num];
981  
            reset_tfunc_total = reset_thread_overhead_times_total_[num];
982  
            reset_num_threads =  reset_thread_overhead_[num];
983  
984  
            if (reset)
985  
            {
986  
                reset_thread_overhead_times_[num] = exec_total;
987  
                reset_thread_overhead_times_total_[num] = tfunc_total;
988  
                reset_thread_overhead_[num] = num_threads;
989  
            }
990  
        }
991  
        else
992  
        {
993  
            exec_total = std::accumulate(exec_times_.begin(),
994  
                exec_times_.end(), std::uint64_t(0));
995  
            tfunc_total = std::accumulate(tfunc_times_.begin(),
996  
                tfunc_times_.end(), std::uint64_t(0));
997  
            num_threads = std::accumulate(executed_threads_.begin(),
998  
                executed_threads_.end(), std::int64_t(0));
999  
1000  
            reset_exec_total = std::accumulate(
1001  
                reset_thread_overhead_times_.begin(),
1002  
                reset_thread_overhead_times_.end(), std::uint64_t(0));
1003  
            reset_tfunc_total = std::accumulate(
1004  
                reset_thread_overhead_times_total_.begin(),
1005  
                reset_thread_overhead_times_total_.end(),
1006  
                std::uint64_t(0));
1007  
            reset_num_threads = std::accumulate(
1008  
                reset_thread_overhead_.begin(),
1009  
                reset_thread_overhead_.end(), std::int64_t(0));
1010  
1011  
            if (reset)
1012  
            {
1013  
                std::copy(exec_times_.begin(), exec_times_.end(),
1014  
                    reset_thread_overhead_times_.begin());
1015  
                std::copy(tfunc_times_.begin(), tfunc_times_.end(),
1016  
                    reset_thread_overhead_times_total_.begin());
1017  
                std::copy(executed_threads_.begin(),
1018  
                    executed_threads_.end(),
1019  
                    reset_thread_overhead_.begin());
1020  
            }
1021  
        }
1022  
1023  
        HPX_ASSERT(exec_total >= reset_exec_total);
1024  
        HPX_ASSERT(tfunc_total >= reset_tfunc_total);
1025  
        HPX_ASSERT(num_threads >= reset_num_threads);
1026  
1027  
        exec_total -= reset_exec_total;
1028  
        tfunc_total -= reset_tfunc_total;
1029  
        num_threads -= reset_num_threads;
1030  
1031  
        if (num_threads == 0)        // avoid division by zero
1032  
            return 0;
1033  
1034  
        HPX_ASSERT(tfunc_total >= exec_total);
1035  
1036  
        return std::uint64_t(
1037  
                double((tfunc_total - exec_total) * timestamp_scale_) /
1038  
                double(num_threads)
1039  
            );
1040  
    }
1041  
1042  
    template <typename Scheduler>
1043  
    std::int64_t thread_pool<Scheduler>::
1044  
        get_cumulative_thread_duration(std::size_t num, bool reset)
1045  
    {
1046  
        std::uint64_t exec_total = 0ul;
1047  
        std::uint64_t reset_exec_total = 0ul;
1048  
1049  
        if (num != std::size_t(-1))
1050  
        {
1051  
            exec_total = exec_times_[num];
1052  
            reset_exec_total = reset_cumulative_thread_duration_[num];
1053  
1054  
            if (reset)
1055  
                reset_cumulative_thread_duration_[num] = exec_total;
1056  
        }
1057  
        else
1058  
        {
1059  
            exec_total = std::accumulate(exec_times_.begin(),
1060  
                exec_times_.end(), std::uint64_t(0));
1061  
            reset_exec_total = std::accumulate(
1062  
                reset_cumulative_thread_duration_.begin(),
1063  
                reset_cumulative_thread_duration_.end(),
1064  
                std::uint64_t(0));
1065  
1066  
            if (reset)
1067  
            {
1068  
                std::copy(exec_times_.begin(), exec_times_.end(),
1069  
                    reset_cumulative_thread_duration_.begin());
1070  
            }
1071  
        }
1072  
1073  
        HPX_ASSERT(exec_total >= reset_exec_total);
1074  
1075  
        exec_total -= reset_exec_total;
1076  
1077  
        return std::uint64_t(double(exec_total) * timestamp_scale_);
1078  
    }
1079  
1080  
    template <typename Scheduler>
1081  
    std::int64_t thread_pool<Scheduler>::
1082  
        get_cumulative_thread_overhead(std::size_t num, bool reset)
1083  
    {
1084  
        std::uint64_t exec_total = 0ul;
1085  
        std::uint64_t reset_exec_total = 0ul;
1086  
        std::uint64_t tfunc_total = 0ul;
1087  
        std::uint64_t reset_tfunc_total = 0ul;
1088  
1089  
        if (num != std::size_t(-1))
1090  
        {
1091  
            exec_total = exec_times_[num];
1092  
            tfunc_total = tfunc_times_[num];
1093  
1094  
            reset_exec_total = reset_cumulative_thread_overhead_[num];
1095  
            reset_tfunc_total = reset_cumulative_thread_overhead_total_[num];
1096  
1097  
            if (reset)
1098  
            {
1099  
                reset_cumulative_thread_overhead_[num] = exec_total;
1100  
                reset_cumulative_thread_overhead_total_[num] = tfunc_total;
1101  
            }
1102  
        }
1103  
        else
1104  
        {
1105  
            exec_total = std::accumulate(exec_times_.begin(),
1106  
                exec_times_.end(), std::uint64_t(0));
1107  
            reset_exec_total = std::accumulate(
1108  
                reset_cumulative_thread_overhead_.begin(),
1109  
                reset_cumulative_thread_overhead_.end(),
1110  
                std::uint64_t(0));
1111  
1112  
            tfunc_total = std::accumulate(tfunc_times_.begin(),
1113  
                tfunc_times_.end(), std::uint64_t(0));
1114  
            reset_tfunc_total = std::accumulate(
1115  
                reset_cumulative_thread_overhead_total_.begin(),
1116  
                reset_cumulative_thread_overhead_total_.end(),
1117  
                std::uint64_t(0));
1118  
1119  
            if (reset)
1120  
            {
1121  
                std::copy(exec_times_.begin(), exec_times_.end(),
1122  
                    reset_cumulative_thread_overhead_.begin());
1123  
                std::copy(tfunc_times_.begin(), tfunc_times_.end(),
1124  
                    reset_cumulative_thread_overhead_total_.begin());
1125  
            }
1126  
        }
1127  
1128  
        HPX_ASSERT(exec_total >= reset_exec_total);
1129  
        HPX_ASSERT(tfunc_total >= reset_tfunc_total);
1130  
1131  
        exec_total -= reset_exec_total;
1132  
        tfunc_total -= reset_tfunc_total;
1133  
1134  
        return std::uint64_t(
1135  
                (double(tfunc_total) - double(exec_total)) * timestamp_scale_
1136  
            );
1137  
    }
1138  
#endif
1139  
#endif
1140  
1141  
    template <typename Scheduler>
1142  
    std::int64_t thread_pool<Scheduler>::
1143  
        get_cumulative_duration(std::size_t num, bool reset)
1144  
    {
1145  
        std::uint64_t tfunc_total = 0ul;
1146  
        std::uint64_t reset_tfunc_total = 0ul;
1147  
1148  
        if (num != std::size_t(-1))
1149  
        {
1150  
            tfunc_total = tfunc_times_[num];
1151  
            reset_tfunc_total = reset_tfunc_times_[num];
1152  
1153  
            if (reset)
1154  
                reset_tfunc_times_[num] = tfunc_total;
1155  
        }
1156  
        else
1157  
        {
1158  
            tfunc_total = std::accumulate(tfunc_times_.begin(),
1159  
                tfunc_times_.end(), std::uint64_t(0));
1160  
            reset_tfunc_total = std::accumulate(
1161  
                reset_tfunc_times_.begin(), reset_tfunc_times_.end(),
1162  
                std::uint64_t(0));
1163  
1164  
            if (reset)
1165  
            {
1166  
                std::copy(tfunc_times_.begin(), tfunc_times_.end(),
1167  
                    reset_tfunc_times_.begin());
1168  
            }
1169  
        }
1170  
1171  
        HPX_ASSERT(tfunc_total >= reset_tfunc_total);
1172  
1173  
        tfunc_total -= reset_tfunc_total;
1174  
1175  
        return std::uint64_t(double(tfunc_total) * timestamp_scale_);
1176  
    }
1177  
1178  
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
1179  
    ///////////////////////////////////////////////////////////////////////////
1180  
    template <typename Scheduler>
1181  
    std::int64_t thread_pool<Scheduler>::avg_idle_rate(bool reset)
1182  
    {
1183  
        std::uint64_t exec_total = std::accumulate(exec_times_.begin(),
1184  
            exec_times_.end(), std::uint64_t(0));
1185  
        std::uint64_t tfunc_total = std::accumulate(tfunc_times_.begin(),
1186  
            tfunc_times_.end(), std::uint64_t(0));
1187  
        std::uint64_t reset_exec_total = std::accumulate(
1188  
            reset_idle_rate_time_.begin(),
1189  
            reset_idle_rate_time_.end(), std::uint64_t(0));
1190  
        std::uint64_t reset_tfunc_total = std::accumulate(
1191  
            reset_idle_rate_time_total_.begin(),
1192  
            reset_idle_rate_time_total_.end(), std::uint64_t(0));
1193  
1194  
        if (reset)
1195  
        {
1196  
            std::copy(exec_times_.begin(), exec_times_.end(),
1197  
                reset_idle_rate_time_.begin());
1198  
            std::copy(tfunc_times_.begin(), tfunc_times_.end(),
1199  
                reset_idle_rate_time_total_.begin());
1200  
        }
1201  
1202  
        HPX_ASSERT(exec_total >= reset_exec_total);
1203  
        HPX_ASSERT(tfunc_total >= reset_tfunc_total);
1204  
1205  
        exec_total -= reset_exec_total;
1206  
        tfunc_total -= reset_tfunc_total;
1207  
1208  
        if (tfunc_total == 0)   // avoid division by zero
1209  
            return 10000LL;
1210  
1211  
        HPX_ASSERT(tfunc_total >= exec_total);
1212  
1213  
        double const percent = 1. - (double(exec_total) / double(tfunc_total));
1214  
        return std::int64_t(10000. * percent);   // 0.01 percent
1215  
    }
1216  
1217  
    template <typename Scheduler>
1218  
    std::int64_t thread_pool<Scheduler>::avg_idle_rate(
1219  
        std::size_t num_thread, bool reset)
1220  
    {
1221  
        std::uint64_t exec_time = exec_times_[num_thread];
1222  
        std::uint64_t tfunc_time = tfunc_times_[num_thread];
1223  
        std::uint64_t reset_exec_time = reset_idle_rate_time_[num_thread];
1224  
        std::uint64_t reset_tfunc_time = reset_idle_rate_time_total_[num_thread];
1225  
1226  
        if (reset)
1227  
        {
1228  
            reset_idle_rate_time_[num_thread] = exec_time;
1229  
            reset_idle_rate_time_total_[num_thread] = tfunc_time;
1230  
        }
1231  
1232  
        HPX_ASSERT(exec_time >= reset_exec_time);
1233  
        HPX_ASSERT(tfunc_time >= reset_tfunc_time);
1234  
1235  
        exec_time -= reset_exec_time;
1236  
        tfunc_time -= reset_tfunc_time;
1237  
1238  
        if (tfunc_time == 0)   // avoid division by zero
1239  
            return 10000LL;
1240  
1241  
        HPX_ASSERT(tfunc_time > exec_time);
1242  
1243  
        double const percent = 1. - (double(exec_time) / double(tfunc_time));
1244  
        return std::int64_t(10000. * percent);   // 0.01 percent
1245  
    }
1246  
1247  
#if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES)
1248  
    template <typename Scheduler>
1249  
    std::int64_t thread_pool<Scheduler>::avg_creation_idle_rate(bool reset)
1250  
    {
1251  
        double const creation_total =
1252  
            static_cast<double>(sched_.get_creation_time(reset));
1253  
1254  
        std::uint64_t exec_total = std::accumulate(exec_times_.begin(),
1255  
            exec_times_.end(), std::uint64_t(0));
1256  
        std::uint64_t tfunc_total = std::accumulate(tfunc_times_.begin(),
1257  
            tfunc_times_.end(), std::uint64_t(0));
1258  
        std::uint64_t reset_exec_total = std::accumulate(
1259  
            reset_creation_idle_rate_time_.begin(),
1260  
            reset_creation_idle_rate_time_.end(), std::uint64_t(0));
1261  
        std::uint64_t reset_tfunc_total = std::accumulate(
1262  
            reset_creation_idle_rate_time_total_.begin(),
1263  
            reset_creation_idle_rate_time_total_.end(), std::uint64_t(0));
1264  
1265  
        if (reset)
1266  
        {
1267  
            std::copy(exec_times_.begin(), exec_times_.end(),
1268  
                reset_creation_idle_rate_time_.begin());
1269  
            std::copy(tfunc_times_.begin(), tfunc_times_.end(),
1270  
                reset_creation_idle_rate_time_.begin());
1271  
        }
1272  
1273  
        HPX_ASSERT(exec_total >= reset_exec_total);
1274  
        HPX_ASSERT(tfunc_total >= reset_tfunc_total);
1275  
1276  
        exec_total -= reset_exec_total;
1277  
        tfunc_total -= reset_tfunc_total;
1278  
1279  
        if (tfunc_total == exec_total)   // avoid division by zero
1280  
            return 10000LL;
1281  
1282  
        HPX_ASSERT(tfunc_total > exec_total);
1283  
1284  
        double const percent = (creation_total / double(tfunc_total - exec_total));
1285  
        return std::int64_t(10000. * percent);    // 0.01 percent
1286  
    }
1287  
1288  
    template <typename Scheduler>
1289  
    std::int64_t thread_pool<Scheduler>::avg_cleanup_idle_rate(bool reset)
1290  
    {
1291  
        double const cleanup_total =
1292  
            static_cast<double>(sched_.get_cleanup_time(reset));
1293  
1294  
        std::uint64_t exec_total = std::accumulate(exec_times_.begin(),
1295  
            exec_times_.end(), std::uint64_t(0));
1296  
        std::uint64_t tfunc_total = std::accumulate(tfunc_times_.begin(),
1297  
            tfunc_times_.end(), std::uint64_t(0));
1298  
        std::uint64_t reset_exec_total = std::accumulate(
1299  
            reset_cleanup_idle_rate_time_.begin(),
1300  
            reset_cleanup_idle_rate_time_.end(), std::uint64_t(0));
1301  
        std::uint64_t reset_tfunc_total = std::accumulate(
1302  
            reset_cleanup_idle_rate_time_total_.begin(),
1303  
            reset_cleanup_idle_rate_time_total_.end(), std::uint64_t(0));
1304  
1305  
        if (reset)
1306  
        {
1307  
            std::copy(exec_times_.begin(), exec_times_.end(),
1308  
                reset_cleanup_idle_rate_time_.begin());
1309  
            std::copy(tfunc_times_.begin(), tfunc_times_.end(),
1310  
                reset_cleanup_idle_rate_time_.begin());
1311  
        }
1312  
1313  
        HPX_ASSERT(exec_total >= reset_exec_total);
1314  
        HPX_ASSERT(tfunc_total >= reset_tfunc_total);
1315  
1316  
        exec_total -= reset_exec_total;
1317  
        tfunc_total -= reset_tfunc_total;
1318  
1319  
        if (tfunc_total == exec_total)   // avoid division by zero
1320  
            return 10000LL;
1321  
1322  
        HPX_ASSERT(tfunc_total > exec_total);
1323  
1324  
        double const percent = (cleanup_total / double(tfunc_total - exec_total));
1325  
        return std::int64_t(10000. * percent);    // 0.01 percent
1326  
    }
1327  
#endif
1328  
#endif
1329  
1330  
    template <typename Scheduler>
1331  
    std::int64_t thread_pool<Scheduler>::
1332  
        get_queue_length(std::size_t num_thread) const
1333  
    {
1334  
        return sched_.Scheduler::get_queue_length(num_thread);
1335  
    }
1336  
1337  
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
1338  
    template <typename Scheduler>
1339  
    std::int64_t thread_pool<Scheduler>::
1340  
        get_average_thread_wait_time(std::size_t num_thread) const
1341  
    {
1342  
        return sched_.Scheduler::get_average_thread_wait_time(num_thread);
1343  
    }
1344  
1345  
    template <typename Scheduler>
1346  
    std::int64_t thread_pool<Scheduler>::
1347  
        get_average_task_wait_time(std::size_t num_thread) const
1348  
    {
1349  
        return sched_.Scheduler::get_average_task_wait_time(num_thread);
1350  
    }
1351  
#endif
1352  
1353  
#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
1354  
    template <typename Scheduler>
1355  
    std::int64_t thread_pool<Scheduler>::
1356  
        get_num_pending_misses(std::size_t num, bool reset)
1357  
    {
1358  
        return sched_.Scheduler::get_num_pending_misses(num, reset);
1359  
    }
1360  
1361  
    template <typename Scheduler>
1362  
    std::int64_t thread_pool<Scheduler>::
1363  
        get_num_pending_accesses(std::size_t num, bool reset)
1364  
    {
1365  
        return sched_.Scheduler::get_num_pending_accesses(num, reset);
1366  
    }
1367  
1368  
    template <typename Scheduler>
1369  
    std::int64_t thread_pool<Scheduler>::
1370  
        get_num_stolen_from_pending(std::size_t num, bool reset)
1371  
    {
1372  
        return sched_.Scheduler::get_num_stolen_from_pending(num, reset);
1373  
    }
1374  
1375  
    template <typename Scheduler>
1376  
    std::int64_t thread_pool<Scheduler>::
1377  
        get_num_stolen_to_pending(std::size_t num, bool reset)
1378  
    {
1379  
        return sched_.Scheduler::get_num_stolen_to_pending(num, reset);
1380  
    }
1381  
1382  
    template <typename Scheduler>
1383  
    std::int64_t thread_pool<Scheduler>::
1384  
        get_num_stolen_from_staged(std::size_t num, bool reset)
1385  
    {
1386  
        return sched_.Scheduler::get_num_stolen_from_staged(num, reset);
1387  
    }
1388  
1389  
    template <typename Scheduler>
1390  
    std::int64_t thread_pool<Scheduler>::
1391  
        get_num_stolen_to_staged(std::size_t num, bool reset)
1392  
    {
1393  
        return sched_.Scheduler::get_num_stolen_to_staged(num, reset);
1394  
    }
1395  
#endif
1396  
1397  
}}}
1398  
1399  
///////////////////////////////////////////////////////////////////////////////
1400  
/// Explicit template instantiations of thread_pool, one per scheduler type.
/// Each instantiation (and the include of its scheduler header) is guarded
/// by the corresponding HPX_HAVE_* macro, except for the
/// local_priority_queue_scheduler, which is instantiated unconditionally.
1401  
#if defined(HPX_HAVE_THROTTLE_SCHEDULER) && defined(HPX_HAVE_APEX)
1402  
#include <hpx/runtime/threads/policies/throttle_queue_scheduler.hpp>
1403  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1404  
    hpx::threads::policies::throttle_queue_scheduler<> >;
1405  
#endif
1406  
1407  
#if defined(HPX_HAVE_LOCAL_SCHEDULER)
1408  
#include <hpx/runtime/threads/policies/local_queue_scheduler.hpp>
1409  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1410  
    hpx::threads::policies::local_queue_scheduler<> >;
1411  
#endif
1412  
1413  
#if defined(HPX_HAVE_STATIC_SCHEDULER)
1414  
#include <hpx/runtime/threads/policies/static_queue_scheduler.hpp>
1415  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1416  
    hpx::threads::policies::static_queue_scheduler<> >;
1417  
#endif
1418  
1419  
#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
1420  
#include <hpx/runtime/threads/policies/static_priority_queue_scheduler.hpp>
1421  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1422  
    hpx::threads::policies::static_priority_queue_scheduler<> >;
1423  
#endif
1424  
1425  
// not guarded by any HPX_HAVE_* macro: always instantiated
#include <hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp>
1426  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1427  
    hpx::threads::policies::local_priority_queue_scheduler<> >;
1428  
1429  
// NOTE(review): no scheduler header is included inside this guard;
// presumably abp_fifo_priority_queue_scheduler is declared by a header
// that is already included at this point - confirm.
#if defined(HPX_HAVE_ABP_SCHEDULER)
1430  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1431  
    hpx::threads::policies::abp_fifo_priority_queue_scheduler>;
1432  
#endif
1433  
1434  
#if defined(HPX_HAVE_HIERARCHY_SCHEDULER)
1435  
#include <hpx/runtime/threads/policies/hierarchy_scheduler.hpp>
1436  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1437  
    hpx::threads::policies::hierarchy_scheduler<> >;
1438  
#endif
1439  
1440  
#if defined(HPX_HAVE_PERIODIC_PRIORITY_SCHEDULER)
1441  
#include <hpx/runtime/threads/policies/periodic_priority_queue_scheduler.hpp>
1442  
template class HPX_EXPORT hpx::threads::detail::thread_pool<
1443  
    hpx::threads::policies::periodic_priority_queue_scheduler<> >;
1444  
#endif
1445  
1446  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.