/home/users/khuck/src/hpx-lsu/hpx/runtime/threads/policies/thread_queue.hpp

Line% of fetchesSource
1  
//  Copyright (c) 2007-2016 Hartmut Kaiser
2  
//  Copyright (c) 2011      Bryce Lelbach
3  
//
4  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
7  
#if !defined(HPX_THREADMANAGER_THREAD_QUEUE_AUG_25_2009_0132PM)
8  
#define HPX_THREADMANAGER_THREAD_QUEUE_AUG_25_2009_0132PM
9  
10  
#include <hpx/config.hpp>
11  
#include <hpx/error_code.hpp>
12  
#include <hpx/runtime/threads/policies/lockfree_queue_backends.hpp>
13  
#include <hpx/runtime/threads/policies/queue_helpers.hpp>
14  
#include <hpx/runtime/threads/thread_data.hpp>
15  
#include <hpx/throw_exception.hpp>
16  
#include <hpx/util/assert.hpp>
17  
#include <hpx/util/block_profiler.hpp>
18  
#include <hpx/util/function.hpp>
19  
#include <hpx/util/get_and_reset_value.hpp>
20  
#include <hpx/util/high_resolution_clock.hpp>
21  
#include <hpx/util/unlock_guard.hpp>
22  
23  
#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
24  
#   include <hpx/util/tick_counter.hpp>
25  
#endif
26  
27  
#include <boost/atomic.hpp>
28  
#include <boost/exception_ptr.hpp>
29  
#include <boost/thread/condition.hpp>
30  
#include <boost/thread/mutex.hpp>
31  
32  
#include <cstddef>
33  
#include <cstdint>
34  
#include <functional>
35  
#include <list>
36  
#include <map>
37  
#include <memory>
38  
#include <mutex>
39  
#include <unordered_set>
40  
#include <utility>
41  
#include <vector>
42  
43  
///////////////////////////////////////////////////////////////////////////////
44  
namespace std
45  
{
46  
    template <>
47  
    struct hash< ::hpx::threads::thread_id_type>
48  
    {
49  
        typedef ::hpx::threads::thread_id_type argument_type;
50  
        typedef std::size_t result_type;
51  
52  
        std::size_t operator()(::hpx::threads::thread_id_type const& v) const
53  
        {
54  
            std::hash<std::size_t> hasher_;
55  
            return hasher_(reinterpret_cast<std::size_t>(v.get()));
56  
        }
57  
    };
58  
}
59  
60  
///////////////////////////////////////////////////////////////////////////////
61  
namespace hpx { namespace threads { namespace policies
62  
{
63  
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
    ///////////////////////////////////////////////////////////////////////////
    // We control whether to collect queue wait times using this global bool.
    // It will be set by any of the related performance counters. Once set it
    // stays set, thus no race conditions will occur.
    extern bool maintain_queue_wait_times;
#endif
#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
    ///////////////////////////////////////////////////////////////////////////
    // We globally control whether to do minimal deadlock detection using this
    // global bool variable. It will be set once by the runtime configuration
    // startup code (and is never changed afterwards).
    extern bool minimal_deadlock_detection;
#endif
77  
78  
    ///////////////////////////////////////////////////////////////////////////
79  
    // // Queue back-end interface:
80  
    //
81  
    // template <typename T>
82  
    // struct queue_backend
83  
    // {
84  
    //     typedef ... container_type;
85  
    //     typedef ... value_type;
86  
    //     typedef ... reference;
87  
    //     typedef ... const_reference;
88  
    //     typedef ... size_type;
89  
    //
90  
    //     queue_backend(
91  
    //         size_type initial_size = ...
92  
    //       , size_type num_thread = ...
93  
    //         );
94  
    //
95  
    //     bool push(const_reference val);
96  
    //
97  
    //     bool pop(reference val, bool steal = true);
98  
    //
99  
    //     bool empty();
100  
    // };
101  
    //
102  
    // struct queue_policy
103  
    // {
104  
    //     template <typename T>
105  
    //     struct apply
106  
    //     {
107  
    //         typedef ... type;
108  
    //     };
109  
    // };
110  
    template <typename Mutex = boost::mutex,
        typename PendingQueuing = lockfree_lifo,
        typename StagedQueuing = lockfree_lifo,
        typename TerminatedQueuing = lockfree_fifo>
    class thread_queue
    {
    private:
        // we use a simple mutex to protect the data members for now
        typedef Mutex mutex_type;

        // Add this number of threads to the work items queue each time the
        // function \a add_new() is called if the queue is empty.
        enum {
            min_add_new_count = 10,     // threads created per add_new() step
            max_add_new_count = 10,     // cap on threads created per step
            max_delete_count = 1000     // threads deleted per cleanup step
        };

        // this is the type of a map holding all threads (except depleted ones)
        typedef std::unordered_set<thread_id_type> thread_map_type;

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        // staged task bundled with its enqueue timestamp, used for the
        // wait-time performance counters
        typedef
            util::tuple<thread_init_data, thread_state_enum, std::uint64_t>
        task_description;
#else
        typedef util::tuple<thread_init_data, thread_state_enum> task_description;
#endif

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        // pending thread bundled with the timestamp of its last enqueue
        typedef util::tuple<thread_data*, std::uint64_t> thread_description;
#else
        typedef thread_data thread_description;
#endif

        // queue backends, instantiated through the queuing policy templates
        typedef typename PendingQueuing::template
            apply<thread_description*>::type work_items_type;

        typedef typename StagedQueuing::template
            apply<task_description*>::type task_items_type;

        typedef typename TerminatedQueuing::template
            apply<thread_data*>::type terminated_items_type;
153  
154  
    protected:
155  
        /// Obtain a thread object for \a data: reuse one from the free list
        /// matching the requested stack size if possible, otherwise allocate
        /// a brand new one.
        ///
        /// \param thrd   [out] receives the (re)bound thread id
        /// \param data   thread initialization data (data.stacksize != 0)
        /// \param state  initial state; pending_do_not_schedule is mapped to
        ///               pending before the thread is (re)bound
        /// \param lk     lock on mtx_, must be held on entry; it is released
        ///               only while allocating a brand new thread object
        template <typename Lock>
        void create_thread_object(threads::thread_id_type& thrd,
            threads::thread_init_data& data, thread_state_enum state, Lock& lk)
        {
            HPX_ASSERT(lk.owns_lock());
            HPX_ASSERT(data.stacksize != 0);

            std::ptrdiff_t stacksize = data.stacksize;

            std::list<thread_id_type>* heap = nullptr;

            // select the free list matching the concrete stack size ...
            if (stacksize == get_stack_size(thread_stacksize_small))
            {
                heap = &thread_heap_small_;
            }
            else if (stacksize == get_stack_size(thread_stacksize_medium))
            {
                heap = &thread_heap_medium_;
            }
            else if (stacksize == get_stack_size(thread_stacksize_large))
            {
                heap = &thread_heap_large_;
            }
            else if (stacksize == get_stack_size(thread_stacksize_huge))
            {
                heap = &thread_heap_huge_;
            }
            else {
                // ... otherwise the stored value may be one of the symbolic
                // thread_stacksize enumerators
                switch(stacksize) {
                case thread_stacksize_small:
                    heap = &thread_heap_small_;
                    break;

                case thread_stacksize_medium:
                    heap = &thread_heap_medium_;
                    break;

                case thread_stacksize_large:
                    heap = &thread_heap_large_;
                    break;

                case thread_stacksize_huge:
                    heap = &thread_heap_huge_;
                    break;

                default:
                    // unknown stack size: heap remains nullptr and is caught
                    // by the assertion below
                    break;
                }
            }
            HPX_ASSERT(heap);

            // Check for an unused thread object.
            if (!heap->empty())
            {
                // Take ownership of the thread object and rebind it.
                thrd = heap->front();
                heap->pop_front();
                thrd->rebind(data,
                    state == pending_do_not_schedule ? pending : state);
            }

            else
            {
                // unlock while allocating: creating the thread object might
                // suspend the current HPX thread
                hpx::util::unlock_guard<Lock> ull(lk);

                // Allocate a new thread object.
                thrd = threads::thread_data::create(
                    data, memory_pool_,
                    state == pending_do_not_schedule ? pending : state);
            }
        }
226  
227  
        ///////////////////////////////////////////////////////////////////////
        /// Convert up to \a add_count staged tasks from \a addfrom into full
        /// thread objects registered with this queue.
        ///
        /// \param add_count  maximum number of staged tasks to convert;
        ///                   a negative value means no constraint
        /// \param addfrom    queue whose staged-task queue is drained (may be
        ///                   a different queue when stealing)
        /// \param lk         lock on mtx_, must be held on entry; handed
        ///                   through to create_thread_object()
        /// \param steal      passed through to the staged queue's pop()
        /// \returns          number of threads pushed onto the pending queue
        std::size_t add_new(std::int64_t add_count, thread_queue* addfrom,
            std::unique_lock<mutex_type> &lk, bool steal = false)
        {
            HPX_ASSERT(lk.owns_lock());

            if (HPX_UNLIKELY(0 == add_count))
                return 0;

            std::size_t added = 0;
            task_description* task = nullptr;
            while (add_count-- && addfrom->new_tasks_.pop(task, steal))
            {
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
                if (maintain_queue_wait_times) {
                    // accumulate the time this task spent in the staged queue
                    addfrom->new_tasks_wait_ +=
                        util::high_resolution_clock::now() - util::get<2>(*task);
                    ++addfrom->new_tasks_wait_count_;
                }
#endif
                --addfrom->new_tasks_count_;

                // measure thread creation time
                util::block_profiler_wrapper<add_new_tag> bp(add_new_logger_);

                // create the new thread
                threads::thread_init_data& data = util::get<0>(*task);
                thread_state_enum state = util::get<1>(*task);
                threads::thread_id_type thrd;

                create_thread_object(thrd, data, state, lk);

                delete task;

                // add the new entry to the map of all threads
                std::pair<thread_map_type::iterator, bool> p =
                    thread_map_.insert(thrd);

                if (HPX_UNLIKELY(!p.second)) {
                    HPX_THROW_EXCEPTION(hpx::out_of_memory,
                        "threadmanager::add_new",
                        "Couldn't add new thread to the thread map");
                    return 0;
                }
                ++thread_map_count_;

                // only insert the thread into the work-items queue if it is in
                // pending state
                if (state == pending) {
                    // pushing the new thread into the pending queue of the
                    // specified thread_queue
                    ++added;
                    schedule_thread(thrd.get());
                }

                // this thread has to be in the map now
                HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end());
                HPX_ASSERT(thrd->get_pool() == &memory_pool_);
            }

            if (added) {
                LTM_(debug) << "add_new: added " << added << " tasks to queues"; //-V128
            }
            return added;
        }
293  
294  
        ///////////////////////////////////////////////////////////////////////
        /// Convert staged tasks into thread objects, but only while the
        /// thread map has room below max_count_.
        ///
        /// \param added    [in,out] incremented by the number of threads added
        /// \param addfrom  queue whose staged tasks are converted
        /// \param lk       lock on mtx_, must be held on entry
        /// \param steal    whether the staged-queue pop is a steal
        /// \returns        true if at least one new thread was added
        bool add_new_if_possible(std::size_t& added, thread_queue* addfrom,
            std::unique_lock<mutex_type> &lk, bool steal = false)
        {
            HPX_ASSERT(lk.owns_lock());

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
            util::tick_counter tc(add_new_time_);
#endif

            // nothing staged, nothing to do
            if (0 == addfrom->new_tasks_count_.load(boost::memory_order_relaxed))
                return false;

            // create new threads from pending tasks (if appropriate)
            std::int64_t add_count = -1;                  // default is no constraint

            // if the map doesn't hold max_count threads yet add some
            // FIXME: why do we have this test? can max_count_ ever be zero?
            if (HPX_LIKELY(max_count_)) {
                std::size_t count = thread_map_.size();
                if (max_count_ >= count + min_add_new_count) { //-V104
                    HPX_ASSERT(max_count_ - count <
                        static_cast<std::size_t>((std::numeric_limits
                            <std::int64_t>::max)()));
                    // fill up to the limit, but at least min_add_new_count
                    add_count = static_cast<std::int64_t>(max_count_ - count);
                    if (add_count < min_add_new_count)
                        add_count = min_add_new_count;
                }
                else {
                    // no room left below max_count_
                    return false;
                }
            }

            std::size_t addednew = add_new(add_count, addfrom, lk, steal);
            added += addednew;
            return addednew != 0;
        }
331  
332  
        ///////////////////////////////////////////////////////////////////////
        /// Convert staged tasks into thread objects, even growing max_count_
        /// when the pending queue has run dry (the 'desperate' case).
        ///
        /// \param added    [in,out] incremented by the number of threads added
        /// \param addfrom  queue whose staged tasks are converted
        /// \param lk       lock on mtx_, must be held on entry
        /// \param steal    whether the staged-queue pop is a steal
        /// \returns        true if at least one new thread was added
        bool add_new_always(std::size_t& added, thread_queue* addfrom,
            std::unique_lock<mutex_type> &lk, bool steal = false)
        {
            HPX_ASSERT(lk.owns_lock());

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
            util::tick_counter tc(add_new_time_);
#endif

            // nothing staged, nothing to do
            if (0 == addfrom->new_tasks_count_.load(boost::memory_order_relaxed))
                return false;

            // create new threads from pending tasks (if appropriate)
            std::int64_t add_count = -1;                  // default is no constraint

            // if we are desperate (no work in the queues), add some even if the
            // map holds more than max_count
            if (HPX_LIKELY(max_count_)) {
                std::size_t count = thread_map_.size();
                if (max_count_ >= count + min_add_new_count) { //-V104
                    HPX_ASSERT(max_count_ - count <
                        static_cast<std::size_t>((std::numeric_limits
                            <std::int64_t>::max)()));
                    // clamp into [min_add_new_count, max_add_new_count]
                    add_count = static_cast<std::int64_t>(max_count_ - count);
                    if (add_count < min_add_new_count)
                        add_count = min_add_new_count;
                    if (add_count > max_add_new_count)
                        add_count = max_add_new_count;
                }
                else if (work_items_.empty()) {
                    add_count = min_add_new_count;    // add this number of threads
                    max_count_ += min_add_new_count;  // increase max_count //-V101
                }
                else {
                    // map is full and there is still pending work available
                    return false;
                }
            }

            std::size_t addednew = add_new(add_count, addfrom, lk, steal);
            added += addednew;
            return addednew != 0;
        }
375  
376  
        /// Return a thread object to the free list matching its stack size so
        /// that create_thread_object() can rebind it later.
        void recycle_thread(thread_id_type thrd)
        {
            std::ptrdiff_t const stacksize = thrd->get_stack_size();

            std::list<thread_id_type>* heap = nullptr;

            // match the concrete stack size first ...
            if (stacksize == get_stack_size(thread_stacksize_small))
            {
                heap = &thread_heap_small_;
            }
            else if (stacksize == get_stack_size(thread_stacksize_medium))
            {
                heap = &thread_heap_medium_;
            }
            else if (stacksize == get_stack_size(thread_stacksize_large))
            {
                heap = &thread_heap_large_;
            }
            else if (stacksize == get_stack_size(thread_stacksize_huge))
            {
                heap = &thread_heap_huge_;
            }
            else
            {
                // ... otherwise the stored value may be one of the symbolic
                // thread_stacksize enumerators
                switch(stacksize) {
                case thread_stacksize_small:
                    heap = &thread_heap_small_;
                    break;

                case thread_stacksize_medium:
                    heap = &thread_heap_medium_;
                    break;

                case thread_stacksize_large:
                    heap = &thread_heap_large_;
                    break;

                case thread_stacksize_huge:
                    heap = &thread_heap_huge_;
                    break;

                default:
                    // unknown stack size: do not recycle (matches the
                    // original behavior of asserting and dropping the thread)
                    HPX_ASSERT(false);
                    break;
                }
            }

            if (heap != nullptr)
                heap->push_front(thrd);
        }
421  
422  
    public:
423  
        /// This function makes sure all threads which are marked for deletion
        /// (state is terminated) are properly destroyed.
        ///
        /// When \a delete_all is true every queued terminated thread is
        /// removed from the thread map in one sweep; otherwise at most a
        /// bounded number (one tenth of the terminated items, but at least
        /// max_delete_count) is processed and the thread objects are recycled
        /// for later reuse. Must be called with mtx_ held.
        ///
        /// This returns 'true' if there are no more terminated threads waiting
        /// to be deleted.
        bool cleanup_terminated_locked_helper(bool delete_all = false)
        {
#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
            util::tick_counter tc(cleanup_terminated_time_);
#endif

            // fast path: nothing queued and no threads registered at all
            if (terminated_items_count_ == 0 && thread_map_.empty())
                return true;

            if (delete_all) {
                // delete all threads
                thread_data* todelete;
                while (terminated_items_.pop(todelete))
                {
                    --terminated_items_count_;

                    // this thread has to be in this map
                    HPX_ASSERT(thread_map_.find(todelete) != thread_map_.end());

                    // erasing the id from the map drops the last reference
                    bool deleted = thread_map_.erase(todelete) != 0;
                    HPX_ASSERT(deleted);
                    if (deleted) {
                        --thread_map_count_;
                        HPX_ASSERT(thread_map_count_ >= 0);
                    }
                }
            }
            else {
                // delete only this many threads
                std::int64_t delete_count =
                    (std::max)(
                        static_cast<std::int64_t>(terminated_items_count_ / 10),
                        static_cast<std::int64_t>(max_delete_count));

                thread_data* todelete;
                while (delete_count && terminated_items_.pop(todelete))
                {
                    --terminated_items_count_;

                    thread_map_type::iterator it = thread_map_.find(todelete);

                    // this thread has to be in this map
                    HPX_ASSERT(it != thread_map_.end());

                    // keep the thread object on a free list for reuse
                    recycle_thread(*it);

                    thread_map_.erase(it);
                    --thread_map_count_;
                    HPX_ASSERT(thread_map_count_ >= 0);

                    --delete_count;
                }
            }
            return terminated_items_count_ == 0;
        }
483  
484  
        bool cleanup_terminated_locked(bool delete_all = false)
485  
        {
486  
            return cleanup_terminated_locked_helper(delete_all) &&
487  
                thread_map_.empty();
488  
        }
489  
490  
    public:
491  
        /// Destroy threads which have terminated.
        ///
        /// \param delete_all  when true, keep cleaning in bounded, separately
        ///                    locked rounds until the terminated queue is
        ///                    drained (avoids holding mtx_ for a long time)
        /// \returns           true when no registered threads and no staged
        ///                    tasks remain
        bool cleanup_terminated(bool delete_all = false)
        {
            // fast path without taking the lock
            if (terminated_items_count_ == 0)
                return thread_map_count_ == 0;

            if (delete_all) {
                // do not lock mutex while deleting all threads, do it piece-wise
                bool thread_map_is_empty = false;
                while (true)
                {
                    // each round re-acquires the lock, bounding latency for
                    // other threads contending on mtx_
                    std::lock_guard<mutex_type> lk(mtx_);
                    if (cleanup_terminated_locked_helper(false))
                    {
                        thread_map_is_empty =
                            (thread_map_count_ == 0) && (new_tasks_count_ == 0);
                        break;
                    }
                }
                return thread_map_is_empty;
            }

            // single bounded cleanup round
            std::lock_guard<mutex_type> lk(mtx_);
            return cleanup_terminated_locked_helper(false) &&
                (thread_map_count_ == 0) && (new_tasks_count_ == 0);
        }
516  
517  
        // The maximum number of active threads this thread manager should
        // create. This number will be a constraint only as long as the work
        // items queue is not empty. Otherwise the number of active threads
        // will be incremented in steps equal to the \a min_add_new_count
        // specified above.
        enum { max_thread_count = 1000 };

        /// Construct a thread queue.
        ///
        /// \param queue_num  index forwarded to the pending-items backend
        ///                   (std::size_t(-1) selects no specific queue)
        /// \param max_count  upper bound for the thread map; 0 selects the
        ///                   default max_thread_count
        thread_queue(std::size_t queue_num = std::size_t(-1),
                std::size_t max_count = max_thread_count)
          : thread_map_count_(0),
            work_items_(128, queue_num),
            work_items_count_(0),
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
            work_items_wait_(0),
            work_items_wait_count_(0),
#endif
            terminated_items_(128),
            terminated_items_count_(0),
            max_count_((0 == max_count)
                      ? static_cast<std::size_t>(max_thread_count)
                      : max_count),
            new_tasks_(128),
            new_tasks_count_(0),
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
            new_tasks_wait_(0),
            new_tasks_wait_count_(0),
#endif
            memory_pool_(64),
            thread_heap_small_(),
            thread_heap_medium_(),
            thread_heap_large_(),
            thread_heap_huge_(),
#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
            add_new_time_(0),
            cleanup_terminated_time_(0),
#endif
#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
            pending_misses_(0),
            pending_accesses_(0),
            stolen_from_pending_(0),
            stolen_from_staged_(0),
            stolen_to_pending_(0),
            stolen_to_staged_(0),
#endif
            add_new_logger_("thread_queue::add_new")
        {}
563  
564  
        void set_max_count(std::size_t max_count = max_thread_count)
565  
        {
566  
            max_count_ = (0 == max_count) ? max_thread_count : max_count; //-V105
567  
        }
568  
569  
#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
        // Accumulated time spent creating thread objects (measured via the
        // tick_counter in add_new*()); optionally reset to zero.
        std::uint64_t get_creation_time(bool reset)
        {
            return util::get_and_reset_value(add_new_time_, reset);
        }

        // Accumulated time spent cleaning up terminated thread objects
        // (measured in cleanup_terminated_locked_helper()); optionally reset.
        std::uint64_t get_cleanup_time(bool reset)
        {
            return util::get_and_reset_value(cleanup_terminated_time_, reset);
        }
#endif
580  
581  
        ///////////////////////////////////////////////////////////////////////
        // This returns the current length of the queues (work items and new
        // items combined)
        std::int64_t get_queue_length() const
        {
            return work_items_count_ + new_tasks_count_;
        }

        // This returns the current length of the pending queue
        std::int64_t get_pending_queue_length() const
        {
            return work_items_count_;
        }

        // This returns the current length of the staged queue; the memory
        // order for the atomic load can be relaxed by the caller
        std::int64_t get_staged_queue_length(
            boost::memory_order order = boost::memory_order_seq_cst) const
        {
            return new_tasks_count_.load(order);
        }
600  
601  
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        // Average time a task spent in the staged queue (integer division of
        // accumulated wait by sample count; 0 before any sample is recorded).
        std::uint64_t get_average_task_wait_time() const
        {
            std::uint64_t count = new_tasks_wait_count_;
            if (count == 0)
                return 0;
            return new_tasks_wait_ / count;
        }

        // Average time a thread spent in the pending queue (same scheme as
        // above; 0 before any sample is recorded).
        std::uint64_t get_average_thread_wait_time() const
        {
            std::uint64_t count = work_items_wait_count_;
            if (count == 0)
                return 0;
            return work_items_wait_ / count;
        }
#endif
618  
619  
#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
        // Work-stealing performance counters: each get_* function returns the
        // accumulated value and optionally resets it; the increment_*
        // functions are invoked by the scheduling policy.
        std::int64_t get_num_pending_misses(bool reset)
        {
            return util::get_and_reset_value(pending_misses_, reset);
        }

        void increment_num_pending_misses(std::size_t num = 1)
        {
            pending_misses_ += num;
        }

        std::int64_t get_num_pending_accesses(bool reset)
        {
            return util::get_and_reset_value(pending_accesses_, reset);
        }

        void increment_num_pending_accesses(std::size_t num = 1)
        {
            pending_accesses_ += num;
        }

        std::int64_t get_num_stolen_from_pending(bool reset)
        {
            return util::get_and_reset_value(stolen_from_pending_, reset);
        }

        void increment_num_stolen_from_pending(std::size_t num = 1)
        {
            stolen_from_pending_ += num;
        }

        std::int64_t get_num_stolen_from_staged(bool reset)
        {
            return util::get_and_reset_value(stolen_from_staged_, reset);
        }

        void increment_num_stolen_from_staged(std::size_t num = 1)
        {
            stolen_from_staged_ += num;
        }

        std::int64_t get_num_stolen_to_pending(bool reset)
        {
            return util::get_and_reset_value(stolen_to_pending_, reset);
        }

        void increment_num_stolen_to_pending(std::size_t num = 1)
        {
            stolen_to_pending_ += num;
        }

        std::int64_t get_num_stolen_to_staged(bool reset)
        {
            return util::get_and_reset_value(stolen_to_staged_, reset);
        }

        void increment_num_stolen_to_staged(std::size_t num = 1)
        {
            stolen_to_staged_ += num;
        }
#else
        // stealing counts disabled: the increment functions are no-ops so
        // callers need not be conditionally compiled
        void increment_num_pending_misses(std::size_t num = 1) {}
        void increment_num_pending_accesses(std::size_t num = 1) {}
        void increment_num_stolen_from_pending(std::size_t num = 1) {}
        void increment_num_stolen_from_staged(std::size_t num = 1) {}
        void increment_num_stolen_to_pending(std::size_t num = 1) {}
        void increment_num_stolen_to_staged(std::size_t num = 1) {}
#endif
687  
688  
        ///////////////////////////////////////////////////////////////////////
        /// Create a new thread, or — when \a run_now is false — only stage a
        /// task description for later conversion by add_new().
        ///
        /// \param data           thread initialization data (moved from when
        ///                       only staging)
        /// \param id             [out, optional] receives the new thread's id
        ///                       (only set on the run_now path; staged tasks
        ///                       have no id yet)
        /// \param initial_state  initial thread state; pending threads are
        ///                       scheduled immediately on the run_now path
        /// \param run_now        create the thread object right away instead
        ///                       of staging it
        /// \param ec             error reporting
        void create_thread(thread_init_data& data, thread_id_type* id,
            thread_state_enum initial_state, bool run_now, error_code& ec)
        {
            // thread has not been created yet
            if (id) *id = invalid_thread_id;

            if (run_now)
            {
                threads::thread_id_type thrd;

                // The mutex can not be locked while a new thread is getting
                // created, as it might have that the current HPX thread gets
                // suspended.
                {
                    std::unique_lock<mutex_type> lk(mtx_);

                    // may temporarily release lk while allocating
                    create_thread_object(thrd, data, initial_state, lk);

                    // add a new entry in the map for this thread
                    std::pair<thread_map_type::iterator, bool> p =
                        thread_map_.insert(thrd);

                    if (HPX_UNLIKELY(!p.second)) {
                        HPX_THROWS_IF(ec, hpx::out_of_memory,
                            "threadmanager::register_thread",
                            "Couldn't add new thread to the map of threads");
                        return;
                    }
                    ++thread_map_count_;

                    // this thread has to be in the map now
                    HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end());
                    HPX_ASSERT(thrd->get_pool() == &memory_pool_);

                    // push the new thread in the pending queue thread
                    if (initial_state == pending)
                        schedule_thread(thrd.get());

                    // return the thread_id of the newly created thread
                    if (id) *id = std::move(thrd);

                    if (&ec != &throws)
                        ec = make_success_code();
                    return;
                }
            }

            // do not execute the work, but register a task description for
            // later thread creation
            ++new_tasks_count_;

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
            // record the staging timestamp for the wait-time counters
            new_tasks_.push(new task_description(
                std::move(data), initial_state,
                util::high_resolution_clock::now()
            ));
#else
            new_tasks_.push(new task_description( //-V106
                std::move(data), initial_state));
#endif
            if (&ec != &throws)
                ec = make_success_code();
        }
754  
755  
        /// Move pending work items from \a src into this queue until this
        /// queue's work_items_count_ reaches \a count (or src runs dry).
        ///
        /// \param src    queue to drain
        /// \param count  target size of this queue's pending counter
        void move_work_items_from(thread_queue *src, std::int64_t count)
        {
            thread_description* trd;
            while (src->work_items_.pop(trd))
            {
                --src->work_items_count_;

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
                if (maintain_queue_wait_times) {
                    // account the wait time on src and restamp the item for
                    // its stay in this queue
                    std::uint64_t now = util::high_resolution_clock::now();
                    src->work_items_wait_ += now - util::get<1>(*trd);
                    ++src->work_items_wait_count_;
                    util::get<1>(*trd) = now;
                }
#endif

                // stop once this queue's counter reaches the target count
                // NOTE(review): unlike move_task_items_from(), the return
                // value of push() is ignored here — presumably this backend's
                // push cannot fail; confirm against the queue backend.
                bool finished = count == ++work_items_count_;
                work_items_.push(trd);
                if (finished)
                    break;
            }
        }
777  
778  
        /// Move staged tasks from \a src into this queue until this queue's
        /// new_tasks_count_ reaches \a count (or src runs dry).
        ///
        /// \param src    queue to drain
        /// \param count  target size of this queue's staged counter
        void move_task_items_from(thread_queue *src,
            std::int64_t count)
        {
            task_description* task;
            while (src->new_tasks_.pop(task))
            {
                --src->new_tasks_count_;

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
                if (maintain_queue_wait_times) {
                    // account the wait time on src and restamp the task for
                    // its stay in this queue
                    std::int64_t now = util::high_resolution_clock::now();
                    src->new_tasks_wait_ += now - util::get<2>(*task);
                    ++src->new_tasks_wait_count_;
                    util::get<2>(*task) = now;
                }
#endif

                bool finish = count == ++new_tasks_count_;
                if (new_tasks_.push(task))
                {
                    if (finish)
                        break;
                }
                else
                {
                    // push failed: revert the counter
                    // NOTE(review): the popped task pointer is dropped here
                    // without delete or retry — looks like a leak if push can
                    // actually fail for this backend; confirm.
                    --new_tasks_count_;
                }
            }
        }
807  
808  
        /// Return the next thread to be executed, return false if none is
        /// available
        ///
        /// \param thrd   [out] receives the next pending thread, if any
        /// \param steal  passed through to the queue's pop(); true when this
        ///               call is taking work on behalf of another queue
        bool get_next_thread(threads::thread_data*& thrd,
            bool steal = false) HPX_HOT
        {
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
            // the queue stores (thread, enqueue-timestamp) descriptors;
            // unwrap the thread and account for the time the item spent
            // waiting in the queue
            thread_description* tdesc;
            // the relaxed load is only a cheap pre-check to avoid touching
            // the queue when it is obviously empty; the authoritative test
            // is the pop() itself
            if (0 != work_items_count_.load(boost::memory_order_relaxed) &&
                work_items_.pop(tdesc, steal))
            {
                --work_items_count_;

                if (maintain_queue_wait_times) {
                    work_items_wait_ += util::high_resolution_clock::now() -
                        util::get<1>(*tdesc);
                    ++work_items_wait_count_;
                }

                thrd = util::get<0>(*tdesc);
                delete tdesc;    // descriptor was heap-allocated by schedule_thread

                return true;
            }
#else
            // the relaxed load is only a cheap pre-check to avoid touching
            // the queue when it is obviously empty; the authoritative test
            // is the pop() itself
            if (0 != work_items_count_.load(boost::memory_order_relaxed) &&
                work_items_.pop(thrd, steal))
            {
                --work_items_count_;
                return true;
            }
#endif
            return false;
        }
841  
842  
        /// Schedule the passed thread
        ///
        /// \param thrd       the thread to enqueue as a pending work item
        /// \param other_end  if true, push the item onto the other end of the
        ///                   queue (presumably affecting how soon it is
        ///                   dequeued by the queue backend -- TODO confirm)
        void schedule_thread(threads::thread_data* thrd, bool other_end = false)
        {
            ++work_items_count_;
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
            // pair the thread with its enqueue timestamp; get_next_thread()
            // uses it to compute the queue wait time and deletes the pair
            work_items_.push(new thread_description(
                thrd, util::high_resolution_clock::now()), other_end);
#else
            work_items_.push(thrd, other_end);
#endif
        }
853  
854  
        /// Destroy the passed thread as it has been terminated
855  
        bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
856  
        {
857  
            if (thrd->get_pool() == &memory_pool_)
858  
            {
859  
                terminated_items_.push(thrd);
860  
861  
                std::int64_t count = ++terminated_items_count_;
862  
                if (count > HPX_MAX_TERMINATED_THREADS)
863  
                {
864  
                    cleanup_terminated(true);   // clean up all terminated threads
865  
                }
866  
                return true;
867  
            }
868  
            return false;
869  
        }
870  
871  
        ///////////////////////////////////////////////////////////////////////
872  
        /// Return the number of existing threads with the given state.
873  
        std::int64_t get_thread_count(thread_state_enum state = unknown) const
874  
        {
875  
            if (terminated == state)
876  
                return terminated_items_count_;
877  
878  
            if (staged == state)
879  
                return new_tasks_count_;
880  
881  
            if (unknown == state)
882  
                return thread_map_count_ + new_tasks_count_ - terminated_items_count_;
883  
884  
            // acquire lock only if absolutely necessary
885  
            std::lock_guard<mutex_type> lk(mtx_);
886  
887  
            std::int64_t num_threads = 0;
888  
            thread_map_type::const_iterator end = thread_map_.end();
889  
            for (thread_map_type::const_iterator it = thread_map_.begin();
890  
                 it != end; ++it)
891  
            {
892  
                if ((*it)->get_state().state() == state)
893  
                    ++num_threads;
894  
            }
895  
            return num_threads;
896  
        }
897  
898  
        ///////////////////////////////////////////////////////////////////////
899  
        void abort_all_suspended_threads()
900  
        {
901  
            std::lock_guard<mutex_type> lk(mtx_);
902  
            thread_map_type::iterator end =  thread_map_.end();
903  
            for (thread_map_type::iterator it = thread_map_.begin();
904  
                 it != end; ++it)
905  
            {
906  
                if ((*it)->get_state().state() == suspended)
907  
                {
908  
                    (*it)->set_state(pending, wait_abort);
909  
                    schedule_thread((*it).get());
910  
                }
911  
            }
912  
        }
913  
914  
        bool enumerate_threads(
915  
            util::function_nonser<bool(thread_id_type)> const& f,
916  
            thread_state_enum state = unknown) const
917  
        {
918  
            std::uint64_t count = thread_map_count_;
919  
            if (state == terminated)
920  
            {
921  
                count = terminated_items_count_;
922  
            }
923  
            else if (state == staged)
924  
            {
925  
                HPX_THROW_EXCEPTION(bad_parameter,
926  
                    "thread_queue::iterate_threads",
927  
                    "can't iterate over thread ids of staged threads");
928  
                return false;
929  
            }
930  
931  
            std::vector<thread_id_type> ids;
932  
            ids.reserve(count);
933  
934  
            if (state == unknown)
935  
            {
936  
                std::lock_guard<mutex_type> lk(mtx_);
937  
                thread_map_type::const_iterator end =  thread_map_.end();
938  
                for (thread_map_type::const_iterator it = thread_map_.begin();
939  
                     it != end; ++it)
940  
                {
941  
                    ids.push_back(*it);
942  
                }
943  
            }
944  
            else
945  
            {
946  
                std::lock_guard<mutex_type> lk(mtx_);
947  
                thread_map_type::const_iterator end =  thread_map_.end();
948  
                for (thread_map_type::const_iterator it = thread_map_.begin();
949  
                     it != end; ++it)
950  
                {
951  
                    if ((*it)->get_state().state() == state)
952  
                        ids.push_back(*it);
953  
                }
954  
            }
955  
956  
            // now invoke callback function for all matching threads
957  
            for (thread_id_type const& id : ids)
958  
            {
959  
                if (!f(id))
960  
                    return false;       // stop iteration
961  
            }
962  
963  
            return true;
964  
        }
965  
966  
        /// This is a function which gets called periodically by the thread
        /// manager to allow for maintenance tasks to be executed in the
        /// scheduler. Returns true if the OS thread calling this function
        /// has to be terminated (i.e. no more work has to be done).
        ///
        /// \param running          whether the scheduler is still running
        /// \param idle_loop_count  caller's idle-loop counter (not used by
        ///                         this implementation)
        /// \param added            [out] presumably updated by
        ///                         add_new_always with the number of staged
        ///                         tasks converted -- confirm against its
        ///                         definition
        /// \param addfrom_         queue to convert staged tasks from; this
        ///                         queue itself if null
        /// \param steal            whether this conversion counts as stealing
        inline bool wait_or_add_new(bool running,
            std::int64_t& idle_loop_count, std::size_t& added,
            thread_queue* addfrom_ = nullptr, bool steal = false) HPX_HOT
        {
            // try to generate new threads from task lists, but only if our
            // own list of threads is empty
            if (0 == work_items_count_.load(boost::memory_order_relaxed)) {

                // No obvious work has to be done, so a lock won't hurt too much.
                //
                // We prefer to exit this function (some kind of very short
                // busy waiting) to blocking on this lock. Locking fails either
                // when a thread is currently doing thread maintenance, which
                // means there might be new work, or the thread owning the lock
                // just falls through to the cleanup work below (no work is available)
                // in which case the current thread (which failed to acquire
                // the lock) will just retry to enter this loop.
                std::unique_lock<mutex_type> lk(mtx_, std::try_to_lock);
                if (!lk.owns_lock())
                    return false;            // avoid long wait on lock

                // stop running after all HPX threads have been terminated
                thread_queue* addfrom = addfrom_ ? addfrom_ : this;
                bool added_new = add_new_always(added, addfrom, lk, steal);
                if (!added_new) {
                    // Before exiting each of the OS threads deletes the
                    // remaining terminated HPX threads
                    // REVIEW: Should we be doing this if we are stealing?
                    bool canexit = cleanup_terminated_locked(true);
                    if (!running && canexit) {
                        // we don't have any registered work items anymore
                        //do_some_work();       // notify possibly waiting threads
                        return true;            // terminate scheduling loop
                    }
                    return false;
                }

                // new work was staged; opportunistically reclaim some
                // terminated threads while we still hold the lock
                cleanup_terminated_locked();
            }
            return false;
        }
1011  
1012  
        ///////////////////////////////////////////////////////////////////////
        /// Dump information about suspended threads (deadlock detection aid).
        /// Does nothing unless minimal deadlock detection was compiled in and
        /// is enabled at runtime.
        bool dump_suspended_threads(std::size_t num_thread
          , std::int64_t& idle_loop_count, bool running)
        {
#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
            if (minimal_deadlock_detection) {
                std::lock_guard<mutex_type> lk(mtx_);
                return detail::dump_suspended_threads(num_thread, thread_map_
                  , idle_loop_count, running);
            }
            return false;
#else
            return false;
#endif
        }
1027  
1028  
        ///////////////////////////////////////////////////////////////////////
        // Scheduler callback hooks; this queue needs no per-worker-thread
        // setup, teardown, or error handling, so they are intentionally no-ops.
        void on_start_thread(std::size_t num_thread) {}
        void on_stop_thread(std::size_t num_thread) {}
        void on_error(std::size_t num_thread, boost::exception_ptr const& e) {}
1032  
1033  
    private:
        mutable mutex_type mtx_;                    ///< mutex protecting the members

