// define these protocols only if monitoring is enabled. Otherwise,
// TauMon.h will define default empty functions and we do not want
// them interfering with one another.
//
// Back-end (per MPI rank) side of TAU's MRNet-based online monitoring
// ("ToM"):
//   Tau_mon_connect()    - connects this rank to ToM and reads the first
//                          control packet from the front-end (FE).
//   Tau_mon_onlineDump() - builds the global-to-local event map, reports
//                          PROT_DATA_READY and enters protocolLoop().
//   protocolLoop()       - answers FE protocol requests until PROT_EXIT.
//   Tau_mon_disconnect() - asks the FE to tear the network down.
//
// NOTE(review): this copy of the file appears to be damaged and will not
// build as-is. The header names of the bare include lines are missing,
// and several spans that started at a `<` (typically a loop bound such as
// `i<size`) and ran to the next `>` (typically the one in a later `->`)
// have been dropped. That fused tokens into fragments like `irecv(`,
// `evtgetDumpExclusiveValues(`, `ctrrecv(`, `valunpack(`,
// `tidglobalNumItems` and `imapping[`. The fragments are kept verbatim
// below and marked DAMAGED; restore the file from version control.
#ifdef TAU_MONITORING
#include "Profile/TauMon.h"
#include "Profile/TauMonMrnet.h"

// for now, TAU_MONITORING will need to rely on TAU_EXP_UNIFY. This will
// not be the case once event unification is implemented on every
// available monitoring transport.
#ifdef TAU_EXP_UNIFY

// NOTE(review): DAMAGED - the header names of the nine bare include lines
// below were lost. The visible code needs declarations for at least MPI,
// printf/fprintf/fopen, gethostname, floor and assert.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "mrnet/MRNet.h"

using namespace MRN;
using namespace std;

// Back-end rank
// Set from MPI_COMM_WORLD in Tau_mon_connect().
// NOTE(review): combined with `using namespace std;` above, unqualified
// uses of `rank` can be reported as ambiguous with the C++11 type trait
// std::rank if <type_traits> is reachable from any included header;
// consider renaming this global or dropping the using-directive.
int rank;

// Using a global for now. Could make it object-based like Aroon's old
// codes later.
// `net` is the MRNet network handle that protocolLoop() receives requests
// from; `ctrl_stream` carries TOM_CONTROL messages to the front-end.
Network *net;
Stream *ctrl_stream;

// Determine whether to extend protocol to receive results from the FE.
// Set from the first control packet in Tau_mon_connect() (1 -> true,
// 0 -> false).
bool broadcastResults;

// Unification structures
// Of these, only mrnetFuncUnifier is referenced in the visible code
// (globalStrings, localNumItems, mapping, sortMap).
FunctionEventLister *mrnetFuncEventLister;
Tau_unify_object_t *mrnetFuncUnifier;
AtomicEventLister *mrnetAtomEventLister;
Tau_unify_object_t *mrnetAtomUnifier;

