| Line | % of fetches | Source |
|---|---|---|
| 1 | // Copyright (c) 2014 University of Oregon | |
| 2 | // | |
| 3 | ||
| 4 | #ifdef APEX_HAVE_HPX | |
| 5 | #include <hpx/config.hpp> | |
| 6 | #ifdef APEX_HAVE_OTF2 | |
| 7 | #define APEX_TRACE_APEX | |
| 8 | #endif // APEX_HAVE_OTF2 | |
| 9 | #endif // APEX_HAVE_HPX | |
| 10 | ||
| 11 | #include "profiler_listener.hpp" | |
| 12 | #include "profiler.hpp" | |
| 13 | #include "thread_instance.hpp" | |
| 14 | #include <iostream> | |
| 15 | #include <iomanip> | |
| 16 | #include <fstream> | |
| 17 | #include <math.h> | |
| 18 | #include "apex_options.hpp" | |
| 19 | #include "profiler.hpp" | |
| 20 | #include "profile.hpp" | |
| 21 | #include "apex.hpp" | |
| 22 | ||
| 23 | #include <atomic> | |
| 24 | #if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__))) | |
| 25 | #include <unistd.h> | |
| 26 | #include <sched.h> | |
| 27 | #endif | |
| 28 | #include <cstdio> | |
| 29 | #include <vector> | |
| 30 | #include <string> | |
| 31 | #include <unordered_set> | |
| 32 | #include <algorithm> | |
| 33 | #include <iterator> | |
| 34 | ||
| 35 | #include <functional> | |
| 36 | #include <thread> | |
| 37 | #include <future> | |
| 38 | ||
| 39 | #if defined(APEX_THROTTLE) | |
| 40 | #include "apex_cxx_shared_lock.hpp" | |
| 41 | apex::shared_mutex_type throttled_event_set_mutex; | |
| 42 | #define APEX_THROTTLE_CALLS 1000 | |
| 43 | #ifdef APEX_USE_CLOCK_TIMESTAMP | |
| 44 | #define APEX_THROTTLE_PERCALL 0.00001 // 10 microseconds. | |
| 45 | #else | |
| 46 | #define APEX_THROTTLE_PERCALL 50000 // 50k cycles. | |
| 47 | #endif | |
| 48 | #endif | |
| 49 | ||
| 50 | #if APEX_HAVE_PAPI | |
| 51 | #include "papi.h" | |
| 52 | #include <mutex> | |
| 53 | std::mutex event_set_mutex; | |
| 54 | #endif | |
| 55 | ||
| 56 | #ifdef APEX_HAVE_HPX | |
| 57 | #include <boost/assign.hpp> | |
| 58 | #include <boost/cstdint.hpp> | |
| 59 | #include <hpx/include/performance_counters.hpp> | |
| 60 | #include <hpx/include/actions.hpp> | |
| 61 | #include <hpx/include/util.hpp> | |
| 62 | #include <hpx/lcos/local/composable_guard.hpp> | |
| 63 | static void apex_schedule_process_profiles(void); // not in apex namespace | |
| 64 | const int num_non_worker_threads_registered = 0; | |
| 65 | #endif | |
| 66 | ||
| 67 | #define APEX_MAIN "APEX MAIN" | |
| 68 | ||
| 69 | #ifdef APEX_HAVE_TAU | |
| 70 | #define PROFILING_ON | |
| 71 | #define TAU_DOT_H_LESS_HEADERS | |
| 72 | #include <TAU.h> | |
| 73 | #endif | |
| 74 | ||
| 75 | #include "utils.hpp" | |
| 76 | ||
| 77 | #include <cstdlib> | |
| 78 | #include <ctime> | |
| 79 | ||
| 80 | using namespace std; | |
| 81 | using namespace apex; | |
| 82 | ||
| 83 | APEX_NATIVE_TLS unsigned int my_tid = 0; // the current thread's TID in APEX | |
| 84 | ||
| 85 | namespace apex { | |
| 86 | ||
| 87 | #ifdef APEX_MULTIPLE_QUEUES | |
| 88 | /* this is a thread-local pointer to a concurrent queue for each worker thread. */ | |
| 89 | __thread profiler_queue_t * thequeue; | |
| 90 | #endif | |
| 91 | ||
| 92 | /* This is a special profiler, indicating that the timer requested is | |
| 93 | throttled, and shouldn't be processed. */ | |
| 94 | profiler* profiler::disabled_profiler = new profiler(); | |
| 95 | ||
| 96 | #ifdef APEX_HAVE_HPX | |
| 97 | /* Flag indicating whether a consumer task is currently running */ | |
| 98 | std::atomic_flag consumer_task_running = ATOMIC_FLAG_INIT; | |
| 99 | bool hpx_shutdown = false; | |
| 100 | #endif | |
| 101 | ||
| 102 | double profiler_listener::get_non_idle_time() { | |
| 103 | double non_idle_time = 0.0; | |
| 104 | /* Iterate over all timers and accumulate the time spent in them */ | |
| 105 | unordered_map<task_identifier, profile*>::const_iterator it2; | |
| 106 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
| 107 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
| 108 | profile * p = it2->second; | |
| 109 | #if defined(APEX_THROTTLE) | |
| 110 | task_identifier id = it2->first; | |
| 111 | unordered_set<task_identifier>::const_iterator it4; | |
| 112 | { | |
| 113 | read_lock_type l(throttled_event_set_mutex); | |
| 114 | it4 = throttled_tasks.find(id); | |
| 115 | } | |
| 116 | if (it4!= throttled_tasks.end()) { | |
| 117 | continue; | |
| 118 | } | |
| 119 | #endif | |
| 120 | if (p->get_type() == APEX_TIMER) { | |
| 121 | non_idle_time += p->get_accumulated(); | |
| 122 | } | |
| 123 | } | |
| 124 | return non_idle_time*profiler::get_cpu_mhz(); | |
| 125 | } | |
| 126 | ||
| 127 | profile * profiler_listener::get_idle_time() { | |
| 128 | double non_idle_time = get_non_idle_time(); | |
| 129 | /* Subtract the accumulated time from the main time span. */ | |
| 130 | int num_worker_threads = thread_instance::get_num_threads(); | |
| 131 | #ifdef APEX_HAVE_HPX | |
| 132 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
| 133 | #endif | |
| 134 | std::chrono::duration<double> time_span = | |
| 135 | std::chrono::duration_cast<std::chrono::duration<double>> | |
| 136 | (MYCLOCK::now() - main_timer->start); | |
| 137 | double total_main = time_span.count() * | |
| 138 | fmin(hardware_concurrency(), num_worker_threads); | |
| 139 | double elapsed = total_main - non_idle_time; | |
| 140 | elapsed = elapsed > 0.0 ? elapsed : 0.0; | |
| 141 | profile * theprofile = new profile(elapsed*profiler::get_cpu_mhz(), 0, NULL, false); | |
| 142 | return theprofile; | |
| 143 | } | |
| 144 | ||
| 145 | profile * profiler_listener::get_idle_rate() { | |
| 146 | double non_idle_time = get_non_idle_time(); | |
| 147 | /* Subtract the accumulated time from the main time span. */ | |
| 148 | int num_worker_threads = thread_instance::get_num_threads(); | |
| 149 | #ifdef APEX_HAVE_HPX | |
| 150 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
| 151 | #endif | |
| 152 | std::chrono::duration<double> time_span = | |
| 153 | std::chrono::duration_cast<std::chrono::duration<double>> | |
| 154 | (MYCLOCK::now() - main_timer->start); | |
| 155 | double total_main = time_span.count() * | |
| 156 | fmin(hardware_concurrency(), num_worker_threads); | |
| 157 | double elapsed = total_main - non_idle_time; | |
| 158 | double rate = elapsed > 0.0 ? ((elapsed/total_main)) : 0.0; | |
| 159 | profile * theprofile = new profile(rate, 0, NULL, false); | |
| 160 | return theprofile; | |
| 161 | } | |
| 162 | ||
| 163 | /* Return the requested profile object to the user. | |
| 164 | * Return nullptr if doesn't exist. */ | |
| 165 | profile * profiler_listener::get_profile(task_identifier &id) { | |
| 166 | if (id.name == string(APEX_IDLE_RATE)) { | |
| 167 | return get_idle_rate(); | |
| 168 | } else if (id.name == string(APEX_IDLE_TIME)) { | |
| 169 | return get_idle_time(); | |
| 170 | } else if (id.name == string(APEX_NON_IDLE_TIME)) { | |
| 171 | profile * theprofile = new profile(get_non_idle_time(), 0, NULL, false); | |
| 172 | return theprofile; | |
| 173 | } | |
| 174 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
| 175 | unordered_map<task_identifier, profile*>::const_iterator it = task_map.find(id); | |
| 176 | if (it != task_map.end()) { | |
| 177 | return (*it).second; | |
| 178 | } | |
| 179 | return nullptr; | |
| 180 | } | |
| 181 | ||
| 182 | void profiler_listener::reset_all(void) { | |
| 183 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
| 184 | for(auto &it : task_map) { | |
| 185 | it.second->reset(); | |
| 186 | } | |
| 187 | } | |
| 188 | ||
| 189 | /* After the consumer thread pulls a profiler off of the queue, | |
| 190 | * process it by updating its profile object in the map of profiles. */ | |
| 191 | // TODO The name-based timer and address-based timer paths through | |
| 192 | // the code involve a lot of duplication -- this should be refactored | |
| 193 | // to remove the duplication so it's easier to maintain. | |
| 194 | unsigned int profiler_listener::process_profile(std::shared_ptr<profiler> &p, unsigned int tid) | |
| 195 | { | |
| 196 | if(p == nullptr) return 0; | |
| 197 | profile * theprofile; | |
| 198 | if(p->is_reset == reset_type::ALL) { | |
| 199 | reset_all(); | |
| 200 | return 0; | |
| 201 | } | |
| 202 | double values[8] = {0}; | |
| 203 | double tmp_num_counters = 0; | |
| 204 | #if APEX_HAVE_PAPI | |
| 205 | tmp_num_counters = num_papi_counters; | |
| 206 | for (int i = 0 ; i < num_papi_counters ; i++) { | |
| 207 | if (p->papi_stop_values[i] > p->papi_start_values[i]) { | |
| 208 | values[i] = p->papi_stop_values[i] - p->papi_start_values[i]; | |
| 209 | } else { | |
| 210 | values[i] = 0.0; | |
| 211 | } | |
| 212 | } | |
| 213 | #endif | |
| 214 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex, std::defer_lock); | |
| 215 | // There is only one consumer thread except during shutdown, so we only need | |
| 216 | // to lock during shutdown. | |
| 217 | bool did_lock = false; | |
| 218 | if(_done) { | |
| 219 | task_map_lock.lock(); | |
| 220 | did_lock = true; | |
| 221 | } | |
| 222 | unordered_map<task_identifier, profile*>::const_iterator it = task_map.find(*(p->task_id)); | |
| 223 | if (it != task_map.end()) { | |
| 224 | // A profile for this ID already exists. | |
| 225 | theprofile = (*it).second; | |
| 226 | if(_done && did_lock) { | |
| 227 | task_map_lock.unlock(); | |
| 228 | } | |
| 229 | if(p->is_reset == reset_type::CURRENT) { | |
| 230 | theprofile->reset(); | |
| 231 | } else { | |
| 232 | theprofile->increment(p->elapsed(), tmp_num_counters, values, p->is_resume); | |
| 233 | } | |
| 234 | #if defined(APEX_THROTTLE) | |
| 235 | // Is this a lightweight task? If so, we shouldn't measure it any more, | |
| 236 | // in order to reduce overhead. | |
| 237 | if (theprofile->get_calls() > APEX_THROTTLE_CALLS && | |
| 238 | theprofile->get_mean() < APEX_THROTTLE_PERCALL) { | |
| 239 | unordered_set<task_identifier>::const_iterator it2; | |
| 240 | { | |
| 241 | read_lock_type l(throttled_event_set_mutex); | |
| 242 | it2 = throttled_tasks.find(*(p->task_id)); | |
| 243 | } | |
| 244 | if (it2 == throttled_tasks.end()) { | |
| 245 | // lock the set for insert | |
| 246 | { | |
| 247 | write_lock_type l(throttled_event_set_mutex); | |
| 248 | // was it inserted when we were waiting? | |
| 249 | it2 = throttled_tasks.find(*(p->task_id)); | |
| 250 | // no? OK - insert it. | |
| 251 | if (it2 == throttled_tasks.end()) { | |
| 252 | throttled_tasks.insert(*(p->task_id)); | |
| 253 | } | |
| 254 | } | |
| 255 | if (apex_options::use_screen_output()) { | |
| 256 | cout << "APEX: disabling lightweight timer " | |
| 257 | << p->task_id->get_name() | |
| 258 | << endl; | |
| 259 | fflush(stdout); | |
| 260 | } | |
| 261 | } | |
| 262 | } | |
| 263 | #endif | |
| 264 | } else { | |
| 265 | // Create a new profile for this name. | |
| 266 | theprofile = new profile(p->is_reset == reset_type::CURRENT ? 0.0 : p->elapsed(), tmp_num_counters, values, p->is_resume, p->is_counter ? APEX_COUNTER : APEX_TIMER); | |
| 267 | task_map[*(p->task_id)] = theprofile; | |
| 268 | if(_done && did_lock) { | |
| 269 | task_map_lock.unlock(); | |
| 270 | } | |
| 271 | #ifdef APEX_HAVE_HPX | |
| 272 | #ifdef APEX_REGISTER_HPX3_COUNTERS | |
| 273 | if(!_done) { | |
| 274 | if(get_hpx_runtime_ptr() != nullptr && p->task_id->has_name()) { | |
| 275 | std::string timer_name(p->task_id->get_name()); | |
| 276 | //Don't register timers containing "/" | |
| 277 | if(timer_name.find("/") == std::string::npos) { | |
| 278 | hpx::performance_counters::install_counter_type( | |
| 279 | std::string("/apex/") + timer_name, | |
| 280 | [p](bool r)->boost::int64_t{ | |
| 281 | boost::int64_t value(p->elapsed()); | |
| 282 | return value; | |
| 283 | }, | |
| 284 | std::string("APEX counter ") + timer_name, | |
| 285 | "" | |
| 286 | ); | |
| 287 | } | |
| 288 | } else { | |
| 289 | std::cerr << "HPX runtime not initialized yet." << std::endl; | |
| 290 | } | |
| 291 | } | |
| 292 | #endif | |
| 293 | #endif | |
| 294 | } | |
| 295 | #if !defined(_MSC_VER) | |
| 296 | /* write the sample to the file */ | |
| 297 | if (apex_options::task_scatterplot()) { | |
| 298 | if (!p->is_counter) { | |
| 299 | static int thresh = RAND_MAX/100; | |
| 300 | if (std::rand() < thresh) { | |
| 301 | std::unique_lock<std::mutex> task_map_lock(_mtx); | |
| 302 | task_scatterplot_samples << p->normalized_timestamp() << " " | |
| 303 | << p->elapsed()*profiler::get_cpu_mhz()*1000000 << " " | |
| 304 | << "'" << p->task_id->get_name() << "'" << endl; | |
| 305 | int loc0 = task_scatterplot_samples.tellp(); | |
| 306 | if (loc0 > 32768) { | |
| 307 | // lock access to the file | |
| 308 | // write using low-level file locking! | |
| 309 | struct flock fl; | |
| 310 | fl.l_type = F_WRLCK; /* F_RDLCK, F_WRLCK, F_UNLCK */ | |
| 311 | fl.l_whence = SEEK_SET; /* SEEK_SET, SEEK_CUR, SEEK_END */ | |
| 312 | fl.l_start = 0; /* Offset from l_whence */ | |
| 313 | fl.l_len = 0; /* length, 0 = to EOF */ | |
| 314 | fl.l_pid = getpid(); /* our PID */ | |
| 315 | fcntl(task_scatterplot_sample_file, F_SETLKW, &fl); /* F_GETLK, F_SETLK, F_SETLKW */ | |
| 316 | // flush the string stream to the file | |
| 317 | //lseek(task_scatterplot_sample_file, 0, SEEK_END); | |
| 318 | ssize_t bytes_written = write(task_scatterplot_sample_file, | |
| 319 | task_scatterplot_samples.str().c_str(), loc0); | |
| 320 | if (bytes_written < 0) { | |
| 321 | int errsv = errno; | |
| 322 | perror("Error writing to scatterplot!"); | |
| 323 | fprintf(stderr, "Error writing scatterplot:\n%s\n", | |
| 324 | strerror(errsv)); | |
| 325 | } | |
| 326 | fl.l_type = F_UNLCK; /* tell it to unlock the region */ | |
| 327 | fcntl(task_scatterplot_sample_file, F_SETLK, &fl); /* set the region to unlocked */ | |
| 328 | // reset the stringstream | |
| 329 | task_scatterplot_samples.str(""); | |
| 330 | } | |
| 331 | } | |
| 332 | } | |
| 333 | } | |
| 334 | #endif | |
| 335 | return 1; | |
| 336 | } | |
| 337 | ||
| 338 | inline unsigned int profiler_listener::process_dependency(task_dependency* td) | |
| 339 | { | |
| 340 | unordered_map<task_identifier, unordered_map<task_identifier, int>* >::const_iterator it = task_dependencies.find(td->parent); | |
| 341 | unordered_map<task_identifier, int> * depend; | |
| 342 | // is this a new dependency for this parent? | |
| 343 | if (it == task_dependencies.end()) { | |
| 344 | depend = new unordered_map<task_identifier, int>(); | |
| 345 | (*depend)[td->child] = 1; | |
| 346 | task_dependencies[td->parent] = depend; | |
| 347 | // otherwise, see if this parent has seen this child | |
| 348 | } else { | |
| 349 | depend = it->second; | |
| 350 | unordered_map<task_identifier, int>::const_iterator it2 = depend->find(td->child); | |
| 351 | // first time for this child | |
| 352 | if (it2 == depend->end()) { | |
| 353 | (*depend)[td->child] = 1; | |
| 354 | // not the first time for this child | |
| 355 | } else { | |
| 356 | int tmp = it2->second; | |
| 357 | (*depend)[td->child] = tmp + 1; | |
| 358 | } | |
| 359 | } | |
| 360 | delete(td); | |
| 361 | return 1; | |
| 362 | } | |
| 363 | ||
| 364 | /* Cleaning up memory. Not really necessary, because it only gets | |
| 365 | * called at shutdown. But a good idea to do regardless. */ | |
| 366 | void profiler_listener::delete_profiles(void) { | |
| 367 | // iterate over the map and free the objects in the map | |
| 368 | unordered_map<task_identifier, profile*>::const_iterator it; | |
| 369 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
| 370 | for(it = task_map.begin(); it != task_map.end(); it++) { | |
| 371 | delete it->second; | |
| 372 | } | |
| 373 | // clear the map. | |
| 374 | task_map.clear(); | |
| 375 | ||
| 376 | } | |
| 377 | ||
| 378 | #define PAD_WITH_SPACES "%8s" | |
| 379 | #define FORMAT_PERCENT "%8.3f" | |
| 380 | #define FORMAT_SCIENTIFIC "%1.2e" | |
| 381 | ||
| 382 | template<typename ... Args> | |
| 383 | string string_format( const std::string& format, Args ... args ) | |
| 384 | { | |
| 385 | size_t size = snprintf( nullptr, 0, format.c_str(), args ... ) + 1; // Extra space for '\0' | |
| 386 | unique_ptr<char[]> buf( new char[ size ] ); | |
| 387 | snprintf( buf.get(), size, format.c_str(), args ... ); | |
| 388 | return string( buf.get(), buf.get() + size - 1 ); // We don't want the '\0' inside | |
| 389 | } | |
| 390 | ||
| 391 | void profiler_listener::write_one_timer(task_identifier &task_id, | |
| 392 | profile * p, stringstream &screen_output, | |
| 393 | stringstream &csv_output, double &total_accumulated, | |
| 394 | double &total_main) { | |
| 395 | string action_name = task_id.get_name(); | |
| 396 | string shorter(action_name); | |
| 397 | // to keep formatting pretty, trim any long timer names | |
| 398 | if (shorter.size() > 30) { | |
| 399 | shorter.resize(27); | |
| 400 | shorter.resize(30, '.'); | |
| 401 | } | |
| 402 | //screen_output << "\"" << shorter << "\", " ; | |
| 403 | screen_output << string_format("%30s", shorter.c_str()) << " : "; | |
| 404 | #if defined(APEX_THROTTLE) | |
| 405 | // if this profile was throttled, don't output the measurements. | |
| 406 | // they are limited and bogus, anyway. | |
| 407 | unordered_set<task_identifier>::const_iterator it4; | |
| 408 | { | |
| 409 | read_lock_type l(throttled_event_set_mutex); | |
| 410 | it4 = throttled_tasks.find(task_id); | |
| 411 | } | |
| 412 | if (it4!= throttled_tasks.end()) { | |
| 413 | screen_output << "DISABLED (high frequency, short duration)" << endl; | |
| 414 | return; | |
| 415 | } | |
| 416 | #endif | |
| 417 | if(p->get_calls() < 1) { | |
| 418 | p->get_profile()->calls = 1; | |
| 419 | } | |
| 420 | if (p->get_calls() < 999999) { | |
| 421 | screen_output << string_format(PAD_WITH_SPACES, to_string((int)p->get_calls()).c_str()) << " " ; | |
| 422 | } else { | |
| 423 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_calls()) << " " ; | |
| 424 | } | |
| 425 | if (p->get_type() == APEX_TIMER) { | |
| 426 | csv_output << "\"" << action_name << "\","; | |
| 427 | csv_output << llround(p->get_calls()) << ","; | |
| 428 | // convert MHz to Hz | |
| 429 | csv_output << std::llround(p->get_accumulated()) << ","; | |
| 430 | // convert MHz to microseconds | |
| 431 | csv_output << std::llround(p->get_accumulated()*profiler::get_cpu_mhz()*1000000); | |
| 432 | screen_output << " --n/a-- " ; | |
| 433 | screen_output << string_format(FORMAT_SCIENTIFIC, (p->get_mean()*profiler::get_cpu_mhz())) << " " ; | |
| 434 | screen_output << " --n/a-- " ; | |
| 435 | screen_output << string_format(FORMAT_SCIENTIFIC, (p->get_accumulated()*profiler::get_cpu_mhz())) << " " ; | |
| 436 | screen_output << " --n/a-- " ; | |
| 437 | screen_output << string_format(FORMAT_PERCENT, (((p->get_accumulated()*profiler::get_cpu_mhz())/total_main)*100)); | |
| 438 | #if APEX_HAVE_PAPI | |
| 439 | for (int i = 0 ; i < num_papi_counters ; i++) { | |
| 440 | screen_output << " " << string_format(FORMAT_SCIENTIFIC, (p->get_papi_metrics()[i])); | |
| 441 | csv_output << "," << std::llround(p->get_papi_metrics()[i]); | |
| 442 | } | |
| 443 | #endif | |
| 444 | screen_output << endl; | |
| 445 | total_accumulated += p->get_accumulated(); | |
| 446 | csv_output << endl; | |
| 447 | } else { | |
| 448 | if (action_name.find('%') == string::npos) { | |
| 449 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_minimum()) << " " ; | |
| 450 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_mean()) << " " ; | |
| 451 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_maximum()) << " " ; | |
| 452 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_accumulated()) << " " ; | |
| 453 | screen_output << string_format(FORMAT_SCIENTIFIC, p->get_stddev()) << " " ; | |
| 454 | } else { | |
| 455 | screen_output << string_format(FORMAT_PERCENT, p->get_minimum()) << " " ; | |
| 456 | screen_output << string_format(FORMAT_PERCENT, p->get_mean()) << " " ; | |
| 457 | screen_output << string_format(FORMAT_PERCENT, p->get_maximum()) << " " ; | |
| 458 | screen_output << string_format(FORMAT_PERCENT, p->get_accumulated()) << " " ; | |
| 459 | screen_output << string_format(FORMAT_PERCENT, p->get_stddev()) << " " ; | |
| 460 | } | |
| 461 | screen_output << " --n/a-- " << endl; | |
| 462 | } | |
| 463 | } | |
| 464 | ||
| 465 | /* At program termination, write the measurements to the screen, or to CSV file, or both. */ | |
| 466 | void profiler_listener::finalize_profiles(void) { | |
| 467 | // our TOTAL available time is the elapsed * the number of threads, or cores | |
| 468 | int num_worker_threads = thread_instance::get_num_threads(); | |
| 469 | double wall_clock_main = main_timer->elapsed() * profiler::get_cpu_mhz(); | |
| 470 | #ifdef APEX_HAVE_HPX | |
| 471 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
| 472 | #endif | |
| 473 | double total_main = wall_clock_main * | |
| 474 | fmin(hardware_concurrency(), num_worker_threads); | |
| 475 | // create a stringstream to hold all the screen output - we may not | |
| 476 | // want to write it out | |
| 477 | stringstream screen_output; | |
| 478 | // create a stringstream to hold all the CSV output - we may not | |
| 479 | // want to write it out | |
| 480 | stringstream csv_output; | |
| 481 | // iterate over the profiles in the address map | |
| 482 | screen_output << "Elapsed time: " << wall_clock_main << endl; | |
| 483 | screen_output << "Cores detected: " << hardware_concurrency() << endl; | |
| 484 | screen_output << "Worker Threads observed: " << num_worker_threads << endl; | |
| 485 | screen_output << "Available CPU time: " << total_main << endl; | |
| 486 | map<apex_function_address, profile*>::const_iterator it; | |
| 487 | screen_output << "Action : #calls | minimum | mean | maximum | total | stddev | % total " << apex_options::papi_metrics() << endl; | |
| 488 | screen_output << "------------------------------------------------------------------------------------------------------------" << endl; | |
| 489 | csv_output << "\"task\",\"num calls\",\"total cycles\",\"total microseconds\""; | |
| 490 | #if APEX_HAVE_PAPI | |
| 491 | for (int i = 0 ; i < num_papi_counters ; i++) { | |
| 492 | csv_output << ",\"" << metric_names[i] << "\""; | |
| 493 | } | |
| 494 | #endif | |
| 495 | csv_output << endl; | |
| 496 | double total_accumulated = 0.0; | |
| 497 | unordered_map<task_identifier, profile*>::const_iterator it2; | |
| 498 | std::vector<task_identifier> id_vector; | |
| 499 | // iterate over the counters, and sort their names | |
| 500 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
| 501 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
| 502 | task_identifier task_id = it2->first; | |
| 503 | profile * p = it2->second; | |
| 504 | if (p->get_type() != APEX_TIMER) { | |
| 505 | id_vector.push_back(task_id); | |
| 506 | } | |
| 507 | } | |
| 508 | std::sort(id_vector.begin(), id_vector.end()); | |
| 509 | // iterate over the counters | |
| 510 | for(task_identifier task_id : id_vector) { | |
| 511 | profile * p = task_map[task_id]; | |
| 512 | if (p) { | |
| 513 | write_one_timer(task_id, p, screen_output, csv_output, total_accumulated, total_main); | |
| 514 | } | |
| 515 | } | |
| 516 | id_vector.clear(); | |
| 517 | // iterate over the timers, and sort their names | |
| 518 | for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { | |
| 519 | profile * p = it2->second; | |
| 520 | task_identifier task_id = it2->first; | |
| 521 | if (p->get_type() == APEX_TIMER) { | |
| 522 | id_vector.push_back(task_id); | |
| 523 | } | |
| 524 | } | |
| 525 | // iterate over the timers | |
| 526 | std::sort(id_vector.begin(), id_vector.end()); | |
| 527 | // iterate over the timers | |
| 528 | for(task_identifier task_id : id_vector) { | |
| 529 | profile * p = task_map[task_id]; | |
| 530 | if (p) { | |
| 531 | write_one_timer(task_id, p, screen_output, csv_output, total_accumulated, total_main); | |
| 532 | } | |
| 533 | } | |
| 534 | double idle_rate = total_main - (total_accumulated*profiler::get_cpu_mhz()); | |
| 535 | screen_output << string_format("%30s", APEX_IDLE_TIME) << " : "; | |
| 536 | screen_output << " --n/a-- " ; | |
| 537 | screen_output << " --n/a-- " ; | |
| 538 | screen_output << " --n/a-- " ; | |
| 539 | screen_output << " --n/a-- " ; | |
| 540 | if (idle_rate < 0.0) { | |
| 541 | screen_output << " --n/a-- " ; | |
| 542 | } else { | |
| 543 | screen_output << string_format(FORMAT_SCIENTIFIC, idle_rate) << " " ; | |
| 544 | } | |
| 545 | screen_output << " --n/a-- " ; | |
| 546 | if (idle_rate < 0.0) { | |
| 547 | screen_output << " --n/a-- " << endl; | |
| 548 | } else { | |
| 549 | screen_output << string_format(FORMAT_PERCENT, ((idle_rate/total_main)*100)) << endl; | |
| 550 | } | |
| 551 | screen_output << "------------------------------------------------------------------------------------------------------------" << endl; | |
| 552 | if (apex_options::use_screen_output()) { | |
| 553 | cout << screen_output.str(); | |
| 554 | } | |
| 555 | if (apex_options::use_csv_output()) { | |
| 556 | ofstream csvfile; | |
| 557 | stringstream csvname; | |
| 558 | csvname << "apex." << node_id << ".csv"; | |
| 559 | csvfile.open(csvname.str(), ios::out); | |
| 560 | csvfile << csv_output.str(); | |
| 561 | csvfile.close(); | |
| 562 | } | |
| 563 | } | |
| 564 | ||
| 565 | /* The following code is from: | |
| 566 | http://stackoverflow.com/questions/7706339/grayscale-to-red-green-blue-matlab-jet-color-scale */ | |
| 567 | class node_color { | |
| 568 | public: | |
| 569 | double red; | |
| 570 | double green; | |
| 571 | double blue; | |
| 572 | node_color() : red(1.0), green(1.0), blue(1.0) {} | |
| 573 | int convert(double in) { return (int)(in * 255.0); } | |
| 574 | } ; | |
| 575 | ||
| 576 | node_color * get_node_color(double v,double vmin,double vmax) | |
| 577 | { | |
| 578 | node_color * c = new node_color(); | |
| 579 | double dv; | |
| 580 | ||
| 581 | if (v < vmin) | |
| 582 | v = vmin; | |
| 583 | if (v > vmax) | |
| 584 | v = vmax; | |
| 585 | dv = vmax - vmin; | |
| 586 | ||
| 587 | if (v < (vmin + 0.25 * dv)) { | |
| 588 | c->red = 0; | |
| 589 | c->green = 4 * (v - vmin) / dv; | |
| 590 | } else if (v < (vmin + 0.5 * dv)) { | |
| 591 | c->red = 0; | |
| 592 | c->blue = 1 + 4 * (vmin + 0.25 * dv - v) / dv; | |
| 593 | } else if (v < (vmin + 0.75 * dv)) { | |
| 594 | c->red = 4 * (v - vmin - 0.5 * dv) / dv; | |
| 595 | c->blue = 0; | |
| 596 | } else { | |
| 597 | c->green = 1 + 4 * (vmin + 0.75 * dv - v) / dv; | |
| 598 | c->blue = 0; | |
| 599 | } | |
| 600 | ||
| 601 | return(c); | |
| 602 | } | |
| 603 | ||
| 604 | void profiler_listener::write_taskgraph(void) { | |
| 605 | ofstream myfile; | |
| 606 | stringstream dotname; | |
| 607 | dotname << "taskgraph." << node_id << ".dot"; | |
| 608 | myfile.open(dotname.str().c_str()); | |
| 609 | ||
| 610 | myfile << "digraph prof {\n rankdir=\"LR\";\n node [shape=box];\n"; | |
| 611 | for(auto dep = task_dependencies.begin(); dep != task_dependencies.end(); dep++) { | |
| 612 | task_identifier parent = dep->first; | |
| 613 | auto children = dep->second; | |
| 614 | string parent_name = parent.get_name(); | |
| 615 | for(auto offspring = children->begin(); offspring != children->end(); offspring++) { | |
| 616 | task_identifier child = offspring->first; | |
| 617 | int count = offspring->second; | |
| 618 | string child_name = child.get_name(); | |
| 619 | myfile << " \"" << parent_name << "\" -> \"" << child_name << "\""; | |
| 620 | myfile << " [ label=\" count: " << count << "\" ]; " << std::endl; | |
| 621 | ||
| 622 | } | |
| 623 | } | |
| 624 | ||
| 625 | // our TOTAL available time is the elapsed * the number of threads, or cores | |
| 626 | int num_worker_threads = thread_instance::get_num_threads(); | |
| 627 | #ifdef APEX_HAVE_HPX | |
| 628 | num_worker_threads = num_worker_threads - num_non_worker_threads_registered; | |
| 629 | #endif | |
| 630 | double total_main = main_timer->elapsed() * | |
| 631 | fmin(hardware_concurrency(), num_worker_threads); | |
| 632 | ||
| 633 | // output nodes with "main" [shape=box; style=filled; fillcolor="#ff0000" ]; | |
| 634 | unordered_map<task_identifier, profile*>::const_iterator it; | |
| 635 | std::unique_lock<std::mutex> task_map_lock(_task_map_mutex); | |
| 636 | for(it = task_map.begin(); it != task_map.end(); it++) { | |
| 637 | profile * p = it->second; | |
| 638 | if (p->get_type() == APEX_TIMER) { | |
| 639 | node_color * c = get_node_color((p->get_accumulated()*profiler::get_cpu_mhz()), 0.0, total_main); | |
| 640 | task_identifier task_id = it->first; | |
| 641 | myfile << " \"" << task_id.get_name() << "\" [shape=box; style=filled; fillcolor=\"#" << | |
| 642 | setfill('0') << setw(2) << hex << c->convert(c->red) << | |
| 643 | setfill('0') << setw(2) << hex << c->convert(c->green) << | |
| 644 | setfill('0') << setw(2) << hex << c->convert(c->blue) << "\"" << | |
| 645 | "; label=\"" << task_id.get_name() << ":\\n" << (p->get_accumulated()*profiler::get_cpu_mhz()) << "s\" ];" << std::endl; | |
| 646 | } | |
| 647 | } | |
| 648 | myfile << "}\n"; | |
| 649 | myfile.close(); | |
| 650 | } | |
| 651 | ||
| 652 | /* When writing a TAU profile, write out a timer line */ | |
| 653 | void format_line(ofstream &myfile, profile * p) { | |
| 654 | myfile << p->get_calls() << " "; | |
| 655 | myfile << 0 << " "; | |
| 656 | myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " "; | |
| 657 | myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " "; | |
| 658 | myfile << 0 << " "; | |
| 659 | myfile << "GROUP=\"TAU_USER\" "; | |
| 660 | myfile << endl; | |
| 661 | } | |
| 662 | ||
| 663 | /* When writing a TAU profile, write out the main timer line */ | |
| 664 | void format_line(ofstream &myfile, profile * p, double not_main) { | |
| 665 | myfile << p->get_calls() << " "; | |
| 666 | myfile << 0 << " "; | |
| 667 | myfile << (max(((p->get_accumulated()*profiler::get_cpu_mhz()) - not_main),0.0)) << " "; | |
| 668 | myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " "; | |
| 669 | myfile << 0 << " "; | |
| 670 | myfile << "GROUP=\"TAU_USER\" "; | |
| 671 | myfile << endl; | |
| 672 | } | |
| 673 | ||
| 674 | /* When writing a TAU profile, write out a counter line */ | |
| 675 | void format_counter_line(ofstream &myfile, profile * p) { | |
| 676 | myfile << p->get_calls() << " "; // numevents | |
| 677 | myfile << p->get_maximum() << " "; // max | |
| 678 | myfile << p->get_minimum() << " "; // min | |
| 679 | myfile << p->get_mean() << " "; // mean | |
| 680 | myfile << p->get_sum_squares() << " "; | |
| 681 | myfile << endl; | |
| 682 | } | |
| 683 | ||
| 684 | /* Write TAU profiles from the collected data. */ | |
void profiler_listener::write_profile() {
    ofstream myfile;
    stringstream datname;
    // name format: profile.nodeid.contextid.threadid
    // We only write one profile per process, so context and thread are 0.
    datname << "profile." << node_id << ".0.0";

    // name format: profile.nodeid.contextid.threadid
    myfile.open(datname.str().c_str());
    int counter_events = 0;

    // Determine number of counter events, as these need to be
    // excluded from the number of normal timers
    unordered_map<task_identifier, profile*>::const_iterator it2;
    // Hold the task map lock for the whole write so concurrent updates
    // can't invalidate the iterators between the passes below.
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
        profile * p = it2->second;
        if(p->get_type() == APEX_COUNTER) {
            counter_events++;
        }
    }
    int function_count = task_map.size() - counter_events;

    // Print the normal timers to the profile file
    // 1504 templated_functions_MULTI_TIME
    myfile << function_count << " templated_functions_MULTI_TIME" << endl;
    // # Name Calls Subrs Excl Incl ProfileCalls #
    myfile << "# Name Calls Subrs Excl Incl ProfileCalls #" << endl;
    thread_instance ti = thread_instance::instance();

    // Iterate over the profiles which are associated to a function
    // by name. Only output the regular timers now. Counters are
    // in a separate section, below.
    // The APEX_MAIN timer is held back and written last: its exclusive
    // time is computed by subtracting the accumulated time of every other
    // timer (not_main) -- see format_line's third argument.
    profile * mainp = nullptr;
    double not_main = 0.0;
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
        profile * p = it2->second;
        task_identifier task_id = it2->first;
        if(p->get_type() == APEX_TIMER) {
            string action_name = task_id.get_name();
            if(strcmp(action_name.c_str(), APEX_MAIN) == 0) {
                mainp = p;   // defer until all other timers are summed
            } else {
                myfile << "\"" << action_name << "\" ";
                format_line (myfile, p);
                not_main += (p->get_accumulated()*profiler::get_cpu_mhz());
            }
        }
    }
    if (mainp != nullptr) {
        myfile << "\"" << APEX_MAIN << "\" ";
        format_line (myfile, mainp, not_main);
    }

    // 0 aggregates (required by the TAU profile format)
    myfile << "0 aggregates" << endl;

    // Now process the counters, if there are any.
    if(counter_events > 0) {
        myfile << counter_events << " userevents" << endl;
        myfile << "# eventname numevents max min mean sumsqr" << endl;
        for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
            profile * p = it2->second;
            if(p->get_type() == APEX_COUNTER) {
                task_identifier task_id = it2->first;
                myfile << "\"" << task_id.get_name() << "\" ";
                format_counter_line (myfile, p);
            }
        }
    }
    myfile.close();
}
| 757 | ||
| 758 | /* | |
| 759 | * The main function for the consumer thread has to be static, but | |
| 760 | * the processing needs access to member variables, so get the | |
| 761 | * profiler_listener instance, and call it's proper function. | |
| 762 | * | |
| 763 | * This is a wrapper, so that we can launch the thread and set | |
| 764 | * affinity. However, process_profiles_wrapper is also used by the | |
| 765 | * last worker that calls apex_finalize(), so we don't want to change | |
| 766 | * that thread's affinity. So this wrapper is only for the consumer | |
| 767 | * thread. | |
| 768 | */ | |
void profiler_listener::consumer_process_profiles_wrapper(void) {
    // This entry point is used only by the dedicated consumer thread,
    // so it is safe to change this thread's affinity when requested.
    if (apex_options::pin_apex_threads()) {
        set_thread_affinity();
    }
    // Delegate to the shared wrapper that locates the listener instance.
    process_profiles_wrapper();
}
| 775 | ||
| 776 | /* | |
| 777 | * The main function for the consumer thread has to be static, but | |
| 778 | * the processing needs access to member variables, so get the | |
| 779 | * profiler_listener instance, and call it's proper function. | |
| 780 | */ | |
void profiler_listener::process_profiles_wrapper(void) {
    // Static entry point: recover the singleton and its listener, then
    // run the member function that does the actual queue processing.
    apex * inst = apex::instance();
    if (inst != nullptr) {
        profiler_listener * pl = inst->the_profiler_listener;
        if (pl != nullptr) {
#ifdef APEX_TRACE_APEX
            // When tracing APEX itself, time this processing pass.
            // push_profiler() filters out this exact function address so the
            // trace timer does not recursively enqueue itself.
            profiler * p = start((apex_function_address)&profiler_listener::process_profiles_wrapper);
            pl->process_profiles();
            stop(p);
#else
            pl->process_profiles();
#endif
        }
    }
}
| 796 | ||
/* Drain every pending profiler object from the queue(s).
 * Run from multiple std::async tasks at shutdown (see on_shutdown);
 * always returns true so the futures have a value to report. */
