/home/users/khuck/src/hpx-lsu/src/runtime/agas/addressing_service.cpp

Line% of fetchesSource
1  
////////////////////////////////////////////////////////////////////////////////
2  
//  Copyright (c) 2011 Bryce Adelstein-Lelbach
3  
//  Copyright (c) 2011-2016 Hartmut Kaiser
4  
//  Copyright (c) 2016 Parsa Amini
5  
//  Copyright (c) 2016 Thomas Heller
6  
//
7  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
8  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9  
////////////////////////////////////////////////////////////////////////////////
10  
11  
#include <hpx/config.hpp>
12  
#include <hpx/exception.hpp>
13  
#include <hpx/runtime.hpp>
14  
#include <hpx/apply.hpp>
15  
#include <hpx/traits/action_priority.hpp>
16  
#include <hpx/traits/action_was_object_migrated.hpp>
17  
#include <hpx/traits/component_supports_migration.hpp>
18  
#include <hpx/runtime/agas/addressing_service.hpp>
19  
#include <hpx/runtime/agas/big_boot_barrier.hpp>
20  
#include <hpx/runtime/agas/component_namespace.hpp>
21  
#include <hpx/runtime/agas/locality_namespace.hpp>
22  
#include <hpx/runtime/agas/primary_namespace.hpp>
23  
#include <hpx/runtime/agas/symbol_namespace.hpp>
24  
#include <hpx/runtime/agas/namespace_action_code.hpp>
25  
#include <hpx/runtime/agas/server/component_namespace.hpp>
26  
#include <hpx/runtime/agas/server/locality_namespace.hpp>
27  
#include <hpx/runtime/agas/server/primary_namespace.hpp>
28  
#include <hpx/runtime/agas/server/symbol_namespace.hpp>
29  
#include <hpx/runtime/agas/detail/bootstrap_component_namespace.hpp>
30  
#include <hpx/runtime/agas/detail/bootstrap_locality_namespace.hpp>
31  
#include <hpx/runtime/find_localities.hpp>
32  
#include <hpx/runtime/naming/split_gid.hpp>
33  
#include <hpx/util/bind.hpp>
34  
#include <hpx/util/logging.hpp>
35  
#include <hpx/util/runtime_configuration.hpp>
36  
#include <hpx/util/safe_lexical_cast.hpp>
37  
#include <hpx/util/assert.hpp>
38  
#include <hpx/util/register_locks.hpp>
39  
#include <hpx/util/unlock_guard.hpp>
40  
#include <hpx/util/insert_checked.hpp>
41  
#include <hpx/performance_counters/counters.hpp>
42  
#include <hpx/performance_counters/counter_creators.hpp>
43  
#include <hpx/performance_counters/manage_counter_type.hpp>
44  
#include <hpx/lcos/wait_all.hpp>
45  
#include <hpx/lcos/broadcast.hpp>
46  
47  
#include <boost/format.hpp>
48  
#include <boost/icl/closed_interval.hpp>
49  
50  
#include <cstddef>
51  
#include <cstdint>
52  
#include <functional>
53  
#include <map>
54  
#include <memory>
55  
#include <mutex>
56  
#include <sstream>
57  
#include <string>
58  
#include <utility>
59  
#include <vector>
60  
61  
namespace hpx { namespace agas
62  
{
63  
struct addressing_service::gva_cache_key
64  
{ // {{{ gva_cache_key implementation
65  
  private:
66  
    typedef boost::icl::closed_interval<naming::gid_type, std::less>
67  
        key_type;
68  
69  
    key_type key_;
70  
71  
  public:
72  
    gva_cache_key()
73  
      : key_()
74  
    {}
75  
76  
    explicit gva_cache_key(
77  
        naming::gid_type const& id_
78  
      , std::uint64_t count_ = 1
79  
        )
80  
      : key_(naming::detail::get_stripped_gid(id_)
81  
           , naming::detail::get_stripped_gid(id_) + (count_ - 1))
82  
    {
83  
        HPX_ASSERT(count_);
84  
    }
85  
86  
    naming::gid_type get_gid() const
87  
    {
88  
        return boost::icl::lower(key_);
89  
    }
90  
91  
    std::uint64_t get_count() const
92  
    {
93  
        naming::gid_type const size = boost::icl::length(key_);
94  
        HPX_ASSERT(size.get_msb() == 0);
95  
        return size.get_lsb();
96  
    }
97  
98  
    friend bool operator<(
99  
        gva_cache_key const& lhs
100  
      , gva_cache_key const& rhs
101  
        )
102  
    {
103  
        return boost::icl::exclusive_less(lhs.key_, rhs.key_);
104  
    }
105  
106  
    friend bool operator==(
107  
        gva_cache_key const& lhs
108  
      , gva_cache_key const& rhs
109  
        )
110  
    {
111  
        // Direct hit
112  
        if(lhs.key_ == rhs.key_)
113  
            return true;
114  
115  
        // Is lhs in rhs?
116  
        if (1 == lhs.get_count() && 1 != rhs.get_count())
117  
            return boost::icl::contains(rhs.key_, lhs.key_);
118  
119  
        // Is rhs in lhs?
120  
        else if (1 != lhs.get_count() && 1 == rhs.get_count())
121  
            return boost::icl::contains(lhs.key_, rhs.key_);
122  
123  
        return false;
124  
    }
125  
}; // }}}
126  
127  
addressing_service::addressing_service(
128  
    parcelset::parcelhandler& ph
129  
  , util::runtime_configuration const& ini_
130  
  , runtime_mode runtime_type_
131  
    )
132  
  : gva_cache_(new gva_cache_type)
133  
  , console_cache_(naming::invalid_locality_id)
134  
  , max_refcnt_requests_(ini_.get_agas_max_pending_refcnt_requests())
135  
  , refcnt_requests_count_(0)
136  
  , enable_refcnt_caching_(true)
137  
  , refcnt_requests_(new refcnt_requests_type)
138  
  , service_type(ini_.get_agas_service_mode())
139  
  , runtime_type(runtime_type_)
140  
  , caching_(ini_.get_agas_caching_mode())
141  
  , range_caching_(caching_ ? ini_.get_agas_range_caching_mode() : false)
142  
  , action_priority_(ini_.get_agas_dedicated_server() ?
143  
        threads::thread_priority_normal : threads::thread_priority_boost)
144  
  , rts_lva_(0)
145  
  , mem_lva_(0)
146  
  , state_(state_starting)
147  
  , locality_()
148  
{ // {{{
149  
    std::shared_ptr<parcelset::parcelport> pp = ph.get_bootstrap_parcelport();
150  
    create_big_boot_barrier(pp ? pp.get() : nullptr, ph.endpoints(), ini_);
151  
152  
    if (caching_)
153  
        gva_cache_->reserve(ini_.get_agas_local_cache_size());
154  
155  
    if (service_type == service_mode_bootstrap)
156  
    {
157  
        launch_bootstrap(pp, ph.endpoints(), ini_);
158  
    }
159  
} // }}}
160  
161  
void addressing_service::initialize(parcelset::parcelhandler& ph,
162  
    std::uint64_t rts_lva, std::uint64_t mem_lva)
163  
{ // {{{
164  
    rts_lva_ = rts_lva;
165  
    mem_lva_ = mem_lva;
166  
167  
    // now, boot the parcel port
168  
    std::shared_ptr<parcelset::parcelport> pp = ph.get_bootstrap_parcelport();
169  
    if(pp)
170  
        pp->run(false);
171  
172  
    if (service_type == service_mode_bootstrap)
173  
    {
174  
        get_big_boot_barrier().wait_bootstrap();
175  
    }
176  
    else
177  
    {
178  
        launch_hosted();
179  
        get_big_boot_barrier().wait_hosted(
180  
            pp->get_locality_name(),
181  
            primary_ns_.ptr(), symbol_ns_.ptr());
182  
    }
183  
184  
    set_status(state_running);
185  
} // }}}
186  
187  
namespace detail
188  
{
189  
    std::uint32_t get_number_of_pus_in_cores(std::uint32_t num_cores);
190  
}
191  
192  
void addressing_service::launch_bootstrap(
193  
    std::shared_ptr<parcelset::parcelport> const& pp
194  
  , parcelset::endpoints_type const & endpoints
195  
  , util::runtime_configuration const& ini_
196  
    )
197  
{ // {{{
198  
    component_ns_.reset(new detail::bootstrap_component_namespace);
199  
    locality_ns_.reset(new detail::bootstrap_locality_namespace(
200  
        reinterpret_cast<server::primary_namespace *>(primary_ns_.ptr())));
201  
202  
    runtime& rt = get_runtime();
203  
204  
    naming::gid_type const here =
205  
        naming::get_gid_from_locality_id(HPX_AGAS_BOOTSTRAP_PREFIX);
206  
    set_local_locality(here);
207  
208  
    // store number of cores used by other processes
209  
    std::uint32_t cores_needed = rt.assign_cores();
210  
    std::uint32_t first_used_core = rt.assign_cores(
211  
        pp ? pp->get_locality_name() : "", cores_needed);
212  
213  
    util::runtime_configuration& cfg = rt.get_config();
214  
    cfg.set_first_used_core(first_used_core);
215  
    HPX_ASSERT(pp ? pp->here() == pp->agas_locality(cfg) : true);
216  
    rt.assign_cores();
217  
218  
    naming::id_type const locality_gid = locality_ns_->gid();
219  
    gva locality_gva(here,
220  
        server::locality_namespace::get_component_type(), 1U,
221  
            locality_ns_->ptr());
222  
223  
    naming::id_type const primary_gid = primary_ns_.gid();
224  
    gva primary_gva(here,
225  
        server::primary_namespace::get_component_type(), 1U,
226  
            primary_ns_.ptr());
227  
228  
    naming::id_type const component_gid = component_ns_->gid();
229  
    gva component_gva(here,
230  
         server::component_namespace::get_component_type(), 1U,
231  
            component_ns_->ptr());
232  
233  
    naming::id_type const symbol_gid = symbol_ns_.gid();
234  
    gva symbol_gva(here,
235  
        server::symbol_namespace::get_component_type(), 1U,
236  
            symbol_ns_.ptr());
237  
238  
    rt.get_config().parse("assigned locality",
239  
        boost::str(boost::format("hpx.locality!=%1%")
240  
                  % naming::get_locality_id_from_gid(here)));
241  
242  
    std::uint32_t num_threads = hpx::util::get_entry_as<std::uint32_t>(
243  
        ini_, "hpx.os_threads", 1u);
244  
    locality_ns_->allocate(endpoints, 0, num_threads, naming::invalid_gid);
245  
246  
    naming::gid_type runtime_support_gid1(here);
247  
    runtime_support_gid1.set_lsb(rt.get_runtime_support_lva());
248  
    naming::gid_type runtime_support_gid2(here);
249  
    runtime_support_gid2.set_lsb(std::uint64_t(0));
250  
251  
    gva runtime_support_address(here
252  
      , components::get_component_type<components::server::runtime_support>()
253  
      , 1U, rt.get_runtime_support_lva());
254  
255  
    register_name("/0/agas/locality#0", here);
256  
    if (is_console())
257  
        register_name("/0/locality#console", here);
258  
259  
    naming::gid_type lower, upper;
260  
    get_id_range(HPX_INITIAL_GID_RANGE, lower, upper);
261  
    rt.get_id_pool().set_range(lower, upper);
262  
} // }}}
263  
264  
void addressing_service::launch_hosted()
265  
{
266  
}
267  
268  
void addressing_service::adjust_local_cache_size(std::size_t cache_size)
269  
{ // {{{
270  
    // adjust the local AGAS cache size for the number of worker threads and
271  
    // create the hierarchy based on the topology
272  
    if (caching_)
273  
    {
274  
        std::size_t previous = gva_cache_->size();
275  
            gva_cache_->reserve(cache_size);
276  
277  
        LAGAS_(info) << (boost::format(
278  
            "addressing_service::adjust_local_cache_size, previous size: %1%, "
279  
            "new size: %3%")
280  
            % previous % cache_size);
281  
    }
282  
} // }}}
283  
284  
void addressing_service::set_local_locality(naming::gid_type const& g)
285  
{
286  
    locality_ = g;
287  
    primary_ns_.set_local_locality(g);
288  
}
289  
290  
bool addressing_service::register_locality(
291  
    parcelset::endpoints_type const & endpoints
292  
  , naming::gid_type& prefix
293  
  , std::uint32_t num_threads
294  
  , error_code& ec
295  
    )
296  
{ // {{{
297  
    try {
298  
        prefix = naming::get_gid_from_locality_id(
299  
            locality_ns_->allocate(endpoints, 0, num_threads, prefix));
300  
301  
        {
302  
            std::lock_guard<mutex_type> l(resolved_localities_mtx_);
303  
            std::pair<resolved_localities_type::iterator, bool> res
304  
                = resolved_localities_.insert(std::make_pair(
305  
                    prefix
306  
                  , endpoints
307  
                ));
308  
            HPX_ASSERT(res.second);
309  
        }
310  
311  
        return true;
312  
    }
313  
    catch (hpx::exception const& e) {
314  
        HPX_RETHROWS_IF(ec, e, "addressing_service::register_locality");
315  
        return false;
316  
    }
317  
} // }}}
318  
319  
void addressing_service::register_console(parcelset::endpoints_type const & eps)
320  
{
321  
    std::lock_guard<mutex_type> l(resolved_localities_mtx_);
322  
    std::pair<resolved_localities_type::iterator, bool> res
323  
        = resolved_localities_.insert(std::make_pair(
324  
            naming::get_gid_from_locality_id(0)
325  
          , eps
326  
        ));
327  
    HPX_ASSERT(res.second);
328  
}
329  
330  
bool addressing_service::has_resolved_locality(
331  
    naming::gid_type const & gid
332  
    )
333  
{ // {{{
334  
    std::unique_lock<mutex_type> l(resolved_localities_mtx_);
335  
    return resolved_localities_.find(gid) != resolved_localities_.end();
336  
} // }}}
337  
338  
parcelset::endpoints_type const & addressing_service::resolve_locality(
339  
    naming::gid_type const & gid
340  
  , error_code& ec
341  
    )
342  
{ // {{{
343  
    std::unique_lock<mutex_type> l(resolved_localities_mtx_);
344  
    resolved_localities_type::iterator it = resolved_localities_.find(gid);
345  
    if (it == resolved_localities_.end())
346  
    {
347  
        // The locality hasn't been requested to be resolved yet. Do it now.
348  
        parcelset::endpoints_type endpoints;
349  
        {
350  
            hpx::util::unlock_guard<std::unique_lock<mutex_type> > ul(l);
351  
            endpoints = locality_ns_->resolve_locality(gid);
352  
        }
353  
354  
        // Search again ... might have been added by a different thread already
355  
        it = resolved_localities_.find(gid);
356  
357  
        if (it == resolved_localities_.end())
358  
        {
359  
            if(HPX_UNLIKELY(!util::insert_checked(resolved_localities_.insert(
360  
                    std::make_pair(
361  
                        gid
362  
                      , endpoints
363  
                    )
364  
                ), it)))
365  
            {
366  
                l.unlock();
367  
368  
                HPX_THROWS_IF(ec, internal_server_error
369  
                  , "addressing_service::resolve_locality"
370  
                  , "resolved locality insertion failed "
371  
                    "due to a locking error or memory corruption");
372  
                return resolved_localities_[naming::invalid_gid];
373  
            }
374  
        }
375  
    }
376  
    return it->second;
377  
} // }}}
378  
379  
// TODO: We need to ensure that the locality isn't unbound while it still holds
380  
// referenced objects.
381  
bool addressing_service::unregister_locality(
382  
    naming::gid_type const & gid
383  
  , error_code& ec
384  
    )
385  
{ // {{{
386  
    try {
387  
        locality_ns_->free(gid);
388  
        component_ns_->unregister_server_instance(ec);
389  
        symbol_ns_.unregister_server_instance(ec);
390  
391  
        remove_resolved_locality(gid);
392  
        return true;
393  
    }
394  
    catch (hpx::exception const& e) {
395  
        HPX_RETHROWS_IF(ec, e, "addressing_service::unregister_locality");
396  
        return false;
397  
    }
398  
} // }}}
399  
400  
void addressing_service::remove_resolved_locality(naming::gid_type const& gid)
401  
{
402  
    std::lock_guard<mutex_type> l(resolved_localities_mtx_);
403  
    resolved_localities_type::iterator it = resolved_localities_.find(gid);
404  
    if(it != resolved_localities_.end())
405  
        resolved_localities_.erase(it);
406  
}
407  
408  
409  
bool addressing_service::get_console_locality(
410  
    naming::gid_type& prefix
411  
  , error_code& ec
412  
    )
413  
{ // {{{
414  
    try {
415  
        if (get_status() != state_running)
416  
        {
417  
            if (&ec != &throws)
418  
                ec = make_success_code();
419  
            return false;
420  
        }
421  
422  
        if (is_console())
423  
        {
424  
            prefix = get_local_locality();
425  
            if (&ec != &throws)
426  
                ec = make_success_code();
427  
            return true;
428  
        }
429  
430  
        {
431  
            std::lock_guard<mutex_type> lock(console_cache_mtx_);
432  
433  
            if (console_cache_ != naming::invalid_locality_id)
434  
            {
435  
                prefix = naming::get_gid_from_locality_id(console_cache_);
436  
                if (&ec != &throws)
437  
                    ec = make_success_code();
438  
                return true;
439  
            }
440  
        }
441  
442  
        std::string key("/0/locality#console");
443  
444  
        hpx::id_type resolved_prefix = resolve_name(key);
445  
        if (resolved_prefix != naming::invalid_id)
446  
        {
447  
            std::uint32_t console = naming::get_locality_id_from_id(resolved_prefix);
448  
            prefix = resolved_prefix.get_gid();
449  
450  
            {
451  
                std::lock_guard<mutex_type> lock(console_cache_mtx_);
452  
                if (console_cache_ == naming::invalid_locality_id) {
453  
                    console_cache_ = console;
454  
                }
455  
                else {
456  
                    HPX_ASSERT(console_cache_ == console);
457  
                }
458  
            }
459  
460  
            LAGAS_(debug) <<
461  
                ( boost::format(
462  
                  "addressing_server::get_console_locality, "
463  
                  "caching console locality, prefix(%1%)")
464  
                % console);
465  
466  
            return true;
467  
        }
468  
469  
        return false;
470  
    }
471  
    catch (hpx::exception const& e) {
472  
        HPX_RETHROWS_IF(ec, e, "addressing_service::get_console_locality");
473  
        return false;
474  
    }
475  
} // }}}
476  
477  
bool addressing_service::get_localities(
478  
    std::vector<naming::gid_type>& locality_ids
479  
  , components::component_type type
480  
  , error_code& ec
481  
    )
482  
{ // {{{ get_locality_ids implementation
483  
    try {
484  
        if (type != components::component_invalid)
485  
        {
486  
            const std::vector<std::uint32_t> p = component_ns_->resolve_id(type);
487  
488  
            if (!p.size())
489  
                return false;
490  
491  
            locality_ids.clear();
492  
            for (std::size_t i = 0; i < p.size(); ++i)
493  
                locality_ids.push_back(naming::get_gid_from_locality_id(p[i]));
494  
495  
            return true;
496  
        }
497  
498  
        else
499  
        {
500  
            const std::vector<std::uint32_t> p = locality_ns_->localities();
501  
502  
            if (!p.size())
503  
                return false;
504  
505  
            locality_ids.clear();
506  
            for (std::size_t i = 0; i < p.size(); ++i)
507  
                locality_ids.push_back(naming::get_gid_from_locality_id(p[i]));
508  
509  
            return true;
510  
        }
511  
    }
512  
    catch (hpx::exception const& e) {
513  
        HPX_RETHROWS_IF(ec, e, "addressing_service::get_locality_ids");
514  
        return false;
515  
    }
516  
} // }}}
517  
518  
///////////////////////////////////////////////////////////////////////////////
519  
std::uint32_t addressing_service::get_num_localities(
520  
    components::component_type type
521  
  , error_code& ec
522  
    )
523  
{ // {{{ get_num_localities implementation
524  
    try {
525  
        if (type == components::component_invalid)
526  
        {
527  
            return locality_ns_->get_num_localities();
528  
        }
529  
530  
        return component_ns_->get_num_localities(type).get();
531  
    }
532  
    catch (hpx::exception const& e) {
533  
        HPX_RETHROWS_IF(ec, e, "addressing_service::get_num_localities");
534  
    }
535  
    return std::uint32_t(-1);
536  
} // }}}
537  
538  
lcos::future<std::uint32_t> addressing_service::get_num_localities_async(
539  
    components::component_type type
540  
    )
541  
{ // {{{ get_num_localities implementation
542  
    if (type == components::component_invalid)
543  
    {
544  
        return locality_ns_->get_num_localities_async();
545  
    }
546  
547  
    return component_ns_->get_num_localities(type);
548  
} // }}}
549  
550  
///////////////////////////////////////////////////////////////////////////////
551  
std::uint32_t addressing_service::get_num_overall_threads(
552  
    error_code& ec
553  
    )
554  
{ // {{{ get_num_overall_threads implementation
555  
    try {
556  
        return locality_ns_->get_num_overall_threads();
557  
    }
558  
    catch (hpx::exception const& e) {
559  
        HPX_RETHROWS_IF(ec, e, "addressing_service::get_num_overall_threads");
560  
    }
561  
    return std::uint32_t(0);
562  
} // }}}
563  
564  
lcos::future<std::uint32_t> addressing_service::get_num_overall_threads_async()
565  
{ // {{{
566  
    return locality_ns_->get_num_overall_threads_async();
567  
} // }}}
568  
569  
std::vector<std::uint32_t> addressing_service::get_num_threads(
570  
    error_code& ec
571  
    )
572  
{ // {{{ get_num_threads implementation
573  
    try {
574  
        return locality_ns_->get_num_threads();
575  
    }
576  
    catch (hpx::exception const& e) {
577  
        HPX_RETHROWS_IF(ec, e, "addressing_service::get_num_threads");
578  
    }
579  
    return std::vector<std::uint32_t>();
580  
} // }}}
581  
582  
lcos::future<std::vector<std::uint32_t> > addressing_service::get_num_threads_async()
583  
{ // {{{
584  
    return locality_ns_->get_num_threads_async();
585  
} // }}}
586  
587  
///////////////////////////////////////////////////////////////////////////////
588  
components::component_type addressing_service::get_component_id(
589  
    std::string const& name
590  
  , error_code& ec
591  
    )
592  
{ /// {{{
593  
    try {
594  
        return component_ns_->bind_name(name);
595  
    }
596  
    catch (hpx::exception const& e) {
597  
        HPX_RETHROWS_IF(ec, e, "addressing_service::get_component_id");
598  
        return components::component_invalid;
599  
    }
600  
} // }}}
601  
602  
void addressing_service::iterate_types(
603  
    iterate_types_function_type const& f
604  
  , error_code& ec
605  
    )
606  
{ // {{{
607  
    try {
608  
        return component_ns_->iterate_types(f);
609  
    }
610  
    catch (hpx::exception const& e) {
611  
        HPX_RETHROWS_IF(ec, e, "addressing_service::iterate_types");
612  
    }
613  
} // }}}
614  
615  
std::string addressing_service::get_component_type_name(
616  
    components::component_type id
617  
  , error_code& ec
618  
    )
619  
{ // {{{
620  
    try {
621  
        return component_ns_->get_component_type_name(id);
622  
    }
623  
    catch (hpx::exception const& e) {
624  
        HPX_RETHROWS_IF(ec, e, "addressing_service::iterate_types");
625  
    }
626  
    return "<unknown>";
627  
} // }}}
628  
629  
components::component_type addressing_service::register_factory(
630  
    std::uint32_t prefix
631  
  , std::string const& name
632  
  , error_code& ec
633  
    )
634  
{ // {{{
635  
    try {
636  
        return component_ns_->bind_prefix(name, prefix);
637  
    }
638  
    catch (hpx::exception const& e) {
639  
        HPX_RETHROWS_IF(ec, e, "addressing_service::register_factory");
640  
        return components::component_invalid;
641  
    }
642  
} // }}}
643  
644  
///////////////////////////////////////////////////////////////////////////////
645  
bool addressing_service::get_id_range(
646  
    std::uint64_t count
647  
  , naming::gid_type& lower_bound
648  
  , naming::gid_type& upper_bound
649  
  , error_code& ec
650  
    )
651  
{ // {{{ get_id_range implementation
652  
    try {
653  
        // parcelset::endpoints_type() is an obsolete, dummy argument
654  
655  
        std::pair<naming::gid_type, naming::gid_type> rep(
656  
            primary_ns_.allocate(count));
657  
658  
        if(rep.first == naming::invalid_gid || rep.second == naming::invalid_gid)
659  
            return false;
660  
661  
        lower_bound = rep.first;
662  
        upper_bound = rep.second;
663  
664  
        return true;
665  
    }
666  
    catch (hpx::exception const& e) {
667  
        HPX_RETHROWS_IF(ec, e, "addressing_service::get_id_range");
668  
        return false;
669  
    }
670  
} // }}}
671  
672  
bool addressing_service::bind_range_local(
673  
    naming::gid_type const& lower_id
674  
  , std::uint64_t count
675  
  , naming::address const& baseaddr
676  
  , std::uint64_t offset
677  
  , error_code& ec
678  
    )
679  
{ // {{{ bind_range implementation
680  
    try {
681  
        naming::gid_type const& prefix = baseaddr.locality_;
682  
683  
        // Create a global virtual address from the legacy calling convention
684  
        // parameters
685  
        gva const g(prefix, baseaddr.type_, count, baseaddr.address_, offset);
686  
687  
        primary_ns_.bind_gid(g, lower_id, naming::get_locality_from_gid(lower_id));
688  
689  
        if (range_caching_)
690  
        {
691  
            // Put the range into the cache.
692  
            update_cache_entry(lower_id, g, ec);
693  
        }
694  
        else
695  
        {
696  
            // Only put the first GID in the range into the cache
697  
            gva const first_g = g.resolve(lower_id, lower_id);
698  
            update_cache_entry(lower_id, first_g, ec);
699  
        }
700  
701  
        if (ec)
702  
            return false;
703  
704  
        return true;
705  
    }
706  
    catch (hpx::exception const& e) {
707  
        HPX_RETHROWS_IF(ec, e, "addressing_service::bind_range_local");
708  
        return false;
709  
    }
710  
} // }}}
711  
712  
bool addressing_service::bind_postproc(
713  
    future<bool> f, naming::gid_type const& lower_id, gva const& g
714  
    )
715  
{
716  
    f.get();
717  
718  
    if(range_caching_)
719  
    {
720  
        // Put the range into the cache.
721  
        update_cache_entry(lower_id, g);
722  
    }
723  
    else
724  
    {
725  
        // Only put the first GID in the range into the cache
726  
        gva const first_g = g.resolve(lower_id, lower_id);
727  
        update_cache_entry(lower_id, first_g);
728  
    }
729  
730  
    return true;
731  
}
732  
733  
hpx::future<bool> addressing_service::bind_range_async(
734  
    naming::gid_type const& lower_id
735  
  , std::uint64_t count
736  
  , naming::address const& baseaddr
737  
  , std::uint64_t offset
738  
  , naming::gid_type const& locality
739  
    )
740  
{
741  
    // ask server
742  
    naming::gid_type const& prefix = baseaddr.locality_;
743  
744  
    // Create a global virtual address from the legacy calling convention
745  
    // parameters.
746  
    gva const g(prefix, baseaddr.type_, count, baseaddr.address_, offset);
747  
748  
    naming::gid_type id(
749  
        naming::detail::get_stripped_gid_except_dont_cache(lower_id));
750  
751  
    future<bool> f = primary_ns_.bind_gid_async(g, id, locality);
752  
753  
    return f.then(util::bind(
754  
            util::one_shot(&addressing_service::bind_postproc),
755  
            this, _1, id, g
756  
        ));
757  
}
758  
759  
hpx::future<naming::address> addressing_service::unbind_range_async(
760  
    naming::gid_type const& lower_id
761  
  , std::uint64_t count
762  
    )
763  
{
764  
    return primary_ns_.unbind_gid_async(count, lower_id);
765  
}
766  
767  
bool addressing_service::unbind_range_local(
768  
    naming::gid_type const& lower_id
769  
  , std::uint64_t count
770  
  , naming::address& addr
771  
  , error_code& ec
772  
    )
773  
{ // {{{ unbind_range implementation
774  
    try {
775  
776  
        naming::gid_type gid = naming::detail::get_stripped_gid(lower_id);
777  
778  
        addr = primary_ns_.unbind_gid(count, lower_id);
779  
780  
        return true;
781  
    }
782  
    catch (hpx::exception const& e) {
783  
        HPX_RETHROWS_IF(ec, e, "addressing_service::unbind_range_local");
784  
        return false;
785  
    }
786  
} // }}}
787  
788  
/// This function will test whether the given address refers to an object
789  
/// living on the locality of the caller. We rely completely on the local AGAS
790  
/// cache and local AGAS instance, assuming that everything which is not in
791  
/// the cache is not local.
792  
793  
// bool addressing_service::is_local_address(
794  
//     naming::gid_type const& id
795  
//   , naming::address& addr
796  
//   , error_code& ec
797  
//     )
798  
// {
799  
//     // Resolve the address of the GID.
800  
//
801  
//     // NOTE: We do not throw here for a reason; it is perfectly valid for the
802  
//     // GID to not be found in the local AGAS instance.
803  
//     if (!resolve(id, addr, ec) || ec)
804  
//         return false;
805  
//
806  
//     return addr.locality_ == get_here();
807  
// }
808  
809  
bool addressing_service::is_local_address_cached(
810  
    naming::gid_type const& gid
811  
  , naming::address& addr
812  
  , error_code& ec
813  
    )
814  
{
815  
    // Assume non-local operation if the gid is known to have been migrated
816  
    naming::gid_type id(naming::detail::get_stripped_gid_except_dont_cache(gid));
817  
818  
    {
819  
        std::lock_guard<mutex_type> lock(migrated_objects_mtx_);
820  
        if (was_object_migrated_locked(id))
821  
        {
822  
            if (&ec != &throws)
823  
                ec = make_success_code();
824  
            return false;
825  
        }
826  
    }
827  
828  
    // Try to resolve the address of the GID from the locally available
829  
    // information.
830  
831  
    // NOTE: We do not throw here for a reason; it is perfectly valid for the
832  
    // GID to not be found in the cache.
833  
    if (!resolve_cached(id, addr, ec) || ec)
834  
    {
835  
        if (ec) return false;
836  
837  
        // try also the local part of AGAS before giving up
838  
        if (!resolve_full_local(id, addr, ec) || ec)
839  
            return false;
840  
    }
841  
842  
    return addr.locality_ == get_local_locality();
843  
}
844  
845  
// Return true if at least one address is local.
846  
// bool addressing_service::is_local_address(
847  
//     naming::gid_type const* gids
848  
//   , naming::address* addrs
849  
//   , std::size_t size
850  
//   , boost::dynamic_bitset<>& locals
851  
//   , error_code& ec
852  
//     )
853  
// {
854  
//     // Try the cache
855  
//     if (caching_)
856  
//     {
857  
//         bool all_resolved = resolve_cached(gids, addrs, size, locals, ec);
858  
//         if (ec)
859  
//             return false;
860  
//         if (all_resolved)
861  
//             return locals.any();      // all destinations resolved
862  
//     }
863  
//
864  
//     if (!resolve_full(gids, addrs, size, locals, ec) || ec)
865  
//         return false;
866  
//
867  
//     return locals.any();
868  
// }
869  
870  
bool addressing_service::is_local_lva_encoded_address(
871  
    std::uint64_t msb
872  
    )
873  
{
874  
    // NOTE: This should still be migration safe.
875  
    return naming::detail::strip_internal_bits_from_gid(msb) ==
876  
        get_local_locality().get_msb();
877  
}
878  
879  
bool addressing_service::resolve_locally_known_addresses(
880  
    naming::gid_type const& id
881  
  , naming::address& addr
882  
    )
883  
{
884  
    // LVA-encoded GIDs (located on this machine)
885  
    std::uint64_t lsb = id.get_lsb();
886  
    std::uint64_t msb =
887  
        naming::detail::strip_internal_bits_from_gid(id.get_msb());
888  
889  
    if (is_local_lva_encoded_address(msb))
890  
    {
891  
        addr.locality_ = get_local_locality();
892  
893  
        // An LSB of 0 references the runtime support component
894  
        HPX_ASSERT(rts_lva_);
895  
896  
        if (0 == lsb || lsb == rts_lva_)
897  
        {
898  
            addr.type_ = components::component_runtime_support;
899  
            addr.address_ = rts_lva_;
900  
        }
901  
        else
902  
        {
903  
            HPX_ASSERT(mem_lva_);
904  
905  
            addr.type_ = components::component_memory;
906  
            addr.address_ = mem_lva_;
907  
        }
908  
909  
        return true;
910  
    }
911  
912  
    // explicitly resolve localities
913  
    if (naming::is_locality(id))
914  
    {
915  
        addr.locality_ = id;
916  
        addr.type_ = components::component_runtime_support;
917  
        // addr.address_ will be supplied on the target locality
918  
        return true;
919  
    }
920  
921  
    // authoritative AGAS component address resolution
922  
    if (HPX_AGAS_LOCALITY_NS_MSB == msb && HPX_AGAS_LOCALITY_NS_LSB == lsb)
923  
    {
924  
        addr = locality_ns_->addr();
925  
        return true;
926  
    }
927  
    if (HPX_AGAS_COMPONENT_NS_MSB == msb && HPX_AGAS_COMPONENT_NS_LSB == lsb)
928  
    {
929  
        addr = component_ns_->addr();
930  
        return true;
931  
    }
932  
933  
    naming::gid_type dest = naming::get_locality_from_gid(id);
934  
    if (HPX_AGAS_PRIMARY_NS_LSB == lsb)
935  
    {
936  
        // primary AGAS service on locality 0?
937  
        if(dest == get_local_locality())
938  
        {
939  
            addr = primary_ns_.addr();
940  
        }
941  
        // primary AGAS service on any locality
942  
        else
943  
        {
944  
            addr.locality_ = dest;
945  
            addr.type_ = server::primary_namespace::get_component_type();
946  
            // addr.address_ will be supplied on the target locality
947  
            return true;
948  
        }
949  
    }
950  
951  
    if (HPX_AGAS_SYMBOL_NS_LSB == lsb)
952  
    {
953  
        // symbol AGAS service on this locality?
954  
        if(dest == get_local_locality())
955  
        {
956  
            addr = symbol_ns_.addr();
957  
        }
958  
        // symbol AGAS service on any locality
959  
        else
960  
        {
961  
            addr.locality_ = dest;
962  
            addr.type_ = server::symbol_namespace::get_component_type();
963  
            // addr.address_ will be supplied on the target locality
964  
        }
965  
966  
        return true;
967  
    }
968  
969  
    return false;
970  
} // }}}
971  
972  
bool addressing_service::resolve_full_local(
973  
    naming::gid_type const& id
974  
  , naming::address& addr
975  
  , error_code& ec
976  
    )
977  
{ // {{{ resolve implementation
978  
    try {
979  
        auto rep = primary_ns_.resolve_gid(id);
980  
981  
        using hpx::util::get;
982  
983  
        if (get<0>(rep) == naming::invalid_gid || get<2>(rep) == naming::invalid_gid)
984  
            return false;
985  
986  
        // Resolve the gva to the real resolved address (which is just a gva
987  
        // with as fully resolved LVA and an offset of zero).
988  
        naming::gid_type base_gid = get<0>(rep);
989  
        gva const base_gva = get<1>(rep);
990  
991  
        gva const g = base_gva.resolve(id, base_gid);
992  
993  
        addr.locality_ = g.prefix;
994  
        addr.type_ = g.type;
995  
        addr.address_ = g.lva();
996  
997  
        if (naming::detail::store_in_cache(id))
998  
        {
999  
            HPX_ASSERT(addr.address_);
1000  
            if(range_caching_)
1001  
            {
1002  
                // Put the range into the cache.
1003  
                update_cache_entry(base_gid, base_gva, ec);
1004  
            }
1005  
            else
1006  
            {
1007  
                // Put the fully resolved gva into the cache.
1008  
                update_cache_entry(id, g, ec);
1009  
            }
1010  
        }
1011  
1012  
        if (ec)
1013  
            return false;
1014  
1015  
        if (&ec != &throws)
1016  
            ec = make_success_code();
1017  
1018  
        return true;
1019  
    }
1020  
    catch (hpx::exception const& e) {
1021  
        HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_full_local");
1022  
        return false;
1023  
    }
1024  
} // }}}
1025  
1026  
bool addressing_service::resolve_cached(
1027  
    naming::gid_type const& gid
1028  
  , naming::address& addr
1029  
  , error_code& ec
1030  
    )
1031  
{ // {{{ resolve_cached implementation
1032  
1033  
    naming::gid_type id = naming::detail::get_stripped_gid_except_dont_cache(gid);
1034  
1035  
    // special cases
1036  
    if (resolve_locally_known_addresses(id, addr))
1037  
        return true;
1038  
1039  
    // If caching is disabled, bail
1040  
    if (!caching_)
1041  
    {
1042  
        if (&ec != &throws)
1043  
            ec = make_success_code();
1044  
        return false;
1045  
    }
1046  
1047  
    // don't look at cache if id is marked as non-cache-able
1048  
    if (!naming::detail::store_in_cache(id))
1049  
    {
1050  
        if (&ec != &throws)
1051  
            ec = make_success_code();
1052  
        return false;
1053  
    }
1054  
1055  
    // don't look at the cache if the id is locally managed
1056  
    if (naming::get_locality_id_from_gid(id) ==
1057  
        naming::get_locality_id_from_gid(locality_))
1058  
    {
1059  
        if (&ec != &throws)
1060  
            ec = make_success_code();
1061  
        return false;
1062  
    }
1063  
1064  
    // force routing if target object was migrated
1065  
    {
1066  
        std::lock_guard<mutex_type> lock(migrated_objects_mtx_);
1067  
        if (was_object_migrated_locked(id))
1068  
        {
1069  
            if (&ec != &throws)
1070  
                ec = make_success_code();
1071  
            return false;
1072  
        }
1073  
    }
1074  
1075  
    // first look up the requested item in the cache
1076  
    gva g;
1077  
    naming::gid_type idbase;
1078  
    if (get_cache_entry(id, g, idbase, ec))
1079  
    {
1080  
        addr.locality_ = g.prefix;
1081  
        addr.type_ = g.type;
1082  
        addr.address_ = g.lva(id, idbase);
1083  
1084  
        if (&ec != &throws)
1085  
            ec = make_success_code();
1086  
1087  
/*
1088  
        LAGAS_(debug) <<
1089  
            ( boost::format(
1090  
                "addressing_service::resolve_cached, "
1091  
                "cache hit for address %1%, lva %2% (base %3%, lva %4%)")
1092  
            % id
1093  
            % reinterpret_cast<void*>(addr.address_)
1094  
            % idbase.get_gid()
1095  
            % reinterpret_cast<void*>(g.lva()));
1096  
*/
1097  
1098  
        return true;
1099  
    }
1100  
1101  
    if (&ec != &throws)
1102  
        ec = make_success_code();
1103  
1104  
    LAGAS_(debug) <<
1105  
        ( boost::format(
1106  
            "addressing_service::resolve_cached, "
1107  
            "cache miss for address %1%")
1108  
        % id);
1109  
1110  
    return false;
1111  
} // }}}
1112  
1113  
hpx::future<naming::address> addressing_service::resolve_async(
1114  
    naming::gid_type const& gid
1115  
    )
1116  
{
1117  
    if (!gid)
1118  
    {
1119  
        HPX_THROW_EXCEPTION(bad_parameter,
1120  
            "addressing_service::resolve_async",
1121  
            "invalid reference id");
1122  
        return make_ready_future(naming::address());
1123  
    }
1124  
1125  
    // Try the cache.
1126  
    if (caching_)
1127  
    {
1128  
        naming::address addr;
1129  
        error_code ec;
1130  
        if (resolve_cached(gid, addr, ec))
1131  
            return make_ready_future(addr);
1132  
1133  
        if (ec)
1134  
        {
1135  
            return hpx::make_exceptional_future<naming::address>(
1136  
                hpx::detail::access_exception(ec));
1137  
        }
1138  
    }
1139  
1140  
    // now try the AGAS service
1141  
    return resolve_full_async(gid);
1142  
}
1143  
1144  
hpx::future<naming::id_type> addressing_service::get_colocation_id_async(
1145  
    naming::id_type const& id
1146  
    )
1147  
{
1148  
    if (!id)
1149  
    {
1150  
        HPX_THROW_EXCEPTION(bad_parameter,
1151  
            "addressing_service::get_colocation_id_async",
1152  
            "invalid reference id");
1153  
        return make_ready_future(naming::invalid_id);
1154  
    }
1155  
1156  
    return primary_ns_.colocate(id.get_gid());
1157  
}
1158  
1159  
///////////////////////////////////////////////////////////////////////////////
1160  
naming::address addressing_service::resolve_full_postproc(
1161  
    future<primary_namespace::resolved_type> f, naming::gid_type const& id
1162  
    )
1163  
{
1164  
    using hpx::util::get;
1165  
1166  
    naming::address addr;
1167  
1168  
    auto rep = f.get();
1169  
    if (get<0>(rep) == naming::invalid_gid || get<2>(rep) == naming::invalid_gid)
1170  
    {
1171  
        HPX_THROW_EXCEPTION(bad_parameter,
1172  
            "addressing_service::resolve_full_postproc",
1173  
            "could no resolve global id");
1174  
        return addr;
1175  
    }
1176  
1177  
    // Resolve the gva to the real resolved address (which is just a gva
1178  
    // with as fully resolved LVA and and offset of zero).
1179  
    naming::gid_type base_gid = get<0>(rep);
1180  
    gva const base_gva = get<1>(rep);
1181  
1182  
    gva const g = base_gva.resolve(id, base_gid);
1183  
1184  
    addr.locality_ = g.prefix;
1185  
    addr.type_ = g.type;
1186  
    addr.address_ = g.lva();
1187  
1188  
    if (naming::detail::store_in_cache(id))
1189  
    {
1190  
        if (range_caching_)
1191  
        {
1192  
            // Put the range into the cache.
1193  
            update_cache_entry(base_gid, base_gva);
1194  
        }
1195  
        else
1196  
        {
1197  
            // Put the fully resolved gva into the cache.
1198  
            update_cache_entry(id, g);
1199  
        }
1200  
    }
1201  
1202  
    return addr;
1203  
}
1204  
1205  
hpx::future<naming::address> addressing_service::resolve_full_async(
1206  
    naming::gid_type const& gid
1207  
    )
1208  
{
1209  
    if (!gid)
1210  
    {
1211  
        HPX_THROW_EXCEPTION(bad_parameter,
1212  
            "addressing_service::resolve_full_async",
1213  
            "invalid reference id");
1214  
        return make_ready_future(naming::address());
1215  
    }
1216  
1217  
    // ask server
1218  
    future<primary_namespace::resolved_type> f =
1219  
        primary_ns_.resolve_full(gid);
1220  
1221  
    using util::placeholders::_1;
1222  
    return f.then(util::bind(
1223  
            util::one_shot(&addressing_service::resolve_full_postproc),
1224  
            this, _1, gid
1225  
        ));
1226  
}
1227  
1228  
///////////////////////////////////////////////////////////////////////////////
1229  
bool addressing_service::resolve_full_local(
1230  
    naming::gid_type const* gids
1231  
  , naming::address* addrs
1232  
  , std::size_t count
1233  
  , boost::dynamic_bitset<>& locals
1234  
  , error_code& ec
1235  
    )
1236  
{
1237  
    locals.resize(count);
1238  
1239  
    try {
1240  
        using hpx::util::get;
1241  
1242  
        // special cases
1243  
        for (std::size_t i = 0; i != count; ++i)
1244  
        {
1245  
            if (addrs[i])
1246  
            {
1247  
                locals.set(i, true);
1248  
                continue;
1249  
            }
1250  
1251  
            HPX_ASSERT(!locals.test(i));
1252  
1253  
            if (!addrs[i] && !locals.test(i))
1254  
            {
1255  
                auto rep = primary_ns_.resolve_gid(gids[i]);
1256  
1257  
                if (get<0>(rep) == naming::invalid_gid ||
1258  
                    get<2>(rep) == naming::invalid_gid)
1259  
                    return false;
1260  
                // Resolve the gva to the real resolved address (which is just a gva
1261  
                // with as fully resolved LVA and and offset of zero).
1262  
                naming::gid_type base_gid = get<0>(rep);
1263  
                gva const base_gva = get<1>(rep);
1264  
1265  
                gva const g = base_gva.resolve(gids[i], base_gid);
1266  
1267  
                naming::address& addr = addrs[i];
1268  
                addr.locality_ = g.prefix;
1269  
                addr.type_ = g.type;
1270  
                addr.address_ = g.lva();
1271  
1272  
                hpx::error_code ec;
1273  
                if (naming::detail::store_in_cache(gids[i]))
1274  
                {
1275  
                    if (range_caching_)
1276  
                    {
1277  
                        // Put the range into the cache.
1278  
                        update_cache_entry(base_gid, base_gva, ec);
1279  
                    }
1280  
                    else
1281  
                    {
1282  
                        // Put the fully resolved gva into the cache.
1283  
                        update_cache_entry(gids[i], g, ec);
1284  
                    }
1285  
                }
1286  
1287  
                if (ec)
1288  
                    return false;
1289  
            }
1290  
        }
1291  
1292  
        return true;
1293  
    }
1294  
    catch (hpx::exception const& e) {
1295  
        HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_full");
1296  
        return false;
1297  
    }
1298  
}
1299  
1300  
bool addressing_service::resolve_cached(
1301  
    naming::gid_type const* gids
1302  
  , naming::address* addrs
1303  
  , std::size_t count
1304  
  , boost::dynamic_bitset<>& locals
1305  
  , error_code& ec
1306  
    )
1307  
{
1308  
    locals.resize(count);
1309  
1310  
    std::size_t resolved = 0;
1311  
    for (std::size_t i = 0; i != count; ++i)
1312  
    {
1313  
        if (!addrs[i] && !locals.test(i))
1314  
        {
1315  
            bool was_resolved = resolve_cached(gids[i], addrs[i], ec);
1316  
            if (ec)
1317  
                return false;
1318  
            if (was_resolved)
1319  
                ++resolved;
1320  
1321  
            if (addrs[i].locality_ == get_local_locality())
1322  
                locals.set(i, true);
1323  
        }
1324  
1325  
        else if (addrs[i].locality_ == get_local_locality())
1326  
        {
1327  
            ++resolved;
1328  
            locals.set(i, true);
1329  
        }
1330  
    }
1331  
1332  
    return resolved == count;   // returns whether all have been resolved
1333  
}
1334  
1335  
///////////////////////////////////////////////////////////////////////////////
1336  
void addressing_service::route(
1337  
    parcelset::parcel p
1338  
  , util::function_nonser<void(boost::system::error_code const&,
1339  
        parcelset::parcel const&)> && f
1340  
  , threads::thread_priority local_priority
1341  
    )
1342  
{
1343  
    if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
1344  
    {
1345  
        // reschedule this call as an HPX thread
1346  
        void (addressing_service::*route_ptr)(
1347  
            parcelset::parcel,
1348  
            util::function_nonser<void(boost::system::error_code const&,
1349  
                parcelset::parcel const&)> &&,
1350  
            threads::thread_priority
1351  
        ) = &addressing_service::route;
1352  
1353  
        threads::register_thread_nullary(
1354  
            util::deferred_call(
1355  
                route_ptr, this, std::move(p), std::move(f), local_priority),
1356  
            "addressing_service::route", threads::pending, true,
1357  
            threads::thread_priority_normal, std::size_t(-1),
1358  
            threads::thread_stacksize_default);
1359  
        return;
1360  
    }
1361  
1362  
    primary_ns_.route(std::move(p), std::move(f));
1363  
}
1364  
1365  
///////////////////////////////////////////////////////////////////////////////
1366  
// The parameter 'compensated_credit' holds the amount of credits to be added
1367  
// to the acknowledged number of credits. The compensated credits are non-zero
1368  
// if there was a pending decref request at the point when the incref was sent.
1369  
// The pending decref was subtracted from the amount of credits to incref.
1370  
std::int64_t addressing_service::synchronize_with_async_incref(
1371  
    hpx::future<std::int64_t> fut
1372  
  , naming::id_type const& id
1373  
  , std::int64_t compensated_credit
1374  
    )
1375  
{
1376  
    return fut.get() + compensated_credit;
1377  
}
1378  
1379  
lcos::future<std::int64_t> addressing_service::incref_async(
1380  
    naming::gid_type const& id
1381  
  , std::int64_t credit
1382  
  , naming::id_type const& keep_alive
1383  
    )
1384  
{ // {{{ incref implementation
1385  
    naming::gid_type raw(naming::detail::get_stripped_gid(id));
1386  
1387  
    if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
1388  
    {
1389  
        // reschedule this call as an HPX thread
1390  
        lcos::future<std::int64_t> (
1391  
                addressing_service::*incref_async_ptr)(
1392  
            naming::gid_type const&
1393  
          , std::int64_t
1394  
          , naming::id_type const&
1395  
        ) = &addressing_service::incref_async;
1396  
1397  
        return async(incref_async_ptr, this, raw, credit, keep_alive);
1398  
    }
1399  
1400  
    if (HPX_UNLIKELY(0 >= credit))
1401  
    {
1402  
        HPX_THROW_EXCEPTION(bad_parameter
1403  
          , "addressing_service::incref_async"
1404  
          , boost::str(boost::format("invalid credit count of %1%") % credit));
1405  
        return lcos::future<std::int64_t>();
1406  
    }
1407  
1408  
    HPX_ASSERT(keep_alive != naming::invalid_id);
1409  
1410  
    typedef refcnt_requests_type::value_type mapping;
1411  
1412  
    // Some examples of calculating the compensated credits below
1413  
    //
1414  
    //  case   pending   credits   remaining   sent to   compensated
1415  
    //  no     decref              decrefs     AGAS      credits
1416  
    // ------+---------+---------+------------+--------+-------------
1417  
    //   1         0        10        0           0        10
1418  
    //   2        10         9        1           0        10
1419  
    //   3        10        10        0           0        10
1420  
    //   4        10        11        0           1        10
1421  
1422  
    std::pair<naming::gid_type, std::int64_t> pending_incref;
1423  
    bool has_pending_incref = false;
1424  
    std::int64_t pending_decrefs = 0;
1425  
1426  
    {
1427  
        std::lock_guard<mutex_type> l(refcnt_requests_mtx_);
1428  
1429  
        typedef refcnt_requests_type::iterator iterator;
1430  
1431  
        iterator matches = refcnt_requests_->find(raw);
1432  
        if (matches != refcnt_requests_->end())
1433  
        {
1434  
            pending_decrefs = matches->second;
1435  
            matches->second += credit;
1436  
1437  
            // Increment requests need to be handled immediately.
1438  
1439  
            // If the given incref was fully compensated by a pending decref
1440  
            // (i.e. match_data is less than 0) then there is no need
1441  
            // to do anything more.
1442  
            if (matches->second > 0)
1443  
            {
1444  
                // credit > decrefs (case no 4): store the remaining incref to
1445  
                // be handled below.
1446  
                pending_incref = mapping(matches->first, matches->second);
1447  
                has_pending_incref = true;
1448  
1449  
                refcnt_requests_->erase(matches);
1450  
            }
1451  
            else if (matches->second == 0)
1452  
            {
1453  
                // credit == decref (case no. 3): if the incref offsets any
1454  
                // pending decref, just remove the pending decref request.
1455  
                refcnt_requests_->erase(matches);
1456  
            }
1457  
            else
1458  
            {
1459  
                // credit < decref (case no. 2): do nothing
1460  
            }
1461  
        }
1462  
        else
1463  
        {
1464  
            // case no. 1
1465  
            pending_incref = mapping(raw, credit);
1466  
            has_pending_incref = true;
1467  
        }
1468  
    }
1469  
1470  
    if (!has_pending_incref)
1471  
    {
1472  
        // no need to talk to AGAS, acknowledge the incref immediately
1473  
        return hpx::make_ready_future(pending_decrefs);
1474  
    }
1475  
1476  
    naming::gid_type const e_lower = pending_incref.first;
1477  
1478  
    lcos::future<std::int64_t> f =
1479  
        primary_ns_.increment_credit(pending_incref.second, e_lower, e_lower);
1480  
1481  
    // pass the amount of compensated decrefs to the callback
1482  
    using util::placeholders::_1;
1483  
    return f.then(util::bind(
1484  
            util::one_shot(&addressing_service::synchronize_with_async_incref),
1485  
            this, _1, keep_alive, pending_decrefs
1486  
        ));
1487  
} // }}}
1488  
1489  
///////////////////////////////////////////////////////////////////////////////
1490  
void addressing_service::decref(
1491  
    naming::gid_type const& gid
1492  
  , std::int64_t credit
1493  
  , error_code& ec
1494  
    )
1495  
{ // {{{ decref implementation
1496  
    naming::gid_type raw(naming::detail::get_stripped_gid(gid));
1497  
1498  
    if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
1499  
    {
1500  
        // reschedule this call as an HPX thread
1501  
        void (addressing_service::*decref_ptr)(
1502  
            naming::gid_type const&
1503  
          , std::int64_t
1504  
          , error_code&
1505  
        ) = &addressing_service::decref;
1506  
1507  
        threads::register_thread_nullary(
1508  
            util::deferred_call(decref_ptr, this, raw, credit, std::ref(throws)),
1509  
            "addressing_service::decref", threads::pending, true,
1510  
            threads::thread_priority_normal, std::size_t(-1),
1511  
            threads::thread_stacksize_default, ec);
1512  
1513  
        return;
1514  
    }
1515  
1516  
    if (HPX_UNLIKELY(credit <= 0))
1517  
    {
1518  
        HPX_THROWS_IF(ec, bad_parameter
1519  
          , "addressing_service::decref"
1520  
          , boost::str(boost::format("invalid credit count of %1%") % credit));
1521  
        return;
1522  
    }
1523  
1524  
    try {
1525  
        std::unique_lock<mutex_type> l(refcnt_requests_mtx_);
1526  
1527  
        // Match the decref request with entries in the incref table
1528  
        typedef refcnt_requests_type::iterator iterator;
1529  
        typedef refcnt_requests_type::value_type mapping;
1530  
1531  
        iterator matches = refcnt_requests_->find(raw);
1532  
        if (matches != refcnt_requests_->end())
1533  
        {
1534  
            matches->second -= credit;
1535  
        }
1536  
        else
1537  
        {
1538  
            std::pair<iterator, bool> p =
1539  
                refcnt_requests_->insert(mapping(raw, -credit));
1540  
1541  
            if (HPX_UNLIKELY(!p.second))
1542  
            {
1543  
                l.unlock();
1544  
1545  
                HPX_THROWS_IF(ec, bad_parameter
1546  
                  , "addressing_service::decref"
1547  
                  , boost::str(boost::format("couldn't insert decref request "
1548  
                        "for %1% (%2%)") % raw % credit));
1549  
                return;
1550  
            }
1551  
        }
1552  
1553  
        send_refcnt_requests(l, ec);
1554  
    }
1555  
    catch (hpx::exception const& e) {
1556  
        HPX_RETHROWS_IF(ec, e, "addressing_service::decref");
1557  
    }
1558  
} // }}}
1559  
1560  
///////////////////////////////////////////////////////////////////////////////
1561  
bool addressing_service::register_name(
1562  
    std::string const& name
1563  
  , naming::gid_type const& id
1564  
  , error_code& ec
1565  
    )
1566  
{ // {{{
1567  
    try {
1568  
        return symbol_ns_.bind(name, naming::detail::get_stripped_gid(id));
1569  
    }
1570  
    catch (hpx::exception const& e) {
1571  
        HPX_RETHROWS_IF(ec, e, "addressing_service::register_name");
1572  
        return false;
1573  
    }
1574  
} // }}}
1575  
1576  
static bool correct_credit_on_failure(future<bool> f, naming::id_type id,
1577  
    std::int64_t mutable_gid_credit, std::int64_t new_gid_credit)
1578  
{
1579  
    // Return the credit to the GID if the operation failed
1580  
    if ((f.has_exception() && mutable_gid_credit != 0) || !f.get())
1581  
    {
1582  
        naming::detail::add_credit_to_gid(id.get_gid(), new_gid_credit);
1583  
        return false;
1584  
    }
1585  
    return true;
1586  
}
1587  
1588  
lcos::future<bool> addressing_service::register_name_async(
1589  
    std::string const& name
1590  
  , naming::id_type const& id
1591  
    )
1592  
{ // {{{
1593  
    // We need to modify the reference count.
1594  
    naming::gid_type& mutable_gid = const_cast<naming::id_type&>(id).get_gid();
1595  
    naming::gid_type new_gid = naming::detail::split_gid_if_needed(mutable_gid).get();
1596  
1597  
    future<bool> f = symbol_ns_.bind_async(name, new_gid);
1598  
1599  
    std::int64_t new_credit = naming::detail::get_credit_from_gid(new_gid);
1600  
    if (new_credit != 0)
1601  
    {
1602  
        using util::placeholders::_1;
1603  
        return f.then(util::bind(
1604  
                util::one_shot(&correct_credit_on_failure),
1605  
                _1, id, std::int64_t(HPX_GLOBALCREDIT_INITIAL), new_credit
1606  
            ));
1607  
    }
1608  
1609  
    return f;
1610  
} // }}}
1611  
1612  
///////////////////////////////////////////////////////////////////////////////
1613  
naming::id_type addressing_service::unregister_name(
1614  
    std::string const& name
1615  
  , error_code& ec
1616  
    )
1617  
{ // {{{
1618  
    try {
1619  
        return symbol_ns_.unbind(name);
1620  
    }
1621  
    catch (hpx::exception const& e) {
1622  
        HPX_RETHROWS_IF(ec, e, "addressing_service::unregister_name");
1623  
        return naming::invalid_id;
1624  
    }
1625  
} // }}}
1626  
1627  
lcos::future<naming::id_type> addressing_service::unregister_name_async(
1628  
    std::string const& name
1629  
    )
1630  
{ // {{{
1631  
    return symbol_ns_.unbind_async(name);
1632  
} // }}}
1633  
1634  
///////////////////////////////////////////////////////////////////////////////
1635  
naming::id_type addressing_service::resolve_name(
1636  
    std::string const& name
1637  
  , error_code& ec
1638  
    )
1639  
{ // {{{
1640  
    try {
1641  
        return symbol_ns_.resolve(name);
1642  
    }
1643  
    catch (hpx::exception const& e) {
1644  
        HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_name");
1645  
        return naming::invalid_id;
1646  
    }
1647  
} // }}}
1648  
1649  
lcos::future<naming::id_type> addressing_service::resolve_name_async(
1650  
    std::string const& name
1651  
    )
1652  
{ // {{{
1653  
    return symbol_ns_.resolve_async(name);
1654  
} // }}}
1655  
1656  
namespace detail
1657  
{
1658  
    hpx::future<hpx::id_type> on_register_event(hpx::future<bool> f,
1659  
        hpx::future<hpx::id_type> result_f)
1660  
    {
1661  
        if (!f.get())
1662  
        {
1663  
            HPX_THROW_EXCEPTION(bad_request,
1664  
                "hpx::agas::detail::on_register_event",
1665  
                "request 'symbol_ns_on_event' failed");
1666  
            return hpx::future<hpx::id_type>();
1667  
        }
1668  
1669  
        return result_f;
1670  
    }
1671  
}
1672  
1673  
future<hpx::id_type> addressing_service::on_symbol_namespace_event(
1674  
    std::string const& name, bool call_for_past_events)
1675  
{
1676  
    lcos::promise<naming::id_type, naming::gid_type> p;
1677  
    auto result_f = p.get_future();
1678  
1679  
    hpx::future<bool> f = symbol_ns_.on_event(name, call_for_past_events, p.get_id());
1680  
1681  
    using util::placeholders::_1;
1682  
    return f.then(util::bind(
1683  
            util::one_shot(&detail::on_register_event), _1, std::move(result_f)
1684  
        ));
1685  
}
1686  
1687  
}}
1688  
1689  
///////////////////////////////////////////////////////////////////////////////
1690  
typedef hpx::agas::server::symbol_namespace::on_event_action
1691  
    symbol_namespace_on_event_action;
