Eclipse SUMO - Simulation of Urban MObility
MFXWorkerThread.h
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3// Copyright (C) 2004-2022 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// A thread class together with a pool and a task for parallelized computation
19/****************************************************************************/
20
21#ifndef MFXWorkerThread_h
22#define MFXWorkerThread_h
23
24// #define WORKLOAD_PROFILING
25// at which interval report maximum workload of the threads, needs WORKLOAD_PROFILING
26// undefine to use summary report only
27#define WORKLOAD_INTERVAL 100
28#include <config.h>
29
30#include <list>
31#include <vector>
32#include "fxheader.h"
33#ifdef WORKLOAD_PROFILING
34#include <chrono>
37#endif
39
40
41// ===========================================================================
42// class definitions
43// ===========================================================================
48class MFXWorkerThread : public FXThread {
49
50public:
55 class Task {
56 public:
58 virtual ~Task() {};
59
68 virtual void run(MFXWorkerThread* context) = 0;
69
76 void setIndex(const int newIndex) {
77 myIndex = newIndex;
78 }
79 private:
82 };
83
88 class Pool {
89 public:
96 Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
97#ifdef WORKLOAD_PROFILING
98 , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
99#endif
100 {
101#ifdef WORKLOAD_PROFILING
102 long long int timeDiff = 0;
103 for (int i = 0; i < 100; i++) {
104 const auto begin = std::chrono::high_resolution_clock::now();
105 const auto end = std::chrono::high_resolution_clock::now();
106 timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
107 }
108 //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
109#endif
110 while (numThreads > 0) {
111 new MFXWorkerThread(*this);
112 numThreads--;
113 }
114 }
115
120 virtual ~Pool() {
121 clear();
122 }
123
126 void clear() {
127 for (MFXWorkerThread* const worker : myWorkers) {
128 delete worker;
129 }
130 myWorkers.clear();
131 }
132
137 void addWorker(MFXWorkerThread* const w) {
138 myWorkers.push_back(w);
139 }
140
147 void add(Task* const t, int index = -1) {
148 if (index < 0) {
149 index = myRunningIndex % myWorkers.size();
150 }
151#ifdef WORKLOAD_PROFILING
152 if (myRunningIndex == 0) {
153 for (MFXWorkerThread* const worker : myWorkers) {
154 worker->startProfile();
155 }
156 myProfileStart = std::chrono::high_resolution_clock::now();
157 }
158#endif
160 myWorkers[index]->add(t);
161 }
162
169 void addFinished(std::list<Task*>& tasks) {
170 myMutex.lock();
171 myFinishedTasks.splice(myFinishedTasks.end(), tasks);
172 myCondition.signal();
173 myMutex.unlock();
174 }
175
177 myMutex.lock();
178 if (myException == nullptr) {
179 myException = new ProcessError(e);
180 }
181 myMutex.unlock();
182 }
183
185 void waitAll(const bool deleteFinished = true) {
186 myMutex.lock();
187 while ((int)myFinishedTasks.size() < myRunningIndex) {
188 myCondition.wait(myMutex);
189 }
190#ifdef WORKLOAD_PROFILING
191 if (myRunningIndex > 0) {
192 const auto end = std::chrono::high_resolution_clock::now();
193 const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
194 double minLoad = std::numeric_limits<double>::max();
195 double maxLoad = 0.;
196 for (MFXWorkerThread* const worker : myWorkers) {
197 const double load = worker->endProfile(elapsed);
198 minLoad = MIN2(minLoad, load);
199 maxLoad = MAX2(maxLoad, load);
200 }
201#ifdef WORKLOAD_INTERVAL
202 myTotalMaxLoad += maxLoad;
203 myTotalSpread += maxLoad / minLoad;
204 myNumBatches++;
205 if (myNumBatches % WORKLOAD_INTERVAL == 0) {
206 WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
207 myTotalMaxLoad = 0.;
208 myTotalSpread = 0.;
209 }
210#endif
211 }
212#endif
213 if (deleteFinished) {
214 for (Task* task : myFinishedTasks) {
215 delete task;
216 }
217 }
218 ProcessError* toRaise = myException;
219 myException = nullptr;
220 myFinishedTasks.clear();
221 myRunningIndex = 0;
222 myMutex.unlock();
223 if (toRaise != nullptr) {
224 throw* toRaise;
225 }
226 }
227
235 bool isFull() const {
236 return myRunningIndex - (int)myFinishedTasks.size() >= size();
237 }
238
243 int size() const {
244 return (int)myWorkers.size();
245 }
246
248 void lock() {
249 myPoolMutex.lock();
250 }
251
253 void unlock() {
254 myPoolMutex.unlock();
255 }
256
257 const std::vector<MFXWorkerThread*>& getWorkers() {
258 return myWorkers;
259 }
260 private:
262 std::vector<MFXWorkerThread*> myWorkers;
264 FXMutex myMutex;
266 FXMutex myPoolMutex;
268 FXCondition myCondition;
270 std::list<Task*> myFinishedTasks;
275#ifdef WORKLOAD_PROFILING
277 int myNumBatches;
279 double myTotalMaxLoad;
281 double myTotalSpread;
283 std::chrono::high_resolution_clock::time_point myProfileStart;
284#endif
285 };
286
287public:
294 MFXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
295#ifdef WORKLOAD_PROFILING
296 , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
297#endif
298 {
299 pool.addWorker(this);
300 start();
301 }
302
308 stop();
309#ifdef WORKLOAD_PROFILING
310 const double load = 100. * myTotalBusyTime / myTotalTime;
311 WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
312 " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
313 "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
314#endif
315 }
316
321 void add(Task* t) {
322 myMutex.lock();
323 myTasks.push_back(t);
324 myCondition.signal();
325 myMutex.unlock();
326 }
327
334 FXint run() {
335 while (!myStopped) {
336 myMutex.lock();
337 while (!myStopped && myTasks.empty()) {
338 myCondition.wait(myMutex);
339 }
340 if (myStopped) {
341 myMutex.unlock();
342 break;
343 }
345 myMutex.unlock();
346 try {
347 for (Task* const t : myCurrentTasks) {
348#ifdef WORKLOAD_PROFILING
349 const auto before = std::chrono::high_resolution_clock::now();
350#endif
351 t->run(this);
352#ifdef WORKLOAD_PROFILING
353 const auto after = std::chrono::high_resolution_clock::now();
354 myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
355 myCounter++;
356#endif
357 }
358 } catch (ProcessError& e) {
360 }
362 }
363 return 0;
364 }
365
370 void stop() {
371 myMutex.lock();
372 myStopped = true;
373 myCondition.signal();
374 myMutex.unlock();
375 join();
376 }
377
378#ifdef WORKLOAD_PROFILING
379 void startProfile() {
380 myBusyTime = 0;
381 }
382
383 double endProfile(const long long int time) {
384 myTotalTime += time;
385 myTotalBusyTime += myBusyTime;
386 return time == 0 ? 100. : 100. * myBusyTime / time;
387 }
388#endif
389
390private:
394 FXMutex myMutex;
396 FXCondition myCondition;
398 std::list<Task*> myTasks;
400 std::list<Task*> myCurrentTasks;
403#ifdef WORKLOAD_PROFILING
405 int myCounter;
407 long long int myBusyTime;
409 long long int myTotalBusyTime;
411 long long int myTotalTime;
412#endif
413};
414
415
416#endif
#define WORKLOAD_INTERVAL
#define WRITE_MESSAGE(msg)
Definition: MsgHandler.h:267
T MIN2(T a, T b)
Definition: StdDefs.h:71
T MAX2(T a, T b)
Definition: StdDefs.h:77
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition: ToString.h:46
A pool of worker threads which distributes the tasks and collects the results.
std::list< Task * > myFinishedTasks
list of finished tasks
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
std::vector< MFXWorkerThread * > myWorkers
the current worker threads
bool isFull() const
Checks whether there are currently at least as many pending tasks as threads.
void lock()
locks the pool mutex
int myRunningIndex
the running index for the next task
void clear()
Stops and deletes all worker threads.
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index....
virtual ~Pool()
Destructor.
void addFinished(std::list< Task * > &tasks)
Adds the given tasks to the list of finished tasks.
const std::vector< MFXWorkerThread * > & getWorkers()
void setException(ProcessError &e)
FXCondition myCondition
the semaphore to wait on for finishing all tasks
ProcessError * myException
the exception from a child thread
void addWorker(MFXWorkerThread *const w)
Adds the given thread to the pool.
void unlock()
unlocks the pool mutex
FXMutex myPoolMutex
the pool mutex for external sync
Pool(int numThreads=0)
Constructor.
int size() const
Returns the number of threads in the pool.
FXMutex myMutex
the internal mutex for the task list
Abstract superclass of a task to be run with an index to keep track of pending tasks.
int myIndex
the index of the task, valid only after the task has been added to the pool
virtual void run(MFXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
virtual ~Task()
Destructor.
void setIndex(const int newIndex)
Sets the running index of this task.
A thread repeatedly calculating incoming tasks.
FXMutex myMutex
the mutex for the task list
FXint run()
Main execution method of this thread.
Pool & myPool
the pool for this thread
MFXWorkerThread(Pool &pool)
Constructor.
bool myStopped
whether we are still running
void add(Task *t)
Adds the given task to this thread to be calculated.
std::list< Task * > myCurrentTasks
the list of tasks which are currently calculated
void stop()
Stops the thread.
FXCondition myCondition
the semaphore when waiting for new tasks
virtual ~MFXWorkerThread()
Destructor.
std::list< Task * > myTasks
the list of pending tasks