bool profiler_listener::concurrent_cleanup(void){
    std::shared_ptr<profiler> p;
#ifdef APEX_MULTIPLE_QUEUES
    // Serialize access to the vector of per-thread queues.
    std::unique_lock<std::mutex> queue_lock(queue_mtx);
    std::vector<profiler_queue_t*>::const_iterator a_queue;
    for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) {
        thequeue = *a_queue;
        while(thequeue->try_dequeue(p)) {
            process_profile(p,0);
        }
    }
#else
    while(thequeue.try_dequeue(p)) {
        process_profile(p,0);
    }
#endif
    return true;
}
| 815 | ||
| 816 | /* This is the main function for the consumer thread. | |
| 817 | * It will wait at a semaphore for pending work. When there is | |
| 818 | * work on one or more queues, it will iterate over the queues | |
| 819 | * and process the pending profiler objects, updating the profiles | |
| 820 | * as it goes. */ | |
void profiler_listener::process_profiles(void)
{
    // One-time, lazy per-thread TAU setup for the consumer.
    if (!_initialized) {
        initialize_worker_thread_for_TAU();
        _initialized = true;
    }
#ifdef APEX_HAVE_TAU
    if (apex_options::use_tau()) {
        TAU_START("profiler_listener::process_profiles");
    }
#endif

    std::shared_ptr<profiler> p;
    task_dependency* td;
    // Main loop. Stay in this loop unless "done".
    // NOTE: without HPX this runs on a dedicated thread that blocks on
    // queue_signal; with HPX the while/wait wrapper is compiled out and the
    // body executes once per scheduled task (see apex_schedule_process_profiles).
#ifndef APEX_HAVE_HPX
    while (!_done) {
        queue_signal.wait();   // block until a producer posts new work
#endif
#ifdef APEX_HAVE_TAU
    /*
    if (apex_options::use_tau()) {
        TAU_START("profiler_listener::process_profiles: main loop");
    }
    */
#endif
#ifdef APEX_MULTIPLE_QUEUES
    // Visit every per-thread queue and drain it.
    std::vector<profiler_queue_t*>::const_iterator a_queue;
    std::unique_lock<std::mutex> queue_lock(queue_mtx);
    int i = 0;
    for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) {
        thequeue = *a_queue;
        while(!_done && thequeue->try_dequeue(p)) {
            process_profile(p, 0);
#ifdef APEX_HAVE_HPX // don't hang out in this task too long.
            if (++i > 1000) break;
#endif
        }
    }
#else
    while(!_done && thequeue.try_dequeue(p)) {
        process_profile(p, 0);
    }
#endif
    // Also drain task-dependency records when building the task graph.
    if (apex_options::use_taskgraph_output()) {
        while(!_done && dependency_queue.try_dequeue(td)) {
            process_dependency(td);
        }
    }
    /*
     * I want to process the tasks concurrently, but this loop
     * is too much overhead. Maybe dequeue them in batches?
     */
    /*
    std::vector<std::future<void>> pending_futures;
    while(!_done && thequeue.try_dequeue(p)) {
        auto f = std::async(my_stupid_wrapper,p);
        // transfer the future's shared state to a longer-lived future
        pending_futures.push_back(std::move(f));
    }
    for (auto iter = pending_futures.begin() ; iter < pending_futures.end() ; iter++ ) {
        iter->get();
    }
    */

#ifdef APEX_HAVE_TAU
    /*
    if (apex_options::use_tau()) {
        TAU_STOP("profiler_listener::process_profiles: main loop");
    }
    */
#endif
#ifndef APEX_HAVE_HPX
    }

    // We only get here when _done is set: flush any remaining dependencies.
    if (apex_options::use_taskgraph_output()) {
        // process the task dependencies
        while(dependency_queue.try_dequeue(td)) {
            process_dependency(td);
        }
    }

