6#ifndef XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
7#define XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
9#include <xenium/utils.hpp>
10#include <xenium/parameter.hpp>
19#pragma warning(disable: 4324)
58template<
class T,
class... Policies>
63 static constexpr bool default_to_weak =
71 cells(new cell[size]),
74 assert(size >= 2 && utils::is_power_of_two(size));
75 for (std::size_t i = 0; i < size; ++i)
76 cells[i].sequence.store(i, std::memory_order_relaxed);
77 enqueue_pos.store(0, std::memory_order_relaxed);
78 dequeue_pos.store(0, std::memory_order_relaxed);
99 template <
class... Args>
101 return do_try_push<default_to_weak>(std::forward<Args>(args)...);
121 template <
class... Args>
123 return do_try_push<false>(std::forward<Args>(args)...);
143 template <
class... Args>
145 return do_try_push<true>(std::forward<Args>(args)...);
160 return do_try_pop<default_to_weak>(result);
175 return do_try_pop<false>(result);
190 return do_try_pop<true>(result);
194 template <
bool Weak,
class... Args>
195 bool do_try_push(Args&&... args) {
197 std::size_t pos = enqueue_pos.load(std::memory_order_relaxed);
199 c = &cells[pos & index_mask];
201 std::size_t seq = c->sequence.load(std::memory_order_acquire);
203 if (enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
210 pos = enqueue_pos.load(std::memory_order_relaxed);
212 auto pos2 = enqueue_pos.load(std::memory_order_relaxed);
213 if (pos2 == pos && dequeue_pos.load(std::memory_order_relaxed) + index_mask + 1 == pos)
219 assign_value(c->value, std::forward<Args>(args)...);
221 c->sequence.store(pos + 1, std::memory_order_release);
226 bool do_try_pop(T& result) {
228 std::size_t pos = dequeue_pos.load(std::memory_order_relaxed);
230 c = &cells[pos & index_mask];
232 std::size_t seq = c->sequence.load(std::memory_order_acquire);
233 auto new_pos = pos + 1;
234 if (seq == new_pos) {
235 if (dequeue_pos.compare_exchange_weak(pos, new_pos, std::memory_order_relaxed))
241 pos = dequeue_pos.load(std::memory_order_relaxed);
243 auto pos2 = dequeue_pos.load(std::memory_order_relaxed);
244 if (pos2 == pos && enqueue_pos.load(std::memory_order_relaxed) == pos)
250 result = std::move(c->value);
252 c->sequence.store(pos + index_mask + 1, std::memory_order_release);
256 void assign_value(T& v,
const T& source) { v = source; }
257 void assign_value(T& v, T&& source) { v = std::move(source); }
258 template <
class... Args>
259 void assign_value(T& v, Args&&... args) { v = T{std::forward<Args>(args)...}; }
263 std::atomic<std::size_t> sequence;
267 std::unique_ptr<cell[]> cells;
268 const std::size_t index_mask;
269 alignas(64) std::atomic<size_t> enqueue_pos;
270 alignas(64) std::atomic<size_t> dequeue_pos;
Policy to configure whether try_push/try_pop in vyukov_bounded_queue should default to try_push_weak/...
Definition: vyukov_bounded_queue.hpp:31
A bounded generic multi-producer/multi-consumer FIFO queue.
Definition: vyukov_bounded_queue.hpp:59
bool try_push_strong(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:122
bool try_pop_strong(T &result)
Tries to pop an element from the queue as long as the queue is not empty.
Definition: vyukov_bounded_queue.hpp:174
vyukov_bounded_queue(std::size_t size)
Constructs a new instance with the specified maximum size.
Definition: vyukov_bounded_queue.hpp:70
bool try_pop(T &result)
Tries to pop an element from the queue.
Definition: vyukov_bounded_queue.hpp:159
bool try_pop_weak(T &result)
Tries to pop an element from the queue.
Definition: vyukov_bounded_queue.hpp:189
bool try_push(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:100
bool try_push_weak(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:144