/home/users/khuck/src/hpx-lsu/hpx/lcos/detail/future_data.hpp

Line | % of fetches | Source
1  
//  Copyright (c) 2007-2016 Hartmut Kaiser
2  
//
3  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
6  
#if !defined(HPX_LCOS_DETAIL_FUTURE_DATA_MAR_06_2012_1055AM)
7  
#define HPX_LCOS_DETAIL_FUTURE_DATA_MAR_06_2012_1055AM
8  
9  
#include <hpx/config.hpp>
10  
#include <hpx/error_code.hpp>
11  
#include <hpx/lcos/local/detail/condition_variable.hpp>
12  
#include <hpx/lcos/local/spinlock.hpp>
13  
#include <hpx/runtime/get_worker_thread_num.hpp>
14  
#include <hpx/runtime/launch_policy.hpp>
15  
#include <hpx/runtime/threads/thread_executor.hpp>
16  
#include <hpx/runtime/threads/thread_helpers.hpp>
17  
#include <hpx/runtime/threads/coroutines/detail/get_stack_pointer.hpp>
18  
#include <hpx/throw_exception.hpp>
19  
#include <hpx/traits/get_remote_result.hpp>
20  
#include <hpx/util/atomic_count.hpp>
21  
#include <hpx/util/bind.hpp>
22  
#include <hpx/util/decay.hpp>
23  
#include <hpx/util/deferred_call.hpp>
24  
#include <hpx/util/steady_clock.hpp>
25  
#include <hpx/util/unique_function.hpp>
26  
#include <hpx/util/unused.hpp>
27  
28  
#include <boost/exception_ptr.hpp>
29  
#include <boost/intrusive_ptr.hpp>
30  
31  
#include <chrono>
32  
#include <cstddef>
33  
#include <functional>
34  
#include <memory>
35  
#include <mutex>
36  
#include <type_traits>
37  
#include <utility>
38  
39  
///////////////////////////////////////////////////////////////////////////////
40  
namespace hpx {
namespace lcos {

