/home/users/khuck/src/hpx-lsu/hpx/lcos/wait_all.hpp

Line% of fetchesSource
1  
//  Copyright (c) 2007-2015 Hartmut Kaiser
2  
//  Copyright (c) 2013 Agustin Berge
3  
//
4  
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
7  
/// \file lcos/wait_all.hpp
8  
9  
#if !defined(HPX_LCOS_WAIT_ALL_APR_19_2012_1140AM)
10  
#define HPX_LCOS_WAIT_ALL_APR_19_2012_1140AM
11  
12  
#if defined(DOXYGEN)
13  
namespace hpx
14  
{
15  
    /// The function \a wait_all is an operator allowing to join on the result
16  
    /// of all given futures. It AND-composes all future objects given and
17  
    /// returns after they finished executing.
18  
    ///
19  
    /// \param first    The iterator pointing to the first element of a
20  
    ///                 sequence of \a future or \a shared_future objects for
21  
    ///                 which \a wait_all should wait.
22  
    /// \param last     The iterator pointing to the last element of a
23  
    ///                 sequence of \a future or \a shared_future objects for
24  
    ///                 which \a wait_all should wait.
25  
    ///
26  
    /// \note The function \a wait_all returns after all futures have become
27  
    ///       ready. All input futures are still valid after \a wait_all
28  
    ///       returns.
29  
    ///
30  
    template <typename InputIter>
31  
    void wait_all(InputIter first, InputIter last);
32  
33  
    /// The function \a wait_all is an operator allowing to join on the result
34  
    /// of all given futures. It AND-composes all future objects given and
35  
    /// returns after they finished executing.
36  
    ///
37  
    /// \param futures  A vector holding an arbitrary amount of \a future or
38  
    ///                 \a shared_future objects for which \a wait_all should
39  
    ///                 wait.
40  
    ///
41  
    /// \note The function \a wait_all returns after all futures have become
42  
    ///       ready. All input futures are still valid after \a wait_all
43  
    ///       returns.
44  
    ///
45  
    template <typename R>
46  
    void wait_all(std::vector<future<R>>&& futures);
47  
48  
    /// The function \a wait_all is an operator allowing to join on the result
49  
    /// of all given futures. It AND-composes all future objects given and
50  
    /// returns after they finished executing.
51  
    ///
52  
    /// \param futures  An arbitrary number of \a future or \a shared_future
53  
    ///                 objects, possibly holding different types for which
54  
    ///                 \a wait_all should wait.
55  
    ///
56  
    /// \note The function \a wait_all returns after all futures have become
57  
    ///       ready. All input futures are still valid after \a wait_all
58  
    ///       returns.
59  
    ///
60  
    template <typename ...T>
61  
    void wait_all(T &&... futures);
62  
63  
    /// The function \a wait_all_n is an operator allowing to join on the result
64  
    /// of all given futures. It AND-composes all future objects given and
65  
    /// returns after they finished executing.
66  
    ///
67  
    /// \param begin    The iterator pointing to the first element of a
68  
    ///                 sequence of \a future or \a shared_future objects for
69  
    ///                 which \a wait_all_n should wait.
70  
    /// \param count    The number of elements in the sequence starting at
71  
    ///                 \a first.
72  
    ///
73  
    /// \return         The function \a wait_all_n will return an iterator
74  
    ///                 referring to the first element in the input sequence
75  
    ///                 after the last processed element.
76  
    ///
77  
    /// \note The function \a wait_all_n returns after all futures have become
78  
    ///       ready. All input futures are still valid after \a wait_all_n
79  
    ///       returns.
80  
    ///
81  
    template <typename InputIter>
82  
    InputIter wait_all_n(InputIter begin, std::size_t count);
83  
}
84  
85  
#else // DOXYGEN
86  
87  
#include <hpx/config.hpp>
88  
#include <hpx/lcos/detail/future_data.hpp>
89  
#include <hpx/lcos/future.hpp>
90  
#include <hpx/lcos/wait_some.hpp>
91  
#include <hpx/traits/acquire_shared_state.hpp>
92  
#include <hpx/traits/future_access.hpp>
93  
#include <hpx/traits/future_traits.hpp>
94  
#include <hpx/traits/is_future.hpp>
95  
#include <hpx/util/always_void.hpp>
96  
#include <hpx/util/decay.hpp>
97  
#include <hpx/util/deferred_call.hpp>
98  
#include <hpx/util/tuple.hpp>
99  
100  
#include <boost/intrusive_ptr.hpp>
101  
#include <boost/range/functions.hpp>
102  
103  
#include <algorithm>
104  
#include <cstddef>
105  
#include <iterator>
106  
#include <type_traits>
107  
#include <utility>
108  
#include <vector>
109  
110  
///////////////////////////////////////////////////////////////////////////////
111  
namespace hpx { namespace lcos
112  
{
113  
    namespace detail
114  
    {
115  
        ///////////////////////////////////////////////////////////////////////
116  
        template <typename Future, typename Enable = void>
117  
        struct is_future_or_shared_state
118  
          : traits::is_future<Future>
119  
        {};
120  
121  
        template <typename R>
122  
        struct is_future_or_shared_state<
123  
                boost::intrusive_ptr<future_data<R> > >
124  
          : std::true_type
125  
        {};
126  
127  
        ///////////////////////////////////////////////////////////////////////
128  
        template <typename Range, typename Enable = void>
129  
        struct is_future_or_shared_state_range
130  
            : std::false_type
131  
        {};
132  
133  
        template <typename T>
134  
        struct is_future_or_shared_state_range<std::vector<T> >
135  
            : is_future_or_shared_state<T>
136  
        {};
137  
138  
        ///////////////////////////////////////////////////////////////////////
139  
        template <typename Future, typename Enable = void>
140  
        struct future_or_shared_state_result;
141  
142  
        template <typename Future>
143  
        struct future_or_shared_state_result<Future,
144  
                typename std::enable_if<traits::is_future<Future>::value>::type>
145  
          : traits::future_traits<Future>
146  
        {};
147  
148  
        template <typename R>
149  
        struct future_or_shared_state_result<
150  
            boost::intrusive_ptr<future_data<R> > >
151  
        {
152  
            typedef R type;
153  
        };
154  
155  
        ///////////////////////////////////////////////////////////////////////
156  
        template <typename Tuple>
157  
        struct wait_all_frame //-V690
158  
          : hpx::lcos::detail::future_data<void>
159  
        {
160  
        private:
161  
            // workaround gcc regression wrongly instantiating constructors
162  
            wait_all_frame();
163  
            wait_all_frame(wait_all_frame const&);
164  
165  
            template <std::size_t I>
166  
            struct is_end
167  
              : std::integral_constant<
168  
                    bool,
169  
                    util::tuple_size<Tuple>::value == I
170  
                >
171  
            {};
172  
173  
        public:
174  
            wait_all_frame(Tuple const& t)
175  
              : t_(t)
176  
            {}
177  
178  
        protected:
179  
            // End of the tuple is reached
180  
            template <std::size_t I>
181  
            HPX_FORCEINLINE
182  
            void do_await(std::true_type)
183  
            {
184  
                this->set_value(util::unused);     // simply make ourself ready
185  
            }
186  
187  
            // Current element is a range (vector) of futures
188  
            template <std::size_t I, typename Iter>
189  
            void await_range(Iter next, Iter end)
190  
            {
191  
                typedef typename std::iterator_traits<Iter>::value_type
192  
                    future_type;
193  
                typedef typename detail::future_or_shared_state_result<
194  
                        future_type
195  
                    >::type future_result_type;
196  
197  
                void (wait_all_frame::*f)(Iter, Iter) =
198  
                    &wait_all_frame::await_range<I>;
199  
200  
                for (/**/; next != end; ++next)
201  
                {
202  
                    boost::intrusive_ptr<
203  
                        lcos::detail::future_data<future_result_type>
204  
                    > next_future_data =
205  
                        traits::detail::get_shared_state(*next);
206  
207  
                    if (!next_future_data->is_ready())
208  
                    {
209  
                        next_future_data->execute_deferred();
210  
211  
                        // execute_deferred might have made the future ready
212  
                        if (!next_future_data->is_ready())
213  
                        {
214  
                            // Attach a continuation to this future which will
215  
                            // re-evaluate it and continue to the next element
216  
                            // in the sequence (if any).
217  
                            boost::intrusive_ptr<wait_all_frame> this_(this);
218  
                            next_future_data->set_on_completed(
219  
                                util::deferred_call(f, std::move(this_),
220  
                                    std::move(next), std::move(end)));
221  
                            return;
222  
                        }
223  
                    }
224  
                }
225  
226  
                // All elements of the sequence are ready now, proceed to the
227  
                // next argument.
228  
                do_await<I + 1>(is_end<I + 1>());
229  
            }
230  
231  
            template <std::size_t I>
232  
            HPX_FORCEINLINE
233  
            void await_next(std::false_type, std::true_type)
234  
            {
235  
                await_range<I>(
236  
                    boost::begin(boost::unwrap_ref(util::get<I>(t_))),
237  
                    boost::end(boost::unwrap_ref(util::get<I>(t_))));
238  
            }
239  
240  
            // Current element is a simple future
241  
            template <std::size_t I>
242  
            HPX_FORCEINLINE
243  
            void await_next(std::true_type, std::false_type)
244  
            {
245  
                typedef typename util::decay_unwrap<
246  
                    typename util::tuple_element<I, Tuple>::type
247  
                >::type future_type;
248  
249  
                typedef typename detail::future_or_shared_state_result<
250  
                        future_type
251  
                    >::type future_result_type;
252  
253  
                boost::intrusive_ptr<
254  
                    lcos::detail::future_data<future_result_type>
255  
                > next_future_data = traits::detail::get_shared_state(
256  
                    util::get<I>(t_));
257  
258  
                if (!next_future_data->is_ready())
259  
                {
260  
                    next_future_data->execute_deferred();
261  
262  
                    // execute_deferred might have made the future ready
263  
                    if (!next_future_data->is_ready())
264  
                    {
265  
                        // Attach a continuation to this future which will
266  
                        // re-evaluate it and continue to the next argument
267  
                        // (if any).
268  
                        void (wait_all_frame::*f)(std::true_type, std::false_type) =
269  
                            &wait_all_frame::await_next<I>;
270  
271  
                        boost::intrusive_ptr<wait_all_frame> this_(this);
272  
                        next_future_data->set_on_completed(util::deferred_call(
273  
                            f, std::move(this_), std::true_type(), std::false_type()));
274  
                        return;
275  
                    }
276  
                }
277  
278  
                do_await<I + 1>(is_end<I + 1>());
279  
            }
280  
281  
            template <std::size_t I>
282  
            HPX_FORCEINLINE
283  
            void do_await(std::false_type)
284  
            {
285  
                typedef typename util::decay_unwrap<
286  
                    typename util::tuple_element<I, Tuple>::type
287  
                >::type future_type;
288  
289  
                typedef typename detail::is_future_or_shared_state<future_type>::type
290  
                    is_future;
291  
                typedef typename detail::is_future_or_shared_state_range<future_type>
292  
                      ::type
293  
                    is_range;
294  
295  
                await_next<I>(is_future(), is_range());
296  
            }
297  
298  
        public:
299  
            void wait_all()
300  
            {
301  
                do_await<0>(is_end<0>());
302  
303  
                // If there are still futures which are not ready, suspend and
304  
                // wait.
305  
                if (!this->is_ready())
306  
                    this->wait();
307  
            }
308  
309  
        private:
310  
            Tuple const& t_;
311  
        };
312  
    }
313  
314  
    ///////////////////////////////////////////////////////////////////////////
315  
    template <typename Future>
316  
    void wait_all(std::vector<Future> const& values)
317  
    {
318  
        typedef util::tuple<std::vector<Future> const&> result_type;
319  
        typedef detail::wait_all_frame<result_type> frame_type;
320  
321  
        result_type data(values);
322  
        boost::intrusive_ptr<frame_type> frame(new frame_type(data));
323  
        frame->wait_all();
324  
    }
325  
326  
    template <typename Future>
327  
    HPX_FORCEINLINE void wait_all(std::vector<Future>& values)
328  
    {
329  
        lcos::wait_all(const_cast<std::vector<Future> const&>(values));
330  
    }
331  
332  
    template <typename Future>
333  
    HPX_FORCEINLINE void wait_all(std::vector<Future>&& values)
334  
    {
335  
        lcos::wait_all(const_cast<std::vector<Future> const&>(values));
336  
    }
337  
338  
    template <typename Iterator>
339  
    typename util::always_void<
340  
        typename lcos::detail::future_iterator_traits<Iterator>::type
341  
    >::type
342  
    wait_all(Iterator begin, Iterator end)
343  
    {
344  
        typedef typename lcos::detail::future_iterator_traits<Iterator>::type
345  
            future_type;
346  
        typedef typename traits::detail::shared_state_ptr_for<future_type>::type
347  
            shared_state_ptr;
348  
        typedef std::vector<shared_state_ptr> result_type;
349  
350  
        result_type values;
351  
        std::transform(begin, end, std::back_inserter(values),
352  
            detail::wait_get_shared_state<future_type>());
353  
354  
        lcos::wait_all(values);
355  
    }
356  
357  
    template <typename Iterator>
358  
    Iterator
359  
    wait_all_n(Iterator begin, std::size_t count)
360  
    {
361  
        typedef typename lcos::detail::future_iterator_traits<Iterator>::type
362  
            future_type;
363  
        typedef typename traits::detail::shared_state_ptr_for<future_type>::type
364  
            shared_state_ptr;
365  
        typedef std::vector<shared_state_ptr> result_type;
366  
367  
        result_type values;
368  
        values.reserve(count);
369  
370  
        detail::wait_get_shared_state<future_type> func;
371  
        for (std::size_t i = 0; i != count; ++i)
372  
            values.push_back(func(*begin++));
373  
374  
        lcos::wait_all(std::move(values));
375  
376  
        return begin;
377  
    }
378  
379  
    inline void wait_all()
380  
    {
381  
    }
382  
383  
    ///////////////////////////////////////////////////////////////////////////
384  
    template <typename... Ts>
385  
    void wait_all(Ts&&... ts)
386  
    {
387  
        typedef util::tuple<
388  
                typename traits::detail::shared_state_ptr_for<Ts>::type...
389  
            > result_type;
390  
        typedef detail::wait_all_frame<result_type> frame_type;
391  
392  
        result_type values =
393  
            result_type(traits::detail::get_shared_state(ts)...);
394  
395  
        boost::intrusive_ptr<frame_type> frame(new frame_type(values));
396  
        frame->wait_all();
397  
    }
398  
}}
399  
400  
namespace hpx
401  
{
402  
    using lcos::wait_all;
403  
    using lcos::wait_all_n;
404  
}
405  
406  
#endif // DOXYGEN
407  
#endif
408  

Copyright (c) 2006-2012 Rogue Wave Software, Inc. All Rights Reserved.
Patents pending.