work_sharing.cpp 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. // Copyright Nat Goodspeed + Oliver Kowalke 2015.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. #include <chrono>
  6. #include <condition_variable>
  7. #include <cstddef>
  8. #include <deque>
  9. #include <iomanip>
  10. #include <iostream>
  11. #include <mutex>
  12. #include <sstream>
  13. #include <string>
  14. #include <thread>
  15. #include <boost/assert.hpp>
  16. #include <boost/fiber/all.hpp>
  17. #include <boost/fiber/detail/thread_barrier.hpp>
  18. static std::size_t fiber_count{ 0 };
  19. static std::mutex mtx_count{};
  20. static boost::fibers::condition_variable_any cnd_count{};
  21. typedef std::unique_lock< std::mutex > lock_type;
  22. /*****************************************************************************
  23. * example fiber function
  24. *****************************************************************************/
  25. //[fiber_fn_ws
  26. void whatevah( char me) {
  27. try {
  28. std::thread::id my_thread = std::this_thread::get_id(); /*< get ID of initial thread >*/
  29. {
  30. std::ostringstream buffer;
  31. buffer << "fiber " << me << " started on thread " << my_thread << '\n';
  32. std::cout << buffer.str() << std::flush;
  33. }
  34. for ( unsigned i = 0; i < 10; ++i) { /*< loop ten times >*/
  35. boost::this_fiber::yield(); /*< yield to other fibers >*/
  36. std::thread::id new_thread = std::this_thread::get_id(); /*< get ID of current thread >*/
  37. if ( new_thread != my_thread) { /*< test if fiber was migrated to another thread >*/
  38. my_thread = new_thread;
  39. std::ostringstream buffer;
  40. buffer << "fiber " << me << " switched to thread " << my_thread << '\n';
  41. std::cout << buffer.str() << std::flush;
  42. }
  43. }
  44. } catch ( ... ) {
  45. }
  46. lock_type lk( mtx_count);
  47. if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
  48. lk.unlock();
  49. cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
  50. }
  51. }
  52. //]
  53. /*****************************************************************************
  54. * example thread function
  55. *****************************************************************************/
  56. //[thread_fn_ws
  57. void thread( boost::fibers::detail::thread_barrier * b) {
  58. std::ostringstream buffer;
  59. buffer << "thread started " << std::this_thread::get_id() << std::endl;
  60. std::cout << buffer.str() << std::flush;
  61. boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
  62. Install the scheduling algorithm `boost::fibers::algo::shared_work` in order to
  63. join the work sharing.
  64. >*/
  65. b->wait(); /*< sync with other threads: allow them to start processing >*/
  66. lock_type lk( mtx_count);
  67. cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
  68. Suspend main fiber and resume worker fibers in the meanwhile.
  69. Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
  70. if all worker fibers are complete.
  71. >*/
  72. BOOST_ASSERT( 0 == fiber_count);
  73. }
  74. //]
  75. /*****************************************************************************
  76. * main()
  77. *****************************************************************************/
  78. int main( int argc, char *argv[]) {
  79. std::cout << "main thread started " << std::this_thread::get_id() << std::endl;
  80. //[main_ws
  81. boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
  82. Install the scheduling algorithm `boost::fibers::algo::shared_work` in the main thread
  83. too, so each new fiber gets launched into the shared pool.
  84. >*/
  85. for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*<
  86. Launch a number of worker fibers; each worker fiber picks up a character
  87. that is passed as parameter to fiber-function `whatevah`.
  88. Each worker fiber gets detached.
  89. >*/
  90. boost::fibers::fiber([c](){ whatevah( c); }).detach();
  91. ++fiber_count; /*< Increment fiber counter for each new fiber. >*/
  92. }
  93. boost::fibers::detail::thread_barrier b( 4);
  94. std::thread threads[] = { /*<
  95. Launch a couple of threads that join the work sharing.
  96. >*/
  97. std::thread( thread, & b),
  98. std::thread( thread, & b),
  99. std::thread( thread, & b)
  100. };
  101. b.wait(); /*< sync with other threads: allow them to start processing >*/
  102. {
  103. lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count);
  104. cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
  105. Suspend main fiber and resume worker fibers in the meanwhile.
  106. Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
  107. if all worker fibers are complete.
  108. >*/
  109. } /*<
  110. Releasing lock of mtx_count is required before joining the threads, otherwise
  111. the other threads would be blocked inside condition_variable::wait() and
  112. would never return (deadlock).
  113. >*/
  114. BOOST_ASSERT( 0 == fiber_count);
  115. for ( std::thread & t : threads) { /*< wait for threads to terminate >*/
  116. t.join();
  117. }
  118. //]
  119. std::cout << "done." << std::endl;
  120. return EXIT_SUCCESS;
  121. }