SeqAn3 3.3.0
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
suspendable_queue.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2023, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2023, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
14#pragma once
15
16#include <algorithm>
17#include <cassert>
18#include <condition_variable>
19#include <iterator>
20#include <mutex>
21#include <ranges>
22#include <span>
23#include <thread>
24#include <vector>
25
27
28namespace seqan3::contrib
29{
30
31// ============================================================================
32// Forwards
33// ============================================================================
34
35// ============================================================================
36// Classes
37// ============================================================================
38
39// ----------------------------------------------------------------------------
40// Class ConcurrentQueue
41// ----------------------------------------------------------------------------
42
43template <typename TSpec = void>
44struct Tag
45{};
46
47template <typename TSpec = Tag<void>>
48struct Suspendable;
49
50template <typename TValue, typename TSpec = Suspendable<>>
51class ConcurrentQueue;
52
53struct Limit_;
54using Limit = Tag<Limit_>;
55
56template <typename TValue, typename TSpec>
57class ConcurrentQueue<TValue, Suspendable<TSpec>>
58{
59public:
60 typedef std::vector<TValue> TString;
61 typedef typename TString::size_type TSize;
62
63 size_t readerCount;
64 size_t writerCount;
65
66 TString data;
67 TSize occupied;
68 TSize back;
69 TSize front;
70
71 std::mutex cs;
73
74 bool virgin;
75
76 ConcurrentQueue() : readerCount(0), writerCount(0), occupied(0), back(0), front(0), virgin(true)
77 {}
78
79 ~ConcurrentQueue()
80 {
81 assert(writerCount == 0u);
82
83 // wait for all pending readers to finish
84 while (readerCount != 0u)
85 {}
86 }
87};
88
89template <typename TValue>
90class ConcurrentQueue<TValue, Suspendable<Limit>> : public ConcurrentQueue<TValue, Suspendable<>>
91{
92public:
93 typedef ConcurrentQueue<TValue, Suspendable<>> TBase;
94 typedef typename TBase::TString TString;
95 typedef typename TBase::TSize TSize;
96
98
99 ConcurrentQueue(TSize maxSize) : TBase()
100 {
101 this->data.resize(maxSize);
102 // reserve(this->data, maxSize, Exact());
103 // _setLength(this->data, maxSize);
104 }
105
106 ConcurrentQueue(ConcurrentQueue const & other) : TBase((TBase const &)other)
107 {}
108};
109
110template <typename TValue, typename TSpec>
111inline void lockReading(ConcurrentQueue<TValue, Suspendable<TSpec>> &)
112{}
113
114template <typename TValue, typename TSpec>
115inline void unlockReading(ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
116{
117 {
119 if (--me.readerCount != 0u)
120 return;
121 }
122 me.less.notify_all(); // publish the condition that reader count is 0.
123}
124
125template <typename TValue, typename TSpec>
126inline void lockWriting(ConcurrentQueue<TValue, Suspendable<TSpec>> &)
127{}
128
129template <typename TValue, typename TSpec>
130inline void unlockWriting(ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
131{
132 {
134 if (--me.writerCount != 0u)
135 return;
136 }
137 me.more.notify_all(); // publish the condition, that writer count is 0.
138}
139
140template <typename TValue, typename TSize, typename TSpec>
141inline void setReaderCount(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize readerCount)
142{
144 me.readerCount = readerCount;
145}
146
147template <typename TValue, typename TSize, typename TSpec>
148inline void setWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize writerCount)
149{
151 me.writerCount = writerCount;
152}
153
154template <typename TValue, typename TSize1, typename TSize2, typename TSpec>
155inline void
156setReaderWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize1 readerCount, TSize2 writerCount)
157{
159 me.readerCount = readerCount;
160 me.writerCount = writerCount;
161}
162
163template <typename TValue, typename TSize, typename TSpec>
164inline bool waitForMinSize(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize minSize)
165{
167 while (me.occupied < minSize && me.writerCount > 0u)
168 me.more.wait(lock);
169 return me.occupied >= minSize;
170}
171
172template <typename TValue, typename TSpec>
173inline bool empty(ConcurrentQueue<TValue, Suspendable<TSpec>> const & me)
174{
175 return me.occupied == 0;
176}
177
178template <typename TValue, typename TSpec>
179inline typename ConcurrentQueue<TValue, Suspendable<TSpec>>::SizeType
180length(ConcurrentQueue<TValue, Suspendable<TSpec>> const & me)
181{
182 return me.occupied;
183}
184
185template <typename TValue, typename TSpec>
186inline bool
187_popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me, std::unique_lock<std::mutex> & lk)
188{
189 typedef ConcurrentQueue<TValue, Suspendable<TSpec>> TQueue;
190 typedef typename TQueue::TString TString;
191 typedef typename TString::size_type TSize;
192
193 TSize cap = me.data.size();
194
195 while (me.occupied == 0u && me.writerCount > 0u)
196 me.more.wait(lk);
197
198 if (me.occupied == 0u)
199 return false;
200
201 assert(me.occupied > 0u);
202
203 // extract value and destruct it in the data string
204 // TIter it = me.data.begin() + me.front;
205 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.front));
206 // std::swap(result, *it);
207 // valueDestruct(it);
208
209 me.front = (me.front + 1) % cap;
210 me.occupied--;
211
212 /* now: either me.occupied > 0 and me.nextout is the index
213 of the next occupied slot in the buffer, or
214 me.occupied == 0 and me.nextout is the index of the next
215 (empty) slot that will be filled by a producer (such as
216 me.nextout == me.nextin) */
217
218 return true;
219}
220
221template <typename TValue, typename TSpec>
222inline bool
223_popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me, std::unique_lock<std::mutex> & lk)
224{
225 typedef ConcurrentQueue<TValue, Suspendable<TSpec>> TQueue;
226 typedef typename TQueue::TString TString;
227 typedef typename TString::size_type TSize;
228
229 TSize cap = me.data.size();
230
231 while (me.occupied == 0u && me.writerCount > 0u)
232 me.more.wait(lk);
233
234 if (me.occupied == 0u)
235 return false;
236
237 assert(me.occupied > 0u);
238
239 me.back = (me.back + cap - 1) % cap;
240
241 // extract value and destruct it in the data string
242 // TIter it = me.data.begin() + me.back;
243 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.back));
244 // std::swap(result, *it);
245 // valueDestruct(it);
246
247 me.occupied--;
248
249 /* now: either me.occupied > 0 and me.nextout is the index
250 of the next occupied slot in the buffer, or
251 me.occupied == 0 and me.nextout is the index of the next
252 (empty) slot that will be filled by a producer (such as
253 me.nextout == me.nextin) */
254
255 return true;
256}
257
258template <typename TValue, typename TSpec>
259inline bool popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
260{
262 return _popFront(result, me, lock);
263}
264
265template <typename TValue>
266inline bool popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit>> & me)
267{
268 {
270 if (!_popFront(result, me, lk))
271 return false;
272 }
273 me.less.notify_all();
274 return true;
275}
276
277template <typename TValue, typename TSpec>
278inline bool popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
279{
281 return _popBack(result, me, lk);
282}
283
284template <typename TValue>
285inline bool popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit>> & me)
286{
287 {
289 if (!_popBack(result, me, lk))
290 return false;
291 }
292 me.less.notify_all();
293 return true;
294}
295
296template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
297inline bool
298appendValue(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TValue2 && val, [[maybe_unused]] Tag<TExpand> expandTag)
299{
300 typedef ConcurrentQueue<TValue, Suspendable<TSpec>> TQueue;
301 typedef typename TQueue::TString TString;
302 typedef typename TString::size_type TSize;
303
304 {
306 TSize cap = me.data.size();
307
308 if (me.occupied >= cap)
309 {
310 // increase capacity
311 // _setLength(me.data, cap);
312 // reserve(me.data, cap + 1, expandTag);
313 me.data.resize(cap + 1);
314 TSize delta = me.data.size() - cap;
315 assert(delta == 1);
316
317 // create a gap of delta many values between tail and head
318 // Why?
319 // _clearSpace(me.data, delta, me.back, me.back, expandTag);
320 std::ranges::move_backward(std::span{me.data.data() + me.front, me.data.data() + cap},
321 me.data.data() + me.data.size());
322 if (me.occupied != 0 && me.back <= me.front)
323 me.front += delta;
324
325 cap += delta;
326 }
327
328 // valueConstruct(begin(me.data, Standard()) + me.back, val);
329 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
330 me.back = (me.back + 1) % cap;
331
332 ++me.occupied;
333 }
334
335 /* now: either me.occupied < BSIZE and me.nextin is the index
336 of the next empty slot in the buffer, or
337 me.occupied == BSIZE and me.nextin is the index of the
338 next (occupied) slot that will be emptied by a consumer
339 (such as me.nextin == me.nextout) */
340 me.more.notify_all();
341 return true;
342}
343
344template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
345inline bool appendValue(ConcurrentQueue<TValue, Suspendable<Limit>> & me, TValue2 && val, Tag<TExpand> expandTag);
346
347template <typename TValue, typename TValue2>
348inline bool appendValue(ConcurrentQueue<TValue, Suspendable<Limit>> & me, TValue2 && val, Limit)
349{
350 typedef ConcurrentQueue<TValue, Suspendable<Limit>> TQueue;
351 typedef typename TQueue::TString TString;
352 typedef typename TString::size_type TSize;
353
354 {
356 TSize cap = me.data.size();
357
358 while (me.occupied >= cap && me.readerCount > 0u)
359 me.less.wait(lock);
360
361 if (me.occupied >= cap)
362 return false;
363
364 assert(me.occupied < cap);
365
366 // valueConstruct(begin(me.data, Standard()) + me.back, val);
367 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
368 me.back = (me.back + 1) % cap;
369 me.occupied++;
370 }
371
372 /* now: either me.occupied < BSIZE and me.nextin is the index
373 of the next empty slot in the buffer, or
374 me.occupied == BSIZE and me.nextin is the index of the
375 next (occupied) slot that will be emptied by a consumer
376 (such as me.nextin == me.nextout) */
377 me.more.notify_all();
378 return true;
379}
380
381template <typename TValue, typename TValue2, typename TSpec>
382inline bool appendValue(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TValue2 && val)
383{
384 return appendValue(me, std::forward<TValue2>(val), TSpec{});
385}
386
387} // namespace seqan3::contrib
T data(T... args)
T empty(T... args)
typename decltype(detail::front(list_t{}))::type front
Return the first type from the type list.
Definition type_list/traits.hpp:296
typename decltype(detail::back(list_t{}))::type back
Return the last type from the type list.
Definition type_list/traits.hpp:316
T lock(T... args)
Provides platform and dependency checks.