buffered_channel.hpp 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. // Copyright Oliver Kowalke 2016.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. // based on Dmitry Vyukov's MPMC queue
  7. // (http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue)
  8. #ifndef BUFFERED_CHANNEL_H
  9. #define BUFFERED_CHANNEL_H
  10. #include <atomic>
  11. #include <chrono>
  12. #include <condition_variable>
  13. #include <cstddef>
  14. #include <cstdint>
  15. #include <memory>
  16. #include <mutex>
  17. #include <stdexcept>
  18. #include <type_traits>
  19. #include <boost/assert.hpp>
  20. #include <boost/config.hpp>
  21. #include <boost/fiber/detail/config.hpp>
  22. enum class channel_op_status {
  23. success = 0,
  24. empty,
  25. full,
  26. closed,
  27. timeout
  28. };
  29. template< typename T >
  30. class buffered_channel {
  31. public:
  32. typedef T value_type;
  33. private:
  34. typedef typename std::aligned_storage< sizeof( T), alignof( T) >::type storage_type;
  35. struct alignas(cache_alignment) slot {
  36. std::atomic< std::size_t > cycle{ 0 };
  37. storage_type storage{};
  38. slot() = default;
  39. };
  40. // procuder cacheline
  41. alignas(cache_alignment) std::atomic< std::size_t > producer_idx_{ 0 };
  42. // consumer cacheline
  43. alignas(cache_alignment) std::atomic< std::size_t > consumer_idx_{ 0 };
  44. // shared write cacheline
  45. alignas(cache_alignment) std::atomic_bool closed_{ false };
  46. mutable std::mutex mtx_{};
  47. std::condition_variable not_full_cnd_{};
  48. std::condition_variable not_empty_cnd_{};
  49. // shared read cacheline
  50. alignas(cache_alignment) slot * slots_{ nullptr };
  51. std::size_t capacity_;
  52. char pad_[cacheline_length];
  53. std::size_t waiting_consumer_{ 0 };
  54. bool is_full_() {
  55. std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
  56. return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx);
  57. }
  58. bool is_empty_() {
  59. std::size_t idx{ consumer_idx_.load( std::memory_order_relaxed) };
  60. return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx + 1);
  61. }
  62. template< typename ValueType >
  63. channel_op_status try_push_( ValueType && value) {
  64. slot * s{ nullptr };
  65. std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
  66. for (;;) {
  67. s = & slots_[idx & (capacity_ - 1)];
  68. std::size_t cycle{ s->cycle.load( std::memory_order_acquire) };
  69. std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx) };
  70. if ( 0 == diff) {
  71. if ( producer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
  72. break;
  73. }
  74. } else if ( 0 > diff) {
  75. return channel_op_status::full;
  76. } else {
  77. idx = producer_idx_.load( std::memory_order_relaxed);
  78. }
  79. }
  80. ::new ( static_cast< void * >( std::addressof( s->storage) ) ) value_type( std::forward< ValueType >( value) );
  81. s->cycle.store( idx + 1, std::memory_order_release);
  82. return channel_op_status::success;
  83. }
  84. channel_op_status try_value_pop_( slot *& s, std::size_t & idx) {
  85. idx = consumer_idx_.load( std::memory_order_relaxed);
  86. for (;;) {
  87. s = & slots_[idx & (capacity_ - 1)];
  88. std::size_t cycle = s->cycle.load( std::memory_order_acquire);
  89. std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx + 1) };
  90. if ( 0 == diff) {
  91. if ( consumer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
  92. break;
  93. }
  94. } else if ( 0 > diff) {
  95. return channel_op_status::empty;
  96. } else {
  97. idx = consumer_idx_.load( std::memory_order_relaxed);
  98. }
  99. }
  100. // incrementing the slot cycle must be deferred till the value has been consumed
  101. // slot cycle tells procuders that the cell can be re-used (store new value)
  102. return channel_op_status::success;
  103. }
  104. channel_op_status try_pop_( value_type & value) {
  105. slot * s{ nullptr };
  106. std::size_t idx{ 0 };
  107. channel_op_status status{ try_value_pop_( s, idx) };
  108. if ( channel_op_status::success == status) {
  109. value = std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) );
  110. s->cycle.store( idx + capacity_, std::memory_order_release);
  111. }
  112. return status;
  113. }
  114. public:
  115. explicit buffered_channel( std::size_t capacity) :
  116. capacity_{ capacity } {
  117. if ( 0 == capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
  118. throw std::runtime_error{ "boost fiber: buffer capacity is invalid" };
  119. }
  120. slots_ = new slot[capacity_]();
  121. for ( std::size_t i = 0; i < capacity_; ++i) {
  122. slots_[i].cycle.store( i, std::memory_order_relaxed);
  123. }
  124. }
  125. ~buffered_channel() {
  126. close();
  127. for (;;) {
  128. slot * s{ nullptr };
  129. std::size_t idx{ 0 };
  130. if ( channel_op_status::success == try_value_pop_( s, idx) ) {
  131. reinterpret_cast< value_type * >( std::addressof( s->storage) )->~value_type();
  132. s->cycle.store( idx + capacity_, std::memory_order_release);
  133. } else {
  134. break;
  135. }
  136. }
  137. delete [] slots_;
  138. }
  139. buffered_channel( buffered_channel const&) = delete;
  140. buffered_channel & operator=( buffered_channel const&) = delete;
  141. bool is_closed() const noexcept {
  142. return closed_.load( std::memory_order_acquire);
  143. }
  144. void close() noexcept {
  145. std::unique_lock< std::mutex > lk{ mtx_ };
  146. closed_.store( true, std::memory_order_release);
  147. not_full_cnd_.notify_all();
  148. not_empty_cnd_.notify_all();
  149. }
  150. channel_op_status push( value_type const& value) {
  151. for (;;) {
  152. if ( is_closed() ) {
  153. return channel_op_status::closed;
  154. }
  155. channel_op_status status{ try_push_( value) };
  156. if ( channel_op_status::success == status) {
  157. std::unique_lock< std::mutex > lk{ mtx_ };
  158. if ( 0 < waiting_consumer_) {
  159. not_empty_cnd_.notify_one();
  160. }
  161. return status;
  162. } else if ( channel_op_status::full == status) {
  163. std::unique_lock< std::mutex > lk{ mtx_ };
  164. if ( is_closed() ) {
  165. return channel_op_status::closed;
  166. }
  167. if ( ! is_full_() ) {
  168. continue;
  169. }
  170. not_full_cnd_.wait( lk, [this]{ return is_closed() || ! is_full_(); });
  171. } else {
  172. BOOST_ASSERT( channel_op_status::closed == status);
  173. return status;
  174. }
  175. }
  176. }
  177. value_type value_pop() {
  178. for (;;) {
  179. slot * s{ nullptr };
  180. std::size_t idx{ 0 };
  181. channel_op_status status{ try_value_pop_( s, idx) };
  182. if ( channel_op_status::success == status) {
  183. value_type value{ std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ) };
  184. s->cycle.store( idx + capacity_, std::memory_order_release);
  185. not_full_cnd_.notify_one();
  186. return std::move( value);
  187. } else if ( channel_op_status::empty == status) {
  188. std::unique_lock< std::mutex > lk{ mtx_ };
  189. ++waiting_consumer_;
  190. if ( is_closed() ) {
  191. throw std::runtime_error{ "boost fiber: channel is closed" };
  192. }
  193. if ( ! is_empty_() ) {
  194. continue;
  195. }
  196. not_empty_cnd_.wait( lk, [this](){ return is_closed() || ! is_empty_(); });
  197. --waiting_consumer_;
  198. } else {
  199. BOOST_ASSERT( channel_op_status::closed == status);
  200. throw std::runtime_error{ "boost fiber: channel is closed" };
  201. }
  202. }
  203. }
  204. };
  205. #endif // BUFFERED_CHANNEL_H