/home/users/khuck/src/hpx-lsu/apex/src/apex/profiler_listener.cpp

Line | % of fetches | Source
1  
//  Copyright (c) 2014 University of Oregon
2  
//
3  
4  
#ifdef APEX_HAVE_HPX
5  
#include <hpx/config.hpp>
6  
#ifdef APEX_HAVE_OTF2
7  
#define APEX_TRACE_APEX
8  
#endif // APEX_HAVE_OTF2
9  
#endif // APEX_HAVE_HPX
10  
11  
#include "profiler_listener.hpp"
12  
#include "profiler.hpp"
13  
#include "thread_instance.hpp"
14  
#include <iostream>
15  
#include <iomanip>
16  
#include <fstream>
17  
#include <math.h>
18  
#include "apex_options.hpp"
19  
#include "profiler.hpp"
20  
#include "profile.hpp"
21  
#include "apex.hpp"
22  
23  
#include <atomic>
24  
#if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__)))
25  
#include <unistd.h>
26  
#include <sched.h>
27  
#endif
28  
#include <cstdio>
29  
#include <vector>
30  
#include <string>
31  
#include <unordered_set>
32  
#include <algorithm>
33  
#include <iterator>
34  
35  
#include <functional>
36  
#include <thread>
37  
#include <future>
38  
39  
#if defined(APEX_THROTTLE)
40  
#include "apex_cxx_shared_lock.hpp"
41  
apex::shared_mutex_type throttled_event_set_mutex;
42  
#define APEX_THROTTLE_CALLS 1000
43  
#ifdef APEX_USE_CLOCK_TIMESTAMP
44  
#define APEX_THROTTLE_PERCALL 0.00001 // 10 microseconds.
45  
#else
46  
#define APEX_THROTTLE_PERCALL 50000 // 50k cycles.
47  
#endif
48  
#endif
49  
50  
#if APEX_HAVE_PAPI
51  
#include "papi.h"
52  
#include <mutex>
53  
std::mutex event_set_mutex;
54  
#endif
55  
56  
#ifdef APEX_HAVE_HPX
57  
#include <boost/assign.hpp>
58  
#include <boost/cstdint.hpp>
59  
#include <hpx/include/performance_counters.hpp>
60  
#include <hpx/include/actions.hpp>
61  
#include <hpx/include/util.hpp>
62  
#include <hpx/lcos/local/composable_guard.hpp>
63  
static void apex_schedule_process_profiles(void); // not in apex namespace
64  
const int num_non_worker_threads_registered = 0;
65  
#endif
66  
67  
#define APEX_MAIN "APEX MAIN"
68  
69  
#ifdef APEX_HAVE_TAU
70  
#define PROFILING_ON
71  
#define TAU_DOT_H_LESS_HEADERS
72  
#include <TAU.h>
73  
#endif
74  
75  
#include "utils.hpp"
76  
77  
#include <cstdlib>
78  
#include <ctime>
79  
80  
using namespace std;
81  
using namespace apex;
82  
83  
APEX_NATIVE_TLS unsigned int my_tid = 0; // the current thread's TID in APEX
84  
85  
namespace apex {
86  
87  
#ifdef APEX_MULTIPLE_QUEUES
88  
  /* this is a thread-local pointer to a concurrent queue for each worker thread. */
89  
  __thread profiler_queue_t * thequeue;
90  
#endif
91  
92  
  /* THis is a special profiler, indicating that the timer requested is
93  
     throttled, and shouldn't be processed. */
94  
  profiler* profiler::disabled_profiler = new profiler();
95  
96  
#ifdef APEX_HAVE_HPX
97  
  /* Flag indicating whether a consumer task is currently running */
98  
  std::atomic_flag consumer_task_running = ATOMIC_FLAG_INIT;
99  
  bool hpx_shutdown = false;
100  
#endif
101  
102  
  double profiler_listener::get_non_idle_time() {
103  
    double non_idle_time = 0.0;
104  
    /* Iterate over all timers and accumulate the time spent in them */
105  
    unordered_map<task_identifier, profile*>::const_iterator it2;
106  
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
107  
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
108  
      profile * p = it2->second;
109  
#if defined(APEX_THROTTLE)
110  
      task_identifier id = it2->first;
111  
      unordered_set<task_identifier>::const_iterator it4;
112  
	  {
113  
      	read_lock_type l(throttled_event_set_mutex);
114  
      	it4 = throttled_tasks.find(id);
115  
	  }
116  
      if (it4!= throttled_tasks.end()) {
117  
        continue;
118  
      }
119  
#endif
120  
      if (p->get_type() == APEX_TIMER) {
121  
        non_idle_time += p->get_accumulated();
122  
      }
123  
    }
124  
    return non_idle_time*profiler::get_cpu_mhz();
125  
  }
126  
127  
  /* Idle time = (wall clock since the APEX main timer started, multiplied by
   * the number of usable threads) minus the accumulated busy time, clamped
   * at zero.  Caller owns the returned profile object. */
  profile * profiler_listener::get_idle_time() {
    double busy = get_non_idle_time();
    int num_worker_threads = thread_instance::get_num_threads();
#ifdef APEX_HAVE_HPX
    // don't count HPX helper threads as workers
    num_worker_threads = num_worker_threads - num_non_worker_threads_registered;
#endif
    std::chrono::duration<double> span =
        std::chrono::duration_cast<std::chrono::duration<double>>
            (MYCLOCK::now() - main_timer->start);
    double available = span.count() *
        fmin(hardware_concurrency(), num_worker_threads);
    double idle = available - busy;
    if (idle < 0.0) {
      idle = 0.0;
    }
    return new profile(idle * profiler::get_cpu_mhz(), 0, NULL, false);
  }
144  
145  
  /* Idle rate = idle time as a fraction of the total available CPU time
   * (wall clock times usable threads).  Caller owns the returned profile. */
  profile * profiler_listener::get_idle_rate() {
    double busy = get_non_idle_time();
    int num_worker_threads = thread_instance::get_num_threads();
#ifdef APEX_HAVE_HPX
    // don't count HPX helper threads as workers
    num_worker_threads = num_worker_threads - num_non_worker_threads_registered;
#endif
    std::chrono::duration<double> span =
        std::chrono::duration_cast<std::chrono::duration<double>>
            (MYCLOCK::now() - main_timer->start);
    double available = span.count() *
        fmin(hardware_concurrency(), num_worker_threads);
    double idle = available - busy;
    double rate = (idle > 0.0) ? (idle / available) : 0.0;
    return new profile(rate, 0, NULL, false);
  }
162  
163  
  /* Return the requested profile object to the user.
164  
   * Return nullptr if doesn't exist. */
165  
  profile * profiler_listener::get_profile(task_identifier &id) {
166  
    if (id.name == string(APEX_IDLE_RATE)) {
167  
        return get_idle_rate();
168  
    } else if (id.name == string(APEX_IDLE_TIME)) {
169  
        return get_idle_time();
170  
    } else if (id.name == string(APEX_NON_IDLE_TIME)) {
171  
        profile * theprofile = new profile(get_non_idle_time(), 0, NULL, false);
172  
        return theprofile;
173  
    }
174  
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
175  
    unordered_map<task_identifier, profile*>::const_iterator it = task_map.find(id);
176  
    if (it != task_map.end()) {
177  
      return (*it).second;
178  
    }
179  
    return nullptr;
180  
  }
181  
182  
  void profiler_listener::reset_all(void) {
183  
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
184  
    for(auto &it : task_map) {
185  
        it.second->reset();
186  
    }
187  
  }