#endif // NOT DEFINED APEX_HAVE_HPX

#ifdef APEX_HAVE_HPX
    // Allow the next consumer task to be scheduled (flag is tested with
    // test_and_set in apex_schedule_process_profiles).
    consumer_task_running.clear(memory_order_release);
#endif

#ifdef APEX_HAVE_TAU
    if (apex_options::use_tau()) {
        TAU_STOP("profiler_listener::process_profiles");
    }
#endif
}
| 915 | ||
| 916 | #if APEX_HAVE_PAPI | |
| 917 | APEX_NATIVE_TLS int EventSet = PAPI_NULL; | |
| 918 | enum papi_state { papi_running, papi_suspended }; | |
| 919 | APEX_NATIVE_TLS papi_state thread_papi_state = papi_suspended; | |
| 920 | #define PAPI_ERROR_CHECK(name) \ | |
| 921 | if (rc != 0) cout << "name: " << rc << ": " << PAPI_strerror(rc) << endl; | |
| 922 | ||
| 923 | void profiler_listener::initialize_PAPI(bool first_time) { | |
| 924 | int rc = 0; | |
| 925 | if (first_time) { | |
| 926 | PAPI_library_init( PAPI_VER_CURRENT ); | |
| 927 | //rc = PAPI_multiplex_init(); // use more counters than allowed | |
| 928 | //PAPI_ERROR_CHECK(PAPI_multiplex_init); | |
| 929 | PAPI_thread_init( thread_instance::get_id ); | |
| 930 | // default | |
| 931 | //rc = PAPI_set_domain(PAPI_DOM_ALL); | |
| 932 | //PAPI_ERROR_CHECK(PAPI_set_domain); | |
| 933 | } else { | |
| 934 | PAPI_register_thread(); | |
| 935 | } | |
| 936 | rc = PAPI_create_eventset(&EventSet); | |
| 937 | PAPI_ERROR_CHECK(PAPI_create_eventset); | |
| 938 | // default | |
| 939 | //rc = PAPI_assign_eventset_component (EventSet, 0); | |
| 940 | //PAPI_ERROR_CHECK(PAPI_assign_eventset_component); | |
| 941 | // default | |
| 942 | //rc = PAPI_set_granularity(PAPI_GRN_THR); | |
| 943 | //PAPI_ERROR_CHECK(PAPI_set_granularity); | |
| 944 | // unnecessary complexity | |
| 945 | //rc = PAPI_set_multiplex(EventSet); | |
| 946 | //PAPI_ERROR_CHECK(PAPI_set_multiplex); | |
| 947 | // parse the requested set of papi counters | |
| 948 | // The string is modified by strtok, so copy it. | |
| 949 | if (strlen(apex_options::papi_metrics()) > 0) { | |
| 950 | std::stringstream tmpstr(apex_options::papi_metrics()); | |
| 951 | // use stream iterators to copy the stream to the vector as whitespace separated strings | |
| 952 | std::istream_iterator<std::string> tmpstr_it(tmpstr); | |
| 953 | std::istream_iterator<std::string> tmpstr_end; | |
| 954 | std::vector<std::string> tmpstr_results(tmpstr_it, tmpstr_end); | |
| 955 | int code; | |
| 956 | // iterate over the counter names in the vector | |
| 957 | for (auto p : tmpstr_results) { | |
| 958 | int rc = PAPI_event_name_to_code(const_cast<char*>(p.c_str()), &code); | |
| 959 | if (PAPI_query_event (code) == PAPI_OK) { | |
| 960 | rc = PAPI_add_event(EventSet, code); | |
| 961 | PAPI_ERROR_CHECK(PAPI_add_event); | |
| 962 | if (rc != 0) { printf ("Event that failed: %s\n", p.c_str()); } | |
| 963 | if (first_time) { | |
| 964 | metric_names.push_back(string(p.c_str())); | |
| 965 | num_papi_counters++; | |
| 966 | } | |
| 967 | } | |
| 968 | } | |
| 969 | if (!apex_options::papi_suspend()) { | |
| 970 | rc = PAPI_start( EventSet ); | |
| 971 | PAPI_ERROR_CHECK(PAPI_start); | |
| 972 | thread_papi_state = papi_running; | |
| 973 | } | |
| 974 | } | |
| 975 | } | |
| 976 | ||
| 977 | #endif | |
| 978 | ||
| 979 | /* When APEX gets a STARTUP event, do some initialization. */ | |
void profiler_listener::on_startup(startup_event_data &data) {
    if (!_done) {
        my_tid = (unsigned int)thread_instance::get_id();
#ifndef APEX_HAVE_HPX
        // Start the consumer thread, to process profiler objects.
        // (Under HPX, processing is scheduled as HPX tasks instead.)
        consumer_thread = new std::thread(consumer_process_profiles_wrapper);
#endif

#if APEX_HAVE_PAPI
        // Process-wide PAPI init happens on this (the first) thread.
        initialize_PAPI(true);
        event_sets[0] = EventSet;
#endif

        /* This commented out code is to change the priority of the consumer thread.
         * IDEALLY, I would like to make this a low priority thread, but that is as
         * yet unsuccessful. */
#if 0
        int retcode;
        int policy;

        pthread_t threadID = (pthread_t) consumer_thread->native_handle();

        struct sched_param param;

        if ((retcode = pthread_getschedparam(threadID, &policy, &param)) != 0)
        {
            errno = retcode;
            perror("pthread_getschedparam");
            exit(EXIT_FAILURE);
        }
        std::cout << "INHERITED: ";
        std::cout << "policy=" << ((policy == SCHED_FIFO)  ? "SCHED_FIFO" :
                                   (policy == SCHED_RR)    ? "SCHED_RR" :
                                   (policy == SCHED_OTHER) ? "SCHED_OTHER" :
                                   "???")
                  << ", priority=" << param.sched_priority << " of " << sched_get_priority_min(policy) << "," << sched_get_priority_max(policy) << std::endl;
        //param.sched_priority = 10;
        if ((retcode = pthread_setschedparam(threadID, policy, &param)) != 0)
        {
            errno = retcode;
            perror("pthread_setschedparam");
            exit(EXIT_FAILURE);
        }
#endif

        // time the whole application.
        main_timer = std::make_shared<profiler>(new task_identifier(string(APEX_MAIN)));
#if APEX_HAVE_PAPI
        // Take the baseline counter reading for the whole-application timer.
        if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) {
            int rc = PAPI_read( EventSet, main_timer->papi_start_values );
            PAPI_ERROR_CHECK(PAPI_read);
        }
#endif
    }
    APEX_UNUSED(data);
}
| 1036 | ||
| 1037 | /* On the shutdown event, notify the consumer thread that we are done | |
| 1038 | * and set the "terminate" flag. */ | |
void profiler_listener::on_shutdown(shutdown_event_data &data) {
    if (_done) { return; }
    // NOTE(review): after the guard above this test is always true; it only
    // serves to scope the shutdown sequence.
    if (!_done) {
        _done = true;
        node_id = data.node_id;
        //sleep(1);
#ifndef APEX_HAVE_HPX
        // Wake the consumer thread one last time so it observes _done,
        // drains its queues, and exits; then wait for it.
        queue_signal.post();
        if (consumer_thread != nullptr) {
            consumer_thread->join();
        }
#endif

        // stop the main timer, and process that profile?
        main_timer->stop();
#if APEX_HAVE_PAPI
        if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) {
            int rc = PAPI_read( EventSet, main_timer->papi_stop_values );
            PAPI_ERROR_CHECK(PAPI_read);
        }
#endif
        // if this profile is processed, it will get deleted. so don't process it!
        // It also clutters up the final profile, if generated.
        //process_profile(main_timer.get(), my_tid);

        // output to screen?
        if ((apex_options::use_screen_output() ||
             apex_options::use_csv_output()) && node_id == 0)
        {
            // Count what is still sitting in the queue(s), for the info message.
#ifdef APEX_MULTIPLE_QUEUES
            size_t ignored = 0;
            std::unique_lock<std::mutex> queue_lock(queue_mtx);
            std::vector<profiler_queue_t*>::const_iterator a_queue;
            for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) {
                thequeue = *a_queue;
                ignored += thequeue->size_approx();
            }
#else
            size_t ignored = thequeue.size_approx();
#endif
            if (ignored > 0) {
                std::cerr << "Info: " << ignored << " items remaining on on the profiler_listener queue...";
            }
#ifndef APEX_HAVE_HPX
            // We might be done, but check to make sure the queue is empty.
            // Spawn one concurrent_cleanup() task per hardware thread and
            // wait for them all before finalizing.
            std::vector<std::future<bool>> pending_futures;
            for (unsigned int i=0; i<hardware_concurrency(); ++i) {
#ifdef APEX_STATIC
                /* Static libC++ doesn't do async very well. In fact, it crashes. */
                auto f = std::async(&profiler_listener::concurrent_cleanup,this);
#else // APEX_STATIC
                auto f = std::async(std::launch::async,&profiler_listener::concurrent_cleanup,this);
#endif // APEX_STATIC
                // transfer the future's shared state to a longer-lived future
                pending_futures.push_back(std::move(f));
            }
            for (auto iter = pending_futures.begin() ; iter < pending_futures.end() ; iter++ ) {
                iter->get();
            }
#endif // APEX_HAVE_HPX
            if (ignored > 0) {
                std::cerr << "done." << std::endl;
            }
            if (apex_options::use_screen_output() || apex_options::use_csv_output()) {
                finalize_profiles();
            }
        }
        if (apex_options::use_taskgraph_output() && node_id == 0)
        {
            write_taskgraph();
        }

        // output to 1 TAU profile per process?
        if (apex_options::use_profile_output() && !apex_options::use_tau()) {
            write_profile();
        }
