// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>           // needed for std::vector< array * > old_arrays_

#include <boost/assert.hpp>
#include <boost/config.hpp>

#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>

// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21–28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.

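// context_spmc_queue is a single-producer/multi-consumer work-stealing deque
// in the style of the Chase-Lev papers cited above, specialised for raw
// boost::fibers::context pointers.  The owning thread pushes and pops ready
// contexts at the bottom end (LIFO); other threads may concurrently steal the
// oldest context from the top end.  push() and pop() must only be called by
// the owner; steal() may be called from any thread.
//
// Illustrative sketch of the intended call pattern (scheduler-side names such
// as rqueue_ and victim are assumptions, not part of this header):
//
//   context_spmc_queue rqueue_;
//   rqueue_.push( ctx);                       // owner: enqueue ready context
//   context * c = rqueue_.pop();              // owner: take newest entry
//   context * s = victim->rqueue_.steal();    // thief: take oldest entry
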
#if BOOST_COMP_CLANG
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#endif

namespace boost {
namespace fibers {
namespace detail {

class context_spmc_queue {
private:
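    // Fixed-capacity ring buffer of atomic context pointers.  The deque
    // indices (top/bottom) grow monotonically; each access reduces them
    // modulo the capacity to pick a slot.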
    class array {
    private:
        typedef std::atomic< context * >    atomic_type;
        typedef atomic_type                 storage_type;

        std::size_t         capacity_;
        storage_type    *   storage_;

    public:
        array( std::size_t capacity) :
            capacity_{ capacity },
            storage_{ new storage_type[capacity_] } {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
            }
        }

        ~array() {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
            }
            delete [] storage_;
        }

        std::size_t capacity() const noexcept {
            return capacity_;
        }

        void push( std::size_t bottom, context * ctx) noexcept {
            reinterpret_cast< atomic_type * >(
                std::addressof( storage_[bottom % capacity_]) )
                    ->store( ctx, std::memory_order_relaxed);
        }

        context * pop( std::size_t top) noexcept {
            return reinterpret_cast< atomic_type * >(
                std::addressof( storage_[top % capacity_]) )
                    ->load( std::memory_order_relaxed);
        }

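        // Allocate an array of twice the capacity and copy the live range
        // [top, bottom) into it.  Called only by the owner thread from
        // context_spmc_queue::push() when the buffer is full; the caller keeps
        // the old array alive because concurrent steals may still be reading it.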
        array * resize( std::size_t bottom, std::size_t top) {
            std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
            for ( std::size_t i = top; i != bottom; ++i) {
                tmp->push( i, pop( i) );
            }
            return tmp.release();
        }
    };

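    // top_ is advanced by a successful steal() (and by pop() of the last
    // element); bottom_ is advanced by push() and temporarily decremented by
    // pop().  array_ points to the current ring buffer, old_arrays_ retires
    // grown-out buffers until destruction.  padding_ is deliberately unused
    // (hence the clang warning suppression above), presumably to keep this
    // hot state away from adjacent data on the same cache line.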
    std::atomic< std::size_t >      top_{ 0 };
    std::atomic< std::size_t >      bottom_{ 0 };
    std::atomic< array * >          array_;
    std::vector< array * >          old_arrays_{};
    char                            padding_[cacheline_length];

public:
    context_spmc_queue( std::size_t capacity = 4096) :
        array_{ new array{ capacity } } {
        old_arrays_.reserve( 32);
    }

    ~context_spmc_queue() {
        for ( array * a : old_arrays_) {
            delete a;
        }
        delete array_.load();
    }

    context_spmc_queue( context_spmc_queue const&) = delete;
    context_spmc_queue & operator=( context_spmc_queue const&) = delete;

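    // Approximate emptiness test: both loads are relaxed, so the result is a
    // snapshot that may already be stale when the caller acts on it.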
    bool empty() const noexcept {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_relaxed);
        return bottom <= top;
    }

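    // Owner-only: enqueue a ready context at the bottom end, growing the
    // backing array first if it is full.  The release fence orders the store
    // into the slot before the publication of the new bottom index, so a
    // thief that observes the incremented bottom_ also observes ctx.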
    void push( context * ctx) {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_acquire);
        array * a = array_.load( std::memory_order_relaxed);
        if ( (a->capacity() - 1) < (bottom - top) ) {
            // queue is full
            // resize
            array * tmp = a->resize( bottom, top);
            old_arrays_.push_back( a);
            std::swap( a, tmp);
            array_.store( a, std::memory_order_relaxed);
        }
        a->push( bottom, ctx);
        std::atomic_thread_fence( std::memory_order_release);
        bottom_.store( bottom + 1, std::memory_order_relaxed);
    }

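    // Owner-only: dequeue the most recently pushed context (LIFO end).
    // bottom_ is decremented first; the seq_cst fence separates that store
    // from the subsequent load of top_.  When only one element remains, the
    // owner races concurrent thieves for it with a compare_exchange on top_.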
    context * pop() {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
        array * a = array_.load( std::memory_order_relaxed);
        bottom_.store( bottom, std::memory_order_relaxed);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t top = top_.load( std::memory_order_relaxed);
        context * ctx = nullptr;
        if ( top <= bottom) {
            // queue is not empty
            ctx = a->pop( bottom);
            BOOST_ASSERT( nullptr != ctx);
            if ( top == bottom) {
                // last element dequeued
                if ( ! top_.compare_exchange_strong( top, top + 1,
                                                     std::memory_order_seq_cst,
                                                     std::memory_order_relaxed) ) {
                    // lost the race against a stealing thread
                    ctx = nullptr;
                }
                bottom_.store( bottom + 1, std::memory_order_relaxed);
            }
        } else {
            // queue is empty
            bottom_.store( bottom + 1, std::memory_order_relaxed);
        }
        return ctx;
    }

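    // Thief-side: try to take the oldest context (index top_) from another
    // thread's queue.  The seq_cst fence orders the load of top_ before the
    // load of bottom_; the element is only claimed if the compare_exchange
    // on top_ succeeds.  Pinned contexts (main-/dispatcher-context) must stay
    // on their owning thread and are therefore never stolen.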
    context * steal() {
        std::size_t top = top_.load( std::memory_order_acquire);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t bottom = bottom_.load( std::memory_order_acquire);
        context * ctx = nullptr;
        if ( top < bottom) {
            // queue is not empty
            array * a = array_.load( std::memory_order_consume);
            ctx = a->pop( top);
            BOOST_ASSERT( nullptr != ctx);
            // do not steal pinned context (e.g. main-/dispatcher-context)
            if ( ctx->is_context( type::pinned_context) ) {
                return nullptr;
            }
            if ( ! top_.compare_exchange_strong( top, top + 1,
                                                 std::memory_order_seq_cst,
                                                 std::memory_order_relaxed) ) {
                // lost the race against the owner or another thief
                return nullptr;
            }
        }
        return ctx;
    }
};

}}}

#if BOOST_COMP_CLANG
#pragma clang diagnostic pop
#endif

#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H