// round_robin.hpp
  // Copyright Oliver Kowalke 2013.
  // Distributed under the Boost Software License, Version 1.0.
  // (See accompanying file LICENSE_1_0.txt or copy at
  // http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
  6. #define BOOST_FIBERS_ASIO_ROUND_ROBIN_H
  7. #include <chrono>
  8. #include <cstddef>
  9. #include <memory>
  10. #include <mutex>
  11. #include <queue>
  12. #include <boost/asio.hpp>
  13. #include <boost/assert.hpp>
  14. #include <boost/asio/steady_timer.hpp>
  15. #include <boost/config.hpp>
  16. #include <boost/fiber/condition_variable.hpp>
  17. #include <boost/fiber/context.hpp>
  18. #include <boost/fiber/mutex.hpp>
  19. #include <boost/fiber/operations.hpp>
  20. #include <boost/fiber/scheduler.hpp>
  21. #include "yield.hpp"
  22. #ifdef BOOST_HAS_ABI_HEADERS
  23. # include BOOST_ABI_PREFIX
  24. #endif
  25. namespace boost {
  26. namespace fibers {
  27. namespace asio {
  28. class round_robin : public algo::algorithm {
  29. private:
  30. //[asio_rr_suspend_timer
  31. std::shared_ptr< boost::asio::io_service > io_svc_;
  32. boost::asio::steady_timer suspend_timer_;
  33. //]
  34. boost::fibers::scheduler::ready_queue_type rqueue_{};
  35. boost::fibers::mutex mtx_{};
  36. boost::fibers::condition_variable cnd_{};
  37. std::size_t counter_{ 0 };
  38. public:
  39. //[asio_rr_service_top
  40. struct service : public boost::asio::io_service::service {
  41. static boost::asio::io_service::id id;
  42. std::unique_ptr< boost::asio::io_service::work > work_;
  43. service( boost::asio::io_service & io_svc) :
  44. boost::asio::io_service::service( io_svc),
  45. work_{ new boost::asio::io_service::work( io_svc) } {
  46. }
  47. virtual ~service() {}
  48. service( service const&) = delete;
  49. service & operator=( service const&) = delete;
  50. void shutdown_service() override final {
  51. work_.reset();
  52. }
  53. };
  54. //]
  55. //[asio_rr_ctor
  56. round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) :
  57. io_svc_( io_svc),
  58. suspend_timer_( * io_svc_) {
  59. // We use add_service() very deliberately. This will throw
  60. // service_already_exists if you pass the same io_service instance to
  61. // more than one round_robin instance.
  62. boost::asio::add_service( * io_svc_, new service( * io_svc_) );
  63. io_svc_->post([this]() mutable {
  64. //]
  65. //[asio_rr_service_lambda
  66. while ( ! io_svc_->stopped() ) {
  67. if ( has_ready_fibers() ) {
  68. // run all pending handlers in round_robin
  69. while ( io_svc_->poll() );
  70. // block this fiber till all pending (ready) fibers are processed
  71. // == round_robin::suspend_until() has been called
  72. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  73. cnd_.wait( lk);
  74. } else {
  75. // run one handler inside io_service
  76. // if no handler available, block this thread
  77. if ( ! io_svc_->run_one() ) {
  78. break;
  79. }
  80. }
  81. }
  82. //]
  83. });
  84. }
  85. void awakened( context * ctx) noexcept {
  86. BOOST_ASSERT( nullptr != ctx);
  87. BOOST_ASSERT( ! ctx->ready_is_linked() );
  88. ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
  89. if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
  90. ++counter_;
  91. }
  92. }
  93. context * pick_next() noexcept {
  94. context * ctx( nullptr);
  95. if ( ! rqueue_.empty() ) { /*<
  96. pop an item from the ready queue
  97. >*/
  98. ctx = & rqueue_.front();
  99. rqueue_.pop_front();
  100. BOOST_ASSERT( nullptr != ctx);
  101. BOOST_ASSERT( context::active() != ctx);
  102. if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
  103. --counter_;
  104. }
  105. }
  106. return ctx;
  107. }
  108. bool has_ready_fibers() const noexcept {
  109. return 0 < counter_;
  110. }
  111. //[asio_rr_suspend_until
  112. void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
  113. // Set a timer so at least one handler will eventually fire, causing
  114. // run_one() to eventually return.
  115. if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
  116. // Each expires_at(time_point) call cancels any previous pending
  117. // call. We could inadvertently spin like this:
  118. // dispatcher calls suspend_until() with earliest wake time
  119. // suspend_until() sets suspend_timer_
  120. // lambda loop calls run_one()
  121. // some other asio handler runs before timer expires
  122. // run_one() returns to lambda loop
  123. // lambda loop yields to dispatcher
  124. // dispatcher finds no ready fibers
  125. // dispatcher calls suspend_until() with SAME wake time
  126. // suspend_until() sets suspend_timer_ to same time, canceling
  127. // previous async_wait()
  128. // lambda loop calls run_one()
  129. // asio calls suspend_timer_ handler with operation_aborted
  130. // run_one() returns to lambda loop... etc. etc.
  131. // So only actually set the timer when we're passed a DIFFERENT
  132. // abs_time value.
  133. suspend_timer_.expires_at( abs_time);
  134. suspend_timer_.async_wait([](boost::system::error_code const&){
  135. this_fiber::yield();
  136. });
  137. }
  138. cnd_.notify_one();
  139. }
  140. //]
  141. //[asio_rr_notify
  142. void notify() noexcept {
  143. // Something has happened that should wake one or more fibers BEFORE
  144. // suspend_timer_ expires. Reset the timer to cause it to fire
  145. // immediately, causing the run_one() call to return. In theory we
  146. // could use cancel() because we don't care whether suspend_timer_'s
  147. // handler is called with operation_aborted or success. However --
  148. // cancel() doesn't change the expiration time, and we use
  149. // suspend_timer_'s expiration time to decide whether it's already
  150. // set. If suspend_until() set some specific wake time, then notify()
  151. // canceled it, then suspend_until() was called again with the same
  152. // wake time, it would match suspend_timer_'s expiration time and we'd
  153. // refrain from setting the timer. So instead of simply calling
  154. // cancel(), reset the timer, which cancels the pending sleep AND sets
  155. // a new expiration time. This will cause us to spin the loop twice --
  156. // once for the operation_aborted handler, once for timer expiration
  157. // -- but that shouldn't be a big problem.
  158. suspend_timer_.async_wait([](boost::system::error_code const&){
  159. this_fiber::yield();
  160. });
  161. suspend_timer_.expires_at( std::chrono::steady_clock::now() );
  162. }
  163. //]
  164. };
  165. boost::asio::io_service::id round_robin::service::id;
  166. }}}
  167. #ifdef BOOST_HAS_ABI_HEADERS
  168. # include BOOST_ABI_SUFFIX
  169. #endif
  170. #endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H