        thread_map_type thread_map_;
        ///< mapping of thread id's to HPX-threads
        boost::atomic<std::int64_t> thread_map_count_;
        ///< number of threads held in thread_map_

        work_items_type work_items_;
        ///< list of active work items (pending threads)
        boost::atomic<std::int64_t> work_items_count_;
        ///< count of active work items

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        boost::atomic<std::int64_t> work_items_wait_;
        ///< overall wait time of work items
        boost::atomic<std::int64_t> work_items_wait_count_;
        ///< overall number of work items in queue
#endif
        terminated_items_type terminated_items_;     ///< list of terminated threads
        boost::atomic<std::int64_t> terminated_items_count_;
        ///< count of terminated items

        std::size_t max_count_;
        ///< maximum number of existing HPX-threads
        task_items_type new_tasks_;
        ///< list of new (staged) tasks to run

        boost::atomic<std::int64_t> new_tasks_count_;
        ///< count of new tasks to run
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        boost::atomic<std::int64_t> new_tasks_wait_;
        ///< overall wait time of new tasks
        boost::atomic<std::int64_t> new_tasks_wait_count_;
        ///< overall number tasks waited
#endif

        threads::thread_pool memory_pool_;          ///< OS thread local memory pools for
                                                    ///< HPX-threads

        // caches of reusable thread objects, presumably bucketed by stack
        // size -- TODO confirm against the allocation code
        std::list<thread_id_type> thread_heap_small_;
        std::list<thread_id_type> thread_heap_medium_;
        std::list<thread_id_type> thread_heap_large_;
        std::list<thread_id_type> thread_heap_huge_;

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
        // accumulated time spent creating and cleaning up threads
        std::uint64_t add_new_time_;
        std::uint64_t cleanup_terminated_time_;
#endif

#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
        // # of times our associated worker-thread couldn't find work in work_items
        boost::atomic<std::int64_t> pending_misses_;

        // # of times our associated worker-thread looked for work in work_items
        boost::atomic<std::int64_t> pending_accesses_;

        boost::atomic<std::int64_t> stolen_from_pending_;
        ///< count of work_items stolen from this queue
        boost::atomic<std::int64_t> stolen_from_staged_;
        ///< count of new_tasks stolen from this queue
        boost::atomic<std::int64_t> stolen_to_pending_;
        ///< count of work_items stolen to this queue from other queues
        boost::atomic<std::int64_t> stolen_to_staged_;
        ///< count of new_tasks stolen to this queue from other queues
#endif

        // profiling instrumentation for the add-new (thread creation) path
        util::block_profiler<add_new_tag> add_new_logger_;
1101  
    };
1102  
}}}
1103  
1104  
#endif
1105  
1106  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.