#if !defined(_MSC_VER)
        // Flush buffered scatterplot samples to the (possibly shared) file,
        // guarded by POSIX advisory record locking so that multiple processes
        // can append without interleaving.
        if (apex_options::task_scatterplot()) {
            // get the length of the stream
            int loc0 = task_scatterplot_samples.tellp();
            // lock access to the file
            // write using low-level file locking!
            struct flock fl;
            fl.l_type   = F_WRLCK;  /* F_RDLCK, F_WRLCK, F_UNLCK    */
            fl.l_whence = SEEK_SET; /* SEEK_SET, SEEK_CUR, SEEK_END */
            fl.l_start  = 0;        /* Offset from l_whence         */
            fl.l_len    = 0;        /* length, 0 = to EOF           */
            fl.l_pid    = getpid(); /* our PID                      */
            fcntl(task_scatterplot_sample_file, F_SETLKW, &fl);  /* F_GETLK, F_SETLK, F_SETLKW */
            // flush the string stream to the file
            //lseek(task_scatterplot_sample_file, 0, SEEK_END);
            ssize_t bytes_written = write(task_scatterplot_sample_file,
                 task_scatterplot_samples.str().c_str(), loc0);
            if (bytes_written < 0) {
                int errsv = errno;
                perror("Error writing to scatterplot!");
                fprintf(stderr, "Error writing scatterplot:\n%s\n",
                        strerror(errsv));
            }
            fl.l_type = F_UNLCK;  /* tell it to unlock the region */
            fcntl(task_scatterplot_sample_file, F_SETLK, &fl); /* set the region to unlocked */
            close(task_scatterplot_sample_file);
        }