188  
189  
  /* After the consumer thread pulls a profiler off of the queue,
   * process it by updating its profile object in the map of profiles. */
  // TODO The name-based timer and address-based timer paths through
  // the code involve a lot of duplication -- this should be refactored
  // to remove the duplication so it's easier to maintain.
  //
  // p:   the completed profiler pulled off a queue (ownership shared with
  //      the HPX counter lambda registered below, which captures p by value).
  // tid: originating thread id -- NOTE(review): not used in this body.
  // Returns 1 when a profile was updated or created, 0 for null or reset-all.
  unsigned int profiler_listener::process_profile(std::shared_ptr<profiler> &p, unsigned int tid)
  {
    if(p == nullptr) return 0;
    profile * theprofile;
    // A "reset all" request travels through the queue as a special profiler.
    if(p->is_reset == reset_type::ALL) {
        reset_all();
        return 0;
    }
    double values[8] = {0};
    double tmp_num_counters = 0;
#if APEX_HAVE_PAPI
    tmp_num_counters = num_papi_counters;
    // Per-counter deltas; non-positive deltas (wrap/error) are clamped to 0.
    for (int i = 0 ; i < num_papi_counters ; i++) {
        if (p->papi_stop_values[i] > p->papi_start_values[i]) {
            values[i] = p->papi_stop_values[i] - p->papi_start_values[i];
        } else {
            values[i] = 0.0;
        }
    }
#endif
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex, std::defer_lock);
    // There is only one consumer thread except during shutdown, so we only need
    // to lock during shutdown.
    bool did_lock = false;
    if(_done) {
        task_map_lock.lock();
        did_lock = true;
    }
    unordered_map<task_identifier, profile*>::const_iterator it = task_map.find(*(p->task_id));
    if (it != task_map.end()) {
        // A profile for this ID already exists.
        theprofile = (*it).second;
        if(_done && did_lock) {
            task_map_lock.unlock();
        }
        if(p->is_reset == reset_type::CURRENT) {
            theprofile->reset();
        } else {
            theprofile->increment(p->elapsed(), tmp_num_counters, values, p->is_resume);
        }
#if defined(APEX_THROTTLE)
        // Is this a lightweight task? If so, we shouldn't measure it any more,
        // in order to reduce overhead.
        if (theprofile->get_calls() > APEX_THROTTLE_CALLS &&
            theprofile->get_mean() < APEX_THROTTLE_PERCALL) {
            unordered_set<task_identifier>::const_iterator it2;
            {
                // check under a read lock first (the cheap, common case)
                read_lock_type l(throttled_event_set_mutex);
                it2 = throttled_tasks.find(*(p->task_id));
            }
            if (it2 == throttled_tasks.end()) {
                // lock the set for insert
                {
                    write_lock_type l(throttled_event_set_mutex);
                    // was it inserted when we were waiting?
                    it2 = throttled_tasks.find(*(p->task_id));
                    // no? OK - insert it.
                    if (it2 == throttled_tasks.end()) {
                        throttled_tasks.insert(*(p->task_id));
                    }
                }
                if (apex_options::use_screen_output()) {
                    cout << "APEX: disabling lightweight timer "
                         << p->task_id->get_name()
                          << endl;
                    fflush(stdout);
                }
            }
        }
#endif
      } else {
        // Create a new profile for this name.
        theprofile = new profile(p->is_reset == reset_type::CURRENT ? 0.0 : p->elapsed(), tmp_num_counters, values, p->is_resume, p->is_counter ? APEX_COUNTER : APEX_TIMER);
        task_map[*(p->task_id)] = theprofile;
        if(_done && did_lock) {
            task_map_lock.unlock();
        }
#ifdef APEX_HAVE_HPX
#ifdef APEX_REGISTER_HPX3_COUNTERS
        // First time we see a named timer: expose it as an HPX performance
        // counter (the lambda keeps p alive via shared_ptr capture).
        if(!_done) {
            if(get_hpx_runtime_ptr() != nullptr && p->task_id->has_name()) {
                std::string timer_name(p->task_id->get_name());
                //Don't register timers containing "/"
                if(timer_name.find("/") == std::string::npos) {
                    hpx::performance_counters::install_counter_type(
                    std::string("/apex/") + timer_name,
                    [p](bool r)->boost::int64_t{
                        boost::int64_t value(p->elapsed());
                        return value;
                    },
                    std::string("APEX counter ") + timer_name,
                    ""
                    );
                }
            } else {
                std::cerr << "HPX runtime not initialized yet." << std::endl;
            }
        }
#endif
#endif
      }
#if !defined(_MSC_VER)
      /* write the sample to the file */
      if (apex_options::task_scatterplot()) {
        if (!p->is_counter) {
            // sample roughly 1% of timer events to bound scatterplot volume
            static int thresh = RAND_MAX/100;
            if (std::rand() < thresh) {
                std::unique_lock<std::mutex> task_map_lock(_mtx);
                task_scatterplot_samples << p->normalized_timestamp() << " "
                            << p->elapsed()*profiler::get_cpu_mhz()*1000000 << " "
                            << "'" << p->task_id->get_name() << "'" << endl;
                int loc0 = task_scatterplot_samples.tellp();
                // flush to disk once the buffered samples exceed 32KB
                if (loc0 > 32768) {
                    // lock access to the file
                    // write using low-level file locking!
                    // (advisory whole-file lock so multiple processes can
                    // share the sample file)
                    struct flock fl;
                    fl.l_type   = F_WRLCK;  /* F_RDLCK, F_WRLCK, F_UNLCK    */
                    fl.l_whence = SEEK_SET; /* SEEK_SET, SEEK_CUR, SEEK_END */
                    fl.l_start  = 0;        /* Offset from l_whence         */
                    fl.l_len    = 0;        /* length, 0 = to EOF           */
                    fl.l_pid    = getpid();      /* our PID                      */
                    fcntl(task_scatterplot_sample_file, F_SETLKW, &fl);  /* F_GETLK, F_SETLK, F_SETLKW */
                    // flush the string stream to the file
                    //lseek(task_scatterplot_sample_file, 0, SEEK_END);
                    ssize_t bytes_written = write(task_scatterplot_sample_file,
						  task_scatterplot_samples.str().c_str(), loc0);
                    if (bytes_written < 0) {
                        int errsv = errno;
                        perror("Error writing to scatterplot!");
                        fprintf(stderr, "Error writing scatterplot:\n%s\n",
                                strerror(errsv));
                    }
                    fl.l_type   = F_UNLCK;   /* tell it to unlock the region */
                    fcntl(task_scatterplot_sample_file, F_SETLK, &fl); /* set the region to unlocked */
                    // reset the stringstream
                    task_scatterplot_samples.str("");
                }
            }
        }
      }
#endif
    return 1;
  }
337  
338  
  /* Record one parent->child task dependency edge, incrementing the count
   * for repeat occurrences.  Takes ownership of (and deletes) td. */
  inline unsigned int profiler_listener::process_dependency(task_dependency* td)
  {
      auto parent_it = task_dependencies.find(td->parent);
      // if this is a new dependency for this parent?
      if (parent_it == task_dependencies.end()) {
          auto * children = new unordered_map<task_identifier, int>();
          (*children)[td->child] = 1;
          task_dependencies[td->parent] = children;
      // otherwise, see if this parent has seen this child
      } else {
          unordered_map<task_identifier, int> & children = *(parent_it->second);
          auto child_it = children.find(td->child);
          if (child_it == children.end()) {
              // first time for this child
              children[td->child] = 1;
          } else {
              // not the first time for this child
              children[td->child] = child_it->second + 1;
          }
      }
      delete td;
      return 1;
  }
363  
364  
  /* Cleaning up memory. Not really necessary, because it only gets
365  
   * called at shutdown. But a good idea to do regardless. */
366  
  void profiler_listener::delete_profiles(void) {
367  
    // iterate over the map and free the objects in the map
368  
    unordered_map<task_identifier, profile*>::const_iterator it;
369  
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
370  
    for(it = task_map.begin(); it != task_map.end(); it++) {
371  
      delete it->second;
372  
    }
373  
    // clear the map.
374  
    task_map.clear();
375  
376  
  }