// Connect this MPI rank to ToM.
// Records the MPI rank and size and the local host name. Rank 0 opens
// "attachBE_connections" to read the connection information and hand
// chunks of it to the other ranks. Every rank then receives the first
// control packet on ctrl_stream; its int payload sets broadcastResults.
// NOTE(review): DAMAGED - the middle of this function is missing (see the
// fused `irecv(` below). That includes the connection-file handling,
// the exchange with other ranks, and the declarations of `tag`, `p` and
// `data`. The locals targetRank, beRank, mrnet*, in_* and myHostName are
// presumably used only by that missing code.
extern "C" void Tau_mon_connect() {
  int size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  char myHostName[64];
  gethostname(myHostName, sizeof(myHostName));

  int targetRank;
  int beRank, mrnetPort, mrnetRank;
  char mrnetHostName[64];
  int in_beRank, in_mrnetPort, in_mrnetRank;
  char in_mrnetHostName[64];

  // Rank 0 reads in all connection information and sends appropriate
  // chunks to the other ranks.
  if (rank == 0) {
    TAU_VERBOSE("Connecting to ToM\n");
    FILE *connections = fopen("attachBE_connections","r");
    // NOTE(review): confirm the missing code below checks `connections`
    // for NULL and closes it.
    // assume there are exactly size entries in the connection file.
    // NOTE(review): DAMAGED - everything from the loop bound `i<...` up to
    // the `->` before `recv` is missing, fusing `i` and `recv`.
    for (int i=0; irecv(&tag, p, &ctrl_stream);
  p->unpack("%d", &data);
  if (data == 1) {
    broadcastResults = true;
  } else if (data == 0) {
    broadcastResults = false;
  } else {
    // broadcastResults keeps its previous value here (false unless set
    // earlier, since it is a zero-initialized global).
    fprintf(stderr,"Warning: Invalid initial control signal %d.\n", data);
  }
  /* DEBUG
     printf("[%d] Got ToM control stream. Proceeding with application code\n", rank);
  */
}

// more like a "last call" for protocol action than an
// actual disconnect call. This call should not exit unless
// the front-end says so.
// Sends TOM_EXIT on ctrl_stream with tag TOM_CONTROL.
// NOTE(review): the body returns right after the send, and no receive is
// visible. Confirm whether it should block for a front-end reply, as the
// comment above describes.
extern "C" void Tau_mon_disconnect() {
  if (rank == 0) {
    TAU_VERBOSE("Disconnecting from ToM\n");
  }
  // Tell front-end to tear down network and exit
  STREAM_FLUSHSEND(ctrl_stream, TOM_CONTROL, "%d", TOM_EXIT);
}

// Serve front-end protocol requests until the front-end sends PROT_EXIT.
//
// globalToLocal: numGlobal entries mapping each unified (global) function
//   index to this rank's local index, or -1 when the event did not occur
//   on this rank (built by Tau_mon_onlineDump()).
// numGlobal: number of unified (global) functions.
//
// Each iteration receives one packet from `net`. Its tag selects the
// protocol, and `stream` records which stream replies must go to.
extern "C" void protocolLoop(int *globalToLocal, int numGlobal) {
  // receive from the network so that ToM will always know which stream to
  // respond to.
  int protocolTag;
  PacketPtr p;
  Stream *stream;
  bool processProtocol = true;

  // data from Basestats to be kept for later protocols. *CWL* find a
  // modular way for this information to be exchanged between protocols.
  // NOTE(review): these start uninitialized. A histogram request that
  // arrives before they are received would read garbage through mins[].
  double *means;
  double *std_devs;
  double *mins;
  double *maxes;

  int numThreads = RtsLayer::getNumThreads();
  int numCounters = Tau_Global_numCounters;

  while (processProtocol) {
    net->recv(&protocolTag, p, &stream);
    switch (protocolTag) {
    case PROT_UNIFY: {
      // Rank 0 additionally responds to the front-end with all global
      // function name strings.
      if (rank == 0) {
        // These locals shadow the outer `tag`-less receive state. The
        // packet received below is not inspected; it only paces the reply.
        int tag;
        PacketPtr p;
        printf("Num Global = %d\n", numGlobal);
        net->recv(&tag, p, &stream);
        STREAM_FLUSHSEND(stream, PROT_UNIFY, "%as",
                         mrnetFuncUnifier->globalStrings, numGlobal);
      }
      break;
    }
    case PROT_BASESTATS: {
      // *DEBUG*
      if (rank == 0) {
        printf("BE: Instructed by FE to report events and counters.\n");
      }
      // Per-(event, counter) thread aggregates for this rank, each sized
      // numGlobal*numCounters.
      double *out_sums;
      double *out_sumsofsqr;
      double *out_mins;
      double *out_maxes;
      out_sums = (double *)TAU_UTIL_MALLOC(numGlobal*numCounters*
                                          sizeof(double));
      out_sumsofsqr = (double *)TAU_UTIL_MALLOC(numGlobal*numCounters*
                                               sizeof(double));
      out_mins = (double *)TAU_UTIL_MALLOC(numGlobal*numCounters*
                                          sizeof(double));
      out_maxes = (double *)TAU_UTIL_MALLOC(numGlobal*numCounters*
                                           sizeof(double));
      // For each event, how many threads contribute values
      // from this node?
      int *threads;
      threads = (int *)TAU_UTIL_MALLOC(numGlobal*sizeof(int));

      // Construct the data arrays to be sent.
      // NOTE(review): DAMAGED - from the loop bound `evt<...` up to the `->`
      // before getDumpExclusiveValues is missing. It presumably held the
      // per-event test on globalToLocal, the counter and thread loops, and
      // the declarations of sum, sumofsqr, min, max, aIdx and val.
      for (int evt=0; evtgetDumpExclusiveValues(tid)[ctr];
            sum += val;
            sumofsqr += val*val;
            // Thread 0 seeds min/max; later threads can only narrow them.
            if (tid == 0) {
              min = val;
              max = val;
            } else {
              if (min > val) {
                min = val;
              }
              if (max < val) {
                max = val;
              }
            }
          }
          // Write thread-accumulated values into the appropriate arrays
          out_sums[aIdx] = sum;
          out_sumsofsqr[aIdx] = sumofsqr;
          out_mins[aIdx] = min;
          out_maxes[aIdx] = max;
        }
        threads[evt] = numThreads;
      } else { /* globalToLocal[evt] != -1 */
        // NOTE(review): this branch handles events with no data on this
        // rank (globalToLocal[evt] == -1). The inline comment above
        // presumably labels the condition of the matching `if`.
        // send a null contribution for each counter associated with
        // the function for this node
        // NOTE(review): DAMAGED - from the loop bound `ctr<...` up to the
        // `->` before `recv` is missing. That span held the rest of this
        // branch and the send of the BASESTATS arrays (plus any release of
        // them and of `threads`). It also held whatever code led to
        // receiving the FE's results below, including the declarations of
        // numMeans, numStdDevs, numMins and numMaxes.
        for (int ctr=0; ctrrecv(&protocolTag, p, &stream);
        // NOTE(review): the format string has only two %alf conversions,
        // but eight out-arguments (four array/length pairs) are passed. By
        // MRNet's format convention, `mins` and `maxes` would then never be
        // filled, yet mins[aIdx] is used by the histogram code below.
        // Confirm against the FE's send format (likely four %alf).
        p->unpack("%alf %alf", &means, &numMeans, &std_devs, &numStdDevs,
                  &mins, &numMins, &maxes, &numMaxes);
        // NOTE(review): DAMAGED - the DEBUG comment below lost its closing
        // delimiter together with the code up to the `->` before `unpack`
        // (fusing `val` and `unpack`). As written, the comment now swallows
        // the following lines up to the closing delimiter after the
        // commented printf further down. The lost code presumably
        // contained the histogram request handling and the declarations
        // of keepItem, numItems, numKeep, numBins, histBins and keepIdx.
        /* DEBUG
           printf("BE: Received %d values from FE\n", numMeans);
           for (int val=0; valunpack("%ad %d %d", &keepItem, &numItems, &numKeep, &numBins);
        /* DEBUG
           printf("BE: Received filtered events from FE %d %d %d\n", numItems, numKeep, numBins);
           NOTE(review): DAMAGED - from the bound `i<...` up to the `->`
           before getDumpExclusiveValues is missing, including the end of
           this DEBUG block and the loops that set interval, aIdx and val.
           for (int i=0; igetDumpExclusiveValues(tid)[ctr];
        NOTE(review): if interval can be 0 (all sampled values equal), the
        division yields inf or NaN, and converting that to int is undefined
        behavior; the clamps that follow run too late to prevent it.
        int histIdx = (int)floor((val-mins[aIdx])/interval);
        // hackish way of dealing with rounding problems.
        /* printf("BE: %f %f %f %f %d\n", interval, range, mins[aIdx], val, histIdx); */
        if (histIdx < 0) {
          histIdx = 0;
        }
        if (histIdx >= numBins) {
          histIdx = numBins-1;
        }
        histBins[keepIdx*numBins+histIdx]++;
            }
          }
          keepIdx++;
        }
      }
      }
      STREAM_FLUSHSEND(stream, protocolTag, "%ad", histBins, numKeep*numBins);
      break;
    }
    case PROT_CLASSIFIER: {
      // Not implemented on the back-end: no reply is sent.
      break;
    }
    case PROT_CLUST_KMEANS: {
      // Not implemented on the back-end: no reply is sent.
      break;
    }
    case PROT_EXIT: {
      // Front-end is done with this round of the processing protocol.
      // Backends are now allowed to proceed with computation.
      processProtocol = false;
      break;
    }
    default: {
      printf("Warning: Unknown protocol tag [%d]\n", protocolTag);
    }
    }
  } /* while (processProtocol) */
  if (rank == 0) {
    TAU_VERBOSE("BE: Protocol handled. Application computation follows.\n");
  }
}

