SeqAn3 3.3.0
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
serialised_resource_pool.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2023, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2023, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
14#pragma once
15
16#include <mutex>
17
19
20namespace seqan3::contrib
21{
22
23// ============================================================================
24// Classes
25// ============================================================================
26
27template <typename TValue>
28struct ResourcePool
29{
30 typedef ConcurrentQueue<TValue *, Suspendable<>> TStack;
31 typedef typename TStack::TSize TSize;
32
33 TStack recycled;
34
35 ResourcePool(TSize maxSize)
36 {
37 setWriterCount(recycled, 1);
38 for (; maxSize != 0; --maxSize)
39 appendValue(recycled, (TValue *)NULL);
40 }
41
42 ~ResourcePool()
43 {
44 unlockWriting(recycled);
45 TValue * ptr = NULL;
46 unsigned count = 0;
47 while (popBack(ptr, recycled))
48 {
49 if (ptr != NULL)
50 count++;
51 delete ptr;
52 }
53 }
54};
55
56// ----------------------------------------------------------------------------
57// Struct SerializerItem
58// ----------------------------------------------------------------------------
59
// Node of the intrusive singly linked list maintained by Serializer.
// `val` must stay the first member: releaseValue() for Serializer converts a
// pointer to `val` back into a pointer to the enclosing item.
template <typename TValue>
struct SerializerItem
{
    TValue val;                    // payload handed out to the caller
    SerializerItem<TValue> * next; // successor in acquisition order, or null
    bool ready;                    // set once the payload has been released
};
67
68// ----------------------------------------------------------------------------
69// Class Serializer
70// ----------------------------------------------------------------------------
71
72template <typename TValue, typename TWorker>
73class Serializer
74{
75public:
76 typedef SerializerItem<TValue> TItem;
77 typedef TItem * TItemPtr;
78 typedef ResourcePool<TItem> TPool;
79 typedef size_t TSize;
80
81 std::mutex cs;
82 TWorker worker;
83 TItemPtr first;
84 TItemPtr last;
85 TPool pool;
86 bool stop;
87
88 Serializer() : first(NULL), last(NULL), stop(false)
89 {}
90
91 template <typename TArg>
92 explicit Serializer(TArg & arg, TSize maxItems = 1024) :
93 worker(arg),
94 first(NULL),
95 last(NULL),
96 pool(maxItems),
97 stop(false)
98 {}
99
100 ~Serializer()
101 {
102 while (first != NULL)
103 {
104 TItemPtr item = first;
105 first = first->next;
106 delete item;
107 }
108 }
109
110 operator bool()
111 {
112 return !stop;
113 }
114};
115
116// ============================================================================
117// Functions
118// ============================================================================
119
120// ----------------------------------------------------------------------------
121// Function aquireValue()
122// ----------------------------------------------------------------------------
123
124template <typename TValue>
125inline TValue * aquireValue(ResourcePool<TValue> & me)
126{
127 TValue * ptr = NULL;
128 if (!popBack(ptr, me.recycled))
129 return NULL;
130
131 if (ptr == NULL)
132 ptr = new TValue;
133
134 return ptr;
135}
136
137// ----------------------------------------------------------------------------
138// Function releaseValue()
139// ----------------------------------------------------------------------------
140
141template <typename TValue>
142inline void releaseValue(ResourcePool<TValue> & me, TValue * ptr)
143{
144 appendValue(me.recycled, ptr);
145}
146
147// ----------------------------------------------------------------------------
148// Function clear()
149// ----------------------------------------------------------------------------
150
151template <typename TValue, typename TWorker>
152inline void clear(Serializer<TValue, TWorker> & me)
153{
154 me.stop = false;
155 while (me.first != NULL)
156 {
157 TValue * item = me.first;
158 me.first = me.first->next;
159 releaseValue(me.recycled, item);
160 }
161 me.last = NULL;
162}
163
164// ----------------------------------------------------------------------------
165// Function aquireValue()
166// ----------------------------------------------------------------------------
167
168// this function is not thread-safe as it would make
169// not much sense to order a stream by the random
170// order of executition behind a mutex
171template <typename TValue, typename TWorker>
172inline TValue * aquireValue(Serializer<TValue, TWorker> & me)
173{
174 typedef SerializerItem<TValue> TItem;
175
176 TItem * item = aquireValue(me.pool);
177 item->next = NULL;
178 item->ready = false;
179
180 // add item to the end of our linked list
181 {
183 if (me.first == NULL)
184 me.first = item;
185 else
186 me.last->next = item;
187 me.last = item;
188 }
189 return &item->val;
190}
191
192// ----------------------------------------------------------------------------
193// Function releaseValue()
194// ----------------------------------------------------------------------------
195
196template <typename TValue, typename TWorker>
197inline bool releaseValue(Serializer<TValue, TWorker> & me, TValue * ptr)
198{
199 typedef SerializerItem<TValue> TItem;
200
201 TItem * item = reinterpret_cast<TItem *>(ptr);
202 assert(!item->ready);
203
204 // changing me.first or the ready flag must be done synchronized (me.mutex)
205 // the thread who changed me.first->ready to be true has to write it.
206
207 // change our ready flag and test if me.first->ready became true
208 {
210 item->ready = true;
211 if (item != me.first)
212 return true;
213 }
214
215 // ok, if we are here it seems that we are responsible for writing the buffer
216
217 assert(me.first != NULL);
218
219 bool success;
220 do
221 {
222 // process item
223 success = me.worker(item->val);
224
225 // remove item from linked list
226 {
228 me.first = item->next;
229
230 // recycle released items
231 releaseValue(me.pool, item);
232
233 // can we leave?
234 item = me.first;
235 if (item == NULL || !item->ready)
236 return success;
237 }
238
239 // we continue to process the next buffer
240 }
241 while (success);
242
243 return false;
244}
245
246} // namespace seqan3::contrib
constexpr ptrdiff_t count
Count the occurrences of a type in a pack.
Definition type_pack/traits.hpp:164
T lock(T... args)
T next(T... args)
Provides seqan suspendable queue.