377  
378  
#define PAD_WITH_SPACES "%8s"
379  
#define FORMAT_PERCENT "%8.3f"
380  
#define FORMAT_SCIENTIFIC "%1.2e"
381  
382  
  /* printf-style formatting into a std::string.
   * format: a printf format string; args: the matching arguments.
   * Returns the formatted text without the trailing '\0'.
   * Returns an empty string if snprintf reports an encoding error. */
  template<typename ... Args>
  std::string string_format( const std::string& format, Args ... args )
  {
      // First pass: measure.  snprintf returns the number of characters the
      // full result needs, or a NEGATIVE value on error.  The original code
      // ignored the error case, which made size wrap to 0 and then built
      // string(buf, buf - 1) -- undefined behavior.
      int needed = std::snprintf( nullptr, 0, format.c_str(), args ... );
      if (needed < 0) {
          return std::string();
      }
      size_t size = (size_t)needed + 1; // Extra space for '\0'
      std::unique_ptr<char[]> buf( new char[ size ] );
      // Second pass: actually render into the buffer.
      std::snprintf( buf.get(), size, format.c_str(), args ... );
      return std::string( buf.get(), buf.get() + size - 1 ); // We don't want the '\0' inside
  }
390  
391  
  /* Format one profile (timer or counter) as a row of the screen report
   * and, for timers, a row of the CSV report.
   * task_id:           the timer/counter being written.
   * p:                 its accumulated statistics (may be mutated: calls
   *                    is forced to at least 1, see below).
   * screen_output:     human-readable report stream (appended to).
   * csv_output:        CSV report stream (appended to; timers only).
   * total_accumulated: running sum of timer time, incremented here.
   * total_main:        total available CPU time, for the "% total" column. */
  void profiler_listener::write_one_timer(task_identifier &task_id,
          profile * p, stringstream &screen_output,
          stringstream &csv_output, double &total_accumulated,
          double &total_main) {
      string action_name = task_id.get_name();
      string shorter(action_name);
      // to keep formatting pretty, trim any long timer names
      if (shorter.size() > 30) {
        shorter.resize(27);
        shorter.resize(30, '.');   // pad the truncated name back out with "..."
      }
      //screen_output << "\"" << shorter << "\", " ;
      screen_output << string_format("%30s", shorter.c_str()) << " : ";
#if defined(APEX_THROTTLE)
      // if this profile was throttled, don't output the measurements.
      // they are limited and bogus, anyway.
      unordered_set<task_identifier>::const_iterator it4;
	  {
      	read_lock_type l(throttled_event_set_mutex);
        it4 = throttled_tasks.find(task_id);
  	  }
      if (it4!= throttled_tasks.end()) {
        screen_output << "DISABLED (high frequency, short duration)" << endl;
        return;
      }
#endif
      // force at least one call -- presumably to keep the mean well-defined;
      // NOTE(review): this mutates the profile being reported. Confirm intent.
      if(p->get_calls() < 1) {
        p->get_profile()->calls = 1;
      }
      // call count: plain integer when small, scientific notation when huge
      if (p->get_calls() < 999999) {
          screen_output << string_format(PAD_WITH_SPACES, to_string((int)p->get_calls()).c_str()) << "   " ;
      } else {
          screen_output << string_format(FORMAT_SCIENTIFIC, p->get_calls()) << "   " ;
      }
      if (p->get_type() == APEX_TIMER) {
        // Timers: CSV gets calls/cycles/microseconds; on screen only mean,
        // total and %-of-total are meaningful (min/max/stddev print n/a).
        csv_output << "\"" << action_name << "\",";
        csv_output << llround(p->get_calls()) << ",";
        // raw accumulated value ("total cycles" per the CSV header)
        csv_output << std::llround(p->get_accumulated()) << ",";
        // convert to microseconds using the CPU rate
        csv_output << std::llround(p->get_accumulated()*profiler::get_cpu_mhz()*1000000);
        screen_output << " --n/a--   " ;
        screen_output << string_format(FORMAT_SCIENTIFIC, (p->get_mean()*profiler::get_cpu_mhz())) << "   " ;
        screen_output << " --n/a--   " ;
        screen_output << string_format(FORMAT_SCIENTIFIC, (p->get_accumulated()*profiler::get_cpu_mhz())) << "   " ;
        screen_output << " --n/a--   " ;
        screen_output << string_format(FORMAT_PERCENT, (((p->get_accumulated()*profiler::get_cpu_mhz())/total_main)*100));
#if APEX_HAVE_PAPI
        // one extra column per configured PAPI metric
        for (int i = 0 ; i < num_papi_counters ; i++) {
            screen_output  << "   " << string_format(FORMAT_SCIENTIFIC, (p->get_papi_metrics()[i]));
            csv_output << "," << std::llround(p->get_papi_metrics()[i]);
        }
#endif
        screen_output << endl;
        total_accumulated += p->get_accumulated();
        csv_output << endl;
      } else {
        // Counters: min/mean/max/total/stddev. Names containing '%' are
        // formatted as percentages, everything else scientific.
        if (action_name.find('%') == string::npos) {
          screen_output << string_format(FORMAT_SCIENTIFIC, p->get_minimum()) << "   " ;
          screen_output << string_format(FORMAT_SCIENTIFIC, p->get_mean()) << "   " ;
          screen_output << string_format(FORMAT_SCIENTIFIC, p->get_maximum()) << "   " ;
          screen_output << string_format(FORMAT_SCIENTIFIC, p->get_accumulated()) << "   " ;
          screen_output << string_format(FORMAT_SCIENTIFIC, p->get_stddev()) << "   " ;
        } else {
          screen_output << string_format(FORMAT_PERCENT, p->get_minimum()) << "   " ;
          screen_output << string_format(FORMAT_PERCENT, p->get_mean()) << "   " ;
          screen_output << string_format(FORMAT_PERCENT, p->get_maximum()) << "   " ;
          screen_output << string_format(FORMAT_PERCENT, p->get_accumulated()) << "   " ;
          screen_output << string_format(FORMAT_PERCENT, p->get_stddev()) << "   " ;
        }
        screen_output << " --n/a-- "  << endl;
      }
  }
464  
465  
  /* At program termination, write the measurements to the screen, or to CSV file, or both. */
  void profiler_listener::finalize_profiles(void) {
    // our TOTAL available time is the elapsed * the number of threads, or cores
	int num_worker_threads = thread_instance::get_num_threads();
    double wall_clock_main = main_timer->elapsed() * profiler::get_cpu_mhz();
#ifdef APEX_HAVE_HPX
    // don't count HPX helper threads as workers
    num_worker_threads = num_worker_threads - num_non_worker_threads_registered;
#endif
    double total_main = wall_clock_main *
        fmin(hardware_concurrency(), num_worker_threads);
    // create a stringstream to hold all the screen output - we may not
    // want to write it out
    stringstream screen_output;
    // create a stringstream to hold all the CSV output - we may not
    // want to write it out
    stringstream csv_output;
    // report header: run-wide summary values first
    screen_output << "Elapsed time: " << wall_clock_main << endl;
    screen_output << "Cores detected: " << hardware_concurrency() << endl;
    screen_output << "Worker Threads observed: " << num_worker_threads << endl;
    screen_output << "Available CPU time: " << total_main << endl;
    map<apex_function_address, profile*>::const_iterator it;
    screen_output << "Action                         :  #calls  |  minimum |    mean  |  maximum |   total  |  stddev  |  % total  " << apex_options::papi_metrics() << endl;
    screen_output << "------------------------------------------------------------------------------------------------------------" << endl;
    csv_output << "\"task\",\"num calls\",\"total cycles\",\"total microseconds\"";
#if APEX_HAVE_PAPI
    // one CSV column per configured PAPI metric
    for (int i = 0 ; i < num_papi_counters ; i++) {
       csv_output << ",\"" << metric_names[i] << "\"";
    }
#endif
    csv_output << endl;
    double total_accumulated = 0.0;
    unordered_map<task_identifier, profile*>::const_iterator it2;
    std::vector<task_identifier> id_vector;
    // Pass 1: collect the counters (non-timers) and sort their names,
    // so the report is in a stable, readable order.
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
        task_identifier task_id = it2->first;
        profile * p = it2->second;
        if (p->get_type() != APEX_TIMER) {
            id_vector.push_back(task_id);
        }
    }
    std::sort(id_vector.begin(), id_vector.end());
    // write the counters, sorted
    for(task_identifier task_id : id_vector) {
        profile * p = task_map[task_id];
        if (p) {
            write_one_timer(task_id, p, screen_output, csv_output, total_accumulated, total_main);
        }
    }
    id_vector.clear();
    // Pass 2: collect the timers and sort their names
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
        profile * p = it2->second;
        task_identifier task_id = it2->first;
        if (p->get_type() == APEX_TIMER) {
            id_vector.push_back(task_id);
        }
    }
    std::sort(id_vector.begin(), id_vector.end());
    // write the timers, sorted (total_accumulated is summed inside)
    for(task_identifier task_id : id_vector) {
        profile * p = task_map[task_id];
        if (p) {
            write_one_timer(task_id, p, screen_output, csv_output, total_accumulated, total_main);
        }
    }
    // NOTE(review): despite the name, this is idle *time* (available minus
    // accumulated); the percentage form is printed a few lines below.
    double idle_rate = total_main - (total_accumulated*profiler::get_cpu_mhz());
    // final synthetic row: APEX IDLE TIME (only total and % are meaningful)
    screen_output << string_format("%30s", APEX_IDLE_TIME) << " : ";
    screen_output << " --n/a--   " ;
    screen_output << " --n/a--   " ;
    screen_output << " --n/a--   " ;
    screen_output << " --n/a--   " ;
    if (idle_rate < 0.0) {
      // negative idle means measurement overlap; suppress the bogus value
      screen_output << " --n/a--   " ;
    } else {
      screen_output << string_format(FORMAT_SCIENTIFIC, idle_rate) << "   " ;
    }
    screen_output << " --n/a--   " ;
    if (idle_rate < 0.0) {
      screen_output << " --n/a--   " << endl;
    } else {
      screen_output << string_format(FORMAT_PERCENT, ((idle_rate/total_main)*100)) << endl;
    }
    screen_output << "------------------------------------------------------------------------------------------------------------" << endl;
    if (apex_options::use_screen_output()) {
        cout << screen_output.str();
    }
    if (apex_options::use_csv_output()) {
        // one CSV file per node: apex.<node_id>.csv
        ofstream csvfile;
        stringstream csvname;
        csvname << "apex." << node_id << ".csv";
        csvfile.open(csvname.str(), ios::out);
        csvfile << csv_output.str();
        csvfile.close();
    }
  }
