basic_thread_pool.hpp 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. // Copyright (C) 2013-2014 Vicente J. Botet Escriba
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. // 2013/09 Vicente J. Botet Escriba
  7. // Adapt to boost from CCIA C++11 implementation
  8. // first implementation of a simple pool thread using a vector of threads and a sync_queue.
  9. #ifndef BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
  10. #define BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
  11. #include <boost/thread/detail/config.hpp>
  12. #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
  13. #include <boost/thread/detail/delete.hpp>
  14. #include <boost/thread/detail/move.hpp>
  15. #include <boost/thread/thread.hpp>
  16. #include <boost/thread/concurrent_queues/sync_queue.hpp>
  17. #include <boost/thread/executors/work.hpp>
  18. #include <boost/thread/csbl/vector.hpp>
  19. #include <boost/config/abi_prefix.hpp>
  20. namespace boost
  21. {
  22. namespace executors
  23. {
  24. class basic_thread_pool
  25. {
  26. public:
  27. /// type-erasure to store the works to do
  28. typedef executors::work work;
  29. private:
  30. typedef thread thread_t;
  31. /// A move aware vector type
  32. typedef csbl::vector<thread_t> thread_vector;
  33. /// A move aware vector
  34. thread_vector threads;
  35. /// the thread safe work queue
  36. concurrent::sync_queue<work > work_queue;
  37. public:
  38. /**
  39. * Effects: try to execute one task.
  40. * Returns: whether a task has been executed.
  41. * Throws: whatever the current task constructor throws or the task() throws.
  42. */
  43. bool try_executing_one()
  44. {
  45. try
  46. {
  47. work task;
  48. if (work_queue.try_pull(task) == queue_op_status::success)
  49. {
  50. task();
  51. return true;
  52. }
  53. return false;
  54. }
  55. catch (...)
  56. {
  57. std::terminate();
  58. //return false;
  59. }
  60. }
  61. /**
  62. * Effects: schedule one task or yields
  63. * Throws: whatever the current task constructor throws or the task() throws.
  64. */
  65. void schedule_one_or_yield()
  66. {
  67. if ( ! try_executing_one())
  68. {
  69. this_thread::yield();
  70. }
  71. }
  72. private:
  73. /**
  74. * The main loop of the worker threads
  75. */
  76. void worker_thread()
  77. {
  78. try
  79. {
  80. for(;;)
  81. {
  82. work task;
  83. try
  84. {
  85. queue_op_status st = work_queue.wait_pull(task);
  86. if (st == queue_op_status::closed) {
  87. return;
  88. }
  89. task();
  90. }
  91. catch (boost::thread_interrupted&)
  92. {
  93. return;
  94. }
  95. }
  96. }
  97. catch (...)
  98. {
  99. std::terminate();
  100. return;
  101. }
  102. }
  103. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  104. template <class AtThreadEntry>
  105. void worker_thread1(AtThreadEntry& at_thread_entry)
  106. {
  107. at_thread_entry(*this);
  108. worker_thread();
  109. }
  110. #endif
  111. void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
  112. {
  113. at_thread_entry(*this);
  114. worker_thread();
  115. }
  116. template <class AtThreadEntry>
  117. void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
  118. {
  119. at_thread_entry(*this);
  120. worker_thread();
  121. }
  122. static void do_nothing_at_thread_entry(basic_thread_pool&) {}
  123. public:
  124. /// basic_thread_pool is not copyable.
  125. BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
  126. /**
  127. * \b Effects: creates a thread pool that runs closures on \c thread_count threads.
  128. *
  129. * \b Throws: Whatever exception is thrown while initializing the needed resources.
  130. */
  131. basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
  132. {
  133. try
  134. {
  135. threads.reserve(thread_count);
  136. for (unsigned i = 0; i < thread_count; ++i)
  137. {
  138. #if 1
  139. thread th (&basic_thread_pool::worker_thread, this);
  140. threads.push_back(thread_t(boost::move(th)));
  141. #else
  142. threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  143. #endif
  144. }
  145. }
  146. catch (...)
  147. {
  148. close();
  149. throw;
  150. }
  151. }
  152. /**
  153. * \b Effects: creates a thread pool that runs closures on \c thread_count threads
  154. * and executes the at_thread_entry function at the entry of each created thread. .
  155. *
  156. * \b Throws: Whatever exception is thrown while initializing the needed resources.
  157. */
  158. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  159. template <class AtThreadEntry>
  160. basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
  161. {
  162. try
  163. {
  164. threads.reserve(thread_count);
  165. for (unsigned i = 0; i < thread_count; ++i)
  166. {
  167. thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
  168. threads.push_back(thread_t(boost::move(th)));
  169. //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  170. }
  171. }
  172. catch (...)
  173. {
  174. close();
  175. throw;
  176. }
  177. }
  178. #endif
  179. basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
  180. {
  181. try
  182. {
  183. threads.reserve(thread_count);
  184. for (unsigned i = 0; i < thread_count; ++i)
  185. {
  186. thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
  187. threads.push_back(thread_t(boost::move(th)));
  188. //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  189. }
  190. }
  191. catch (...)
  192. {
  193. close();
  194. throw;
  195. }
  196. }
  197. template <class AtThreadEntry>
  198. basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
  199. {
  200. try
  201. {
  202. threads.reserve(thread_count);
  203. for (unsigned i = 0; i < thread_count; ++i)
  204. {
  205. thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
  206. threads.push_back(thread_t(boost::move(th)));
  207. //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  208. }
  209. }
  210. catch (...)
  211. {
  212. close();
  213. throw;
  214. }
  215. }
  216. /**
  217. * \b Effects: Destroys the thread pool.
  218. *
  219. * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
  220. */
  221. ~basic_thread_pool()
  222. {
  223. // signal to all the worker threads that there will be no more submissions.
  224. close();
  225. // joins all the threads before destroying the thread pool resources (e.g. the queue).
  226. interrupt_and_join();
  227. }
  228. /**
  229. * \b Effects: join all the threads.
  230. */
  231. void join()
  232. {
  233. for (unsigned i = 0; i < threads.size(); ++i)
  234. {
  235. //threads[i].interrupt();
  236. threads[i].join();
  237. }
  238. }
  239. /**
  240. * \b Effects: interrupt all the threads.
  241. */
  242. void interrupt()
  243. {
  244. for (unsigned i = 0; i < threads.size(); ++i)
  245. {
  246. threads[i].interrupt();
  247. }
  248. }
  249. /**
  250. * \b Effects: interrupt and join all the threads.
  251. */
  252. void interrupt_and_join()
  253. {
  254. for (unsigned i = 0; i < threads.size(); ++i)
  255. {
  256. threads[i].interrupt();
  257. threads[i].join();
  258. }
  259. }
  260. /**
  261. * \b Effects: close the \c basic_thread_pool for submissions.
  262. * The worker threads will work until there is no more closures to run.
  263. */
  264. void close()
  265. {
  266. work_queue.close();
  267. }
  268. /**
  269. * \b Returns: whether the pool is closed for submissions.
  270. */
  271. bool closed()
  272. {
  273. return work_queue.closed();
  274. }
  275. /**
  276. * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
  277. *
  278. * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
  279. * If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads.
  280. *
  281. * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
  282. *
  283. * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
  284. * Whatever exception that can be throw while storing the closure.
  285. */
  286. void submit(BOOST_THREAD_RV_REF(work) closure) {
  287. work_queue.push(boost::move(closure));
  288. }
  289. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  290. template <typename Closure>
  291. void submit(Closure & closure)
  292. {
  293. submit(work(closure));
  294. }
  295. #endif
  296. void submit(void (*closure)())
  297. {
  298. submit(work(closure));
  299. }
  300. template <typename Closure>
  301. void submit(BOOST_THREAD_FWD_REF(Closure) closure)
  302. {
  303. //submit(work(boost::forward<Closure>(closure)));
  304. work w((boost::forward<Closure>(closure)));
  305. submit(boost::move(w));
  306. }
  307. /**
  308. * \b Requires: This must be called from an scheduled task.
  309. *
  310. * \b Effects: reschedule functions until pred()
  311. */
  312. template <typename Pred>
  313. bool reschedule_until(Pred const& pred)
  314. {
  315. do {
  316. if ( ! try_executing_one())
  317. {
  318. return false;
  319. }
  320. } while (! pred());
  321. return true;
  322. }
  323. };
  324. }
  325. using executors::basic_thread_pool;
  326. }
  327. #include <boost/config/abi_suffix.hpp>
  328. #endif
  329. #endif