// Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
- #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
- #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
- #include <atomic>
- #include <chrono>
- #include <cstddef>
- #include <cstdint>
- #include <memory>
- #include <vector>
- #include <boost/config.hpp>
- #include <boost/fiber/channel_op_status.hpp>
- #include <boost/fiber/context.hpp>
- #include <boost/fiber/detail/config.hpp>
- #include <boost/fiber/detail/convert.hpp>
- #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
- #include <boost/fiber/detail/exchange.hpp>
- #endif
- #include <boost/fiber/detail/spinlock.hpp>
- #include <boost/fiber/exceptions.hpp>
- #ifdef BOOST_HAS_ABI_HEADERS
- # include BOOST_ABI_PREFIX
- #endif
- namespace boost {
- namespace fibers {
- template< typename T >
- class unbuffered_channel {
- public:
- typedef typename std::remove_reference< T >::type value_type;
- private:
- typedef context::wait_queue_t wait_queue_type;
- struct slot {
- value_type value;
- context * ctx;
- slot( value_type const& value_, context * ctx_) :
- value{ value_ },
- ctx{ ctx_ } {
- }
- slot( value_type && value_, context * ctx_) :
- value{ std::move( value_) },
- ctx{ ctx_ } {
- }
- };
- // shared cacheline
- std::atomic< slot * > slot_{ nullptr };
- // shared cacheline
- std::atomic_bool closed_{ false };
- mutable detail::spinlock splk_producers_{};
- wait_queue_type waiting_producers_{};
- mutable detail::spinlock splk_consumers_{};
- wait_queue_type waiting_consumers_{};
- char pad_[cacheline_length];
- bool is_empty_() {
- return nullptr == slot_.load( std::memory_order_acquire);
- }
- bool try_push_( slot * own_slot) {
- for (;;) {
- slot * s = slot_.load( std::memory_order_acquire);
- if ( nullptr == s) {
- if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
- continue;
- }
- return true;
- } else {
- return false;
- }
- }
- }
- slot * try_pop_() {
- slot * nil_slot = nullptr;
- for (;;) {
- slot * s = slot_.load( std::memory_order_acquire);
- if ( nullptr != s) {
- if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
- continue;}
- }
- return s;
- }
- }
- public:
- unbuffered_channel() {
- }
- ~unbuffered_channel() {
- close();
- }
- unbuffered_channel( unbuffered_channel const&) = delete;
- unbuffered_channel & operator=( unbuffered_channel const&) = delete;
- bool is_closed() const noexcept {
- return closed_.load( std::memory_order_acquire);
- }
- void close() noexcept {
- context * active_ctx = context::active();
- // set flag
- if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
- // notify current waiting
- slot * s = slot_.load( std::memory_order_acquire);
- if ( nullptr != s) {
- // notify context
- active_ctx->schedule( s->ctx);
- }
- // notify all waiting producers
- detail::spinlock_lock lk1{ splk_producers_ };
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( producer_ctx);
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- }
- }
- // notify all waiting consumers
- detail::spinlock_lock lk2{ splk_consumers_ };
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- }
- }
- }
- }
- channel_op_status push( value_type const& value) {
- context * active_ctx = context::active();
- slot s{ value, active_ctx };
- for (;;) {
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
- // suspend till value has been consumed
- active_ctx->suspend( lk);
- // resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
- } else {
- // channel was closed before value was consumed
- return channel_op_status::closed;
- }
- } else {
- detail::spinlock_lock lk{ splk_producers_ };
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( is_empty_() ) {
- continue;
- }
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this producer
- active_ctx->suspend( lk);
- // resumed, slot mabye free
- }
- }
- }
- channel_op_status push( value_type && value) {
- context * active_ctx = context::active();
- slot s{ std::move( value), active_ctx };
- for (;;) {
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
- // suspend till value has been consumed
- active_ctx->suspend( lk);
- // resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
- } else {
- // channel was closed before value was consumed
- return channel_op_status::closed;
- }
- } else {
- detail::spinlock_lock lk{ splk_producers_ };
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( is_empty_() ) {
- continue;
- }
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this producer
- active_ctx->suspend( lk);
- // resumed, slot mabye free
- }
- }
- }
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type const& value,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( value,
- std::chrono::steady_clock::now() + timeout_duration);
- }
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type && value,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( std::forward< value_type >( value),
- std::chrono::steady_clock::now() + timeout_duration);
- }
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type const& value,
- std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- context * active_ctx = context::active();
- slot s{ value, active_ctx };
- std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
- for (;;) {
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
- // suspend this producer
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // clear slot
- slot * nil_slot = nullptr, * own_slot = & s;
- slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
- // resumed, value has not been consumed
- return channel_op_status::timeout;
- }
- // resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
- } else {
- // channel was closed before value was consumed
- return channel_op_status::closed;
- }
- } else {
- detail::spinlock_lock lk{ splk_producers_ };
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( is_empty_() ) {
- continue;
- }
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- // suspend this producer
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- waiting_producers_.remove( * active_ctx);
- return channel_op_status::timeout;
- }
- // resumed, slot maybe free
- }
- }
- }
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type && value,
- std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- context * active_ctx = context::active();
- slot s{ std::move( value), active_ctx };
- std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
- for (;;) {
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
- // suspend this producer
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // clear slot
- slot * nil_slot = nullptr, * own_slot = & s;
- slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
- // resumed, value has not been consumed
- return channel_op_status::timeout;
- }
- // resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
- } else {
- // channel was closed before value was consumed
- return channel_op_status::closed;
- }
- } else {
- detail::spinlock_lock lk{ splk_producers_ };
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( is_empty_() ) {
- continue;
- }
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- // suspend this producer
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- waiting_producers_.remove( * active_ctx);
- return channel_op_status::timeout;
- }
- // resumed, slot maybe free
- }
- }
- }
- channel_op_status pop( value_type & value) {
- context * active_ctx = context::active();
- slot * s = nullptr;
- for (;;) {
- if ( nullptr != ( s = try_pop_() ) ) {
- {
- detail::spinlock_lock lk{ splk_producers_ };
- // notify one waiting producer
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
- }
- }
- value = std::move( s->value);
- // notify context
- #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
- active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
- #else
- active_ctx->schedule( std::exchange( s->ctx, nullptr) );
- #endif
- return channel_op_status::success;
- } else {
- detail::spinlock_lock lk{ splk_consumers_ };
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( ! is_empty_() ) {
- continue;
- }
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this consumer
- active_ctx->suspend( lk);
- // resumed, slot mabye set
- }
- }
- }
- value_type value_pop() {
- context * active_ctx = context::active();
- slot * s = nullptr;
- for (;;) {
- if ( nullptr != ( s = try_pop_() ) ) {
- {
- detail::spinlock_lock lk{ splk_producers_ };
- // notify one waiting producer
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
- }
- }
- // consume value
- value_type value = std::move( s->value);
- // notify context
- #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
- active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
- #else
- active_ctx->schedule( std::exchange( s->ctx, nullptr) );
- #endif
- return std::move( value);
- } else {
- detail::spinlock_lock lk{ splk_consumers_ };
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- throw fiber_error{
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: channel is closed" };
- }
- if ( ! is_empty_() ) {
- continue;
- }
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this consumer
- active_ctx->suspend( lk);
- // resumed, slot mabye set
- }
- }
- }
- template< typename Rep, typename Period >
- channel_op_status pop_wait_for( value_type & value,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return pop_wait_until( value,
- std::chrono::steady_clock::now() + timeout_duration);
- }
- template< typename Clock, typename Duration >
- channel_op_status pop_wait_until( value_type & value,
- std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- context * active_ctx = context::active();
- slot * s = nullptr;
- std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
- for (;;) {
- if ( nullptr != ( s = try_pop_() ) ) {
- {
- detail::spinlock_lock lk{ splk_producers_ };
- // notify one waiting producer
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
- }
- }
- // consume value
- value = std::move( s->value);
- // notify context
- #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
- active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
- #else
- active_ctx->schedule( std::exchange( s->ctx, nullptr) );
- #endif
- return channel_op_status::success;
- } else {
- detail::spinlock_lock lk{ splk_consumers_ };
- if ( BOOST_UNLIKELY( is_closed() ) ) {
- return channel_op_status::closed;
- }
- if ( ! is_empty_() ) {
- continue;
- }
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- // suspend this consumer
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- waiting_consumers_.remove( * active_ctx);
- return channel_op_status::timeout;
- }
- }
- }
- }
- class iterator {
- private:
- typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
- unbuffered_channel * chan_{ nullptr };
- storage_type storage_;
- void increment_() {
- BOOST_ASSERT( nullptr != chan_);
- try {
- ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
- } catch ( fiber_error const&) {
- chan_ = nullptr;
- }
- }
- public:
- typedef std::input_iterator_tag iterator_category;
- typedef std::ptrdiff_t difference_type;
- typedef value_type * pointer;
- typedef value_type & reference;
- typedef pointer pointer_t;
- typedef reference reference_t;
- iterator() noexcept = default;
- explicit iterator( unbuffered_channel< T > * chan) noexcept :
- chan_{ chan } {
- increment_();
- }
- iterator( iterator const& other) noexcept :
- chan_{ other.chan_ } {
- }
- iterator & operator=( iterator const& other) noexcept {
- if ( this == & other) return * this;
- chan_ = other.chan_;
- return * this;
- }
- bool operator==( iterator const& other) const noexcept {
- return other.chan_ == chan_;
- }
- bool operator!=( iterator const& other) const noexcept {
- return other.chan_ != chan_;
- }
- iterator & operator++() {
- reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
- increment_();
- return * this;
- }
- iterator operator++( int) = delete;
- reference_t operator*() noexcept {
- return * reinterpret_cast< value_type * >( std::addressof( storage_) );
- }
- pointer_t operator->() noexcept {
- return reinterpret_cast< value_type * >( std::addressof( storage_) );
- }
- };
- friend class iterator;
- };
- template< typename T >
- typename unbuffered_channel< T >::iterator
- begin( unbuffered_channel< T > & chan) {
- return typename unbuffered_channel< T >::iterator( & chan);
- }
- template< typename T >
- typename unbuffered_channel< T >::iterator
- end( unbuffered_channel< T > &) {
- return typename unbuffered_channel< T >::iterator();
- }
- }}
- #ifdef BOOST_HAS_ABI_HEADERS
- # include BOOST_ABI_SUFFIX
- #endif
- #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H
|