564  
565  
/* The following code is from:
   http://stackoverflow.com/questions/7706339/grayscale-to-red-green-blue-matlab-jet-color-scale */
/* An RGB triple with channels in [0,1]; defaults to white.  Used to
 * colorize nodes in the task graph output. */
class node_color {
public:
    double red   = 1.0;
    double green = 1.0;
    double blue  = 1.0;
    node_color() = default;
    // Scale a [0,1] channel value to an integer 0-255 channel value.
    int convert(double in) { return static_cast<int>(in * 255.0); }
};
575  
576  
/* Map a value v in [vmin, vmax] onto a MATLAB-"jet"-style color ramp
 * (blue -> cyan -> yellow -> red).  Values outside the range are clamped.
 * Returns a heap-allocated node_color; the caller owns (and must delete) it. */
node_color * get_node_color(double v, double vmin, double vmax)
{
   node_color * c = new node_color();

   // clamp v into [vmin, vmax]
   if (v < vmin) { v = vmin; }
   if (v > vmax) { v = vmax; }
   const double dv = vmax - vmin;

   // pick the quartile of the range and blend the channels accordingly
   if (v < (vmin + 0.25 * dv)) {
      c->red   = 0;
      c->green = 4 * (v - vmin) / dv;
   } else if (v < (vmin + 0.5 * dv)) {
      c->red   = 0;
      c->blue  = 1 + 4 * (vmin + 0.25 * dv - v) / dv;
   } else if (v < (vmin + 0.75 * dv)) {
      c->red   = 4 * (v - vmin - 0.5 * dv) / dv;
      c->blue  = 0;
   } else {
      c->green = 1 + 4 * (vmin + 0.75 * dv - v) / dv;
      c->blue  = 0;
   }

   return c;
}
603  
604  
  void profiler_listener::write_taskgraph(void) {
605  
    ofstream myfile;
606  
    stringstream dotname;
607  
    dotname << "taskgraph." << node_id << ".dot";
608  
    myfile.open(dotname.str().c_str());
609  
610  
    myfile << "digraph prof {\n rankdir=\"LR\";\n node [shape=box];\n";
611  
    for(auto dep = task_dependencies.begin(); dep != task_dependencies.end(); dep++) {
612  
        task_identifier parent = dep->first;
613  
        auto children = dep->second;
614  
        string parent_name = parent.get_name();
615  
        for(auto offspring = children->begin(); offspring != children->end(); offspring++) {
616  
            task_identifier child = offspring->first;
617  
            int count = offspring->second;
618  
            string child_name = child.get_name();
619  
            myfile << "  \"" << parent_name << "\" -> \"" << child_name << "\"";
620  
            myfile << " [ label=\"  count: " << count << "\" ]; " << std::endl;
621  
622  
        }
623  
    }
624  
625  
    // our TOTAL available time is the elapsed * the number of threads, or cores
626  
	int num_worker_threads = thread_instance::get_num_threads();
627  
#ifdef APEX_HAVE_HPX
628  
    num_worker_threads = num_worker_threads - num_non_worker_threads_registered;
629  
#endif
630  
    double total_main = main_timer->elapsed() *
631  
        fmin(hardware_concurrency(), num_worker_threads);
632  
633  
    // output nodes with  "main" [shape=box; style=filled; fillcolor="#ff0000" ];
634  
    unordered_map<task_identifier, profile*>::const_iterator it;
635  
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
636  
    for(it = task_map.begin(); it != task_map.end(); it++) {
637  
      profile * p = it->second;
638  
      if (p->get_type() == APEX_TIMER) {
639  
        node_color * c = get_node_color((p->get_accumulated()*profiler::get_cpu_mhz()), 0.0, total_main);
640  
        task_identifier task_id = it->first;
641  
        myfile << "  \"" << task_id.get_name() << "\" [shape=box; style=filled; fillcolor=\"#" <<
642  
            setfill('0') << setw(2) << hex << c->convert(c->red) <<
643  
            setfill('0') << setw(2) << hex << c->convert(c->green) <<
644  
            setfill('0') << setw(2) << hex << c->convert(c->blue) << "\"" <<
645  
            "; label=\"" << task_id.get_name() << ":\\n" << (p->get_accumulated()*profiler::get_cpu_mhz()) << "s\" ];" << std::endl;
646  
      }
647  
    }
648  
    myfile << "}\n";
649  
    myfile.close();
650  
  }
651  
652  
  /* When writing a TAU profile, write out a timer line */
653  
  void format_line(ofstream &myfile, profile * p) {
654  
    myfile << p->get_calls() << " ";
655  
    myfile << 0 << " ";
656  
    myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " ";
657  
    myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " ";
658  
    myfile << 0 << " ";
659  
    myfile << "GROUP=\"TAU_USER\" ";
660  
    myfile << endl;
661  
  }
662  
663  
  /* When writing a TAU profile, write out the main timer line */
664  
  void format_line(ofstream &myfile, profile * p, double not_main) {
665  
    myfile << p->get_calls() << " ";
666  
    myfile << 0 << " ";
667  
    myfile << (max(((p->get_accumulated()*profiler::get_cpu_mhz()) - not_main),0.0)) << " ";
668  
    myfile << ((p->get_accumulated()*profiler::get_cpu_mhz())) << " ";
669  
    myfile << 0 << " ";
670  
    myfile << "GROUP=\"TAU_USER\" ";
671  
    myfile << endl;
672  
  }
673  
674  
  /* When writing a TAU profile, write out a counter line */
675  
  void format_counter_line(ofstream &myfile, profile * p) {
676  
    myfile << p->get_calls() << " ";       // numevents
677  
    myfile << p->get_maximum() << " ";     // max
678  
    myfile << p->get_minimum() << " ";     // min
679  
    myfile << p->get_mean() << " ";        // mean
680  
    myfile << p->get_sum_squares() << " ";
681  
    myfile << endl;
682  
  }