#endif

    }
    /* The cleanup is disabled for now. Why? Because we want to be able
     * to access the profiles at the end of the run, after APEX has
     * finalized. */
    // cleanup.
    // delete_profiles();
}
| 1151 | ||
| 1152 | /* When a new node is created */ | |
| 1153 | void profiler_listener::on_new_node(node_event_data &data) { | |
| 1154 | if (!_done) { | |
| 1155 | } | |
| 1156 | APEX_UNUSED(data); | |
| 1157 | } | |
| 1158 | ||
| 1159 | /* When a new thread is registered, expand all of our storage as necessary | |
| 1160 | * to handle the new thread */ | |
| 1161 | void profiler_listener::on_new_thread(new_thread_event_data &data) { | |
| 1162 | if (!_done) { | |
| 1163 | my_tid = (unsigned int)thread_instance::get_id(); | |
| 1164 | async_thread_setup(); | |
| 1165 | #if APEX_HAVE_PAPI | |
| 1166 | initialize_PAPI(false); | |
| 1167 | event_set_mutex.lock(); | |
| 1168 | if (my_tid >= event_sets.size()) { | |
| 1169 | if (my_tid >= event_sets.size()) { | |
| 1170 | event_sets.resize(my_tid + 1); | |
| 1171 | } | |
| 1172 | } | |
| 1173 | event_sets[my_tid] = EventSet; | |
| 1174 | event_set_mutex.unlock(); | |
| 1175 | #endif | |
| 1176 | } | |
| 1177 | APEX_UNUSED(data); | |
| 1178 | } | |
| 1179 | ||
| 1180 | extern "C" int main (int, char**); | |
| 1181 | ||
| 1182 | /* When a start event happens, create a profiler object. Unless this | |
| 1183 | * named event is throttled, in which case do nothing, as quickly as possible */ | |
inline bool profiler_listener::_common_start(task_identifier * id, bool is_resume) {
    // Returns true if a profiler was started and made current; false if the
    // task is throttled or APEX is shutting down.
    if (!_done) {
#if defined(APEX_THROTTLE)
        // if this timer is throttled, return without doing anything
        unordered_set<task_identifier>::const_iterator it;
        {
            // shared (read) lock: many threads may query the set concurrently
            read_lock_type l(throttled_event_set_mutex);
            it = throttled_tasks.find(*id);
        }
        if (it != throttled_tasks.end()) {
            /*
             * The throw is removed, because it is a performance penalty on some systems
             * on_start now returns a boolean
             */
            //throw disabled_profiler_exception(); // to be caught by apex::start/resume
            return false;
        }
#endif
        // start the profiler object, which starts our timers
        //std::shared_ptr<profiler> p = std::make_shared<profiler>(id, is_resume);
        profiler * p = new profiler(id, is_resume);
        thread_instance::instance().set_current_profiler(p);
#if APEX_HAVE_PAPI
        if (num_papi_counters > 0 && !apex_options::papi_suspend()) {
            // if papi was previously suspended, we need to start the counters
            if (thread_papi_state == papi_suspended) {
                int rc = PAPI_start( EventSet );
                PAPI_ERROR_CHECK(PAPI_start);
                thread_papi_state = papi_running;
            }
            // snapshot the counters at timer start
            int rc = PAPI_read( EventSet, p->papi_start_values );
            PAPI_ERROR_CHECK(PAPI_read);
        } else {
            // if papi is still running, stop the counters
            if (thread_papi_state == papi_running) {
                long long dummy[8];
                int rc = PAPI_stop( EventSet, dummy );
                PAPI_ERROR_CHECK(PAPI_stop);
                thread_papi_state = papi_suspended;
            }
        }
#endif
    } else {
        return false;
    }
    return true;
}
| 1231 | ||
/* Queue a finished profiler object for the consumer, then signal it.
 * NOTE(review): the 'my_tid' parameter is not used in this body and shadows
 * the class member of the same name. */