    /// Result reported by the timed wait operations of a future's shared
    /// state.
    enum class future_status
    {
        /// the wait finished without running into the deadline
        ready,
        /// the deadline passed before the state became ready
        timeout,
        /// the associated task has not been started yet
        deferred,
        /// the wait itself failed and reported an error code
        uninitialized
    };

}    // namespace lcos
}    // namespace hpx
47  
48  
///////////////////////////////////////////////////////////////////////////////
49  
namespace hpx { namespace lcos
50  
{
51  
52  
namespace detail
53  
{
54  
    // shared state implementation, defined further below
    template <typename Result>
    struct future_data;

    ///////////////////////////////////////////////////////////////////////
    // reference counted base of all shared states
    struct future_data_refcnt_base;

    // hooks looked up by boost::intrusive_ptr to manage the reference count
    void intrusive_ptr_add_ref(future_data_refcnt_base* p);
    void intrusive_ptr_release(future_data_refcnt_base* p);
61  
62  
    ///////////////////////////////////////////////////////////////////////
63  
    struct future_data_refcnt_base
64  
    {
65  
    private:
66  
        typedef util::unique_function_nonser<void()> completed_callback_type;
67  
    public:
68  
        typedef void has_future_data_refcnt_base;
69  
70  
        virtual ~future_data_refcnt_base() {}
71  
72  
        virtual void set_on_completed(completed_callback_type) = 0;
73  
74  
        virtual bool requires_delete()
75  
        {
76  
            return 0 == --count_;
77  
        }
78  
79  
        virtual void destroy()
80  
        {
81  
            delete this;
82  
        }
83  
84  
    protected:
85  
        future_data_refcnt_base() : count_(0) {}
86  
87  
        // reference counting
88  
        friend void intrusive_ptr_add_ref(future_data_refcnt_base* p);
89  
        friend void intrusive_ptr_release(future_data_refcnt_base* p);
90  
91  
        util::atomic_count count_;
92  
    };
93  
94  
    /// support functions for boost::intrusive_ptr
95  
    inline void intrusive_ptr_add_ref(future_data_refcnt_base* p)
    {
        // one more owner refers to the shared state
        ++(p->count_);
    }
99  
    inline void intrusive_ptr_release(future_data_refcnt_base* p)
    {
        // requires_delete() drops one reference and reports whether it was
        // the last one
        bool const last_reference = p->requires_delete();
        if (last_reference)
        {
            p->destroy();
        }
    }
104  
105  
    ///////////////////////////////////////////////////////////////////////////
106  
    template <typename Result>
107  
    struct future_data_result
108  
    {
109  
        typedef Result type;
110  
111  
        template <typename U>
112  
        HPX_FORCEINLINE static
113  
        U && set(U && u)
114  
        {
115  
            return std::forward<U>(u);
116  
        }
117  
    };
118  
119  
    template <typename Result>
120  
    struct future_data_result<Result&>
121  
    {
122  
        typedef Result* type;
123  
124  
        HPX_FORCEINLINE static
125  
        Result* set(Result* u)
126  
        {
127  
            return u;
128  
        }
129  
130  
        HPX_FORCEINLINE static
131  
        Result* set(Result& u)
132  
        {
133  
            return &u;
134  
        }
135  
    };
136  
137  
    template <>
138  
    struct future_data_result<void>
139  
    {
140  
        typedef util::unused_type type;
141  
142  
        HPX_FORCEINLINE static
143  
        util::unused_type set(util::unused_type u)
144  
        {
145  
            return u;
146  
        }
147  
    };
148  
149  
    ///////////////////////////////////////////////////////////////////////////
150  
    template <typename R>
151  
    struct future_data_storage
152  
    {
153  
        typedef typename future_data_result<R>::type value_type;
154  
        typedef boost::exception_ptr error_type;
155  
156  
        // determine the required alignment, define aligned storage of proper
157  
        // size
158  
        HPX_STATIC_CONSTEXPR std::size_t max_alignment =
159  
            (std::alignment_of<value_type>::value >
160  
             std::alignment_of<error_type>::value) ?
161  
            std::alignment_of<value_type>::value
162  
          : std::alignment_of<error_type>::value;
163  
164  
        HPX_STATIC_CONSTEXPR std::size_t max_size =
165  
                (sizeof(value_type) > sizeof(error_type)) ?
166  
                    sizeof(value_type) : sizeof(error_type);
167  
168  
        typedef typename std::aligned_storage<max_size, max_alignment>::type type;
169  
    };
170  
171  
    ///////////////////////////////////////////////////////////////////////////
172  
    template <typename F1, typename F2>
173  
    class compose_cb_impl
174  
    {
175  
        HPX_MOVABLE_ONLY(compose_cb_impl);
176  
177  
    public:
178  
        template <typename A1, typename A2>
179  
        compose_cb_impl(A1 && f1, A2 && f2)
180  
          : f1_(std::forward<A1>(f1))
181  
          , f2_(std::forward<A2>(f2))
182  
        {}
183  
184  
        compose_cb_impl(compose_cb_impl&& other)
185  
          : f1_(std::move(other.f1_))
186  
          , f2_(std::move(other.f2_))
187  
        {}
188  
189  
        void operator()() const
190  
        {
191  
            f1_();
192  
            f2_();
193  
        }
194  
195  
    private:
196  
        F1 f1_;
197  
        F2 f2_;
198  
    };
199  
200  
    template <typename F1, typename F2>
201  
    static HPX_FORCEINLINE util::unique_function_nonser<void()>
202  
    compose_cb(F1 && f1, F2 && f2)
203  
    {
204  
        if (!f1)
205  
            return std::forward<F2>(f2);
206  
        else if (!f2)
207  
            return std::forward<F1>(f1);
208  
209  
        // otherwise create a combined callback
210  
        typedef compose_cb_impl<
211  
            typename util::decay<F1>::type, typename util::decay<F2>::type
212  
        > result_type;
213  
        return result_type(std::forward<F1>(f1), std::forward<F2>(f2));
214  
    }
215  
216  
    ///////////////////////////////////////////////////////////////////////////
217  
    struct handle_continuation_recursion_count
218  
    {
219  
        handle_continuation_recursion_count()
220  
          : count_(threads::get_continuation_recursion_count())
221  
        {
222  
            ++count_;
223  
        }
224  
        ~handle_continuation_recursion_count()
225  
        {
226  
            --count_;
227  
        }
228  
229  
        std::size_t& count_;
230  
    };
231  
232  
    ///////////////////////////////////////////////////////////////////////////
233  
    // Run the given function object; judging by its name and its use in
    // this file it is executed on a newly created thread (implementation
    // not visible here -- TODO confirm). The caller treats a false return
    // value together with a set 'ec' as a failure to create that thread.
    HPX_EXPORT bool run_on_completed_on_new_thread(
        util::unique_function_nonser<bool()> && f,
        error_code& ec);
235  
236  
    ///////////////////////////////////////////////////////////////////////////
237  
    template <typename Result>
238  
    struct future_data : future_data_refcnt_base
239  
    {
240  
        HPX_NON_COPYABLE(future_data);
241  
242  
        typedef typename future_data_result<Result>::type result_type;
243  
        typedef util::unique_function_nonser<void()> completed_callback_type;
244  
        typedef lcos::local::spinlock mutex_type;
245  
246  
        enum state
247  
        {
248  
            empty = 0,
249  
            ready = 1,
250  
            value = 2 | ready,
251  
            exception = 4 | ready
252  
        };
253  
254  
    public:
255  
        future_data()
256  
          : state_(empty)
257  
        {}
258  
259  
        ~future_data()
260  
        {
261  
            reset();
262  
        }
263  
264  
        virtual void execute_deferred(error_code& ec = throws) {}
265  
266  
        // cancellation is disabled by default
267  
        virtual bool cancelable() const
268  
        {
269  
            return false;
270  
        }
271  
        virtual void cancel()
272  
        {
273  
            HPX_THROW_EXCEPTION(future_does_not_support_cancellation,
274  
                "future_data::cancel",
275  
                "this future does not support cancellation");
276  
        }
277  
278  
        /// Get the result of the requested action. This call blocks (yields
279  
        /// control) if the result is not ready. As soon as the result has been
280  
        /// returned and the waiting thread has been re-scheduled by the thread
281  
        /// manager the function will return.
282  
        ///
283  
        /// \param ec     [in,out] this represents the error status on exit,
284  
        ///               if this is pre-initialized to \a hpx#throws
285  
        ///               the function will throw on error instead. If the
286  
        ///               operation blocks and is aborted because the object
287  
        ///               went out of scope, the code \a hpx#yield_aborted is
288  
        ///               set or thrown.
289  
        ///
290  
        /// \note         If there has been an error reported (using the action
291  
        ///               \a base_lco#set_exception), this function will throw an
292  
        ///               exception encapsulating the reported error code and
293  
        ///               error description if <code>&ec == &throws</code>.
294  
        virtual result_type* get_result(error_code& ec = throws)
295  
        {
296  
            // yields control if needed
297  
            wait(ec);
298  
            if (ec) return nullptr;
299  
300  
            // No locking is required. Once a future has been made ready, which
301  
            // is a postcondition of wait, either:
302  
            //
303  
            // - there is only one writer (future), or
304  
            // - there are multiple readers only (shared_future, lock hurts
305  
            //   concurrency)
306  
307  
            if (state_ == empty) {
308  
                // the value has already been moved out of this future
309  
                HPX_THROWS_IF(ec, no_state,
310  
                    "future_data::get_result",
311  
                    "this future has no valid shared state");
312  
                return nullptr;
313  
            }
314  
315  
            // the thread has been re-activated by one of the actions
316  
            // supported by this promise (see promise::set_event
317  
            // and promise::set_exception).
318  
            if (state_ == exception)
319  
            {
320  
                boost::exception_ptr* exception_ptr =
321  
                    reinterpret_cast<boost::exception_ptr*>(&storage_);
322  
                // an error has been reported in the meantime, throw or set
323  
                // the error code
324  
                if (&ec == &throws) {
325  
                    boost::rethrow_exception(*exception_ptr);
326  
                    // never reached
327  
                }
328  
                else {
329  
                    ec = make_error_code(*exception_ptr);
330  
                }
331  
                return nullptr;
332  
            }
333  
            return reinterpret_cast<result_type*>(&storage_);
334  
        }
335  
336  
        // deferred execution of a given continuation
337  
        bool run_on_completed(completed_callback_type && on_completed,
338  
            boost::exception_ptr& ptr)
339  
        {
340  
            try {
341  
                on_completed();
342  
            }
343  
            catch (...) {
344  
                ptr = boost::current_exception();
345  
                return false;
346  
            }
347  
            return true;
348  
        }
349  
350  
        // make sure continuation invocation does not recurse deeper than
351  
        // allowed
352  
        void handle_on_completed(completed_callback_type && on_completed)
353  
        {
354  
            // We need to run the completion on a new thread if we are on a
355  
            // non HPX thread.
356  
            bool recurse_asynchronously = hpx::threads::get_self_ptr() == nullptr;
357  
#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
358  
            recurse_asynchronously =
359  
                !this_thread::has_sufficient_stack_space();
360  
#else
361  
            handle_continuation_recursion_count cnt;
362  
            recurse_asynchronously = recurse_asynchronously ||
363  
                cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH;
364  
#endif
365  
            if (!recurse_asynchronously)
366  
            {
367  
                // directly execute continuation on this thread
368  
                on_completed();
369  
            }
370  
            else
371  
            {
372  
                // re-spawn continuation on a new thread
373  
                boost::intrusive_ptr<future_data> this_(this);
374  
375  
                error_code ec(lightweight);
376  
                boost::exception_ptr ptr;
377  
                if (!run_on_completed_on_new_thread(
378  
                        util::deferred_call(&future_data::run_on_completed,
379  
                            std::move(this_), std::move(on_completed),
380  
                            std::ref(ptr)),
381  
                        ec))
382  
                {
383  
                    // thread creation went wrong
384  
                    if (ec) {
385  
                        set_exception(hpx::detail::access_exception(ec));
386  
                        return;
387  
                    }
388  
389  
                    // re-throw exception in this context
390  
                    HPX_ASSERT(ptr);        // exception should have been set
391  
                    boost::rethrow_exception(ptr);
392  
                }
393  
            }
394  
        }
395  
396  
        /// Set the result of the requested action.
397  
        template <typename Target>
398  
        void set_value(Target && data, error_code& ec = throws)
399  
        {
400  
            std::unique_lock<mutex_type> l(this->mtx_);
401  
402  
            // check whether the data has already been set
403  
            if (is_ready_locked()) {
404  
                l.unlock();
405  
                HPX_THROWS_IF(ec, promise_already_satisfied,
406  
                    "future_data::set_value",
407  
                    "data has already been set for this future");
408  
                return;
409  
            }
410  
411  
            completed_callback_type on_completed;
412  
413  
            on_completed = std::move(this->on_completed_);
414  
415  
            // set the data
416  
            result_type* value_ptr =
417  
                reinterpret_cast<result_type*>(&storage_);
418  
            ::new ((void*)value_ptr) result_type(
419  
                future_data_result<Result>::set(std::forward<Target>(data)));
420  
            state_ = value;
421  
422  
            // handle all threads waiting for the future to become ready
423  
            cond_.notify_all(std::move(l), ec);
424  
425  
            // Note: cv.notify_all() above 'consumes' the lock 'l' and leaves
426  
            //       it unlocked when returning.
427  
428  
            // invoke the callback (continuation) function
429  
            if (on_completed)
430  
                handle_on_completed(std::move(on_completed));
431  
        }
432  
433  
        template <typename Target>
434  
        void set_exception(Target && data, error_code& ec = throws)
435  
        {
436  
            std::unique_lock<mutex_type> l(this->mtx_);
437  
438  
            // check whether the data has already been set
439  
            if (is_ready_locked()) {
440  
                l.unlock();
441  
                HPX_THROWS_IF(ec, promise_already_satisfied,
442  
                    "future_data::set_exception",
443  
                    "data has already been set for this future");
444  
                return;
445  
            }
446  
447  
            completed_callback_type on_completed;
448  
449  
            on_completed = std::move(this->on_completed_);
450  
451  
            // set the data
452  
            boost::exception_ptr* exception_ptr =
453  
                reinterpret_cast<boost::exception_ptr*>(&storage_);
454  
            ::new ((void*)exception_ptr) boost::exception_ptr(
455  
                std::forward<Target>(data));
456  
            state_ = exception;
457  
458  
            // handle all threads waiting for the future to become ready
459  
            cond_.notify_all(std::move(l), ec);
460  
461  
            // Note: cv.notify_all() above 'consumes' the lock 'l' and leaves
462  
            //       it unlocked when returning.
463  
464  
            // invoke the callback (continuation) function
465  
            if (on_completed)
466  
                handle_on_completed(std::move(on_completed));
467  
        }
468  
469  
        // helper functions for setting data (if successful) or the error (if
470  
        // non-successful)
471  
        template <typename T>
472  
        void set_data(T && result)
473  
        {
474  
            // set the received result, reset error status
475  
            try {
476  
                typedef typename util::decay<T>::type naked_type;
477  
478  
                typedef traits::get_remote_result<
479  
                    result_type, naked_type
480  
                > get_remote_result_type;
481  
482  
                // store the value
483  
                set_value(std::move(get_remote_result_type::call(
484  
                        std::forward<T>(result))));
485  
            }
486  
            catch (...) {
487  
                // store the error instead
488  
                return set_exception(boost::current_exception());
489  
            }
490  
        }
491  
492  
        // trigger the future with the given error condition
493  
        void set_error(error e, char const* f, char const* msg)
494  
        {
495  
            try {
496  
                HPX_THROW_EXCEPTION(e, f, msg);
497  
            }
498  
            catch (...) {
499  
                // store the error code
500  
                set_exception(boost::current_exception());
501  
            }
502  
        }
503  
504  
        /// Reset the promise to allow to restart an asynchronous
505  
        /// operation. Allows any subsequent set_data operation to succeed.
506  
        void reset(error_code& /*ec*/ = throws)
507  
        {
508  
            // no locking is required as semantics guarantee a single writer
509  
            // and no reader
510  
511  
            // release any stored data and callback functions
512  
            switch (state_) {
513  
            case value:
514  
            {
515  
                result_type* value_ptr =
516  
                    reinterpret_cast<result_type*>(&storage_);
517  
                value_ptr->~result_type();
518  
                break;
519  
            }
520  
            case exception:
521  
            {
522  
                boost::exception_ptr* exception_ptr =
523  
                    reinterpret_cast<boost::exception_ptr*>(&storage_);
524  
                exception_ptr->~exception_ptr();
525  
                break;
526  
            }
527  
            default: break;
528  
            }
529  
530  
            state_ = empty;
531  
            on_completed_ = completed_callback_type();
532  
        }
533  
534  
        // continuation support
535  
536  
        /// Set the callback which needs to be invoked when the future becomes
537  
        /// ready. If the future is ready the function will be invoked
538  
        /// immediately.
539  
        void set_on_completed(completed_callback_type data_sink)
540  
        {
541  
            if (!data_sink) return;
542  
543  
            std::unique_lock<mutex_type> l(this->mtx_);
544  
545  
            if (is_ready_locked()) {
546  
547  
                HPX_ASSERT(!on_completed_);
548  
549  
                // invoke the callback (continuation) function right away
550  
                l.unlock();
551  
552  
                handle_on_completed(std::move(data_sink));
553  
            }
554  
            else {
555  
                // store a combined callback wrapping the old and the new one
556  
                this->on_completed_ = compose_cb(
557  
                    std::move(data_sink), std::move(on_completed_));
558  
            }
559  
        }
560  
561  
        virtual void wait(error_code& ec = throws)
562  
        {
563  
            std::unique_lock<mutex_type> l(mtx_);
564  
565  
            // block if this entry is empty
566  
            if (state_ == empty) {
567  
                cond_.wait(l, "future_data::wait", ec);
568  
                if (ec) return;
569  
            }
570  
571  
            if (&ec != &throws)
572  
                ec = make_success_code();
573  
        }
574  
575  
        virtual future_status
576  
        wait_until(util::steady_clock::time_point const& abs_time,
577  
            error_code& ec = throws)
578  
        {
579  
            std::unique_lock<mutex_type> l(mtx_);
580  
581  
            // block if this entry is empty
582  
            if (state_ == empty) {
583  
                threads::thread_state_ex_enum const reason =
584  
                    cond_.wait_until(l, abs_time,
585  
                        "future_data::wait_until", ec);
586  
                if (ec) return future_status::uninitialized;
587  
588  
                if (reason == threads::wait_timeout)
589  
                    return future_status::timeout;
590  
591  
                return future_status::ready;
592  
            }
593  
594  
            if (&ec != &throws)
595  
                ec = make_success_code();
596  
597  
            return future_status::ready; //-V110
598  
        }
599  
600  
        /// Return whether or not the data is available for this
601  
        /// \a future.
602  
        bool is_ready() const
603  
        {
604  
            std::unique_lock<mutex_type> l(mtx_);
605  
            return is_ready_locked();
606  
        }
607  
608  
        bool is_ready_locked() const
609  
        {
610  
            return (state_ & ready) != 0;
611  
        }
612  
613  
        bool has_value() const
614  
        {
615  
            std::unique_lock<mutex_type> l(mtx_);
616  
            return state_ == value;
617  
        }
618  
619  
        bool has_exception() const
620  
        {
621  
            std::unique_lock<mutex_type> l(mtx_);
622  
            return state_ == exception;
623  
        }
624  
625  
    protected:
626  
        mutable mutex_type mtx_;
627  
        completed_callback_type on_completed_;
628  
629  
    private:
630  
        local::detail::condition_variable cond_;    // threads waiting in read
631  
        state state_;                               // current state
632  
        typename future_data_storage<Result>::type storage_;
633  
    };
634  
635  
    ///////////////////////////////////////////////////////////////////////////
636  
    template <typename Result>
637  
    struct timed_future_data : future_data<Result>
638  
    {
639  
    public:
640  
        typedef future_data<Result> base_type;
641  
        typedef typename base_type::result_type result_type;
642  
        typedef typename base_type::mutex_type mutex_type;
643  
644  
    public:
645  
        timed_future_data() {}
646  
647  
        template <typename Result_>
648  
        timed_future_data(
649  
            util::steady_clock::time_point const& abs_time,
650  
            Result_&& init)
651  
        {
652  
            boost::intrusive_ptr<timed_future_data> this_(this);
653  
654  
            error_code ec;
655  
            threads::thread_id_type id = threads::register_thread_nullary(
656  
                util::bind(util::one_shot(&timed_future_data::set_value),
657  
                    std::move(this_),
658  
                    future_data_result<Result>::set(std::forward<Result_>(init))),
659  
                "timed_future_data<Result>::timed_future_data",
660  
                threads::suspended, true, threads::thread_priority_normal,
661  
                std::size_t(-1), threads::thread_stacksize_default, ec);
662  
            if (ec) {
663  
                // thread creation failed, report error to the new future
664  
                this->base_type::set_exception(hpx::detail::access_exception(ec));
665  
            }
666  
667  
            // start new thread at given point in time
668  
            threads::set_thread_state(id, abs_time, threads::pending,
669  
                threads::wait_timeout, threads::thread_priority_boost, ec);
670  
            if (ec) {
671  
                // thread scheduling failed, report error to the new future
672  
                this->base_type::set_exception(hpx::detail::access_exception(ec));
673  
            }
674  
        }
675  
676  
        void set_value(result_type const& value)
677  
        {
678  
            this->base_type::set_value(value);
679  
        }
680  
    };
681  
682  
    ///////////////////////////////////////////////////////////////////////////
683  
    template <typename Result>
684  
    struct task_base : future_data<Result>
685  
    {
686  
    protected:
687  
        typedef typename future_data<Result>::mutex_type mutex_type;
688  
        typedef boost::intrusive_ptr<task_base> future_base_type;
689  
        typedef typename future_data<Result>::result_type result_type;
690  
691  
    public:
692  
        task_base()
693  
          : started_(false), sched_(nullptr)
694  
        {}
695  
696  
        task_base(threads::executor& sched)
697  
          : started_(false),
698  
            sched_(sched ? &sched : nullptr)
699  
        {}
700  
701  
        virtual void execute_deferred(error_code& ec = throws)
702  
        {
703  
            if (!started_test_and_set())
704  
                this->do_run();
705  
        }
706  
707  
        // retrieving the value
708  
        virtual result_type* get_result(error_code& ec = throws)
709  
        {
710  
            if (!started_test_and_set())
711  
                this->do_run();
712  
            return this->future_data<Result>::get_result(ec);
713  
        }
714  
715  
        // wait support
716  
        virtual void wait(error_code& ec = throws)
717  
        {
718  
            if (!started_test_and_set())
719  
                this->do_run();
720  
            this->future_data<Result>::wait(ec);
721  
        }
722  
723  
        virtual future_status
724  
        wait_until(util::steady_clock::time_point const& abs_time,
725  
            error_code& ec = throws)
726  
        {
727  
            if (!started_test())
728  
                return future_status::deferred; //-V110
729  
            return this->future_data<Result>::wait_until(abs_time, ec);
730  
        }
731  
732  
    private:
733  
        bool started_test() const
734  
        {
735  
            std::lock_guard<mutex_type> l(this->mtx_);
736  
            return started_;
737  
        }
738  
739  
        bool started_test_and_set()
740  
        {
741  
            std::lock_guard<mutex_type> l(this->mtx_);
742  
            if (started_)
743  
                return true;
744  
745  
            started_ = true;
746  
            return false;
747  
        }
748  
749  
    protected:
750  
        void check_started()
751  
        {
752  
            std::lock_guard<mutex_type> l(this->mtx_);
753  
            if (started_) {
754  
                HPX_THROW_EXCEPTION(task_already_started,
755  
                    "task_base::check_started",
756  
                    "this task has already been started");
757  
                return;
758  
            }
759  
            started_ = true;
760  
        }
761  
762  
    public:
763  
        // run synchronously
764  
        void run()
765  
        {
766  
            check_started();
767  
            this->do_run();       // always on this thread
768  
        }
769  
770  
        // run in a separate thread
771  
        virtual threads::thread_id_type apply(launch policy,
772  
            threads::thread_priority priority,
773  
            threads::thread_stacksize stacksize, error_code& ec)
774  
        {
775  
            HPX_ASSERT(false);      // shouldn't ever be called
776  
            return threads::invalid_thread_id;
777  
        }
778  
779  
    protected:
780  
        static threads::thread_result_type run_impl(future_base_type this_)
781  
        {
782  
            this_->do_run();
783  
            return threads::thread_result_type(threads::terminated, nullptr);
784  
        }
785  
786  
    public:
787  
        template <typename T>
788  
        void set_data(T && result)
789  
        {
790  
            this->future_data<Result>::set_data(std::forward<T>(result));
791  
        }
792  
793  
        void set_exception(
794  
            boost::exception_ptr const& e, error_code& ec = throws)
795  
        {
796  
            this->future_data<Result>::set_exception(e, ec);
797  
        }
798  
799  
        virtual void do_run()
800  
        {
801  
            HPX_ASSERT(false);      // shouldn't ever be called
802  
        }
803  
804  
    protected:
805  
        bool started_;
806  
        threads::executor* sched_;
807  
    };
808  
809  
    ///////////////////////////////////////////////////////////////////////////
810  
    template <typename Result>
811  
    struct cancelable_task_base : task_base<Result>
812  
    {
813  
    protected:
814  
        typedef typename task_base<Result>::mutex_type mutex_type;
815  
        typedef boost::intrusive_ptr<cancelable_task_base> future_base_type;
816  
        typedef typename future_data<Result>::result_type result_type;
817  
818  
    protected:
819  
        threads::thread_id_type get_thread_id() const
820  
        {
821  
            std::lock_guard<mutex_type> l(this->mtx_);
822  
            return id_;
823  
        }
824  
        void set_thread_id(threads::thread_id_type id)
825  
        {
826  
            std::lock_guard<mutex_type> l(this->mtx_);
827  
            id_ = id;
828  
        }
829  
830  
    public:
831  
        cancelable_task_base()
832  
          : task_base<Result>(), id_(threads::invalid_thread_id)
833  
        {}
834  
835  
        cancelable_task_base(threads::executor& sched)
836  
          : task_base<Result>(sched), id_(threads::invalid_thread_id)
837  
        {}
838  
839  
    private:
840  
        struct reset_id
841  
        {
842  
            reset_id(cancelable_task_base& target)
843  
              : target_(target)
844  
            {
845  
                target.set_thread_id(threads::get_self_id());
846  
            }
847  
            ~reset_id()
848  
            {
849  
                target_.set_thread_id(threads::invalid_thread_id);
850  
            }
851  
            cancelable_task_base& target_;
852  
        };
853  
854  
    protected:
855  
        static threads::thread_result_type run_impl(future_base_type this_)
856  
        {
857  
            reset_id r(*this_);
858  
            this_->do_run();
859  
            return threads::thread_result_type(threads::terminated, nullptr);
860  
        }
861  
862  
    public:
863  
        // cancellation support
864  
        bool cancelable() const
865  
        {
866  
            return true;
867  
        }
868  
869  
        void cancel()
870  
        {
871  
            std::unique_lock<mutex_type> l(this->mtx_);
872  
            try {
873  
                if (!this->started_)
874  
                    HPX_THROW_THREAD_INTERRUPTED_EXCEPTION();
875  
876  
                if (this->is_ready_locked())
877  
                    return;   // nothing we can do
878  
879  
                if (id_ != threads::invalid_thread_id) {
880  
                    // interrupt the executing thread
881  
                    threads::interrupt_thread(id_);
882  
883  
                    this->started_ = true;
884  
885  
                    l.unlock();
886  
                    this->set_error(future_cancelled,
887  
                        "task_base<Result>::cancel",
888  
                        "future has been canceled");
889  
                }
890  
                else {
891  
                    HPX_THROW_EXCEPTION(future_can_not_be_cancelled,
892  
                        "task_base<Result>::cancel",
893  
                        "future can't be canceled at this time");
894  
                }
895  
            }
896  
            catch (...) {
897  
                this->started_ = true;
898  
                this->set_exception(boost::current_exception());
899  
                throw;
900  
            }
901  
        }
902  
903  
    protected:
904  
        threads::thread_id_type id_;
905  
    };
906  
}}}
907  
908  
#endif
909  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.