Line | % of fetches | Source | |
---|---|---|---|
1 | // Copyright (c) 2007-2016 Hartmut Kaiser | ||
2 | // Copyright (c) 2011 Bryce Lelbach | ||
3 | // | ||
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
6 | |||
7 | #if !defined(HPX_THREADMANAGER_SCHEDULING_LOCAL_PRIORITY_QUEUE_MAR_15_2011_0926AM) | ||
8 | #define HPX_THREADMANAGER_SCHEDULING_LOCAL_PRIORITY_QUEUE_MAR_15_2011_0926AM | ||
9 | |||
10 | #include <hpx/config.hpp> | ||
11 | #include <hpx/runtime/threads/policies/affinity_data.hpp> | ||
12 | #include <hpx/runtime/threads/policies/scheduler_base.hpp> | ||
13 | #include <hpx/runtime/threads/policies/thread_queue.hpp> | ||
14 | #include <hpx/runtime/threads/thread_data.hpp> | ||
15 | #include <hpx/runtime/threads/topology.hpp> | ||
16 | #include <hpx/runtime/threads_fwd.hpp> | ||
17 | #include <hpx/throw_exception.hpp> | ||
18 | #include <hpx/util/logging.hpp> | ||
19 | #include <hpx/util_fwd.hpp> | ||
20 | |||
21 | #include <boost/atomic.hpp> | ||
22 | #include <boost/exception_ptr.hpp> | ||
23 | |||
24 | #include <cstddef> | ||
25 | #include <cstdint> | ||
26 | #include <memory> | ||
27 | #include <string> | ||
28 | #include <type_traits> | ||
29 | #include <vector> | ||
30 | |||
31 | #include <hpx/config/warnings_prefix.hpp> | ||
32 | |||
33 | // TODO: add branch prediction and function heat | ||
34 | |||
35 | /////////////////////////////////////////////////////////////////////////////// | ||
36 | namespace hpx { namespace threads { namespace policies | ||
37 | { | ||
38 | #ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION | ||
39 | /////////////////////////////////////////////////////////////////////////// | ||
40 | // Whether to perform minimal deadlock detection is controlled by this | ||
41 | // global bool variable. It is set once by the runtime configuration | ||
42 | // startup code. | ||
43 | extern bool minimal_deadlock_detection; | ||
44 | #endif | ||
45 | |||
46 | /////////////////////////////////////////////////////////////////////////// | ||
47 | /// The local_priority_queue_scheduler maintains exactly one queue of work | ||
48 | /// items (threads) per OS thread, from which that OS thread pulls its next | ||
49 | /// work. Additionally, it maintains separate queues: several for high | ||
50 | /// priority threads and one for low priority threads. | ||
51 | /// High priority threads are executed by the first N OS threads before any | ||
52 | /// other work is executed. Low priority threads are executed by the last | ||
53 | /// OS thread, but only when no other work is available. | ||
54 | template <typename Mutex | ||
55 | , typename PendingQueuing | ||
56 | , typename StagedQueuing | ||
57 | , typename TerminatedQueuing | ||
58 | > | ||
59 | class local_priority_queue_scheduler : public scheduler_base | ||
60 | { | ||
61 | protected: | ||
62 | // The maximum number of active threads this thread manager should | ||
63 | // create. This number will be a constraint only as long as the work | ||
64 | // items queue is not empty. Otherwise the number of active threads | ||
65 | // will be incremented in steps equal to the \a min_add_new_count | ||
66 | // specified above. | ||
67 | // FIXME: this is specified both here, and in thread_queue. | ||
68 | enum { max_thread_count = 1000 }; | ||
69 | |||
70 | public: | ||
71 | typedef std::false_type has_periodic_maintenance; | ||
72 | |||
73 | typedef thread_queue< | ||
74 | Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing | ||
75 | > thread_queue_type; | ||
76 | |||
77 | // the scheduler type takes three initialization parameters: | ||
78 | //     the number of queues | ||
79 | //     the number of high priority queues | ||
80 | //     the maximum thread count per queue (see the usage sketch below) | ||
81 | struct init_parameter | ||
82 | { | ||
83 | init_parameter() | ||
84 | : num_queues_(1), | ||
85 | num_high_priority_queues_(1), | ||
86 | max_queue_thread_count_(max_thread_count), | ||
87 | numa_sensitive_(0), | ||
88 | description_("local_priority_queue_scheduler") | ||
89 | {} | ||
90 | |||
91 | init_parameter(std::size_t num_queues, | ||
92 | std::size_t num_high_priority_queues = std::size_t(-1), | ||
93 | std::size_t max_queue_thread_count = max_thread_count, | ||
94 | std::size_t numa_sensitive = 0, | ||
95 | char const* description = "local_priority_queue_scheduler") | ||
96 | : num_queues_(num_queues), | ||
97 | num_high_priority_queues_( | ||
98 | num_high_priority_queues == std::size_t(-1) ? | ||
99 | num_queues : num_high_priority_queues), | ||
100 | max_queue_thread_count_(max_queue_thread_count), | ||
101 | numa_sensitive_(numa_sensitive), | ||
102 | description_(description) | ||
103 | {} | ||
104 | |||
105 | init_parameter(std::size_t num_queues, char const* description) | ||
106 | : num_queues_(num_queues), | ||
107 | num_high_priority_queues_(num_queues), | ||
108 | max_queue_thread_count_(max_thread_count), | ||
109 | numa_sensitive_(0), | ||
110 | description_(description) | ||
111 | {} | ||
112 | |||
113 | std::size_t num_queues_; | ||
114 | std::size_t num_high_priority_queues_; | ||
115 | std::size_t max_queue_thread_count_; | ||
116 | std::size_t numa_sensitive_; | ||
117 | char const* description_; | ||
118 | }; | ||
119 | typedef init_parameter init_parameter_type; | ||
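
As a rough illustration of how these initialization parameters might be used, the sketch below constructs the scheduler with eight worker queues, two of which also serve high priority work. It is hypothetical and not part of this header: the queuing policy types (`lockfree_fifo`, `lockfree_lifo`) and the `std::mutex` template argument are assumptions about suitable template arguments, not something this file defines.

```cpp
// Hypothetical usage sketch (not part of this header).  The template
// arguments below -- std::mutex, lockfree_fifo, lockfree_lifo -- are
// assumptions about suitable queuing policies.
#include <mutex>

namespace policies = hpx::threads::policies;

using scheduler_type = policies::local_priority_queue_scheduler<
    std::mutex,
    policies::lockfree_fifo,   // PendingQueuing (assumed)
    policies::lockfree_fifo,   // StagedQueuing (assumed)
    policies::lockfree_lifo    // TerminatedQueuing (assumed)
>;

// Eight worker queues, two of which also hold high priority work, a maximum
// of 1000 threads per queue; numa_sensitive == 0 means work may be stolen
// freely across NUMA domains.
scheduler_type::init_parameter_type init(
    8,                      // num_queues
    2,                      // num_high_priority_queues
    1000,                   // max_queue_thread_count
    0,                      // numa_sensitive
    "example_scheduler");

scheduler_type scheduler(init, false /* deferred_initialization */);
```
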
120 | |||
121 | local_priority_queue_scheduler(init_parameter_type const& init, | ||
122 | bool deferred_initialization = true) | ||
123 | : scheduler_base(init.num_queues_, init.description_), | ||
124 | max_queue_thread_count_(init.max_queue_thread_count_), | ||
125 | queues_(init.num_queues_), | ||
126 | high_priority_queues_(init.num_high_priority_queues_), | ||
127 | low_priority_queue_(init.max_queue_thread_count_), | ||
128 | curr_queue_(0), | ||
129 | numa_sensitive_(init.numa_sensitive_), | ||
130 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
131 | steals_in_numa_domain_(), | ||
132 | steals_outside_numa_domain_(), | ||
133 | #endif | ||
134 | #if !defined(HPX_HAVE_MORE_THAN_64_THREADS) || defined(HPX_HAVE_MAX_CPU_COUNT) | ||
135 | numa_domain_masks_(init.num_queues_), | ||
136 | outside_numa_domain_masks_(init.num_queues_) | ||
137 | #else | ||
138 | numa_domain_masks_(init.num_queues_, topology_.get_machine_affinity_mask()), | ||
139 | outside_numa_domain_masks_(init.num_queues_, | ||
140 | topology_.get_machine_affinity_mask()) | ||
141 | #endif | ||
142 | { | ||
143 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
144 | resize(steals_in_numa_domain_, init.num_queues_); | ||
145 | resize(steals_outside_numa_domain_, init.num_queues_); | ||
146 | #endif | ||
147 | if (!deferred_initialization) | ||
148 | { | ||
149 | BOOST_ASSERT(init.num_queues_ != 0); | ||
150 | for (std::size_t i = 0; i < init.num_queues_; ++i) | ||
151 | queues_[i] = new thread_queue_type(init.max_queue_thread_count_); | ||
152 | |||
153 | BOOST_ASSERT(init.num_high_priority_queues_ != 0); | ||
154 | BOOST_ASSERT(init.num_high_priority_queues_ <= init.num_queues_); | ||
155 | for (std::size_t i = 0; i < init.num_high_priority_queues_; ++i) { | ||
156 | high_priority_queues_[i] = | ||
157 | new thread_queue_type(init.max_queue_thread_count_); | ||
158 | } | ||
159 | } | ||
160 | } | ||
161 | |||
162 | virtual ~local_priority_queue_scheduler() | ||
163 | { | ||
164 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
165 | delete queues_[i]; | ||
166 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
167 | delete high_priority_queues_[i]; | ||
168 | } | ||
169 | |||
170 | bool numa_sensitive() const { return numa_sensitive_ != 0; } | ||
171 | |||
172 | static std::string get_scheduler_name() | ||
173 | { | ||
174 | return "local_priority_queue_scheduler"; | ||
175 | } | ||
176 | |||
177 | #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES | ||
178 | std::uint64_t get_creation_time(bool reset) | ||
179 | { | ||
180 | std::uint64_t time = 0; | ||
181 | |||
182 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
183 | time += high_priority_queues_[i]->get_creation_time(reset); | ||
184 | |||
185 | time += low_priority_queue_.get_creation_time(reset); | ||
186 | |||
187 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
188 | time += queues_[i]->get_creation_time(reset); | ||
189 | |||
190 | return time; | ||
191 | } | ||
192 | |||
193 | std::uint64_t get_cleanup_time(bool reset) | ||
194 | { | ||
195 | std::uint64_t time = 0; | ||
196 | |||
197 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
198 | time += high_priority_queues_[i]-> | ||
199 | get_cleanup_time(reset); | ||
200 | |||
201 | time += low_priority_queue_.get_cleanup_time(reset); | ||
202 | |||
203 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
204 | time += queues_[i]->get_cleanup_time(reset); | ||
205 | |||
206 | return time; | ||
207 | } | ||
208 | #endif | ||
209 | |||
210 | #ifdef HPX_HAVE_THREAD_STEALING_COUNTS | ||
211 | std::int64_t get_num_pending_misses(std::size_t num_thread, bool reset) | ||
212 | { | ||
213 | std::int64_t num_pending_misses = 0; | ||
214 | if (num_thread == std::size_t(-1)) | ||
215 | { | ||
216 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
217 | num_pending_misses += high_priority_queues_[i]-> | ||
218 | get_num_pending_misses(reset); | ||
219 | |||
220 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
221 | num_pending_misses += queues_[i]-> | ||
222 | get_num_pending_misses(reset); | ||
223 | |||
224 | num_pending_misses += low_priority_queue_. | ||
225 | get_num_pending_misses(reset); | ||
226 | |||
227 | return num_pending_misses; | ||
228 | } | ||
229 | |||
230 | num_pending_misses += queues_[num_thread]-> | ||
231 | get_num_pending_misses(reset); | ||
232 | |||
233 | if (num_thread < high_priority_queues_.size()) | ||
234 | { | ||
235 | num_pending_misses += high_priority_queues_[num_thread]-> | ||
236 | get_num_pending_misses(reset); | ||
237 | } | ||
238 | if (num_thread == 0) | ||
239 | { | ||
240 | num_pending_misses += low_priority_queue_. | ||
241 | get_num_pending_misses(reset); | ||
242 | } | ||
243 | return num_pending_misses; | ||
244 | } | ||
245 | |||
246 | std::int64_t get_num_pending_accesses(std::size_t num_thread, bool reset) | ||
247 | { | ||
248 | std::int64_t num_pending_accesses = 0; | ||
249 | if (num_thread == std::size_t(-1)) | ||
250 | { | ||
251 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
252 | num_pending_accesses += high_priority_queues_[i]-> | ||
253 | get_num_pending_accesses(reset); | ||
254 | |||
255 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
256 | num_pending_accesses += queues_[i]-> | ||
257 | get_num_pending_accesses(reset); | ||
258 | |||
259 | num_pending_accesses += low_priority_queue_. | ||
260 | get_num_pending_accesses(reset); | ||
261 | |||
262 | return num_pending_accesses; | ||
263 | } | ||
264 | |||
265 | num_pending_accesses += queues_[num_thread]-> | ||
266 | get_num_pending_accesses(reset); | ||
267 | |||
268 | if (num_thread < high_priority_queues_.size()) | ||
269 | { | ||
270 | num_pending_accesses += high_priority_queues_[num_thread]-> | ||
271 | get_num_pending_accesses(reset); | ||
272 | } | ||
273 | if (num_thread == 0) | ||
274 | { | ||
275 | num_pending_accesses += low_priority_queue_. | ||
276 | get_num_pending_accesses(reset); | ||
277 | } | ||
278 | return num_pending_accesses; | ||
279 | } | ||
280 | |||
281 | std::int64_t get_num_stolen_from_pending(std::size_t num_thread, bool reset) | ||
282 | { | ||
283 | std::int64_t num_stolen_threads = 0; | ||
284 | if (num_thread == std::size_t(-1)) | ||
285 | { | ||
286 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
287 | num_stolen_threads += high_priority_queues_[i]-> | ||
288 | get_num_stolen_from_pending(reset); | ||
289 | |||
290 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
291 | num_stolen_threads += queues_[i]-> | ||
292 | get_num_stolen_from_pending(reset); | ||
293 | |||
294 | num_stolen_threads += low_priority_queue_. | ||
295 | get_num_stolen_from_pending(reset); | ||
296 | |||
297 | return num_stolen_threads; | ||
298 | } | ||
299 | |||
300 | num_stolen_threads += queues_[num_thread]-> | ||
301 | get_num_stolen_from_pending(reset); | ||
302 | |||
303 | if (num_thread < high_priority_queues_.size()) | ||
304 | { | ||
305 | num_stolen_threads += high_priority_queues_[num_thread]-> | ||
306 | get_num_stolen_from_pending(reset); | ||
307 | } | ||
308 | if (num_thread == 0) | ||
309 | { | ||
310 | num_stolen_threads += low_priority_queue_. | ||
311 | get_num_stolen_from_pending(reset); | ||
312 | } | ||
313 | return num_stolen_threads; | ||
314 | } | ||
315 | |||
316 | std::int64_t get_num_stolen_to_pending(std::size_t num_thread, bool reset) | ||
317 | { | ||
318 | std::int64_t num_stolen_threads = 0; | ||
319 | if (num_thread == std::size_t(-1)) | ||
320 | { | ||
321 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
322 | num_stolen_threads += high_priority_queues_[i]-> | ||
323 | get_num_stolen_to_pending(reset); | ||
324 | |||
325 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
326 | num_stolen_threads += queues_[i]-> | ||
327 | get_num_stolen_to_pending(reset); | ||
328 | |||
329 | num_stolen_threads += low_priority_queue_. | ||
330 | get_num_stolen_to_pending(reset); | ||
331 | |||
332 | return num_stolen_threads; | ||
333 | } | ||
334 | |||
335 | num_stolen_threads += queues_[num_thread]-> | ||
336 | get_num_stolen_to_pending(reset); | ||
337 | |||
338 | if (num_thread < high_priority_queues_.size()) | ||
339 | { | ||
340 | num_stolen_threads += high_priority_queues_[num_thread]-> | ||
341 | get_num_stolen_to_pending(reset); | ||
342 | } | ||
343 | if (num_thread == 0) | ||
344 | { | ||
345 | num_stolen_threads += low_priority_queue_. | ||
346 | get_num_stolen_to_pending(reset); | ||
347 | } | ||
348 | return num_stolen_threads; | ||
349 | } | ||
350 | |||
351 | std::int64_t get_num_stolen_from_staged(std::size_t num_thread, bool reset) | ||
352 | { | ||
353 | std::int64_t num_stolen_threads = 0; | ||
354 | if (num_thread == std::size_t(-1)) | ||
355 | { | ||
356 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
357 | num_stolen_threads += high_priority_queues_[i]-> | ||
358 | get_num_stolen_from_staged(reset); | ||
359 | |||
360 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
361 | num_stolen_threads += queues_[i]-> | ||
362 | get_num_stolen_from_staged(reset); | ||
363 | |||
364 | num_stolen_threads += low_priority_queue_. | ||
365 | get_num_stolen_from_staged(reset); | ||
366 | |||
367 | return num_stolen_threads; | ||
368 | } | ||
369 | |||
370 | num_stolen_threads += queues_[num_thread]-> | ||
371 | get_num_stolen_from_staged(reset); | ||
372 | |||
373 | if (num_thread < high_priority_queues_.size()) | ||
374 | { | ||
375 | num_stolen_threads += high_priority_queues_[num_thread]-> | ||
376 | get_num_stolen_from_staged(reset); | ||
377 | } | ||
378 | if (num_thread == 0) | ||
379 | { | ||
380 | num_stolen_threads += low_priority_queue_. | ||
381 | get_num_stolen_from_staged(reset); | ||
382 | } | ||
383 | return num_stolen_threads; | ||
384 | } | ||
385 | |||
386 | std::int64_t get_num_stolen_to_staged(std::size_t num_thread, bool reset) | ||
387 | { | ||
388 | std::int64_t num_stolen_threads = 0; | ||
389 | if (num_thread == std::size_t(-1)) | ||
390 | { | ||
391 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
392 | num_stolen_threads += high_priority_queues_[i]-> | ||
393 | get_num_stolen_to_staged(reset); | ||
394 | |||
395 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
396 | num_stolen_threads += queues_[i]-> | ||
397 | get_num_stolen_to_staged(reset); | ||
398 | |||
399 | num_stolen_threads += low_priority_queue_. | ||
400 | get_num_stolen_to_staged(reset); | ||
401 | |||
402 | return num_stolen_threads; | ||
403 | } | ||
404 | |||
405 | num_stolen_threads += queues_[num_thread]-> | ||
406 | get_num_stolen_to_staged(reset); | ||
407 | |||
408 | if (num_thread < high_priority_queues_.size()) | ||
409 | { | ||
410 | num_stolen_threads += high_priority_queues_[num_thread]-> | ||
411 | get_num_stolen_to_staged(reset); | ||
412 | } | ||
413 | if (num_thread == 0) | ||
414 | { | ||
415 | num_stolen_threads += low_priority_queue_. | ||
416 | get_num_stolen_to_staged(reset); | ||
417 | } | ||
418 | return num_stolen_threads; | ||
419 | } | ||
420 | #endif | ||
421 | |||
422 | /////////////////////////////////////////////////////////////////////// | ||
423 | void abort_all_suspended_threads() | ||
424 | { | ||
425 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
426 | queues_[i]->abort_all_suspended_threads(); | ||
427 | |||
428 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
429 | high_priority_queues_[i]->abort_all_suspended_threads(); | ||
430 | |||
431 | low_priority_queue_.abort_all_suspended_threads(); | ||
432 | } | ||
433 | |||
434 | /////////////////////////////////////////////////////////////////////// | ||
435 | bool cleanup_terminated(bool delete_all = false) | ||
436 | { | ||
437 | bool empty = true; | ||
438 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
439 | empty = queues_[i]->cleanup_terminated(delete_all) && empty; | ||
440 | if (!delete_all) | ||
441 | return empty; | ||
442 | |||
443 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
444 | empty = high_priority_queues_[i]-> | ||
445 | cleanup_terminated(delete_all) && empty; | ||
446 | |||
447 | empty = low_priority_queue_.cleanup_terminated(delete_all) && empty; | ||
448 | return empty; | ||
449 | } | ||
450 | |||
451 | /////////////////////////////////////////////////////////////////////// | ||
452 | // create a new thread and schedule it if the initial state is equal to | ||
453 | // pending | ||
454 | void create_thread(thread_init_data& data, thread_id_type* id, | ||
455 | thread_state_enum initial_state, bool run_now, error_code& ec, | ||
456 | std::size_t num_thread) | ||
457 | { | ||
458 | #ifdef HPX_HAVE_THREAD_TARGET_ADDRESS | ||
459 | // // try to figure out the NUMA node where the data lives | ||
460 | // if (numa_sensitive_ && std::size_t(-1) == num_thread) { | ||
461 | // mask_cref_type mask = | ||
462 | // topology_.get_thread_affinity_mask_from_lva(data.lva); | ||
463 | // if (any(mask)) { | ||
464 | // num_thread = find_first(mask); | ||
465 | // } | ||
466 | // } | ||
467 | #endif | ||
468 | std::size_t queue_size = queues_.size(); | ||
469 | |||
470 | if (std::size_t(-1) == num_thread) | ||
471 | num_thread = curr_queue_++ % queue_size; | ||
472 | |||
473 | if (num_thread >= queue_size) | ||
474 | num_thread %= queue_size; | ||
475 | |||
476 | // now create the thread | ||
477 | if (data.priority == thread_priority_critical) { | ||
478 | std::size_t num = num_thread % high_priority_queues_.size(); | ||
479 | high_priority_queues_[num]->create_thread(data, id, | ||
480 | initial_state, run_now, ec); | ||
481 | return; | ||
482 | } | ||
483 | |||
484 | if (data.priority == thread_priority_boost) { | ||
485 | data.priority = thread_priority_normal; | ||
486 | std::size_t num = num_thread % high_priority_queues_.size(); | ||
487 | high_priority_queues_[num]->create_thread(data, id, | ||
488 | initial_state, run_now, ec); | ||
489 | return; | ||
490 | } | ||
491 | |||
492 | if (data.priority == thread_priority_low) { | ||
493 | low_priority_queue_.create_thread(data, id, initial_state, | ||
494 | run_now, ec); | ||
495 | return; | ||
496 | } | ||
497 | |||
498 | HPX_ASSERT(num_thread < queue_size); | ||
499 | queues_[num_thread]->create_thread(data, id, initial_state, | ||
500 | run_now, ec); | ||
501 | } | ||
502 | |||
503 | /// Return the next thread to be executed, return false if none is | ||
504 | /// available | ||
505 | virtual bool get_next_thread(std::size_t num_thread, | ||
506 | std::int64_t& idle_loop_count, threads::thread_data*& thrd) | ||
507 | { | ||
508 | std::size_t queues_size = queues_.size(); | ||
509 | std::size_t high_priority_queues = high_priority_queues_.size(); | ||
510 | |||
511 | HPX_ASSERT(num_thread < queues_size); | ||
512 | thread_queue_type* this_high_priority_queue = nullptr; | ||
513 | thread_queue_type* this_queue = queues_[num_thread]; | ||
514 | |||
515 | if (num_thread < high_priority_queues) | ||
516 | { | ||
517 | this_high_priority_queue = high_priority_queues_[num_thread]; | ||
518 | bool result = this_high_priority_queue->get_next_thread(thrd); | ||
519 | |||
520 | this_high_priority_queue->increment_num_pending_accesses(); | ||
521 | if (result) | ||
522 | return true; | ||
523 | this_high_priority_queue->increment_num_pending_misses(); | ||
524 | } | ||
525 | |||
526 | { | ||
527 | bool result = this_queue->get_next_thread(thrd); | ||
528 | |||
529 | this_queue->increment_num_pending_accesses(); | ||
530 | if (result) | ||
531 | return true; | ||
532 | this_queue->increment_num_pending_misses(); | ||
533 | |||
534 | bool have_staged = this_queue-> | ||
535 | get_staged_queue_length(boost::memory_order_relaxed) != 0; | ||
536 | |||
537 | // Give up, we should have work to convert. | ||
538 | if (have_staged) | ||
539 | return false; | ||
540 | } | ||
541 | |||
542 | if (numa_sensitive_ != 0) // limited or no stealing across domains | ||
543 | { | ||
544 | |||
545 | // steal thread from other queue of same NUMA domain | ||
546 | std::size_t pu_number = get_pu_num(num_thread); | ||
547 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
548 | if (test(steals_in_numa_domain_, pu_number)) //-V600 //-V111 | ||
549 | #endif | ||
550 | { | ||
551 | mask_cref_type this_numa_domain = | ||
552 | numa_domain_masks_[num_thread]; | ||
553 | for (std::size_t i = 1; i != queues_size; ++i) | ||
554 | { | ||
555 | // FIXME: Do a better job here. | ||
556 | std::size_t const idx = (i + num_thread) % queues_size; | ||
557 | |||
558 | HPX_ASSERT(idx != num_thread); | ||
559 | |||
560 | std::size_t pu_num = get_pu_num(idx); | ||
561 | if (!test(this_numa_domain, pu_num)) //-V560 //-V600 //-V111 | ||
562 | continue; | ||
563 | |||
564 | if (idx < high_priority_queues && | ||
565 | num_thread < high_priority_queues) | ||
566 | { | ||
567 | thread_queue_type* q = high_priority_queues_[idx]; | ||
568 | if (q->get_next_thread(thrd)) | ||
569 | { | ||
570 | q->increment_num_stolen_from_pending(); | ||
571 | this_high_priority_queue-> | ||
572 | increment_num_stolen_to_pending(); | ||
573 | return true; | ||
574 | } | ||
575 | } | ||
576 | |||
577 | if (queues_[idx]->get_next_thread(thrd)) | ||
578 | { | ||
579 | queues_[idx]->increment_num_stolen_from_pending(); | ||
580 | this_queue->increment_num_stolen_to_pending(); | ||
581 | return true; | ||
582 | } | ||
583 | } | ||
584 | } | ||
585 | |||
586 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
587 | // if nothing found, ask everybody else | ||
588 | if (test(steals_outside_numa_domain_, pu_number)) //-V600 //-V111 | ||
589 | { | ||
590 | mask_cref_type numa_domain = | ||
591 | outside_numa_domain_masks_[num_thread]; | ||
592 | for (std::size_t i = 1; i != queues_size; ++i) | ||
593 | { | ||
594 | // FIXME: Do a better job here. | ||
595 | std::size_t const idx = (i + num_thread) % queues_size; | ||
596 | |||
597 | HPX_ASSERT(idx != num_thread); | ||
598 | |||
599 | std::size_t pu_num = get_pu_num(idx); | ||
600 | if (!test(numa_domain, pu_num)) //-V560 //-V600 //-V111 | ||
601 | continue; | ||
602 | |||
603 | if (idx < high_priority_queues && | ||
604 | num_thread < high_priority_queues) | ||
605 | { | ||
606 | thread_queue_type* q = high_priority_queues_[idx]; | ||
607 | if (q->get_next_thread(thrd)) | ||
608 | { | ||
609 | q->increment_num_stolen_from_pending(); | ||
610 | this_high_priority_queue-> | ||
611 | increment_num_stolen_to_pending(); | ||
612 | return true; | ||
613 | } | ||
614 | } | ||
615 | |||
616 | if (queues_[idx]->get_next_thread(thrd)) | ||
617 | { | ||
618 | queues_[idx]->increment_num_stolen_from_pending(); | ||
619 | this_queue->increment_num_stolen_to_pending(); | ||
620 | return true; | ||
621 | } | ||
622 | } | ||
623 | } | ||
624 | #endif | ||
625 | } | ||
626 | |||
627 | else // not NUMA-sensitive | ||
628 | { | ||
629 | for (std::size_t i = 1; i != queues_size; ++i) | ||
630 | { | ||
631 | // FIXME: Do a better job here. | ||
632 | std::size_t const idx = (i + num_thread) % queues_size; | ||
633 | |||
634 | HPX_ASSERT(idx != num_thread); | ||
635 | |||
636 | if (idx < high_priority_queues && | ||
637 | num_thread < high_priority_queues) | ||
638 | { | ||
639 | 0.0% | thread_queue_type* q = high_priority_queues_[idx]; | ||
640 | if (q->get_next_thread(thrd)) | ||
641 | { | ||
642 | q->increment_num_stolen_from_pending(); | ||
643 | this_high_priority_queue-> | ||
644 | increment_num_stolen_to_pending(); | ||
645 | return true; | ||
646 | } | ||
647 | } | ||
648 | |||
649 | if (queues_[idx]->get_next_thread(thrd)) | ||
650 | { | ||
651 | queues_[idx]->increment_num_stolen_from_pending(); | ||
652 | this_queue->increment_num_stolen_to_pending(); | ||
653 | return true; | ||
654 | } | ||
655 | } | ||
656 | } | ||
657 | |||
658 | return low_priority_queue_.get_next_thread(thrd); | ||
659 | } | ||
660 | |||
661 | /// Schedule the passed thread | ||
662 | void schedule_thread(threads::thread_data* thrd, | ||
663 | std::size_t num_thread, | ||
664 | thread_priority priority = thread_priority_normal) | ||
665 | { | ||
666 | if (std::size_t(-1) == num_thread) | ||
667 | num_thread = curr_queue_++ % queues_.size(); | ||
668 | |||
669 | if (priority == thread_priority_critical || | ||
670 | priority == thread_priority_boost) | ||
671 | { | ||
672 | std::size_t num = num_thread % high_priority_queues_.size(); | ||
673 | high_priority_queues_[num]->schedule_thread(thrd); | ||
674 | } | ||
675 | else if (priority == thread_priority_low) { | ||
676 | low_priority_queue_.schedule_thread(thrd); | ||
677 | } | ||
678 | else { | ||
679 | HPX_ASSERT(num_thread < queues_.size()); | ||
680 | queues_[num_thread]->schedule_thread(thrd); | ||
681 | } | ||
682 | } | ||
683 | |||
684 | void schedule_thread_last(threads::thread_data* thrd, | ||
685 | std::size_t num_thread, | ||
686 | thread_priority priority = thread_priority_normal) | ||
687 | { | ||
688 | if (std::size_t(-1) == num_thread) | ||
689 | num_thread = curr_queue_++ % queues_.size(); | ||
690 | |||
691 | if (priority == thread_priority_critical || | ||
692 | priority == thread_priority_boost) | ||
693 | { | ||
694 | std::size_t num = num_thread % high_priority_queues_.size(); | ||
695 | high_priority_queues_[num]->schedule_thread(thrd, true); | ||
696 | } | ||
697 | else if (priority == thread_priority_low) { | ||
698 | low_priority_queue_.schedule_thread(thrd, true); | ||
699 | } | ||
700 | else { | ||
701 | HPX_ASSERT(num_thread < queues_.size()); | ||
702 | queues_[num_thread]->schedule_thread(thrd, true); | ||
703 | } | ||
704 | } | ||
705 | |||
706 | /// Destroy the passed thread as it has been terminated | ||
707 | bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count) | ||
708 | { | ||
709 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
710 | { | ||
711 | if (high_priority_queues_[i]->destroy_thread(thrd, busy_count)) | ||
712 | return true; | ||
713 | } | ||
714 | |||
715 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
716 | { | ||
717 | if (queues_[i]->destroy_thread(thrd, busy_count)) | ||
718 | return true; | ||
719 | } | ||
720 | |||
721 | if (low_priority_queue_.destroy_thread(thrd, busy_count)) | ||
722 | return true; | ||
723 | |||
724 | // the thread has to belong to one of the queues, always | ||
725 | HPX_ASSERT(false); | ||
726 | |||
727 | return false; | ||
728 | } | ||
729 | |||
730 | /////////////////////////////////////////////////////////////////////// | ||
731 | // This returns the current length of the queues (work items and new items) | ||
732 | std::int64_t get_queue_length(std::size_t num_thread = std::size_t(-1)) const | ||
733 | { | ||
734 | // Return queue length of one specific queue. | ||
735 | std::int64_t count = 0; | ||
736 | if (std::size_t(-1) != num_thread) { | ||
737 | HPX_ASSERT(num_thread < queues_.size()); | ||
738 | |||
739 | if (num_thread < high_priority_queues_.size()) | ||
740 | count = high_priority_queues_[num_thread]->get_queue_length(); | ||
741 | |||
742 | if (num_thread == queues_.size()-1) | ||
743 | count += low_priority_queue_.get_queue_length(); | ||
744 | |||
745 | return count + queues_[num_thread]->get_queue_length(); | ||
746 | } | ||
747 | |||
748 | // Cumulative queue lengths of all queues. | ||
749 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
750 | count += high_priority_queues_[i]->get_queue_length(); | ||
751 | |||
752 | count += low_priority_queue_.get_queue_length(); | ||
753 | |||
754 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
755 | count += queues_[i]->get_queue_length(); | ||
756 | |||
757 | return count; | ||
758 | } | ||
759 | |||
760 | /////////////////////////////////////////////////////////////////////// | ||
761 | // Queries the current thread count of the queues. | ||
762 | std::int64_t get_thread_count(thread_state_enum state = unknown, | ||
763 | thread_priority priority = thread_priority_default, | ||
764 | std::size_t num_thread = std::size_t(-1), bool reset = false) const | ||
765 | { | ||
766 | // Return thread count of one specific queue. | ||
767 | std::int64_t count = 0; | ||
768 | if (std::size_t(-1) != num_thread) | ||
769 | { | ||
770 | HPX_ASSERT(num_thread < queues_.size()); | ||
771 | |||
772 | switch (priority) { | ||
773 | case thread_priority_default: | ||
774 | { | ||
775 | if (num_thread < high_priority_queues_.size()) | ||
776 | count = high_priority_queues_[num_thread]-> | ||
777 | get_thread_count(state); | ||
778 | |||
779 | if (queues_.size()-1 == num_thread) | ||
780 | count += low_priority_queue_.get_thread_count(state); | ||
781 | |||
782 | return count + queues_[num_thread]->get_thread_count(state); | ||
783 | } | ||
784 | |||
785 | case thread_priority_low: | ||
786 | { | ||
787 | if (queues_.size()-1 == num_thread) | ||
788 | return low_priority_queue_.get_thread_count(state); | ||
789 | break; | ||
790 | } | ||
791 | |||
792 | case thread_priority_normal: | ||
793 | return queues_[num_thread]->get_thread_count(state); | ||
794 | |||
795 | case thread_priority_boost: | ||
796 | case thread_priority_critical: | ||
797 | { | ||
798 | if (num_thread < high_priority_queues_.size()) | ||
799 | return high_priority_queues_[num_thread]-> | ||
800 | get_thread_count(state); | ||
801 | break; | ||
802 | } | ||
803 | |||
804 | default: | ||
805 | case thread_priority_unknown: | ||
806 | { | ||
807 | HPX_THROW_EXCEPTION(bad_parameter, | ||
808 | "local_priority_queue_scheduler::get_thread_count", | ||
809 | "unknown thread priority value (thread_priority_unknown)"); | ||
810 | return 0; | ||
811 | } | ||
812 | } | ||
813 | return 0; | ||
814 | } | ||
815 | |||
816 | // Return the cumulative count for all queues. | ||
817 | switch (priority) { | ||
818 | case thread_priority_default: | ||
819 | { | ||
820 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
821 | count += high_priority_queues_[i]->get_thread_count(state); | ||
822 | |||
823 | count += low_priority_queue_.get_thread_count(state); | ||
824 | |||
825 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
826 | count += queues_[i]->get_thread_count(state); | ||
827 | |||
828 | break; | ||
829 | } | ||
830 | |||
831 | case thread_priority_low: | ||
832 | return low_priority_queue_.get_thread_count(state); | ||
833 | |||
834 | case thread_priority_normal: | ||
835 | { | ||
836 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
837 | count += queues_[i]->get_thread_count(state); | ||
838 | break; | ||
839 | } | ||
840 | |||
841 | case thread_priority_boost: | ||
842 | case thread_priority_critical: | ||
843 | { | ||
844 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
845 | count += high_priority_queues_[i]->get_thread_count(state); | ||
846 | break; | ||
847 | } | ||
848 | |||
849 | default: | ||
850 | case thread_priority_unknown: | ||
851 | { | ||
852 | HPX_THROW_EXCEPTION(bad_parameter, | ||
853 | "local_priority_queue_scheduler::get_thread_count", | ||
854 | "unknown thread priority value (thread_priority_unknown)"); | ||
855 | return 0; | ||
856 | } | ||
857 | } | ||
858 | return count; | ||
859 | } | ||
860 | |||
861 | /////////////////////////////////////////////////////////////////////// | ||
862 | // Enumerate matching threads from all queues | ||
863 | bool enumerate_threads( | ||
864 | util::function_nonser<bool(thread_id_type)> const& f, | ||
865 | thread_state_enum state = unknown) const | ||
866 | { | ||
867 | bool result = true; | ||
868 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
869 | { | ||
870 | result = result && | ||
871 | high_priority_queues_[i]->enumerate_threads(f, state); | ||
872 | } | ||
873 | |||
874 | result = result && low_priority_queue_.enumerate_threads(f, state); | ||
875 | |||
876 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
877 | { | ||
878 | result = result && queues_[i]->enumerate_threads(f, state); | ||
879 | } | ||
880 | return result; | ||
881 | } | ||
882 | |||
883 | #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME | ||
884 | /////////////////////////////////////////////////////////////////////// | ||
885 | // Queries the current average thread wait time of the queues. | ||
886 | std::int64_t get_average_thread_wait_time( | ||
887 | std::size_t num_thread = std::size_t(-1)) const | ||
888 | { | ||
889 | // Return average thread wait time of one specific queue. | ||
890 | std::uint64_t wait_time = 0; | ||
891 | std::uint64_t count = 0; | ||
892 | if (std::size_t(-1) != num_thread) | ||
893 | { | ||
894 | HPX_ASSERT(num_thread < queues_.size()); | ||
895 | |||
896 | if (num_thread < high_priority_queues_.size()) | ||
897 | { | ||
898 | wait_time = high_priority_queues_[num_thread]-> | ||
899 | get_average_thread_wait_time(); | ||
900 | ++count; | ||
901 | } | ||
902 | |||
903 | if (queues_.size()-1 == num_thread) | ||
904 | { | ||
905 | wait_time += low_priority_queue_. | ||
906 | get_average_thread_wait_time(); | ||
907 | ++count; | ||
908 | } | ||
909 | |||
910 | wait_time += queues_[num_thread]->get_average_thread_wait_time(); | ||
911 | return wait_time / (count + 1); | ||
912 | } | ||
913 | |||
914 | // Return the cumulative average thread wait time for all queues. | ||
915 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
916 | { | ||
917 | wait_time += high_priority_queues_[i]->get_average_thread_wait_time(); | ||
918 | ++count; | ||
919 | } | ||
920 | |||
921 | wait_time += low_priority_queue_.get_average_thread_wait_time(); | ||
922 | |||
923 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
924 | { | ||
925 | wait_time += queues_[i]->get_average_thread_wait_time(); | ||
926 | ++count; | ||
927 | } | ||
928 | |||
929 | return wait_time / (count + 1); | ||
930 | } | ||
931 | |||
932 | /////////////////////////////////////////////////////////////////////// | ||
933 | // Queries the current average task wait time of the queues. | ||
934 | std::int64_t get_average_task_wait_time( | ||
935 | std::size_t num_thread = std::size_t(-1)) const | ||
936 | { | ||
937 | // Return average task wait time of one specific queue. | ||
938 | std::uint64_t wait_time = 0; | ||
939 | std::uint64_t count = 0; | ||
940 | if (std::size_t(-1) != num_thread) | ||
941 | { | ||
942 | HPX_ASSERT(num_thread < queues_.size()); | ||
943 | |||
944 | if (num_thread < high_priority_queues_.size()) | ||
945 | { | ||
946 | wait_time = high_priority_queues_[num_thread]-> | ||
947 | get_average_task_wait_time(); | ||
948 | ++count; | ||
949 | } | ||
950 | |||
951 | if (queues_.size()-1 == num_thread) | ||
952 | { | ||
953 | wait_time += low_priority_queue_. | ||
954 | get_average_task_wait_time(); | ||
955 | ++count; | ||
956 | } | ||
957 | |||
958 | wait_time += queues_[num_thread]->get_average_task_wait_time(); | ||
959 | return wait_time / (count + 1); | ||
960 | } | ||
961 | |||
962 | // Return the cumulative average task wait time for all queues. | ||
963 | for (std::size_t i = 0; i != high_priority_queues_.size(); ++i) | ||
964 | { | ||
965 | wait_time += high_priority_queues_[i]-> | ||
966 | get_average_task_wait_time(); | ||
967 | ++count; | ||
968 | } | ||
969 | |||
970 | wait_time += low_priority_queue_.get_average_task_wait_time(); | ||
971 | |||
972 | for (std::size_t i = 0; i != queues_.size(); ++i) | ||
973 | { | ||
974 | wait_time += queues_[i]->get_average_task_wait_time(); | ||
975 | ++count; | ||
976 | } | ||
977 | |||
978 | return wait_time / (count + 1); | ||
979 | } | ||
980 | #endif | ||
981 | |||
982 | /// This function is called periodically by the thread manager to allow | ||
983 | /// the scheduler to perform maintenance tasks. It returns true if the | ||
984 | /// calling OS thread has to be terminated (i.e. no more work has to be | ||
985 | /// done). A simplified driver-loop sketch follows this function. | ||
986 | virtual bool wait_or_add_new(std::size_t num_thread, bool running, | ||
987 | std::int64_t& idle_loop_count) | ||
988 | { | ||
989 | std::size_t queues_size = queues_.size(); | ||
990 | HPX_ASSERT(num_thread < queues_.size()); | ||
991 | |||
992 | std::size_t added = 0; | ||
993 | bool result = true; | ||
994 | |||
995 | std::size_t high_priority_queues = high_priority_queues_.size(); | ||
996 | thread_queue_type* this_high_priority_queue = nullptr; | ||
997 | thread_queue_type* this_queue = queues_[num_thread]; | ||
998 | |||
999 | if (num_thread < high_priority_queues) | ||
1000 | { | ||
1001 | this_high_priority_queue = high_priority_queues_[num_thread]; | ||
1002 | result = this_high_priority_queue->wait_or_add_new(running, | ||
1003 | idle_loop_count, added) | ||
1004 | && result; | ||
1005 | if (0 != added) return result; | ||
1006 | } | ||
1007 | |||
1008 | result = this_queue->wait_or_add_new( | ||
1009 | running, idle_loop_count, added) && result; | ||
1010 | if (0 != added) return result; | ||
1011 | |||
1012 | if (numa_sensitive_ != 0) // limited or no cross domain stealing | ||
1013 | { | ||
1014 | // steal work items: first try to steal from other cores in | ||
1015 | // the same NUMA node | ||
1016 | |||
1017 | std::size_t pu_number = get_pu_num(num_thread); | ||
1018 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
1019 | if (test(steals_in_numa_domain_, pu_number)) //-V600 //-V111 | ||
1020 | #endif | ||
1021 | { | ||
1022 | mask_cref_type numa_domain_mask = | ||
1023 | numa_domain_masks_[num_thread]; | ||
1024 | |||
1025 | for (std::size_t i = 1; i != queues_size; ++i) | ||
1026 | { | ||
1027 | // FIXME: Do a better job here. | ||
1028 | std::size_t const idx = (i + num_thread) % queues_size; | ||
1029 | |||
1030 | HPX_ASSERT(idx != num_thread); | ||
1031 | |||
1032 | std::size_t pu_num = get_pu_num(idx); | ||
1033 | if (!test(numa_domain_mask, pu_num)) //-V600 | ||
1034 | continue; | ||
1035 | |||
1036 | if (idx < high_priority_queues && | ||
1037 | num_thread < high_priority_queues) | ||
1038 | { | ||
1039 | thread_queue_type* q = high_priority_queues_[idx]; | ||
1040 | result = this_high_priority_queue-> | ||
1041 | wait_or_add_new(running, idle_loop_count, | ||
1042 | added, q) | ||
1043 | && result; | ||
1044 | |||
1045 | if (0 != added) | ||
1046 | { | ||
1047 | q->increment_num_stolen_from_staged(added); | ||
1048 | this_high_priority_queue-> | ||
1049 | increment_num_stolen_to_staged(added); | ||
1050 | return result; | ||
1051 | } | ||
1052 | } | ||
1053 | |||
1054 | result = this_queue->wait_or_add_new(running, | ||
1055 | idle_loop_count, added, queues_[idx]) && result; | ||
1056 | if (0 != added) | ||
1057 | { | ||
1058 | queues_[idx]->increment_num_stolen_from_staged(added); | ||
1059 | this_queue->increment_num_stolen_to_staged(added); | ||
1060 | return result; | ||
1061 | } | ||
1062 | } | ||
1063 | } | ||
1064 | |||
1065 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
1066 | // if nothing found, ask everybody else | ||
1067 | if (test(steals_outside_numa_domain_, pu_number)) //-V600 //-V111 | ||
1068 | { | ||
1069 | mask_cref_type numa_domain_mask = | ||
1070 | outside_numa_domain_masks_[num_thread]; | ||
1071 | for (std::size_t i = 1; i != queues_size; ++i) | ||
1072 | { | ||
1073 | // FIXME: Do a better job here. | ||
1074 | std::size_t const idx = (i + num_thread) % queues_size; | ||
1075 | |||
1076 | HPX_ASSERT(idx != num_thread); | ||
1077 | |||
1078 | std::size_t pu_num = get_pu_num(idx); | ||
1079 | if (!test(numa_domain_mask, pu_num)) //-V600 | ||
1080 | continue; | ||
1081 | |||
1082 | if (idx < high_priority_queues && | ||
1083 | num_thread < high_priority_queues) | ||
1084 | { | ||
1085 | thread_queue_type* q = high_priority_queues_[idx]; | ||
1086 | result = this_high_priority_queue-> | ||
1087 | wait_or_add_new(running, idle_loop_count, | ||
1088 | added, q) | ||
1089 | && result; | ||
1090 | if (0 != added) | ||
1091 | { | ||
1092 | q->increment_num_stolen_from_staged(added); | ||
1093 | this_high_priority_queue-> | ||
1094 | increment_num_stolen_to_staged(added); | ||
1095 | return result; | ||
1096 | } | ||
1097 | } | ||
1098 | |||
1099 | result = this_queue->wait_or_add_new(running, | ||
1100 | idle_loop_count, added, queues_[idx]) && result; | ||
1101 | if (0 != added) | ||
1102 | { | ||
1103 | queues_[idx]->increment_num_stolen_from_staged(added); | ||
1104 | this_queue->increment_num_stolen_to_staged(added); | ||
1105 | return result; | ||
1106 | } | ||
1107 | } | ||
1108 | } | ||
1109 | #endif | ||
1110 | } | ||
1111 | |||
1112 | else // not NUMA-sensitive | ||
1113 | { | ||
1114 | for (std::size_t i = 1; i != queues_size; ++i) | ||
1115 | { | ||
1116 | // FIXME: Do a better job here. | ||
1117 | std::size_t const idx = (i + num_thread) % queues_size; | ||
1118 | |||
1119 | HPX_ASSERT(idx != num_thread); | ||
1120 | |||
1121 | if (idx < high_priority_queues && | ||
1122 | num_thread < high_priority_queues) | ||
1123 | { | ||
1124 | 0.7% | thread_queue_type* q = high_priority_queues_[idx]; | ||
1125 | result = this_high_priority_queue-> | ||
1126 | wait_or_add_new(running, idle_loop_count, added, q) | ||
1127 | && result; | ||
1128 | if (0 != added) | ||
1129 | { | ||
1130 | q->increment_num_stolen_from_staged(added); | ||
1131 | this_high_priority_queue-> | ||
1132 | increment_num_stolen_to_staged(added); | ||
1133 | return result; | ||
1134 | } | ||
1135 | } | ||
1136 | |||
1137 | result = this_queue->wait_or_add_new(running, | ||
1138 | 0.2% | idle_loop_count, added, queues_[idx]) && result; | ||
1139 | if (0 != added) | ||
1140 | { | ||
1141 | queues_[idx]->increment_num_stolen_from_staged(added); | ||
1142 | this_queue->increment_num_stolen_to_staged(added); | ||
1143 | return result; | ||
1144 | } | ||
1145 | } | ||
1146 | } | ||
1147 | |||
1148 | #ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION | ||
1149 | // no new work is available, are we deadlocked? | ||
1150 | if (HPX_UNLIKELY(minimal_deadlock_detection && LHPX_ENABLED(error))) | ||
1151 | { | ||
1152 | bool suspended_only = true; | ||
1153 | |||
1154 | for (std::size_t i = 0; suspended_only && i != queues_.size(); ++i) { | ||
1155 | suspended_only = queues_[i]->dump_suspended_threads( | ||
1156 | i, idle_loop_count, running); | ||
1157 | } | ||
1158 | |||
1159 | if (HPX_UNLIKELY(suspended_only)) { | ||
1160 | if (running) { | ||
1161 | LTM_(error) //-V128 | ||
1162 | << "queue(" << num_thread << "): " | ||
1163 | << "no new work available, are we deadlocked?"; | ||
1164 | } | ||
1165 | else { | ||
1166 | LHPX_CONSOLE_(hpx::util::logging::level::error) //-V128 | ||
1167 | << " [TM] " //-V128 | ||
1168 | << "queue(" << num_thread << "): " | ||
1169 | << "no new work available, are we deadlocked?\n"; | ||
1170 | } | ||
1171 | } | ||
1172 | } | ||
1173 | #endif | ||
1174 | |||
1175 | result = low_priority_queue_.wait_or_add_new(running, | ||
1176 | idle_loop_count, added) && result; | ||
1177 | if (0 != added) return result; | ||
1178 | |||
1179 | return result; | ||
1180 | } | ||
1181 | |||
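
To make the interplay of get_next_thread() and wait_or_add_new() more concrete, here is a simplified, hypothetical sketch of the loop an OS worker thread might run against this scheduler. The real HPX scheduling loop is considerably more elaborate; `execute_thread()` is a placeholder for actually running the HPX thread that was handed out.

```cpp
// Hypothetical driver loop (simplified).  execute_thread() is a placeholder;
// the real HPX scheduling loop also handles thread state transitions,
// idle back-off, and background work.
template <typename Scheduler>
void worker_loop(Scheduler& scheduler, std::size_t num_thread, bool running)
{
    std::int64_t idle_loop_count = 0;

    while (true)
    {
        hpx::threads::thread_data* thrd = nullptr;

        // Try to obtain the next thread: first this core's high priority
        // queue, then its local queue, then other cores' queues (stealing),
        // and finally the low priority queue.
        if (scheduler.get_next_thread(num_thread, idle_loop_count, thrd))
        {
            execute_thread(thrd);   // placeholder: run the HPX thread
            idle_loop_count = 0;
            continue;
        }

        // Nothing pending: convert staged work, steal staged work from
        // other queues, or learn that this OS thread may terminate.
        ++idle_loop_count;
        if (scheduler.wait_or_add_new(num_thread, running, idle_loop_count))
            break;  // no more work is expected for this OS thread
    }
}
```
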
1182 | /////////////////////////////////////////////////////////////////////// | ||
1183 | void on_start_thread(std::size_t num_thread) | ||
1184 | { | ||
1185 | if (nullptr == queues_[num_thread]) | ||
1186 | { | ||
1187 | queues_[num_thread] = | ||
1188 | new thread_queue_type(max_queue_thread_count_); | ||
1189 | |||
1190 | if (num_thread < high_priority_queues_.size()) | ||
1191 | { | ||
1192 | high_priority_queues_[num_thread] = | ||
1193 | new thread_queue_type(max_queue_thread_count_); | ||
1194 | } | ||
1195 | } | ||
1196 | |||
1197 | // forward this call to all queues etc. | ||
1198 | if (num_thread < high_priority_queues_.size()) | ||
1199 | high_priority_queues_[num_thread]->on_start_thread(num_thread); | ||
1200 | if (num_thread == queues_.size()-1) | ||
1201 | low_priority_queue_.on_start_thread(num_thread); | ||
1202 | |||
1203 | queues_[num_thread]->on_start_thread(num_thread); | ||
1204 | |||
1205 | // pre-calculate certain constants for the given thread number | ||
1206 | std::size_t num_pu = get_pu_num(num_thread); | ||
1207 | mask_cref_type machine_mask = topology_.get_machine_affinity_mask(); | ||
1208 | mask_cref_type core_mask = | ||
1209 | topology_.get_thread_affinity_mask(num_pu, numa_sensitive_ != 0); | ||
1210 | mask_cref_type node_mask = | ||
1211 | topology_.get_numa_node_affinity_mask(num_pu, numa_sensitive_ != 0); | ||
1212 | |||
1213 | if (any(core_mask) && any(node_mask)) | ||
1214 | { | ||
1215 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
1216 | set(steals_in_numa_domain_, num_pu); | ||
1217 | #endif | ||
1218 | numa_domain_masks_[num_thread] = node_mask; | ||
1219 | } | ||
1220 | |||
1221 | // we allow the thread on the boundary of the NUMA domain to steal | ||
1222 | mask_type first_mask = mask_type(); | ||
1223 | resize(first_mask, mask_size(core_mask)); | ||
1224 | |||
1225 | std::size_t first = find_first(node_mask); | ||
1226 | if (first != std::size_t(-1)) | ||
1227 | set(first_mask, first); | ||
1228 | else | ||
1229 | first_mask = core_mask; | ||
1230 | |||
1231 | if (numa_sensitive_ != 2 && any(first_mask & core_mask)) | ||
1232 | { | ||
1233 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
1234 | set(steals_outside_numa_domain_, num_pu); | ||
1235 | #endif | ||
1236 | outside_numa_domain_masks_[num_thread] = | ||
1237 | not_(node_mask) & machine_mask; | ||
1238 | } | ||
1239 | } | ||
1240 | |||
1241 | void on_stop_thread(std::size_t num_thread) | ||
1242 | { | ||
1243 | if (num_thread < high_priority_queues_.size()) | ||
1244 | high_priority_queues_[num_thread]->on_stop_thread(num_thread); | ||
1245 | if (num_thread == queues_.size()-1) | ||
1246 | low_priority_queue_.on_stop_thread(num_thread); | ||
1247 | |||
1248 | queues_[num_thread]->on_stop_thread(num_thread); | ||
1249 | } | ||
1250 | |||
1251 | void on_error(std::size_t num_thread, boost::exception_ptr const& e) | ||
1252 | { | ||
1253 | if (num_thread < high_priority_queues_.size()) | ||
1254 | high_priority_queues_[num_thread]->on_error(num_thread, e); | ||
1255 | if (num_thread == queues_.size()-1) | ||
1256 | low_priority_queue_.on_error(num_thread, e); | ||
1257 | |||
1258 | queues_[num_thread]->on_error(num_thread, e); | ||
1259 | } | ||
1260 | |||
1261 | void reset_thread_distribution() | ||
1262 | { | ||
1263 | curr_queue_.store(0); | ||
1264 | } | ||
1265 | |||
1266 | protected: | ||
1267 | std::size_t max_queue_thread_count_; | ||
1268 | std::vector<thread_queue_type*> queues_; | ||
1269 | std::vector<thread_queue_type*> high_priority_queues_; | ||
1270 | thread_queue_type low_priority_queue_; | ||
1271 | boost::atomic<std::size_t> curr_queue_; | ||
1272 | std::size_t numa_sensitive_; | ||
1273 | |||
1274 | #if !defined(HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only | ||
1275 | mask_type steals_in_numa_domain_; | ||
1276 | mask_type steals_outside_numa_domain_; | ||
1277 | #endif | ||
1278 | std::vector<mask_type> numa_domain_masks_; | ||
1279 | std::vector<mask_type> outside_numa_domain_masks_; | ||
1280 | }; | ||
1281 | }}} | ||
1282 | |||
1283 | #include <hpx/config/warnings_suffix.hpp> | ||
1284 | |||
1285 | #endif | ||
1286 | |||
1287 |