// Copyright (c) 2014 University of Oregon
//

#ifdef APEX_HAVE_HPX
#include <hpx/config.hpp>
#ifdef APEX_HAVE_OTF2
#define APEX_TRACE_APEX
#endif // APEX_HAVE_OTF2
#endif // APEX_HAVE_HPX

#include "profiler_listener.hpp"
#include "profiler.hpp"
#include "thread_instance.hpp"
#include <iostream>
#include <iomanip>
#include <fstream>
#include <math.h>
#include "apex_options.hpp"
#include "profile.hpp"
#include "apex.hpp"

#include <atomic>
#if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__)))
#include <unistd.h>
#include <sched.h>
#endif
#include <cstdio>
#include <vector>
#include <string>
#include <unordered_set>
#include <algorithm>
#include <iterator>

#include <functional>
#include <thread>
#include <future>

#if defined(APEX_THROTTLE)
#include "apex_cxx_shared_lock.hpp"
apex::shared_mutex_type throttled_event_set_mutex;
#define APEX_THROTTLE_CALLS 1000
#ifdef APEX_USE_CLOCK_TIMESTAMP
#define APEX_THROTTLE_PERCALL 0.00001 // 10 microseconds.
#else
#define APEX_THROTTLE_PERCALL 50000 // 50k cycles.
#endif
#endif
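
/* A worked example of this heuristic (assuming the default cycle-based
 * threshold): a timer that has fired more than 1000 times with a mean
 * cost under 50k cycles (roughly 25 microseconds at 2 GHz) is considered
 * too lightweight to be worth measuring, and process_profile() below
 * adds it to the throttled set so subsequent starts return immediately. */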
49 | ||
50 | #if APEX_HAVE_PAPI | |
51 | #include "papi.h" | |
52 | #include <mutex> | |
53 | std::mutex event_set_mutex; | |
54 | #endif | |
55 | ||
56 | #ifdef APEX_HAVE_HPX | |
57 | #include <boost/assign.hpp> | |
58 | #include <boost/cstdint.hpp> | |
59 | #include <hpx/include/performance_counters.hpp> | |
60 | #include <hpx/include/actions.hpp> | |
61 | #include <hpx/include/util.hpp> | |
62 | #include <hpx/lcos/local/composable_guard.hpp> | |
63 | static void apex_schedule_process_profiles(void); // not in apex namespace | |
64 | const int num_non_worker_threads_registered = 0; | |
65 | #endif | |
66 | ||
67 | #define APEX_MAIN "APEX MAIN" | |
68 | ||
69 | #ifdef APEX_HAVE_TAU | |
70 | #define PROFILING_ON | |
71 | #define TAU_DOT_H_LESS_HEADERS | |
72 | #include <TAU.h> | |
73 | #endif | |
74 | ||
75 | #include "utils.hpp" | |
76 | ||
77 | #include <cstdlib> | |
78 | #include <ctime> | |
79 | ||
80 | using namespace std; | |
81 | using namespace apex; | |
82 | ||
83 | APEX_NATIVE_TLS unsigned int my_tid = 0; // the current thread's TID in APEX | |
84 | ||
85 | namespace apex { | |
86 | ||
87 | #ifdef APEX_MULTIPLE_QUEUES | |
88 | /* this is a thread-local pointer to a concurrent queue for each worker thread. */ | |
89 | __thread profiler_queue_t * thequeue; | |
90 | #endif | |
91 | ||
/* This is a special profiler, indicating that the timer requested is
   throttled, and shouldn't be processed. */
profiler* profiler::disabled_profiler = new profiler();

#ifdef APEX_HAVE_HPX
/* Flag indicating whether a consumer task is currently running */
std::atomic_flag consumer_task_running = ATOMIC_FLAG_INIT;
bool hpx_shutdown = false;
#endif

double profiler_listener::get_non_idle_time() {
    double non_idle_time = 0.0;
    /* Iterate over all timers and accumulate the time spent in them */
    unordered_map<task_identifier, profile*>::const_iterator it2;
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
        profile * p = it2->second;
#if defined(APEX_THROTTLE)
        task_identifier id = it2->first;
        unordered_set<task_identifier>::const_iterator it4;
        {
            read_lock_type l(throttled_event_set_mutex);
            it4 = throttled_tasks.find(id);
        }
        if (it4 != throttled_tasks.end()) {
            continue;
        }
#endif
        if (p->get_type() == APEX_TIMER) {
            non_idle_time += p->get_accumulated();
        }
    }
    return non_idle_time*profiler::get_cpu_mhz();
}
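
/* Note: despite its name, get_cpu_mhz() appears to be used throughout this
 * file as a ticks-to-seconds conversion factor (accumulated tick counts are
 * multiplied by it to obtain seconds), so the value above is in seconds. */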
126 | ||
127 | profile * profiler_listener::get_idle_time() { | |
128 | double non_idle_time = get_non_idle_time(); | |
129 | /* Subtract the accumulated time from the main time span. */ | |
130 | int num_worker_threads = thread_instance::get_num_threads(); | |
131 | #ifdef APEX_HAVE_HPX | |
132 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
133 | #endif | |
134 | std::chrono::duration<double> time_span = | |
135 | std::chrono::duration_cast<std::chrono::duration<double>> | |
136 | (MYCLOCK::now() - main_timer->start); | |
137 | double total_main = time_span.count() * | |
138 | fmin(hardware_concurrency(), num_worker_threads); | |
139 | double elapsed = total_main - non_idle_time; | |
140 | elapsed = elapsed > 0.0 ? elapsed : 0.0; | |
141 | profile * theprofile = new profile(elapsed*profiler::get_cpu_mhz(), 0, NULL, false); | |
142 | return theprofile; | |
143 | } | |
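
/* Worked example for the calculation above: a 10 second run with 8 cores
 * but only 4 observed worker threads has 10 * min(8,4) = 40 seconds of
 * available CPU time; if the timers accumulated 30 seconds, the reported
 * idle time is 10 seconds (clamped at zero if timers exceed the total). */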
144 | ||
145 | profile * profiler_listener::get_idle_rate() { | |
146 | double non_idle_time = get_non_idle_time(); | |
147 | /* Subtract the accumulated time from the main time span. */ | |
148 | int num_worker_threads = thread_instance::get_num_threads(); | |
149 | #ifdef APEX_HAVE_HPX | |
150 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
151 | #endif | |
152 | std::chrono::duration<double> time_span = | |
153 | std::chrono::duration_cast<std::chrono::duration<double>> | |
154 | (MYCLOCK::now() - main_timer->start); | |
155 | double total_main = time_span.count() * | |
156 | fmin(hardware_concurrency(), num_worker_threads); | |
157 | double elapsed = total_main - non_idle_time; | |
158 | double rate = elapsed > 0.0 ? ((elapsed/total_main)) : 0.0; | |
159 | profile * theprofile = new profile(rate, 0, NULL, false); | |
160 | return theprofile; | |
161 | } | |
162 | ||
163 | /* Return the requested profile object to the user. | |
164 | * Return nullptr if doesn't exist. */ | |
165 | profile * profiler_listener::get_profile(task_identifier &id) { | |
166 | if (id.name == string(APEX_IDLE_RATE)) { | |
167 | return get_idle_rate(); | |
168 | } else if (id.name == string(APEX_IDLE_TIME)) { | |
169 | return get_idle_time(); | |
170 | } else if (id.name == string(APEX_NON_IDLE_TIME)) { | |
171 | profile * theprofile = new profile(get_non_idle_time(), 0, NULL, false); | |
172 | return theprofile; | |
173 | } | |
174 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
175 | unordered_map<task_identifier, profile*>::const_iterator it = task_map.find(id); | |
176 | if (it != task_map.end()) { | |
177 | return (*it).second; | |
178 | } | |
179 | return nullptr; | |
180 | } | |
181 | ||
182 | void profiler_listener::reset_all(void) { | |
183 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
184 | for(auto &it : task_map) { | |
185 | it.second->reset(); | |
186 | } | |
187 | } | |
188 | ||
189 | /* After the consumer thread pulls a profiler off of the queue, | |
190 | * process it by updating its profile object in the map of profiles. */ | |
191 | // TODO The name-based timer and address-based timer paths through | |
192 | // the code involve a lot of duplication -- this should be refactored | |
193 | // to remove the duplication so it's easier to maintain. | |
194 | unsigned int profiler_listener::process_profile(std::shared_ptr<profiler> &p, unsigned int tid) | |
195 | { | |
196 | if(p == nullptr) return 0; | |
197 | profile * theprofile; | |
198 | if(p->is_reset == reset_type::ALL) { | |
199 | reset_all(); | |
200 | return 0; | |
201 | } | |
202 | double values[8] = {0}; | |
203 | double tmp_num_counters = 0; | |
204 | #if APEX_HAVE_PAPI | |
205 | tmp_num_counters = num_papi_counters; | |
206 | for (int i = 0 ; i < num_papi_counters ; i++) { | |
207 | if (p->papi_stop_values[i] > p->papi_start_values[i]) { | |
208 | values[i] = p->papi_stop_values[i] - p->papi_start_values[i]; | |
209 | } else { | |
210 | values[i] = 0.0; | |
211 | } | |
212 | } | |
213 | #endif | |
214 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex, std::defer_lock); | |
215 | // There is only one consumer thread except during shutdown, so we only need | |
216 | // to lock during shutdown. | |
217 | bool did_lock = false; | |
218 | if(_done) { | |
219 | task_map_lock.lock(); | |
220 | did_lock = true; | |
221 | } | |
222 | unordered_map<task_identifier, profile*>::const_iterator it = task_map.find(*(p->task_id)); | |
223 | if (it != task_map.end()) { | |
224 | // A profile for this ID already exists. | |
225 | theprofile = (*it).second; | |
226 | if(_done && did_lock) { | |
227 | task_map_lock.unlock(); | |
228 | } | |
229 | if(p->is_reset == reset_type::CURRENT) { | |
230 | theprofile->reset(); | |
231 | } else { | |
232 | theprofile->increment(p->elapsed(), tmp_num_counters, values, p->is_resume); | |
233 | } | |
234 | #if defined(APEX_THROTTLE) | |
235 | // Is this a lightweight task? If so, we shouldn't measure it any more, | |
236 | // in order to reduce overhead. | |
237 | if (theprofile->get_calls() > APEX_THROTTLE_CALLS && | |
238 | theprofile->get_mean() < APEX_THROTTLE_PERCALL) { | |
239 | unordered_set<task_identifier>::const_iterator it2; | |
240 | { | |
241 | read_lock_type l(throttled_event_set_mutex); | |
242 | it2 = throttled_tasks.find(*(p->task_id)); | |
243 | } | |
244 | if (it2 == throttled_tasks.end()) { | |
245 | // lock the set for insert | |
246 | { | |
247 | write_lock_type l(throttled_event_set_mutex); | |
248 | // was it inserted when we were waiting? | |
249 | it2 = throttled_tasks.find(*(p->task_id)); | |
250 | // no? OK - insert it. | |
251 | if (it2 == throttled_tasks.end()) { | |
252 | throttled_tasks.insert(*(p->task_id)); | |
253 | } | |
254 | } | |
255 | if (apex_options::use_screen_output()) { | |
256 | cout << "APEX: disabling lightweight timer " | |
257 | << p->task_id->get_name() | |
258 | << endl; | |
259 | fflush(stdout); | |
260 | } | |
261 | } | |
262 | } | |
263 | #endif | |
264 | } else { | |
265 | // Create a new profile for this name. | |
266 | theprofile = new profile(p->is_reset == reset_type::CURRENT ? 0.0 : p->elapsed(), tmp_num_counters, values, p->is_resume, p->is_counter ? APEX_COUNTER : APEX_TIMER); | |
267 | task_map[*(p->task_id)] = theprofile; | |
268 | if(_done && did_lock) { | |
269 | task_map_lock.unlock(); | |
270 | } | |
271 | #ifdef APEX_HAVE_HPX | |
272 | #ifdef APEX_REGISTER_HPX3_COUNTERS | |
273 | if(!_done) { | |
274 | if(get_hpx_runtime_ptr() != nullptr && p->task_id->has_name()) { | |
275 | std::string timer_name(p->task_id->get_name()); | |
276 | //Don't register timers containing "/" | |
277 | if(timer_name.find("/") == std::string::npos) { | |
278 | hpx::performance_counters::install_counter_type( | |
279 | std::string("/apex/") + timer_name, | |
280 | [p](bool r)->boost::int64_t{ | |
281 | boost::int64_t value(p->elapsed()); | |
282 | return value; | |
283 | }, | |
284 | std::string("APEX counter ") + timer_name, | |
285 | "" | |
286 | ); | |
287 | } | |
288 | } else { | |
289 | std::cerr << "HPX runtime not initialized yet." << std::endl; | |
290 | } | |
291 | } | |
292 | #endif | |
293 | #endif | |
294 | } | |
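
    /* The block below (compiled out on Windows) randomly samples roughly 1%
     * of completed, non-counter tasks into a scatterplot file. Samples are
     * buffered in a stringstream and flushed once the buffer exceeds 32KB,
     * under an fcntl() write lock so that multiple processes can safely
     * append to the same file. */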
#if !defined(_MSC_VER)
    /* write the sample to the file */
    if (apex_options::task_scatterplot()) {
        if (!p->is_counter) {
            static int thresh = RAND_MAX/100;
            if (std::rand() < thresh) {
                std::unique_lock<std::mutex> task_map_lock(_mtx);
                task_scatterplot_samples << p->normalized_timestamp() << " "
                    << p->elapsed()*profiler::get_cpu_mhz()*1000000 << " "
                    << "'" << p->task_id->get_name() << "'" << endl;
                int loc0 = task_scatterplot_samples.tellp();
                if (loc0 > 32768) {
                    // lock access to the file
                    // write using low-level file locking!
                    struct flock fl;
                    fl.l_type = F_WRLCK;    /* F_RDLCK, F_WRLCK, F_UNLCK */
                    fl.l_whence = SEEK_SET; /* SEEK_SET, SEEK_CUR, SEEK_END */
                    fl.l_start = 0;         /* Offset from l_whence */
                    fl.l_len = 0;           /* length, 0 = to EOF */
                    fl.l_pid = getpid();    /* our PID */
                    fcntl(task_scatterplot_sample_file, F_SETLKW, &fl); /* F_GETLK, F_SETLK, F_SETLKW */
                    // flush the string stream to the file
                    //lseek(task_scatterplot_sample_file, 0, SEEK_END);
                    ssize_t bytes_written = write(task_scatterplot_sample_file,
                        task_scatterplot_samples.str().c_str(), loc0);
                    if (bytes_written < 0) {
                        int errsv = errno;
                        perror("Error writing to scatterplot!");
                        fprintf(stderr, "Error writing scatterplot:\n%s\n",
                            strerror(errsv));
                    }
                    fl.l_type = F_UNLCK; /* tell it to unlock the region */
                    fcntl(task_scatterplot_sample_file, F_SETLK, &fl); /* set the region to unlocked */
                    // reset the stringstream
                    task_scatterplot_samples.str("");
                }
            }
        }
    }
#endif
    return 1;
}
337 | ||
338 | inline unsigned int profiler_listener::process_dependency(task_dependency* td) | |
339 | { | |
340 | unordered_map<task_identifier, unordered_map<task_identifier, int>* >::const_iterator it = task_dependencies.find(td->parent); | |
341 | unordered_map<task_identifier, int> * depend; | |
342 | // if this is a new dependency for this parent? | |
343 | if (it == task_dependencies.end()) { | |
344 | depend = new unordered_map<task_identifier, int>(); | |
345 | (*depend)[td->child] = 1; | |
346 | task_dependencies[td->parent] = depend; | |
347 | // otherwise, see if this parent has seen this child | |
348 | } else { | |
349 | depend = it->second; | |
350 | unordered_map<task_identifier, int>::const_iterator it2 = depend->find(td->child); | |
351 | // first time for this child | |
352 | if (it2 == depend->end()) { | |
353 | (*depend)[td->child] = 1; | |
354 | // not the first time for this child | |
355 | } else { | |
356 | int tmp = it2->second; | |
357 | (*depend)[td->child] = tmp + 1; | |
358 | } | |
359 | } | |
360 | delete(td); | |
361 | return 1; | |
362 | } | |
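
/* For reference, after processing, the dependency map has this shape
 * (illustrative names and counts):
 *   task_dependencies["parent_task"] -> { "child_a": 42, "child_b": 7 }
 * i.e. each parent maps to the number of times it spawned each child. */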
363 | ||
364 | /* Cleaning up memory. Not really necessary, because it only gets | |
365 | * called at shutdown. But a good idea to do regardless. */ | |
366 | void profiler_listener::delete_profiles(void) { | |
367 | // iterate over the map and free the objects in the map | |
368 | unordered_map<task_identifier, profile*>::const_iterator it; | |
369 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
370 | for(it = task_map.begin(); it != task_map.end(); it++) { | |
371 | delete it->second; | |
372 | } | |
373 | // clear the map. | |
374 | task_map.clear(); | |
375 | ||
376 | } | |
377 | ||
378 | #define PAD_WITH_SPACES "%8s" | |
379 | #define FORMAT_PERCENT "%8.3f" | |
380 | #define FORMAT_SCIENTIFIC "%1.2e" | |
381 | ||
382 | template<typename ... Args> | |
383 | string string_format( const std::string& format, Args ... args ) | |
384 | { | |
385 | size_t size = snprintf( nullptr, 0, format.c_str(), args ... ) + 1; // Extra space for '\0' | |
386 | unique_ptr<char[]> buf( new char[ size ] ); | |
387 | snprintf( buf.get(), size, format.c_str(), args ... ); | |
388 | return string( buf.get(), buf.get() + size - 1 ); // We don't want the '\0' inside | |
389 | } | |
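
/* Example usage: string_format(FORMAT_PERCENT, 12.5) yields "  12.500"
 * (right-aligned in an 8-character field), and
 * string_format(FORMAT_SCIENTIFIC, 1234.5) yields "1.23e+03". */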
390 | ||
391 | void profiler_listener::write_one_timer(task_identifier &task_id, | |
392 | profile * p, stringstream &screen_output, | |
393 | stringstream &csv_output, double &total_accumulated, | |
394 | double &total_main) { | |
395 | string action_name = task_id.get_name(); | |
396 | string shorter(action_name); | |
397 | // to keep formatting pretty, trim any long timer names | |
398 | if (shorter.size() > 30) { | |
399 | shorter.resize(27); | |
400 | shorter.resize(30, '.'); | |
401 | } | |
402 | //screen_output << "\"" << shorter << "\", " ; | |
403 | screen_output << string_format("%30s", shorter.c_str()) << " : "; | |
404 | #if defined(APEX_THROTTLE) | |
405 | // if this profile was throttled, don't output the measurements. | |
406 | // they are limited and bogus, anyway. | |
407 | unordered_set<task_identifier>::const_iterator it4; | |
408 | { | |
409 | read_lock_type l(throttled_event_set_mutex); | |
410 | it4 = throttled_tasks.find(task_id); | |
411 | } | |
412 | if (it4!= throttled_tasks.end()) { | |
413 | screen_output << "DISABLED (high frequency, short duration)" << endl; | |
414 | return; | |
415 | } | |
416 | #endif | |
417 | if(p->get_calls() < 1) { | |
418 | p->get_profile()->calls = 1; | |
419 | } | |
420 | if (p->get_calls() < 999999) { | |
421 | screen_output << string_format(PAD_WITH_SPACES, to_string((int)p->get_calls()).c_str()) << " " ; | |
422 | } else { | |
423 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_calls()) << " " ; | |
424 | } | |
425 | if (p->get_type() == APEX_TIMER) { | |
426 | csv_output << "\"" << action_name << "\","; | |
427 | csv_output << llround(p->get_calls()) << ","; | |
428 | // convert MHz to Hz | |
429 | csv_output << std::llround(p->get_accumulated()) << ","; | |
430 | // convert MHz to microseconds | |
431 | csv_output << std::llround(p->get_accumulated()*profiler::get_cpu_mhz()*1000000); | |
432 | screen_output << " --n/a-- " ; | |
433 | screen_output << string_format(FORMAT_SCIENTIFIC, (p->get_mean()*profiler::get_cpu_mhz())) << " " ; | |
434 | screen_output << " --n/a-- " ; | |
435 | screen_output << string_format(FORMAT_SCIENTIFIC, (p->get_accumulated()*profiler::get_cpu_mhz())) << " " ; | |
436 | screen_output << " --n/a-- " ; | |
437 | screen_output << string_format(FORMAT_PERCENT, (((p->get_accumulated()*profiler::get_cpu_mhz())/total_main)*100)); | |
438 | #if APEX_HAVE_PAPI | |
439 | for (int i = 0 ; i < num_papi_counters ; i++) { | |
440 | screen_output << " " << string_format(FORMAT_SCIENTIFIC, (p->get_papi_metrics()[i])); | |
441 | csv_output << "," << std::llround(p->get_papi_metrics()[i]); | |
442 | } | |
443 | #endif | |
444 | screen_output << endl; | |
445 | total_accumulated += p->get_accumulated(); | |
446 | csv_output << endl; | |
447 | } else { | |
448 | if (action_name.find('%') == string::npos) { | |
449 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_minimum()) << " " ; | |
450 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_mean()) << " " ; | |
451 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_maximum()) << " " ; | |
452 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_accumulated()) << " " ; | |
453 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_stddev()) << " " ; | |
454 | } else { | |
455 | screen_output << string_format(FORMAT_PERCENT, p->get_minimum()) << " " ; | |
456 | screen_output << string_format(FORMAT_PERCENT, p->get_mean()) << " " ; | |
457 | screen_output << string_format(FORMAT_PERCENT, p->get_maximum()) << " " ; | |
458 | screen_output << string_format(FORMAT_PERCENT, p->get_accumulated()) << " " ; | |
459 | screen_output << string_format(FORMAT_PERCENT, p->get_stddev()) << " " ; | |
460 | } | |
461 | screen_output << " --n/a-- " << endl; | |
462 | } | |
463 | } | |
464 | ||
465 | /* At program termination, write the measurements to the screen, or to CSV file, or both. */ | |
466 | void profiler_listener::finalize_profiles(void) { | |
467 | // our TOTAL available time is the elapsed * the number of threads, or cores | |
468 | int num_worker_threads = thread_instance::get_num_threads(); | |
469 | double wall_clock_main = main_timer->elapsed() * profiler::get_cpu_mhz(); | |
470 | #ifdef APEX_HAVE_HPX | |
471 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
472 | #endif | |
473 | double total_main = wall_clock_main * | |
474 | fmin(hardware_concurrency(), num_worker_threads); | |
475 | // create a stringstream to hold all the screen output - we may not | |
476 | // want to write it out | |
477 | stringstream screen_output; | |
478 | // create a stringstream to hold all the CSV output - we may not | |
479 | // want to write it out | |
480 | stringstream csv_output; | |
481 | // iterate over the profiles in the address map | |
482 | screen_output << "Elapsed time: " << wall_clock_main << endl; | |
483 | screen_output << "Cores detected: " << hardware_concurrency() << endl; | |
484 | screen_output << "Worker Threads observed: " << num_worker_threads << endl; | |
485 | screen_output << "Available CPU time: " << total_main << endl; | |
486 | map<apex_function_address, profile*>::const_iterator it; | |
487 | screen_output << "Action : #calls | minimum | mean | maximum | total | stddev | % total " << apex_options::papi_metrics() << endl; | |
488 | screen_output << "------------------------------------------------------------------------------------------------------------" << endl; | |
489 | csv_output << "\"task\",\"num calls\",\"total cycles\",\"total microseconds\""; | |
490 | #if APEX_HAVE_PAPI | |
491 | for (int i = 0 ; i < num_papi_counters ; i++) { | |
492 | csv_output << ",\"" << metric_names[i] << "\""; | |
493 | } | |
494 | #endif | |
495 | csv_output << endl; | |
496 | double total_accumulated = 0.0; | |
497 | unordered_map<task_identifier, profile*>::const_iterator it2; | |
498 | std::vector<task_identifier> id_vector; | |
499 | // iterate over the counters, and sort their names | |
500 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
501 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
502 | task_identifier task_id = it2->first; | |
503 | profile * p = it2->second; | |
504 | if (p->get_type() != APEX_TIMER) { | |
505 | id_vector.push_back(task_id); | |
506 | } | |
507 | } | |
508 | std::sort(id_vector.begin(), id_vector.end()); | |
509 | // iterate over the counters | |
510 | for(task_identifier task_id : id_vector) { | |
511 | profile * p = task_map[task_id]; | |
512 | if (p) { | |
513 | write_one_timer(task_id, p, screen_output, csv_output, total_accumulated, total_main); | |
514 | } | |
515 | } | |
516 | id_vector.clear(); | |
517 | // iterate over the timers, and sort their names | |
518 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
519 | profile * p = it2->second; | |
520 | task_identifier task_id = it2->first; | |
521 | if (p->get_type() == APEX_TIMER) { | |
522 | id_vector.push_back(task_id); | |
523 | } | |
524 | } | |
525 | // iterate over the timers | |
526 | std::sort(id_vector.begin(), id_vector.end()); | |
527 | // iterate over the counters | |
528 | for(task_identifier task_id : id_vector) { | |
529 | profile * p = task_map[task_id]; | |
530 | if (p) { | |
531 | write_one_timer(task_id, p, screen_output, csv_output, total_accumulated, total_main); | |
532 | } | |
533 | } | |
534 | double idle_rate = total_main - (total_accumulated*profiler::get_cpu_mhz()); | |
535 | screen_output << string_format("%30s", APEX_IDLE_TIME) << " : "; | |
536 | screen_output << " --n/a-- " ; | |
537 | screen_output << " --n/a-- " ; | |
538 | screen_output << " --n/a-- " ; | |
539 | screen_output << " --n/a-- " ; | |
540 | if (idle_rate < 0.0) { | |
541 | screen_output << " --n/a-- " ; | |
542 | } else { | |
543 | screen_output << string_format(FORMAT_SCIENTIFIC, idle_rate) << " " ; | |
544 | } | |
545 | screen_output << " --n/a-- " ; | |
546 | if (idle_rate < 0.0) { | |
547 | screen_output << " --n/a-- " << endl; | |
548 | } else { | |
549 | screen_output << string_format(FORMAT_PERCENT, ((idle_rate/total_main)*100)) << endl; | |
550 | } | |
551 | screen_output << "------------------------------------------------------------------------------------------------------------" << endl; | |
552 | if (apex_options::use_screen_output()) { | |
553 | cout << screen_output.str(); | |
554 | } | |
555 | if (apex_options::use_csv_output()) { | |
556 | ofstream csvfile; | |
557 | stringstream csvname; | |
558 | csvname << "apex." << node_id << ".csv"; | |
559 | csvfile.open(csvname.str(), ios::out); | |
560 | csvfile << csv_output.str(); | |
561 | csvfile.close(); | |
562 | } | |
563 | } | |
564 | ||
565 | /* The following code is from: | |
566 | http://stackoverflow.com/questions/7706339/grayscale-to-red-green-blue-matlab-jet-color-scale */ | |
567 | class node_color { | |
568 | public: | |
569 | double red; | |
570 | double green; | |
571 | double blue; | |
572 | node_color() : red(1.0), green(1.0), blue(1.0) {} | |
573 | int convert(double in) { return (int)(in * 255.0); } | |
574 | } ; | |
575 | ||
576 | node_color * get_node_color(double v,double vmin,double vmax) | |
577 | { | |
578 | node_color * c = new node_color(); | |
579 | double dv; | |
580 | ||
581 | if (v < vmin) | |
582 | v = vmin; | |
583 | if (v > vmax) | |
584 | v = vmax; | |
585 | dv = vmax - vmin; | |
586 | ||
587 | if (v < (vmin + 0.25 * dv)) { | |
588 | c->red = 0; | |
589 | c->green = 4 * (v - vmin) / dv; | |
590 | } else if (v < (vmin + 0.5 * dv)) { | |
591 | c->red = 0; | |
592 | c->blue = 1 + 4 * (vmin + 0.25 * dv - v) / dv; | |
593 | } else if (v < (vmin + 0.75 * dv)) { | |
594 | c->red = 4 * (v - vmin - 0.5 * dv) / dv; | |
595 | c->blue = 0; | |
596 | } else { | |
597 | c->green = 1 + 4 * (vmin + 0.75 * dv - v) / dv; | |
598 | c->blue = 0; | |
599 | } | |
600 | ||
601 | return(c); | |
602 | } | |
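
/* The mapping above is the MATLAB "jet" color scale: values near vmin map
 * to blue, the midpoint maps to green, and values near vmax map to red;
 * for example, v == vmin yields (0, 0, 1) and v == vmax yields (1, 0, 0). */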
603 | ||
604 | void profiler_listener::write_taskgraph(void) { | |
605 | ofstream myfile; | |
606 | stringstream dotname; | |
607 | dotname << "taskgraph." << node_id << ".dot"; | |
608 | myfile.open(dotname.str().c_str()); | |
609 | ||
610 | myfile << "digraph prof {\n rankdir=\"LR\";\n node [shape=box];\n"; | |
611 | for(auto dep = task_dependencies.begin(); dep != task_dependencies.end(); dep++) { | |
612 | task_identifier parent = dep->first; | |
613 | auto children = dep->second; | |
614 | string parent_name = parent.get_name(); | |
615 | for(auto offspring = children->begin(); offspring != children->end(); offspring++) { | |
616 | task_identifier child = offspring->first; | |
617 | int count = offspring->second; | |
618 | string child_name = child.get_name(); | |
619 | myfile << " \"" << parent_name << "\" -> \"" << child_name << "\""; | |
620 | myfile << " [ label=\" count: " << count << "\" ]; " << std::endl; | |
621 | ||
622 | } | |
623 | } | |
624 | ||
625 | // our TOTAL available time is the elapsed * the number of threads, or cores | |
626 | int num_worker_threads = thread_instance::get_num_threads(); | |
627 | #ifdef APEX_HAVE_HPX | |
628 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
629 | #endif | |
630 | double total_main = main_timer->elapsed() * | |
631 | fmin(hardware_concurrency(), num_worker_threads); | |
632 | ||
633 | // output nodes with "main" [shape=box; style=filled; fillcolor="#ff0000" ]; | |
634 | unordered_map<task_identifier, profile*>::const_iterator it; | |
635 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
636 | for(it = task_map.begin(); it != task_map.end(); it++) { | |
637 | profile * p = it->second; | |
638 | if (p->get_type() == APEX_TIMER) { | |
639 | node_color * c = get_node_color((p->get_accumulated()*profiler::get_cpu_mhz()), 0.0, total_main); | |
640 | task_identifier task_id = it->first; | |
641 | myfile << " \"" << task_id.get_name() << "\" [shape=box; style=filled; fillcolor=\"#" << | |
642 | setfill('0') << setw(2) << hex << c->convert(c->red) << | |
643 | setfill('0') << setw(2) << hex << c->convert(c->green) << | |
644 | setfill('0') << setw(2) << hex << c->convert(c->blue) << "\"" << | |
645 | "; label=\"" << task_id.get_name() << ":\\n" << (p->get_accumulated()*profiler::get_cpu_mhz()) << "s\" ];" << std::endl; | |
646 | } | |
647 | } | |
648 | myfile << "}\n"; | |
649 | myfile.close(); | |
650 | } | |
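
/* A typical edge emitted by the loop above looks like this (hypothetical
 * task names):
 *   "parent_task" -> "child_task" [ label=" count: 42" ];
 * and each timer node is filled with a color from get_node_color(), so
 * more expensive timers appear redder. */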
651 | ||
652 | /* When writing a TAU profile, write out a timer line */ | |
653 | void format_line(ofstream &myfile, profile * p) { | |
654 | myfile << p->get_calls() << " "; | |
655 | myfile << 0 << " "; | |
656 | myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " "; | |
657 | myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " "; | |
658 | myfile << 0 << " "; | |
659 | myfile << "GROUP=\"TAU_USER\" "; | |
660 | myfile << endl; | |
661 | } | |
662 | ||
663 | /* When writing a TAU profile, write out the main timer line */ | |
664 | void format_line(ofstream &myfile, profile * p, double not_main) { | |
665 | myfile << p->get_calls() << " "; | |
666 | myfile << 0 << " "; | |
667 | myfile << (max(((p->get_accumulated()*profiler::get_cpu_mhz()) - not_main),0.0)) << " "; | |
668 | myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " "; | |
669 | myfile << 0 << " "; | |
670 | myfile << "GROUP=\"TAU_USER\" "; | |
671 | myfile << endl; | |
672 | } | |
673 | ||
674 | /* When writing a TAU profile, write out a counter line */ | |
675 | void format_counter_line(ofstream &myfile, profile * p) { | |
676 | myfile << p->get_calls() << " "; // numevents | |
677 | myfile << p->get_maximum() << " "; // max | |
678 | myfile << p->get_minimum() << " "; // min | |
679 | myfile << p->get_mean() << " "; // mean | |
680 | myfile << p->get_sum_squares() << " "; | |
681 | myfile << endl; | |
682 | } | |
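
/* For reference, the file written below roughly follows the classic TAU
 * profile text layout (illustrative values):
 *   2 templated_functions_MULTI_TIME
 *   # Name Calls Subrs Excl Incl ProfileCalls #
 *   "foo" 10 0 1.5 1.5 0 GROUP="TAU_USER"
 *   "APEX MAIN" 1 0 0.5 2.0 0 GROUP="TAU_USER"
 *   0 aggregates
 *   1 userevents
 *   # eventname numevents max min mean sumsqr
 *   "Bytes Sent" 4 100 10 55 13000 */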
683 | ||
684 | /* Write TAU profiles from the collected data. */ | |
685 | void profiler_listener::write_profile() { | |
686 | ofstream myfile; | |
687 | stringstream datname; | |
688 | // name format: profile.nodeid.contextid.threadid | |
689 | // We only write one profile per process | |
690 | datname << "profile." << node_id << ".0.0"; | |
691 | ||
692 | // name format: profile.nodeid.contextid.threadid | |
693 | myfile.open(datname.str().c_str()); | |
694 | int counter_events = 0; | |
695 | ||
696 | // Determine number of counter events, as these need to be | |
697 | // excluded from the number of normal timers | |
698 | unordered_map<task_identifier, profile*>::const_iterator it2; | |
699 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
700 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
701 | profile * p = it2->second; | |
702 | if(p->get_type() == APEX_COUNTER) { | |
703 | counter_events++; | |
704 | } | |
705 | } | |
706 | int function_count = task_map.size() - counter_events; | |
707 | ||
708 | // Print the normal timers to the profile file | |
709 | // 1504 templated_functions_MULTI_TIME | |
710 | myfile << function_count << " templated_functions_MULTI_TIME" << endl; | |
711 | // # Name Calls Subrs Excl Incl ProfileCalls # | |
712 | myfile << "# Name Calls Subrs Excl Incl ProfileCalls #" << endl; | |
713 | thread_instance ti = thread_instance::instance(); | |
714 | ||
715 | // Iterate over the profiles which are associated to a function | |
716 | // by name. Only output the regular timers now. Counters are | |
717 | // in a separate section, below. | |
718 | profile * mainp = nullptr; | |
719 | double not_main = 0.0; | |
720 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
721 | profile * p = it2->second; | |
722 | task_identifier task_id = it2->first; | |
723 | if(p->get_type() == APEX_TIMER) { | |
724 | string action_name = task_id.get_name(); | |
725 | if(strcmp(action_name.c_str(), APEX_MAIN) == 0) { | |
726 | mainp = p; | |
727 | } else { | |
728 | myfile << "\"" << action_name << "\" "; | |
729 | format_line (myfile, p); | |
730 | not_main += (p->get_accumulated()*profiler::get_cpu_mhz()); | |
731 | } | |
732 | } | |
733 | } | |
734 | if (mainp != nullptr) { | |
735 | myfile << "\"" << APEX_MAIN << "\" "; | |
736 | format_line (myfile, mainp, not_main); | |
737 | } | |
738 | ||
739 | // 0 aggregates | |
740 | myfile << "0 aggregates" << endl; | |
741 | ||
742 | // Now process the counters, if there are any. | |
743 | if(counter_events > 0) { | |
744 | myfile << counter_events << " userevents" << endl; | |
745 | myfile << "# eventname numevents max min mean sumsqr" << endl; | |
746 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
747 | profile * p = it2->second; | |
748 | if(p->get_type() == APEX_COUNTER) { | |
749 | task_identifier task_id = it2->first; | |
750 | myfile << "\"" << task_id.get_name() << "\" "; | |
751 | format_counter_line (myfile, p); | |
752 | } | |
753 | } | |
754 | } | |
755 | myfile.close(); | |
756 | } | |
757 | ||
758 | /* | |
759 | * The main function for the consumer thread has to be static, but | |
760 | * the processing needs access to member variables, so get the | |
761 | * profiler_listener instance, and call it's proper function. | |
762 | * | |
763 | * This is a wrapper, so that we can launch the thread and set | |
764 | * affinity. However, process_profiles_wrapper is also used by the | |
765 | * last worker that calls apex_finalize(), so we don't want to change | |
766 | * that thread's affinity. So this wrapper is only for the consumer | |
767 | * thread. | |
768 | */ | |
769 | void profiler_listener::consumer_process_profiles_wrapper(void) { | |
770 | if (apex_options::pin_apex_threads()) { | |
771 | set_thread_affinity(); | |
772 | } | |
773 | process_profiles_wrapper(); | |
774 | } | |
775 | ||
776 | /* | |
777 | * The main function for the consumer thread has to be static, but | |
778 | * the processing needs access to member variables, so get the | |
779 | * profiler_listener instance, and call it's proper function. | |
780 | */ | |
781 | void profiler_listener::process_profiles_wrapper(void) { | |
782 | apex * inst = apex::instance(); | |
783 | if (inst != nullptr) { | |
784 | profiler_listener * pl = inst->the_profiler_listener; | |
785 | if (pl != nullptr) { | |
786 | #ifdef APEX_TRACE_APEX | |
787 | profiler * p = start((apex_function_address)&profiler_listener::process_profiles_wrapper); | |
788 | pl->process_profiles(); | |
789 | stop(p); | |
790 | #else | |
791 | pl->process_profiles(); | |
792 | #endif | |
793 | } | |
794 | } | |
795 | } | |
796 | ||
797 | bool profiler_listener::concurrent_cleanup(void){ | |
798 | std::shared_ptr<profiler> p; | |
799 | #ifdef APEX_MULTIPLE_QUEUES | |
800 | std::unique_lock<std::mutex> queue_lock(queue_mtx); | |
801 | std::vector<profiler_queue_t*>::const_iterator a_queue; | |
802 | for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) { | |
803 | thequeue = *a_queue; | |
804 | while(thequeue->try_dequeue(p)) { | |
805 | process_profile(p,0); | |
806 | } | |
807 | } | |
808 | #else | |
809 | while(thequeue.try_dequeue(p)) { | |
810 | process_profile(p,0); | |
811 | } | |
812 | #endif | |
813 | return true; | |
814 | } | |
815 | ||
816 | /* This is the main function for the consumer thread. | |
817 | * It will wait at a semaphore for pending work. When there is | |
818 | * work on one or more queues, it will iterate over the queues | |
819 | * and process the pending profiler objects, updating the profiles | |
820 | * as it goes. */ | |
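/* Note on the preprocessor structure below: without HPX, this body runs in
 * a dedicated thread that loops on queue_signal until shutdown. With HPX
 * there is no loop: each scheduled HPX task makes a single pass over the
 * queues and then clears consumer_task_running so another pass can be
 * scheduled by apex_schedule_process_profiles(). */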
void profiler_listener::process_profiles(void)
{
    if (!_initialized) {
        initialize_worker_thread_for_TAU();
        _initialized = true;
    }
#ifdef APEX_HAVE_TAU
    if (apex_options::use_tau()) {
        TAU_START("profiler_listener::process_profiles");
    }
#endif

    std::shared_ptr<profiler> p;
    task_dependency* td;
    // Main loop. Stay in this loop unless "done".
#ifndef APEX_HAVE_HPX
    while (!_done) {
        queue_signal.wait();
#endif
#ifdef APEX_HAVE_TAU
        /*
        if (apex_options::use_tau()) {
            TAU_START("profiler_listener::process_profiles: main loop");
        }
        */
#endif
#ifdef APEX_MULTIPLE_QUEUES
        std::vector<profiler_queue_t*>::const_iterator a_queue;
        std::unique_lock<std::mutex> queue_lock(queue_mtx);
        int i = 0;
        for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) {
            thequeue = *a_queue;
            while(!_done && thequeue->try_dequeue(p)) {
                process_profile(p, 0);
#ifdef APEX_HAVE_HPX // don't hang out in this task too long.
                if (++i > 1000) break;
#endif
            }
        }
#else
        while(!_done && thequeue.try_dequeue(p)) {
            process_profile(p, 0);
        }
#endif
        if (apex_options::use_taskgraph_output()) {
            while(!_done && dependency_queue.try_dequeue(td)) {
                process_dependency(td);
            }
        }
        /*
         * I want to process the tasks concurrently, but this loop
         * is too much overhead. Maybe dequeue them in batches?
         */
        /*
        std::vector<std::future<void>> pending_futures;
        while(!_done && thequeue.try_dequeue(p)) {
            auto f = std::async(my_stupid_wrapper, p);
            // transfer the future's shared state to a longer-lived future
            pending_futures.push_back(std::move(f));
        }
        for (auto iter = pending_futures.begin() ; iter < pending_futures.end() ; iter++ ) {
            iter->get();
        }
        */

#ifdef APEX_HAVE_TAU
        /*
        if (apex_options::use_tau()) {
            TAU_STOP("profiler_listener::process_profiles: main loop");
        }
        */
#endif
#ifndef APEX_HAVE_HPX
    }

    if (apex_options::use_taskgraph_output()) {
        // process the task dependencies
        while(dependency_queue.try_dequeue(td)) {
            process_dependency(td);
        }
    }

#endif // NOT DEFINED APEX_HAVE_HPX

#ifdef APEX_HAVE_HPX
    consumer_task_running.clear(memory_order_release);
#endif

#ifdef APEX_HAVE_TAU
    if (apex_options::use_tau()) {
        TAU_STOP("profiler_listener::process_profiles");
    }
#endif
}
915 | ||
916 | #if APEX_HAVE_PAPI | |
917 | APEX_NATIVE_TLS int EventSet = PAPI_NULL; | |
918 | enum papi_state { papi_running, papi_suspended }; | |
919 | APEX_NATIVE_TLS papi_state thread_papi_state = papi_suspended; | |
920 | #define PAPI_ERROR_CHECK(name) \ | |
921 | if (rc != 0) cout << "name: " << rc << ": " << PAPI_strerror(rc) << endl; | |
922 | ||
void profiler_listener::initialize_PAPI(bool first_time) {
    int rc = 0;
    if (first_time) {
        PAPI_library_init( PAPI_VER_CURRENT );
        //rc = PAPI_multiplex_init(); // use more counters than allowed
        //PAPI_ERROR_CHECK(PAPI_multiplex_init);
        PAPI_thread_init( thread_instance::get_id );
        // default
        //rc = PAPI_set_domain(PAPI_DOM_ALL);
        //PAPI_ERROR_CHECK(PAPI_set_domain);
    } else {
        PAPI_register_thread();
    }
    rc = PAPI_create_eventset(&EventSet);
    PAPI_ERROR_CHECK(PAPI_create_eventset);
    // default
    //rc = PAPI_assign_eventset_component (EventSet, 0);
    //PAPI_ERROR_CHECK(PAPI_assign_eventset_component);
    // default
    //rc = PAPI_set_granularity(PAPI_GRN_THR);
    //PAPI_ERROR_CHECK(PAPI_set_granularity);
    // unnecessary complexity
    //rc = PAPI_set_multiplex(EventSet);
    //PAPI_ERROR_CHECK(PAPI_set_multiplex);
    // parse the requested set of papi counters
    if (strlen(apex_options::papi_metrics()) > 0) {
        std::stringstream tmpstr(apex_options::papi_metrics());
        // use stream iterators to copy the stream to the vector as whitespace separated strings
        std::istream_iterator<std::string> tmpstr_it(tmpstr);
        std::istream_iterator<std::string> tmpstr_end;
        std::vector<std::string> tmpstr_results(tmpstr_it, tmpstr_end);
        int code;
        // iterate over the counter names in the vector
        for (auto p : tmpstr_results) {
            int rc = PAPI_event_name_to_code(const_cast<char*>(p.c_str()), &code);
            if (PAPI_query_event (code) == PAPI_OK) {
                rc = PAPI_add_event(EventSet, code);
                PAPI_ERROR_CHECK(PAPI_add_event);
                if (rc != 0) { printf ("Event that failed: %s\n", p.c_str()); }
                if (first_time) {
                    metric_names.push_back(string(p.c_str()));
                    num_papi_counters++;
                }
            }
        }
        if (!apex_options::papi_suspend()) {
            rc = PAPI_start( EventSet );
            PAPI_ERROR_CHECK(PAPI_start);
            thread_papi_state = papi_running;
        }
    }
}

#endif
978 | ||
979 | /* When APEX gets a STARTUP event, do some initialization. */ | |
980 | void profiler_listener::on_startup(startup_event_data &data) { | |
981 | if (!_done) { | |
982 | my_tid = (unsigned int)thread_instance::get_id(); | |
983 | #ifndef APEX_HAVE_HPX | |
984 | // Start the consumer thread, to process profiler objects. | |
985 | consumer_thread = new std::thread(consumer_process_profiles_wrapper); | |
986 | #endif | |
987 | ||
988 | #if APEX_HAVE_PAPI | |
989 | initialize_PAPI(true); | |
990 | event_sets[0] = EventSet; | |
991 | #endif | |
992 | ||
993 | /* This commented out code is to change the priority of the consumer thread. | |
994 | * IDEALLY, I would like to make this a low priority thread, but that is as | |
995 | * yet unsuccessful. */ | |
996 | #if 0 | |
997 | int retcode; | |
998 | int policy; | |
999 | ||
1000 | pthread_t threadID = (pthread_t) consumer_thread->native_handle(); | |
1001 | ||
1002 | struct sched_param param; | |
1003 | ||
1004 | if ((retcode = pthread_getschedparam(threadID, &policy, ¶m)) != 0) | |
1005 | { | |
1006 | errno = retcode; | |
1007 | perror("pthread_getschedparam"); | |
1008 | exit(EXIT_FAILURE); | |
1009 | } | |
1010 | std::cout << "INHERITED: "; | |
1011 | std::cout << "policy=" << ((policy == SCHED_FIFO) ? "SCHED_FIFO" : | |
1012 | (policy == SCHED_RR) ? "SCHED_RR" : | |
1013 | (policy == SCHED_OTHER) ? "SCHED_OTHER" : | |
1014 | "???") | |
1015 | << ", priority=" << param.sched_priority << " of " << sched_get_priority_min(policy) << "," << sched_get_priority_max(policy) << std::endl; | |
1016 | //param.sched_priority = 10; | |
1017 | if ((retcode = pthread_setschedparam(threadID, policy, ¶m)) != 0) | |
1018 | { | |
1019 | errno = retcode; | |
1020 | perror("pthread_setschedparam"); | |
1021 | exit(EXIT_FAILURE); | |
1022 | } | |
1023 | #endif | |
1024 | ||
1025 | // time the whole application. | |
1026 | main_timer = std::make_shared<profiler>(new task_identifier(string(APEX_MAIN))); | |
1027 | #if APEX_HAVE_PAPI | |
1028 | if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) { | |
1029 | int rc = PAPI_read( EventSet, main_timer->papi_start_values ); | |
1030 | PAPI_ERROR_CHECK(PAPI_read); | |
1031 | } | |
1032 | #endif | |
1033 | } | |
1034 | APEX_UNUSED(data); | |
1035 | } | |
1036 | ||
1037 | /* On the shutdown event, notify the consumer thread that we are done | |
1038 | * and set the "terminate" flag. */ | |
1039 | void profiler_listener::on_shutdown(shutdown_event_data &data) { | |
1040 | if (_done) { return; } | |
1041 | if (!_done) { | |
1042 | _done = true; | |
1043 | node_id = data.node_id; | |
1044 | //sleep(1); | |
1045 | #ifndef APEX_HAVE_HPX | |
1046 | queue_signal.post(); | |
1047 | if (consumer_thread != nullptr) { | |
1048 | consumer_thread->join(); | |
1049 | } | |
1050 | #endif | |
1051 | ||
1052 | // stop the main timer, and process that profile? | |
1053 | main_timer->stop(); | |
1054 | #if APEX_HAVE_PAPI | |
1055 | if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) { | |
1056 | int rc = PAPI_read( EventSet, main_timer->papi_stop_values ); | |
1057 | PAPI_ERROR_CHECK(PAPI_read); | |
1058 | } | |
1059 | #endif | |
1060 | // if this profile is processed, it will get deleted. so don't process it! | |
1061 | // It also clutters up the final profile, if generated. | |
1062 | //process_profile(main_timer.get(), my_tid); | |
1063 | ||
1064 | // output to screen? | |
1065 | if ((apex_options::use_screen_output() || | |
1066 | apex_options::use_csv_output()) && node_id == 0) | |
1067 | { | |
1068 | #ifdef APEX_MULTIPLE_QUEUES | |
1069 | size_t ignored = 0; | |
1070 | std::unique_lock<std::mutex> queue_lock(queue_mtx); | |
1071 | std::vector<profiler_queue_t*>::const_iterator a_queue; | |
1072 | for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) { | |
1073 | thequeue = *a_queue; | |
1074 | ignored += thequeue->size_approx(); | |
1075 | } | |
1076 | #else | |
1077 | size_t ignored = thequeue.size_approx(); | |
1078 | #endif | |
1079 | if (ignored > 0) { | |
1080 | std::cerr << "Info: " << ignored << " items remaining on on the profiler_listener queue..."; | |
1081 | } | |
1082 | #ifndef APEX_HAVE_HPX | |
1083 | // We might be done, but check to make sure the queue is empty | |
1084 | std::vector<std::future<bool>> pending_futures; | |
1085 | for (unsigned int i=0; i<hardware_concurrency(); ++i) { | |
1086 | #ifdef APEX_STATIC | |
1087 | /* Static libC++ doesn't do async very well. In fact, it crashes. */ | |
1088 | auto f = std::async(&profiler_listener::concurrent_cleanup,this); | |
1089 | #else // APEX_STATIC | |
1090 | auto f = std::async(std::launch::async,&profiler_listener::concurrent_cleanup,this); | |
1091 | #endif // APEX_STATIC | |
1092 | // transfer the future's shared state to a longer-lived future | |
1093 | pending_futures.push_back(std::move(f)); | |
1094 | } | |
1095 | for (auto iter = pending_futures.begin() ; iter < pending_futures.end() ; iter++ ) { | |
1096 | iter->get(); | |
1097 | } | |
1098 | #endif // APEX_HAVE_HPX | |
1099 | if (ignored > 0) { | |
1100 | std::cerr << "done." << std::endl; | |
1101 | } | |
1102 | if (apex_options::use_screen_output() || apex_options::use_csv_output()) { | |
1103 | finalize_profiles(); | |
1104 | } | |
1105 | } | |
1106 | if (apex_options::use_taskgraph_output() && node_id == 0) | |
1107 | { | |
1108 | write_taskgraph(); | |
1109 | } | |
1110 | ||
1111 | // output to 1 TAU profile per process? | |
1112 | if (apex_options::use_profile_output() && !apex_options::use_tau()) { | |
1113 | write_profile(); | |
1114 | } | |
1115 | #if !defined(_MSC_VER) | |
1116 | if (apex_options::task_scatterplot()) { | |
1117 | // get the length of the stream | |
1118 | int loc0 = task_scatterplot_samples.tellp(); | |
1119 | // lock access to the file | |
1120 | // write using low-level file locking! | |
1121 | struct flock fl; | |
1122 | fl.l_type = F_WRLCK; /* F_RDLCK, F_WRLCK, F_UNLCK */ | |
1123 | fl.l_whence = SEEK_SET; /* SEEK_SET, SEEK_CUR, SEEK_END */ | |
1124 | fl.l_start = 0; /* Offset from l_whence */ | |
1125 | fl.l_len = 0; /* length, 0 = to EOF */ | |
1126 | fl.l_pid = getpid(); /* our PID */ | |
1127 | fcntl(task_scatterplot_sample_file, F_SETLKW, &fl); /* F_GETLK, F_SETLK, F_SETLKW */ | |
1128 | // flush the string stream to the file | |
1129 | //lseek(task_scatterplot_sample_file, 0, SEEK_END); | |
1130 | ssize_t bytes_written = write(task_scatterplot_sample_file, | |
1131 | task_scatterplot_samples.str().c_str(), loc0); | |
1132 | if (bytes_written < 0) { | |
1133 | int errsv = errno; | |
1134 | perror("Error writing to scatterplot!"); | |
1135 | fprintf(stderr, "Error writing scatterplot:\n%s\n", | |
1136 | strerror(errsv)); | |
1137 | } | |
1138 | fl.l_type = F_UNLCK; /* tell it to unlock the region */ | |
1139 | fcntl(task_scatterplot_sample_file, F_SETLK, &fl); /* set the region to unlocked */ | |
1140 | close(task_scatterplot_sample_file); | |
1141 | } | |
1142 | #endif | |
1143 | ||
1144 | } | |
1145 | /* The cleanup is disabled for now. Why? Because we want to be able | |
1146 | * to access the profiles at the end of the run, after APEX has | |
1147 | * finalized. */ | |
1148 | // cleanup. | |
1149 | // delete_profiles(); | |
1150 | } | |
1151 | ||
1152 | /* When a new node is created */ | |
1153 | void profiler_listener::on_new_node(node_event_data &data) { | |
1154 | if (!_done) { | |
1155 | } | |
1156 | APEX_UNUSED(data); | |
1157 | } | |
1158 | ||
1159 | /* When a new thread is registered, expand all of our storage as necessary | |
1160 | * to handle the new thread */ | |
1161 | void profiler_listener::on_new_thread(new_thread_event_data &data) { | |
1162 | if (!_done) { | |
1163 | my_tid = (unsigned int)thread_instance::get_id(); | |
1164 | async_thread_setup(); | |
1165 | #if APEX_HAVE_PAPI | |
1166 | initialize_PAPI(false); | |
1167 | event_set_mutex.lock(); | |
1168 | if (my_tid >= event_sets.size()) { | |
1169 | if (my_tid >= event_sets.size()) { | |
1170 | event_sets.resize(my_tid + 1); | |
1171 | } | |
1172 | } | |
1173 | event_sets[my_tid] = EventSet; | |
1174 | event_set_mutex.unlock(); | |
1175 | #endif | |
1176 | } | |
1177 | APEX_UNUSED(data); | |
1178 | } | |
1179 | ||
extern "C" int main (int, char**);

/* When a start event happens, create a profiler object. If this
 * named event is throttled, do nothing, as quickly as possible. */
inline bool profiler_listener::_common_start(task_identifier * id, bool is_resume) {
    if (!_done) {
#if defined(APEX_THROTTLE)
        // if this timer is throttled, return without doing anything
        unordered_set<task_identifier>::const_iterator it;
        {
            read_lock_type l(throttled_event_set_mutex);
            it = throttled_tasks.find(*id);
        }
        if (it != throttled_tasks.end()) {
            /*
             * The throw is removed, because it is a performance penalty on some systems.
             * on_start now returns a boolean.
             */
            //throw disabled_profiler_exception(); // to be caught by apex::start/resume
            return false;
        }
#endif
        // start the profiler object, which starts our timers
        //std::shared_ptr<profiler> p = std::make_shared<profiler>(id, is_resume);
        profiler * p = new profiler(id, is_resume);
        thread_instance::instance().set_current_profiler(p);
#if APEX_HAVE_PAPI
        if (num_papi_counters > 0 && !apex_options::papi_suspend()) {
            // if papi was previously suspended, we need to start the counters
            if (thread_papi_state == papi_suspended) {
                int rc = PAPI_start( EventSet );
                PAPI_ERROR_CHECK(PAPI_start);
                thread_papi_state = papi_running;
            }
            int rc = PAPI_read( EventSet, p->papi_start_values );
            PAPI_ERROR_CHECK(PAPI_read);
        } else {
            // if papi is still running, stop the counters
            if (thread_papi_state == papi_running) {
                long long dummy[8];
                int rc = PAPI_stop( EventSet, dummy );
                PAPI_ERROR_CHECK(PAPI_stop);
                thread_papi_state = papi_suspended;
            }
        }
#endif
    } else {
        return false;
    }
    return true;
}
1231 | ||
1232 | inline void profiler_listener::push_profiler(int my_tid, std::shared_ptr<profiler> &p) { | |
1233 | // if we aren't processing profiler objects, just return. | |
1234 | if (!apex_options::process_async_state()) { return; } | |
1235 | #ifdef APEX_TRACE_APEX | |
1236 | if (p->task_id->address == (uint64_t)&profiler_listener::process_profiles_wrapper) { return; } | |
1237 | #endif | |
1238 | // we have to make a local copy, because lockfree queues DO NOT SUPPORT shared_ptrs! | |
1239 | #ifdef APEX_MULTIPLE_QUEUES | |
1240 | thequeue->enqueue(p); | |
1241 | #else | |
1242 | thequeue.enqueue(p); | |
1243 | #endif | |
1244 | /* | |
1245 | bool worked = thequeue.enqueue(p); | |
1246 | if (!worked) { | |
1247 | static std::atomic<bool> issued(false); | |
1248 | if (!issued) { | |
1249 | issued = true; | |
1250 | cout << "APEX Warning : failed to push " << p->task_id->get_name() << endl; | |
1251 | cout << "One or more frequently-called, lightweight functions is being timed." << endl; | |
1252 | } | |
1253 | } | |
1254 | */ | |
1255 | #ifndef APEX_HAVE_HPX | |
1256 | queue_signal.post(); | |
1257 | #endif | |
1258 | #ifdef APEX_HAVE_HPX | |
1259 | apex_schedule_process_profiles(); | |
1260 | #endif | |
1261 | } | |
1262 | ||
1263 | /* Stop the timer, if applicable, and queue the profiler object */ | |
1264 | inline void profiler_listener::_common_stop(std::shared_ptr<profiler> &p, bool is_yield) { | |
1265 | if (!_done) { | |
1266 | if (p) { | |
1267 | p->stop(is_yield); | |
1268 | #if APEX_HAVE_PAPI | |
1269 | if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) { | |
1270 | int rc = PAPI_read( EventSet, p->papi_stop_values ); | |
1271 | PAPI_ERROR_CHECK(PAPI_read); | |
1272 | } | |
1273 | #endif | |
1274 | // Why is this happening now? Why not at start? Why not at create? | |
1275 | /* | |
1276 | if (apex_options::use_taskgraph_output()) { | |
1277 | if (!p->is_resume) { | |
1278 | // get the PARENT profiler | |
1279 | profiler * parent_profiler = nullptr; | |
1280 | try { | |
1281 | parent_profiler = thread_instance::instance().get_parent_profiler(); | |
1282 | if (parent_profiler != NULL) { | |
1283 | task_identifier * parent = parent_profiler->task_id; | |
1284 | task_identifier * child = p->task_id; | |
1285 | dependency_queue.enqueue(new task_dependency(parent, child)); | |
1286 | } | |
1287 | } catch (empty_stack_exception& e) { } | |
1288 | } | |
1289 | } | |
1290 | */ | |
1291 | push_profiler(my_tid, p); | |
1292 | } | |
1293 | } | |
1294 | } | |
1295 | ||
1296 | /* Start the timer */ | |
1297 | bool profiler_listener::on_start(task_identifier * id) { | |
1298 | return _common_start(id, false); | |
1299 | } | |
1300 | ||
1301 | /* This is just like starting a timer, but don't increment the number of calls | |
1302 | * value. That is because we are restarting an existing timer. */ | |
1303 | bool profiler_listener::on_resume(task_identifier * id) { | |
1304 | return _common_start(id, true); | |
1305 | } | |
1306 | ||
1307 | /* Stop the timer */ | |
1308 | void profiler_listener::on_stop(std::shared_ptr<profiler> &p) { | |
1309 | _common_stop(p, p->is_resume); // don't change the yield/resume value! | |
1310 | } | |
1311 | ||
1312 | /* Stop the timer, but don't increment the number of calls */ | |
1313 | void profiler_listener::on_yield(std::shared_ptr<profiler> &p) { | |
1314 | _common_stop(p, true); | |
1315 | } | |
1316 | ||
1317 | /* When a thread exits, pop and stop all timers. */ | |
1318 | void profiler_listener::on_exit_thread(event_data &data) { | |
1319 | APEX_UNUSED(data); | |
1320 | } | |
1321 | ||
1322 | /* When an asynchronous thread is launched, they should | |
1323 | * call apex::async_thread_setup() which will end up here.*/ | |
1324 | void profiler_listener::async_thread_setup(void) { | |
1325 | #ifdef APEX_MULTIPLE_QUEUES | |
1326 | // for asynchronous threads, check to make sure there is a queue! | |
1327 | if (thequeue == nullptr) { | |
1328 | thequeue = new profiler_queue_t(); | |
1329 | { | |
1330 | std::unique_lock<std::mutex> queue_lock(queue_mtx); | |
1331 | allqueues.push_back(thequeue); | |
1332 | } | |
1333 | } | |
1334 | #endif | |
1335 | } | |
1336 | ||
1337 | /* When a sample value is processed, save it as a profiler object, and queue it. */ | |
1338 | void profiler_listener::on_sample_value(sample_value_event_data &data) { | |
1339 | if (!_done) { | |
1340 | std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier(*data.counter_name), data.counter_value); | |
1341 | p->is_counter = data.is_counter; | |
1342 | push_profiler(my_tid, p); | |
1343 | } | |
1344 | } | |
1345 | ||
1346 | void profiler_listener::on_new_task(task_identifier * id, uint64_t task_id) { | |
1347 | //cout << "New task: " << task_id << endl; | |
1348 | if (!apex_options::use_taskgraph_output()) { return; } | |
1349 | // get the current profiler | |
1350 | profiler * p = thread_instance::instance().get_current_profiler(); | |
1351 | if (p != NULL) { | |
1352 | dependency_queue.enqueue(new task_dependency(p->task_id, id)); | |
1353 | } else { | |
1354 | task_identifier * parent = new task_identifier(string("__start")); | |
1355 | dependency_queue.enqueue(new task_dependency(parent, id)); | |
1356 | } | |
1357 | } | |
1358 | ||
1359 | /* Communication send event. Save the number of bytes. */ | |
1360 | void profiler_listener::on_send(message_event_data &data) { | |
1361 | if (!_done) { | |
1362 | std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier("Bytes Sent"), (double)data.size); | |
1363 | push_profiler(0, p); | |
1364 | } | |
1365 | } | |
1366 | ||
1367 | /* Communication recv event. Save the number of bytes. */ | |
1368 | void profiler_listener::on_recv(message_event_data &data) { | |
1369 | if (!_done) { | |
1370 | std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier("Bytes Received"), (double)data.size); | |
1371 | push_profiler(0, p); | |
1372 | } | |
1373 | } | |
1374 | ||
1375 | /* For periodic stuff. Do something? */ | |
1376 | void profiler_listener::on_periodic(periodic_event_data &data) { | |
1377 | if (!_done) { | |
1378 | } | |
1379 | APEX_UNUSED(data); | |
1380 | } | |
1381 | ||
1382 | /* For custom event stuff. Do something? */ | |
1383 | void profiler_listener::on_custom_event(custom_event_data &data) { | |
1384 | if (!_done) { | |
1385 | } | |
1386 | APEX_UNUSED(data); | |
1387 | } | |
1388 | ||
1389 | void profiler_listener::reset(task_identifier * id) { | |
1390 | std::shared_ptr<profiler> p; | |
1391 | p = std::make_shared<profiler>(id, false, reset_type::CURRENT); | |
1392 | push_profiler(my_tid, p); | |
1393 | } | |
1394 | ||
profiler_listener::~profiler_listener (void) {
    _done = true; // yikes!
    finalize();
    delete_profiles();
#ifndef APEX_HAVE_HPX
    delete consumer_thread;
#endif
}

} // namespace apex
1405 | ||
1406 | #ifdef APEX_HAVE_HPX | |
1407 | HPX_DECLARE_ACTION(::apex::profiler_listener::process_profiles_wrapper, apex_internal_process_profiles_action); | |
1408 | HPX_ACTION_HAS_CRITICAL_PRIORITY(apex_internal_process_profiles_action); | |
1409 | HPX_PLAIN_ACTION(::apex::profiler_listener::process_profiles_wrapper, apex_internal_process_profiles_action); | |
1410 | ||
void apex_schedule_process_profiles() {
    if(get_hpx_runtime_ptr() == nullptr) return;
    if(hpx_shutdown) {
        ::apex::profiler_listener::process_profiles_wrapper();
    } else {
        if(!consumer_task_running.test_and_set(memory_order_acq_rel)) {
            apex_internal_process_profiles_action act;
            try {
                hpx::apply(act, hpx::find_here());
            } catch(...) {
                // During shutdown, we can't schedule a new task,
                // so we process profiles ourselves.
                profiler_listener::process_profiles_wrapper();
            }
        }
    }
}

#endif
1430 | ||
1431 | ||
1432 |