loop_executor.hpp 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // Copyright (C) 2013,2014 Vicente J. Botet Escriba
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. // 2013/11 Vicente J. Botet Escriba
  7. // first implementation of a simple user scheduler.
  8. // 2013/11 Vicente J. Botet Escriba
  9. // rename loop_executor.
  10. #ifndef BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP
  11. #define BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP
  12. #include <boost/thread/detail/config.hpp>
  13. #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/assert.hpp>
#include <exception>
#include <boost/config/abi_prefix.hpp>
  20. namespace boost
  21. {
  22. namespace executors
  23. {
  24. class loop_executor
  25. {
  26. public:
  27. /// type-erasure to store the works to do
  28. typedef executors::work work;
  29. private:
  30. /// the thread safe work queue
  31. concurrent::sync_queue<work > work_queue;
  32. public:
  33. /**
  34. * Effects: try to execute one task.
  35. * Returns: whether a task has been executed.
  36. * Throws: whatever the current task constructor throws or the task() throws.
  37. */
  38. bool try_executing_one()
  39. {
  40. return execute_one(/*wait:*/false);
  41. }
  42. private:
  43. /**
  44. * Effects: Execute one task.
  45. * Remark: If wait is true, waits until a task is available or the executor
  46. * is closed. If wait is false, returns false immediately if no
  47. * task is available.
  48. * Returns: whether a task has been executed (if wait is true, only returns false if closed).
  49. * Throws: whatever the current task constructor throws or the task() throws.
  50. */
  51. bool execute_one(bool wait)
  52. {
  53. work task;
  54. try
  55. {
  56. queue_op_status status = wait ?
  57. work_queue.wait_pull(task) :
  58. work_queue.try_pull(task);
  59. if (status == queue_op_status::success)
  60. {
  61. task();
  62. return true;
  63. }
  64. BOOST_ASSERT(!wait || status == queue_op_status::closed);
  65. return false;
  66. }
  67. catch (...)
  68. {
  69. std::terminate();
  70. //return false;
  71. }
  72. }
  73. public:
  74. /// loop_executor is not copyable.
  75. BOOST_THREAD_NO_COPYABLE(loop_executor)
  76. /**
  77. * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
  78. *
  79. * \b Throws: Whatever exception is thrown while initializing the needed resources.
  80. */
  81. loop_executor()
  82. {
  83. }
  84. /**
  85. * \b Effects: Destroys the thread pool.
  86. *
  87. * \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor.
  88. */
  89. ~loop_executor()
  90. {
  91. // signal to all the worker thread that there will be no more submissions.
  92. close();
  93. }
  94. /**
  95. * The main loop of the worker thread
  96. */
  97. void loop()
  98. {
  99. while (execute_one(/*wait:*/true))
  100. {
  101. }
  102. BOOST_ASSERT(closed());
  103. while (try_executing_one())
  104. {
  105. }
  106. }
  107. /**
  108. * \b Effects: close the \c loop_executor for submissions.
  109. * The loop will work until there is no more closures to run.
  110. */
  111. void close()
  112. {
  113. work_queue.close();
  114. }
  115. /**
  116. * \b Returns: whether the pool is closed for submissions.
  117. */
  118. bool closed()
  119. {
  120. return work_queue.closed();
  121. }
  122. /**
  123. * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
  124. *
  125. * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
  126. * If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads.
  127. *
  128. * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
  129. *
  130. * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
  131. * Whatever exception that can be throw while storing the closure.
  132. */
  133. void submit(BOOST_THREAD_RV_REF(work) closure) {
  134. work_queue.push(boost::move(closure));
  135. }
  136. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  137. template <typename Closure>
  138. void submit(Closure & closure)
  139. {
  140. submit(work(closure));
  141. }
  142. #endif
  143. void submit(void (*closure)())
  144. {
  145. submit(work(closure));
  146. }
  147. template <typename Closure>
  148. void submit(BOOST_THREAD_FWD_REF(Closure) closure)
  149. {
  150. //work_queue.push(work(boost::forward<Closure>(closure)));
  151. work w((boost::forward<Closure>(closure)));
  152. submit(boost::move(w));
  153. }
  154. /**
  155. * \b Requires: This must be called from an scheduled task.
  156. *
  157. * \b Effects: reschedule functions until pred()
  158. */
  159. template <typename Pred>
  160. bool reschedule_until(Pred const& pred)
  161. {
  162. do {
  163. if ( ! try_executing_one())
  164. {
  165. return false;
  166. }
  167. } while (! pred());
  168. return true;
  169. }
  170. /**
  171. * run queued closures
  172. */
  173. void run_queued_closures()
  174. {
  175. sync_queue<work>::underlying_queue_type q = work_queue.underlying_queue();
  176. while (! q.empty())
  177. {
  178. work& task = q.front();
  179. task();
  180. q.pop_front();
  181. }
  182. }
  183. };
  184. }
  185. using executors::loop_executor;
  186. }
  187. #include <boost/config/abi_suffix.hpp>
  188. #endif
  189. #endif