template <typename T, typename Allocator = std::allocator<T>>
class SPSCQueue {
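  // Single-producer single-consumer lock-free queue: exactly one thread may
  // call the producer methods (emplace/try_emplace/push/try_push) and exactly
  // one thread the consumer methods (front/pop). Elements live in a ring
  // buffer of capacity_ slots, with kPadding unused slots on each side acting
  // as a false-sharing guard against adjacent heap allocations.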
#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
  // Detect whether the allocator provides the C++23 allocate_at_least()
  // extension, using the void_t detection idiom.
  template <typename Alloc2, typename = void>
  struct has_allocate_at_least : std::false_type {};

  template <typename Alloc2>
  struct has_allocate_at_least<
      Alloc2, std::void_t<typename Alloc2::value_type,
                          decltype(std::declval<Alloc2&>().allocate_at_least(
                              size_t{}))>> : std::true_type {};
#endif

public:
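  // The constructor allocates capacity + 1 element slots (one slot is slack,
  // so a full ring can be told apart from an empty one) plus kPadding guard
  // slots on each side.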
  explicit SPSCQueue(const size_t capacity,
                     const Allocator& allocator = Allocator())
      : capacity_(capacity), allocator_(allocator) {
    // The queue needs at least one element
    if (capacity_ < 1) {
      capacity_ = 1;
    }
    capacity_++; // Needs one slack element
    // Prevent overflowing size_t
    if (capacity_ > SIZE_MAX - 2 * kPadding) {
      capacity_ = SIZE_MAX - 2 * kPadding;
    }
#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
    if constexpr (has_allocate_at_least<Allocator>::value) {
      // allocate_at_least() may return more storage than requested; use all
      // of it as extra queue capacity.
      auto res = allocator_.allocate_at_least(capacity_ + 2 * kPadding);
      slots_ = res.ptr;
      capacity_ = res.count - 2 * kPadding;
    } else {
      slots_ = std::allocator_traits<Allocator>::allocate(
          allocator_, capacity_ + 2 * kPadding);
    }
#else
    slots_ = std::allocator_traits<Allocator>::allocate(
        allocator_, capacity_ + 2 * kPadding);
#endif
    static_assert(alignof(SPSCQueue<T>) == kCacheLineSize, "");
    static_assert(sizeof(SPSCQueue<T>) >= 3 * kCacheLineSize, "");
    assert(reinterpret_cast<char*>(&readIdx_) -
               reinterpret_cast<char*>(&writeIdx_) >=
           static_cast<std::ptrdiff_t>(kCacheLineSize));
  }

  ~SPSCQueue() {
    // Destroy any elements still in the queue before releasing the storage.
    while (front()) {
      pop();
    }
    std::allocator_traits<Allocator>::deallocate(allocator_, slots_,
                                                 capacity_ + 2 * kPadding);
  }
  // non-copyable and non-movable
  SPSCQueue(const SPSCQueue&) = delete;
  SPSCQueue& operator=(const SPSCQueue&) = delete;
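  // Producer side. Blocks by spinning until a slot is free. writeIdx_ is
  // loaded relaxed (only the producer writes it); readIdx_ is loaded with
  // acquire only when the cached copy says the queue looks full, and the
  // release store on writeIdx_ publishes the newly constructed element to
  // the consumer.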
  template <typename... Args>
  void emplace(Args&&... args) noexcept(
      std::is_nothrow_constructible<T, Args&&...>::value) {
    static_assert(std::is_constructible<T, Args&&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0; // wrap around
    }
    while (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
  }
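  // Non-blocking variant: refreshes the cached read index once and returns
  // false if the queue is still full.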
  template <typename... Args>
  RIGTORP_NODISCARD bool try_emplace(Args&&... args) noexcept(
      std::is_nothrow_constructible<T, Args&&...>::value) {
    static_assert(std::is_constructible<T, Args&&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0; // wrap around
    }
    if (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
      if (nextWriteIdx == readIdxCache_) {
        return false; // queue is full
      }
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
    return true;
  }
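  // push()/try_push() are thin convenience wrappers over
  // emplace()/try_emplace(); the try_* variants return false instead of
  // blocking when the queue is full.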
  void push(const T& v) noexcept(
      std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    emplace(v);
  }
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P&&>::value>::type>
  void push(P&& v) noexcept(std::is_nothrow_constructible<T, P&&>::value) {
    emplace(std::forward<P>(v));
  }
  RIGTORP_NODISCARD bool try_push(const T& v) noexcept(
      std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    return try_emplace(v);
  }
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P&&>::value>::type>
  RIGTORP_NODISCARD bool try_push(P&& v) noexcept(
      std::is_nothrow_constructible<T, P&&>::value) {
    return try_emplace(std::forward<P>(v));
  }
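  // Consumer side. Returns a pointer to the oldest element, or nullptr when
  // the queue is empty. The producer's writeIdx_ is re-read (acquire) only
  // when the cached copy suggests the queue is empty, which keeps cache
  // coherency traffic between the two cores low.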
  RIGTORP_NODISCARD T* front() noexcept {
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    if (readIdx == writeIdxCache_) {
      writeIdxCache_ = writeIdx_.load(std::memory_order_acquire);
      if (writeIdxCache_ == readIdx) {
        return nullptr; // queue is empty
      }
    }
    return &slots_[readIdx + kPadding];
  }
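  // Destroys the front element in place and publishes the freed slot to the
  // producer with a release store on readIdx_. Must only be called after
  // front() has returned a non-null pointer.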
  void pop() noexcept {
    static_assert(std::is_nothrow_destructible<T>::value,
                  "T must be nothrow destructible");
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    assert(writeIdx_.load(std::memory_order_acquire) != readIdx &&
           "Can only call pop() after front() has returned a non-nullptr");
    slots_[readIdx + kPadding].~T();
    auto nextReadIdx = readIdx + 1;
    if (nextReadIdx == capacity_) {
      nextReadIdx = 0; // wrap around
    }
    readIdx_.store(nextReadIdx, std::memory_order_release);
  }
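  // size() and empty() are only estimates when called while the other thread
  // is concurrently pushing or popping.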
  RIGTORP_NODISCARD size_t size() const noexcept {
    std::ptrdiff_t diff = writeIdx_.load(std::memory_order_acquire) -
                          readIdx_.load(std::memory_order_acquire);
    if (diff < 0) {
      diff += capacity_; // the write index has wrapped around
    }
    return static_cast<size_t>(diff);
  }
  RIGTORP_NODISCARD bool empty() const noexcept {
    return writeIdx_.load(std::memory_order_acquire) ==
           readIdx_.load(std::memory_order_acquire);
  }
  RIGTORP_NODISCARD size_t capacity() const noexcept {
    return capacity_ - 1; // one slot is reserved as the slack element
  }

private:
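  // Use the standard library's idea of the destructive interference size when
  // it is available; otherwise assume 64-byte cache lines, which matches most
  // contemporary x86_64 and many ARM parts.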
#ifdef __cpp_lib_hardware_interference_size
  static constexpr size_t kCacheLineSize =
      std::hardware_destructive_interference_size;
#else
  static constexpr size_t kCacheLineSize = 64;
#endif
  // Padding to avoid false sharing between slots_ and adjacent allocations:
  // the number of T-sized slots needed to cover one cache line, rounded up.
  static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;
private:
  size_t capacity_;
  T* slots_;
#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address)
  Allocator allocator_ [[no_unique_address]];
#else
  Allocator allocator_;
#endif
  // Align to the cache line size to avoid false sharing. readIdxCache_ and
  // writeIdxCache_ are used to reduce the amount of cache coherency traffic:
  // each thread consults its cached copy of the other thread's index first
  // and only touches the shared atomic when the cache says the queue may be
  // full (producer) or empty (consumer).
  alignas(kCacheLineSize) std::atomic<size_t> writeIdx_ = {0};
  alignas(kCacheLineSize) size_t readIdxCache_ = 0;
  alignas(kCacheLineSize) std::atomic<size_t> readIdx_ = {0};
  alignas(kCacheLineSize) size_t writeIdxCache_ = 0;

  // Padding to avoid adjacent allocations sharing a cache line with
  // writeIdxCache_
  char padding_[kCacheLineSize - sizeof(writeIdxCache_)];
};
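// Usage sketch (illustrative, not part of the header): one producer thread
// and one consumer thread, with the class referenced unqualified for brevity
// (add the library's namespace as appropriate).
//
//   SPSCQueue<int> q(1024);
//   std::thread producer([&] {
//     for (int i = 0; i < 100; ++i) {
//       q.push(i); // blocks while the queue is full
//     }
//   });
//   std::thread consumer([&] {
//     for (int i = 0; i < 100; ++i) {
//       while (!q.front()) {} // spin until an element is available
//       assert(*q.front() == i);
//       q.pop();
//     }
//   });
//   producer.join();
//   consumer.join();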