683  
684  
  /* Write TAU profiles from the collected data. */
  void profiler_listener::write_profile() {
    ofstream myfile;
    stringstream datname;
    // name format: profile.nodeid.contextid.threadid
    // We only write one profile per process
    datname << "profile." << node_id << ".0.0";

    // name format: profile.nodeid.contextid.threadid
    myfile.open(datname.str().c_str());
    int counter_events = 0;

    // Determine number of counter events, as these need to be
    // excluded from the number of normal timers
    unordered_map<task_identifier, profile*>::const_iterator it2;
    std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
      profile * p = it2->second;
      if(p->get_type() == APEX_COUNTER) {
        counter_events++;
      }
    }
    int function_count = task_map.size() - counter_events;

    // Print the normal timers to the profile file
    // 1504 templated_functions_MULTI_TIME
    myfile << function_count << " templated_functions_MULTI_TIME" << endl;
    // # Name Calls Subrs Excl Incl ProfileCalls #
    myfile << "# Name Calls Subrs Excl Incl ProfileCalls #" << endl;
    thread_instance ti = thread_instance::instance();

    // Iterate over the profiles which are associated to a function
    // by name. Only output the regular timers now. Counters are
    // in a separate section, below.
    // APEX_MAIN is held back so its exclusive time (its total minus the
    // time spent in all other timers) can be computed and written last.
    profile * mainp = nullptr;
    double not_main = 0.0;
    for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
      profile * p = it2->second;
	  task_identifier task_id = it2->first;
      if(p->get_type() == APEX_TIMER) {
        string action_name = task_id.get_name();
        if(strcmp(action_name.c_str(), APEX_MAIN) == 0) {
          mainp = p;
        } else {
          myfile << "\"" << action_name << "\" ";
          format_line (myfile, p);
          // accumulate non-main time for APEX_MAIN's exclusive computation
          not_main += (p->get_accumulated()*profiler::get_cpu_mhz());
        }
      }
    }
    if (mainp != nullptr) {
      myfile << "\"" << APEX_MAIN << "\" ";
      format_line (myfile, mainp, not_main);
    }

    // 0 aggregates
    myfile << "0 aggregates" << endl;

    // Now process the counters, if there are any.
    if(counter_events > 0) {
      myfile << counter_events << " userevents" << endl;
      myfile << "# eventname numevents max min mean sumsqr" << endl;
      for(it2 = task_map.begin(); it2 != task_map.end(); it2++) {
        profile * p = it2->second;
        if(p->get_type() == APEX_COUNTER) {
          task_identifier task_id = it2->first;
          myfile << "\"" << task_id.get_name() << "\" ";
          format_counter_line (myfile, p);
        }
      }
    }
    myfile.close();
  }
757  
758  
  /*
   * The main function for the consumer thread has to be static, but
   * the processing needs access to member variables, so get the
   * profiler_listener instance, and call it's proper function.
   *
   * This is a wrapper, so that we can launch the thread and set
   * affinity. However, process_profiles_wrapper is also used by the
   * last worker that calls apex_finalize(), so we don't want to change
   * that thread's affinity. So this wrapper is only for the consumer
   * thread.
   */
  void profiler_listener::consumer_process_profiles_wrapper(void) {
      // Optionally pin the consumer thread to a core before doing any work.
      if (apex_options::pin_apex_threads()) {
      	  set_thread_affinity();
	  }
      process_profiles_wrapper();
  }
775  
776  
  /*
777  
   * The main function for the consumer thread has to be static, but
778  
   * the processing needs access to member variables, so get the
779  
   * profiler_listener instance, and call it's proper function.
780  
   */
781  
  void profiler_listener::process_profiles_wrapper(void) {
782  
      apex * inst = apex::instance();
783  
      if (inst != nullptr) {
784  
          profiler_listener * pl = inst->the_profiler_listener;
785  
          if (pl != nullptr) {
786  
#ifdef APEX_TRACE_APEX
787  
      profiler * p = start((apex_function_address)&profiler_listener::process_profiles_wrapper);
788  
              pl->process_profiles();
789  
      stop(p);
790  
#else
791  
              pl->process_profiles();
792  
#endif
793  
          }
794  
      }
795  
  }
796  
797  
  bool profiler_listener::concurrent_cleanup(void){
798  
      std::shared_ptr<profiler> p;
799  
#ifdef APEX_MULTIPLE_QUEUES
800  
      std::unique_lock<std::mutex> queue_lock(queue_mtx);
801  
	  std::vector<profiler_queue_t*>::const_iterator a_queue;
802  
	  for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) {
803  
	    thequeue = *a_queue;
804  
      	while(thequeue->try_dequeue(p)) {
805  
        	process_profile(p,0);
806  
      	}
807  
	  }
808  
#else
809  
      while(thequeue.try_dequeue(p)) {
810  
       	process_profile(p,0);
811  
      }
812  
#endif
813  
      return true;
814  
  }
815  
816  
  /* This is the main function for the consumer thread.
   * It will wait at a semaphore for pending work. When there is
   * work on one or more queues, it will iterate over the queues
   * and process the pending profiler objects, updating the profiles
   * as it goes. */
  void profiler_listener::process_profiles(void)
  {
    // Lazy one-time initialization for this (consumer) thread.
    if (!_initialized) {
      initialize_worker_thread_for_TAU();
      _initialized = true;
    }
#ifdef APEX_HAVE_TAU
    if (apex_options::use_tau()) {
      TAU_START("profiler_listener::process_profiles");
    }
#endif

    std::shared_ptr<profiler> p;
    task_dependency* td;
    // Main loop. Stay in this loop unless "done".
    // Without HPX there is a dedicated consumer thread that blocks on the
    // semaphore below; with HPX this function runs as a one-shot task
    // scheduled by apex_schedule_process_profiles(), so the outer loop
    // and the wait are compiled out.
#ifndef APEX_HAVE_HPX
    while (!_done) {
      queue_signal.wait();
#endif
#ifdef APEX_HAVE_TAU
      /*
    if (apex_options::use_tau()) {
      TAU_START("profiler_listener::process_profiles: main loop");
    }
    */
#endif
#ifdef APEX_MULTIPLE_QUEUES
	  std::vector<profiler_queue_t*>::const_iterator a_queue;
      std::unique_lock<std::mutex> queue_lock(queue_mtx);
	  int i = 0;
	  // Drain each per-thread queue in turn.
	  for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) {
	    thequeue = *a_queue;
        while(!_done && thequeue->try_dequeue(p)) {
          process_profile(p, 0);
#ifdef APEX_HAVE_HPX // don't hang out in this task too long.
		  if (++i > 1000) break;
#endif
        }
	  }
#else
        while(!_done && thequeue.try_dequeue(p)) {
             process_profile(p, 0);
        }
#endif
      // Drain any pending task-dependency records for taskgraph output.
      if (apex_options::use_taskgraph_output()) {
        while(!_done && dependency_queue.try_dequeue(td)) {
          process_dependency(td);
        }
      }
      /*
       * I want to process the tasks concurrently, but this loop
       * is too much overhead. Maybe dequeue them in batches?
       */
      /*
      std::vector<std::future<void>> pending_futures;
      while(!_done && thequeue.try_dequeue(p)) {
          auto f = std::async(my_stupid_wrapper,p);
          // transfer the future's shared state to a longer-lived future
          pending_futures.push_back(std::move(f));
      }
      for (auto iter = pending_futures.begin() ; iter < pending_futures.end() ; iter++ ) {
          iter->get();
      }
      */

#ifdef APEX_HAVE_TAU
      /*
      if (apex_options::use_tau()) {
        TAU_STOP("profiler_listener::process_profiles: main loop");
      }
      */
#endif
#ifndef APEX_HAVE_HPX
    }

    // Shutdown was signalled: flush whatever dependencies remain queued.
    if (apex_options::use_taskgraph_output()) {
      // process the task dependencies
      while(dependency_queue.try_dequeue(td)) {
        process_dependency(td);
      }
    }

#endif // NOT DEFINED APEX_HAVE_HPX

#ifdef APEX_HAVE_HPX
    // Clear the in-flight flag so apex_schedule_process_profiles() may
    // schedule the next consumer task.
    consumer_task_running.clear(memory_order_release);
