/home/users/khuck/src/hpx-lsu/hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp

//  Copyright (c) 2007-2016 Hartmut Kaiser
//  Copyright (c) 2011      Bryce Lelbach
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_THREADMANAGER_SCHEDULING_LOCAL_PRIORITY_QUEUE_MAR_15_2011_0926AM)
#define HPX_THREADMANAGER_SCHEDULING_LOCAL_PRIORITY_QUEUE_MAR_15_2011_0926AM

#include <hpx/config.hpp>
#include <hpx/runtime/threads/policies/affinity_data.hpp>
#include <hpx/runtime/threads/policies/scheduler_base.hpp>
#include <hpx/runtime/threads/policies/thread_queue.hpp>
#include <hpx/runtime/threads/thread_data.hpp>
#include <hpx/runtime/threads/topology.hpp>
#include <hpx/runtime/threads_fwd.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/logging.hpp>
#include <hpx/util_fwd.hpp>

#include <boost/atomic.hpp>
#include <boost/exception_ptr.hpp>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>

#include <hpx/config/warnings_prefix.hpp>

// TODO: add branch prediction and function heat

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace threads { namespace policies
{
#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
    ///////////////////////////////////////////////////////////////////////////
    // We globally control whether to do minimal deadlock detection using this
    // global bool variable. It will be set once by the runtime configuration
    // startup code
    extern bool minimal_deadlock_detection;
#endif

    ///////////////////////////////////////////////////////////////////////////
    /// The local_priority_queue_scheduler maintains exactly one queue of work
    /// items (threads) per OS thread, from which that OS thread pulls its next
    /// work item. Additionally, it maintains separate queues: several for high
    /// priority threads and one for low priority threads.
    /// High priority threads are executed by the first N OS threads before any
    /// other work is executed. Low priority threads are executed by the last
    /// OS thread whenever no other work is available.
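    ///
    /// A minimal instantiation sketch (illustrative only): the scheduler is
    /// normally created by the HPX thread manager, and the queuing policy
    /// types shown here (boost::mutex, lockfree_fifo) are assumptions rather
    /// than the definitive defaults.
    ///
    /// \code
    ///     typedef local_priority_queue_scheduler<
    ///         boost::mutex, lockfree_fifo, lockfree_fifo, lockfree_fifo
    ///     > scheduler_type;
    ///
    ///     // one queue per worker thread, same number of high priority queues
    ///     scheduler_type::init_parameter_type init(4, 4);
    ///     scheduler_type scheduler(init, false);   // create the queues now
    /// \endcode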
    template <typename Mutex
            , typename PendingQueuing
            , typename StagedQueuing
            , typename TerminatedQueuing
             >
    class local_priority_queue_scheduler : public scheduler_base
    {
    protected:
        // The maximum number of active threads this thread manager should
        // create. This number will be a constraint only as long as the work
        // items queue is not empty. Otherwise the number of active threads
        // will be incremented in steps equal to the \a min_add_new_count
        // specified in \a thread_queue.
        // FIXME: this is specified both here and in thread_queue.
        enum { max_thread_count = 1000 };

    public:
        typedef std::false_type has_periodic_maintenance;

        typedef thread_queue<
            Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing
        > thread_queue_type;

        // the scheduler type takes three initialization parameters:
        //    the number of queues
        //    the number of high priority queues (std::size_t(-1) means: use
        //        as many high priority queues as there are queues)
        //    the maxcount per queue
        struct init_parameter
        {
            init_parameter()
              : num_queues_(1),
                num_high_priority_queues_(1),
                max_queue_thread_count_(max_thread_count),
                numa_sensitive_(0),
                description_("local_priority_queue_scheduler")
            {}

            init_parameter(std::size_t num_queues,
                    std::size_t num_high_priority_queues = std::size_t(-1),
                    std::size_t max_queue_thread_count = max_thread_count,
                    std::size_t numa_sensitive = 0,
                    char const* description = "local_priority_queue_scheduler")
              : num_queues_(num_queues),
                num_high_priority_queues_(
                    num_high_priority_queues == std::size_t(-1) ?
                        num_queues : num_high_priority_queues),
                max_queue_thread_count_(max_queue_thread_count),
                numa_sensitive_(numa_sensitive),
                description_(description)
            {}

            init_parameter(std::size_t num_queues, char const* description)
              : num_queues_(num_queues),
                num_high_priority_queues_(num_queues),
                max_queue_thread_count_(max_thread_count),
                numa_sensitive_(0),
                description_(description)
            {}

            std::size_t num_queues_;
            std::size_t num_high_priority_queues_;
            std::size_t max_queue_thread_count_;
            std::size_t numa_sensitive_;
            char const* description_;
        };
        typedef init_parameter init_parameter_type;

        local_priority_queue_scheduler(init_parameter_type const& init,
                bool deferred_initialization = true)
          : scheduler_base(init.num_queues_, init.description_),
            max_queue_thread_count_(init.max_queue_thread_count_),
            queues_(init.num_queues_),
            high_priority_queues_(init.num_high_priority_queues_),
            low_priority_queue_(init.max_queue_thread_count_),
            curr_queue_(0),
            numa_sensitive_(init.numa_sensitive_),
#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
            steals_in_numa_domain_(),
            steals_outside_numa_domain_(),
#endif
#if !defined(HPX_HAVE_MORE_THAN_64_THREADS) || defined(HPX_HAVE_MAX_CPU_COUNT)
            numa_domain_masks_(init.num_queues_),
            outside_numa_domain_masks_(init.num_queues_)
#else
            numa_domain_masks_(init.num_queues_, topology_.get_machine_affinity_mask()),
            outside_numa_domain_masks_(init.num_queues_,
                topology_.get_machine_affinity_mask())
#endif
        {
#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
            resize(steals_in_numa_domain_, init.num_queues_);
            resize(steals_outside_numa_domain_, init.num_queues_);
#endif
            if (!deferred_initialization)
            {
                HPX_ASSERT(init.num_queues_ != 0);
                for (std::size_t i = 0; i < init.num_queues_; ++i)
                    queues_[i] = new thread_queue_type(init.max_queue_thread_count_);

                HPX_ASSERT(init.num_high_priority_queues_ != 0);
                HPX_ASSERT(init.num_high_priority_queues_ <= init.num_queues_);
                for (std::size_t i = 0; i < init.num_high_priority_queues_; ++i) {
                    high_priority_queues_[i] =
                        new thread_queue_type(init.max_queue_thread_count_);
                }
            }
        }

        virtual ~local_priority_queue_scheduler()
        {
            for (std::size_t i = 0; i != queues_.size(); ++i)
                delete queues_[i];
            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                delete high_priority_queues_[i];
        }

        bool numa_sensitive() const { return numa_sensitive_ != 0; }

        static std::string get_scheduler_name()
        {
            return "local_priority_queue_scheduler";
        }

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
        std::uint64_t get_creation_time(bool reset)
        {
            std::uint64_t time = 0;

            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                time += high_priority_queues_[i]->get_creation_time(reset);

            time += low_priority_queue_.get_creation_time(reset);

            for (std::size_t i = 0; i != queues_.size(); ++i)
                time += queues_[i]->get_creation_time(reset);

            return time;
        }

        std::uint64_t get_cleanup_time(bool reset)
        {
            std::uint64_t time = 0;

            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                time += high_priority_queues_[i]->
                    get_cleanup_time(reset);

            time += low_priority_queue_.get_cleanup_time(reset);

            for (std::size_t i = 0; i != queues_.size(); ++i)
                time += queues_[i]->get_cleanup_time(reset);

            return time;
        }
#endif

#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
        std::int64_t get_num_pending_misses(std::size_t num_thread, bool reset)
        {
            std::int64_t num_pending_misses = 0;
            if (num_thread == std::size_t(-1))
            {
                for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                    num_pending_misses += high_priority_queues_[i]->
                        get_num_pending_misses(reset);

                for (std::size_t i = 0; i != queues_.size(); ++i)
                    num_pending_misses += queues_[i]->
                        get_num_pending_misses(reset);

                num_pending_misses += low_priority_queue_.
                    get_num_pending_misses(reset);

                return num_pending_misses;
            }

            num_pending_misses += queues_[num_thread]->
                get_num_pending_misses(reset);

            if (num_thread < high_priority_queues_.size())
            {
                num_pending_misses += high_priority_queues_[num_thread]->
                    get_num_pending_misses(reset);
            }
            if (num_thread == 0)
            {
                num_pending_misses += low_priority_queue_.
                    get_num_pending_misses(reset);
            }
            return num_pending_misses;
        }

        std::int64_t get_num_pending_accesses(std::size_t num_thread, bool reset)
        {
            std::int64_t num_pending_accesses = 0;
            if (num_thread == std::size_t(-1))
            {
                for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                    num_pending_accesses += high_priority_queues_[i]->
                        get_num_pending_accesses(reset);

                for (std::size_t i = 0; i != queues_.size(); ++i)
                    num_pending_accesses += queues_[i]->
                        get_num_pending_accesses(reset);

                num_pending_accesses += low_priority_queue_.
                    get_num_pending_accesses(reset);

                return num_pending_accesses;
            }

            num_pending_accesses += queues_[num_thread]->
                get_num_pending_accesses(reset);

            if (num_thread < high_priority_queues_.size())
            {
                num_pending_accesses += high_priority_queues_[num_thread]->
                    get_num_pending_accesses(reset);
            }
            if (num_thread == 0)
            {
                num_pending_accesses += low_priority_queue_.
                    get_num_pending_accesses(reset);
            }
            return num_pending_accesses;
        }

        std::int64_t get_num_stolen_from_pending(std::size_t num_thread, bool reset)
        {
            std::int64_t num_stolen_threads = 0;
            if (num_thread == std::size_t(-1))
            {
                for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                    num_stolen_threads += high_priority_queues_[i]->
                        get_num_stolen_from_pending(reset);

                for (std::size_t i = 0; i != queues_.size(); ++i)
                    num_stolen_threads += queues_[i]->
                        get_num_stolen_from_pending(reset);

                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_from_pending(reset);

                return num_stolen_threads;
            }

            num_stolen_threads += queues_[num_thread]->
                get_num_stolen_from_pending(reset);

            if (num_thread < high_priority_queues_.size())
            {
                num_stolen_threads += high_priority_queues_[num_thread]->
                    get_num_stolen_from_pending(reset);
            }
            if (num_thread == 0)
            {
                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_from_pending(reset);
            }
            return num_stolen_threads;
        }

        std::int64_t get_num_stolen_to_pending(std::size_t num_thread, bool reset)
        {
            std::int64_t num_stolen_threads = 0;
            if (num_thread == std::size_t(-1))
            {
                for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                    num_stolen_threads += high_priority_queues_[i]->
                        get_num_stolen_to_pending(reset);

                for (std::size_t i = 0; i != queues_.size(); ++i)
                    num_stolen_threads += queues_[i]->
                        get_num_stolen_to_pending(reset);

                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_to_pending(reset);

                return num_stolen_threads;
            }

            num_stolen_threads += queues_[num_thread]->
                get_num_stolen_to_pending(reset);

            if (num_thread < high_priority_queues_.size())
            {
                num_stolen_threads += high_priority_queues_[num_thread]->
                    get_num_stolen_to_pending(reset);
            }
            if (num_thread == 0)
            {
                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_to_pending(reset);
            }
            return num_stolen_threads;
        }

        std::int64_t get_num_stolen_from_staged(std::size_t num_thread, bool reset)
        {
            std::int64_t num_stolen_threads = 0;
            if (num_thread == std::size_t(-1))
            {
                for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                    num_stolen_threads += high_priority_queues_[i]->
                        get_num_stolen_from_staged(reset);

                for (std::size_t i = 0; i != queues_.size(); ++i)
                    num_stolen_threads += queues_[i]->
                        get_num_stolen_from_staged(reset);

                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_from_staged(reset);

                return num_stolen_threads;
            }

            num_stolen_threads += queues_[num_thread]->
                get_num_stolen_from_staged(reset);

            if (num_thread < high_priority_queues_.size())
            {
                num_stolen_threads += high_priority_queues_[num_thread]->
                    get_num_stolen_from_staged(reset);
            }
            if (num_thread == 0)
            {
                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_from_staged(reset);
            }
            return num_stolen_threads;
        }

        std::int64_t get_num_stolen_to_staged(std::size_t num_thread, bool reset)
        {
            std::int64_t num_stolen_threads = 0;
            if (num_thread == std::size_t(-1))
            {
                for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                    num_stolen_threads += high_priority_queues_[i]->
                        get_num_stolen_to_staged(reset);

                for (std::size_t i = 0; i != queues_.size(); ++i)
                    num_stolen_threads += queues_[i]->
                        get_num_stolen_to_staged(reset);

                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_to_staged(reset);

                return num_stolen_threads;
            }

            num_stolen_threads += queues_[num_thread]->
                get_num_stolen_to_staged(reset);

            if (num_thread < high_priority_queues_.size())
            {
                num_stolen_threads += high_priority_queues_[num_thread]->
                    get_num_stolen_to_staged(reset);
            }
            if (num_thread == 0)
            {
                num_stolen_threads += low_priority_queue_.
                    get_num_stolen_to_staged(reset);
            }
            return num_stolen_threads;
        }
#endif

        ///////////////////////////////////////////////////////////////////////
        void abort_all_suspended_threads()
        {
            for (std::size_t i = 0; i != queues_.size(); ++i)
                queues_[i]->abort_all_suspended_threads();

            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                high_priority_queues_[i]->abort_all_suspended_threads();

            low_priority_queue_.abort_all_suspended_threads();
        }

        ///////////////////////////////////////////////////////////////////////
        bool cleanup_terminated(bool delete_all = false)
        {
            bool empty = true;
            for (std::size_t i = 0; i != queues_.size(); ++i)
                empty = queues_[i]->cleanup_terminated(delete_all) && empty;
            if (!delete_all)
                return empty;

            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                empty = high_priority_queues_[i]->
                    cleanup_terminated(delete_all) && empty;

            empty = low_priority_queue_.cleanup_terminated(delete_all) && empty;
            return empty;
        }

        ///////////////////////////////////////////////////////////////////////
        // create a new thread and schedule it if the initial state is equal to
        // pending
        void create_thread(thread_init_data& data, thread_id_type* id,
            thread_state_enum initial_state, bool run_now, error_code& ec,
            std::size_t num_thread)
        {
#ifdef HPX_HAVE_THREAD_TARGET_ADDRESS
//             // try to figure out the NUMA node where the data lives
//             if (numa_sensitive_ && std::size_t(-1) == num_thread) {
//                 mask_cref_type mask =
//                     topology_.get_thread_affinity_mask_from_lva(data.lva);
//                 if (any(mask)) {
//                     num_thread = find_first(mask);
//                 }
//             }
#endif
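            // If the caller did not ask for a specific queue, assign the new
            // thread round-robin over all queues via the shared atomic
            // counter curr_queue_; out-of-range queue numbers are wrapped
            // back into the valid range below.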
            std::size_t queue_size = queues_.size();

            if (std::size_t(-1) == num_thread)
                num_thread = curr_queue_++ % queue_size;

            if (num_thread >= queue_size)
                num_thread %= queue_size;

            // now create the thread
            if (data.priority == thread_priority_critical) {
                std::size_t num = num_thread % high_priority_queues_.size();
                high_priority_queues_[num]->create_thread(data, id,
                    initial_state, run_now, ec);
                return;
            }

            if (data.priority == thread_priority_boost) {
                data.priority = thread_priority_normal;
                std::size_t num = num_thread % high_priority_queues_.size();
                high_priority_queues_[num]->create_thread(data, id,
                    initial_state, run_now, ec);
                return;
            }

            if (data.priority == thread_priority_low) {
                low_priority_queue_.create_thread(data, id, initial_state,
                    run_now, ec);
                return;
            }

            HPX_ASSERT(num_thread < queue_size);
            queues_[num_thread]->create_thread(data, id, initial_state,
                run_now, ec);
        }

        /// Return the next thread to be executed; returns false if none is
        /// available.
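        ///
        /// The lookup order implemented below: this thread's high priority
        /// queue (if it owns one), then its own queue, then stealing from
        /// other queues (restricted to the same NUMA domain first when NUMA
        /// sensitivity is enabled), and finally the shared low priority queue.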
        virtual bool get_next_thread(std::size_t num_thread,
            std::int64_t& idle_loop_count, threads::thread_data*& thrd)
        {
            std::size_t queues_size = queues_.size();
            std::size_t high_priority_queues = high_priority_queues_.size();

            HPX_ASSERT(num_thread < queues_size);
            thread_queue_type* this_high_priority_queue = nullptr;
            thread_queue_type* this_queue = queues_[num_thread];

            if (num_thread < high_priority_queues)
            {
                this_high_priority_queue = high_priority_queues_[num_thread];
                bool result = this_high_priority_queue->get_next_thread(thrd);

                this_high_priority_queue->increment_num_pending_accesses();
                if (result)
                    return true;
                this_high_priority_queue->increment_num_pending_misses();
            }

            {
                bool result = this_queue->get_next_thread(thrd);

                this_queue->increment_num_pending_accesses();
                if (result)
                    return true;
                this_queue->increment_num_pending_misses();

                bool have_staged = this_queue->
                    get_staged_queue_length(boost::memory_order_relaxed) != 0;

                // Give up, we should have work to convert.
                if (have_staged)
                    return false;
            }

            if (numa_sensitive_ != 0)   // limited or no stealing across domains
            {

                // steal thread from other queue of same NUMA domain
                std::size_t pu_number = get_pu_num(num_thread);
#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
                if (test(steals_in_numa_domain_, pu_number)) //-V600 //-V111
#endif
                {
                    mask_cref_type this_numa_domain =
                        numa_domain_masks_[num_thread];
                    for (std::size_t i = 1; i != queues_size; ++i)
                    {
                        // FIXME: Do a better job here.
                        std::size_t const idx = (i + num_thread) % queues_size;

                        HPX_ASSERT(idx != num_thread);

                        std::size_t pu_num = get_pu_num(idx);
                        if (!test(this_numa_domain, pu_num)) //-V560 //-V600 //-V111
                            continue;

                        if (idx < high_priority_queues &&
                            num_thread < high_priority_queues)
                        {
                            thread_queue_type* q = high_priority_queues_[idx];
                            if (q->get_next_thread(thrd))
                            {
                                q->increment_num_stolen_from_pending();
                                this_high_priority_queue->
                                    increment_num_stolen_to_pending();
                                return true;
                            }
                        }

                        if (queues_[idx]->get_next_thread(thrd))
                        {
                            queues_[idx]->increment_num_stolen_from_pending();
                            this_queue->increment_num_stolen_to_pending();
                            return true;
                        }
                    }
                }

#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
                // if nothing found, ask everybody else
                if (test(steals_outside_numa_domain_, pu_number)) //-V600 //-V111
                {
                    mask_cref_type numa_domain =
                        outside_numa_domain_masks_[num_thread];
                    for (std::size_t i = 1; i != queues_size; ++i)
                    {
                        // FIXME: Do a better job here.
                        std::size_t const idx = (i + num_thread) % queues_size;

                        HPX_ASSERT(idx != num_thread);

                        std::size_t pu_num = get_pu_num(idx);
                        if (!test(numa_domain, pu_num)) //-V560 //-V600 //-V111
                            continue;

                        if (idx < high_priority_queues &&
                            num_thread < high_priority_queues)
                        {
                            thread_queue_type* q = high_priority_queues_[idx];
                            if (q->get_next_thread(thrd))
                            {
                                q->increment_num_stolen_from_pending();
                                this_high_priority_queue->
                                    increment_num_stolen_to_pending();
                                return true;
                            }
                        }

                        if (queues_[idx]->get_next_thread(thrd))
                        {
                            queues_[idx]->increment_num_stolen_from_pending();
                            this_queue->increment_num_stolen_to_pending();
                            return true;
                        }
                    }
                }
#endif
            }

            else // not NUMA-sensitive
            {
                for (std::size_t i = 1; i != queues_size; ++i)
                {
                    // FIXME: Do a better job here.
                    std::size_t const idx = (i + num_thread) % queues_size;

                    HPX_ASSERT(idx != num_thread);

                    if (idx < high_priority_queues &&
                        num_thread < high_priority_queues)
                    {
                        thread_queue_type* q = high_priority_queues_[idx];
                        if (q->get_next_thread(thrd))
                        {
                            q->increment_num_stolen_from_pending();
                            this_high_priority_queue->
                                increment_num_stolen_to_pending();
                            return true;
                        }
                    }

                    if (queues_[idx]->get_next_thread(thrd))
                    {
                        queues_[idx]->increment_num_stolen_from_pending();
                        this_queue->increment_num_stolen_to_pending();
                        return true;
                    }
                }
            }

            return low_priority_queue_.get_next_thread(thrd);
        }

        /// Schedule the passed thread
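        ///
        /// Critical and boost priority threads are placed into one of the
        /// high priority queues, low priority threads into the shared low
        /// priority queue, and all other threads into the queue associated
        /// with \a num_thread (chosen round-robin if none was given).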
        void schedule_thread(threads::thread_data* thrd,
            std::size_t num_thread,
            thread_priority priority = thread_priority_normal)
        {
            if (std::size_t(-1) == num_thread)
                num_thread = curr_queue_++ % queues_.size();

            if (priority == thread_priority_critical ||
                priority == thread_priority_boost)
            {
                std::size_t num = num_thread % high_priority_queues_.size();
                high_priority_queues_[num]->schedule_thread(thrd);
            }
            else if (priority == thread_priority_low) {
                low_priority_queue_.schedule_thread(thrd);
            }
            else {
                HPX_ASSERT(num_thread < queues_.size());
                queues_[num_thread]->schedule_thread(thrd);
            }
        }

        void schedule_thread_last(threads::thread_data* thrd,
            std::size_t num_thread,
            thread_priority priority = thread_priority_normal)
        {
            if (std::size_t(-1) == num_thread)
                num_thread = curr_queue_++ % queues_.size();

            if (priority == thread_priority_critical ||
                priority == thread_priority_boost)
            {
                std::size_t num = num_thread % high_priority_queues_.size();
                high_priority_queues_[num]->schedule_thread(thrd, true);
            }
            else if (priority == thread_priority_low) {
                low_priority_queue_.schedule_thread(thrd, true);
            }
            else {
                HPX_ASSERT(num_thread < queues_.size());
                queues_[num_thread]->schedule_thread(thrd, true);
            }
        }

        /// Destroy the passed thread as it has been terminated
        bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
        {
            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
            {
                if (high_priority_queues_[i]->destroy_thread(thrd, busy_count))
                    return true;
            }

            for (std::size_t i = 0; i != queues_.size(); ++i)
            {
                if (queues_[i]->destroy_thread(thrd, busy_count))
                    return true;
            }

            if (low_priority_queue_.destroy_thread(thrd, busy_count))
                return true;

            // the thread has to belong to one of the queues, always
            HPX_ASSERT(false);

            return false;
        }

        ///////////////////////////////////////////////////////////////////////
        // This returns the current length of the queues (work items and new items)
        std::int64_t get_queue_length(std::size_t num_thread = std::size_t(-1)) const
        {
            // Return queue length of one specific queue.
            std::int64_t count = 0;
            if (std::size_t(-1) != num_thread) {
                HPX_ASSERT(num_thread < queues_.size());

                if (num_thread < high_priority_queues_.size())
                    count = high_priority_queues_[num_thread]->get_queue_length();

                if (num_thread == queues_.size()-1)
                    count += low_priority_queue_.get_queue_length();

                return count + queues_[num_thread]->get_queue_length();
            }

            // Cumulative queue lengths of all queues.
            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                count += high_priority_queues_[i]->get_queue_length();

            count += low_priority_queue_.get_queue_length();

            for (std::size_t i = 0; i != queues_.size(); ++i)
                count += queues_[i]->get_queue_length();

            return count;
        }

        ///////////////////////////////////////////////////////////////////////
        // Queries the current thread count of the queues.
        std::int64_t get_thread_count(thread_state_enum state = unknown,
            thread_priority priority = thread_priority_default,
            std::size_t num_thread = std::size_t(-1), bool reset = false) const
        {
            // Return thread count of one specific queue.
            std::int64_t count = 0;
            if (std::size_t(-1) != num_thread)
            {
                HPX_ASSERT(num_thread < queues_.size());

                switch (priority) {
                case thread_priority_default:
                    {
                        if (num_thread < high_priority_queues_.size())
                            count = high_priority_queues_[num_thread]->
                                get_thread_count(state);

                        if (queues_.size()-1 == num_thread)
                            count += low_priority_queue_.get_thread_count(state);

                        return count + queues_[num_thread]->get_thread_count(state);
                    }

                case thread_priority_low:
                    {
                        if (queues_.size()-1 == num_thread)
                            return low_priority_queue_.get_thread_count(state);
                        break;
                    }

                case thread_priority_normal:
                    return queues_[num_thread]->get_thread_count(state);

                case thread_priority_boost:
                case thread_priority_critical:
                    {
                        if (num_thread < high_priority_queues_.size())
                            return high_priority_queues_[num_thread]->
                                get_thread_count(state);
                        break;
                    }

                default:
                case thread_priority_unknown:
                    {
                        HPX_THROW_EXCEPTION(bad_parameter,
                            "local_priority_queue_scheduler::get_thread_count",
                            "unknown thread priority value (thread_priority_unknown)");
                        return 0;
                    }
                }
                return 0;
            }

            // Return the cumulative count for all queues.
            switch (priority) {
            case thread_priority_default:
                {
                    for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                        count += high_priority_queues_[i]->get_thread_count(state);

                    count += low_priority_queue_.get_thread_count(state);

                    for (std::size_t i = 0; i != queues_.size(); ++i)
                        count += queues_[i]->get_thread_count(state);

                    break;
                }

            case thread_priority_low:
                return low_priority_queue_.get_thread_count(state);

            case thread_priority_normal:
                {
                    for (std::size_t i = 0; i != queues_.size(); ++i)
                        count += queues_[i]->get_thread_count(state);
                    break;
                }

            case thread_priority_boost:
            case thread_priority_critical:
                {
                    for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
                        count += high_priority_queues_[i]->get_thread_count(state);
                    break;
                }

            default:
            case thread_priority_unknown:
                {
                    HPX_THROW_EXCEPTION(bad_parameter,
                        "local_priority_queue_scheduler::get_thread_count",
                        "unknown thread priority value (thread_priority_unknown)");
                    return 0;
                }
            }
            return count;
        }

        ///////////////////////////////////////////////////////////////////////
        // Enumerate matching threads from all queues
        bool enumerate_threads(
            util::function_nonser<bool(thread_id_type)> const& f,
            thread_state_enum state = unknown) const
        {
            bool result = true;
            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
            {
                result = result &&
                    high_priority_queues_[i]->enumerate_threads(f, state);
            }

            result = result && low_priority_queue_.enumerate_threads(f, state);

            for (std::size_t i = 0; i != queues_.size(); ++i)
            {
                result = result && queues_[i]->enumerate_threads(f, state);
            }
            return result;
        }

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        ///////////////////////////////////////////////////////////////////////
        // Queries the current average thread wait time of the queues.
        std::int64_t get_average_thread_wait_time(
            std::size_t num_thread = std::size_t(-1)) const
        {
            // Return average thread wait time of one specific queue.
            std::uint64_t wait_time = 0;
            std::uint64_t count = 0;
            if (std::size_t(-1) != num_thread)
            {
                HPX_ASSERT(num_thread < queues_.size());

                if (num_thread < high_priority_queues_.size())
                {
                    wait_time = high_priority_queues_[num_thread]->
                        get_average_thread_wait_time();
                    ++count;
                }

                if (queues_.size()-1 == num_thread)
                {
                    wait_time += low_priority_queue_.
                        get_average_thread_wait_time();
                    ++count;
                }

                wait_time += queues_[num_thread]->get_average_thread_wait_time();
                return wait_time / (count + 1);
            }

            // Return the cumulative average thread wait time for all queues.
            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
            {
                wait_time += high_priority_queues_[i]->get_average_thread_wait_time();
                ++count;
            }

            wait_time += low_priority_queue_.get_average_thread_wait_time();

            for (std::size_t i = 0; i != queues_.size(); ++i)
            {
                wait_time += queues_[i]->get_average_thread_wait_time();
                ++count;
            }

            return wait_time / (count + 1);
        }

        ///////////////////////////////////////////////////////////////////////
        // Queries the current average task wait time of the queues.
        std::int64_t get_average_task_wait_time(
            std::size_t num_thread = std::size_t(-1)) const
        {
            // Return average task wait time of one specific queue.
            std::uint64_t wait_time = 0;
            std::uint64_t count = 0;
            if (std::size_t(-1) != num_thread)
            {
                HPX_ASSERT(num_thread < queues_.size());

                if (num_thread < high_priority_queues_.size())
                {
                    wait_time = high_priority_queues_[num_thread]->
                        get_average_task_wait_time();
                    ++count;
                }

                if (queues_.size()-1 == num_thread)
                {
                    wait_time += low_priority_queue_.
                        get_average_task_wait_time();
                    ++count;
                }

                wait_time += queues_[num_thread]->get_average_task_wait_time();
                return wait_time / (count + 1);
            }

            // Return the cumulative average task wait time for all queues.
            for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
            {
                wait_time += high_priority_queues_[i]->
                    get_average_task_wait_time();
                ++count;
            }

            wait_time += low_priority_queue_.get_average_task_wait_time();

            for (std::size_t i = 0; i != queues_.size(); ++i)
            {
                wait_time += queues_[i]->get_average_task_wait_time();
                ++count;
            }

            return wait_time / (count + 1);
        }
#endif

        /// This function is called periodically by the thread manager to allow
        /// the scheduler to execute maintenance tasks. It returns true if the
        /// OS thread calling this function has to be terminated (i.e. no more
        /// work has to be done).
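        ///
        /// The code below first tries to convert staged work from this
        /// thread's own queues (high priority queue first), then attempts to
        /// steal staged work from other queues (same NUMA domain first when
        /// NUMA sensitivity is enabled), and finally falls back to the low
        /// priority queue.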
        virtual bool wait_or_add_new(std::size_t num_thread, bool running,
            std::int64_t& idle_loop_count)
        {
            std::size_t queues_size = queues_.size();
            HPX_ASSERT(num_thread < queues_.size());

            std::size_t added = 0;
            bool result = true;

            std::size_t high_priority_queues = high_priority_queues_.size();
            thread_queue_type* this_high_priority_queue = nullptr;
            thread_queue_type* this_queue = queues_[num_thread];

            if (num_thread < high_priority_queues)
            {
                this_high_priority_queue = high_priority_queues_[num_thread];
                result = this_high_priority_queue->wait_or_add_new(running,
                            idle_loop_count, added)
                        && result;
                if (0 != added) return result;
            }

            result = this_queue->wait_or_add_new(
                running, idle_loop_count, added) && result;
            if (0 != added) return result;

            if (numa_sensitive_ != 0)   // limited or no cross domain stealing
            {
                // steal work items: first try to steal from other cores in
                // the same NUMA node

                std::size_t pu_number = get_pu_num(num_thread);
#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
                if (test(steals_in_numa_domain_, pu_number)) //-V600 //-V111
#endif
                {
                    mask_cref_type numa_domain_mask =
                        numa_domain_masks_[num_thread];

                    for (std::size_t i = 1; i != queues_size; ++i)
                    {
                        // FIXME: Do a better job here.
                        std::size_t const idx = (i + num_thread) % queues_size;

                        HPX_ASSERT(idx != num_thread);

                        std::size_t pu_num = get_pu_num(idx);
                        if (!test(numa_domain_mask, pu_num)) //-V600
                            continue;

                        if (idx < high_priority_queues &&
                            num_thread < high_priority_queues)
                        {
                            thread_queue_type* q =  high_priority_queues_[idx];
                            result = this_high_priority_queue->
                                wait_or_add_new(running, idle_loop_count,
                                    added, q)
                              && result;

                            if (0 != added)
                            {
                                q->increment_num_stolen_from_staged(added);
                                this_high_priority_queue->
                                    increment_num_stolen_to_staged(added);
                                return result;
                            }
                        }

                        result = this_queue->wait_or_add_new(running,
                            idle_loop_count, added, queues_[idx]) && result;
                        if (0 != added)
                        {
                            queues_[idx]->increment_num_stolen_from_staged(added);
                            this_queue->increment_num_stolen_to_staged(added);
                            return result;
                        }
                    }
                }

#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
                // if nothing found, ask everybody else
                if (test(steals_outside_numa_domain_, pu_number)) //-V600 //-V111
                {
                    mask_cref_type numa_domain_mask =
                        outside_numa_domain_masks_[num_thread];
                    for (std::size_t i = 1; i != queues_size; ++i)
                    {
                        // FIXME: Do a better job here.
                        std::size_t const idx = (i + num_thread) % queues_size;

                        HPX_ASSERT(idx != num_thread);

                        std::size_t pu_num = get_pu_num(idx);
                        if (!test(numa_domain_mask, pu_num)) //-V600
                            continue;

                        if (idx < high_priority_queues &&
                            num_thread < high_priority_queues)
                        {
                            thread_queue_type* q =  high_priority_queues_[idx];
                            result = this_high_priority_queue->
                                wait_or_add_new(running, idle_loop_count,
                                    added, q)
                               && result;
                            if (0 != added)
                            {
                                q->increment_num_stolen_from_staged(added);
                                this_high_priority_queue->
                                    increment_num_stolen_to_staged(added);
                                return result;
                            }
                        }

                        result = this_queue->wait_or_add_new(running,
                            idle_loop_count, added, queues_[idx]) && result;
                        if (0 != added)
                        {
                            queues_[idx]->increment_num_stolen_from_staged(added);
                            this_queue->increment_num_stolen_to_staged(added);
                            return result;
                        }
                    }
                }
#endif
            }

            else // not NUMA-sensitive
            {
                for (std::size_t i = 1; i != queues_size; ++i)
                {
                    // FIXME: Do a better job here.
                    std::size_t const idx = (i + num_thread) % queues_size;

                    HPX_ASSERT(idx != num_thread);

                    if (idx < high_priority_queues &&
                        num_thread < high_priority_queues)
                    {
                        thread_queue_type* q =  high_priority_queues_[idx];
                        result = this_high_priority_queue->
                            wait_or_add_new(running, idle_loop_count, added, q)
                           && result;
                        if (0 != added)
                        {
                            q->increment_num_stolen_from_staged(added);
                            this_high_priority_queue->
                                increment_num_stolen_to_staged(added);
                            return result;
                        }
                    }

                    result = this_queue->wait_or_add_new(running,
                        idle_loop_count, added, queues_[idx]) && result;
                    if (0 != added)
                    {
                        queues_[idx]->increment_num_stolen_from_staged(added);
                        this_queue->increment_num_stolen_to_staged(added);
                        return result;
                    }
                }
            }

#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
            // no new work is available, are we deadlocked?
            if (HPX_UNLIKELY(minimal_deadlock_detection && LHPX_ENABLED(error)))
            {
                bool suspended_only = true;

                for (std::size_t i = 0; suspended_only && i != queues_.size(); ++i) {
                    suspended_only = queues_[i]->dump_suspended_threads(
                        i, idle_loop_count, running);
                }

                if (HPX_UNLIKELY(suspended_only)) {
                    if (running) {
                        LTM_(error) //-V128
                            << "queue(" << num_thread << "): "
                            << "no new work available, are we deadlocked?";
                    }
                    else {
                        LHPX_CONSOLE_(hpx::util::logging::level::error) //-V128
                              << "  [TM] " //-V128
                              << "queue(" << num_thread << "): "
                              << "no new work available, are we deadlocked?\n";
                    }
                }
            }
#endif

            result = low_priority_queue_.wait_or_add_new(running,
                idle_loop_count, added) && result;
            if (0 != added) return result;

            return result;
        }

        ///////////////////////////////////////////////////////////////////////
        void on_start_thread(std::size_t num_thread)
        {
            if (nullptr == queues_[num_thread])
            {
                queues_[num_thread] =
                    new thread_queue_type(max_queue_thread_count_);

                if (num_thread < high_priority_queues_.size())
                {
                    high_priority_queues_[num_thread] =
                        new thread_queue_type(max_queue_thread_count_);
                }
            }

            // forward this call to all queues etc.
            if (num_thread < high_priority_queues_.size())
                high_priority_queues_[num_thread]->on_start_thread(num_thread);
            if (num_thread == queues_.size()-1)
                low_priority_queue_.on_start_thread(num_thread);

            queues_[num_thread]->on_start_thread(num_thread);

            // pre-calculate certain constants for the given thread number
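            // (These masks decide, for this worker, whether it may steal from
            // other cores inside its own NUMA domain and whether the core on
            // the domain boundary may additionally steal from other domains;
            // they are consulted by get_next_thread() and wait_or_add_new().)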
            std::size_t num_pu = get_pu_num(num_thread);
            mask_cref_type machine_mask = topology_.get_machine_affinity_mask();
            mask_cref_type core_mask =
                topology_.get_thread_affinity_mask(num_pu, numa_sensitive_ != 0);
            mask_cref_type node_mask =
                topology_.get_numa_node_affinity_mask(num_pu, numa_sensitive_ != 0);

            if (any(core_mask) && any(node_mask))
            {
#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
                set(steals_in_numa_domain_, num_pu);
#endif
                numa_domain_masks_[num_thread] = node_mask;
            }

            // we allow the thread on the boundary of the NUMA domain to steal
            mask_type first_mask = mask_type();
            resize(first_mask, mask_size(core_mask));

            std::size_t first = find_first(node_mask);
            if (first != std::size_t(-1))
                set(first_mask, first);
            else
                first_mask = core_mask;

            if (numa_sensitive_ != 2 && any(first_mask & core_mask))
            {
#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
                set(steals_outside_numa_domain_, num_pu);
#endif
                outside_numa_domain_masks_[num_thread] =
                    not_(node_mask) & machine_mask;
            }
        }

        void on_stop_thread(std::size_t num_thread)
        {
            if (num_thread < high_priority_queues_.size())
                high_priority_queues_[num_thread]->on_stop_thread(num_thread);
            if (num_thread == queues_.size()-1)
                low_priority_queue_.on_stop_thread(num_thread);

            queues_[num_thread]->on_stop_thread(num_thread);
        }

        void on_error(std::size_t num_thread, boost::exception_ptr const& e)
        {
            if (num_thread < high_priority_queues_.size())
                high_priority_queues_[num_thread]->on_error(num_thread, e);
            if (num_thread == queues_.size()-1)
                low_priority_queue_.on_error(num_thread, e);

            queues_[num_thread]->on_error(num_thread, e);
        }

        void reset_thread_distribution()
        {
            curr_queue_.store(0);
        }

    protected:
        std::size_t max_queue_thread_count_;
        std::vector<thread_queue_type*> queues_;
        std::vector<thread_queue_type*> high_priority_queues_;
        thread_queue_type low_priority_queue_;
        boost::atomic<std::size_t> curr_queue_;
        std::size_t numa_sensitive_;
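        // Judging from its uses above, numa_sensitive_ == 0 disables NUMA
        // awareness, any non-zero value prefers stealing within the own NUMA
        // domain, and the value 2 additionally disallows stealing from other
        // NUMA domains.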

#if !defined(HPX_NATIVE_MIC)        // we know that the MIC has one NUMA domain only
        mask_type steals_in_numa_domain_;
        mask_type steals_outside_numa_domain_;
#endif
        std::vector<mask_type> numa_domain_masks_;
        std::vector<mask_type> outside_numa_domain_masks_;
    };
}}}

#include <hpx/config/warnings_suffix.hpp>

#endif