1692  
1693  
HPX_REGISTER_BROADCAST_ACTION_DECLARATION(symbol_namespace_on_event_action,
1694  
        symbol_namespace_on_event_action)
1695  
HPX_REGISTER_BROADCAST_ACTION_ID(symbol_namespace_on_event_action,
1696  
        symbol_namespace_on_event_action,
1697  
        hpx::actions::broadcast_symbol_namespace_on_event_action_id)
1698  
1699  
namespace hpx { namespace agas
1700  
{
1701  
    namespace detail
1702  
    {
1703  
        std::vector<hpx::id_type> find_all_symbol_namespace_services()
1704  
        {
1705  
            std::vector<hpx::id_type> ids;
1706  
            for (hpx::id_type const& id : hpx::find_all_localities())
1707  
            {
1708  
                ids.push_back(hpx::id_type(
1709  
                    agas::symbol_namespace::get_service_instance(id),
1710  
                    id_type::unmanaged));
1711  
            }
1712  
            return ids;
1713  
        }
1714  
    }
1715  
1716  
/// Invoke the supplied hpx::function for every registered global name
1717  
bool addressing_service::iterate_ids(
1718  
    iterate_names_function_type const& f
1719  
  , error_code& ec
1720  
    )
1721  
{ // {{{
1722  
    try {
1723  
        server::symbol_namespace::iterate_action act;
1724  
        lcos::broadcast(act, detail::find_all_symbol_namespace_services(), f).get(ec);
1725  
1726  
        return !ec;
1727  
    }
1728  
    catch (hpx::exception const& e) {
1729  
        HPX_RETHROWS_IF(ec, e, "addressing_service::iterate_ids");
1730  
        return false;
1731  
    }
1732  
} // }}}
1733  
1734  
// This function has to return false if the key is already in the cache (true
1735  
// means go ahead with the cache update).
1736  
bool check_for_collisions(
1737  
    addressing_service::gva_cache_key const& new_key
1738  
  , addressing_service::gva_cache_key const& old_key
1739  
    )
1740  
{
1741  
    return (new_key.get_gid() != old_key.get_gid())
1742  
        || (new_key.get_count() != old_key.get_count());
1743  
}
1744  
1745  
void addressing_service::update_cache_entry(
1746  
    naming::gid_type const& id
1747  
  , gva const& g
1748  
  , error_code& ec
1749  
    )
1750  
{ // {{{
1751  
    if (!caching_)
1752  
    {
1753  
        // If caching is disabled, we silently pretend success.
1754  
        if (&ec != &throws)
1755  
            ec = make_success_code();
1756  
        return;
1757  
    }
1758  
1759  
    // don't look at cache if id is marked as non-cache-able
1760  
    if (!naming::detail::store_in_cache(id))
1761  
    {
1762  
        if (&ec != &throws)
1763  
            ec = make_success_code();
1764  
        return;
1765  
    }
1766  
1767  
    naming::gid_type gid = naming::detail::get_stripped_gid(id);
1768  
1769  
    // don't look at the cache if the id is locally managed
1770  
    if (naming::get_locality_id_from_gid(gid) ==
1771  
        naming::get_locality_id_from_gid(locality_))
1772  
    {
1773  
        if (&ec != &throws)
1774  
            ec = make_success_code();
1775  
        return;
1776  
    }
1777  
1778  
    if(hpx::threads::get_self_ptr() == nullptr)
1779  
    {
1780  
        // Don't update the cache while HPX is starting up ...
1781  
        if(hpx::is_starting())
1782  
        {
1783  
            return;
1784  
        }
1785  
        void (addressing_service::*update_cache_entry_ptr)(
1786  
            naming::gid_type const&
1787  
          , gva const &
1788  
          , error_code&
1789  
        ) = &addressing_service::update_cache_entry;
1790  
        threads::register_thread_nullary(
1791  
            util::deferred_call(update_cache_entry_ptr, this, id, g, std::ref(throws)),
1792  
            "addressing_service::update_cache_entry", threads::pending, true,
1793  
            threads::thread_priority_normal, std::size_t(-1),
1794  
            threads::thread_stacksize_default, ec);
1795  
    }
1796  
1797  
    try {
1798  
        // The entry in AGAS for a locality's RTS component has a count of 0,
1799  
        // so we convert it to 1 here so that the cache doesn't break.
1800  
        const std::uint64_t count = (g.count ? g.count : 1);
1801  
1802  
        LAGAS_(debug) <<
1803  
            ( boost::format(
1804  
            "addressing_service::update_cache_entry, gid(%1%), count(%2%)"
1805  
            ) % gid % count);
1806  
1807  
        const gva_cache_key key(gid, count);
1808  
1809  
        {
1810  
            std::lock_guard<mutex_type> lock(gva_cache_mtx_);
1811  
            if (!gva_cache_->update_if(key, g, check_for_collisions))
1812  
            {
1813  
                if (LAGAS_ENABLED(warning))
1814  
                {
1815  
                    // Figure out who we collided with.
1816  
                    addressing_service::gva_cache_key idbase;
1817  
                    addressing_service::gva_cache_type::entry_type e;
1818  
1819  
                    if (!gva_cache_->get_entry(key, idbase, e))
1820  
                    {
1821  
                        // This is impossible under sane conditions.
1822  
                        HPX_THROWS_IF(ec, invalid_data
1823  
                          , "addressing_service::update_cache_entry"
1824  
                          , "data corruption or lock error occurred in cache");
1825  
                        return;
1826  
                    }
1827  
1828  
                    LAGAS_(warning) <<
1829  
                        ( boost::format(
1830  
                            "addressing_service::update_cache_entry, "
1831  
                            "aborting update due to key collision in cache, "
1832  
                            "new_gid(%1%), new_count(%2%), old_gid(%3%), old_count(%4%)"
1833  
                        ) % gid % count % idbase.get_gid() % idbase.get_count());
1834  
                }
1835  
            }
1836  
        }
1837  
1838  
        if (&ec != &throws)
1839  
            ec = make_success_code();
1840  
    }
1841  
    catch (hpx::exception const& e) {
1842  
        HPX_RETHROWS_IF(ec, e, "addressing_service::update_cache_entry");
1843  
    }
1844  
} // }}}
1845  
1846  
bool addressing_service::get_cache_entry(
1847  
    naming::gid_type const& gid
1848  
  , gva& gva
1849  
  , naming::gid_type& idbase
1850  
  , error_code& ec
1851  
    )
1852  
{
1853  
    // Don't use the cache while HPX is starting up
1854  
    if(hpx::is_starting())
1855  
    {
1856  
        return false;
1857  
    }
1858  
    HPX_ASSERT(hpx::threads::get_self_ptr());
1859  
    gva_cache_key k(gid);
1860  
    gva_cache_key idbase_key;
1861  
1862  
    std::unique_lock<mutex_type> lock(gva_cache_mtx_);
1863  
    if(gva_cache_->get_entry(k, idbase_key, gva))
1864  
    {
1865  
        const std::uint64_t id_msb =
1866  
            naming::detail::strip_internal_bits_from_gid(gid.get_msb());
1867  
1868  
        if (HPX_UNLIKELY(id_msb != idbase_key.get_gid().get_msb()))
1869  
        {
1870  
            lock.unlock();
1871  
            HPX_THROWS_IF(ec, internal_server_error
1872  
              , "addressing_service::get_cache_entry"
1873  
              , "bad entry in cache, MSBs of GID base and GID do not match");
1874  
            return false;
1875  
        }
1876  
        idbase = idbase_key.get_gid();
1877  
        return true;
1878  
    }
1879  
1880  
    return false;
1881  
}
1882  
1883  
1884  
void addressing_service::clear_cache(
1885  
    error_code& ec
1886  
    )
1887  
{ // {{{
1888  
    if (!caching_)
1889  
    {
1890  
        // If caching is disabled, we silently pretend success.
1891  
        if (&ec != &throws)
1892  
            ec = make_success_code();
1893  
        return;
1894  
    }
1895  
1896  
    try {
1897  
        LAGAS_(warning) << "addressing_service::clear_cache, clearing cache";
1898  
1899  
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
1900  
1901  
        gva_cache_->clear();
1902  
1903  
        if (&ec != &throws)
1904  
            ec = make_success_code();
1905  
    }
1906  
    catch (hpx::exception const& e) {
1907  
        HPX_RETHROWS_IF(ec, e, "addressing_service::clear_cache");
1908  
    }
1909  
} // }}}
1910  
1911  
void addressing_service::remove_cache_entry(
1912  
    naming::gid_type const& id
1913  
  , error_code& ec
1914  
    )
1915  
{
1916  
    // If caching is disabled, we silently pretend success.
1917  
    if (!caching_)
1918  
    {
1919  
        if (&ec != &throws)
1920  
            ec = make_success_code();
1921  
        return;
1922  
    }
1923  
1924  
    // don't look at cache if id is marked as non-cache-able
1925  
    if (!naming::detail::store_in_cache(id))
1926  
    {
1927  
        if (&ec != &throws)
1928  
            ec = make_success_code();
1929  
        return;
1930  
    }
1931  
1932  
    naming::gid_type gid = naming::detail::get_stripped_gid(id);
1933  
1934  
    // don't look at the cache if the id is locally managed
1935  
    if (naming::get_locality_id_from_gid(gid) ==
1936  
        naming::get_locality_id_from_gid(locality_))
1937  
    {
1938  
        if (&ec != &throws)
1939  
            ec = make_success_code();
1940  
        return;
1941  
    }
1942  
1943  
    try {
1944  
        LAGAS_(warning) << "addressing_service::remove_cache_entry";
1945  
1946  
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
1947  
1948  
        gva_cache_->erase(
1949  
            [&gid](std::pair<gva_cache_key, gva> const& p)
1950  
            {
1951  
                return gid == p.first.get_gid();
1952  
            });
1953  
1954  
        if (&ec != &throws)
1955  
            ec = make_success_code();
1956  
    }
1957  
    catch (hpx::exception const& e) {
1958  
        HPX_RETHROWS_IF(ec, e, "addressing_service::clear_cache");
1959  
    }
1960  
}
1961  
1962  
// Disable refcnt caching during shutdown
1963  
void addressing_service::start_shutdown(error_code& ec)
1964  
{
1965  
    // If caching is disabled, we silently pretend success.
1966  
    if (!caching_)
1967  
        return;
1968  
1969  
    std::unique_lock<mutex_type> l(refcnt_requests_mtx_);
1970  
    enable_refcnt_caching_ = false;
1971  
    send_refcnt_requests_sync(l, ec);
1972  
}
1973  
1974  
namespace detail
1975  
{
1976  
    // get action code from counter type
1977  
    namespace_action_code retrieve_action_code(
1978  
        std::string const& name
1979  
      , error_code& ec
1980  
        )
1981  
    {
1982  
        performance_counters::counter_path_elements p;
1983  
        performance_counters::get_counter_path_elements(name, p, ec);
1984  
        if (ec) return invalid_request;
1985  
1986  
        if (p.objectname_ != "agas")
1987  
        {
1988  
            HPX_THROWS_IF(ec, bad_parameter, "retrieve_action_code",
1989  
                "unknown performance counter (unrelated to AGAS)");
1990  
            return invalid_request;
1991  
        }
1992  
1993  
        // component_ns
1994  
        for (std::size_t i = 0;
1995  
             i != num_component_namespace_services;
1996  
             ++i)
1997  
        {
1998  
            if (p.countername_ == component_namespace_services[i].name_)
1999  
                return component_namespace_services[i].code_;
2000  
        }
2001  
2002  
        // locality_ns
2003  
        for (std::size_t i = 0;
2004  
             i != num_locality_namespace_services;
2005  
             ++i)
2006  
        {
2007  
            if (p.countername_ == locality_namespace_services[i].name_)
2008  
                return locality_namespace_services[i].code_;
2009  
        }
2010  
2011  
        // primary_ns
2012  
        for (std::size_t i = 0;
2013  
             i != num_primary_namespace_services;
2014  
             ++i)
2015  
        {
2016  
            if (p.countername_ == primary_namespace_services[i].name_)
2017  
                return primary_namespace_services[i].code_;
2018  
        }
2019  
2020  
        // symbol_ns
2021  
        for (std::size_t i = 0;
2022  
             i != num_symbol_namespace_services;
2023  
             ++i)
2024  
        {
2025  
            if (p.countername_ == symbol_namespace_services[i].name_)
2026  
                return symbol_namespace_services[i].code_;
2027  
        }
2028  
2029  
        HPX_THROWS_IF(ec, bad_parameter, "retrieve_action_code",
2030  
            "unknown performance counter (unrelated to AGAS)");
2031  
        return invalid_request;
2032  
    }
2033  
2034  
    // get service action code from counter type
2035  
    namespace_action_code retrieve_action_service_code(
2036  
        std::string const& name
2037  
      , error_code& ec
2038  
        )
2039  
    {
2040  
        performance_counters::counter_path_elements p;
2041  
        performance_counters::get_counter_path_elements(name, p, ec);
2042  
        if (ec) return invalid_request;
2043  
2044  
        if (p.objectname_ != "agas")
2045  
        {
2046  
            HPX_THROWS_IF(ec, bad_parameter, "retrieve_action_service_code",
2047  
                "unknown performance counter (unrelated to AGAS)");
2048  
            return invalid_request;
2049  
        }
2050  
2051  
        // component_ns
2052  
        for (std::size_t i = 0;
2053  
             i != num_component_namespace_services;
2054  
             ++i)
2055  
        {
2056  
            if (p.countername_ == component_namespace_services[i].name_)
2057  
                return component_namespace_services[i].service_code_;
2058  
        }
2059  
2060  
        // locality_ns
2061  
        for (std::size_t i = 0;
2062  
             i != num_locality_namespace_services;
2063  
             ++i)
2064  
        {
2065  
            if (p.countername_ == locality_namespace_services[i].name_)
2066  
                return locality_namespace_services[i].service_code_;
2067  
        }
2068  
2069  
        // primary_ns
2070  
        for (std::size_t i = 0;
2071  
             i != num_primary_namespace_services;
2072  
             ++i)
2073  
        {
2074  
            if (p.countername_ == primary_namespace_services[i].name_)
2075  
                return primary_namespace_services[i].service_code_;
2076  
        }
2077  
2078  
        // symbol_ns
2079  
        for (std::size_t i = 0;
2080  
             i != num_symbol_namespace_services;
2081  
             ++i)
2082  
        {
2083  
            if (p.countername_ == symbol_namespace_services[i].name_)
2084  
                return symbol_namespace_services[i].service_code_;
2085  
        }
2086  
2087  
        HPX_THROWS_IF(ec, bad_parameter, "retrieve_action_service_code",
2088  
            "unknown performance counter (unrelated to AGAS)");
2089  
        return invalid_request;
2090  
    }
2091  
}
2092  
2093  
///////////////////////////////////////////////////////////////////////////////
2094  
// Helper functions to access the current cache statistics
2095  
std::uint64_t addressing_service::get_cache_entries(bool reset)
2096  
{
2097  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2098  
    return gva_cache_->size();
2099  
}
2100  
2101  
std::uint64_t addressing_service::get_cache_hits(bool reset)
2102  
{
2103  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2104  
    return gva_cache_->get_statistics().hits(reset);
2105  
}
2106  
2107  
std::uint64_t addressing_service::get_cache_misses(bool reset)
2108  
{
2109  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2110  
    return gva_cache_->get_statistics().misses(reset);
2111  
}
2112  
2113  
std::uint64_t addressing_service::get_cache_evictions(bool reset)
2114  
{
2115  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2116  
    return gva_cache_->get_statistics().evictions(reset);
2117  
}
2118  
2119  
std::uint64_t addressing_service::get_cache_insertions(bool reset)
2120  
{
2121  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2122  
    return gva_cache_->get_statistics().insertions(reset);
2123  
}
2124  
2125  
///////////////////////////////////////////////////////////////////////////////
2126  
std::uint64_t addressing_service::get_cache_get_entry_count(bool reset)
2127  
{
2128  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2129  
    return gva_cache_->get_statistics().get_get_entry_count(reset);
2130  
}
2131  
2132  
std::uint64_t addressing_service::get_cache_insertion_entry_count(bool reset)
2133  
{
2134  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2135  
    return gva_cache_->get_statistics().get_insert_entry_count(reset);
2136  
}
2137  
2138  
std::uint64_t addressing_service::get_cache_update_entry_count(bool reset)
2139  
{
2140  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2141  
    return gva_cache_->get_statistics().get_update_entry_count(reset);
2142  
}
2143  
2144  
std::uint64_t addressing_service::get_cache_erase_entry_count(bool reset)
2145  
{
2146  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2147  
    return gva_cache_->get_statistics().get_erase_entry_count(reset);
2148  
}
2149  
2150  
std::uint64_t addressing_service::get_cache_get_entry_time(bool reset)
2151  
{
2152  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2153  
    return gva_cache_->get_statistics().get_get_entry_time(reset);
2154  
}
2155  
2156  
std::uint64_t addressing_service::get_cache_insertion_entry_time(bool reset)
2157  
{
2158  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2159  
    return gva_cache_->get_statistics().get_insert_entry_time(reset);
2160  
}
2161  
2162  
std::uint64_t addressing_service::get_cache_update_entry_time(bool reset)
2163  
{
2164  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2165  
    return gva_cache_->get_statistics().get_update_entry_time(reset);
2166  
}
2167  
2168  
std::uint64_t addressing_service::get_cache_erase_entry_time(bool reset)
2169  
{
2170  
    std::lock_guard<mutex_type> lock(gva_cache_mtx_);
2171  
    return gva_cache_->get_statistics().get_erase_entry_time(reset);
2172  
}
2173  
2174  
/// Install performance counter types exposing properties from the local cache.
2175  
void addressing_service::register_counter_types()
2176  
{ // {{{
2177  
    using util::placeholders::_1;
2178  
    using util::placeholders::_2;
2179  
2180  
    // install
2181  
    util::function_nonser<std::int64_t(bool)> cache_entries(
2182  
        util::bind(&addressing_service::get_cache_entries, this, _1));
2183  
    util::function_nonser<std::int64_t(bool)> cache_hits(
2184  
        util::bind(&addressing_service::get_cache_hits, this, _1));
2185  
    util::function_nonser<std::int64_t(bool)> cache_misses(
2186  
        util::bind(&addressing_service::get_cache_misses, this, _1));
2187  
    util::function_nonser<std::int64_t(bool)> cache_evictions(
2188  
        util::bind(&addressing_service::get_cache_evictions, this, _1));
2189  
    util::function_nonser<std::int64_t(bool)> cache_insertions(
2190  
        util::bind(&addressing_service::get_cache_insertions, this, _1));
2191  
2192  
    util::function_nonser<std::int64_t(bool)> cache_get_entry_count(
2193  
        util::bind(
2194  
            &addressing_service::get_cache_get_entry_count, this, _1));
2195  
    util::function_nonser<std::int64_t(bool)> cache_insertion_count(
2196  
        util::bind(
2197  
            &addressing_service::get_cache_insertion_entry_count, this, _1));
2198  
    util::function_nonser<std::int64_t(bool)> cache_update_entry_count(
2199  
        util::bind(
2200  
            &addressing_service::get_cache_update_entry_count, this, _1));
2201  
    util::function_nonser<std::int64_t(bool)> cache_erase_entry_count(
2202  
        util::bind(
2203  
            &addressing_service::get_cache_erase_entry_count, this, _1));
2204  
2205  
    util::function_nonser<std::int64_t(bool)> cache_get_entry_time(
2206  
        util::bind(
2207  
            &addressing_service::get_cache_get_entry_time, this, _1));
2208  
    util::function_nonser<std::int64_t(bool)> cache_insertion_time(
2209  
        util::bind(
2210  
            &addressing_service::get_cache_insertion_entry_time, this, _1));
2211  
    util::function_nonser<std::int64_t(bool)> cache_update_entry_time(
2212  
        util::bind(
2213  
            &addressing_service::get_cache_update_entry_time, this, _1));
2214  
    util::function_nonser<std::int64_t(bool)> cache_erase_entry_time(
2215  
        util::bind(
2216  
            &addressing_service::get_cache_erase_entry_time, this, _1));
2217  
2218  
    performance_counters::generic_counter_type_data const counter_types[] =
2219  
    {
2220  
        { "/agas/count/cache/entries", performance_counters::counter_raw,
2221  
          "returns the number of cache entries in the AGAS cache",
2222  
          HPX_PERFORMANCE_COUNTER_V1,
2223  
          util::bind(&performance_counters::locality_raw_counter_creator,
2224  
              _1, cache_entries, _2),
2225  
          &performance_counters::locality_counter_discoverer,
2226  
          ""
2227  
        },
2228  
        { "/agas/count/cache/hits", performance_counters::counter_raw,
2229  
          "returns the number of cache hits while accessing the AGAS cache",
2230  
          HPX_PERFORMANCE_COUNTER_V1,
2231  
          util::bind(&performance_counters::locality_raw_counter_creator,
2232  
              _1, cache_hits, _2),
2233  
          &performance_counters::locality_counter_discoverer,
2234  
          ""
2235  
        },
2236  
        { "/agas/count/cache/misses", performance_counters::counter_raw,
2237  
          "returns the number of cache misses while accessing the AGAS cache",
2238  
          HPX_PERFORMANCE_COUNTER_V1,
2239  
          util::bind(&performance_counters::locality_raw_counter_creator,
2240  
              _1, cache_misses, _2),
2241  
          &performance_counters::locality_counter_discoverer,
2242  
          ""
2243  
        },
2244  
        { "/agas/count/cache/evictions", performance_counters::counter_raw,
2245  
          "returns the number of cache evictions from the AGAS cache",
2246  
          HPX_PERFORMANCE_COUNTER_V1,
2247  
          util::bind(&performance_counters::locality_raw_counter_creator,
2248  
              _1, cache_evictions, _2),
2249  
          &performance_counters::locality_counter_discoverer,
2250  
          ""
2251  
        },
2252  
        { "/agas/count/cache/insertions", performance_counters::counter_raw,
2253  
          "returns the number of cache insertions into the AGAS cache",
2254  
          HPX_PERFORMANCE_COUNTER_V1,
2255  
          util::bind(&performance_counters::locality_raw_counter_creator,
2256  
              _1, cache_insertions, _2),
2257  
          &performance_counters::locality_counter_discoverer,
2258  
          ""
2259  
        },
2260  
        { "/agas/count/cache/get_entry", performance_counters::counter_raw,
2261  
          "returns the number of invocations of get_entry function of the "
2262  
                "AGAS cache",
2263  
          HPX_PERFORMANCE_COUNTER_V1,
2264  
          util::bind(&performance_counters::locality_raw_counter_creator,
2265  
              _1, cache_get_entry_count, _2),
2266  
          &performance_counters::locality_counter_discoverer,
2267  
          ""
2268  
        },
2269  
        { "/agas/count/cache/insert_entry", performance_counters::counter_raw,
2270  
          "returns the number of invocations of insert function of the "
2271  
                "AGAS cache",
2272  
          HPX_PERFORMANCE_COUNTER_V1,
2273  
          util::bind(&performance_counters::locality_raw_counter_creator,
2274  
              _1, cache_insertion_count, _2),
2275  
          &performance_counters::locality_counter_discoverer,
2276  
          ""
2277  
        },
2278  
        { "/agas/count/cache/update_entry", performance_counters::counter_raw,
2279  
          "returns the number of invocations of update_entry function of the "
2280  
                "AGAS cache",
2281  
          HPX_PERFORMANCE_COUNTER_V1,
2282  
          util::bind(&performance_counters::locality_raw_counter_creator,
2283  
              _1, cache_update_entry_count, _2),
2284  
          &performance_counters::locality_counter_discoverer,
2285  
          ""
2286  
        },
2287  
        { "/agas/count/cache/erase_entry", performance_counters::counter_raw,
2288  
          "returns the number of invocations of erase_entry function of the "
2289  
                "AGAS cache",
2290  
          HPX_PERFORMANCE_COUNTER_V1,
2291  
          util::bind(&performance_counters::locality_raw_counter_creator,
2292  
              _1, cache_erase_entry_count, _2),
2293  
          &performance_counters::locality_counter_discoverer,
2294  
          ""
2295  
        },
2296  
        { "/agas/time/cache/get_entry", performance_counters::counter_raw,
2297  
          "returns the overall time spent executing of the get_entry API "
2298  
                "function of the AGAS cache",
2299  
          HPX_PERFORMANCE_COUNTER_V1,
2300  
          util::bind(&performance_counters::locality_raw_counter_creator,
2301  
              _1, cache_get_entry_time, _2),
2302  
          &performance_counters::locality_counter_discoverer,
2303  
          "ns"
2304  
        },
2305  
        { "/agas/time/cache/insert_entry", performance_counters::counter_raw,
2306  
          "returns the overall time spent executing of the insert_entry API "
2307  
              "function of the AGAS cache",
2308  
          HPX_PERFORMANCE_COUNTER_V1,
2309  
          util::bind(&performance_counters::locality_raw_counter_creator,
2310  
              _1, cache_insertion_time, _2),
2311  
          &performance_counters::locality_counter_discoverer,
2312  
          ""
2313  
        },
2314  
        { "/agas/time/cache/update_entry", performance_counters::counter_raw,
2315  
          "returns the overall time spent executing of the update_entry API "
2316  
                "function of the AGAS cache",
2317  
          HPX_PERFORMANCE_COUNTER_V1,
2318  
          util::bind(&performance_counters::locality_raw_counter_creator,
2319  
              _1, cache_update_entry_time, _2),
2320  
          &performance_counters::locality_counter_discoverer,
2321  
          "ns"
2322  
        },
2323  
        { "/agas/time/cache/erase_entry", performance_counters::counter_raw,
2324  
          "returns the overall time spent executing of the erase_entry API "
2325  
                "function of the AGAS cache",
2326  
          HPX_PERFORMANCE_COUNTER_V1,
2327  
          util::bind(&performance_counters::locality_raw_counter_creator,
2328  
              _1, cache_erase_entry_time, _2),
2329  
          &performance_counters::locality_counter_discoverer,
2330  
          ""
2331  
        },
2332  
    };
2333  
    performance_counters::install_counter_types(
2334  
        counter_types, sizeof(counter_types)/sizeof(counter_types[0]));
2335  
2336  
    // install counters for services
2337  
    primary_ns_.register_counter_types();
2338  
    component_ns_->register_counter_types();
2339  
    locality_ns_->register_counter_types();
2340  
    symbol_ns_.register_counter_types();
2341  
2342  
    // register root server
2343  
    std::uint32_t locality_id =
2344  
        naming::get_locality_id_from_gid(get_local_locality());
2345  
    locality_ns_->register_server_instance(locality_id);
2346  
    primary_ns_.register_server_instance(locality_id);
2347  
    component_ns_->register_server_instance(locality_id);
2348  
    symbol_ns_.register_server_instance(locality_id);
2349  
} // }}}
2350  
2351  
void addressing_service::garbage_collect_non_blocking(
2352  
    error_code& ec
2353  
    )
2354  
{
2355  
    std::unique_lock<mutex_type> l(refcnt_requests_mtx_, std::try_to_lock);
2356  
    if (!l.owns_lock()) return;     // no need to compete for garbage collection
2357  
2358  
    send_refcnt_requests_non_blocking(l, ec);
2359  
}
2360  
2361  
void addressing_service::garbage_collect(
2362  
    error_code& ec
2363  
    )
2364  
{
2365  
    std::unique_lock<mutex_type> l(refcnt_requests_mtx_, std::try_to_lock);
2366  
    if (!l.owns_lock()) return;     // no need to compete for garbage collection
2367  
2368  
    send_refcnt_requests_sync(l, ec);
2369  
}
2370  
2371  
void addressing_service::send_refcnt_requests(
2372  
    std::unique_lock<addressing_service::mutex_type>& l
2373  
  , error_code& ec
2374  
    )
2375  
{
2376  
    if (!l.owns_lock())
2377  
    {
2378  
        HPX_THROWS_IF(ec, lock_error
2379  
          , "addressing_service::send_refcnt_requests"
2380  
          , "mutex is not locked");
2381  
        return;
2382  
    }
2383  
2384  
    if (!enable_refcnt_caching_ || max_refcnt_requests_ == ++refcnt_requests_count_)
2385  
        send_refcnt_requests_non_blocking(l, ec);
2386  
2387  
    else if (&ec != &throws)
2388  
        ec = make_success_code();
2389  
}
2390  
2391  
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
2392  
    void dump_refcnt_requests(
2393  
        std::unique_lock<addressing_service::mutex_type>& l
2394  
      , addressing_service::refcnt_requests_type const& requests
2395  
      , const char* func_name
2396  
        )
2397  
    {
2398  
        HPX_ASSERT(l.owns_lock());
2399  
2400  
        std::stringstream ss;
2401  
        ss << ( boost::format(
2402  
              "%1%, dumping client-side refcnt table, requests(%2%):")
2403  
              % func_name % requests.size());
2404  
2405  
        typedef addressing_service::refcnt_requests_type::const_reference
2406  
            const_reference;
2407  
2408  
        for (const_reference e : requests)
2409  
        {
2410  
            // The [client] tag is in there to make it easier to filter
2411  
            // through the logs.
2412  
            ss << ( boost::format(
2413  
                  "\n  [client] gid(%1%), credits(%2%)")
2414  
                  % e.first
2415  
                  % e.second);
2416  
        }
2417  
2418  
        LAGAS_(debug) << ss.str();
2419  
    }
2420  
#endif
2421  
2422  
void addressing_service::send_refcnt_requests_non_blocking(
2423  
    std::unique_lock<addressing_service::mutex_type>& l
2424  
  , error_code& ec
2425  
    )
2426  
{
2427  
    HPX_ASSERT(l.owns_lock());
2428  
2429  
    try {
2430  
        if (refcnt_requests_->empty())
2431  
        {
2432  
            l.unlock();
2433  
            return;
2434  
        }
2435  
2436  
        std::shared_ptr<refcnt_requests_type> p(new refcnt_requests_type);
2437  
2438  
        p.swap(refcnt_requests_);
2439  
        refcnt_requests_count_ = 0;
2440  
2441  
        l.unlock();
2442  
2443  
        LAGAS_(info) << (boost::format(
2444  
            "addressing_service::send_refcnt_requests_non_blocking, "
2445  
            "requests(%1%)")
2446  
            % p->size());
2447  
2448  
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
2449  
        if (LAGAS_ENABLED(debug))
2450  
            dump_refcnt_requests(l, *p,
2451  
                "addressing_service::send_refcnt_requests_non_blocking");
2452  
#endif
2453  
2454  
        // collect all requests for each locality
2455  
        typedef
2456  
            std::map<
2457  
                naming::id_type,
2458  
                std::vector<
2459  
                    hpx::util::tuple<std::int64_t, naming::gid_type, naming::gid_type>
2460  
                >
2461  
            >
2462  
            requests_type;
2463  
        requests_type requests;
2464  
2465  
        for (refcnt_requests_type::const_reference e : *p)
2466  
        {
2467  
            HPX_ASSERT(e.second < 0);
2468  
2469  
            naming::gid_type raw(e.first);
2470  
2471  
            naming::id_type target(
2472  
                primary_namespace::get_service_instance(raw)
2473  
              , naming::id_type::unmanaged);
2474  
2475  
            requests[target].push_back(hpx::util::make_tuple(e.second, raw, raw));
2476  
        }
2477  
2478  
        // send requests to all locality
2479  
        requests_type::iterator end = requests.end();
2480  
        for (requests_type::iterator it = requests.begin(); it != end; ++it)
2481  
        {
2482  
            server::primary_namespace::decrement_credit_action action;
2483  
            hpx::apply(action, std::move(it->first), std::move(it->second));
2484  
        }
2485  
2486  
        if (&ec != &throws)
2487  
            ec = make_success_code();
2488  
    }
2489  
    catch (hpx::exception const& e) {
2490  
        l.unlock();
2491  
        HPX_RETHROWS_IF(ec, e,
2492  
            "addressing_service::send_refcnt_requests_non_blocking");
2493  
    }
2494  
}
2495  
2496  
std::vector<hpx::future<std::vector<std::int64_t> > >
2497  
addressing_service::send_refcnt_requests_async(
2498  
    std::unique_lock<addressing_service::mutex_type>& l
2499  
    )
2500  
{
2501  
    HPX_ASSERT(l.owns_lock());
2502  
2503  
    if (refcnt_requests_->empty())
2504  
    {
2505  
        l.unlock();
2506  
        return std::vector<hpx::future<std::vector<std::int64_t> > >();
2507  
    }
2508  
2509  
    std::shared_ptr<refcnt_requests_type> p(new refcnt_requests_type);
2510  
2511  
    p.swap(refcnt_requests_);
2512  
    refcnt_requests_count_ = 0;
2513  
2514  
    l.unlock();
2515  
2516  
    LAGAS_(info) << (boost::format(
2517  
        "addressing_service::send_refcnt_requests_async, "
2518  
        "requests(%1%)")
2519  
        % p->size());
2520  
2521  
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
2522  
    if (LAGAS_ENABLED(debug))
2523  
        dump_refcnt_requests(l, *p,
2524  
            "addressing_service::send_refcnt_requests_sync");
2525  
#endif
2526  
2527  
    // collect all requests for each locality
2528  
    typedef
2529  
        std::map<
2530  
            naming::id_type,
2531  
            std::vector<
2532  
                hpx::util::tuple<std::int64_t, naming::gid_type, naming::gid_type>
2533  
            >
2534  
        >
2535  
        requests_type;
2536  
    requests_type requests;
2537  
2538  
    std::vector<hpx::future<std::vector<std::int64_t> > > lazy_results;
2539  
    for (refcnt_requests_type::const_reference e : *p)
2540  
    {
2541  
        HPX_ASSERT(e.second < 0);
2542  
2543  
        naming::gid_type raw(e.first);
2544  
2545  
        naming::id_type target(
2546  
            primary_namespace::get_service_instance(raw)
2547  
          , naming::id_type::unmanaged);
2548  
2549  
        requests[target].push_back(hpx::util::make_tuple(e.second, raw, raw));
2550  
    }
2551  
2552  
    // send requests to all locality
2553  
    requests_type::const_iterator end = requests.end();
2554  
    for (requests_type::const_iterator it = requests.begin(); it != end; ++it)
2555  
    {
2556  
        server::primary_namespace::decrement_credit_action action;
2557  
        lazy_results.push_back(
2558  
            hpx::async(action, std::move(it->first), std::move(it->second)));
2559  
    }
2560  
2561  
    return lazy_results;
2562  
}
2563  
2564  
void addressing_service::send_refcnt_requests_sync(
2565  
    std::unique_lock<addressing_service::mutex_type>& l
2566  
  , error_code& ec
2567  
    )
2568  
{
2569  
    std::vector<hpx::future<std::vector<std::int64_t> > > lazy_results =
2570  
        send_refcnt_requests_async(l);
2571  
2572  
    // re throw possible errors
2573  
    when_all(lazy_results).get();
2574  
2575  
    if (&ec != &throws)
2576  
        ec = make_success_code();
2577  
}
2578  
2579  
///////////////////////////////////////////////////////////////////////////////
2580  
hpx::future<void> addressing_service::mark_as_migrated(
2581  
    naming::gid_type const& gid_
2582  
  , util::unique_function_nonser<std::pair<bool, hpx::future<void> >()> && f //-V669
2583  
    )
2584  
{
2585  
    if (!gid_)
2586  
    {
2587  
        return hpx::make_exceptional_future<void>(
2588  
            HPX_GET_EXCEPTION(bad_parameter,
2589  
                "addressing_service::mark_as_migrated",
2590  
                "invalid reference gid"));
2591  
    }
2592  
2593  
    naming::gid_type gid(naming::detail::get_stripped_gid(gid_));
2594  
2595  
    // Always first grab the AGAS lock before invoking the user supplied
2596  
    // function. The user supplied code will grab another lock. Both locks have
2597  
    // to be acquired and always in the same sequence.
2598  
    // The AGAS lock needs to be acquired first as the migrated object might
2599  
    // not exist on this locality, in which case it should not be accessed
2600  
    // anymore. The only way to determine whether the object still exists on
2601  
    // this locality is to query the migrated objects table in AGAS.
2602  
    typedef std::unique_lock<mutex_type> lock_type;
2603  
2604  
    lock_type lock(migrated_objects_mtx_);
2605  
    util::ignore_while_checking<lock_type> ignore(&lock);
2606  
2607  
    // call the user code for the component instance to be migrated, the
2608  
    // returned future becomes ready whenever the component instance can be
2609  
    // migrated (no threads are pending/active any more)
2610  
    std::pair<bool, hpx::future<void> > result = f();
2611  
2612  
    // mark the gid as 'migrated' right away - the worst what can happen is
2613  
    // that a parcel which comes in for this object is bouncing between this
2614  
    // locality and the locality managing the address resolution for the object
2615  
    if (result.first)
2616  
    {
2617  
        migrated_objects_table_type::iterator it =
2618  
            migrated_objects_table_.find(gid);
2619  
2620  
        // insert the object into the map of migrated objects
2621  
        if (it == migrated_objects_table_.end())
2622  
            migrated_objects_table_.insert(gid);
2623  
2624  
        // avoid interactions with the locking in the cache
2625  
        lock.unlock();
2626  
2627  
        // remove entry from cache
2628  
        remove_cache_entry(gid_);
2629  
    }
2630  
2631  
    return std::move(result.second);
2632  
}
2633  
2634  
void addressing_service::unmark_as_migrated(
2635  
    naming::gid_type const& gid_
2636  
    )
2637  
{
2638  
    if (!gid_)
2639  
    {
2640  
        HPX_THROW_EXCEPTION(bad_parameter,
2641  
            "addressing_service::unmark_as_migrated",
2642  
            "invalid reference gid");
2643  
        return;
2644  
    }
2645  
2646  
    naming::gid_type gid(naming::detail::get_stripped_gid(gid_));
2647  
2648  
    std::unique_lock<mutex_type> lock(migrated_objects_mtx_);
2649  
2650  
    migrated_objects_table_type::iterator it =
2651  
        migrated_objects_table_.find(gid);
2652  
2653  
    // insert the object into the map of migrated objects
2654  
    if (it != migrated_objects_table_.end())
2655  
    {
2656  
        migrated_objects_table_.erase(it);
2657  
2658  
        // remove entry from cache
2659  
        if (caching_ && naming::detail::store_in_cache(gid_))
2660  
        {
2661  
            // avoid interactions with the locking in the cache
2662  
            lock.unlock();
2663  
2664  
            // remove entry from cache
2665  
            remove_cache_entry(gid_);
2666  
        }
2667  
    }
2668  
}
2669  
2670  
hpx::future<std::pair<naming::id_type, naming::address> >
2671  
addressing_service::begin_migration_async(naming::id_type const& id)
2672  
{
2673  
    typedef std::pair<naming::id_type, naming::address> result_type;
2674  
2675  
    if (!id)
2676  
    {
2677  
        return hpx::make_exceptional_future<result_type>(
2678  
            HPX_GET_EXCEPTION(bad_parameter,
2679  
                "addressing_service::begin_migration_async",
2680  
                "invalid reference id"));
2681  
    }
2682  
2683  
    naming::gid_type gid(naming::detail::get_stripped_gid(id.get_gid()));
2684  
2685  
    return primary_ns_.begin_migration(gid);
2686  
}
2687  
2688  
hpx::future<bool> addressing_service::end_migration_async(
2689  
    naming::id_type const& id
2690  
    )
2691  
{
2692  
    if (!id)
2693  
    {
2694  
        return hpx::make_exceptional_future<bool>(
2695  
            HPX_GET_EXCEPTION(bad_parameter,
2696  
                "addressing_service::end_migration_async",
2697  
                "invalid reference id"));
2698  
    }
2699  
2700  
    naming::gid_type gid(naming::detail::get_stripped_gid(id.get_gid()));
2701  
2702  
    return primary_ns_.end_migration(gid);
2703  
}
2704  
2705  
bool addressing_service::was_object_migrated_locked(
2706  
    naming::gid_type const& gid_
2707  
    )
2708  
{
2709  
    naming::gid_type gid(naming::detail::get_stripped_gid(gid_));
2710  
2711  
    return
2712  
        migrated_objects_table_.find(gid) !=
2713  
        migrated_objects_table_.end();
2714  
}
2715  
2716  
std::pair<bool, components::pinned_ptr>
2717  
    addressing_service::was_object_migrated(
2718  
        naming::gid_type const& gid
2719  
      , util::unique_function_nonser<components::pinned_ptr()> && f //-V669
2720  
        )
2721  
{
2722  
    if (!gid)
2723  
    {
2724  
        HPX_THROW_EXCEPTION(bad_parameter,
2725  
            "addressing_service::was_object_migrated",
2726  
            "invalid reference gid");
2727  
        return std::make_pair(false, components::pinned_ptr());
2728  
    }
2729  
2730  
    // Always first grab the AGAS lock before invoking the user supplied
2731  
    // function. The user supplied code will grab another lock. Both locks have
2732  
    // to be acquired and always in the same sequence.
2733  
    // The AGAS lock needs to be acquired first as the migrated object might
2734  
    // not exist on this locality, in which case it should not be accessed
2735  
    // anymore. The only way to determine whether the object still exists on
2736  
    // this locality is to query the migrated objects table in AGAS.
2737  
    typedef std::unique_lock<mutex_type> lock_type;
2738  
2739  
    lock_type lock(migrated_objects_mtx_);
2740  
2741  
    if (was_object_migrated_locked(gid))
2742  
        return std::make_pair(true, components::pinned_ptr());
2743  
2744  
    util::ignore_while_checking<lock_type> ignore(&lock);
2745  
    return std::make_pair(false, f());
2746  
}
2747  
2748  
}}
2749  
2750  
///////////////////////////////////////////////////////////////////////////////
2751  
namespace hpx
2752  
{
2753  
    namespace detail
2754  
    {
2755  
        std::string name_from_basename(std::string const& basename,
2756  
            std::size_t idx)
2757  
        {
2758  
            HPX_ASSERT(!basename.empty());
2759  
2760  
            std::string name;
2761  
2762  
            if (basename[0] != '/')
2763  
                name += '/';
2764  
2765  
            name += basename;
2766  
            if (name[name.size()-1] != '/')
2767  
                name += '/';
2768  
            name += std::to_string(idx);
2769  
2770  
            return name;
2771  
        }
2772  
    }
2773  
2774  
    ///////////////////////////////////////////////////////////////////////////
2775  
    std::vector<hpx::future<hpx::id_type> >
2776  
        find_all_from_basename(std::string const& basename, std::size_t num_ids)
2777  
    {
2778  
        if (basename.empty())
2779  
        {
2780  
            HPX_THROW_EXCEPTION(bad_parameter,
2781  
                "hpx::find_all_from_basename",
2782  
                "no basename specified");
2783  
        }
2784  
2785  
        std::vector<hpx::future<hpx::id_type> > results;
2786  
        for(std::size_t i = 0; i != num_ids; ++i)
2787  
        {
2788  
            std::string name = detail::name_from_basename(basename, i);
2789  
            results.push_back(agas::on_symbol_namespace_event(
2790  
                std::move(name), true));
2791  
        }
2792  
        return results;
2793  
    }
2794  
2795  
    std::vector<hpx::future<hpx::id_type> >
2796  
        find_from_basename(std::string const& basename,
2797  
            std::vector<std::size_t> const& ids)
2798  
    {
2799  
        if (basename.empty())
2800  
        {
2801  
            HPX_THROW_EXCEPTION(bad_parameter,
2802  
                "hpx::find_from_basename",
2803  
                "no basename specified");
2804  
        }
2805  
2806  
        std::vector<hpx::future<hpx::id_type> > results;
2807  
        for (std::size_t i : ids)
2808  
        {
2809  
            std::string name = detail::name_from_basename(basename, i); //-V106
2810  
            results.push_back(agas::on_symbol_namespace_event(
2811  
                std::move(name), true));
2812  
        }
2813  
        return results;
2814  
    }
2815  
2816  
    hpx::future<hpx::id_type> find_from_basename(std::string const& basename,
2817  
        std::size_t sequence_nr)
2818  
    {
2819  
        if (basename.empty())
2820  
        {
2821  
            HPX_THROW_EXCEPTION(bad_parameter,
2822  
                "hpx::find_from_basename",
2823  
                "no basename specified");
2824  
        }
2825  
2826  
        if (sequence_nr == std::size_t(~0U))
2827  
            sequence_nr = std::size_t(naming::get_locality_id_from_id(find_here()));
2828  
2829  
        std::string name = detail::name_from_basename(basename, sequence_nr);
2830  
        return agas::on_symbol_namespace_event(std::move(name), true);
2831  
    }
2832  
2833  
    hpx::future<bool> register_with_basename(std::string const& basename,
2834  
        hpx::id_type id, std::size_t sequence_nr)
2835  
    {
2836  
        if (basename.empty())
2837  
        {
2838  
            HPX_THROW_EXCEPTION(bad_parameter,
2839  
                "hpx::register_with_basename",
2840  
                "no basename specified");
2841  
        }
2842  
2843  
        if (sequence_nr == std::size_t(~0U))
2844  
            sequence_nr = std::size_t(naming::get_locality_id_from_id(find_here()));
2845  
2846  
        std::string name = detail::name_from_basename(basename, sequence_nr);
2847  
        return agas::register_name(std::move(name), id);
2848  
    }
2849  
2850  
    hpx::future<hpx::id_type> unregister_with_basename(
2851  
        std::string const& basename, std::size_t sequence_nr)
2852  
    {
2853  
        if (basename.empty())
2854  
        {
2855  
            HPX_THROW_EXCEPTION(bad_parameter,
2856  
                "hpx::unregister_with_basename",
2857  
                "no basename specified");
2858  
        }
2859  
2860  
        if (sequence_nr == std::size_t(~0U))
2861  
            sequence_nr = std::size_t(naming::get_locality_id_from_id(find_here()));
2862  
2863  
        std::string name = detail::name_from_basename(basename, sequence_nr);
2864  
        return agas::unregister_name(std::move(name));
2865  
    }
2866  
}
2867  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.