xenium/ramalhete_queue.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_RAMALHETE_QUEUE_HPP
#define XENIUM_RAMALHETE_QUEUE_HPP

#include <xenium/acquire_guard.hpp>
#include <xenium/backoff.hpp>
#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <stdexcept>

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
#endif

namespace xenium {

namespace policy {
  /**
   * @brief Policy to configure the number of iterations to spin on a queue entry while
   * waiting for a pending push operation to finish.
   *
   * @tparam Value the number of retries
   */
  template <unsigned Value>
  struct pop_retries;
}

/**
 * @brief A fast unbounded lock-free multi-producer/multi-consumer FIFO queue.
 *
 * This is an implementation of the FAAArrayQueue proposed by Ramalhete and Correia.
 *
 * Supported policies:
 *  * `xenium::policy::reclaimer` (required)
 *  * `xenium::policy::backoff`
 *  * `xenium::policy::entries_per_node`
 *  * `xenium::policy::pop_retries`
 */
template <class T, class... Policies>
class ramalhete_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;
public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  using backoff = parameter::type_param_t<policy::backoff, no_backoff, Policies...>;
  static constexpr unsigned entries_per_node = parameter::value_param_t<unsigned, policy::entries_per_node, 512, Policies...>::value;
  static constexpr unsigned pop_retries = parameter::value_param_t<unsigned, policy::pop_retries, 1000, Policies...>::value;

  static_assert(entries_per_node > 0, "entries_per_node must be greater than zero");
  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  template <class... NewPolicies>
  using with = ramalhete_queue<T, NewPolicies..., Policies...>;

  ramalhete_queue();
  ~ramalhete_queue();

  /**
   * @brief Pushes the given value to the queue.
   *
   * @param value the value to push; must not be `nullptr`
   * @throws std::invalid_argument if `value` is `nullptr`
   */
  void push(value_type value);

  /**
   * @brief Tries to pop an object from the queue.
   *
   * @param result the popped value, if the operation was successful
   * @return `true` if an element was popped, otherwise `false`
   */
  [[nodiscard]] bool try_pop(value_type& result);

private:
  struct node;

  using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  // TODO - use type from traits
  // (the number of mark bits below is an assumption; try_pop only needs a single mark bit)
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct entry {
    std::atomic<marked_value> value;
  };

  // TODO - make this configurable via policy.
  static constexpr unsigned step_size = 11;
  static constexpr unsigned max_idx = step_size * entries_per_node;
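  // Worked example with the default entries_per_node of 512: successive fetch_adds yield
  // 0, 11, 22, ...; because gcd(11, 512) == 1, the first 512 indices map to 512 distinct
  // slots via idx % entries_per_node, and max_idx (11 * 512 = 5632) caps each node at
  // 512 push/pop operations.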

  struct node : reclaimer::template enable_concurrent_ptr<node> {
    // pop_idx and push_idx are incremented by step_size to avoid false sharing, so the
    // actual index has to be calculated modulo entries_per_node
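    // The two counters are also separated by the entries array, which keeps the
    // consumer-side pop_idx and the producer-side push_idx on different cache lines.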
    std::atomic<unsigned> pop_idx;
    entry entries[entries_per_node];
    std::atomic<unsigned> push_idx;
    concurrent_ptr next;

    // Start with the first entry pre-filled
    node(raw_value_type item) :
      pop_idx{0},
      push_idx{step_size},
      next{nullptr}
    {
      entries[0].value.store(item, std::memory_order_relaxed);
      for (unsigned i = 1; i < entries_per_node; i++)
        entries[i].value.store(nullptr, std::memory_order_relaxed);
    }

    ~node() {
      for (unsigned i = pop_idx; i < push_idx; i += step_size) {
        traits::delete_value(entries[i % entries_per_node].value.load(std::memory_order_relaxed).get());
      }
    }
  };

  alignas(64) concurrent_ptr head;
  alignas(64) concurrent_ptr tail;
};
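
// Usage sketch (illustrative only): a reclaimer policy is mandatory. The reclamation
// scheme and header named below, as well as std::unique_ptr support via the
// pointer_queue_traits, are assumptions - substitute whichever reclamation scheme your
// build of xenium provides.
//
//   #include <xenium/ramalhete_queue.hpp>
//   #include <xenium/reclamation/stamp_it.hpp>
//
//   using queue_t = xenium::ramalhete_queue<
//     std::unique_ptr<int>,
//     xenium::policy::reclaimer<xenium::reclamation::stamp_it>,
//     xenium::policy::entries_per_node<1024>>;
//
//   queue_t q;
//   q.push(std::make_unique<int>(42));
//   std::unique_ptr<int> v;
//   if (q.try_pop(v)) { /* use *v */ }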

template <class T, class... Policies>
ramalhete_queue<T, Policies...>::ramalhete_queue()
{
  auto n = new node(nullptr);
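  // The node constructor pre-filled slot 0 with the given item (nullptr here) and set
  // push_idx to step_size; reset push_idx to 0 so that the first real push claims slot 0.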
  n->push_idx.store(0, std::memory_order_relaxed);
  head.store(n, std::memory_order_relaxed);
  tail.store(n, std::memory_order_relaxed);
}

template <class T, class... Policies>
ramalhete_queue<T, Policies...>::~ramalhete_queue()
{
  // (1) - this acquire-load synchronizes-with the release-CAS (13)
  auto n = head.load(std::memory_order_acquire);
  while (n)
  {
    // (2) - this acquire-load synchronizes-with the release-CAS (4)
    auto next = n->next.load(std::memory_order_acquire);
    delete n.get();
    n = next;
  }
}

template <class T, class... Policies>
void ramalhete_queue<T, Policies...>::push(value_type value)
{
  raw_value_type raw_val = traits::get_raw(value);
  if (raw_val == nullptr)
    throw std::invalid_argument("value can not be nullptr");

  backoff backoff;
  guard_ptr t;
  for (;;) {
    // (3) - this acquire-load synchronizes-with the release-CAS (5, 7)
    t.acquire(tail, std::memory_order_acquire);

    unsigned idx = t->push_idx.fetch_add(step_size, std::memory_order_relaxed);
    if (idx >= max_idx) {
      // This node is full
      if (t != tail.load(std::memory_order_relaxed))
        continue; // some other thread already added a new node.

      auto next = t->next.load(std::memory_order_relaxed);
      if (next == nullptr)
      {
        node* new_node = new node(raw_val);
        traits::release(value);

        marked_ptr expected = nullptr;
        // (4) - this release-CAS synchronizes-with the acquire-load (2, 6, 12)
        if (t->next.compare_exchange_strong(expected, new_node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed))
        {
          expected = t;
          // (5) - this release-CAS synchronizes-with the acquire-load (3)
          tail.compare_exchange_strong(expected, new_node, std::memory_order_release, std::memory_order_relaxed);
          return;
        }
        // prevent the pre-stored value from being deleted
        new_node->push_idx.store(0, std::memory_order_relaxed);
        // some other thread already added a new node
        delete new_node;
      } else {
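        // A successor node is already linked, but tail may still point to the old
        // node - help advance it before retrying.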
        // (6) - this acquire-load synchronizes-with the release-CAS (4)
        next = t->next.load(std::memory_order_acquire);
        marked_ptr expected = t;
        // (7) - this release-CAS synchronizes-with the acquire-load (3)
        tail.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed);
      }
      continue;
    }
    idx %= entries_per_node;

    marked_value expected = nullptr;
    // (8) - this release-CAS synchronizes-with the acquire-load (14) and the acquire-exchange (15)
    if (t->entries[idx].value.compare_exchange_strong(expected, raw_val, std::memory_order_release, std::memory_order_relaxed)) {
      traits::release(value);
      return;
    }

    backoff();
  }
}

template <class T, class... Policies>
bool ramalhete_queue<T, Policies...>::try_pop(value_type& result)
{
  backoff backoff;

  guard_ptr h;
  for (;;) {
    // (9) - this acquire-load synchronizes-with the release-CAS (13)
    h.acquire(head, std::memory_order_acquire);

    // (10) - this acquire-load synchronizes-with the release-fetch-add (11)
    const auto pop_idx = h->pop_idx.load(std::memory_order_acquire);
    // This synchronization is necessary to avoid a situation where we see an up-to-date
    // pop_idx, but an out-of-date push_idx and would (falsely) assume that the queue is empty.
    const auto push_idx = h->push_idx.load(std::memory_order_relaxed);
    if (pop_idx >= push_idx &&
        h->next.load(std::memory_order_relaxed) == nullptr)
      break;

    // (11) - this release-fetch-add synchronizes with the acquire-load (10)
    unsigned idx = h->pop_idx.fetch_add(step_size, std::memory_order_release);
    if (idx >= max_idx) {
      // This node has been drained, check if there is another one
      // (12) - this acquire-load synchronizes-with the release-CAS (4)
      auto next = h->next.load(std::memory_order_acquire);
      if (next == nullptr)
        break; // No more nodes in the queue

      marked_ptr expected = h;
      // (13) - this release-CAS synchronizes-with the acquire-load (1, 9)
      if (head.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed))
        h.reclaim(); // The old node has been unlinked -> reclaim it.

      continue;
    }
    idx %= entries_per_node;

    auto value = h->entries[idx].value.load(std::memory_order_relaxed);
    if constexpr (pop_retries > 0) {
      unsigned cnt = 0;
      ramalhete_queue::backoff retry_backoff;
      while (value == nullptr && ++cnt <= pop_retries) {
        value = h->entries[idx].value.load(std::memory_order_relaxed);
        retry_backoff(); // TODO - use a backoff type that can be configured separately
      }
    }

    if (value != nullptr) {
      // (14) - this acquire-load synchronizes-with the release-CAS (8)
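      // (the loaded value is discarded - the load only establishes the happens-before
      // edge with the producer's release-CAS (8))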
      h->entries[idx].value.load(std::memory_order_acquire);
      traits::store(result, value.get());
      return true;
    } else {
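      // The slot still appears empty: mark it as abandoned by exchanging in a marked
      // nullptr. A producer that still holds this index will fail its CAS (8) and retry
      // with a fresh index, so no value is lost.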
      // (15) - this acquire-exchange synchronizes-with the release-CAS (8)
      auto value = h->entries[idx].value.exchange(marked_value(nullptr, 1), std::memory_order_acquire);
      if (value != nullptr) {
        traits::store(result, value.get());
        return true;
      }
    }

    backoff();
  }

  return false;
}
}

#ifdef _MSC_VER
#pragma warning(pop)
#endif

#endif