Line | Source | % of fetches |
---|---|---|
1 | // Copyright (c) 2007-2016 Hartmut Kaiser | |
2 | // Copyright (c) 2011 Bryce Lelbach | |
3 | // | |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | ||
7 | #if !defined(HPX_THREADMANAGER_THREAD_QUEUE_AUG_25_2009_0132PM) | |
8 | #define HPX_THREADMANAGER_THREAD_QUEUE_AUG_25_2009_0132PM | |
9 | ||
10 | #include <hpx/config.hpp> | |
11 | #include <hpx/error_code.hpp> | |
12 | #include <hpx/runtime/threads/policies/lockfree_queue_backends.hpp> | |
13 | #include <hpx/runtime/threads/policies/queue_helpers.hpp> | |
14 | #include <hpx/runtime/threads/thread_data.hpp> | |
15 | #include <hpx/throw_exception.hpp> | |
16 | #include <hpx/util/assert.hpp> | |
17 | #include <hpx/util/block_profiler.hpp> | |
18 | #include <hpx/util/function.hpp> | |
19 | #include <hpx/util/get_and_reset_value.hpp> | |
20 | #include <hpx/util/high_resolution_clock.hpp> | |
21 | #include <hpx/util/unlock_guard.hpp> | |
22 | ||
23 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | |
24 | # include <hpx/util/tick_counter.hpp> | |
25 | #endif | |
26 | ||
27 | #include <boost/atomic.hpp> | |
28 | #include <boost/exception_ptr.hpp> | |
29 | #include <boost/thread/condition.hpp> | |
30 | #include <boost/thread/mutex.hpp> | |
31 | ||
32 | #include <cstddef> | |
33 | #include <cstdint> | |
34 | #include <functional> | |
35 | #include <list> | |
36 | #include <map> | |
37 | #include <memory> | |
38 | #include <mutex> | |
39 | #include <unordered_set> | |
40 | #include <utility> | |
41 | #include <vector> | |
42 | ||
43 | /////////////////////////////////////////////////////////////////////////////// | |
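| // The std::hash specialization below lets thread_id_type be used as the key | |
| // of standard unordered containers (such as the thread map further down in | |
| // this file); it simply hashes the numeric value of the id's underlying pointer. | |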
44 | namespace std | |
45 | { | |
46 | template <> | |
47 | struct hash< ::hpx::threads::thread_id_type> | |
48 | { | |
49 | typedef ::hpx::threads::thread_id_type argument_type; | |
50 | typedef std::size_t result_type; | |
51 | ||
52 | std::size_t operator()(::hpx::threads::thread_id_type const& v) const | |
53 | { | |
54 | std::hash<std::size_t> hasher_; | |
55 | return hasher_(reinterpret_cast<std::size_t>(v.get())); | |
56 | } | |
57 | }; | |
58 | } | |
59 | ||
60 | /////////////////////////////////////////////////////////////////////////////// | |
61 | namespace hpx { namespace threads { namespace policies | |
62 | { | |
63 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
64 | /////////////////////////////////////////////////////////////////////////// | |
65 | // We control whether to collect queue wait times using this global bool. | |
66 | // It will be set by any of the related performance counters. Once set it | |
67 | // stays set, thus no race conditions will occur. | |
68 | extern bool maintain_queue_wait_times; | |
69 | #endif | |
70 | #ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION | |
71 | /////////////////////////////////////////////////////////////////////////// | |
72 | // We control whether to perform minimal deadlock detection using this | |
73 | // global bool variable. It will be set once by the runtime configuration | |
74 | // startup code. | |
75 | extern bool minimal_deadlock_detection; | |
76 | #endif | |
77 | ||
78 | /////////////////////////////////////////////////////////////////////////// | |
79 | // Queue back-end interface: | |
80 | // | |
81 | // template <typename T> | |
82 | // struct queue_backend | |
83 | // { | |
84 | // typedef ... container_type; | |
85 | // typedef ... value_type; | |
86 | // typedef ... reference; | |
87 | // typedef ... const_reference; | |
88 | // typedef ... size_type; | |
89 | // | |
90 | // queue_backend( | |
91 | // size_type initial_size = ... | |
92 | // , size_type num_thread = ... | |
93 | // ); | |
94 | // | |
95 | // bool push(const_reference val); | |
96 | // | |
97 | // bool pop(reference val, bool steal = true); | |
98 | // | |
99 | // bool empty(); | |
100 | // }; | |
101 | // | |
102 | // struct queue_policy | |
103 | // { | |
104 | // template <typename T> | |
105 | // struct apply | |
106 | // { | |
107 | // typedef ... type; | |
108 | // }; | |
109 | // }; | |
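| // For illustration only (not part of this header): a minimal, lock-based | |
| // back-end and its policy, satisfying the interface sketched above, could | |
| // look roughly like this (the names and the use of std::deque are assumptions): | |
| // | |
| //     template <typename T> | |
| //     struct locked_deque_backend | |
| //     { | |
| //         typedef std::deque<T> container_type; | |
| //         typedef T value_type; | |
| //         typedef T& reference; | |
| //         typedef T const& const_reference; | |
| //         typedef std::size_t size_type; | |
| // | |
| //         locked_deque_backend(size_type = 0, size_type = 0) {} | |
| // | |
| //         bool push(const_reference val) | |
| //         { std::lock_guard<boost::mutex> l(mtx_); items_.push_back(val); return true; } | |
| // | |
| //         bool pop(reference val, bool /*steal*/ = true) | |
| //         { | |
| //             std::lock_guard<boost::mutex> l(mtx_); | |
| //             if (items_.empty()) return false; | |
| //             val = items_.front(); items_.pop_front(); return true; | |
| //         } | |
| // | |
| //         bool empty() | |
| //         { std::lock_guard<boost::mutex> l(mtx_); return items_.empty(); } | |
| // | |
| //         boost::mutex mtx_; | |
| //         container_type items_; | |
| //     }; | |
| // | |
| //     struct locked_deque | |
| //     { | |
| //         template <typename T> | |
| //         struct apply { typedef locked_deque_backend<T> type; }; | |
| //     }; | |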
110 | template <typename Mutex = boost::mutex, | |
111 | typename PendingQueuing = lockfree_lifo, | |
112 | typename StagedQueuing = lockfree_lifo, | |
113 | typename TerminatedQueuing = lockfree_fifo> | |
114 | class thread_queue | |
115 | { | |
116 | private: | |
117 | // we use a simple mutex to protect the data members for now | |
118 | typedef Mutex mutex_type; | |
119 | ||
120 | // Add this number of threads to the work items queue each time the | |
121 | // function \a add_new() is called if the queue is empty. | |
122 | enum { | |
123 | min_add_new_count = 10, | |
124 | max_add_new_count = 10, | |
125 | max_delete_count = 1000 | |
126 | }; | |
127 | ||
128 | // this is the type of the set holding all threads (except depleted ones) | |
129 | typedef std::unordered_set<thread_id_type> thread_map_type; | |
130 | ||
131 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
132 | typedef | |
133 | util::tuple<thread_init_data, thread_state_enum, std::uint64_t> | |
134 | task_description; | |
135 | #else | |
136 | typedef util::tuple<thread_init_data, thread_state_enum> task_description; | |
137 | #endif | |
138 | ||
139 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
140 | typedef util::tuple<thread_data*, std::uint64_t> thread_description; | |
141 | #else | |
142 | typedef thread_data thread_description; | |
143 | #endif | |
144 | ||
145 | typedef typename PendingQueuing::template | |
146 | apply<thread_description*>::type work_items_type; | |
147 | ||
148 | typedef typename StagedQueuing::template | |
149 | apply<task_description*>::type task_items_type; | |
150 | ||
151 | typedef typename TerminatedQueuing::template | |
152 | apply<thread_data*>::type terminated_items_type; | |
153 | ||
154 | protected: | |
155 | template <typename Lock> | |
156 | void create_thread_object(threads::thread_id_type& thrd, | |
157 | threads::thread_init_data& data, thread_state_enum state, Lock& lk) | |
158 | { | |
159 | HPX_ASSERT(lk.owns_lock()); | |
160 | HPX_ASSERT(data.stacksize != 0); | |
161 | ||
162 | std::ptrdiff_t stacksize = data.stacksize; | |
163 | ||
164 | std::list<thread_id_type>* heap = nullptr; | |
165 | ||
166 | if (stacksize == get_stack_size(thread_stacksize_small)) | |
167 | { | |
168 | heap = &thread_heap_small_; | |
169 | } | |
170 | else if (stacksize == get_stack_size(thread_stacksize_medium)) | |
171 | { | |
172 | heap = &thread_heap_medium_; | |
173 | } | |
174 | else if (stacksize == get_stack_size(thread_stacksize_large)) | |
175 | { | |
176 | heap = &thread_heap_large_; | |
177 | } | |
178 | else if (stacksize == get_stack_size(thread_stacksize_huge)) | |
179 | { | |
180 | heap = &thread_heap_huge_; | |
181 | } | |
182 | else { | |
183 | switch(stacksize) { | |
184 | case thread_stacksize_small: | |
185 | heap = &thread_heap_small_; | |
186 | break; | |
187 | ||
188 | case thread_stacksize_medium: | |
189 | heap = &thread_heap_medium_; | |
190 | break; | |
191 | ||
192 | case thread_stacksize_large: | |
193 | heap = &thread_heap_large_; | |
194 | break; | |
195 | ||
196 | case thread_stacksize_huge: | |
197 | heap = &thread_heap_huge_; | |
198 | break; | |
199 | ||
200 | default: | |
201 | break; | |
202 | } | |
203 | } | |
204 | HPX_ASSERT(heap); | |
205 | ||
206 | // Check for an unused thread object. | |
207 | if (!heap->empty()) | |
208 | { | |
209 | // Take ownership of the thread object and rebind it. | |
210 | thrd = heap->front(); | |
211 | heap->pop_front(); | |
212 | thrd->rebind(data, | |
213 | state == pending_do_not_schedule ? pending : state); | |
214 | } | |
215 | ||
216 | else | |
217 | { | |
218 | hpx::util::unlock_guard<Lock> ull(lk); | |
219 | ||
220 | // Allocate a new thread object. | |
221 | thrd = threads::thread_data::create( | |
222 | data, memory_pool_, | |
223 | state == pending_do_not_schedule ? pending : state); | |
224 | } | |
225 | } | |
226 | ||
227 | /////////////////////////////////////////////////////////////////////// | |
228 | // add new threads if there is some amount of work available | |
229 | std::size_t add_new(std::int64_t add_count, thread_queue* addfrom, | |
230 | std::unique_lock<mutex_type> &lk, bool steal = false) | |
231 | { | |
232 | HPX_ASSERT(lk.owns_lock()); | |
233 | ||
234 | if (HPX_UNLIKELY(0 == add_count)) | |
235 | return 0; | |
236 | ||
237 | std::size_t added = 0; | |
238 | task_description* task = nullptr; | |
239 | while (add_count-- && addfrom->new_tasks_.pop(task, steal)) | |
240 | { | |
241 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
242 | if (maintain_queue_wait_times) { | |
243 | addfrom->new_tasks_wait_ += | |
244 | util::high_resolution_clock::now() - util::get<2>(*task); | |
245 | ++addfrom->new_tasks_wait_count_; | |
246 | } | |
247 | #endif | |
248 | --addfrom->new_tasks_count_; | |
249 | ||
250 | // measure thread creation time | |
251 | util::block_profiler_wrapper<add_new_tag> bp(add_new_logger_); | |
252 | ||
253 | // create the new thread | |
254 | threads::thread_init_data& data = util::get<0>(*task); | |
255 | thread_state_enum state = util::get<1>(*task); | |
256 | threads::thread_id_type thrd; | |
257 | ||
258 | create_thread_object(thrd, data, state, lk); | |
259 | ||
260 | delete task; | |
261 | ||
262 | // add the new entry to the map of all threads | |
263 | std::pair<thread_map_type::iterator, bool> p = | |
264 | thread_map_.insert(thrd); | |
265 | ||
266 | if (HPX_UNLIKELY(!p.second)) { | |
267 | HPX_THROW_EXCEPTION(hpx::out_of_memory, | |
268 | "threadmanager::add_new", | |
269 | "Couldn't add new thread to the thread map"); | |
270 | return 0; | |
271 | } | |
272 | ++thread_map_count_; | |
273 | ||
274 | // only insert the thread into the work-items queue if it is in | |
275 | // pending state | |
276 | if (state == pending) { | |
277 | // pushing the new thread into the pending queue of the | |
278 | // specified thread_queue | |
279 | ++added; | |
280 | schedule_thread(thrd.get()); | |
281 | } | |
282 | ||
283 | // this thread has to be in the map now | |
284 | HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end()); | |
285 | HPX_ASSERT(thrd->get_pool() == &memory_pool_); | |
286 | } | |
287 | ||
288 | if (added) { | |
289 | LTM_(debug) << "add_new: added " << added << " tasks to queues"; //-V128 | |
290 | } | |
291 | return added; | |
292 | } | |
293 | ||
294 | /////////////////////////////////////////////////////////////////////// | |
295 | bool add_new_if_possible(std::size_t& added, thread_queue* addfrom, | |
296 | std::unique_lock<mutex_type> &lk, bool steal = false) | |
297 | { | |
298 | HPX_ASSERT(lk.owns_lock()); | |
299 | ||
300 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | |
301 | util::tick_counter tc(add_new_time_); | |
302 | #endif | |
303 | ||
304 | if (0 == addfrom->new_tasks_count_.load(boost::memory_order_relaxed)) | |
305 | return false; | |
306 | ||
307 | // create new threads from pending tasks (if appropriate) | |
308 | std::int64_t add_count = -1; // default is no constraint | |
309 | ||
310 | // if the map doesn't hold max_count threads yet add some | |
311 | // FIXME: why do we have this test? can max_count_ ever be zero? | |
312 | if (HPX_LIKELY(max_count_)) { | |
313 | std::size_t count = thread_map_.size(); | |
314 | if (max_count_ >= count + min_add_new_count) { //-V104 | |
315 | HPX_ASSERT(max_count_ - count < | |
316 | static_cast<std::size_t>((std::numeric_limits | |
317 | <std::int64_t>::max)())); | |
318 | add_count = static_cast<std::int64_t>(max_count_ - count); | |
319 | if (add_count < min_add_new_count) | |
320 | add_count = min_add_new_count; | |
321 | } | |
322 | else { | |
323 | return false; | |
324 | } | |
325 | } | |
326 | ||
327 | std::size_t addednew = add_new(add_count, addfrom, lk, steal); | |
328 | added += addednew; | |
329 | return addednew != 0; | |
330 | } | |
331 | ||
332 | /////////////////////////////////////////////////////////////////////// | |
333 | bool add_new_always(std::size_t& added, thread_queue* addfrom, | |
334 | std::unique_lock<mutex_type> &lk, bool steal = false) | |
335 | { | |
336 | HPX_ASSERT(lk.owns_lock()); | |
337 | ||
338 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | |
339 | util::tick_counter tc(add_new_time_); | |
340 | #endif | |
341 | ||
342 | if (0 == addfrom->new_tasks_count_.load(boost::memory_order_relaxed)) | |
343 | return false; | |
344 | ||
345 | // create new threads from pending tasks (if appropriate) | |
346 | std::int64_t add_count = -1; // default is no constraint | |
347 | ||
348 | // if we are desperate (no work in the queues), add some even if the | |
349 | // map holds more than max_count | |
350 | if (HPX_LIKELY(max_count_)) { | |
351 | std::size_t count = thread_map_.size(); | |
352 | if (max_count_ >= count + min_add_new_count) { //-V104 | |
353 | HPX_ASSERT(max_count_ - count < | |
354 | static_cast<std::size_t>((std::numeric_limits | |
355 | <std::int64_t>::max)())); | |
356 | add_count = static_cast<std::int64_t>(max_count_ - count); | |
357 | if (add_count < min_add_new_count) | |
358 | add_count = min_add_new_count; | |
359 | if (add_count > max_add_new_count) | |
360 | add_count = max_add_new_count; | |
361 | } | |
362 | else if (work_items_.empty()) { | |
363 | add_count = min_add_new_count; // add this number of threads | |
364 | max_count_ += min_add_new_count; // increase max_count //-V101 | |
365 | } | |
366 | else { | |
367 | return false; | |
368 | } | |
369 | } | |
370 | ||
371 | std::size_t addednew = add_new(add_count, addfrom, lk, steal); | |
372 | added += addednew; | |
373 | return addednew != 0; | |
374 | } | |
375 | ||
376 | void recycle_thread(thread_id_type thrd) | |
377 | { | |
378 | std::ptrdiff_t stacksize = thrd->get_stack_size(); | |
379 | ||
380 | if (stacksize == get_stack_size(thread_stacksize_small)) | |
381 | { | |
382 | thread_heap_small_.push_front(thrd); | |
383 | } | |
384 | else if (stacksize == get_stack_size(thread_stacksize_medium)) | |
385 | { | |
386 | thread_heap_medium_.push_front(thrd); | |
387 | } | |
388 | else if (stacksize == get_stack_size(thread_stacksize_large)) | |
389 | { | |
390 | thread_heap_large_.push_front(thrd); | |
391 | } | |
392 | else if (stacksize == get_stack_size(thread_stacksize_huge)) | |
393 | { | |
394 | thread_heap_huge_.push_front(thrd); | |
395 | } | |
396 | else | |
397 | { | |
398 | switch(stacksize) { | |
399 | case thread_stacksize_small: | |
400 | thread_heap_small_.push_front(thrd); | |
401 | break; | |
402 | ||
403 | case thread_stacksize_medium: | |
404 | thread_heap_medium_.push_front(thrd); | |
405 | break; | |
406 | ||
407 | case thread_stacksize_large: | |
408 | thread_heap_large_.push_front(thrd); | |
409 | break; | |
410 | ||
411 | case thread_stacksize_huge: | |
412 | thread_heap_huge_.push_front(thrd); | |
413 | break; | |
414 | ||
415 | default: | |
416 | HPX_ASSERT(false); | |
417 | break; | |
418 | } | |
419 | } | |
420 | } | |
421 | ||
422 | public: | |
423 | /// This function makes sure all threads which are marked for deletion | |
424 | /// (state is terminated) are properly destroyed. | |
425 | /// | |
426 | /// This returns 'true' if there are no more terminated threads waiting | |
427 | /// to be deleted. | |
428 | bool cleanup_terminated_locked_helper(bool delete_all = false) | |
429 | { | |
430 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | |
431 | util::tick_counter tc(cleanup_terminated_time_); | |
432 | #endif | |
433 | ||
434 | if (terminated_items_count_ == 0 && thread_map_.empty()) | |
435 | return true; | |
436 | ||
437 | if (delete_all) { | |
438 | // delete all threads | |
439 | thread_data* todelete; | |
440 | while (terminated_items_.pop(todelete)) | |
441 | { | |
442 | --terminated_items_count_; | |
443 | ||
444 | // this thread has to be in this map | |
445 | HPX_ASSERT(thread_map_.find(todelete) != thread_map_.end()); | |
446 | ||
447 | bool deleted = thread_map_.erase(todelete) != 0; | |
448 | HPX_ASSERT(deleted); | |
449 | if (deleted) { | |
450 | --thread_map_count_; | |
451 | HPX_ASSERT(thread_map_count_ >= 0); | |
452 | } | |
453 | } | |
454 | } | |
455 | else { | |
456 | // delete only this many threads | |
457 | std::int64_t delete_count = | |
458 | (std::max)( | |
459 | static_cast<std::int64_t>(terminated_items_count_ / 10), | |
460 | static_cast<std::int64_t>(max_delete_count)); | |
461 | ||
462 | thread_data* todelete; | |
463 | while (delete_count && terminated_items_.pop(todelete)) | |
464 | { | |
465 | --terminated_items_count_; | |
466 | ||
467 | thread_map_type::iterator it = thread_map_.find(todelete); | |
468 | ||
469 | // this thread has to be in this map | |
470 | HPX_ASSERT(it != thread_map_.end()); | |
471 | ||
472 | recycle_thread(*it); | |
473 | ||
474 | thread_map_.erase(it); | |
475 | --thread_map_count_; | |
476 | HPX_ASSERT(thread_map_count_ >= 0); | |
477 | ||
478 | --delete_count; | |
479 | } | |
480 | } | |
481 | return terminated_items_count_ == 0; | |
482 | } | |
483 | ||
484 | bool cleanup_terminated_locked(bool delete_all = false) | |
485 | { | |
486 | return cleanup_terminated_locked_helper(delete_all) && | |
487 | thread_map_.empty(); | |
488 | } | |
489 | ||
490 | public: | |
491 | bool cleanup_terminated(bool delete_all = false) | |
492 | { | |
493 | if (terminated_items_count_ == 0) | |
494 | return thread_map_count_ == 0; | |
495 | ||
496 | if (delete_all) { | |
497 | // do not lock mutex while deleting all threads, do it piece-wise | |
498 | bool thread_map_is_empty = false; | |
499 | while (true) | |
500 | { | |
501 | std::lock_guard<mutex_type> lk(mtx_); | |
502 | if (cleanup_terminated_locked_helper(false)) | |
503 | { | |
504 | thread_map_is_empty = | |
505 | (thread_map_count_ == 0) && (new_tasks_count_ == 0); | |
506 | break; | |
507 | } | |
508 | } | |
509 | return thread_map_is_empty; | |
510 | } | |
511 | ||
512 | std::lock_guard<mutex_type> lk(mtx_); | |
513 | return cleanup_terminated_locked_helper(false) && | |
514 | (thread_map_count_ == 0) && (new_tasks_count_ == 0); | |
515 | } | |
516 | ||
517 | // The maximum number of active threads this thread manager should | |
518 | // create. This number will be a constraint only as long as the work | |
519 | // items queue is not empty. Otherwise the number of active threads | |
520 | // will be incremented in steps equal to the \a min_add_new_count | |
521 | // specified above. | |
522 | enum { max_thread_count = 1000 }; | |
523 | ||
524 | thread_queue(std::size_t queue_num = std::size_t(-1), | |
525 | std::size_t max_count = max_thread_count) | |
526 | : thread_map_count_(0), | |
527 | work_items_(128, queue_num), | |
528 | work_items_count_(0), | |
529 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
530 | work_items_wait_(0), | |
531 | work_items_wait_count_(0), | |
532 | #endif | |
533 | terminated_items_(128), | |
534 | terminated_items_count_(0), | |
535 | max_count_((0 == max_count) | |
536 | ? static_cast<std::size_t>(max_thread_count) | |
537 | : max_count), | |
538 | new_tasks_(128), | |
539 | new_tasks_count_(0), | |
540 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
541 | new_tasks_wait_(0), | |
542 | new_tasks_wait_count_(0), | |
543 | #endif | |
544 | memory_pool_(64), | |
545 | thread_heap_small_(), | |
546 | thread_heap_medium_(), | |
547 | thread_heap_large_(), | |
548 | thread_heap_huge_(), | |
549 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | |
550 | add_new_time_(0), | |
551 | cleanup_terminated_time_(0), | |
552 | #endif | |
553 | #ifdef HPX_HAVE_THREAD_STEALING_COUNTS | |
554 | pending_misses_(0), | |
555 | pending_accesses_(0), | |
556 | stolen_from_pending_(0), | |
557 | stolen_from_staged_(0), | |
558 | stolen_to_pending_(0), | |
559 | stolen_to_staged_(0), | |
560 | #endif | |
561 | add_new_logger_("thread_queue::add_new") | |
562 | {} | |
563 | ||
564 | void set_max_count(std::size_t max_count = max_thread_count) | |
565 | { | |
566 | max_count_ = (0 == max_count) ? max_thread_count : max_count; //-V105 | |
567 | } | |
568 | ||
569 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | |
570 | std::uint64_t get_creation_time(bool reset) | |
571 | { | |
572 | return util::get_and_reset_value(add_new_time_, reset); | |
573 | } | |
574 | ||
575 | std::uint64_t get_cleanup_time(bool reset) | |
576 | { | |
577 | return util::get_and_reset_value(cleanup_terminated_time_, reset); | |
578 | } | |
579 | #endif | |
580 | ||
581 | /////////////////////////////////////////////////////////////////////// | |
582 | // This returns the current length of the queues (work items and new items) | |
583 | std::int64_t get_queue_length() const | |
584 | { | |
585 | return work_items_count_ + new_tasks_count_; | |
586 | } | |
587 | ||
588 | // This returns the current length of the pending queue | |
589 | std::int64_t get_pending_queue_length() const | |
590 | { | |
591 | return work_items_count_; | |
592 | } | |
593 | ||
594 | // This returns the current length of the staged queue | |
595 | std::int64_t get_staged_queue_length( | |
596 | boost::memory_order order = boost::memory_order_seq_cst) const | |
597 | { | |
598 | return new_tasks_count_.load(order); | |
599 | } | |
600 | ||
601 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
602 | std::uint64_t get_average_task_wait_time() const | |
603 | { | |
604 | std::uint64_t count = new_tasks_wait_count_; | |
605 | if (count == 0) | |
606 | return 0; | |
607 | return new_tasks_wait_ / count; | |
608 | } | |
609 | ||
610 | std::uint64_t get_average_thread_wait_time() const | |
611 | { | |
612 | std::uint64_t count = work_items_wait_count_; | |
613 | if (count == 0) | |
614 | return 0; | |
615 | return work_items_wait_ / count; | |
616 | } | |
617 | #endif | |
618 | ||
619 | #ifdef HPX_HAVE_THREAD_STEALING_COUNTS | |
620 | std::int64_t get_num_pending_misses(bool reset) | |
621 | { | |
622 | return util::get_and_reset_value(pending_misses_, reset); | |
623 | } | |
624 | ||
625 | void increment_num_pending_misses(std::size_t num = 1) | |
626 | { | |
627 | pending_misses_ += num; | |
628 | } | |
629 | ||
630 | std::int64_t get_num_pending_accesses(bool reset) | |
631 | { | |
632 | return util::get_and_reset_value(pending_accesses_, reset); | |
633 | } | |
634 | ||
635 | void increment_num_pending_accesses(std::size_t num = 1) | |
636 | { | |
637 | pending_accesses_ += num; | |
638 | } | |
639 | ||
640 | std::int64_t get_num_stolen_from_pending(bool reset) | |
641 | { | |
642 | return util::get_and_reset_value(stolen_from_pending_, reset); | |
643 | } | |
644 | ||
645 | void increment_num_stolen_from_pending(std::size_t num = 1) | |
646 | { | |
647 | stolen_from_pending_ += num; | |
648 | } | |
649 | ||
650 | std::int64_t get_num_stolen_from_staged(bool reset) | |
651 | { | |
652 | return util::get_and_reset_value(stolen_from_staged_, reset); | |
653 | } | |
654 | ||
655 | void increment_num_stolen_from_staged(std::size_t num = 1) | |
656 | { | |
657 | stolen_from_staged_ += num; | |
658 | } | |
659 | ||
660 | std::int64_t get_num_stolen_to_pending(bool reset) | |
661 | { | |
662 | return util::get_and_reset_value(stolen_to_pending_, reset); | |
663 | } | |
664 | ||
665 | void increment_num_stolen_to_pending(std::size_t num = 1) | |
666 | { | |
667 | stolen_to_pending_ += num; | |
668 | } | |
669 | ||
670 | std::int64_t get_num_stolen_to_staged(bool reset) | |
671 | { | |
672 | return util::get_and_reset_value(stolen_to_staged_, reset); | |
673 | } | |
674 | ||
675 | void increment_num_stolen_to_staged(std::size_t num = 1) | |
676 | { | |
677 | stolen_to_staged_ += num; | |
678 | } | |
679 | #else | |
680 | void increment_num_pending_misses(std::size_t num = 1) {} | |
681 | void increment_num_pending_accesses(std::size_t num = 1) {} | |
682 | void increment_num_stolen_from_pending(std::size_t num = 1) {} | |
683 | void increment_num_stolen_from_staged(std::size_t num = 1) {} | |
684 | void increment_num_stolen_to_pending(std::size_t num = 1) {} | |
685 | void increment_num_stolen_to_staged(std::size_t num = 1) {} | |
686 | #endif | |
687 | ||
688 | /////////////////////////////////////////////////////////////////////// | |
689 | // create a new thread and schedule it if the initial state is equal to | |
690 | // pending | |
691 | void create_thread(thread_init_data& data, thread_id_type* id, | |
692 | thread_state_enum initial_state, bool run_now, error_code& ec) | |
693 | { | |
694 | // thread has not been created yet | |
695 | if (id) *id = invalid_thread_id; | |
696 | ||
697 | if (run_now) | |
698 | { | |
699 | threads::thread_id_type thrd; | |
700 | ||
701 | // The mutex must not be held while the new thread object is being | |
702 | // created, as the current HPX thread might get suspended during the | |
703 | // creation. | |
704 | { | |
705 | std::unique_lock<mutex_type> lk(mtx_); | |
706 | ||
707 | create_thread_object(thrd, data, initial_state, lk); | |
708 | ||
709 | // add a new entry in the map for this thread | |
710 | std::pair<thread_map_type::iterator, bool> p = | |
711 | thread_map_.insert(thrd); | |
712 | ||
713 | if (HPX_UNLIKELY(!p.second)) { | |
714 | HPX_THROWS_IF(ec, hpx::out_of_memory, | |
715 | "threadmanager::register_thread", | |
716 | "Couldn't add new thread to the map of threads"); | |
717 | return; | |
718 | } | |
719 | ++thread_map_count_; | |
720 | ||
721 | // this thread has to be in the map now | |
722 | HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end()); | |
723 | HPX_ASSERT(thrd->get_pool() == &memory_pool_); | |
724 | ||
725 | // push the new thread in the pending queue thread | |
726 | if (initial_state == pending) | |
727 | schedule_thread(thrd.get()); | |
728 | ||
729 | // return the thread_id of the newly created thread | |
730 | if (id) *id = std::move(thrd); | |
731 | ||
732 | if (&ec != &throws) | |
733 | ec = make_success_code(); | |
734 | return; | |
735 | } | |
736 | } | |
737 | ||
738 | // do not execute the work, but register a task description for | |
739 | // later thread creation | |
740 | ++new_tasks_count_; | |
741 | ||
742 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
743 | new_tasks_.push(new task_description( | |
744 | std::move(data), initial_state, | |
745 | util::high_resolution_clock::now() | |
746 | )); | |
747 | #else | |
748 | new_tasks_.push(new task_description( //-V106 | |
749 | std::move(data), initial_state)); | |
750 | #endif | |
751 | if (&ec != &throws) | |
752 | ec = make_success_code(); | |
753 | } | |
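| // Typical use (illustrative sketch; how thread_init_data is filled in and the | |
| // surrounding scheduler object are assumptions, not prescribed by this file): | |
| // | |
| //     threads::thread_init_data data(/*...*/);    // thread function, description, ... | |
| //     threads::thread_id_type id; | |
| //     error_code ec; | |
| //     queue.create_thread(data, &id, pending, /*run_now=*/true, ec); | |
| // | |
| // With run_now == false the request is merely staged in new_tasks_; the actual | |
| // thread object is created later by add_new() via wait_or_add_new(). | |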
754 | ||
755 | void move_work_items_from(thread_queue *src, std::int64_t count) | |
756 | { | |
757 | thread_description* trd; | |
758 | while (src->work_items_.pop(trd)) | |
759 | { | |
760 | --src->work_items_count_; | |
761 | ||
762 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
763 | if (maintain_queue_wait_times) { | |
764 | std::uint64_t now = util::high_resolution_clock::now(); | |
765 | src->work_items_wait_ += now - util::get<1>(*trd); | |
766 | ++src->work_items_wait_count_; | |
767 | util::get<1>(*trd) = now; | |
768 | } | |
769 | #endif | |
770 | ||
771 | bool finished = count == ++work_items_count_; | |
772 | work_items_.push(trd); | |
773 | if (finished) | |
774 | break; | |
775 | } | |
776 | } | |
777 | ||
778 | void move_task_items_from(thread_queue *src, | |
779 | std::int64_t count) | |
780 | { | |
781 | task_description* task; | |
782 | while (src->new_tasks_.pop(task)) | |
783 | { | |
784 | --src->new_tasks_count_; | |
785 | ||
786 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
787 | if (maintain_queue_wait_times) { | |
788 | std::int64_t now = util::high_resolution_clock::now(); | |
789 | src->new_tasks_wait_ += now - util::get<2>(*task); | |
790 | ++src->new_tasks_wait_count_; | |
791 | util::get<2>(*task) = now; | |
792 | } | |
793 | #endif | |
794 | ||
795 | bool finish = count == ++new_tasks_count_; | |
796 | if (new_tasks_.push(task)) | |
797 | { | |
798 | if (finish) | |
799 | break; | |
800 | } | |
801 | else | |
802 | { | |
803 | --new_tasks_count_; | |
804 | } | |
805 | } | |
806 | } | |
807 | ||
808 | /// Return the next thread to be executed; return false if none is | |
809 | /// available | |
810 | bool get_next_thread(threads::thread_data*& thrd, | |
811 | bool steal = false) HPX_HOT | |
812 | { | |
813 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
814 | thread_description* tdesc; | |
815 | if (0 != work_items_count_.load(boost::memory_order_relaxed) && | |
816 | work_items_.pop(tdesc, steal)) | |
817 | { | |
818 | --work_items_count_; | |
819 | ||
820 | if (maintain_queue_wait_times) { | |
821 | work_items_wait_ += util::high_resolution_clock::now() - | |
822 | util::get<1>(*tdesc); | |
823 | ++work_items_wait_count_; | |
824 | } | |
825 | ||
826 | thrd = util::get<0>(*tdesc); | |
827 | delete tdesc; | |
828 | ||
829 | return true; | |
830 | } | |
831 | #else | |
832 | if (0 != work_items_count_.load(boost::memory_order_relaxed) && | |
833 | work_items_.pop(thrd, steal)) | |
834 | { | |
835 | --work_items_count_; | |
836 | return true; | |
837 | } | |
838 | #endif | |
839 | return false; | |
840 | } | |
841 | ||
842 | /// Schedule the passed thread | |
843 | void schedule_thread(threads::thread_data* thrd, bool other_end = false) | |
844 | { | |
845 | ++work_items_count_; | |
846 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
847 | work_items_.push(new thread_description( | |
848 | thrd, util::high_resolution_clock::now()), other_end); | |
849 | #else | |
850 | work_items_.push(thrd, other_end); | |
851 | #endif | |
852 | } | |
853 | ||
854 | /// Destroy the passed thread as it has been terminated | |
855 | bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count) | |
856 | { | |
857 | if (thrd->get_pool() == &memory_pool_) | |
858 | { | |
859 | terminated_items_.push(thrd); | |
860 | ||
861 | std::int64_t count = ++terminated_items_count_; | |
862 | if (count > HPX_MAX_TERMINATED_THREADS) | |
863 | { | |
864 | cleanup_terminated(true); // clean up all terminated threads | |
865 | } | |
866 | return true; | |
867 | } | |
868 | return false; | |
869 | } | |
870 | ||
871 | /////////////////////////////////////////////////////////////////////// | |
872 | /// Return the number of existing threads with the given state. | |
873 | std::int64_t get_thread_count(thread_state_enum state = unknown) const | |
874 | { | |
875 | if (terminated == state) | |
876 | return terminated_items_count_; | |
877 | ||
878 | if (staged == state) | |
879 | return new_tasks_count_; | |
880 | ||
881 | if (unknown == state) | |
882 | return thread_map_count_ + new_tasks_count_ - terminated_items_count_; | |
883 | ||
884 | // acquire lock only if absolutely necessary | |
885 | std::lock_guard<mutex_type> lk(mtx_); | |
886 | ||
887 | std::int64_t num_threads = 0; | |
888 | thread_map_type::const_iterator end = thread_map_.end(); | |
889 | for (thread_map_type::const_iterator it = thread_map_.begin(); | |
890 | it != end; ++it) | |
891 | { | |
892 | if ((*it)->get_state().state() == state) | |
893 | ++num_threads; | |
894 | } | |
895 | return num_threads; | |
896 | } | |
897 | ||
898 | /////////////////////////////////////////////////////////////////////// | |
899 | void abort_all_suspended_threads() | |
900 | { | |
901 | std::lock_guard<mutex_type> lk(mtx_); | |
902 | thread_map_type::iterator end = thread_map_.end(); | |
903 | for (thread_map_type::iterator it = thread_map_.begin(); | |
904 | it != end; ++it) | |
905 | { | |
906 | if ((*it)->get_state().state() == suspended) | |
907 | { | |
908 | (*it)->set_state(pending, wait_abort); | |
909 | schedule_thread((*it).get()); | |
910 | } | |
911 | } | |
912 | } | |
913 | ||
914 | bool enumerate_threads( | |
915 | util::function_nonser<bool(thread_id_type)> const& f, | |
916 | thread_state_enum state = unknown) const | |
917 | { | |
918 | std::uint64_t count = thread_map_count_; | |
919 | if (state == terminated) | |
920 | { | |
921 | count = terminated_items_count_; | |
922 | } | |
923 | else if (state == staged) | |
924 | { | |
925 | HPX_THROW_EXCEPTION(bad_parameter, | |
926 | "thread_queue::iterate_threads", | |
927 | "can't iterate over thread ids of staged threads"); | |
928 | return false; | |
929 | } | |
930 | ||
931 | std::vector<thread_id_type> ids; | |
932 | ids.reserve(count); | |
933 | ||
934 | if (state == unknown) | |
935 | { | |
936 | std::lock_guard<mutex_type> lk(mtx_); | |
937 | thread_map_type::const_iterator end = thread_map_.end(); | |
938 | for (thread_map_type::const_iterator it = thread_map_.begin(); | |
939 | it != end; ++it) | |
940 | { | |
941 | ids.push_back(*it); | |
942 | } | |
943 | } | |
944 | else | |
945 | { | |
946 | std::lock_guard<mutex_type> lk(mtx_); | |
947 | thread_map_type::const_iterator end = thread_map_.end(); | |
948 | for (thread_map_type::const_iterator it = thread_map_.begin(); | |
949 | it != end; ++it) | |
950 | { | |
951 | if ((*it)->get_state().state() == state) | |
952 | ids.push_back(*it); | |
953 | } | |
954 | } | |
955 | ||
956 | // now invoke callback function for all matching threads | |
957 | for (thread_id_type const& id : ids) | |
958 | { | |
959 | if (!f(id)) | |
960 | return false; // stop iteration | |
961 | } | |
962 | ||
963 | return true; | |
964 | } | |
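| // Example (illustrative): counting all currently suspended threads. The lambda | |
| // returns true to continue the enumeration and false to stop it early. | |
| // | |
| //     std::int64_t n = 0; | |
| //     queue.enumerate_threads( | |
| //         [&n](threads::thread_id_type const&) -> bool { ++n; return true; }, | |
| //         suspended); | |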
965 | ||
966 | /// This is a function which gets called periodically by the thread | |
967 | /// manager to allow for maintenance tasks to be executed in the | |
968 | /// scheduler. Returns true if the OS thread calling this function | |
969 | /// has to be terminated (i.e. no more work has to be done). | |
970 | inline bool wait_or_add_new(bool running, | |
971 | std::int64_t& idle_loop_count, std::size_t& added, | |
972 | thread_queue* addfrom_ = nullptr, bool steal = false) HPX_HOT | |
973 | { | |
974 | // try to generate new threads from task lists, but only if our | |
975 | // own list of threads is empty | |
976 | if (0 == work_items_count_.load(boost::memory_order_relaxed)) { | |
977 | ||
978 | // No obvious work has to be done, so a lock won't hurt too much. | |
979 | // | |
980 | // We prefer to exit this function (some kind of very short | |
981 | // busy waiting) over blocking on this lock. Locking fails either | |
982 | // because another thread is currently doing thread maintenance (which | |
983 | // means there might be new work), or because the thread owning the | |
984 | // lock is just falling through to the cleanup work below (no work | |
985 | // is available), in which case the current thread (which failed to | |
986 | // acquire the lock) will simply retry to enter this loop. | |
987 | std::unique_lock<mutex_type> lk(mtx_, std::try_to_lock); | |
988 | if (!lk.owns_lock()) | |
989 | return false; // avoid long wait on lock | |
990 | ||
991 | // stop running after all HPX threads have been terminated | |
992 | thread_queue* addfrom = addfrom_ ? addfrom_ : this; | |
993 | bool added_new = add_new_always(added, addfrom, lk, steal); | |
994 | if (!added_new) { | |
995 | // Before exiting each of the OS threads deletes the | |
996 | // remaining terminated HPX threads | |
997 | // REVIEW: Should we be doing this if we are stealing? | |
998 | bool canexit = cleanup_terminated_locked(true); | |
999 | if (!running && canexit) { | |
1000 | // we don't have any registered work items anymore | |
1001 | //do_some_work(); // notify possibly waiting threads | |
1002 | return true; // terminate scheduling loop | |
1003 | } | |
1004 | return false; | |
1005 | } | |
1006 | ||
1007 | cleanup_terminated_locked(); | |
1008 | } | |
1009 | return false; | |
1010 | } | |
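| // Rough sketch of the intended calling pattern (the actual scheduling loop | |
| // lives in the schedulers/thread manager, not in this header; queue, | |
| // scheduler_is_active, running, idle_loop_count and added are placeholders): | |
| // | |
| //     threads::thread_data* thrd = nullptr; | |
| //     while (scheduler_is_active) { | |
| //         if (queue.get_next_thread(thrd)) { | |
| //             // ... execute thrd; re-schedule it via schedule_thread() if it | |
| //             // becomes pending again, or hand it to destroy_thread() once it | |
| //             // has terminated ... | |
| //         } | |
| //         else if (queue.wait_or_add_new(running, idle_loop_count, added)) { | |
| //             break;    // no work left and not running any more: exit the loop | |
| //         } | |
| //     } | |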
1011 | ||
1012 | /////////////////////////////////////////////////////////////////////// | |
1013 | bool dump_suspended_threads(std::size_t num_thread | |
1014 | , std::int64_t& idle_loop_count, bool running) | |
1015 | { | |
1016 | #ifndef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION | |
1017 | return false; | |
1018 | #else | |
1019 | if (minimal_deadlock_detection) { | |
1020 | std::lock_guard<mutex_type> lk(mtx_); | |
1021 | return detail::dump_suspended_threads(num_thread, thread_map_ | |
1022 | , idle_loop_count, running); | |
1023 | } | |
1024 | return false; | |
1025 | #endif | |
1026 | } | |
1027 | ||
1028 | /////////////////////////////////////////////////////////////////////// | |
1029 | void on_start_thread(std::size_t num_thread) {} | |
1030 | void on_stop_thread(std::size_t num_thread) {} | |
1031 | void on_error(std::size_t num_thread, boost::exception_ptr const& e) {} | |
1032 | ||
1033 | private: | |
1034 | mutable mutex_type mtx_; ///< mutex protecting the members | |
1035 | ||
1036 | thread_map_type thread_map_; | |
1037 | ///< set of all thread ids known to this queue | |
1038 | boost::atomic<std::int64_t> thread_map_count_; | |
1039 | ///< overall count of threads in the map | |
1040 | ||
1041 | work_items_type work_items_; | |
1042 | ///< list of active work items | |
1043 | boost::atomic<std::int64_t> work_items_count_; | |
1044 | ///< count of active work items | |
1045 | ||
1046 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
1047 | boost::atomic<std::int64_t> work_items_wait_; | |
1048 | ///< overall wait time of work items | |
1049 | boost::atomic<std::int64_t> work_items_wait_count_; | |
1050 | ///< overall number of work items that waited | |
1051 | #endif | |
1052 | terminated_items_type terminated_items_; ///< list of terminated threads | |
1053 | boost::atomic<std::int64_t> terminated_items_count_; | |
1054 | ///< count of terminated items | |
1055 | ||
1056 | std::size_t max_count_; | |
1057 | ///< maximum number of existing HPX-threads | |
1058 | task_items_type new_tasks_; | |
1059 | ///< list of new tasks to run | |
1060 | ||
1061 | boost::atomic<std::int64_t> new_tasks_count_; | |
1062 | ///< count of new tasks to run | |
1063 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | |
1064 | boost::atomic<std::int64_t> new_tasks_wait_; | |
1065 | ///< overall wait time of new tasks | |
1066 | boost::atomic<std::int64_t> new_tasks_wait_count_; | |
1067 | ///< overall number of tasks that waited | |
1068 | #endif | |
1069 | ||
1070 | threads::thread_pool memory_pool_; ///< OS thread local memory pools for | |
1071 | ///< HPX-threads | |
1072 | ||
1073 | std::list<thread_id_type> thread_heap_small_; | |
1074 | std::list<thread_id_type> thread_heap_medium_; | |
1075 | std::list<thread_id_type> thread_heap_large_; | |
1076 | std::list<thread_id_type> thread_heap_huge_; | |
1077 | ||
1078 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | |
1079 | std::uint64_t add_new_time_; | |
1080 | std::uint64_t cleanup_terminated_time_; | |
1081 | #endif | |
1082 | ||
1083 | #ifdef HPX_HAVE_THREAD_STEALING_COUNTS | |
1084 | // # of times our associated worker-thread couldn't find work in work_items | |
1085 | boost::atomic<std::int64_t> pending_misses_; | |
1086 | ||
1087 | // # of times our associated worker-thread looked for work in work_items | |
1088 | boost::atomic<std::int64_t> pending_accesses_; | |
1089 | ||
1090 | boost::atomic<std::int64_t> stolen_from_pending_; | |
1091 | ///< count of work_items stolen from this queue | |
1092 | boost::atomic<std::int64_t> stolen_from_staged_; | |
1093 | ///< count of new_tasks stolen from this queue | |
1094 | boost::atomic<std::int64_t> stolen_to_pending_; | |
1095 | ///< count of work_items stolen to this queue from other queues | |
1096 | boost::atomic<std::int64_t> stolen_to_staged_; | |
1097 | ///< count of new_tasks stolen to this queue from other queues | |
1098 | #endif | |
1099 | ||
1100 | util::block_profiler<add_new_tag> add_new_logger_; | |
1101 | }; | |
1102 | }}} | |
1103 | ||
1104 | #endif | |
1105 | ||
1106 ||