#endif

#ifdef APEX_HAVE_TAU
    if (apex_options::use_tau()) {
      TAU_STOP("profiler_listener::process_profiles");
    }
#endif
  }
915  
916  
#if APEX_HAVE_PAPI
/* Per-thread PAPI event set handle. */
APEX_NATIVE_TLS int EventSet = PAPI_NULL;
/* Per-thread counter state: counters are stopped while "suspended". */
enum papi_state { papi_running, papi_suspended };
APEX_NATIVE_TLS papi_state thread_papi_state = papi_suspended;
/* Report a nonzero PAPI return code. Relies on a local 'rc' variable
 * being in scope at the expansion site. The parameter is stringified
 * with '#': a plain "name" inside the string literal would never be
 * substituted, so the old form printed the literal text "name:" for
 * every failure instead of the failing call's name. */
#define PAPI_ERROR_CHECK(name) \
if (rc != 0) cout << #name << ": " << rc << ": " << PAPI_strerror(rc) << endl;

  /* Initialize the PAPI library (first call) or register an additional
   * thread with it, then build this thread's event set from the
   * APEX papi_metrics option and start the counters (unless suspended).
   * @param first_time true for process-wide initialization on the main
   *                   thread, false for per-thread registration. */
  void profiler_listener::initialize_PAPI(bool first_time) {
      int rc = 0;
      if (first_time) {
        PAPI_library_init( PAPI_VER_CURRENT );
        //rc = PAPI_multiplex_init(); // use more counters than allowed
        //PAPI_ERROR_CHECK(PAPI_multiplex_init);
        PAPI_thread_init( thread_instance::get_id );
        // default
        //rc = PAPI_set_domain(PAPI_DOM_ALL);
        //PAPI_ERROR_CHECK(PAPI_set_domain);
      } else {
        PAPI_register_thread();
      }
      rc = PAPI_create_eventset(&EventSet);
      PAPI_ERROR_CHECK(PAPI_create_eventset);
      // default
      //rc = PAPI_assign_eventset_component (EventSet, 0);
      //PAPI_ERROR_CHECK(PAPI_assign_eventset_component);
      // default
      //rc = PAPI_set_granularity(PAPI_GRN_THR);
      //PAPI_ERROR_CHECK(PAPI_set_granularity);
      // unnecessary complexity
      //rc = PAPI_set_multiplex(EventSet);
      //PAPI_ERROR_CHECK(PAPI_set_multiplex);
      // parse the requested set of papi counters
      if (strlen(apex_options::papi_metrics()) > 0) {
        std::stringstream tmpstr(apex_options::papi_metrics());
        // use stream iterators to split the option string into
        // whitespace-separated counter names
        std::istream_iterator<std::string> tmpstr_it(tmpstr);
        std::istream_iterator<std::string> tmpstr_end;
        std::vector<std::string> tmpstr_results(tmpstr_it, tmpstr_end);
        int code;
        // iterate over the counter names in the vector
        // (const reference: no per-iteration string copy; also no inner
        // 'int rc' shadowing the outer one anymore)
        for (const auto& counter_name : tmpstr_results) {
          rc = PAPI_event_name_to_code(const_cast<char*>(counter_name.c_str()), &code);
          if (PAPI_query_event (code) == PAPI_OK) {
            rc = PAPI_add_event(EventSet, code);
            PAPI_ERROR_CHECK(PAPI_add_event);
            if (rc != 0) { printf ("Event that failed: %s\n", counter_name.c_str()); }
            // only record the metric names once, on the main thread
            if (first_time) {
              metric_names.push_back(counter_name);
              num_papi_counters++;
            }
          }
        }
        if (!apex_options::papi_suspend()) {
            rc = PAPI_start( EventSet );
            PAPI_ERROR_CHECK(PAPI_start);
            thread_papi_state = papi_running;
        }
      }
  }

#endif
978  
979  
  /* When APEX gets a STARTUP event, do some initialization:
   * record this thread's id, launch the consumer thread (non-HPX),
   * set up PAPI, and start the whole-application timer. */
  void profiler_listener::on_startup(startup_event_data &data) {
    if (!_done) {
      my_tid = (unsigned int)thread_instance::get_id();
#ifndef APEX_HAVE_HPX
      // Start the consumer thread, to process profiler objects.
      // (Under HPX, processing is scheduled as HPX tasks instead.)
      consumer_thread = new std::thread(consumer_process_profiles_wrapper);
#endif

#if APEX_HAVE_PAPI
      // Process-wide PAPI setup; slot 0 holds the main thread's event set.
      initialize_PAPI(true);
      event_sets[0] = EventSet;
#endif

      /* This commented out code is to change the priority of the consumer thread.
       * IDEALLY, I would like to make this a low priority thread, but that is as
       * yet unsuccessful. */
#if 0
      int retcode;
      int policy;

      pthread_t threadID = (pthread_t) consumer_thread->native_handle();

      struct sched_param param;

      if ((retcode = pthread_getschedparam(threadID, &policy, &param)) != 0)
      {
        errno = retcode;
        perror("pthread_getschedparam");
        exit(EXIT_FAILURE);
      }
      std::cout << "INHERITED: ";
      std::cout << "policy=" << ((policy == SCHED_FIFO)  ? "SCHED_FIFO" :
          (policy == SCHED_RR)    ? "SCHED_RR" :
          (policy == SCHED_OTHER) ? "SCHED_OTHER" :
          "???")
        << ", priority=" << param.sched_priority << " of " << sched_get_priority_min(policy) << "," << sched_get_priority_max(policy)  << std::endl;
      //param.sched_priority = 10;
      if ((retcode = pthread_setschedparam(threadID, policy, &param)) != 0)
      {
        errno = retcode;
        perror("pthread_setschedparam");
        exit(EXIT_FAILURE);
      }
#endif

      // time the whole application.
      main_timer = std::make_shared<profiler>(new task_identifier(string(APEX_MAIN)));
#if APEX_HAVE_PAPI
      // Snapshot counters at application start; read again in on_shutdown().
      if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) {
        int rc = PAPI_read( EventSet, main_timer->papi_start_values );
        PAPI_ERROR_CHECK(PAPI_read);
      }
#endif
    }
    APEX_UNUSED(data);
  }
