wait_stuff.cpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984
  1. // Copyright Nat Goodspeed 2015.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. #include <algorithm>
  7. #include <cassert>
  8. #include <chrono>
  9. #include <iostream>
  10. #include <memory>
  11. #include <sstream>
  12. #include <string>
  13. #include <type_traits>
  14. #include <utility>
  15. #include <vector>
  16. #include <boost/fiber/all.hpp>
  17. #include <boost/variant/variant.hpp>
  18. #include <boost/variant/get.hpp>
  19. // These are wait_something() functions rather than when_something()
  20. // functions. A big part of the point of the Fiber library is to model
  21. // sequencing using the processor's instruction pointer rather than chains of
  22. // callbacks. The future-oriented when_all() / when_any() functions are still
  23. // based on chains of callbacks. With Fiber, we can do better.
  24. /*****************************************************************************
  25. * Verbose
  26. *****************************************************************************/
  // Scope tracer: prints "<desc> start" when constructed and "<desc> stop"
  // when destroyed, so the lifetime of the enclosing scope shows up on stdout.
  class Verbose {
  public:
      // explicit: a plain string must not silently convert into a tracer.
      explicit Verbose( std::string const& d):
          desc( d) {
          std::cout << desc << " start" << std::endl;
      }

      ~Verbose() {
          std::cout << desc << " stop" << std::endl;
      }

      // Not copyable: each tracer prints exactly one start/stop pair.
      Verbose( Verbose const&) = delete;
      Verbose & operator=( Verbose const&) = delete;

  private:
      // text printed in front of " start" / " stop"
      const std::string desc;
  };
  41. /*****************************************************************************
  42. * Runner and Example
  43. *****************************************************************************/
  44. // collect and ultimately run every Example
  45. class Runner {
  46. typedef std::vector< std::pair< std::string, std::function< void() > > > function_list;
  47. public:
  48. void add( std::string const& desc, std::function< void() > const& func) {
  49. functions_.push_back( function_list::value_type( desc, func) );
  50. }
  51. void run() {
  52. for ( function_list::value_type const& pair : functions_) {
  53. Verbose v( pair.first);
  54. pair.second();
  55. }
  56. }
  57. private:
  58. function_list functions_;
  59. };
  60. Runner runner;
  61. // Example allows us to embed Runner::add() calls at module scope
  62. struct Example {
  63. Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
  64. runner.add( desc, func);
  65. }
  66. };
  67. /*****************************************************************************
  68. * example task functions
  69. *****************************************************************************/
  70. //[wait_sleeper
  71. template< typename T >
  72. T sleeper_impl( T item, int ms, bool thrw = false) {
  73. std::ostringstream descb, funcb;
  74. descb << item;
  75. std::string desc( descb.str() );
  76. funcb << " sleeper(" << item << ")";
  77. Verbose v( funcb.str() );
  78. boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
  79. if ( thrw) {
  80. throw std::runtime_error( desc);
  81. }
  82. return item;
  83. }
  84. //]
  85. inline
  86. std::string sleeper( std::string const& item, int ms, bool thrw = false) {
  87. return sleeper_impl( item, ms, thrw);
  88. }
  89. inline
  90. double sleeper( double item, int ms, bool thrw = false) {
  91. return sleeper_impl( item, ms, thrw);
  92. }
  93. inline
  94. int sleeper(int item, int ms, bool thrw = false) {
  95. return sleeper_impl( item, ms, thrw);
  96. }
  97. /*****************************************************************************
  98. * Done
  99. *****************************************************************************/
  100. //[wait_done
  101. // Wrap canonical pattern for condition_variable + bool flag
  102. struct Done {
  103. private:
  104. boost::fibers::condition_variable cond;
  105. boost::fibers::mutex mutex;
  106. bool ready = false;
  107. public:
  108. typedef std::shared_ptr< Done > ptr;
  109. void wait() {
  110. std::unique_lock< boost::fibers::mutex > lock( mutex);
  111. cond.wait( lock, [this](){ return ready; });
  112. }
  113. void notify() {
  114. {
  115. std::unique_lock< boost::fibers::mutex > lock( mutex);
  116. ready = true;
  117. } // release mutex
  118. cond.notify_one();
  119. }
  120. };
  121. //]
  122. /*****************************************************************************
  123. * when_any, simple completion
  124. *****************************************************************************/
  125. //[wait_first_simple_impl
  126. // Degenerate case: when there are no functions to wait for, return
  127. // immediately.
  128. void wait_first_simple_impl( Done::ptr) {
  129. }
  130. // When there's at least one function to wait for, launch it and recur to
  131. // process the rest.
  132. template< typename Fn, typename ... Fns >
  133. void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
  134. boost::fibers::fiber( [done, function](){
  135. function();
  136. done->notify();
  137. }).detach();
  138. wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
  139. }
  140. //]
  141. // interface function: instantiate Done, launch tasks, wait for Done
  142. //[wait_first_simple
  143. template< typename ... Fns >
  144. void wait_first_simple( Fns && ... functions) {
  145. // Use shared_ptr because each function's fiber will bind it separately,
  146. // and we're going to return before the last of them completes.
  147. auto done( std::make_shared< Done >() );
  148. wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
  149. done->wait();
  150. }
  151. //]
  152. // example usage
  153. Example wfs( runner, "wait_first_simple()", [](){
  154. //[wait_first_simple_ex
  155. wait_first_simple(
  156. [](){ sleeper("wfs_long", 150); },
  157. [](){ sleeper("wfs_medium", 100); },
  158. [](){ sleeper("wfs_short", 50); });
  159. //]
  160. });
  161. /*****************************************************************************
  162. * when_any, return value
  163. *****************************************************************************/
  164. // When there's only one function, call this overload
  165. //[wait_first_value_impl
  166. template< typename T, typename Fn >
  167. void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
  168. Fn && function) {
  169. boost::fibers::fiber( [chan, function](){
  170. // Ignore channel_op_status returned by push():
  171. // might be closed; we simply don't care.
  172. chan->push( function() );
  173. }).detach();
  174. }
  175. //]
  176. // When there are two or more functions, call this overload
  177. template< typename T, typename Fn0, typename Fn1, typename ... Fns >
  178. void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
  179. Fn0 && function0,
  180. Fn1 && function1,
  181. Fns && ... functions) {
  182. // process the first function using the single-function overload
  183. wait_first_value_impl< T >( chan,
  184. std::forward< Fn0 >( function0) );
  185. // then recur to process the rest
  186. wait_first_value_impl< T >( chan,
  187. std::forward< Fn1 >( function1),
  188. std::forward< Fns >( functions) ... );
  189. }
  190. //[wait_first_value
  191. // Assume that all passed functions have the same return type. The return type
  192. // of wait_first_value() is the return type of the first passed function. It is
  193. // simply invalid to pass NO functions.
  194. template< typename Fn, typename ... Fns >
  195. typename std::result_of< Fn() >::type
  196. wait_first_value( Fn && function, Fns && ... functions) {
  197. typedef typename std::result_of< Fn() >::type return_t;
  198. typedef boost::fibers::buffered_channel< return_t > channel_t;
  199. auto chanp( std::make_shared< channel_t >( 64) );
  200. // launch all the relevant fibers
  201. wait_first_value_impl< return_t >( chanp,
  202. std::forward< Fn >( function),
  203. std::forward< Fns >( functions) ... );
  204. // retrieve the first value
  205. return_t value( chanp->value_pop() );
  206. // close the channel: no subsequent push() has to succeed
  207. chanp->close();
  208. return value;
  209. }
  210. //]
  211. // example usage
  212. Example wfv( runner, "wait_first_value()", [](){
  213. //[wait_first_value_ex
  214. std::string result = wait_first_value(
  215. [](){ return sleeper("wfv_third", 150); },
  216. [](){ return sleeper("wfv_second", 100); },
  217. [](){ return sleeper("wfv_first", 50); });
  218. std::cout << "wait_first_value() => " << result << std::endl;
  219. assert(result == "wfv_first");
  220. //]
  221. });
  222. /*****************************************************************************
  223. * when_any, produce first outcome, whether result or exception
  224. *****************************************************************************/
  225. // When there's only one function, call this overload.
  226. //[wait_first_outcome_impl
  227. template< typename T, typename CHANP, typename Fn >
  228. void wait_first_outcome_impl( CHANP chan, Fn && function) {
  229. boost::fibers::fiber(
  230. // Use std::bind() here for C++11 compatibility. C++11 lambda capture
  231. // can't move a move-only Fn type, but bind() can. Let bind() move the
  232. // channel pointer and the function into the bound object, passing
  233. // references into the lambda.
  234. std::bind(
  235. []( CHANP & chan,
  236. typename std::decay< Fn >::type & function) {
  237. // Instantiate a packaged_task to capture any exception thrown by
  238. // function.
  239. boost::fibers::packaged_task< T() > task( function);
  240. // Immediately run this packaged_task on same fiber. We want
  241. // function() to have completed BEFORE we push the future.
  242. task();
  243. // Pass the corresponding future to consumer. Ignore
  244. // channel_op_status returned by push(): might be closed; we
  245. // simply don't care.
  246. chan->push( task.get_future() );
  247. },
  248. chan,
  249. std::forward< Fn >( function)
  250. )).detach();
  251. }
  252. //]
  // Two-or-more overload: hand the leading function to the single-function
  // overload, then recurse on the rest (which still holds at least one).
  template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
  void wait_first_outcome_impl( CHANP chan,
                                Fn0 && function0,
                                Fn1 && function1,
                                Fns && ... functions) {
      // head
      wait_first_outcome_impl< T >( chan, std::forward< Fn0 >( function0) );
      // tail
      wait_first_outcome_impl< T >( chan,
                                    std::forward< Fn1 >( function1),
                                    std::forward< Fns >( functions) ... );
  }
  267. // Assume that all passed functions have the same return type. The return type
  268. // of wait_first_outcome() is the return type of the first passed function. It is
  269. // simply invalid to pass NO functions.
  270. //[wait_first_outcome
  271. template< typename Fn, typename ... Fns >
  272. typename std::result_of< Fn() >::type
  273. wait_first_outcome( Fn && function, Fns && ... functions) {
  274. // In this case, the value we pass through the channel is actually a
  275. // future -- which is already ready. future can carry either a value or an
  276. // exception.
  277. typedef typename std::result_of< Fn() >::type return_t;
  278. typedef boost::fibers::future< return_t > future_t;
  279. typedef boost::fibers::buffered_channel< future_t > channel_t;
  280. auto chanp(std::make_shared< channel_t >( 64) );
  281. // launch all the relevant fibers
  282. wait_first_outcome_impl< return_t >( chanp,
  283. std::forward< Fn >( function),
  284. std::forward< Fns >( functions) ... );
  285. // retrieve the first future
  286. future_t future( chanp->value_pop() );
  287. // close the channel: no subsequent push() has to succeed
  288. chanp->close();
  289. // either return value or throw exception
  290. return future.get();
  291. }
  292. //]
  293. // example usage
  294. Example wfo( runner, "wait_first_outcome()", [](){
  295. //[wait_first_outcome_ex
  296. std::string result = wait_first_outcome(
  297. [](){ return sleeper("wfos_first", 50); },
  298. [](){ return sleeper("wfos_second", 100); },
  299. [](){ return sleeper("wfos_third", 150); });
  300. std::cout << "wait_first_outcome(success) => " << result << std::endl;
  301. assert(result == "wfos_first");
  302. std::string thrown;
  303. try {
  304. result = wait_first_outcome(
  305. [](){ return sleeper("wfof_first", 50, true); },
  306. [](){ return sleeper("wfof_second", 100); },
  307. [](){ return sleeper("wfof_third", 150); });
  308. } catch ( std::exception const& e) {
  309. thrown = e.what();
  310. }
  311. std::cout << "wait_first_outcome(fail) threw '" << thrown
  312. << "'" << std::endl;
  313. assert(thrown == "wfof_first");
  314. //]
  315. });
  316. /*****************************************************************************
  317. * when_any, collect exceptions until success; throw exception_list if no
  318. * success
  319. *****************************************************************************/
  320. // define an exception to aggregate exception_ptrs; prefer
  321. // std::exception_list (N4407 et al.) once that becomes available
  322. //[exception_list
  // Exception that bundles several std::exception_ptr values, for reporting
  // that a whole group of operations failed. what() returns the message
  // given at construction; the individual errors are reached by iteration.
  class exception_list : public std::runtime_error {
  public:
      // explicit: a message string must not silently convert into an
      // (empty) exception_list.
      explicit exception_list( std::string const& what) :
          std::runtime_error( what) {
      }

      typedef std::vector< std::exception_ptr > bundle_t;

      // N4407 proposed std::exception_list API
      typedef bundle_t::const_iterator iterator;

      // number of collected exceptions
      std::size_t size() const noexcept {
          return bundle_.size();
      }

      iterator begin() const noexcept {
          return bundle_.begin();
      }

      iterator end() const noexcept {
          return bundle_.end();
      }

      // extension to populate: append one more captured exception
      void add( std::exception_ptr ep) {
          // 'ep' is already our own copy; move it rather than copy it again
          bundle_.push_back( std::move( ep) );
      }

  private:
      bundle_t bundle_;
  };
  347. //]
  348. // Assume that all passed functions have the same return type. The return type
  349. // of wait_first_success() is the return type of the first passed function. It is
  350. // simply invalid to pass NO functions.
  351. //[wait_first_success
  352. template< typename Fn, typename ... Fns >
  353. typename std::result_of< Fn() >::type
  354. wait_first_success( Fn && function, Fns && ... functions) {
  355. std::size_t count( 1 + sizeof ... ( functions) );
  356. // In this case, the value we pass through the channel is actually a
  357. // future -- which is already ready. future can carry either a value or an
  358. // exception.
  359. typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
  360. typedef boost::fibers::future< return_t > future_t;
  361. typedef boost::fibers::buffered_channel< future_t > channel_t;
  362. auto chanp( std::make_shared< channel_t >( 64) );
  363. // launch all the relevant fibers
  364. wait_first_outcome_impl< return_t >( chanp,
  365. std::forward< Fn >( function),
  366. std::forward< Fns >( functions) ... );
  367. // instantiate exception_list, just in case
  368. exception_list exceptions("wait_first_success() produced only errors");
  369. // retrieve up to 'count' results -- but stop there!
  370. for ( std::size_t i = 0; i < count; ++i) {
  371. // retrieve the next future
  372. future_t future( chanp->value_pop() );
  373. // retrieve exception_ptr if any
  374. std::exception_ptr error( future.get_exception_ptr() );
  375. // if no error, then yay, return value
  376. if ( ! error) {
  377. // close the channel: no subsequent push() has to succeed
  378. chanp->close();
  379. // show caller the value we got
  380. return future.get();
  381. }
  382. // error is non-null: collect
  383. exceptions.add( error);
  384. }
  385. // We only arrive here when every passed function threw an exception.
  386. // Throw our collection to inform caller.
  387. throw exceptions;
  388. }
  389. //]
  390. // example usage
  391. Example wfss( runner, "wait_first_success()", [](){
  392. //[wait_first_success_ex
  393. std::string result = wait_first_success(
  394. [](){ return sleeper("wfss_first", 50, true); },
  395. [](){ return sleeper("wfss_second", 100); },
  396. [](){ return sleeper("wfss_third", 150); });
  397. std::cout << "wait_first_success(success) => " << result << std::endl;
  398. assert(result == "wfss_second");
  399. //]
  400. std::string thrown;
  401. std::size_t count = 0;
  402. try {
  403. result = wait_first_success(
  404. [](){ return sleeper("wfsf_first", 50, true); },
  405. [](){ return sleeper("wfsf_second", 100, true); },
  406. [](){ return sleeper("wfsf_third", 150, true); });
  407. } catch ( exception_list const& e) {
  408. thrown = e.what();
  409. count = e.size();
  410. } catch ( std::exception const& e) {
  411. thrown = e.what();
  412. }
  413. std::cout << "wait_first_success(fail) threw '" << thrown << "': "
  414. << count << " errors" << std::endl;
  415. assert(thrown == "wait_first_success() produced only errors");
  416. assert(count == 3);
  417. });
  418. /*****************************************************************************
  419. * when_any, heterogeneous
  420. *****************************************************************************/
  421. //[wait_first_value_het
  422. // No need to break out the first Fn for interface function: let the compiler
  423. // complain if empty.
  424. // Our functions have different return types, and we might have to return any
  425. // of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
  426. // parameter pack.
  427. template< typename ... Fns >
  428. boost::variant< typename std::result_of< Fns() >::type ... >
  429. wait_first_value_het( Fns && ... functions) {
  430. // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
  431. typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
  432. typedef boost::fibers::buffered_channel< return_t > channel_t;
  433. auto chanp( std::make_shared< channel_t >( 64) );
  434. // launch all the relevant fibers
  435. wait_first_value_impl< return_t >( chanp,
  436. std::forward< Fns >( functions) ... );
  437. // retrieve the first value
  438. return_t value( chanp->value_pop() );
  439. // close the channel: no subsequent push() has to succeed
  440. chanp->close();
  441. return value;
  442. }
  443. //]
  444. // example usage
  445. Example wfvh( runner, "wait_first_value_het()", [](){
  446. //[wait_first_value_het_ex
  447. boost::variant< std::string, double, int > result =
  448. wait_first_value_het(
  449. [](){ return sleeper("wfvh_third", 150); },
  450. [](){ return sleeper(3.14, 100); },
  451. [](){ return sleeper(17, 50); });
  452. std::cout << "wait_first_value_het() => " << result << std::endl;
  453. assert(boost::get< int >( result) == 17);
  454. //]
  455. });
  456. /*****************************************************************************
  457. * when_all, simple completion
  458. *****************************************************************************/
  459. // Degenerate case: when there are no functions to wait for, return
  460. // immediately.
  461. void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
  462. }
  463. // When there's at least one function to wait for, launch it and recur to
  464. // process the rest.
  465. //[wait_all_simple_impl
  466. template< typename Fn, typename ... Fns >
  467. void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
  468. Fn && function, Fns && ... functions) {
  469. boost::fibers::fiber(
  470. std::bind(
  471. []( std::shared_ptr< boost::fibers::barrier > & barrier,
  472. typename std::decay< Fn >::type & function) mutable {
  473. function();
  474. barrier->wait();
  475. },
  476. barrier,
  477. std::forward< Fn >( function)
  478. )).detach();
  479. wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
  480. }
  481. //]
  482. // interface function: instantiate barrier, launch tasks, wait for barrier
  483. //[wait_all_simple
  484. template< typename ... Fns >
  485. void wait_all_simple( Fns && ... functions) {
  486. std::size_t count( sizeof ... ( functions) );
  487. // Initialize a barrier(count+1) because we'll immediately wait on it. We
  488. // don't want to wake up until 'count' more fibers wait on it. Even though
  489. // we'll stick around until the last of them completes, use shared_ptr
  490. // anyway because it's easier to be confident about lifespan issues.
  491. auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
  492. wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
  493. barrier->wait();
  494. }
  495. //]
  496. // example usage
  497. Example was( runner, "wait_all_simple()", [](){
  498. //[wait_all_simple_ex
  499. wait_all_simple(
  500. [](){ sleeper("was_long", 150); },
  501. [](){ sleeper("was_medium", 100); },
  502. [](){ sleeper("was_short", 50); });
  503. //]
  504. });
  505. /*****************************************************************************
  506. * when_all, return values
  507. *****************************************************************************/
  508. //[wait_nchannel
  509. // Introduce a channel facade that closes the channel once a specific number
  510. // of items has been pushed. This allows an arbitrary consumer to read until
  511. // 'closed' without itself having to count items.
  512. template< typename T >
  513. class nchannel {
  514. public:
  515. nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
  516. std::size_t lm):
  517. chan_( chan),
  518. limit_( lm) {
  519. assert(chan_);
  520. if ( 0 == limit_) {
  521. chan_->close();
  522. }
  523. }
  524. boost::fibers::channel_op_status push( T && va) {
  525. boost::fibers::channel_op_status ok =
  526. chan_->push( std::forward< T >( va) );
  527. if ( ok == boost::fibers::channel_op_status::success &&
  528. --limit_ == 0) {
  529. // after the 'limit_'th successful push, close the channel
  530. chan_->close();
  531. }
  532. return ok;
  533. }
  534. private:
  535. std::shared_ptr< boost::fibers::buffered_channel< T > > chan_;
  536. std::size_t limit_;
  537. };
  538. //]
  539. // When there's only one function, call this overload
  540. //[wait_all_values_impl
  541. template< typename T, typename Fn >
  542. void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
  543. Fn && function) {
  544. boost::fibers::fiber( [chan, function](){
  545. chan->push(function());
  546. }).detach();
  547. }
  548. //]
  549. // When there are two or more functions, call this overload
  550. template< typename T, typename Fn0, typename Fn1, typename ... Fns >
  551. void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
  552. Fn0 && function0,
  553. Fn1 && function1,
  554. Fns && ... functions) {
  555. // process the first function using the single-function overload
  556. wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
  557. // then recur to process the rest
  558. wait_all_values_impl< T >( chan,
  559. std::forward< Fn1 >( function1),
  560. std::forward< Fns >( functions) ... );
  561. }
  562. //[wait_all_values_source
  563. // Return a shared_ptr<buffered_channel<T>> from which the caller can
  564. // retrieve each new result as it arrives, until 'closed'.
  565. template< typename Fn, typename ... Fns >
  566. std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
  567. wait_all_values_source( Fn && function, Fns && ... functions) {
  568. std::size_t count( 1 + sizeof ... ( functions) );
  569. typedef typename std::result_of< Fn() >::type return_t;
  570. typedef boost::fibers::buffered_channel< return_t > channel_t;
  571. // make the channel
  572. auto chanp( std::make_shared< channel_t >( 64) );
  573. // and make an nchannel facade to close it after 'count' items
  574. auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
  575. // pass that nchannel facade to all the relevant fibers
  576. wait_all_values_impl< return_t >( ncp,
  577. std::forward< Fn >( function),
  578. std::forward< Fns >( functions) ... );
  579. // then return the channel for consumer
  580. return chanp;
  581. }
  582. //]
  583. // When all passed functions have completed, return vector<T> containing
  584. // collected results. Assume that all passed functions have the same return
  585. // type. It is simply invalid to pass NO functions.
  586. //[wait_all_values
  587. template< typename Fn, typename ... Fns >
  588. std::vector< typename std::result_of< Fn() >::type >
  589. wait_all_values( Fn && function, Fns && ... functions) {
  590. std::size_t count( 1 + sizeof ... ( functions) );
  591. typedef typename std::result_of< Fn() >::type return_t;
  592. typedef std::vector< return_t > vector_t;
  593. vector_t results;
  594. results.reserve( count);
  595. // get channel
  596. std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
  597. wait_all_values_source( std::forward< Fn >( function),
  598. std::forward< Fns >( functions) ... );
  599. // fill results vector
  600. return_t value;
  601. while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
  602. results.push_back( value);
  603. }
  604. // return vector to caller
  605. return results;
  606. }
  607. //]
  608. Example wav( runner, "wait_all_values()", [](){
  609. //[wait_all_values_source_ex
  610. std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
  611. wait_all_values_source(
  612. [](){ return sleeper("wavs_third", 150); },
  613. [](){ return sleeper("wavs_second", 100); },
  614. [](){ return sleeper("wavs_first", 50); });
  615. std::string value;
  616. while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
  617. std::cout << "wait_all_values_source() => '" << value
  618. << "'" << std::endl;
  619. }
  620. //]
  621. //[wait_all_values_ex
  622. std::vector< std::string > values =
  623. wait_all_values(
  624. [](){ return sleeper("wav_late", 150); },
  625. [](){ return sleeper("wav_middle", 100); },
  626. [](){ return sleeper("wav_early", 50); });
  627. //]
  628. std::cout << "wait_all_values() =>";
  629. for ( std::string const& v : values) {
  630. std::cout << " '" << v << "'";
  631. }
  632. std::cout << std::endl;
  633. });
  634. /*****************************************************************************
  635. * when_all, throw first exception
  636. *****************************************************************************/
  637. //[wait_all_until_error_source
  638. // Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
  639. // get() each new result as it arrives, until 'closed'.
  640. template< typename Fn, typename ... Fns >
  641. std::shared_ptr<
  642. boost::fibers::buffered_channel<
  643. boost::fibers::future<
  644. typename std::result_of< Fn() >::type > > >
  645. wait_all_until_error_source( Fn && function, Fns && ... functions) {
  646. std::size_t count( 1 + sizeof ... ( functions) );
  647. typedef typename std::result_of< Fn() >::type return_t;
  648. typedef boost::fibers::future< return_t > future_t;
  649. typedef boost::fibers::buffered_channel< future_t > channel_t;
  650. // make the channel
  651. auto chanp( std::make_shared< channel_t >( 64) );
  652. // and make an nchannel facade to close it after 'count' items
  653. auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
  654. // pass that nchannel facade to all the relevant fibers
  655. wait_first_outcome_impl< return_t >( ncp,
  656. std::forward< Fn >( function),
  657. std::forward< Fns >( functions) ... );
  658. // then return the channel for consumer
  659. return chanp;
  660. }
  661. //]
  662. // When all passed functions have completed, return vector<T> containing
  663. // collected results, or throw the first exception thrown by any of the passed
  664. // functions. Assume that all passed functions have the same return type. It
  665. // is simply invalid to pass NO functions.
  666. //[wait_all_until_error
  667. template< typename Fn, typename ... Fns >
  668. std::vector< typename std::result_of< Fn() >::type >
  669. wait_all_until_error( Fn && function, Fns && ... functions) {
  670. std::size_t count( 1 + sizeof ... ( functions) );
  671. typedef typename std::result_of< Fn() >::type return_t;
  672. typedef typename boost::fibers::future< return_t > future_t;
  673. typedef std::vector< return_t > vector_t;
  674. vector_t results;
  675. results.reserve( count);
  676. // get channel
  677. std::shared_ptr<
  678. boost::fibers::buffered_channel< future_t > > chan(
  679. wait_all_until_error_source( std::forward< Fn >( function),
  680. std::forward< Fns >( functions) ... ) );
  681. // fill results vector
  682. future_t future;
  683. while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
  684. results.push_back( future.get() );
  685. }
  686. // return vector to caller
  687. return results;
  688. }
  689. //]
  690. Example waue( runner, "wait_all_until_error()", [](){
  691. //[wait_all_until_error_source_ex
  692. typedef boost::fibers::future< std::string > future_t;
  693. std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
  694. wait_all_until_error_source(
  695. [](){ return sleeper("wauess_third", 150); },
  696. [](){ return sleeper("wauess_second", 100); },
  697. [](){ return sleeper("wauess_first", 50); });
  698. future_t future;
  699. while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
  700. std::string value( future.get() );
  701. std::cout << "wait_all_until_error_source(success) => '" << value
  702. << "'" << std::endl;
  703. }
  704. //]
  705. chan = wait_all_until_error_source(
  706. [](){ return sleeper("wauesf_third", 150); },
  707. [](){ return sleeper("wauesf_second", 100, true); },
  708. [](){ return sleeper("wauesf_first", 50); });
  709. //[wait_all_until_error_ex
  710. std::string thrown;
  711. //<-
  712. try {
  713. while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
  714. std::string value( future.get() );
  715. std::cout << "wait_all_until_error_source(fail) => '" << value
  716. << "'" << std::endl;
  717. }
  718. } catch ( std::exception const& e) {
  719. thrown = e.what();
  720. }
  721. std::cout << "wait_all_until_error_source(fail) threw '" << thrown
  722. << "'" << std::endl;
  723. thrown.clear();
  724. //->
  725. try {
  726. std::vector< std::string > values = wait_all_until_error(
  727. [](){ return sleeper("waue_late", 150); },
  728. [](){ return sleeper("waue_middle", 100, true); },
  729. [](){ return sleeper("waue_early", 50); });
  730. //<-
  731. std::cout << "wait_all_until_error(fail) =>";
  732. for ( std::string const& v : values) {
  733. std::cout << " '" << v << "'";
  734. }
  735. std::cout << std::endl;
  736. //->
  737. } catch ( std::exception const& e) {
  738. thrown = e.what();
  739. }
  740. std::cout << "wait_all_until_error(fail) threw '" << thrown
  741. << "'" << std::endl;
  742. //]
  743. });
  744. /*****************************************************************************
  745. * when_all, collect exceptions
  746. *****************************************************************************/
  747. // When all passed functions have succeeded, return vector<T> containing
  748. // collected results, or throw exception_list containing all exceptions thrown
  749. // by any of the passed functions. Assume that all passed functions have the
  750. // same return type. It is simply invalid to pass NO functions.
  751. //[wait_all_collect_errors
  752. template< typename Fn, typename ... Fns >
  753. std::vector< typename std::result_of< Fn() >::type >
  754. wait_all_collect_errors( Fn && function, Fns && ... functions) {
  755. std::size_t count( 1 + sizeof ... ( functions) );
  756. typedef typename std::result_of< Fn() >::type return_t;
  757. typedef typename boost::fibers::future< return_t > future_t;
  758. typedef std::vector< return_t > vector_t;
  759. vector_t results;
  760. results.reserve( count);
  761. exception_list exceptions("wait_all_collect_errors() exceptions");
  762. // get channel
  763. std::shared_ptr<
  764. boost::fibers::buffered_channel< future_t > > chan(
  765. wait_all_until_error_source( std::forward< Fn >( function),
  766. std::forward< Fns >( functions) ... ) );
  767. // fill results and/or exceptions vectors
  768. future_t future;
  769. while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
  770. std::exception_ptr exp = future.get_exception_ptr();
  771. if ( ! exp) {
  772. results.push_back( future.get() );
  773. } else {
  774. exceptions.add( exp);
  775. }
  776. }
  777. // if there were any exceptions, throw
  778. if ( exceptions.size() ) {
  779. throw exceptions;
  780. }
  781. // no exceptions: return vector to caller
  782. return results;
  783. }
  784. //]
  785. Example wace( runner, "wait_all_collect_errors()", [](){
  786. std::vector< std::string > values = wait_all_collect_errors(
  787. [](){ return sleeper("waces_late", 150); },
  788. [](){ return sleeper("waces_middle", 100); },
  789. [](){ return sleeper("waces_early", 50); });
  790. std::cout << "wait_all_collect_errors(success) =>";
  791. for ( std::string const& v : values) {
  792. std::cout << " '" << v << "'";
  793. }
  794. std::cout << std::endl;
  795. std::string thrown;
  796. std::size_t errors = 0;
  797. try {
  798. values = wait_all_collect_errors(
  799. [](){ return sleeper("wacef_late", 150, true); },
  800. [](){ return sleeper("wacef_middle", 100, true); },
  801. [](){ return sleeper("wacef_early", 50); });
  802. std::cout << "wait_all_collect_errors(fail) =>";
  803. for ( std::string const& v : values) {
  804. std::cout << " '" << v << "'";
  805. }
  806. std::cout << std::endl;
  807. } catch ( exception_list const& e) {
  808. thrown = e.what();
  809. errors = e.size();
  810. } catch ( std::exception const& e) {
  811. thrown = e.what();
  812. }
  813. std::cout << "wait_all_collect_errors(fail) threw '" << thrown
  814. << "': " << errors << " errors" << std::endl;
  815. });
  816. /*****************************************************************************
  817. * when_all, heterogeneous
  818. *****************************************************************************/
  819. //[wait_all_members_get
  template< typename Result, typename ... Futures >
  Result wait_all_members_get( Futures && ... futures) {
      // Each future's get() supplies one element of Result's initializer
      // list. The get() calls are made one after another, so an early one may
      // block the rest -- which costs nothing, since the caller cannot proceed
      // until the slowest future is ready in any case. Because the values are
      // consumed in argument-list order, not completion order, the exception
      // that reaches the caller is the one from the leftmost get() that
      // throws.
      return Result{ futures.get() ... };
  }
  831. //]
  832. //[wait_all_members
  833. // Explicitly pass Result. This can be any type capable of being initialized
  834. // from the results of the passed functions, such as a struct.
  835. template< typename Result, typename ... Fns >
  836. Result wait_all_members( Fns && ... functions) {
  837. // Run each of the passed functions on a separate fiber, passing all their
  838. // futures to helper function for processing.
  839. return wait_all_members_get< Result >(
  840. boost::fibers::async( std::forward< Fns >( functions) ) ... );
  841. }
  842. //]
  // Aggregate filled in by the wait_all_members() example that follows: one
  // member for each value the example's functions produce.
  //[wait_Data
  struct Data {
      std::string str;
      double      inexact;
      int         exact;

      friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
      ...*/
  //<-
      {
          out << "Data{str='" << data.str << "', inexact=" << data.inexact;
          out << ", exact=" << data.exact << "}";
          return out;
      }
  //->
  };
  858. //]
  859. // example usage
  860. Example wam( runner, "wait_all_members()", [](){
  861. //[wait_all_members_data_ex
  862. Data data = wait_all_members< Data >(
  863. [](){ return sleeper("wams_left", 100); },
  864. [](){ return sleeper(3.14, 150); },
  865. [](){ return sleeper(17, 50); });
  866. std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
  867. //]
  868. std::string thrown;
  869. try {
  870. data = wait_all_members< Data >(
  871. [](){ return sleeper("wamf_left", 100, true); },
  872. [](){ return sleeper(3.14, 150); },
  873. [](){ return sleeper(17, 50, true); });
  874. std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
  875. } catch ( std::exception const& e) {
  876. thrown = e.what();
  877. }
  878. std::cout << "wait_all_members<Data>(fail) threw '" << thrown
  879. << '"' << std::endl;
  880. //[wait_all_members_vector_ex
  881. // If we don't care about obtaining results as soon as they arrive, and we
  882. // prefer a result vector in passed argument order rather than completion
  883. // order, wait_all_members() is another possible implementation of
  884. // wait_all_until_error().
  885. auto strings = wait_all_members< std::vector< std::string > >(
  886. [](){ return sleeper("wamv_left", 150); },
  887. [](){ return sleeper("wamv_middle", 100); },
  888. [](){ return sleeper("wamv_right", 50); });
  889. std::cout << "wait_all_members<vector>() =>";
  890. for ( std::string const& str : strings) {
  891. std::cout << " '" << str << "'";
  892. }
  893. std::cout << std::endl;
  894. //]
  895. });
  896. /*****************************************************************************
  897. * main()
  898. *****************************************************************************/
  899. int main( int argc, char *argv[]) {
  900. runner.run();
  901. std::cout << "done." << std::endl;
  902. return EXIT_SUCCESS;
  903. }