sync_queue.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
  2. #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
  3. //////////////////////////////////////////////////////////////////////////////
  4. //
  5. // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
  6. // Software License, Version 1.0. (See accompanying file
  7. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  8. //
  9. // See http://www.boost.org/libs/thread for documentation.
  10. //
  11. //////////////////////////////////////////////////////////////////////////////
  12. #include <boost/thread/detail/config.hpp>
  13. #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
  14. #include <boost/thread/concurrent_queues/queue_op_status.hpp>
  15. #include <boost/thread/condition_variable.hpp>
  16. #include <boost/thread/csbl/devector.hpp>
  17. #include <boost/thread/detail/move.hpp>
  18. #include <boost/thread/mutex.hpp>
  19. #include <boost/throw_exception.hpp>
  20. #include <boost/smart_ptr/shared_ptr.hpp>
  21. #include <boost/smart_ptr/make_shared.hpp>
  22. #include <boost/config/abi_prefix.hpp>
  23. namespace boost
  24. {
  25. namespace concurrent
  26. {
  27. template <class ValueType, class Container = csbl::devector<ValueType> >
  28. class sync_queue
  29. : public detail::sync_queue_base<ValueType, Container >
  30. {
  31. typedef detail::sync_queue_base<ValueType, Container > super;
  32. public:
  33. typedef ValueType value_type;
  34. //typedef typename super::value_type value_type; // fixme
  35. typedef typename super::underlying_queue_type underlying_queue_type;
  36. typedef typename super::size_type size_type;
  37. typedef typename super::op_status op_status;
  38. // Constructors/Assignment/Destructors
  39. BOOST_THREAD_NO_COPYABLE(sync_queue)
  40. inline sync_queue();
  41. //template <class Range>
  42. //inline explicit sync_queue(Range range);
  43. inline ~sync_queue();
  44. // Modifiers
  45. inline void push(const value_type& x);
  46. inline queue_op_status try_push(const value_type& x);
  47. inline queue_op_status nonblocking_push(const value_type& x);
  48. inline queue_op_status wait_push(const value_type& x);
  49. inline void push(BOOST_THREAD_RV_REF(value_type) x);
  50. inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x);
  51. inline queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x);
  52. inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x);
  53. // Observers/Modifiers
  54. inline void pull(value_type&);
  55. // enable_if is_nothrow_copy_movable<value_type>
  56. inline value_type pull();
  57. inline queue_op_status try_pull(value_type&);
  58. inline queue_op_status nonblocking_pull(value_type&);
  59. inline queue_op_status wait_pull(ValueType& elem);
  60. private:
  61. inline queue_op_status try_pull(value_type& x, unique_lock<mutex>& lk);
  62. inline queue_op_status wait_pull(value_type& x, unique_lock<mutex>& lk);
  63. inline queue_op_status try_push(const value_type& x, unique_lock<mutex>& lk);
  64. inline queue_op_status wait_push(const value_type& x, unique_lock<mutex>& lk);
  65. inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
  66. inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
  67. inline void pull(value_type& elem, unique_lock<mutex>& )
  68. {
  69. elem = boost::move(super::data_.front());
  70. super::data_.pop_front();
  71. }
  72. inline value_type pull(unique_lock<mutex>& )
  73. {
  74. value_type e = boost::move(super::data_.front());
  75. super::data_.pop_front();
  76. return boost::move(e);
  77. }
  78. inline void push(const value_type& elem, unique_lock<mutex>& lk)
  79. {
  80. super::data_.push_back(elem);
  81. super::notify_elem_added(lk);
  82. }
  83. inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk)
  84. {
  85. super::data_.push_back(boost::move(elem));
  86. super::notify_elem_added(lk);
  87. }
  88. };
  89. template <class ValueType, class Container>
  90. sync_queue<ValueType, Container>::sync_queue() :
  91. super()
  92. {
  93. }
  94. // template <class ValueType, class Container>
  95. // template <class Range>
  96. // explicit sync_queue<ValueType, Container>::sync_queue(Range range) :
  97. // data_(), closed_(false)
  98. // {
  99. // try
  100. // {
  101. // typedef typename Range::iterator iterator_t;
  102. // iterator_t first = boost::begin(range);
  103. // iterator_t end = boost::end(range);
  104. // for (iterator_t cur = first; cur != end; ++cur)
  105. // {
  106. // data_.push(boost::move(*cur));;
  107. // }
  108. // notify_elem_added(lk);
  109. // }
  110. // catch (...)
  111. // {
  112. // delete[] data_;
  113. // }
  114. // }
  115. template <class ValueType, class Container>
  116. sync_queue<ValueType, Container>::~sync_queue()
  117. {
  118. }
  119. template <class ValueType, class Container>
  120. queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
  121. {
  122. if (super::empty(lk))
  123. {
  124. if (super::closed(lk)) return queue_op_status::closed;
  125. return queue_op_status::empty;
  126. }
  127. pull(elem, lk);
  128. return queue_op_status::success;
  129. }
  130. template <class ValueType, class Container>
  131. queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem, unique_lock<mutex>& lk)
  132. {
  133. const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
  134. if (has_been_closed) return queue_op_status::closed;
  135. pull(elem, lk);
  136. return queue_op_status::success;
  137. }
  138. template <class ValueType, class Container>
  139. queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem)
  140. {
  141. unique_lock<mutex> lk(super::mtx_);
  142. return try_pull(elem, lk);
  143. }
  144. template <class ValueType, class Container>
  145. queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem)
  146. {
  147. unique_lock<mutex> lk(super::mtx_);
  148. return wait_pull(elem, lk);
  149. }
  150. template <class ValueType, class Container>
  151. queue_op_status sync_queue<ValueType, Container>::nonblocking_pull(ValueType& elem)
  152. {
  153. unique_lock<mutex> lk(super::mtx_, try_to_lock);
  154. if (!lk.owns_lock())
  155. {
  156. return queue_op_status::busy;
  157. }
  158. return try_pull(elem, lk);
  159. }
  160. template <class ValueType, class Container>
  161. void sync_queue<ValueType, Container>::pull(ValueType& elem)
  162. {
  163. unique_lock<mutex> lk(super::mtx_);
  164. const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
  165. if (has_been_closed) super::throw_if_closed(lk);
  166. pull(elem, lk);
  167. }
  168. // enable if ValueType is nothrow movable
  169. template <class ValueType, class Container>
  170. ValueType sync_queue<ValueType, Container>::pull()
  171. {
  172. unique_lock<mutex> lk(super::mtx_);
  173. const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
  174. if (has_been_closed) super::throw_if_closed(lk);
  175. return pull(lk);
  176. }
  177. template <class ValueType, class Container>
  178. queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
  179. {
  180. if (super::closed(lk)) return queue_op_status::closed;
  181. push(elem, lk);
  182. return queue_op_status::success;
  183. }
  184. template <class ValueType, class Container>
  185. queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem)
  186. {
  187. unique_lock<mutex> lk(super::mtx_);
  188. return try_push(elem, lk);
  189. }
  190. template <class ValueType, class Container>
  191. queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem, unique_lock<mutex>& lk)
  192. {
  193. if (super::closed(lk)) return queue_op_status::closed;
  194. push(elem, lk);
  195. return queue_op_status::success;
  196. }
  197. template <class ValueType, class Container>
  198. queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem)
  199. {
  200. unique_lock<mutex> lk(super::mtx_);
  201. return wait_push(elem, lk);
  202. }
  203. template <class ValueType, class Container>
  204. queue_op_status sync_queue<ValueType, Container>::nonblocking_push(const ValueType& elem)
  205. {
  206. unique_lock<mutex> lk(super::mtx_, try_to_lock);
  207. if (!lk.owns_lock()) return queue_op_status::busy;
  208. return try_push(elem, lk);
  209. }
  210. template <class ValueType, class Container>
  211. void sync_queue<ValueType, Container>::push(const ValueType& elem)
  212. {
  213. unique_lock<mutex> lk(super::mtx_);
  214. super::throw_if_closed(lk);
  215. push(elem, lk);
  216. }
  217. template <class ValueType, class Container>
  218. queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
  219. {
  220. if (super::closed(lk)) return queue_op_status::closed;
  221. push(boost::move(elem), lk);
  222. return queue_op_status::success;
  223. }
  224. template <class ValueType, class Container>
  225. queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
  226. {
  227. unique_lock<mutex> lk(super::mtx_);
  228. return try_push(boost::move(elem), lk);
  229. }
  230. template <class ValueType, class Container>
  231. queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
  232. {
  233. if (super::closed(lk)) return queue_op_status::closed;
  234. push(boost::move(elem), lk);
  235. return queue_op_status::success;
  236. }
  237. template <class ValueType, class Container>
  238. queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem)
  239. {
  240. unique_lock<mutex> lk(super::mtx_);
  241. return wait_push(boost::move(elem), lk);
  242. }
  243. template <class ValueType, class Container>
  244. queue_op_status sync_queue<ValueType, Container>::nonblocking_push(BOOST_THREAD_RV_REF(ValueType) elem)
  245. {
  246. unique_lock<mutex> lk(super::mtx_, try_to_lock);
  247. if (!lk.owns_lock())
  248. {
  249. return queue_op_status::busy;
  250. }
  251. return try_push(boost::move(elem), lk);
  252. }
  253. template <class ValueType, class Container>
  254. void sync_queue<ValueType, Container>::push(BOOST_THREAD_RV_REF(ValueType) elem)
  255. {
  256. unique_lock<mutex> lk(super::mtx_);
  257. super::throw_if_closed(lk);
  258. push(boost::move(elem), lk);
  259. }
  260. template <class ValueType, class Container>
  261. sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
  262. {
  263. sbq.push(boost::move(elem));
  264. return sbq;
  265. }
  266. template <class ValueType, class Container>
  267. sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, ValueType const&elem)
  268. {
  269. sbq.push(elem);
  270. return sbq;
  271. }
  272. template <class ValueType, class Container>
  273. sync_queue<ValueType, Container>& operator>>(sync_queue<ValueType, Container>& sbq, ValueType &elem)
  274. {
  275. sbq.pull(elem);
  276. return sbq;
  277. }
  278. }
  279. using concurrent::sync_queue;
  280. }
  281. #include <boost/config/abi_suffix.hpp>
  282. #endif