
[/
      Copyright Oliver Kowalke, Nat Goodspeed 2015.
      Distributed under the Boost Software License, Version 1.0.
      (See accompanying file LICENSE_1_0.txt or copy at
      http://www.boost.org/LICENSE_1_0.txt
]

[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]

[#when_any]
[section:when_any when_any / when_all functionality]
  11. [heading Overview]
  12. A bit of wisdom from the early days of computing still holds true today:
  13. prefer to model program state using the instruction pointer rather than with
  14. Boolean flags. In other words, if the program must ["do something] and then
  15. do something almost the same, but with minor changes... perhaps parts of that
  16. something should be broken out as smaller separate functions, rather than
  17. introducing flags to alter the internal behavior of a monolithic function.
  18. To that we would add: prefer to describe control flow using C++ native
  19. constructs such as function calls, `if`, `while`, `for`, `do` et al.
  20. rather than as chains of callbacks.
  21. One of the great strengths of __boost_fiber__ is the flexibility it confers on
  22. the coder to restructure an application from chains of callbacks to
  23. straightforward C++ statement sequence, even when code in that fiber is
  24. in fact interleaved with code running in other fibers.
  25. There has been much recent discussion about the benefits of when_any and
  26. when_all functionality. When dealing with asynchronous and possibly unreliable
  27. services, these are valuable idioms. But of course when_any and when_all are
  28. closely tied to the use of chains of callbacks.
  29. This section presents recipes for achieving the same ends, in the context of a
  30. fiber that wants to ["do something] when one or more other independent
  31. activities have completed. Accordingly, these are `wait_something()`
  32. functions rather than `when_something()` functions. The expectation is that
  33. the calling fiber asks to launch those independent activities, then waits for
  34. them, then sequentially proceeds with whatever processing depends on those
  35. results.
The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
are for illustrative purposes only, because all these functions have been
bundled into a single source file. Presumably, if (say) [link
wait_first_success `wait_first_success()`] best suits your application needs,
you could introduce that variant with the name `wait_any()`.

[note The functions presented in this section accept variadic argument lists
of task functions. Corresponding `wait_something()` functions accepting a
container of task functions are left as an exercise for the interested reader.
Those should actually be simpler. Most of the complexity would arise from
overloading the same name for both purposes.]

[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in
[@../../examples/wait_stuff.cpp wait_stuff.cpp].
[heading Example Task Function]

[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:

[wait_sleeper]

with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
`int`.

`Verbose` simply prints a message to `std::cout` on construction and
destruction.

Basically:

# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed
  `item`;
# else returns the passed `item`.
# On the way out, `sleeper()` produces a stop message.

This function will feature in the example calls to the various functions
presented below.
[section when_any]

[#wait_first_simple_section]
[section when_any, simple completion]

The simplest case is when you only need to know that the first of a set of
asynchronous tasks has completed [mdash] but you don't need to obtain a return
value, and you're confident that they will not throw exceptions.

[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a
[class_link condition_variable] and a [class_link mutex]:

[wait_done]

The pattern we follow throughout this section is to pass a
[@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
to the relevant synchronization object to the various tasks' fiber functions.
This eliminates nagging questions about the lifespan of the synchronization
object relative to the last of the fibers.
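For readers without the source file at hand, here is a minimal sketch of what
such a `Done` class might look like. It is written against `std::mutex` and
`std::condition_variable` purely for illustration; the real class in
wait_stuff.cpp uses the `boost::fibers` equivalents, which have the same
interface.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>

// Sketch only: a bool guarded by a mutex, signaled via a condition_variable.
class Done {
private:
    std::condition_variable cond_;
    std::mutex mutex_;
    bool ready_ = false;

public:
    typedef std::shared_ptr<Done> ptr;

    // Block the caller until some task has called notify().
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this]() { return ready_; });
    }

    // Called by the first task to complete; wakes the waiter.
    void notify() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_ = true;
        } // release the lock before signaling
        cond_.notify_one();
    }
};
```

Because every task fiber holds a `Done::ptr`, the `Done` object outlives even
the slowest task, no matter when the waiting fiber returns.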
[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:

[wait_first_simple]

[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack,
capturing `Done::ptr` for each new fiber:

[wait_first_simple_impl]

The body of the fiber's lambda is extremely simple, as promised: call the
function, notify [link wait_done `Done`] when it returns. The first fiber to
do so allows `wait_first_simple()` to return [mdash] which is why it's useful
to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
rather than declaring it as a stack variable in `wait_first_simple()`.

This is how you might call it:

[wait_first_simple_ex]

In this example, control resumes after `wait_first_simple()` when [link
wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
other two `sleeper()` fibers are still running.

[endsect]
[section when_any, return value]

It seems more useful to add the ability to capture the return value from the
first of the task functions to complete. Again, we assume that none will throw
an exception.

One tactic would be to adapt our [link wait_done `Done`] class to store the
first of the return values, rather than a simple `bool`. However, we choose
instead to use a [template_link buffered_channel]. We'll only need to enqueue
the first value, so we'll [member_link buffered_channel..close] it once we've
retrieved that value. Subsequent `push()` calls will return `closed`.

[#wait_first_value]
[wait_first_value]

[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:

[wait_first_value_impl]

It calls the passed function, pushes its return value and ignores the `push()`
result. You might call it like this:

[wait_first_value_ex]
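The ["push the first value, then close] protocol is the heart of this recipe.
The following stand-alone sketch shows the behavior it relies on, using an
invented `tiny_channel` built from std primitives; the real code uses
`boost::fibers::buffered_channel`, whose `push()` likewise reports `closed`
once the channel has been closed.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Invented names, for illustration only.
enum class op_status { success, closed };

template <typename T>
class tiny_channel {
    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<T> items_;
    bool closed_ = false;

public:
    op_status push(T value) {
        std::unique_lock<std::mutex> lk(mtx_);
        if (closed_)
            return op_status::closed;  // late producers are simply ignored
        items_.push_back(std::move(value));
        cv_.notify_one();
        return op_status::success;
    }

    T value_pop() {                    // blocks until a value arrives
        std::unique_lock<std::mutex> lk(mtx_);
        cv_.wait(lk, [this] { return !items_.empty(); });
        T v = std::move(items_.front());
        items_.pop_front();
        return v;
    }

    void close() {
        std::unique_lock<std::mutex> lk(mtx_);
        closed_ = true;
    }
};
```

The consumer pops exactly one value and closes the channel; every slower
producer's `push()` then returns `closed` and its result is discarded.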
[endsect]

[section when_any, produce first outcome, whether result or exception]

We may not be running in an environment in which we can guarantee no exception
will be thrown by any of our task functions. In that case, the above
implementations of `wait_first_something()` would be naïve: as mentioned in
[link exceptions the section on Fiber Management], an uncaught exception in one
of our task fibers would cause `std::terminate()` to be called.

Let's at least ensure that such an exception would propagate to the fiber
awaiting the first result. We can use [template_link future] to transport
either a return value or an exception. Therefore, we will change [link
wait_first_value `wait_first_value()`]'s [template_link buffered_channel] to
hold `future< T >` items instead of simply `T`.

Once we have a `future<>` in hand, all we need do is call [member_link
future..get], which will either return the value or rethrow the exception.

[#wait_first_outcome]
[wait_first_outcome]

So far so good [mdash] but there's a timing issue. How should we obtain the
`future<>` to [member_link buffered_channel..push] on the queue?

We could call [ns_function_link fibers..async]. That would certainly produce a
`future<>` for the task function. The trouble is that it would return too
quickly! We only want `future<>` items for ['completed] tasks on our
`queue<>`. In fact, we only want the `future<>` for the one that
completes first. If each fiber launched by `wait_first_outcome()` were to
`push()` the result of calling `async()`, the queue would only ever report
the result of the leftmost task item [mdash] ['not] the one that completes most
quickly.

Calling [member_link future..get] on the future returned by `async()` wouldn't
be right. You can only call `get()` once per `future<>` instance! And if there
were an exception, it would be rethrown inside the helper fiber at the
producer end of the queue, rather than propagated to the consumer end.

We could call [member_link future..wait]. That would block the helper fiber
until the `future<>` became ready, at which point we could `push()` it to be
retrieved by `wait_first_outcome()`.

That would work [mdash] but there's a simpler tactic that avoids creating an extra
fiber. We can wrap the task function in a [template_link packaged_task]. While
one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is,
in fact, what `async()` does [mdash] in this case, we're already running in the
helper fiber at the producer end of the queue! We can simply ['call] the
`packaged_task<>`. On return from that call, the task function has completed,
meaning that the `future<>` obtained from the `packaged_task<>` is certain to
be ready. At that point we can simply `push()` it to the queue.
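The property we depend on can be demonstrated in isolation. This sketch uses
`std::packaged_task` (the `boost::fibers` one behaves the same way); the
function name `run_packaged` is invented for the example. Calling the
`packaged_task<>` directly, on the current thread of execution, leaves its
future already ready [mdash] holding either the return value or the exception.

```cpp
#include <future>
#include <stdexcept>
#include <string>

std::future<std::string> run_packaged(bool fail) {
    std::packaged_task<std::string()> task([fail]() -> std::string {
        if (fail)
            throw std::runtime_error("task failed");
        return "task result";
    });
    std::future<std::string> f(task.get_future());
    task();   // runs the task body right here; f is now certainly ready
    return f; // safe to push on a queue: get() will not block
}
```

The consumer's eventual `get()` either returns `"task result"` or rethrows
the `std::runtime_error`, exactly as desired.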
[#wait_first_outcome_impl]
[wait_first_outcome_impl]

Calling it might look like this:

[wait_first_outcome_ex]

[endsect]

[section when_any, produce first success]

One scenario for ["when_any] functionality is when we're redundantly contacting
some number of possibly-unreliable web services. Not only might they be slow
[mdash] any one of them might produce a failure rather than the desired
result.

In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
right approach. If one of the services produces an error quickly, while
another follows up with a real answer, we don't want to prefer the error just
because it arrived first!

Given the `queue< future< T > >` we already constructed for
`wait_first_outcome()`, though, we can readily recast the interface function
to deliver the first ['successful] result.

That does beg the question: what if ['all] the task functions throw an
exception? In that case we'd probably better know about it.

[#exception_list]
The
[@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
C++ Parallelism Draft Technical Specification] proposes a
`std::exception_list` exception capable of delivering a collection of
`std::exception_ptr`s. Until that becomes universally available, let's fake up
an `exception_list` of our own:

[exception_list]
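One plausible shape for such a class is sketched below: an exception type
carrying every `std::exception_ptr` collected along the way. The member names
here (`add()`, `empty()`, `get()`) are invented for the sketch; consult
wait_stuff.cpp for the actual definition.

```cpp
#include <exception>
#include <stdexcept>
#include <vector>

// Sketch of a home-grown exception_list, pending std::exception_list.
class exception_list : public std::exception {
public:
    typedef std::vector<std::exception_ptr> bundle_t;

    explicit exception_list(const char* what) : what_(what) {}

    const char* what() const noexcept override { return what_; }

    // collect one more failure
    void add(std::exception_ptr ep) { bundle_.push_back(ep); }

    // true if no task has failed yet
    bool empty() const noexcept { return bundle_.empty(); }

    const bundle_t& get() const noexcept { return bundle_; }

private:
    const char* what_;
    bundle_t bundle_;
};
```

A caller catching this exception can iterate over `get()` and rethrow each
`std::exception_ptr` in turn with `std::rethrow_exception()`.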
Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
`wait_first_outcome_impl()`].

Instead of retrieving only the first `future<>` from the queue, we must now
loop over `future<>` items. Of course we must limit that iteration! If we
launch only `count` producer fibers, the `(count+1)`[superscript st]
[member_link buffered_channel..pop] call would block forever.

Given a ready `future<>`, we can distinguish failure by calling [member_link
future..get_exception_ptr]. If the `future<>` in fact contains a result rather
than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
can confidently call [member_link future..get] to return that result to our
caller.

If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
our pending `exception_list` and loop back for the next `future<>` from the
queue.

If we fall out of the loop [mdash] if every single task fiber threw an
exception [mdash] we throw the `exception_list` exception into which we've
been collecting those `std::exception_ptr`s.

[#wait_first_success]
[wait_first_success]

A call might look like this:

[wait_first_success_ex]

[endsect]
[section when_any, heterogeneous types]

We would be remiss to ignore the case in which the various task functions have
distinct return types. That means that the value returned by the first of them
might have any one of those types. We can express that with
[@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].

To keep the example simple, we'll revert to pretending that none of them can
throw an exception. That makes `wait_first_value_het()` strongly resemble
[link wait_first_value `wait_first_value()`]. We can actually reuse [link
wait_first_value_impl `wait_first_value_impl()`], merely passing
`boost::variant<T0, T1, ...>` as the queue's value type rather than the
common `T`!

Naturally this could be extended to use [link wait_first_success
`wait_first_success()`] semantics instead.

[wait_first_value_het]

It might be called like this:

[wait_first_value_het_ex]
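The essential trick is that a single queue slot typed as a variant over every
task's return type can absorb whichever task finishes first. This sketch shows
that conversion with `std::variant` (the document itself uses Boost.Variant,
which converts the same way); the three task functions are hypothetical
stand-ins.

```cpp
#include <string>
#include <variant>

// One value type wide enough for every task's result.
using Value = std::variant<std::string, double, int>;

// Any task whose result converts to one of the alternatives can feed
// the same Value slot -- so one queue<Value> serves all of them.
Value from_string_task() { return std::string("string result"); }
Value from_double_task() { return 3.14; }
Value from_int_task()    { return 17; }
```

Whichever of these runs first pushes its `Value`; the consumer can later
inspect which alternative it holds.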
[endsect]

[section when_any, a dubious alternative]

Certain topics in C++ can arouse strong passions, and exceptions are no
exception. We cannot resist mentioning [mdash] for purely informational
purposes [mdash] that when you need only the ['first] result from some number
of concurrently-running fibers, it would be possible to pass a
[^shared_ptr<[template_link promise]>] to the participating fibers, then cause
the initiating fiber to call [member_link future..get] on its [template_link
future]. The first fiber to call [member_link promise..set_value] on that
shared `promise` will succeed; subsequent `set_value()` calls on the same
`promise` instance will throw `future_error`.
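Here is the dubious alternative in miniature, with `std::promise` standing in
for the `boost::fibers` one (both throw `future_error` with code
`promise_already_satisfied` on a second `set_value()`); the helper `try_set()`
is invented for the sketch.

```cpp
#include <future>
#include <memory>

// Returns true if this caller won the race to set the shared promise.
bool try_set(std::shared_ptr<std::promise<int>> p, int value) {
    try {
        p->set_value(value);   // first caller succeeds
        return true;
    } catch (std::future_error const&) {
        return false;          // promise already satisfied
    }
}
```

Each participating fiber would call `try_set()` with its result; the
initiating fiber's `future<int>::get()` delivers whichever value won.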
Use this information at your own discretion. Beware the dark side.

[endsect]

[endsect][/ when_any]

[section when_all functionality]

[section when_all, simple completion]

For the case in which we must wait for ['all] task functions to complete
[mdash] but we don't need results (or expect exceptions) from any of them
[mdash] we can write `wait_all_simple()` that looks remarkably like [link
wait_first_simple `wait_first_simple()`]. The difference is that instead of
our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
call its [member_link barrier..wait].

We initialize the `barrier` with `(count+1)` because we are launching `count`
fibers, plus the `wait()` call within `wait_all_simple()` itself.

[wait_all_simple]

As stated above, the only difference between `wait_all_simple_impl()` and
[link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
calls `barrier::wait()` rather than `Done::notify()`:

[wait_all_simple_impl]

You might call it like this:

[wait_all_simple_ex]

Control will not return from the `wait_all_simple()` call until the last of
its task functions has completed.
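The `(count+1)` arithmetic is worth seeing in isolation: every worker arrives
at the rendezvous, and so does the waiting caller itself. This sketch builds a
tiny latch from std primitives and uses `std::thread` workers; in the real
`wait_all_simple()` the `boost::fibers` barrier plays this role, and the names
`tiny_latch` and `wait_all_demo` are invented here.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

class tiny_latch {
    std::mutex mtx_;
    std::condition_variable cv_;
    std::size_t remaining_;

public:
    explicit tiny_latch(std::size_t count) : remaining_(count) {}

    void arrive_and_wait() {
        std::unique_lock<std::mutex> lk(mtx_);
        if (--remaining_ == 0) {
            cv_.notify_all();  // last arrival releases everyone
        } else {
            cv_.wait(lk, [this] { return remaining_ == 0; });
        }
    }
};

// Launch `count` workers, then block until every one has finished its work.
std::size_t wait_all_demo(std::size_t count) {
    tiny_latch sync(count + 1);          // count workers + this caller
    std::atomic<std::size_t> done{0};
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < count; ++i)
        workers.emplace_back([&] {
            ++done;                      // the "task function"
            sync.arrive_and_wait();
        });
    sync.arrive_and_wait();              // returns only after all have arrived
    for (auto& t : workers) t.join();
    return done.load();
}
```

Initializing with `count` alone would deadlock or race: either the caller's
own `wait()` would be the missing arrival, or the rendezvous would open before
the caller reached it.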
[endsect]

[section when_all, return values]

As soon as we want to collect return values from all the task functions, we
can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
`queue<T>` for the purpose. All we have to do is avoid closing it after the
first value!

But in fact, collecting multiple values raises an interesting question: do we
['really] want to wait until the slowest of them has arrived? Wouldn't we
rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define `wait_all_values_source()`
to return `shared_ptr<buffered_channel<T>>`.

[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement
`wait_all_values()`:

[wait_all_values]

It might be called like this:

[wait_all_values_ex]

As you can see from the loop in `wait_all_values()`, instead of requiring its
caller to count values, we define `wait_all_values_source()` to [member_link
buffered_channel..close] the queue when done. But how do we do that? Each
producer fiber is independent. It has no idea whether it is the last one to
[member_link buffered_channel..push] a value.

[#wait_nqueue]
We can address that problem with a counting façade for the
`queue<>`. In fact, our façade need only support the producer end of
the queue.

[wait_nqueue]
[#wait_all_values_source]
Armed with `nqueue<>`, we can implement `wait_all_values_source()`. It
starts just like [link wait_first_value `wait_first_value()`]. The difference
is that we wrap the `queue<T>` with an `nqueue<T>` to pass to
the producer fibers.

Then, of course, instead of popping the first value, closing the queue and
returning it, we simply return the `shared_ptr<queue<T>>`.

[wait_all_values_source]

For example:

[wait_all_values_source_ex]

[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl
`wait_first_value_impl()`] except for the use of `nqueue<T>` rather than
`queue<T>`:

[wait_all_values_impl]

[endsect]
[section when_all until first exception]

Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
can elaborate [link wait_all_values `wait_all_values()`] and [link
wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
instead of plain `T`.

[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link
future..get]:

[wait_all_until_error]

For example:

[wait_all_until_error_ex]

[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The
caller must both retrieve a `future< T >` and call its `get()` method. It would,
of course, be possible to return a façade over the consumer end of the
queue that would implicitly perform the `get()` and return a simple `T` (or
throw).

The implementation is just as you would expect. Notice, however, that we can
reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
`nqueue<T>` rather than `queue<T>`.

[wait_all_until_error_source]

For example:

[wait_all_until_error_source_ex]

[endsect]
[section wait_all, collecting all exceptions]

[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
might be more reasonable to make a `wait_all_...()` that collects ['all]
errors instead of presenting only the first:

[wait_all_collect_errors]

The implementation is a simple variation on [link wait_first_success
`wait_first_success()`], using the same [link exception_list `exception_list`]
exception class.

[endsect]
[section when_all, heterogeneous types]

But what about the case when we must wait for all results of different types?
We can present an API that is frankly quite cool. Consider a sample struct:

[wait_Data]

Let's fill its members from task functions all running concurrently:

[wait_all_members_data_ex]

Note that for this case, we abandon the notion of capturing the earliest
result first, and so on: we must fill exactly the passed struct in
left-to-right order.

That permits a beautifully simple implementation:

[wait_all_members]
[wait_all_members_get]

It is tempting to try to implement `wait_all_members()` as a one-liner like
this:

    return Result{ boost::fibers::async(functions).get()... };

The trouble with this tactic is that it would serialize all the task
functions. The runtime makes a single pass through `functions`, calling
[ns_function_link fibers..async] for each and then immediately calling
[member_link future..get] on its returned `future<>`. That blocks the implicit
loop. The above is almost equivalent to writing:

    return Result{ functions()... };

in which, of course, there is no concurrency at all.

Passing the argument pack through a function-call boundary
(`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
`wait_all_members()` to collect the `future<>`s from all the `async()` calls,
the second in `wait_all_members_get()` to fetch each of the results.

As noted in comments, within the `wait_all_members_get()` parameter pack
expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the
way, we will hit the `get()` for the slowest task function; after that every
subsequent `get()` will complete in trivial time.
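The two-pass trick works identically with `std::async` in place of
`fibers::async`, which makes it easy to sketch in a self-contained form. The
`Data` struct and lambdas below are stand-ins for the document's own example;
the braced-init-list in the second pass guarantees the left-to-right `get()`
order the text relies on.

```cpp
#include <future>
#include <string>
#include <utility>

// Pass 2: expand the already-launched futures into get() calls. Each get()
// blocks only until its own task finishes; every task is already running.
template <typename Result, typename... Futures>
Result wait_all_members_get(Futures&&... futures) {
    return Result{ futures.get()... };
}

// Pass 1: expand the pack into async() calls, launching every task before
// anything blocks, then hand the futures across the call boundary.
template <typename Result, typename... Fns>
Result wait_all_members(Fns&&... functions) {
    return wait_all_members_get<Result>(
        std::async(std::launch::async, std::forward<Fns>(functions))...);
}

struct Data {
    std::string str;
    double d;
    int i;
};
```

Collapsing the two functions into one would put `async()` and `get()` in the
same expansion and serialize the tasks, exactly as the text describes.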
By the way, we could also use this same API to fill a vector or other
collection:

[wait_all_members_vector_ex]

[endsect]

[endsect][/ when_all]

[endsect][/ outermost]