buffered_channel.qbk 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. [/
  2. Copyright Oliver Kowalke 2013.
  3. Distributed under the Boost Software License, Version 1.0.
  4. (See accompanying file LICENSE_1_0.txt or copy at
  5. http://www.boost.org/LICENSE_1_0.txt
  6. ]
  7. [section:buffered_channel Buffered Channel]
  8. __boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable to
  9. synchonize fibers (running on same or different threads) via asynchronouss
  10. message passing.
  11. typedef boost::fibers::buffered_channel< int > channel_t;
  12. void send( channel_t & chan) {
  13. for ( int i = 0; i < 5; ++i) {
  14. chan.push( i);
  15. }
  16. chan.close();
  17. }
  18. void recv( channel_t & chan) {
  19. int i;
  20. while ( boost::fibers::channel_op_status::success == chan.pop(i) ) {
  21. std::cout << "received " << i << std::endl;
  22. }
  23. }
  24. channel_t chan{ 2 };
  25. boost::fibers::fiber f1( std::bind( send, std::ref( chan) ) );
  26. boost::fibers::fiber f2( std::bind( recv, std::ref( chan) ) );
  27. f1.join();
  28. f2.join();
  29. Class `buffered_channel` supports range-for syntax:
  30. typedef boost::fibers::buffered_channel< int > channel_t;
  31. void foo( channel_t & chan) {
  32. chan.push( 1);
  33. chan.push( 1);
  34. chan.push( 2);
  35. chan.push( 3);
  36. chan.push( 5);
  37. chan.push( 8);
  38. chan.push( 12);
  39. chan.close();
  40. }
  41. void bar( channel_t & chan) {
  42. for ( unsigned int value : chan) {
  43. std::cout << value << " ";
  44. }
  45. std::cout << std::endl;
  46. }
  47. [template_heading buffered_channel]
  48. #include <boost/fiber/buffered_channel.hpp>
  49. namespace boost {
  50. namespace fibers {
  51. template< typename T >
  52. class buffered_channel {
  53. public:
  54. typedef T value_type;
  55. class iterator;
  56. explicit buffered_channel( std::size_t capacity);
  57. buffered_channel( buffered_channel const& other) = delete;
  58. buffered_channel & operator=( buffered_channel const& other) = delete;
  59. void close() noexcept;
  60. channel_op_status push( value_type const& va);
  61. channel_op_status push( value_type && va);
  62. template< typename Rep, typename Period >
  63. channel_op_status push_wait_for(
  64. value_type const& va,
  65. std::chrono::duration< Rep, Period > const& timeout_duration);
  66. channel_op_status push_wait_for( value_type && va,
  67. std::chrono::duration< Rep, Period > const& timeout_duration);
  68. template< typename Clock, typename Duration >
  69. channel_op_status push_wait_until(
  70. value_type const& va,
  71. std::chrono::time_point< Clock, Duration > const& timeout_time);
  72. template< typename Clock, typename Duration >
  73. channel_op_status push_wait_until(
  74. value_type && va,
  75. std::chrono::time_point< Clock, Duration > const& timeout_time);
  76. channel_op_status try_push( value_type const& va);
  77. channel_op_status try_push( value_type && va);
  78. channel_op_status pop( value_type & va);
  79. value_type value_pop();
  80. template< typename Rep, typename Period >
  81. channel_op_status pop_wait_for(
  82. value_type & va,
  83. std::chrono::duration< Rep, Period > const& timeout_duration);
  84. template< typename Clock, typename Duration >
  85. channel_op_status pop_wait_until(
  86. value_type & va,
  87. std::chrono::time_point< Clock, Duration > const& timeout_time);
  88. channel_op_status try_pop( value_type & va);
  89. };
  90. template< typename T >
  91. buffered_channel< T >::iterator begin( buffered_channel< T > & chan);
  92. template< typename T >
  93. buffered_channel< T >::iterator end( buffered_channel< T > & chan);
  94. }}
  95. [heading Constructor]
  96. explicit buffered_channel( std::size_t capacity);
  97. [variablelist
  98. [[Preconditions:] [`2<=capacity && 0==(capacity & (capacity-1))`]]
  99. [[Effects:] [The constructor constructs an object of class `buffered_channel`
  100. with an internal buffer of size `capacity`.]]
  101. [[Throws:] [`fiber_error`]]
  102. [[Error Conditions:] [
  103. [*invalid_argument]: if `0==capacity || 0!=(capacity & (capacity-1))`.]]
  104. [[Notes:] [A `push()`, `push_wait_for()` or `push_wait_until()` will not block
  105. until the number of values in the channel becomes equal to `capacity`.
  106. The channel can hold only `capacity - 1` elements, otherwise it is
  107. considered to be full.]]
  108. ]
  109. [member_heading buffered_channel..close]
  110. void close() noexcept;
  111. [variablelist
  112. [[Effects:] [Deactivates the channel. No values can be put after calling
  113. `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
  114. or `this->pop_wait_until()` will return `closed`. Fibers blocked in
  115. `this->value_pop()` will receive an exception.]]
  116. [[Throws:] [Nothing.]]
  117. [[Note:] [`close()` is like closing a pipe. It informs waiting consumers
  118. that no more values will arrive.]]
  119. ]
  120. [template buffered_channel_push_effects[enqueues] If channel is closed, returns
  121. `closed`. [enqueues] the value in the channel, wakes up a fiber
  122. blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
  123. `this->pop_wait_until()` and returns `success`. If the channel is full,
  124. the fiber is blocked.]
  125. [member_heading buffered_channel..push]
  126. channel_op_status push( value_type const& va);
  127. channel_op_status push( value_type && va);
  128. [variablelist
  129. [[Effects:] [[buffered_channel_push_effects Otherwise enqueues]]]
  130. [[Throws:] [Exceptions thrown by copy- or move-operations.]]
  131. ]
  132. [template buffered_channel_try_push_effects[enqueues] If channel is closed, returns
  133. `closed`. [enqueues] the value in the channel, wakes up a fiber
  134. blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
  135. `this->pop_wait_until()` and returns `success`. If the channel is full,
  136. it doesn't block and returns `full`.]
  137. [member_heading buffered_channel..try_push]
  138. channel_op_status try_push( value_type const& va);
  139. channel_op_status try_push( value_type && va);
  140. [variablelist
  141. [[Effects:] [[buffered_channel_try_push_effects Otherwise enqueues]]]
  142. [[Throws:] [Exceptions thrown by copy- or move-operations.]]
  143. ]
  144. [template buffered_channel_pop[cls unblocking]
  145. [member_heading [cls]..pop]
  146. channel_op_status pop( value_type & va);
  147. [variablelist
  148. [[Effects:] [Dequeues a value from the channel. If the channel is empty, the
  149. fiber gets suspended until at least one new item is `push()`ed (return value
  150. `success` and `va` contains dequeued value) or the channel gets `close()`d
  151. (return value `closed`)[unblocking]]]
  152. [[Throws:] [Exceptions thrown by copy- or move-operations.]]
  153. ]
  154. ]
  155. [buffered_channel_pop buffered_channel .]
  156. [template buffered_channel_value_pop[cls unblocking]
  157. [member_heading [cls]..value_pop]
  158. value_type value_pop();
  159. [variablelist
  160. [[Effects:] [Dequeues a value from the channel. If the channel is empty, the
  161. fiber gets suspended until at least one new item is `push()`ed or the channel
  162. gets `close()`d (which throws an exception)[unblocking]]]
  163. [[Throws:] [`fiber_error` if `*this` is closed or by copy- or move-operations.]]
  164. [[Error conditions:] [`std::errc::operation_not_permitted`]]
  165. ]
  166. ]
  167. [buffered_channel_value_pop buffered_channel .]
  168. [template buffered_channel_try_pop[cls unblocking]
  169. [member_heading [cls]..try_pop]
  170. channel_op_status try_pop( value_type & va);
  171. [variablelist
  172. [[Effects:] [If channel is empty, returns `empty`. If channel is closed,
  173. returns `closed`. Otherwise it returns `success` and `va` contains the
  174. dequeued value[unblocking]]]
  175. [[Throws:] [Exceptions thrown by copy- or move-operations.]]
  176. ]
  177. ]
  178. [buffered_channel_try_pop buffered_channel .]
  179. [template buffered_channel_pop_wait_until_effects[endtime unblocking] If channel
  180. is not empty, immediately dequeues a value from the channel. Otherwise
  181. the fiber gets suspended until at least one new item is `push()`ed (return
  182. value `success` and `va` contains dequeued value), or the channel gets
  183. `close()`d (return value `closed`), or the system time reaches [endtime]
  184. (return value `timeout`)[unblocking]]
  185. [template buffered_channel_pop_wait_for[cls unblocking]
  186. [member_heading [cls]..pop_wait_for]
  187. template< typename Rep, typename Period >
  188. channel_op_status pop_wait_for(
  189. value_type & va,
  190. std::chrono::duration< Rep, Period > const& timeout_duration)
  191. [variablelist
  192. [[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout
  193. time as (system time + `timeout_duration`).
  194. [buffered_channel_pop_wait_until_effects the computed timeout time..[unblocking]]]]
  195. [[Throws:] [timeout-related exceptions or by copy- or move-operations.]]
  196. ]
  197. ]
  198. [buffered_channel_pop_wait_for buffered_channel .]
  199. [template buffered_channel_pop_wait_until[cls unblocking]
  200. [member_heading [cls]..pop_wait_until]
  201. template< typename Clock, typename Duration >
  202. channel_op_status pop_wait_until(
  203. value_type & va,
  204. std::chrono::time_point< Clock, Duration > const& timeout_time)
  205. [variablelist
  206. [[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`.
  207. [buffered_channel_pop_wait_until_effects the passed `time_point`..[unblocking]]]]
  208. [[Throws:] [timeout-related exceptions or by copy- or move-operations.]]
  209. ]
  210. ]
  211. [buffered_channel_pop_wait_until buffered_channel .]
  212. [heading Non-member function `begin( buffered_channel< T > &)`]
  213. template< typename T >
  214. buffered_channel< T >::iterator begin( buffered_channel< T > &);
  215. [variablelist
  216. [[Returns:] [Returns a range-iterator (input-iterator).]]
  217. ]
  218. [heading Non-member function `end( buffered_channel< T > &)`]
  219. template< typename T >
  220. buffered_channel< R >::iterator end( buffered_channel< T > &);
  221. [variablelist
  222. [[Returns:] [Returns an end range-iterator (input-iterator).]]
  223. ]
  224. [endsect]