// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H

#include <atomic>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/assert.hpp>
#include <boost/predef.h>

#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>

// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21–28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.

#if BOOST_COMP_CLANG
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#endif

namespace boost {
namespace fibers {
namespace detail {

// Chase-Lev work-stealing deque: the owning thread pushes and pops at the
// bottom, other threads steal from the top.
class context_spmc_queue {
private:
    // circular buffer of atomic slots; indices are taken modulo the capacity
    class array {
    private:
        typedef std::atomic< context * >            atomic_type;
        // raw, suitably aligned storage for the atomic slots; the objects
        // themselves are created via placement new in the constructor
        typedef std::aligned_storage<
            sizeof( atomic_type), alignof( atomic_type) >::type    storage_type;

        std::size_t         capacity_;
        storage_type    *   storage_;

    public:
        array( std::size_t capacity) :
            capacity_{ capacity },
            storage_{ new storage_type[capacity_] } {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
            }
        }

        ~array() {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
            }
            delete [] storage_;
        }

        std::size_t capacity() const noexcept {
            return capacity_;
        }

        void push( std::size_t bottom, context * ctx) noexcept {
            reinterpret_cast< atomic_type * >(
                std::addressof( storage_[bottom % capacity_]) )
                    ->store( ctx, std::memory_order_relaxed);
        }

        context * pop( std::size_t top) noexcept {
            return reinterpret_cast< atomic_type * >(
                std::addressof( storage_[top % capacity_]) )
                    ->load( std::memory_order_relaxed);
        }

        array * resize( std::size_t bottom, std::size_t top) {
            std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
            for ( std::size_t i = top; i != bottom; ++i) {
                tmp->push( i, pop( i) );
            }
            return tmp.release();
        }
    };

    std::atomic< std::size_t >      top_{ 0 };
    std::atomic< std::size_t >      bottom_{ 0 };
    std::atomic< array * >          array_;
    std::vector< array * >          old_arrays_{};
    char                            padding_[cacheline_length];

public:
    context_spmc_queue( std::size_t capacity = 4096) :
        array_{ new array{ capacity } } {
        old_arrays_.reserve( 32);
    }

    ~context_spmc_queue() {
        for ( array * a : old_arrays_) {
            delete a;
        }
        delete array_.load();
    }

    context_spmc_queue( context_spmc_queue const&) = delete;
    context_spmc_queue & operator=( context_spmc_queue const&) = delete;

    bool empty() const noexcept {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_relaxed);
        return bottom <= top;
    }

    void push( context * ctx) {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_acquire);
        array * a = array_.load( std::memory_order_relaxed);
        if ( (a->capacity() - 1) < (bottom - top) ) {
            // queue is full
            // resize
            array * tmp = a->resize( bottom, top);
            old_arrays_.push_back( a);
            std::swap( a, tmp);
            array_.store( a, std::memory_order_relaxed);
        }
        a->push( bottom, ctx);
        std::atomic_thread_fence( std::memory_order_release);
        bottom_.store( bottom + 1, std::memory_order_relaxed);
    }

    context * pop() {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
        array * a = array_.load( std::memory_order_relaxed);
        bottom_.store( bottom, std::memory_order_relaxed);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t top = top_.load( std::memory_order_relaxed);
        context * ctx = nullptr;
        if ( top <= bottom) {
            // queue is not empty
            ctx = a->pop( bottom);
            BOOST_ASSERT( nullptr != ctx);
            if ( top == bottom) {
                // last element dequeued
                if ( ! top_.compare_exchange_strong( top, top + 1,
                                                     std::memory_order_seq_cst,
                                                     std::memory_order_relaxed) ) {
                    // lose the race
                    ctx = nullptr;
                }
                bottom_.store( bottom + 1, std::memory_order_relaxed);
            }
        } else {
            // queue is empty
            bottom_.store( bottom + 1, std::memory_order_relaxed);
        }
        return ctx;
    }

    context * steal() {
        std::size_t top = top_.load( std::memory_order_acquire);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t bottom = bottom_.load( std::memory_order_acquire);
        context * ctx = nullptr;
        if ( top < bottom) {
            // queue is not empty
            array * a = array_.load( std::memory_order_consume);
            ctx = a->pop( top);
            BOOST_ASSERT( nullptr != ctx);
            // do not steal pinned context (e.g. main-/dispatcher-context)
            if ( ctx->is_context( type::pinned_context) ) {
                return nullptr;
            }
            if ( ! top_.compare_exchange_strong( top, top + 1,
                                                 std::memory_order_seq_cst,
                                                 std::memory_order_relaxed) ) {
                // lose the race
                return nullptr;
            }
        }
        return ctx;
    }
};

}}}

#if BOOST_COMP_CLANG
#pragma clang diagnostic pop
#endif

#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H