/home/users/khuck/src/hpx-lsu/hpx/lcos/local/packaged_continuation.hpp

Line% of fetchesSource
1  
//  Copyright (c) 2007-2015 Hartmut Kaiser
2  
//  Copyright (c) 2014-2015 Agustin Berge
3  
//
4  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
7  
#if !defined(HPX_LCOS_LOCAL_CONTINUATION_APR_17_2012_0150PM)
8  
#define HPX_LCOS_LOCAL_CONTINUATION_APR_17_2012_0150PM
9  
10  
#include <hpx/config.hpp>
11  
#include <hpx/error_code.hpp>
12  
#include <hpx/lcos/detail/future_data.hpp>
13  
#include <hpx/lcos/future.hpp>
14  
#include <hpx/runtime/launch_policy.hpp>
15  
#include <hpx/throw_exception.hpp>
16  
#include <hpx/traits/future_access.hpp>
17  
#include <hpx/traits/future_traits.hpp>
18  
#include <hpx/traits/is_executor.hpp>
19  
#include <hpx/util/decay.hpp>
20  
#include <hpx/util/thread_description.hpp>
21  
22  
#include <boost/intrusive_ptr.hpp>
23  
24  
#include <functional>
25  
#include <mutex>
26  
#include <utility>
27  
#include <type_traits>
28  
29  
///////////////////////////////////////////////////////////////////////////////
30  
namespace hpx { namespace lcos { namespace detail
31  
{
32  
    template <typename Future>
33  
    struct transfer_result
34  
    {
35  
        template <typename Source, typename Destination>
36  
        void apply(Source&& src, Destination& dest, std::false_type) const
37  
        {
38  
            try {
39  
                dest.set_value(src.get());
40  
            }
41  
            catch (...) {
42  
                dest.set_exception(boost::current_exception());
43  
            }
44  
        }
45  
46  
        template <typename Source, typename Destination>
47  
        void apply(Source&& src, Destination& dest, std::true_type) const
48  
        {
49  
            try {
50  
                src.get();
51  
                dest.set_value(util::unused);
52  
            }
53  
            catch (...) {
54  
                dest.set_exception(boost::current_exception());
55  
            }
56  
        }
57  
58  
        template <typename SourceState, typename DestinationState>
59  
        void operator()(SourceState& src, DestinationState const& dest) const
60  
        {
61  
            typedef std::is_void<
62  
                typename traits::future_traits<Future>::type
63  
            > is_void;
64  
65  
            apply(traits::future_access<Future>::create(src), *dest, is_void());
66  
        }
67  
    };
68  
69  
    template <typename Func, typename Future, typename Continuation>
70  
    void invoke_continuation(Func& func, Future& future, Continuation& cont,
71  
        std::false_type)
72  
    {
73  
        try {
74  
            cont.set_value(func(std::move(future)));
75  
        }
76  
        catch (...) {
77  
            cont.set_exception(boost::current_exception());
78  
        }
79  
    }
80  
81  
    template <typename Func, typename Future, typename Continuation>
82  
    void invoke_continuation(Func& func, Future& future, Continuation& cont,
83  
        std::true_type)
84  
    {
85  
        try {
86  
            func(std::move(future));
87  
            cont.set_value(util::unused);
88  
        }
89  
        catch (...) {
90  
            cont.set_exception(boost::current_exception());
91  
        }
92  
    }
93  
94  
    template <typename Func, typename Future, typename Continuation>
95  
    typename std::enable_if<
96  
        !traits::detail::is_unique_future<
97  
            typename util::result_of<Func(Future)>::type
98  
        >::value
99  
    >::type invoke_continuation(Func& func, Future& future, Continuation& cont)
100  
    {
101  
        typedef std::is_void<
102  
            typename util::result_of<Func(Future)>::type
103  
        > is_void;
104  
105  
        invoke_continuation(func, future, cont, is_void());
106  
    }
107  
108  
    template <typename Func, typename Future, typename Continuation>
109  
    typename std::enable_if<
110  
        traits::detail::is_unique_future<
111  
            typename util::result_of<Func(Future)>::type
112  
        >::value
113  
    >::type invoke_continuation(Func& func, Future& future, Continuation& cont)
114  
    {
115  
        try {
116  
            typedef
117  
                typename util::result_of<Func(Future)>::type
118  
                inner_future;
119  
            typedef
120  
                typename traits::detail::shared_state_ptr_for<inner_future>::type
121  
                inner_shared_state_ptr;
122  
123  
            // take by value, as the future may go away immediately
124  
            inner_shared_state_ptr inner_state =
125  
                traits::detail::get_shared_state(func(std::move(future)));
126  
127  
            if (inner_state.get() == nullptr)
128  
            {
129  
                HPX_THROW_EXCEPTION(no_state,
130  
                    "invoke_continuation",
131  
                    "the inner future has no valid shared state");
132  
            }
133  
134  
            // Bind an on_completed handler to this future which will transfer
135  
            // its result to the new future.
136  
            boost::intrusive_ptr<Continuation> cont_(&cont);
137  
            inner_state->execute_deferred();
138  
            inner_state->set_on_completed(util::bind(
139  
                transfer_result<inner_future>(), inner_state, cont_));
140  
        }
141  
        catch (...) {
142  
            cont.set_exception(boost::current_exception());
143  
        }
144  
     }
145  
146  
    ///////////////////////////////////////////////////////////////////////////
147  
    template <typename ContResult>
148  
    struct continuation_result
149  
    {
150  
        typedef ContResult type;
151  
    };
152  
153  
    template <typename ContResult>
154  
    struct continuation_result<future<ContResult> >
155  
    {
156  
        typedef ContResult type;
157  
    };
158  
159  
    ///////////////////////////////////////////////////////////////////////////
160  
    template <typename Future, typename F, typename ContResult>
161  
    class continuation
162  
      : public future_data<typename continuation_result<ContResult>::type>
163  
    {
164  
    private:
165  
        typedef future_data<ContResult> base_type;
166  
167  
        typedef typename base_type::mutex_type mutex_type;
168  
        typedef typename base_type::result_type result_type;
169  
170  
    protected:
171  
        threads::thread_id_type get_id() const
172  
        {
173  
            std::lock_guard<mutex_type> l(this->mtx_);
174  
            return id_;
175  
        }
176  
        void set_id(threads::thread_id_type const& id)
177  
        {
178  
            std::lock_guard<mutex_type> l(this->mtx_);
179  
            id_ = id;
180  
        }
181  
182  
        struct reset_id
183  
        {
184  
            reset_id(continuation& target)
185  
              : target_(target)
186  
            {
187  
                if (threads::get_self_ptr() != nullptr)
188  
                    target.set_id(threads::get_self_id());
189  
            }
190  
            ~reset_id()
191  
            {
192  
                target_.set_id(threads::invalid_thread_id);
193  
            }
194  
            continuation& target_;
195  
        };
196  
197  
    public:
198  
        template <typename Func>
199  
        continuation(Func && f)
200  
          : started_(false), id_(threads::invalid_thread_id)
201  
          , f_(std::forward<Func>(f))
202  
        {}
203  
204  
        void run_impl(
205  
            typename traits::detail::shared_state_ptr_for<
206  
                Future
207  
            >::type const& f)
208  
        {
209  
            Future future = traits::future_access<Future>::create(f);
210  
            invoke_continuation(f_, future, *this);
211  
        }
212  
213  
        void run(
214  
            typename traits::detail::shared_state_ptr_for<
215  
                Future
216  
            >::type const& f, error_code& ec)
217  
        {
218  
            {
219  
                std::lock_guard<mutex_type> l(this->mtx_);
220  
                if (started_) {
221  
                    HPX_THROWS_IF(ec, task_already_started,
222  
                        "continuation::run",
223  
                        "this task has already been started");
224  
                    return;
225  
                }
226  
                started_ = true;
227  
            }
228  
229  
            run_impl(f);
230  
231  
            if (&ec != &throws)
232  
                ec = make_success_code();
233  
        }
234  
235  
        void run(
236  
            typename traits::detail::shared_state_ptr_for<
237  
                Future
238  
            >::type const& f)
239  
        {
240  
            run(f, throws);
241  
        }
242  
243  
        threads::thread_result_type
244  
        async_impl(
245  
            typename traits::detail::shared_state_ptr_for<
246  
                Future
247  
            >::type const& f)
248  
        {
249  
            reset_id r(*this);
250  
251  
            Future future = traits::future_access<Future>::create(f);
252  
            invoke_continuation(f_, future, *this);
253  
            return threads::thread_result_type(threads::terminated, nullptr);
254  
        }
255  
256  
        void async(
257  
            typename traits::detail::shared_state_ptr_for<
258  
                Future
259  
            >::type const& f,
260  
            error_code& ec)
261  
        {
262  
            {
263  
                std::lock_guard<mutex_type> l(this->mtx_);
264  
                if (started_) {
265  
                    HPX_THROWS_IF(ec, task_already_started,
266  
                        "continuation::async",
267  
                        "this task has already been started");
268  
                    return;
269  
                }
270  
                started_ = true;
271  
            }
272  
273  
            boost::intrusive_ptr<continuation> this_(this);
274  
            threads::thread_result_type (continuation::*async_impl_ptr)(
275  
                typename traits::detail::shared_state_ptr_for<Future>::type const&
276  
            ) = &continuation::async_impl;
277  
278  
            util::thread_description desc(f_, "continuation::async");
279  
            applier::register_thread_plain(
280  
                util::bind(async_impl_ptr, std::move(this_), f), desc);
281  
282  
            if (&ec != &throws)
283  
                ec = make_success_code();
284  
        }
285  
286  
        void async(
287  
            typename traits::detail::shared_state_ptr_for<
288  
                Future
289  
            >::type const& f,
290  
            threads::executor& sched, error_code& ec)
291  
        {
292  
            {
293  
                std::lock_guard<mutex_type> l(this->mtx_);
294  
                if (started_) {
295  
                    HPX_THROWS_IF(ec, task_already_started,
296  
                        "continuation::async",
297  
                        "this task has already been started");
298  
                    return;
299  
                }
300  
                started_ = true;
301  
            }
302  
303  
            boost::intrusive_ptr<continuation> this_(this);
304  
            threads::thread_result_type (continuation::*async_impl_ptr)(
305  
                typename traits::detail::shared_state_ptr_for<Future>::type const&
306  
            ) = &continuation::async_impl;
307  
308  
            util::thread_description desc(f_, "continuation::async");
309  
            sched.add(util::bind(async_impl_ptr, std::move(this_), f), desc);
310  
311  
            if (&ec != &throws)
312  
                ec = make_success_code();
313  
        }
314  
315  
        template <typename Executor>
316  
        void async_exec(
317  
            typename traits::detail::shared_state_ptr_for<
318  
                Future
319  
            >::type const& f,
320  
            Executor& exec, error_code& ec)
321  
        {
322  
            {
323  
                std::lock_guard<mutex_type> l(this->mtx_);
324  
                if (started_) {
325  
                    HPX_THROWS_IF(ec, task_already_started,
326  
                        "continuation::async_exec",
327  
                        "this task has already been started");
328  
                    return;
329  
                }
330  
                started_ = true;
331  
            }
332  
333  
            boost::intrusive_ptr<continuation> this_(this);
334  
            threads::thread_result_type (continuation::*async_impl_ptr)(
335  
                typename traits::detail::shared_state_ptr_for<Future>::type const&
336  
            ) = &continuation::async_impl;
337  
338  
            parallel::executor_traits<Executor>::apply_execute(
339  
                exec, async_impl_ptr, std::move(this_), f);
340  
341  
            if (&ec != &throws)
342  
                ec = make_success_code();
343  
        }
344  
345  
        void async(
346  
            typename traits::detail::shared_state_ptr_for<
347  
                Future
348  
            >::type const& f)
349  
        {
350  
            async(f, throws);
351  
        }
352  
353  
        void async(
354  
            typename traits::detail::shared_state_ptr_for<
355  
                Future
356  
            >::type const& f, threads::executor& sched)
357  
        {
358  
            async(f, sched, throws);
359  
        }
360  
361  
        template <typename Executor>
362  
        void async_exec(
363  
            typename traits::detail::shared_state_ptr_for<
364  
                Future
365  
            >::type const& f,
366  
            Executor& exec)
367  
        {
368  
            async_exec(f, exec, throws);
369  
        }
370  
371  
        // cancellation support
372  
        bool cancelable() const
373  
        {
374  
            return true;
375  
        }
376  
377  
        void cancel()
378  
        {
379  
            std::unique_lock<mutex_type> l(this->mtx_);
380  
            try {
381  
                if (!this->started_)
382  
                    HPX_THROW_THREAD_INTERRUPTED_EXCEPTION();
383  
384  
                if (this->is_ready_locked())
385  
                    return;   // nothing we can do
386  
387  
                if (id_ != threads::invalid_thread_id) {
388  
                    // interrupt the executing thread
389  
                    threads::interrupt_thread(id_);
390  
391  
                    this->started_ = true;
392  
393  
                    l.unlock();
394  
                    this->set_error(future_cancelled,
395  
                        "continuation<Future, ContResult>::cancel",
396  
                        "future has been canceled");
397  
                }
398  
                else {
399  
                    HPX_THROW_EXCEPTION(future_can_not_be_cancelled,
400  
                        "continuation<Future, ContResult>::cancel",
401  
                        "future can't be canceled at this time");
402  
                }
403  
            }
404  
            catch (...) {
405  
                this->started_ = true;
406  
                this->set_exception(boost::current_exception());
407  
                throw;
408  
            }
409  
        }
410  
411  
    public:
412  
        void attach(Future const& future, launch policy)
413  
        {
414  
            typedef
415  
                typename traits::detail::shared_state_ptr_for<Future>::type
416  
                shared_state_ptr;
417  
418  
            // bind an on_completed handler to this future which will invoke
419  
            // the continuation
420  
            boost::intrusive_ptr<continuation> this_(this);
421  
            void (continuation::*cb)(shared_state_ptr const&);
422  
            if (policy & launch::sync)
423  
                cb = &continuation::run;
424  
            else
425  
                cb = &continuation::async;
426  
427  
            shared_state_ptr const& state =
428  
                traits::detail::get_shared_state(future);
429  
            state->execute_deferred();
430  
            state->set_on_completed(util::bind(cb, std::move(this_), state));
431  
        }
432  
433  
        void attach(Future const& future, threads::executor& sched)
434  
        {
435  
            typedef
436  
                typename traits::detail::shared_state_ptr_for<Future>::type
437  
                shared_state_ptr;
438  
439  
            // bind an on_completed handler to this future which will invoke
440  
            // the continuation
441  
            boost::intrusive_ptr<continuation> this_(this);
442  
            void (continuation::*cb)(
443  
                    shared_state_ptr const&, threads::executor&
444  
                ) = &continuation::async;
445  
446  
            shared_state_ptr const& state =
447  
                traits::detail::get_shared_state(future);
448  
            state->execute_deferred();
449  
            state->set_on_completed(util::bind(cb, std::move(this_),
450  
                state, std::ref(sched)));
451  
        }
452  
453  
        template <typename Executor>
454  
        void attach_exec(Future const& future, Executor& exec)
455  
        {
456  
            typedef
457  
                typename traits::detail::shared_state_ptr_for<Future>::type
458  
                shared_state_ptr;
459  
460  
            // bind an on_completed handler to this future which will invoke
461  
            // the continuation
462  
            boost::intrusive_ptr<continuation> this_(this);
463  
            void (continuation::*cb)(shared_state_ptr const&, Executor&) =
464  
                &continuation::async_exec<Executor>;
465  
466  
            shared_state_ptr const& state =
467  
                traits::detail::get_shared_state(future);
468  
            state->execute_deferred();
469  
            state->set_on_completed(util::bind(cb, std::move(this_),
470  
                state, std::ref(exec)));
471  
        }
472  
473  
    protected:
474  
        bool started_;
475  
        threads::thread_id_type id_;
476  
        typename util::decay<F>::type f_;
477  
    };
478  
479  
    ///////////////////////////////////////////////////////////////////////////
480  
    template <typename ContResult, typename Future, typename F>
481  
    inline typename traits::detail::shared_state_ptr<
482  
        typename continuation_result<ContResult>::type
483  
    >::type
484  
    make_continuation(Future const& future, launch policy, F && f)
485  
    {
486  
        typedef detail::continuation<Future, F, ContResult> shared_state;
487  
        typedef typename continuation_result<ContResult>::type result_type;
488  
489  
        // create a continuation
490  
        typename traits::detail::shared_state_ptr<result_type>::type p(
491  
            new shared_state(std::forward<F>(f)));
492  
        static_cast<shared_state*>(p.get())->attach(future, policy);
493  
        return p;
494  
    }
495  
496  
    template <typename ContResult, typename Future, typename F>
497  
    inline typename traits::detail::shared_state_ptr<
498  
        typename continuation_result<ContResult>::type
499  
    >::type
500  
    make_continuation(Future const& future, threads::executor& sched, F && f)
501  
    {
502  
        typedef detail::continuation<Future, F, ContResult> shared_state;
503  
        typedef typename continuation_result<ContResult>::type result_type;
504  
505  
        // create a continuation
506  
        typename traits::detail::shared_state_ptr<result_type>::type p(
507  
            new shared_state(std::forward<F>(f)));
508  
        static_cast<shared_state*>(p.get())->attach(future, sched);
509  
        return p;
510  
    }
511  
512  
    template <typename ContResult, typename Future, typename Executor,
513  
        typename F>
514  
    inline typename traits::detail::shared_state_ptr<
515  
        typename continuation_result<ContResult>::type
516  
    >::type
517  
    make_continuation_exec(Future const& future, Executor& exec, F && f)
518  
    {
519  
        typedef detail::continuation<Future, F, ContResult> shared_state;
520  
        typedef typename continuation_result<ContResult>::type result_type;
521  
522  
        // create a continuation
523  
        typename traits::detail::shared_state_ptr<result_type>::type p(
524  
            new shared_state(std::forward<F>(f)));
525  
        static_cast<shared_state*>(p.get())->attach_exec(future, exec);
526  
        return p;
527  
    }
528  
}}}
529  
530  
///////////////////////////////////////////////////////////////////////////////
531  
namespace hpx { namespace lcos { namespace detail
532  
{
533  
    ///////////////////////////////////////////////////////////////////////////
534  
    template <typename ContResult>
535  
    class unwrap_continuation : public future_data<ContResult>
536  
    {
537  
    private:
538  
        template <typename Inner>
539  
        void on_inner_ready(
540  
            typename traits::detail::shared_state_ptr_for<
541  
                Inner
542  
            >::type const& inner_state)
543  
        {
544  
            try {
545  
                unwrap_continuation* this_ = this;
546  
                transfer_result<Inner>()(inner_state, this_);
547  
            }
548  
            catch (...) {
549  
                this->set_exception(boost::current_exception());
550  
            }
551  
        }
552  
553  
        template <typename Outer>
554  
        void on_outer_ready(
555  
            typename traits::detail::shared_state_ptr_for<
556  
                Outer
557  
            >::type const& outer_state)
558  
        {
559  
            typedef typename traits::future_traits<Outer>::type inner_future;
560  
            typedef
561  
                typename traits::detail::shared_state_ptr_for<inner_future>::type
562  
                inner_shared_state_ptr;
563  
564  
            // Bind an on_completed handler to this future which will transfer
565  
            // its result to the new future.
566  
            boost::intrusive_ptr<unwrap_continuation> this_(this);
567  
            void (unwrap_continuation::*inner_ready)(
568  
                inner_shared_state_ptr const&) =
569  
                    &unwrap_continuation::on_inner_ready<inner_future>;
570  
571  
            try {
572  
                // if we get here, this future is ready
573  
                Outer outer = traits::future_access<Outer>::create(outer_state);
574  
575  
                // take by value, as the future will go away immediately
576  
                inner_shared_state_ptr inner_state =
577  
                    traits::detail::get_shared_state(outer.get());
578  
579  
                if (inner_state.get() == nullptr)
580  
                {
581  
                    HPX_THROW_EXCEPTION(no_state,
582  
                        "unwrap_continuation<ContResult>::on_outer_ready",
583  
                        "the inner future has no valid shared state");
584  
                }
585  
586  
                inner_state->execute_deferred();
587  
                inner_state->set_on_completed(
588  
                    util::bind(inner_ready, std::move(this_), inner_state));
589  
            }
590  
            catch (...) {
591  
                this->set_exception(boost::current_exception());
592  
            }
593  
        }
594  
595  
    public:
596  
        template <typename Future>
597  
        void attach(Future& future)
598  
        {
599  
            typedef
600  
                typename traits::detail::shared_state_ptr_for<Future>::type
601  
                outer_shared_state_ptr;
602  
603  
            // Bind an on_completed handler to this future which will wait for
604  
            // the inner future and will transfer its result to the new future.
605  
            boost::intrusive_ptr<unwrap_continuation> this_(this);
606  
            void (unwrap_continuation::*outer_ready)(
607  
                outer_shared_state_ptr const&) =
608  
                    &unwrap_continuation::on_outer_ready<Future>;
609  
610  
            outer_shared_state_ptr const& outer_state =
611  
                traits::detail::get_shared_state(future);
612  
            outer_state->execute_deferred();
613  
            outer_state->set_on_completed(
614  
                util::bind(outer_ready, std::move(this_), outer_state));
615  
        }
616  
    };
617  
618  
    template <typename Future>
619  
    inline typename traits::detail::shared_state_ptr<
620  
        typename future_unwrap_result<Future>::result_type>::type
621  
    unwrap(Future&& future, error_code& ec)
622  
    {
623  
        typedef typename future_unwrap_result<Future>::result_type result_type;
624  
        typedef detail::unwrap_continuation<result_type> shared_state;
625  
626  
        // create a continuation
627  
        typename traits::detail::shared_state_ptr<result_type>::type p(
628  
            new shared_state());
629  
        static_cast<shared_state*>(p.get())->attach(future);
630  
        return p;
631  
    }
632  
}}}
633  
634  
///////////////////////////////////////////////////////////////////////////////
635  
namespace hpx { namespace lcos { namespace detail
636  
{
637  
    class void_continuation : public future_data<void>
638  
    {
639  
    private:
640  
        template <typename Future>
641  
        void on_ready(
642  
            typename traits::detail::shared_state_ptr_for<
643  
                Future
644  
            >::type const& state)
645  
        {
646  
            try {
647  
                (void)state->get_result();
648  
                this->set_value(util::unused);
649  
            }
650  
            catch (...) {
651  
                this->set_exception(boost::current_exception());
652  
            }
653  
        }
654  
655  
    public:
656  
        template <typename Future>
657  
        void attach(Future& future)
658  
        {
659  
            typedef
660  
                typename traits::detail::shared_state_ptr_for<Future>::type
661  
                shared_state_ptr;
662  
663  
            // Bind an on_completed handler to this future which will wait for
664  
            // the inner future and will transfer its result to the new future.
665  
            boost::intrusive_ptr<void_continuation> this_(this);
666  
            void (void_continuation::*ready)(shared_state_ptr const&) =
667  
                &void_continuation::on_ready<Future>;
668  
669  
            shared_state_ptr const& state =
670  
                traits::detail::get_shared_state(future);
671  
            state->execute_deferred();
672  
            state->set_on_completed(util::bind(ready, std::move(this_), state));
673  
        }
674  
    };
675  
676  
    template <typename Future>
677  
    inline typename traits::detail::shared_state_ptr<void>::type
678  
    make_void_continuation(Future& future)
679  
    {
680  
        typedef detail::void_continuation void_shared_state;
681  
682  
        // create a continuation
683  
        typename traits::detail::shared_state_ptr<void>::type p(
684  
            new void_shared_state());
685  
        static_cast<void_shared_state*>(p.get())->attach(future);
686  
        return p;
687  
    }
688  
}}}
689  
690  
#endif
691  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.