1036  
1037  
  /* On the shutdown event, notify the consumer thread that we are done
   * and set the "terminate" flag. Then: join the consumer (non-HPX),
   * stop the main timer, drain the queues, and emit whichever output
   * formats were requested. */
  void profiler_listener::on_shutdown(shutdown_event_data &data) {
    if (_done) { return; }
    // NOTE(review): this condition is always true after the early return
    // above; the nested check is redundant and could be removed.
    if (!_done) {
      _done = true;
      node_id = data.node_id;
      //sleep(1);
#ifndef APEX_HAVE_HPX
      // Wake the consumer so it notices _done, then wait for it to exit.
      queue_signal.post();
      if (consumer_thread != nullptr) {
          consumer_thread->join();
      }
#endif

      // stop the main timer, and process that profile?
      main_timer->stop();
#if APEX_HAVE_PAPI
      // Final counter snapshot for the whole-application timer.
      if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) {
        int rc = PAPI_read( EventSet, main_timer->papi_stop_values );
        PAPI_ERROR_CHECK(PAPI_read);
      }
#endif
      // if this profile is processed, it will get deleted. so don't process it!
      // It also clutters up the final profile, if generated.
      //process_profile(main_timer.get(), my_tid);

      // output to screen?
      if ((apex_options::use_screen_output() ||
           apex_options::use_csv_output()) && node_id == 0)
      {
#ifdef APEX_MULTIPLE_QUEUES
        // Count the profiler objects still sitting on the queues.
        size_t ignored = 0;
        std::unique_lock<std::mutex> queue_lock(queue_mtx);
	    std::vector<profiler_queue_t*>::const_iterator a_queue;
	    for (a_queue = allqueues.begin() ; a_queue != allqueues.end() ; ++a_queue) {
	      thequeue = *a_queue;
          ignored += thequeue->size_approx();
		}
#else
        size_t ignored = thequeue.size_approx();
#endif
        if (ignored > 0) {
          // NOTE(review): "on on" typo in this user-visible message.
          std::cerr << "Info: " << ignored << " items remaining on on the profiler_listener queue...";
        }
#ifndef APEX_HAVE_HPX
        // We might be done, but check to make sure the queue is empty:
        // launch one cleanup task per hardware thread and wait for all.
        std::vector<std::future<bool>> pending_futures;
        for (unsigned int i=0; i<hardware_concurrency(); ++i) {
#ifdef APEX_STATIC
            /* Static libC++ doesn't do async very well. In fact, it crashes. */
            auto f = std::async(&profiler_listener::concurrent_cleanup,this);
#else // APEX_STATIC
            auto f = std::async(std::launch::async,&profiler_listener::concurrent_cleanup,this);
#endif // APEX_STATIC
            // transfer the future's shared state to a longer-lived future
            pending_futures.push_back(std::move(f));
        }
        for (auto iter = pending_futures.begin() ; iter < pending_futures.end() ; iter++ ) {
            iter->get();
        }
#endif // APEX_HAVE_HPX
        if (ignored > 0) {
          std::cerr << "done." << std::endl;
        }
        if (apex_options::use_screen_output() || apex_options::use_csv_output()) {
            finalize_profiles();
        }
      }
      if (apex_options::use_taskgraph_output() && node_id == 0)
      {
        write_taskgraph();
      }

      // output to 1 TAU profile per process?
      if (apex_options::use_profile_output() && !apex_options::use_tau()) {
        write_profile();
      }
#if !defined(_MSC_VER)
      // Flush buffered scatterplot samples to the shared sample file,
      // serialized across processes with POSIX advisory record locking.
      if (apex_options::task_scatterplot()) {
          // get the length of the stream
          int loc0 = task_scatterplot_samples.tellp();
          // lock access to the file
          // write using low-level file locking!
          struct flock fl;
          fl.l_type   = F_WRLCK;  /* F_RDLCK, F_WRLCK, F_UNLCK    */
          fl.l_whence = SEEK_SET; /* SEEK_SET, SEEK_CUR, SEEK_END */
          fl.l_start  = 0;        /* Offset from l_whence         */
          fl.l_len    = 0;        /* length, 0 = to EOF           */
          fl.l_pid    = getpid();      /* our PID                      */
          // F_SETLKW blocks until the write lock is granted.
          fcntl(task_scatterplot_sample_file, F_SETLKW, &fl);  /* F_GETLK, F_SETLK, F_SETLKW */
          // flush the string stream to the file
          //lseek(task_scatterplot_sample_file, 0, SEEK_END);
          ssize_t bytes_written = write(task_scatterplot_sample_file,
                task_scatterplot_samples.str().c_str(), loc0);
          if (bytes_written < 0) {
              int errsv = errno;
              perror("Error writing to scatterplot!");
              fprintf(stderr, "Error writing scatterplot:\n%s\n",
                      strerror(errsv));
          }
          fl.l_type   = F_UNLCK;   /* tell it to unlock the region */
          fcntl(task_scatterplot_sample_file, F_SETLK, &fl); /* set the region to unlocked */
          close(task_scatterplot_sample_file);
      }
#endif

    }
    /* The cleanup is disabled for now. Why? Because we want to be able
     * to access the profiles at the end of the run, after APEX has
     * finalized. */
    // cleanup.
    // delete_profiles();
  }
1151  
1152  
  /* When a new node is created */
1153  
  void profiler_listener::on_new_node(node_event_data &data) {
1154  
    if (!_done) {
1155  
    }
1156  
    APEX_UNUSED(data);
1157  
  }
1158  
1159  
  /* When a new thread is registered, expand all of our storage as necessary
1160  
   * to handle the new thread */
1161  
  void profiler_listener::on_new_thread(new_thread_event_data &data) {
1162  
    if (!_done) {
1163  
      my_tid = (unsigned int)thread_instance::get_id();
1164  
	  async_thread_setup();
1165  
#if APEX_HAVE_PAPI
1166  
      initialize_PAPI(false);
1167  
      event_set_mutex.lock();
1168  
      if (my_tid >= event_sets.size()) {
1169  
        if (my_tid >= event_sets.size()) {
1170  
          event_sets.resize(my_tid + 1);
1171  
        }
1172  
      }
1173  
      event_sets[my_tid] = EventSet;
1174  
      event_set_mutex.unlock();
1175  
#endif
1176  
    }
1177  
    APEX_UNUSED(data);
1178  
  }
1179  
1180  
  extern "C" int main (int, char**);
1181  
1182  
  /* When a start event happens, create a profiler object. Unless this
   * named event is throttled, in which case do nothing, as quickly as possible.
   * Returns false when the timer was not started (throttled or finalized),
   * true otherwise. */
  inline bool profiler_listener::_common_start(task_identifier * id, bool is_resume) {
    if (!_done) {
#if defined(APEX_THROTTLE)
      // if this timer is throttled, return without doing anything
      unordered_set<task_identifier>::const_iterator it;
	  {
      	read_lock_type l(throttled_event_set_mutex);
        it = throttled_tasks.find(*id);
      }
      // NOTE(review): 'it' is compared against end() after the read lock
      // is released; assumes the set is not modified concurrently in a
      // way that invalidates the iterator — confirm.
      if (it != throttled_tasks.end()) {
          /*
           * The throw is removed, because it is a performance penalty on some systems
           * on_start now returns a boolean
           */
        //throw disabled_profiler_exception(); // to be caught by apex::start/resume
        return false;
      }
#endif
      // start the profiler object, which starts our timers
      //std::shared_ptr<profiler> p = std::make_shared<profiler>(id, is_resume);
      profiler * p = new profiler(id, is_resume);
      thread_instance::instance().set_current_profiler(p);
#if APEX_HAVE_PAPI
      if (num_papi_counters > 0 && !apex_options::papi_suspend()) {
          // if papi was previously suspended, we need to start the counters
          if (thread_papi_state == papi_suspended) {
            int rc = PAPI_start( EventSet );
            PAPI_ERROR_CHECK(PAPI_start);
            thread_papi_state = papi_running;
          }
          // record the starting counter values for this timer
          int rc = PAPI_read( EventSet, p->papi_start_values );
          PAPI_ERROR_CHECK(PAPI_read);
      } else {
          // if papi is still running, stop the counters
          if (thread_papi_state == papi_running) {
            long long dummy[8];
            int rc = PAPI_stop( EventSet, dummy );
            PAPI_ERROR_CHECK(PAPI_stop);
            thread_papi_state = papi_suspended;
          }
      }
#endif
    } else {
        return false;
    }
    return true;
  }
1231  
1232  
  /* Hand a finished profiler object to the consumer: enqueue it on the
   * (per-thread or shared) lockfree queue and wake/schedule the consumer.
   * NOTE(review): the 'my_tid' parameter is unused here and shadows the
   * member of the same name. */
  inline void profiler_listener::push_profiler(int my_tid, std::shared_ptr<profiler> &p) {
  	  // if we aren't processing profiler objects, just return.
  	  if (!apex_options::process_async_state()) { return; }
#ifdef APEX_TRACE_APEX
  	  // never queue the timer that times the consumer itself
  	  if (p->task_id->address == (uint64_t)&profiler_listener::process_profiles_wrapper) { return; }
#endif
      // we have to make a local copy, because lockfree queues DO NOT SUPPORT shared_ptrs!
#ifdef APEX_MULTIPLE_QUEUES
      thequeue->enqueue(p);
#else
      thequeue.enqueue(p);
#endif
	  /*
      bool worked = thequeue.enqueue(p);
      if (!worked) {
          static std::atomic<bool> issued(false);
          if (!issued) {
              issued = true;
              cout << "APEX Warning : failed to push " << p->task_id->get_name() << endl;
              cout << "One or more frequently-called, lightweight functions is being timed." << endl;
          }
      }
	  */
#ifndef APEX_HAVE_HPX
      // wake the blocked consumer thread
      queue_signal.post();
#endif
#ifdef APEX_HAVE_HPX
      // schedule a consumer task on the HPX runtime
      apex_schedule_process_profiles();
#endif
  }