inline void profiler_listener::push_profiler(int my_tid, std::shared_ptr<profiler> &p) {
    // if we aren't processing profiler objects, just return.
    if (!apex_options::process_async_state()) { return; }
#ifdef APEX_TRACE_APEX
    // Never enqueue the consumer's own trace timer -- it would enqueue
    // itself again on every processing pass.
    if (p->task_id->address == (uint64_t)&profiler_listener::process_profiles_wrapper) { return; }
#endif
    // we have to make a local copy, because lockfree queues DO NOT SUPPORT shared_ptrs!
#ifdef APEX_MULTIPLE_QUEUES
    thequeue->enqueue(p);
#else
    thequeue.enqueue(p);
#endif
    /*
    bool worked = thequeue.enqueue(p);
    if (!worked) {
        static std::atomic<bool> issued(false);
        if (!issued) {
            issued = true;
            cout << "APEX Warning : failed to push " << p->task_id->get_name() << endl;
            cout << "One or more frequently-called, lightweight functions is being timed." << endl;
        }
    }
    */
#ifndef APEX_HAVE_HPX
    // wake the dedicated consumer thread
    queue_signal.post();
#endif
#ifdef APEX_HAVE_HPX
    // schedule (at most one) HPX consumer task
    apex_schedule_process_profiles();
#endif
}
| 1262 | ||
| 1263 | /* Stop the timer, if applicable, and queue the profiler object */ | |
/* Stop the timer, if applicable, and queue the profiler object.
 * is_yield: when true the call count is not incremented (the task is
 * expected to be resumed later). */
