/home/users/khuck/src/octotiger/src/node_server_actions_1.cpp

Line% of fetchesSource
1  
/*
2  
 * node_server_actions_1.cpp
3  
 *
4  
 *  Created on: Sep 23, 2016
5  
 *      Author: dmarce1
6  
 */
7  
8  
#include "node_server.hpp"
9  
#include "node_client.hpp"
10  
#include "diagnostics.hpp"
11  
#include "future.hpp"
12  
#include "taylor.hpp"
13  
#include "profiler.hpp"
14  
#include "options.hpp"
15  
16  
#include <chrono>
17  
18  
#include <hpx/include/run_as.hpp>
19  
#include <hpx/include/lcos.hpp>
20  
21  
extern options opts;
22  
23  
typedef node_server::load_action load_action_type;
24  
HPX_REGISTER_ACTION(load_action_type);
25  
26  
hpx::mutex rec_size_mutex;
27  
integer rec_size = -1;
28  
29  
void set_locality_data(real omega, space_vector pivot, integer record_size) {
30  
    grid::set_omega(omega);
31  
    grid::set_pivot(pivot);
32  
    rec_size = record_size;
33  
}
34  
35  
hpx::id_type make_new_node(const node_location& loc, const hpx::id_type& _parent) {
36  
    return hpx::new_<node_server>(hpx::find_here(), loc, _parent, ZERO, ZERO).get();
37  
}
38  
39  
HPX_PLAIN_ACTION(make_new_node, make_new_node_action);
40  
HPX_PLAIN_ACTION(set_locality_data, set_locality_data_action);
41  
42  
hpx::future<grid::output_list_type> node_client::load(integer i, const hpx::id_type& _me,
43  
    bool do_o, std::string s) const {
44  
    return hpx::async<typename node_server::load_action>(get_gid(), i, _me, do_o, s);
45  
}
46  
47  
grid::output_list_type node_server::load(integer cnt, const hpx::id_type& _me,
48  
    bool do_output, std::string filename) {
49  
50  
    if (rec_size == -1 && my_location.level() == 0) {
51  
52  
        real omega = 0;
53  
        space_vector pivot;
54  
55  
        // run output on separate thread
56  
        hpx::threads::run_as_os_thread([&]()
57  
        {
58  
            FILE* fp = fopen(filename.c_str(), "rb");
59  
            if (fp == NULL) {
60  
                printf("Failed to open file\n");
61  
                abort();
62  
            }
63  
            fseek(fp, -sizeof(integer), SEEK_END);
64  
            std::size_t read_cnt = fread(&rec_size, sizeof(integer), 1, fp);
65  
            fseek(fp, -4 * sizeof(real) - sizeof(integer), SEEK_END);
66  
            read_cnt += fread(&omega, sizeof(real), 1, fp);
67  
            for (auto& d : geo::dimension::full_set()) {
68  
                read_cnt += fread(&(pivot[d]), sizeof(real), 1, fp);
69  
            }
70  
            fclose(fp);
71  
        }).get();
72  
73  
        auto localities = hpx::find_all_localities();
74  
        std::vector<hpx::future<void>> futs;
75  
        futs.reserve(localities.size());
76  
        for (auto& locality : localities) {
77  
            futs.push_back(
78  
                hpx::async<set_locality_data_action>(locality, omega, pivot, rec_size));
79  
        }
80  
        wait_all_and_propagate_exceptions(futs);
81  
    }
82  
83  
    static auto localities = hpx::find_all_localities();
84  
    me = _me;
85  
86  
    char flag = '0';
87  
    integer total_nodes = 0;
88  
    std::vector<integer> counts(NCHILD);
89  
90  
    // run output on separate thread
91  
    hpx::threads::run_as_os_thread([&]()
92  
    {
93  
        FILE* fp = fopen(filename.c_str(), "rb");
94  
        fseek(fp, cnt * rec_size, SEEK_SET);
95  
        std::size_t read_cnt = fread(&flag, sizeof(char), 1, fp);
96  
        for (auto& this_cnt : counts) {
97  
            read_cnt += fread(&this_cnt, sizeof(integer), 1, fp);
98  
        }
99  
        load_me(fp);
100  
        fseek(fp, 0L, SEEK_END);
101  
        total_nodes = ftell(fp) / rec_size;
102  
        fclose(fp);
103  
    }).get();
104  
105  
    std::vector<hpx::future<grid::output_list_type>> futs;
106  
    //printf( "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1\n" );
107  
    if (flag == '1') {
108  
        is_refined = true;
109  
        children.resize(NCHILD);
110  
        constexpr auto full_set = geo::octant::full_set();
111  
        futs.reserve(full_set.size());
112  
        for (auto& ci : full_set) {
113  
            integer loc_id = ((cnt * localities.size()) / (total_nodes + 1));
114  
            children[ci] = hpx::async<make_new_node_action>(localities[loc_id],
115  
                my_location.get_child(ci), me.get_gid());
116  
            futs.push_back(
117  
                children[ci].load(counts[ci], children[ci].get_gid(), do_output,
118  
                    filename));
119  
        }
120  
    } else if (flag == '0') {
121  
        is_refined = false;
122  
        children.clear();
123  
    } else {
124  
        printf("Corrupt checkpoint file\n");
125  
//		sleep(10);
126  
        hpx::this_thread::sleep_for(std::chrono::seconds(10));
127  
        abort();
128  
    }
129  
    grid::output_list_type my_list;
130  
    for (auto&& fut : futs) {
131  
        if (do_output) {
132  
            grid::merge_output_lists(my_list, fut.get());
133  
        } else {
134  
            fut.get();
135  
        }
136  
    }
137  
    //printf( "***************************************\n" );
138  
    if (!is_refined && do_output) {
139  
        my_list = grid_ptr->get_output_list(false);
140  
        //	grid_ptr = nullptr;
141  
    }
142  
//	hpx::async<inc_grids_loaded_action>(localities[0]).get();
143  
    if (my_location.level() == 0) {
144  
        if (do_output) {
145  
            if (hydro_on && opts.problem == DWD) {
146  
                diagnostics();
147  
            }
148  
            grid::output(my_list, "data.silo", current_time,
149  
                get_rotation_count() / opts.output_dt, false);
150  
        }
151  
        printf("Loaded checkpoint file\n");
152  
        my_list = decltype(my_list)();
153  
154  
    }
155  
    return my_list;
156  
}
157  
158  
typedef node_server::output_action output_action_type;
159  
HPX_REGISTER_ACTION(output_action_type);
160  
161  
hpx::future<grid::output_list_type> node_client::output(std::string fname, int cycle,
162  
    bool flag) const {
163  
    return hpx::async<typename node_server::output_action>(get_gid(), fname, cycle, flag);
164  
}
165  
166  
grid::output_list_type node_server::output(std::string fname, int cycle,
167  
    bool analytic) const {
168  
    if (is_refined) {
169  
        std::vector<hpx::future<grid::output_list_type>> futs;
170  
        futs.reserve(children.size());
171  
        for (auto i = children.begin(); i != children.end(); ++i) {
172  
            futs.push_back(i->output(fname, cycle, analytic));
173  
        }
174  
        auto i = futs.begin();
175  
        grid::output_list_type my_list = i->get();
176  
        for (++i; i != futs.end(); ++i) {
177  
            grid::merge_output_lists(my_list, i->get());
178  
        }
179  
180  
        if (my_location.level() == 0) {
181  
//			hpx::apply([](const grid::output_list_type& olists, const char* filename) {
182  
            printf("Outputing...\n");
183  
            grid::output(my_list, fname, get_time(), cycle, analytic);
184  
            printf("Done...\n");
185  
            //		}, std::move(my_list), fname.c_str());
186  
        }
187  
        return my_list;
188  
189  
    } else {
190  
        return grid_ptr->get_output_list(analytic);
191  
    }
192  
193  
}
194  
195  
typedef node_server::regrid_gather_action regrid_gather_action_type;
196  
HPX_REGISTER_ACTION(regrid_gather_action_type);
197  
198  
hpx::future<integer> node_client::regrid_gather(bool rb) const {
199  
    return hpx::async<typename node_server::regrid_gather_action>(get_gid(), rb);
200  
}
201  
202  
integer node_server::regrid_gather(bool rebalance_only) {
203  
    integer count = integer(1);
204  
205  
    if (is_refined) {
206  
        if (!rebalance_only) {
207  
            /* Turning refinement off */
208  
            if (refinement_flag == 0) {
209  
                children.clear();
210  
                is_refined = false;
211  
                grid_ptr->set_leaf(true);
212  
            }
213  
        }
214  
215  
        if (is_refined) {
216  
            std::vector<hpx::future<integer>> futs;
217  
            futs.reserve(children.size());
218  
            for (auto& child : children) {
219  
                futs.push_back(child.regrid_gather(rebalance_only));
220  
            }
221  
            auto futi = futs.begin();
222  
            for (auto& ci : geo::octant::full_set()) {
223  
                auto child_cnt = futi->get();
224  
                ++futi;
225  
                child_descendant_count[ci] = child_cnt;
226  
                count += child_cnt;
227  
            }
228  
        } else {
229  
            for (auto& ci : geo::octant::full_set()) {
230  
                child_descendant_count[ci] = 0;
231  
            }
232  
        }
233  
    } else if (!rebalance_only) {
234  
        //		if (grid_ptr->refine_me(my_location.level())) {
235  
        if (refinement_flag != 0) {
236  
            refinement_flag = 0;
237  
            count += NCHILD;
238  
239  
            children.resize(NCHILD);
240  
            std::vector<node_location> clocs(NCHILD);
241  
242  
            /* Turning refinement on*/
243  
            is_refined = true;
244  
            grid_ptr->set_leaf(false);
245  
246  
            for (auto& ci : geo::octant::full_set()) {
247  
                child_descendant_count[ci] = 1;
248  
                children[ci] = hpx::new_<node_server>(hpx::find_here(),
249  
                    my_location.get_child(ci), me, current_time, rotational_time);
250  
                std::array<integer, NDIM> lb = { 2 * H_BW, 2 * H_BW, 2 * H_BW };
251  
                std::array<integer, NDIM> ub;
252  
                lb[XDIM] += (1 & (ci >> 0)) * (INX);
253  
                lb[YDIM] += (1 & (ci >> 1)) * (INX);
254  
                lb[ZDIM] += (1 & (ci >> 2)) * (INX);
255  
                for (integer d = 0; d != NDIM; ++d) {
256  
                    ub[d] = lb[d] + (INX);
257  
                }
258  
                std::vector<real> outflows(NF, ZERO);
259  
                if (ci == 0) {
260  
                    outflows = grid_ptr->get_outflows();
261  
                }
262  
                if (current_time > ZERO) {
263  
                    children[ci].set_grid(grid_ptr->get_prolong(lb, ub),
264  
                        std::move(outflows)).get();
265  
                }
266  
            }
267  
        }
268  
    }
269  
270  
    return count;
271  
}
272  
273  
typedef node_server::regrid_scatter_action regrid_scatter_action_type;
274  
HPX_REGISTER_ACTION(regrid_scatter_action_type);
275  
276  
hpx::future<void> node_client::regrid_scatter(integer a, integer b) const {
277  
    return hpx::async<typename node_server::regrid_scatter_action>(get_gid(), a, b);
278  
}
279  
280  
void node_server::regrid_scatter(integer a_, integer total) {
281  
    refinement_flag = 0;
282  
    std::vector<hpx::future<void>> futs;
283  
    if (is_refined) {
284  
        integer a = a_;
285  
        std::vector<hpx::id_type> localities;
286  
        {
287  
            timings::scope ts(timings_, timings::time_find_localities);
288  
            localities = hpx::find_all_localities();
289  
        }
290  
        ++a;
291  
        for (auto& ci : geo::octant::full_set()) {
292  
            const integer loc_index = a * localities.size() / total;
293  
            const auto child_loc = localities[loc_index];
294  
            a += child_descendant_count[ci];
295  
            const hpx::id_type id = children[ci].get_gid();
296  
            integer current_child_id = hpx::naming::get_locality_id_from_gid(
297  
                id.get_gid());
298  
            auto current_child_loc = localities[current_child_id];
299  
            if (child_loc != current_child_loc) {
300  
                children[ci] = children[ci].copy_to_locality(child_loc);
301  
            }
302  
        }
303  
        a = a_ + 1;
304  
        constexpr auto full_set = geo::octant::full_set();
305  
        futs.reserve(full_set.size());
306  
        for (auto& ci : full_set) {
307  
            futs.push_back(children[ci].regrid_scatter(a, total));
308  
            a += child_descendant_count[ci];
309  
        }
310  
    }
311  
    clear_family();
312  
    wait_all_and_propagate_exceptions(futs);
313  
}
314  
315  
typedef node_server::regrid_action regrid_action_type;
316  
HPX_REGISTER_ACTION(regrid_action_type);
317  
318  
hpx::future<void> node_client::regrid(const hpx::id_type& g, bool rb) const {
319  
    return hpx::async<typename node_server::regrid_action>(get_gid(), g, rb);
320  
}
321  
322  
int node_server::regrid(const hpx::id_type& root_gid, bool rb) {
323  
    timings::scope ts(timings_, timings::time_regrid);
324  
    assert(grid_ptr != nullptr);
325  
    printf("-----------------------------------------------\n");
326  
    if (!rb) {
327  
        printf("checking for refinement\n");
328  
        check_for_refinement();
329  
    }
330  
    printf("regridding\n");
331  
    integer a = regrid_gather(rb);
332  
    printf("rebalancing %i nodes\n", int(a));
333  
    regrid_scatter(0, a);
334  
    assert(grid_ptr != nullptr);
335  
    std::vector<hpx::id_type> null_neighbors(geo::direction::count());
336  
    printf("forming tree connections\n");
337  
    form_tree(root_gid, hpx::invalid_id, null_neighbors);
338  
    if (current_time > ZERO) {
339  
        printf("solving gravity\n");
340  
        solve_gravity(true);
341  
    }
342  
    printf("regrid done\n-----------------------------------------------\n");
343  
    return a;
344  
}
345  
346  
typedef node_server::save_action save_action_type;
347  
HPX_REGISTER_ACTION(save_action_type);
348  
349  
integer node_client::save(integer i, std::string s) const {
350  
    return hpx::async<typename node_server::save_action>(get_gid(), i, s).get();
351  
}
352  
353  
integer node_server::save(integer cnt, std::string filename) const {
354  
    char flag = is_refined ? '1' : '0';
355  
    integer record_size = 0;
356  
357  
    // run output on separate thread
358  
    hpx::threads::run_as_os_thread([&]()
359  
    {
360  
        FILE* fp = fopen(filename.c_str(), (cnt == 0) ? "wb" : "ab");
361  
        fwrite(&flag, sizeof(flag), 1, fp);
362  
        ++cnt;
363  
    //	printf("                                   \rSaved %li sub-grids\r", (long int) cnt);
364  
        integer value = cnt;
365  
        std::array<integer, NCHILD> values;
366  
        for (auto& ci : geo::octant::full_set()) {
367  
            if (ci != 0 && is_refined) {
368  
                value += child_descendant_count[ci - 1];
369  
            }
370  
            values[ci] = value;
371  
            fwrite(&value, sizeof(value), 1, fp);
372  
        }
373  
        record_size = save_me(fp) + sizeof(flag) + NCHILD * sizeof(integer);
374  
        fclose(fp);
375  
    }).get();
376  
377  
    if (is_refined) {
378  
        for (auto& ci : geo::octant::full_set()) {
379  
            cnt = children[ci].save(cnt, filename);
380  
        }
381  
    }
382  
383  
    if (my_location.level() == 0) {
384  
        // run output on separate thread
385  
        hpx::threads::run_as_os_thread([&]()
386  
        {
387  
            FILE* fp = fopen(filename.c_str(), "ab");
388  
            real omega = grid::get_omega();
389  
            space_vector pivot = grid::get_pivot();
390  
            fwrite(&omega, sizeof(real), 1, fp);
391  
            for (auto& d : geo::dimension::full_set()) {
392  
                fwrite(&(pivot[d]), sizeof(real), 1, fp);
393  
            }
394  
            fwrite(&record_size, sizeof(integer), 1, fp);
395  
            fclose(fp);
396  
        }).get();
397  
398  
        printf("Saved %li grids to checkpoint file\n", (long int) cnt);
399  
    }
400  
401  
    return cnt;
402  
}
403  
404  
typedef node_server::set_aunt_action set_aunt_action_type;
405  
HPX_REGISTER_ACTION(set_aunt_action_type);
406  
407  
hpx::future<void> node_client::set_aunt(const hpx::id_type& aunt,
408  
    const geo::face& f) const {
409  
    return hpx::async<typename node_server::set_aunt_action>(get_gid(), aunt, f);
410  
}
411  
412  
void node_server::set_aunt(const hpx::id_type& aunt, const geo::face& face) {
413  
    aunts[face] = aunt;
414  
}
415  
416  
typedef node_server::set_grid_action set_grid_action_type;
417  
HPX_REGISTER_ACTION(set_grid_action_type);
418  
419  
hpx::future<void> node_client::set_grid(std::vector<real>&& g,
420  
    std::vector<real>&& o) const {
421  
    return hpx::async<typename node_server::set_grid_action>(get_gid(), g, o);
422  
}
423  
424  
void node_server::set_grid(const std::vector<real>& data, std::vector<real>&& outflows) {
425  
    grid_ptr->set_prolong(data, std::move(outflows));
426  
}
427  
428  
typedef node_server::solve_gravity_action solve_gravity_action_type;
429  
HPX_REGISTER_ACTION(solve_gravity_action_type);
430  
431  
hpx::future<void> node_client::solve_gravity(bool ene) const {
432  
    return hpx::async<typename node_server::solve_gravity_action>(get_gid(), ene);
433  
}
434  
435  
void node_server::solve_gravity(bool ene) {
436  
    if (!gravity_on) {
437  
        return;
438  
    }
439  
    std::vector<hpx::future<void>> child_futs;
440  
    child_futs.reserve(children.size());
441  
    for (auto& child : children) {
442  
        child_futs.push_back(child.solve_gravity(ene));
443  
    }
444  
    compute_fmm(RHO, ene);
445  
    wait_all_and_propagate_exceptions(child_futs);
446  
}
447  
448  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.