// Prepare this rank's profile data for an online dump and hand control to
// the front-end. Builds globalToLocal (global function index -> local
// index, -1 when absent), reports PROT_DATA_READY on ctrl_stream, then
// serves requests in protocolLoop() until the front-end sends PROT_EXIT.
// NOTE(review): globalToLocal is allocated on every call and is not
// released in the visible code, here or in protocolLoop(). Confirm it is
// freed elsewhere, or free it after protocolLoop() returns.
extern "C" void Tau_mon_onlineDump() {
  // *DEBUG*
  printf("Tau Mon data ready for dump.\n");
  // Need to get data loaded and computed from the stacks
  int numThreads = RtsLayer::getNumThreads();
  // NOTE(review): DAMAGED - from the loop bound `tid<...` up to the `->`
  // before `globalNumItems` is missing. It held the per-thread loop body
  // and, presumably, the start of `int numGlobal = mrnetFuncUnifier->`.
  for (int tid = 0; tidglobalNumItems;
  int numLocal = mrnetFuncUnifier->localNumItems;
  assert(numLocal <= numGlobal);

  int *globalToLocal = (int*)TAU_UTIL_MALLOC(numGlobal*sizeof(int));
  // initialize all global entries to -1
  // where -1 indicates that the event did not occur for this rank
  // NOTE(review): DAMAGED - from the loop bound `i<...` up to the `->`
  // before `mapping` is missing. It held the -1 initialisation and the
  // header of the loop that fills globalToLocal from mapping/sortMap.
  for (int i=0; imapping[i]] = mrnetFuncUnifier->sortMap[i];
  }
  /* DEBUG
     for (int i=0; iGetName(), fi->getDumpExclusiveValues(0)[0]);
     }
     }
  */
  // Tell the MRNet front-end that the data is ready and wait for
  // protocol instructions.
  STREAM_FLUSHSEND(ctrl_stream, TOM_CONTROL, "%d", PROT_DATA_READY);
  // Start the protocol loop.
  protocolLoop(globalToLocal, numGlobal);
}
#endif /* TAU_EXP_UNIFY */
#endif /* TAU_MONITORING */