1262  
1263  
  /* Stop the timer, if applicable, and queue the profiler object */
1264  
  inline void profiler_listener::_common_stop(std::shared_ptr<profiler> &p, bool is_yield) {
1265  
    if (!_done) {
1266  
      if (p) {
1267  
        p->stop(is_yield);
1268  
#if APEX_HAVE_PAPI
1269  
        if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) {
1270  
            int rc = PAPI_read( EventSet, p->papi_stop_values );
1271  
            PAPI_ERROR_CHECK(PAPI_read);
1272  
        }
1273  
#endif
1274  
        // Why is this happening now?  Why not at start? Why not at create?
1275  
        /*
1276  
        if (apex_options::use_taskgraph_output()) {
1277  
          if (!p->is_resume) {
1278  
            // get the PARENT profiler
1279  
            profiler * parent_profiler = nullptr;
1280  
            try {
1281  
              parent_profiler = thread_instance::instance().get_parent_profiler();
1282  
              if (parent_profiler != NULL) {
1283  
                task_identifier * parent = parent_profiler->task_id;
1284  
                task_identifier * child = p->task_id;
1285  
                dependency_queue.enqueue(new task_dependency(parent, child));
1286  
              }
1287  
            } catch (empty_stack_exception& e) { }
1288  
          }
1289  
        }
1290  
        */
1291  
        push_profiler(my_tid, p);
1292  
      }
1293  
    }
1294  
  }
1295  
1296  
  /* Start the timer */
  // Returns false if the timer could not be started (throttled/finalized).
  bool profiler_listener::on_start(task_identifier * id) {
    return _common_start(id, false);
  }
1300  
1301  
  /* This is just like starting a timer, but don't increment the number of calls
   * value. That is because we are restarting an existing timer. */
  // Returns false if the timer could not be resumed (throttled/finalized).
  bool profiler_listener::on_resume(task_identifier * id) {
    return _common_start(id, true);
  }
1306  
1307  
   /* Stop the timer */
  // Forwards the profiler's own resume flag so a resumed timer keeps
  // its yield/resume accounting.
  void profiler_listener::on_stop(std::shared_ptr<profiler> &p) {
    _common_stop(p, p->is_resume); // don't change the yield/resume value!
  }
1311  
1312  
  /* Stop the timer, but don't increment the number of calls */
  // A yield is a stop with is_yield == true.
  void profiler_listener::on_yield(std::shared_ptr<profiler> &p) {
    _common_stop(p, true);
  }
1316  
1317  
  /* When a thread exits, pop and stop all timers. */
  // NOTE(review): despite the comment above, this handler is currently
  // empty — no timers are popped or stopped here.
  void profiler_listener::on_exit_thread(event_data &data) {
    APEX_UNUSED(data);
  }
1321  
1322  
  /* When an asynchronous thread is launched, they should
   * call apex::async_thread_setup() which will end up here.
   * In the single-queue build this is a no-op. */
  void profiler_listener::async_thread_setup(void) {
#ifdef APEX_MULTIPLE_QUEUES
	  // for asynchronous threads, check to make sure there is a queue!
	  if (thequeue == nullptr) {
	  	thequeue = new profiler_queue_t();
	  	{
        	// register the new queue so the consumer can drain it
        	std::unique_lock<std::mutex> queue_lock(queue_mtx);
	    	allqueues.push_back(thequeue);
	  	}
	  }
#endif
  }
1336  
1337  
  /* When a sample value is processed, save it as a profiler object, and queue it. */
1338  
  void profiler_listener::on_sample_value(sample_value_event_data &data) {
1339  
    if (!_done) {
1340  
      std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier(*data.counter_name), data.counter_value);
1341  
      p->is_counter = data.is_counter;
1342  
      push_profiler(my_tid, p);
1343  
    }
1344  
  }
1345  
1346  
  /* Record a parent -> child edge for taskgraph output when a new task
   * is created. The currently-running profiler (if any) identifies the
   * parent; otherwise the synthetic "__start" node is used. */
  void profiler_listener::on_new_task(task_identifier * id, uint64_t task_id) {
    if (!apex_options::use_taskgraph_output()) { return; }
    // the active profiler on this thread, if one exists
    profiler * current = thread_instance::instance().get_current_profiler();
    task_identifier * parent = (current != NULL)
        ? current->task_id
        : new task_identifier(string("__start"));
    dependency_queue.enqueue(new task_dependency(parent, id));
  }
1358  
1359  
  /* Communication send event. Save the number of bytes. */
1360  
  void profiler_listener::on_send(message_event_data &data) {
1361  
    if (!_done) {
1362  
      std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier("Bytes Sent"), (double)data.size);
1363  
      push_profiler(0, p);
1364  
    }
1365  
  }
1366  
1367  
  /* Communication recv event. Save the number of bytes. */
1368  
  void profiler_listener::on_recv(message_event_data &data) {
1369  
    if (!_done) {
1370  
      std::shared_ptr<profiler> p = std::make_shared<profiler>(new task_identifier("Bytes Received"), (double)data.size);
1371  
      push_profiler(0, p);
1372  
    }
1373  
  }
1374  
1375  
  /* For periodic stuff. Do something? */
  // Currently a no-op placeholder for periodic events.
  void profiler_listener::on_periodic(periodic_event_data &data) {
    if (!_done) {
    }
    APEX_UNUSED(data);
  }
1381  
1382  
  /* For custom event stuff. Do something? */
  // Currently a no-op placeholder for custom events.
  void profiler_listener::on_custom_event(custom_event_data &data) {
    if (!_done) {
    }
    APEX_UNUSED(data);
  }
1388  
1389  
  /* Queue a special profiler object that tells the consumer to reset
   * the accumulated profile for the given task identifier. */
  void profiler_listener::reset(task_identifier * id) {
    auto reset_marker = std::make_shared<profiler>(id, false, reset_type::CURRENT);
    push_profiler(my_tid, reset_marker);
  }
1394  
1395  
  /* Destructor: force event handling off, flush/finalize, and free the
   * accumulated profiles and (non-HPX) the consumer thread object. */
  profiler_listener::~profiler_listener (void) {
      _done = true; // yikes! -- bluntly stop all further event handling
      finalize();
      delete_profiles();
#ifndef APEX_HAVE_HPX
      // the thread was joined in on_shutdown(); free the object here
      delete consumer_thread;
#endif
  };
1403  
1404  
}
1405  
1406  
#ifdef APEX_HAVE_HPX
// Define an HPX plain action wrapping the profile-processing entry
// point; the critical-priority marker asks the scheduler to run it
// promptly.
HPX_DECLARE_ACTION(::apex::profiler_listener::process_profiles_wrapper, apex_internal_process_profiles_action);
HPX_ACTION_HAS_CRITICAL_PRIORITY(apex_internal_process_profiles_action);
HPX_PLAIN_ACTION(::apex::profiler_listener::process_profiles_wrapper, apex_internal_process_profiles_action);

/* Schedule one consumer task on the HPX runtime. At most one task is in
 * flight at a time, guarded by the consumer_task_running atomic flag
 * (cleared at the end of process_profiles()). Falls back to processing
 * inline when the runtime is unavailable or shutting down. */
void apex_schedule_process_profiles() {
    // No runtime yet: nothing can be scheduled.
    if(get_hpx_runtime_ptr() == nullptr) return;
    if(hpx_shutdown) {
        // Runtime is going away; process synchronously on this thread.
        ::apex::profiler_listener::process_profiles_wrapper();
    } else {
		// test_and_set returns the previous value: only schedule when
		// the flag was clear, i.e. no consumer task is already running.
		if(!consumer_task_running.test_and_set(memory_order_acq_rel)) {
        	apex_internal_process_profiles_action act;
        	try {
           		hpx::apply(act, hpx::find_here());
        	} catch(...) {
           		// During shutdown, we can't schedule a new task,
           		// so we process profiles ourselves.
           		profiler_listener::process_profiles_wrapper();
			}
		}
    }
}

#endif
1430  
1431  
1432  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.