// priority_scheduler.cpp (3.8 KB)

  #include <boost/asio/dispatch.hpp>
  #include <boost/asio/execution_context.hpp>
  #include <condition_variable>
  #include <cstdint>
  #include <iostream>
  #include <memory>
  #include <mutex>
  #include <queue>
  #include <utility>
  #include <vector>

  using boost::asio::dispatch;
  using boost::asio::execution_context;
  10. class priority_scheduler : public execution_context
  11. {
  12. public:
  13. // A class that satisfies the Executor requirements.
  14. class executor_type
  15. {
  16. public:
  17. executor_type(priority_scheduler& ctx, int pri) noexcept
  18. : context_(ctx), priority_(pri)
  19. {
  20. }
  21. priority_scheduler& context() const noexcept
  22. {
  23. return context_;
  24. }
  25. void on_work_started() const noexcept
  26. {
  27. // This executor doesn't count work. Instead, the scheduler simply runs
  28. // until explicitly stopped.
  29. }
  30. void on_work_finished() const noexcept
  31. {
  32. // This executor doesn't count work. Instead, the scheduler simply runs
  33. // until explicitly stopped.
  34. }
  35. template <class Func, class Alloc>
  36. void dispatch(Func&& f, const Alloc& a) const
  37. {
  38. post(std::forward<Func>(f), a);
  39. }
  40. template <class Func, class Alloc>
  41. void post(Func f, const Alloc& a) const
  42. {
  43. auto p(std::allocate_shared<item<Func>>(
  44. typename std::allocator_traits<
  45. Alloc>::template rebind_alloc<char>(a),
  46. priority_, std::move(f)));
  47. std::lock_guard<std::mutex> lock(context_.mutex_);
  48. context_.queue_.push(p);
  49. context_.condition_.notify_one();
  50. }
  51. template <class Func, class Alloc>
  52. void defer(Func&& f, const Alloc& a) const
  53. {
  54. post(std::forward<Func>(f), a);
  55. }
  56. friend bool operator==(const executor_type& a,
  57. const executor_type& b) noexcept
  58. {
  59. return &a.context_ == &b.context_;
  60. }
  61. friend bool operator!=(const executor_type& a,
  62. const executor_type& b) noexcept
  63. {
  64. return &a.context_ != &b.context_;
  65. }
  66. private:
  67. priority_scheduler& context_;
  68. int priority_;
  69. };
  70. executor_type get_executor(int pri = 0) noexcept
  71. {
  72. return executor_type(*const_cast<priority_scheduler*>(this), pri);
  73. }
  74. void run()
  75. {
  76. std::unique_lock<std::mutex> lock(mutex_);
  77. for (;;)
  78. {
  79. condition_.wait(lock, [&]{ return stopped_ || !queue_.empty(); });
  80. if (stopped_)
  81. return;
  82. auto p(queue_.top());
  83. queue_.pop();
  84. lock.unlock();
  85. p->execute_(p);
  86. lock.lock();
  87. }
  88. }
  89. void stop()
  90. {
  91. std::lock_guard<std::mutex> lock(mutex_);
  92. stopped_ = true;
  93. condition_.notify_all();
  94. }
  95. private:
  96. struct item_base
  97. {
  98. int priority_;
  99. void (*execute_)(std::shared_ptr<item_base>&);
  100. };
  101. template <class Func>
  102. struct item : item_base
  103. {
  104. item(int pri, Func f) : function_(std::move(f))
  105. {
  106. priority_ = pri;
  107. execute_ = [](std::shared_ptr<item_base>& p)
  108. {
  109. Func tmp(std::move(static_cast<item*>(p.get())->function_));
  110. p.reset();
  111. tmp();
  112. };
  113. }
  114. Func function_;
  115. };
  116. struct item_comp
  117. {
  118. bool operator()(
  119. const std::shared_ptr<item_base>& a,
  120. const std::shared_ptr<item_base>& b)
  121. {
  122. return a->priority_ < b->priority_;
  123. }
  124. };
  125. std::mutex mutex_;
  126. std::condition_variable condition_;
  127. std::priority_queue<
  128. std::shared_ptr<item_base>,
  129. std::vector<std::shared_ptr<item_base>>,
  130. item_comp> queue_;
  131. bool stopped_ = false;
  132. };
  133. int main()
  134. {
  135. priority_scheduler sched;
  136. auto low = sched.get_executor(0);
  137. auto med = sched.get_executor(1);
  138. auto high = sched.get_executor(2);
  139. dispatch(low, []{ std::cout << "1\n"; });
  140. dispatch(low, []{ std::cout << "11\n"; });
  141. dispatch(med, []{ std::cout << "2\n"; });
  142. dispatch(med, []{ std::cout << "22\n"; });
  143. dispatch(high, []{ std::cout << "3\n"; });
  144. dispatch(high, []{ std::cout << "33\n"; });
  145. dispatch(high, []{ std::cout << "333\n"; });
  146. dispatch(sched.get_executor(-1), [&]{ sched.stop(); });
  147. sched.run();
  148. }