123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- [/
- Copyright Oliver Kowalke, Nat Goodspeed 2015.
- Distributed under the Boost Software License, Version 1.0.
- (See accompanying file LICENSE_1_0.txt or copy at
- http://www.boost.org/LICENSE_1_0.txt
- ]
- [/ import path is relative to this .qbk file]
- [import ../examples/wait_stuff.cpp]
- [#when_any]
- [section:when_any when_any / when_all functionality]
- [heading Overview]
- A bit of wisdom from the early days of computing still holds true today:
- prefer to model program state using the instruction pointer rather than with
- Boolean flags. In other words, if the program must ["do something] and then
- do something almost the same, but with minor changes... perhaps parts of that
- something should be broken out as smaller separate functions, rather than
- introducing flags to alter the internal behavior of a monolithic function.
- To that we would add: prefer to describe control flow using C++ native
- constructs such as function calls, `if`, `while`, `for`, `do` et al.
- rather than as chains of callbacks.
- One of the great strengths of __boost_fiber__ is the flexibility it confers on
- the coder to restructure an application from chains of callbacks to
- straightforward C++ statement sequence, even when code in that fiber is
- in fact interleaved with code running in other fibers.
- There has been much recent discussion about the benefits of when_any and
- when_all functionality. When dealing with asynchronous and possibly unreliable
- services, these are valuable idioms. But of course when_any and when_all are
- closely tied to the use of chains of callbacks.
- This section presents recipes for achieving the same ends, in the context of a
- fiber that wants to ["do something] when one or more other independent
- activities have completed. Accordingly, these are `wait_something()`
- functions rather than `when_something()` functions. The expectation is that
- the calling fiber asks to launch those independent activities, then waits for
- them, then sequentially proceeds with whatever processing depends on those
- results.
- The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
- are for illustrative purposes only, because all these functions have been
- bundled into a single source file. Presumably, if (say) [link
- wait_first_success `wait_first_success()`] best suits your application needs,
- you could introduce that variant with the name `wait_any()`.
- [note The functions presented in this section accept variadic argument lists
- of task functions. Corresponding `wait_something()` functions accepting a
- container of task functions are left as an exercise for the interested reader.
- Those should actually be simpler. Most of the complexity would arise from
- overloading the same name for both purposes.]
- [/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
- All the source code for this section is found in
- [@../../examples/wait_stuff.cpp wait_stuff.cpp].
- [heading Example Task Function]
- [#wait_sleeper]
- We found it convenient to model an asynchronous task using this function:
- [wait_sleeper]
- with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
- `int`.
- `Verbose` simply prints a message to `std::cout` on construction and
- destruction.
- Basically:
- # `sleeper()` prints a start message;
- # sleeps for the specified number of milliseconds;
- # if `thrw` is passed as `true`, throws a string description of the passed
- `item`;
- # else returns the passed `item`.
- # On the way out, `sleeper()` produces a stop message.
- This function will feature in the example calls to the various functions
- presented below.
- [section when_any]
- [#wait_first_simple_section]
- [section when_any, simple completion]
- The simplest case is when you only need to know that the first of a set of
- asynchronous tasks has completed [mdash] but you don't need to obtain a return
- value, and you're confident that they will not throw exceptions.
- [#wait_done]
- For this we introduce a `Done` class to wrap a `bool` variable with a
- [class_link condition_variable] and a [class_link mutex]:
- [wait_done]
- The pattern we follow throughout this section is to pass a
- [@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
- to the relevant synchronization object to the various tasks' fiber functions.
- This eliminates nagging questions about the lifespan of the synchronization
- object relative to the last of the fibers.
- [#wait_first_simple]
- `wait_first_simple()` uses that tactic for [link wait_done `Done`]:
- [wait_first_simple]
- [#wait_first_simple_impl]
- `wait_first_simple_impl()` is an ordinary recursion over the argument pack,
- capturing `Done::ptr` for each new fiber:
- [wait_first_simple_impl]
- The body of the fiber's lambda is extremely simple, as promised: call the
- function, notify [link wait_done `Done`] when it returns. The first fiber to
- do so allows `wait_first_simple()` to return [mdash] which is why it's useful
- to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
- rather than declaring it as a stack variable in `wait_first_simple()`.
- This is how you might call it:
- [wait_first_simple_ex]
- In this example, control resumes after `wait_first_simple()` when [link
- wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
- other two `sleeper()` fibers are still running.
- [endsect]
- [section when_any, return value]
- It seems more useful to add the ability to capture the return value from the
- first of the task functions to complete. Again, we assume that none will throw
- an exception.
- One tactic would be to adapt our [link wait_done `Done`] class to store the
- first of the return values, rather than a simple `bool`. However, we choose
- instead to use a [template_link buffered_channel]. We'll only need to enqueue
- the first value, so we'll [member_link buffered_channel..close] it once we've
- retrieved that value. Subsequent `push()` calls will return `closed`.
- [#wait_first_value]
- [wait_first_value]
- [#wait_first_value_impl]
- The meat of the `wait_first_value_impl()` function is as you might expect:
- [wait_first_value_impl]
- It calls the passed function, pushes its return value and ignores the `push()`
- result. You might call it like this:
- [wait_first_value_ex]
- [endsect]
- [section when_any, produce first outcome, whether result or exception]
- We may not be running in an environment in which we can guarantee no exception
- will be thrown by any of our task functions. In that case, the above
- implementations of `wait_first_something()` would be naïve: as mentioned in
- [link exceptions the section on Fiber Management], an uncaught exception in one
- of our task fibers would cause `std::terminate()` to be called.
- Let's at least ensure that such an exception would propagate to the fiber
- awaiting the first result. We can use [template_link future] to transport
- either a return value or an exception. Therefore, we will change [link
- wait_first_value `wait_first_value()`]'s [template_link buffered_channel] to
- hold `future< T >` items instead of simply `T`.
- Once we have a `future<>` in hand, all we need do is call [member_link
- future..get], which will either return the value or rethrow the exception.
- [#wait_first_outcome]
- [wait_first_outcome]
- So far so good [mdash] but there's a timing issue. How should we obtain the
- `future<>` to [member_link buffered_channel..push] on the queue?
- We could call [ns_function_link fibers..async]. That would certainly produce a
- `future<>` for the task function. The trouble is that it would return too
- quickly! We only want `future<>` items for ['completed] tasks on our
- `queue<>`. In fact, we only want the `future<>` for the one that
- completes first. If each fiber launched by `wait_first_outcome()` were to
- `push()` the result of calling `async()`, the queue would only ever report
- the result of the leftmost task item [mdash] ['not] the one that completes most
- quickly.
- Calling [member_link future..get] on the future returned by `async()` wouldn't
- be right. You can only call `get()` once per `future<>` instance! And if there
- were an exception, it would be rethrown inside the helper fiber at the
- producer end of the queue, rather than propagated to the consumer end.
- We could call [member_link future..wait]. That would block the helper fiber
- until the `future<>` became ready, at which point we could `push()` it to be
- retrieved by `wait_first_outcome()`.
- That would work [mdash] but there's a simpler tactic that avoids creating an extra
- fiber. We can wrap the task function in a [template_link packaged_task]. While
- one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is,
- in fact, what `async()` does [mdash] in this case, we're already running in the
- helper fiber at the producer end of the queue! We can simply ['call] the
- `packaged_task<>`. On return from that call, the task function has completed,
- meaning that the `future<>` obtained from the `packaged_task<>` is certain to
- be ready. At that point we can simply `push()` it to the queue.
- [#wait_first_outcome_impl]
- [wait_first_outcome_impl]
- Calling it might look like this:
- [wait_first_outcome_ex]
- [endsect]
- [section when_any, produce first success]
- One scenario for ["when_any] functionality is when we're redundantly contacting
- some number of possibly-unreliable web services. Not only might they be slow
- [mdash] any one of them might produce a failure rather than the desired
- result.
- In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
- right approach. If one of the services produces an error quickly, while
- another follows up with a real answer, we don't want to prefer the error just
- because it arrived first!
- Given the `queue< future< T > >` we already constructed for
- `wait_first_outcome()`, though, we can readily recast the interface function
- to deliver the first ['successful] result.
- That does beg the question: what if ['all] the task functions throw an
- exception? In that case we'd probably better know about it.
- [#exception_list]
- The
- [@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
- C++ Parallelism Draft Technical Specification] proposes a
- `std::exception_list` exception capable of delivering a collection of
- `std::exception_ptr`s. Until that becomes universally available, let's fake up
- an `exception_list` of our own:
- [exception_list]
- Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
- `wait_first_outcome_impl()`].
- Instead of retrieving only the first `future<>` from the queue, we must now
- loop over `future<>` items. Of course we must limit that iteration! If we
- launch only `count` producer fibers, the `(count+1)`[superscript st]
- [member_link buffered_channel..pop] call would block forever.
- Given a ready `future<>`, we can distinguish failure by calling [member_link
- future..get_exception_ptr]. If the `future<>` in fact contains a result rather
- than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
- can confidently call [member_link future..get] to return that result to our
- caller.
- If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
- our pending `exception_list` and loop back for the next `future<>` from the
- queue.
- If we fall out of the loop [mdash] if every single task fiber threw an
- exception [mdash] we throw the `exception_list` exception into which we've
- been collecting those `std::exception_ptr`s.
- [#wait_first_success]
- [wait_first_success]
- A call might look like this:
- [wait_first_success_ex]
- [endsect]
- [section when_any, heterogeneous types]
- We would be remiss to ignore the case in which the various task functions have
- distinct return types. That means that the value returned by the first of them
- might have any one of those types. We can express that with
- [@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].
- To keep the example simple, we'll revert to pretending that none of them can
- throw an exception. That makes `wait_first_value_het()` strongly resemble
- [link wait_first_value `wait_first_value()`]. We can actually reuse [link
- wait_first_value_impl `wait_first_value_impl()`], merely passing
- `boost::variant<T0, T1, ...>` as the queue's value type rather than the
- common `T`!
- Naturally this could be extended to use [link wait_first_success
- `wait_first_success()`] semantics instead.
- [wait_first_value_het]
- It might be called like this:
- [wait_first_value_het_ex]
- [endsect]
- [section when_any, a dubious alternative]
- Certain topics in C++ can arouse strong passions, and exceptions are no
- exception. We cannot resist mentioning [mdash] for purely informational
- purposes [mdash] that when you need only the ['first] result from some number
- of concurrently-running fibers, it would be possible to pass a
- [^shared_ptr<[template_link promise]>] to the participating fibers, then cause
- the initiating fiber to call [member_link future..get] on its [template_link
- future]. The first fiber to call [member_link promise..set_value] on that
- shared `promise` will succeed; subsequent `set_value()` calls on the same
- `promise` instance will throw `future_error`.
- Use this information at your own discretion. Beware the dark side.
- [endsect]
- [endsect][/ when_any]
- [section when_all functionality]
- [section when_all, simple completion]
- For the case in which we must wait for ['all] task functions to complete
- [mdash] but we don't need results (or expect exceptions) from any of them
- [mdash] we can write `wait_all_simple()` that looks remarkably like [link
- wait_first_simple `wait_first_simple()`]. The difference is that instead of
- our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
- call its [member_link barrier..wait].
- We initialize the `barrier` with `(count+1)` because we are launching `count`
- fibers, plus the `wait()` call within `wait_all_simple()` itself.
- [wait_all_simple]
- As stated above, the only difference between `wait_all_simple_impl()` and
- [link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
- calls `barrier::wait()` rather than `Done::notify()`:
- [wait_all_simple_impl]
- You might call it like this:
- [wait_all_simple_ex]
- Control will not return from the `wait_all_simple()` call until the last of
- its task functions has completed.
- [endsect]
- [section when_all, return values]
- As soon as we want to collect return values from all the task functions, we
- can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
- queue<T> for the purpose. All we have to do is avoid closing it after the
- first value!
- But in fact, collecting multiple values raises an interesting question: do we
- ['really] want to wait until the slowest of them has arrived? Wouldn't we
- rather process each result as soon as it becomes available?
- Fortunately we can present both APIs. Let's define `wait_all_values_source()`
- to return `shared_ptr<buffered_channel<T>>`.
- [#wait_all_values]
- Given `wait_all_values_source()`, it's straightforward to implement
- `wait_all_values()`:
- [wait_all_values]
- It might be called like this:
- [wait_all_values_ex]
- As you can see from the loop in `wait_all_values()`, instead of requiring its
- caller to count values, we define `wait_all_values_source()` to [member_link
- buffered_channel..close] the queue when done. But how do we do that? Each
- producer fiber is independent. It has no idea whether it is the last one to
- [member_link buffered_channel..push] a value.
- [#wait_nqueue]
- We can address that problem with a counting façade for the
- `queue<>`. In fact, our façade need only support the producer end of
- the queue.
- [wait_nqueue]
- [#wait_all_values_source]
- Armed with `nqueue<>`, we can implement `wait_all_values_source()`. It
- starts just like [link wait_first_value `wait_first_value()`]. The difference
- is that we wrap the `queue<T>` with an `nqueue<T>` to pass to
- the producer fibers.
- Then, of course, instead of popping the first value, closing the queue and
- returning it, we simply return the `shared_ptr<queue<T>>`.
- [wait_all_values_source]
- For example:
- [wait_all_values_source_ex]
- [#wait_all_values_impl]
- `wait_all_values_impl()` really is just like [link wait_first_value_impl
- `wait_first_value_impl()`] except for the use of `nqueue<T>` rather than
- `queue<T>`:
- [wait_all_values_impl]
- [endsect]
- [section when_all until first exception]
- Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
- can elaborate [link wait_all_values `wait_all_values()`] and [link
- wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
- instead of plain `T`.
- [#wait_all_until_error]
- `wait_all_until_error()` pops that `future< T >` and calls its [member_link
- future..get]:
- [wait_all_until_error]
- For example:
- [wait_all_until_error_ex]
- [#wait_all_until_error_source]
- Naturally this complicates the API for `wait_all_until_error_source()`. The
- caller must both retrieve a `future< T >` and call its `get()` method. It would,
- of course, be possible to return a façade over the consumer end of the
- queue that would implicitly perform the `get()` and return a simple `T` (or
- throw).
- The implementation is just as you would expect. Notice, however, that we can
- reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
- `nqueue<T>` rather than `queue<T>`.
- [wait_all_until_error_source]
- For example:
- [wait_all_until_error_source_ex]
- [endsect]
- [section wait_all, collecting all exceptions]
- [#wait_all_collect_errors]
- Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
- might be more reasonable to make a `wait_all_...()` that collects ['all]
- errors instead of presenting only the first:
- [wait_all_collect_errors]
- The implementation is a simple variation on [link wait_first_success
- `wait_first_success()`], using the same [link exception_list `exception_list`]
- exception class.
- [endsect]
- [section when_all, heterogeneous types]
- But what about the case when we must wait for all results of different types?
- We can present an API that is frankly quite cool. Consider a sample struct:
- [wait_Data]
- Let's fill its members from task functions all running concurrently:
- [wait_all_members_data_ex]
- Note that for this case, we abandon the notion of capturing the earliest
- result first, and so on: we must fill exactly the passed struct in
- left-to-right order.
- That permits a beautifully simple implementation:
- [wait_all_members]
- [wait_all_members_get]
- It is tempting to try to implement `wait_all_members()` as a one-liner like
- this:
- return Result{ boost::fibers::async(functions).get()... };
- The trouble with this tactic is that it would serialize all the task
- functions. The runtime makes a single pass through `functions`, calling
- [ns_function_link fibers..async] for each and then immediately calling
- [member_link future..get] on its returned `future<>`. That blocks the implicit
- loop. The above is almost equivalent to writing:
- return Result{ functions()... };
- in which, of course, there is no concurrency at all.
- Passing the argument pack through a function-call boundary
- (`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
- `wait_all_members()` to collect the `future<>`s from all the `async()` calls,
- the second in `wait_all_members_get()` to fetch each of the results.
- As noted in comments, within the `wait_all_members_get()` parameter pack
- expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the
- way, we will hit the `get()` for the slowest task function; after that every
- subsequent `get()` will complete in trivial time.
- By the way, we could also use this same API to fill a vector or other
- collection:
- [wait_all_members_vector_ex]
- [endsect]
- [endsect][/ when_all]
- [endsect][/ outermost]
|