/home/users/khuck/src/hpx-lsu/hpx/lcos/local/futures_factory.hpp

Line | % of fetches | Source
1  
//  Copyright (c) 2007-2016 Hartmut Kaiser
2  
//
3  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
6  
#ifndef HPX_LCOS_LOCAL_FUTURES_FACTORY_HPP
7  
#define HPX_LCOS_LOCAL_FUTURES_FACTORY_HPP
8  
9  
#include <hpx/config.hpp>
10  
#include <hpx/lcos/detail/future_data.hpp>
11  
#include <hpx/lcos/future.hpp>
12  
#include <hpx/runtime/launch_policy.hpp>
13  
#include <hpx/runtime/threads/thread_data_fwd.hpp>
14  
#include <hpx/runtime/threads/thread_enums.hpp>
15  
#include <hpx/runtime/threads/thread_helpers.hpp>
16  
#include <hpx/throw_exception.hpp>
17  
#include <hpx/traits/future_access.hpp>
18  
#include <hpx/util/deferred_call.hpp>
19  
#include <hpx/util/thread_description.hpp>
20  
21  
#include <boost/exception_ptr.hpp>
22  
#include <boost/intrusive_ptr.hpp>
23  
24  
#include <cstddef>
25  
#include <type_traits>
26  
#include <utility>
27  
28  
namespace hpx { namespace lcos { namespace local
29  
{
30  
    namespace detail
31  
    {
32  
        ///////////////////////////////////////////////////////////////////////
33  
        template <typename Result, typename F,
34  
            typename Base = lcos::detail::task_base<Result> >
35  
        struct task_object : Base
36  
        {
37  
            typedef Base base_type;
38  
            typedef typename Base::result_type result_type;
39  
40  
            F f_;
41  
42  
            task_object(F const& f)
43  
              : f_(f)
44  
            {}
45  
46  
            task_object(F&& f)
47  
              : f_(std::move(f))
48  
            {}
49  
50  
            task_object(threads::executor& sched, F const& f)
51  
              : base_type(sched), f_(f)
52  
            {}
53  
54  
            task_object(threads::executor& sched, F&& f)
55  
              : base_type(sched), f_(std::move(f))
56  
            {}
57  
58  
            void do_run()
59  
            {
60  
                return do_run_impl(std::is_void<Result>());
61  
            }
62  
63  
        private:
64  
            void do_run_impl(/*is_void=*/std::false_type)
65  
            {
66  
                try {
67  
                    this->set_value(f_());
68  
                }
69  
                catch(...) {
70  
                    this->set_exception(boost::current_exception());
71  
                }
72  
            }
73  
74  
            void do_run_impl(/*is_void=*/std::true_type)
75  
            {
76  
                try {
77  
                    f_();
78  
                    this->set_value(result_type());
79  
                }
80  
                catch(...) {
81  
                    this->set_exception(boost::current_exception());
82  
                }
83  
            }
84  
85  
        protected:
86  
            // run in a separate thread
87  
            threads::thread_id_type apply(launch policy,
88  
                threads::thread_priority priority,
89  
                threads::thread_stacksize stacksize, error_code& ec)
90  
            {
91  
                this->check_started();
92  
93  
                typedef typename Base::future_base_type future_base_type;
94  
                future_base_type this_(this);
95  
96  
                if (this->sched_) {
97  
                    this->sched_->add(
98  
                        util::deferred_call(&base_type::run_impl, std::move(this_)),
99  
                        util::thread_description(f_),
100  
                        threads::pending, false, stacksize, ec);
101  
                    return threads::invalid_thread_id;
102  
                }
103  
                else if (policy == launch::fork) {
104  
                    return threads::register_thread_nullary(
105  
                        util::deferred_call(&base_type::run_impl, std::move(this_)),
106  
                        util::thread_description(f_),
107  
                        threads::pending_do_not_schedule, true,
108  
                        threads::thread_priority_boost,
109  
                        get_worker_thread_num(), stacksize, ec);
110  
                }
111  
                else {
112  
                    threads::register_thread_nullary(
113  
                        util::deferred_call(&base_type::run_impl, std::move(this_)),
114  
                        util::thread_description(f_),
115  
                        threads::pending, false, priority, std::size_t(-1),
116  
                        stacksize, ec);
117  
                    return threads::invalid_thread_id;
118  
                }
119  
            }
120  
        };
121  
122  
        template <typename Result, typename F>
123  
        struct cancelable_task_object
124  
          : task_object<Result, F, lcos::detail::cancelable_task_base<Result> >
125  
        {
126  
            typedef task_object<
127  
                    Result, F, lcos::detail::cancelable_task_base<Result>
128  
                > base_type;
129  
            typedef typename base_type::result_type result_type;
130  
131  
            cancelable_task_object(F const& f)
132  
              : base_type(f)
133  
            {}
134  
135  
            cancelable_task_object(F&& f)
136  
              : base_type(std::move(f))
137  
            {}
138  
139  
            cancelable_task_object(threads::executor& sched, F const& f)
140  
              : base_type(sched, f)
141  
            {}
142  
143  
            cancelable_task_object(threads::executor& sched, F&& f)
144  
              : base_type(sched, std::move(f))
145  
            {}
146  
        };
147  
    }
148  
149  
    ///////////////////////////////////////////////////////////////////////////
150  
    // The futures_factory is very similar to a packaged_task except that it
151  
    // allows for the owner to go out of scope before the future becomes ready.
152  
    // We provide this class to avoid semantic differences to the C++11
153  
    // std::packaged_task, while on the other hand it is a very convenient way for us to
154  
    // implement hpx::async.
155  
    template <typename Func, bool Cancelable = false>
156  
    class futures_factory;
157  
158  
    namespace detail
159  
    {
160  
        template <typename Result, bool Cancelable>
161  
        struct create_task_object
162  
        {
163  
            typedef boost::intrusive_ptr<lcos::detail::task_base<Result> >
164  
                return_type;
165  
166  
            template <typename F>
167  
            static return_type call(threads::executor& sched, F&& f)
168  
            {
169  
                return new task_object<Result, F>(sched, std::forward<F>(f));
170  
            }
171  
172  
            template <typename R>
173  
            static return_type call(threads::executor& sched, R (*f)())
174  
            {
175  
                return new task_object<Result, Result (*)()>(sched, f);
176  
            }
177  
178  
            template <typename F>
179  
            static return_type call(F&& f)
180  
            {
181  
                return new task_object<Result, F>(std::forward<F>(f));
182  
            }
183  
184  
            template <typename R>
185  
            static return_type call(R (*f)())
186  
            {
187  
                return new task_object<Result, Result (*)()>(f);
188  
            }
189  
        };
190  
191  
        template <typename Result>
192  
        struct create_task_object<Result, true>
193  
        {
194  
            typedef boost::intrusive_ptr<lcos::detail::task_base<Result> >
195  
                return_type;
196  
197  
            template <typename F>
198  
            static return_type call(threads::executor& sched, F&& f)
199  
            {
200  
                return new cancelable_task_object<Result, F>(
201  
                    sched, std::forward<F>(f));
202  
            }
203  
204  
            template <typename R>
205  
            static return_type call(threads::executor& sched, R (*f)())
206  
            {
207  
                return new cancelable_task_object<Result, Result (*)()>(sched, f);
208  
            }
209  
210  
            template <typename F>
211  
            static return_type call(F&& f)
212  
            {
213  
                return new cancelable_task_object<Result, F>(std::forward<F>(f));
214  
            }
215  
216  
            template <typename R>
217  
            static return_type call(R (*f)())
218  
            {
219  
                return new cancelable_task_object<Result, Result (*)()>(f);
220  
            }
221  
        };
222  
    }
223  
224  
    template <typename Result, bool Cancelable>
225  
    class futures_factory<Result(), Cancelable>
226  
    {
227  
    protected:
228  
        typedef lcos::detail::task_base<Result> task_impl_type;
229  
230  
    private:
231  
        HPX_MOVABLE_ONLY(futures_factory);
232  
233  
    public:
234  
        // construction and destruction
235  
        futures_factory()
236  
          : future_obtained_(false)
237  
        {}
238  
239  
        template <typename F>
240  
        explicit futures_factory(threads::executor& sched, F&& f)
241  
          : task_(detail::create_task_object<Result, Cancelable>::call(
242  
                sched, std::forward<F>(f))),
243  
            future_obtained_(false)
244  
        {}
245  
246  
        explicit futures_factory(threads::executor& sched, Result (*f)())
247  
          : task_(detail::create_task_object<Result, Cancelable>::call(sched, f)),
248  
            future_obtained_(false)
249  
        {}
250  
251  
        template <typename F>
252  
        explicit futures_factory(F&& f)
253  
          : task_(detail::create_task_object<Result, Cancelable>::call(
254  
                std::forward<F>(f))),
255  
            future_obtained_(false)
256  
        {}
257  
258  
        explicit futures_factory(Result (*f)())
259  
          : task_(detail::create_task_object<Result, Cancelable>::call(f)),
260  
            future_obtained_(false)
261  
        {}
262  
263  
        ~futures_factory()
264  
        {}
265  
266  
        futures_factory(futures_factory&& rhs)
267  
          : task_(std::move(rhs.task_)),
268  
            future_obtained_(rhs.future_obtained_)
269  
        {
270  
            rhs.task_.reset();
271  
            rhs.future_obtained_ = false;
272  
        }
273  
274  
        futures_factory& operator=(futures_factory&& rhs)
275  
        {
276  
            if (this != &rhs) {
277  
                task_ = std::move(rhs.task_);
278  
                future_obtained_ = rhs.future_obtained_;
279  
280  
                rhs.task_.reset();
281  
                rhs.future_obtained_ = false;
282  
            }
283  
            return *this;
284  
        }
285  
286  
        // synchronous execution
287  
        void operator()() const
288  
        {
289  
            if (!task_) {
290  
                HPX_THROW_EXCEPTION(task_moved,
291  
                    "futures_factory<Result()>::operator()",
292  
                    "futures_factory invalid (has it been moved?)");
293  
                return;
294  
            }
295  
            task_->run();
296  
        }
297  
298  
        // asynchronous execution
299  
        threads::thread_id_type apply(
300  
            launch policy = launch::async,
301  
            threads::thread_priority priority = threads::thread_priority_default,
302  
            threads::thread_stacksize stacksize = threads::thread_stacksize_default,
303  
            error_code& ec = throws) const
304  
        {
305  
            if (!task_) {
306  
                HPX_THROW_EXCEPTION(task_moved,
307  
                    "futures_factory<Result()>::apply()",
308  
                    "futures_factory invalid (has it been moved?)");
309  
                return threads::invalid_thread_id;
310  
            }
311  
            return task_->apply(policy, priority, stacksize, ec);
312  
        }
313  
314  
        // Result retrieval
315  
        lcos::future<Result> get_future(error_code& ec = throws)
316  
        {
317  
            if (!task_) {
318  
                HPX_THROWS_IF(ec, task_moved,
319  
                    "futures_factory<Result()>::get_future",
320  
                    "futures_factory invalid (has it been moved?)");
321  
                return lcos::future<Result>();
322  
            }
323  
            if (future_obtained_) {
324  
                HPX_THROWS_IF(ec, future_already_retrieved,
325  
                    "futures_factory<Result()>::get_future",
326  
                    "future already has been retrieved from this promise");
327  
                return lcos::future<Result>();
328  
            }
329  
330  
            future_obtained_ = true;
331  
332  
            using traits::future_access;
333  
            return future_access<future<Result> >::create(task_);
334  
        }
335  
336  
        bool valid() const HPX_NOEXCEPT
337  
        {
338  
            return !!task_;
339  
        }
340  
341  
    protected:
342  
        boost::intrusive_ptr<task_impl_type> task_;
343  
        bool future_obtained_;
344  
    };
345  
}}}
346  
347  
#endif /*HPX_LCOS_LOCAL_FUTURES_FACTORY_HPP*/
348  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.