libcarla/include/system/boost/fiber/detail/context_spmc_queue.hpp
2024-10-18 13:19:59 +08:00

198 lines
6.7 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>
// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA 05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21-28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
#if BOOST_COMP_CLANG
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#endif
namespace boost {
namespace fibers {
namespace detail {
class context_spmc_queue {
private:
class array {
private:
typedef std::atomic< context * > atomic_type;
typedef atomic_type storage_type;
std::size_t capacity_;
storage_type * storage_;
public:
array( std::size_t capacity) :
capacity_{ capacity },
storage_{ new storage_type[capacity_] } {
for ( std::size_t i = 0; i < capacity_; ++i) {
::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
}
}
~array() {
for ( std::size_t i = 0; i < capacity_; ++i) {
reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
}
delete [] storage_;
}
std::size_t capacity() const noexcept {
return capacity_;
}
void push( std::size_t bottom, context * ctx) noexcept {
reinterpret_cast< atomic_type * >(
std::addressof( storage_[bottom % capacity_]) )
->store( ctx, std::memory_order_relaxed);
}
context * pop( std::size_t top) noexcept {
return reinterpret_cast< atomic_type * >(
std::addressof( storage_[top % capacity_]) )
->load( std::memory_order_relaxed);
}
array * resize( std::size_t bottom, std::size_t top) {
std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
for ( std::size_t i = top; i != bottom; ++i) {
tmp->push( i, pop( i) );
}
return tmp.release();
}
};
std::atomic< std::size_t > top_{ 0 };
std::atomic< std::size_t > bottom_{ 0 };
std::atomic< array * > array_;
std::vector< array * > old_arrays_{};
char padding_[cacheline_length];
public:
context_spmc_queue( std::size_t capacity = 4096) :
array_{ new array{ capacity } } {
old_arrays_.reserve( 32);
}
~context_spmc_queue() {
for ( array * a : old_arrays_) {
delete a;
}
delete array_.load();
}
context_spmc_queue( context_spmc_queue const&) = delete;
context_spmc_queue & operator=( context_spmc_queue const&) = delete;
bool empty() const noexcept {
std::size_t bottom = bottom_.load( std::memory_order_relaxed);
std::size_t top = top_.load( std::memory_order_relaxed);
return bottom <= top;
}
void push( context * ctx) {
std::size_t bottom = bottom_.load( std::memory_order_relaxed);
std::size_t top = top_.load( std::memory_order_acquire);
array * a = array_.load( std::memory_order_relaxed);
if ( (a->capacity() - 1) < (bottom - top) ) {
// queue is full
// resize
array * tmp = a->resize( bottom, top);
old_arrays_.push_back( a);
std::swap( a, tmp);
array_.store( a, std::memory_order_relaxed);
}
a->push( bottom, ctx);
std::atomic_thread_fence( std::memory_order_release);
bottom_.store( bottom + 1, std::memory_order_relaxed);
}
context * pop() {
std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
array * a = array_.load( std::memory_order_relaxed);
bottom_.store( bottom, std::memory_order_relaxed);
std::atomic_thread_fence( std::memory_order_seq_cst);
std::size_t top = top_.load( std::memory_order_relaxed);
context * ctx = nullptr;
if ( top <= bottom) {
// queue is not empty
ctx = a->pop( bottom);
BOOST_ASSERT( nullptr != ctx);
if ( top == bottom) {
// last element dequeued
if ( ! top_.compare_exchange_strong( top, top + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed) ) {
// lose the race
ctx = nullptr;
}
bottom_.store( bottom + 1, std::memory_order_relaxed);
}
} else {
// queue is empty
bottom_.store( bottom + 1, std::memory_order_relaxed);
}
return ctx;
}
context * steal() {
std::size_t top = top_.load( std::memory_order_acquire);
std::atomic_thread_fence( std::memory_order_seq_cst);
std::size_t bottom = bottom_.load( std::memory_order_acquire);
context * ctx = nullptr;
if ( top < bottom) {
// queue is not empty
array * a = array_.load( std::memory_order_consume);
ctx = a->pop( top);
BOOST_ASSERT( nullptr != ctx);
// do not steal pinned context (e.g. main-/dispatcher-context)
if ( ctx->is_context( type::pinned_context) ) {
return nullptr;
}
if ( ! top_.compare_exchange_strong( top, top + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed) ) {
// lose the race
return nullptr;
}
}
return ctx;
}
};
}}}
#if BOOST_COMP_CLANG
#pragma clang diagnostic pop
#endif
#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H