inline void profiler_listener::_common_stop(std::shared_ptr<profiler> &p, bool is_yield) {
    if (!_done) {
        if (p) {
            p->stop(is_yield);
#if APEX_HAVE_PAPI
            // snapshot the counters at timer stop
            if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) {
                int rc = PAPI_read( EventSet, p->papi_stop_values );
                PAPI_ERROR_CHECK(PAPI_read);
            }
#endif
            // Why is this happening now? Why not at start? Why not at create?
            /*
            if (apex_options::use_taskgraph_output()) {
                if (!p->is_resume) {
                    // get the PARENT profiler
                    profiler * parent_profiler = nullptr;
                    try {
                        parent_profiler = thread_instance::instance().get_parent_profiler();
                        if (parent_profiler != NULL) {
                            task_identifier * parent = parent_profiler->task_id;
                            task_identifier * child = p->task_id;
                            dependency_queue.enqueue(new task_dependency(parent, child));
                        }
                    } catch (empty_stack_exception& e) { }
                }
            }
            */
            // hand the finished profiler to the consumer
            push_profiler(my_tid, p);
        }
    }
}
| 1295 | ||
| 1296 | /* Start the timer */ | |
| 1297 | bool profiler_listener::on_start(task_identifier * id) { | |
| 1298 | return _common_start(id, false); | |
| 1299 | } | |
| 1300 | ||
| 1301 | /* This is just like starting a timer, but don't increment the number of calls | |
| 1302 | * value. That is because we are restarting an existing timer. */ | |
| 1303 | bool profiler_listener::on_resume(task_identifier * id) { | |
| 1304 | return _common_start(id, true); | |
| 1305 | } | |
| 1306 | ||
| 1307 | /* Stop the timer */ | |
| 1308 | void profiler_listener::on_stop(std::shared_ptr<profiler> &p) { | |
| 1309 | _common_stop(p, p->is_resume); // don't change the yield/resume value! | |
| 1310 | } | |
| 1311 | ||
| 1312 | /* Stop the timer, but don't increment the number of calls */ | |
| 1313 | void profiler_listener::on_yield(std::shared_ptr<profiler> &p) { | |
| 1314 | _common_stop(p, true); | |
| 1315 | } | |
| 1316 | ||
| 1317 | /* When a thread exits, pop and stop all timers. */ | |
void profiler_listener::on_exit_thread(event_data &data) {
    // No per-thread teardown is performed here; the event is ignored.
    APEX_UNUSED(data);
}
| 1321 | ||
| 1322 | /* When an asynchronous thread is launched, they should | |
| 1323 | * call apex::async_thread_setup() which will end up here.*/ | |
void profiler_listener::async_thread_setup(void) {
#ifdef APEX_MULTIPLE_QUEUES
    // for asynchronous threads, check to make sure there is a queue!
    // Lazily create this thread's queue and register it in the global
    // list so the consumer can find it.
    if (thequeue == nullptr) {
        thequeue = new profiler_queue_t();
        {
            // protect the shared vector of queues
            std::unique_lock<std::mutex> queue_lock(queue_mtx);
            allqueues.push_back(thequeue);
        }
    }
#endif
}
| 1336 | ||
| 1337 | /* When a sample value is processed, save it as a profiler object, and queue it. */ | |
| 1338 | void profiler_listener::on_sample_value(sample_value_event_data &data) { | |
| 1339 | if (!_done) { | |
| 1340 | std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier(*data.counter_name), data.counter_value); | |
| 1341 | p->is_counter = data.is_counter; | |
| 1342 | push_profiler(my_tid, p); | |
| 1343 | } | |
| 1344 | } | |
| 1345 | ||
| 1346 | void profiler_listener::on_new_task(task_identifier * id, uint64_t task_id) { | |
| 1347 | //cout << "New task: " << task_id << endl; | |
| 1348 | if (!apex_options::use_taskgraph_output()) { return; } | |
| 1349 | // get the current profiler | |
| 1350 | profiler * p = thread_instance::instance().get_current_profiler(); | |
| 1351 | if (p != NULL) { | |
| 1352 | dependency_queue.enqueue(new task_dependency(p->task_id, id)); | |
| 1353 | } else { | |
| 1354 | task_identifier * parent = new task_identifier(string("__start")); | |
| 1355 | dependency_queue.enqueue(new task_dependency(parent, id)); | |
| 1356 | } | |
| 1357 | } | |
| 1358 | ||
| 1359 | /* Communication send event. Save the number of bytes. */ | |
| 1360 | void profiler_listener::on_send(message_event_data &data) { | |
| 1361 | if (!_done) { | |
| 1362 | std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier("Bytes Sent"), (double)data.size); | |
| 1363 | push_profiler(0, p); | |
| 1364 | } | |
| 1365 | } | |
| 1366 | ||
| 1367 | /* Communication recv event. Save the number of bytes. */ | |
| 1368 | void profiler_listener::on_recv(message_event_data &data) { | |
| 1369 | if (!_done) { | |
| 1370 | std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier("Bytes Received"), (double)data.size); | |
| 1371 | push_profiler(0, p); | |
| 1372 | } | |
| 1373 | } | |
| 1374 | ||
| 1375 | /* For periodic stuff. Do something? */ | |
| 1376 | void profiler_listener::on_periodic(periodic_event_data &data) { | |
| 1377 | if (!_done) { | |
| 1378 | } | |
| 1379 | APEX_UNUSED(data); | |
| 1380 | } | |
| 1381 | ||
| 1382 | /* For custom event stuff. Do something? */ | |
| 1383 | void profiler_listener::on_custom_event(custom_event_data &data) { | |
| 1384 | if (!_done) { | |
| 1385 | } | |
| 1386 | APEX_UNUSED(data); | |
| 1387 | } | |
| 1388 | ||
| 1389 | void profiler_listener::reset(task_identifier * id) { | |
| 1390 | std::shared_ptr<profiler> p; | |
| 1391 | p = std::make_shared<profiler>(id, false, reset_type::CURRENT); | |
| 1392 | push_profiler(my_tid, p); | |
| 1393 | } | |
| 1394 | ||
| 1395 | profiler_listener::~profiler_listener (void) { | |
| 1396 | _done = true; // yikes! | |
| 1397 | finalize(); | |
| 1398 | delete_profiles(); | |
| 1399 | #ifndef APEX_HAVE_HPX | |
| 1400 | delete consumer_thread; | |
| 1401 | #endif | |
| 1402 | }; | |
| 1403 | ||
| 1404 | } | |
| 1405 | ||
| 1406 | #ifdef APEX_HAVE_HPX | |
| 1407 | HPX_DECLARE_ACTION(::apex::profiler_listener::process_profiles_wrapper, apex_internal_process_profiles_action); | |
| 1408 | HPX_ACTION_HAS_CRITICAL_PRIORITY(apex_internal_process_profiles_action); | |
| 1409 | HPX_PLAIN_ACTION(::apex::profiler_listener::process_profiles_wrapper, apex_internal_process_profiles_action); | |
| 1410 | ||
/* Schedule (at most one) HPX task to drain the profiler queues. */
void apex_schedule_process_profiles() {
    // No HPX runtime yet -- nothing to schedule on.
    if(get_hpx_runtime_ptr() == nullptr) return;
    if(hpx_shutdown) {
        // The scheduler is shutting down; process the queues inline instead.
        ::apex::profiler_listener::process_profiles_wrapper();
    } else {
        // test_and_set returns the previous flag value, so a task is only
        // scheduled when none is already running; the flag is cleared at the
        // end of process_profiles().
        if(!consumer_task_running.test_and_set(memory_order_acq_rel)) {
            apex_internal_process_profiles_action act;
            try {
                hpx::apply(act, hpx::find_here());
            } catch(...) {
                // During shutdown, we can't schedule a new task,
                // so we process profiles ourselves.
                profiler_listener::process_profiles_wrapper();
            }
        }
    }
}
| 1428 | ||
| 1429 | #endif | |
| 1430 | ||
| 1431 | ||
| 1432 |
Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.