Radium Engine 1.7.0
TaskQueue.cpp
#include <Core/CoreMacros.hpp>
#include <Core/Tasks/Task.hpp>
#include <Core/Tasks/TaskQueue.hpp>
#include <Core/Utils/Index.hpp>
#include <Core/Utils/Log.hpp>
#include <Core/Utils/Timer.hpp>
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <memory>
#include <stack>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace Ra {
namespace Core {

TaskQueue::TaskQueue( uint numThreads ) : m_processingTasks( 0 ), m_shuttingDown( false ) {
    wlock lock( m_mutex );
    m_workerThreads.reserve( numThreads );
    for ( uint i = 0; i < numThreads; ++i ) {
        m_workerThreads.emplace_back( &TaskQueue::runThread, this, i );
    }
}

TaskQueue::~TaskQueue() {
    {
        wlock lock( m_mutex );
        m_shuttingDown = true;
    }
    m_threadNotifier.notify_all();
    for ( auto& t : m_workerThreads ) {
        t.join();
    }
}

TaskQueue::TaskId TaskQueue::registerTask( std::unique_ptr<Task> task ) {
    wlock lock( m_mutex );
    TimerData tdata;
    // Initialize tdata with the task name before ownership of the task is moved.
    tdata.taskName = task->getName();
    m_timerData.push_back( tdata );

    m_tasks.push_back( std::move( task ) );
    m_dependencies.push_back( std::vector<TaskId>() );
    m_remainingDependencies.push_back( 0 );

    CORE_ASSERT( m_tasks.size() == m_dependencies.size(), "Inconsistent task list" );
    CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(), "Inconsistent task list" );
    CORE_ASSERT( m_tasks.size() == m_timerData.size(), "Inconsistent task list" );
    return TaskId { m_tasks.size() - 1 };
}

void TaskQueue::removeTask( TaskQueue::TaskId taskId ) {

    if ( taskId.isInvalid() || taskId >= m_tasks.size() ) {
        LOG( Utils::logDEBUG ) << "Trying to remove task " << taskId << " which is out of bounds ("
                               << m_tasks.size() << " tasks registered)";
        return;
    }
    wlock lock( m_mutex );

    // Replace the task with a dummy no-op, so the ids of the other tasks stay valid.
    m_tasks[taskId] = std::make_unique<FunctionTask>( []() {}, m_timerData[taskId].taskName );

    CORE_ASSERT( m_tasks.size() == m_dependencies.size(), "Inconsistent task list" );
    CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(), "Inconsistent task list" );
    CORE_ASSERT( m_tasks.size() == m_timerData.size(), "Inconsistent task list" );
}

TaskQueue::TaskId TaskQueue::getTaskId( const std::string& taskName ) const {
    rlock lock( m_mutex );
    auto itr = std::find_if( m_tasks.begin(), m_tasks.end(), [taskName]( const auto& task ) {
        return task->getName() == taskName;
    } );

    if ( itr == m_tasks.end() ) return {};
    return TaskId { itr - m_tasks.begin() };
}

void TaskQueue::addDependency( TaskQueue::TaskId predecessor, TaskQueue::TaskId successor ) {
    wlock lock( m_mutex );

    CORE_ASSERT( predecessor.isValid() && ( predecessor < m_tasks.size() ),
                 "Invalid predecessor task" );
    CORE_ASSERT( successor.isValid() && ( successor < m_tasks.size() ), "Invalid successor task" );
    CORE_ASSERT( predecessor != successor, "Cannot add self-dependency" );

    CORE_ASSERT( std::find( m_dependencies[predecessor].begin(),
                            m_dependencies[predecessor].end(),
                            successor ) == m_dependencies[predecessor].end(),
                 "Cannot add a dependency twice" );

    m_dependencies[predecessor].push_back( successor );
    ++m_remainingDependencies[successor];
}

bool TaskQueue::addDependency( const std::string& predecessor, TaskQueue::TaskId successor ) {
    bool added = false;

    auto predecessorId = getTaskId( predecessor );

    if ( predecessorId.isValid() ) {
        added = true;
        addDependency( predecessorId, successor );
    }
    return added;
}

bool TaskQueue::addDependency( TaskQueue::TaskId predecessor, const std::string& successor ) {
    bool added = false;

    auto successorId = getTaskId( successor );
    if ( successorId.isValid() ) {
        added = true;
        addDependency( predecessor, successorId );
    }
    return added;
}

void TaskQueue::addPendingDependency( const std::string& predecessors,
                                      TaskQueue::TaskId successor ) {
    wlock lock( m_mutex );
    m_pendingDepsSucc.emplace_back( predecessors, successor );
}

void TaskQueue::addPendingDependency( TaskId predecessor, const std::string& successors ) {
    wlock lock( m_mutex );
    m_pendingDepsPre.emplace_back( predecessor, successors );
}
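
// Illustrative usage sketch (not part of the original file): the pending-dependency
// API above lets a dependency be declared by task name before the named task has been
// registered; resolveDependencies(), called from startTasks() or runTasksInThisThread(),
// turns it into a regular dependency. The task names and bodies below are hypothetical.
inline void pendingDependencySketch( TaskQueue& queue ) {
    auto render = queue.registerTask(
        std::make_unique<FunctionTask>( []() { /* draw frame */ }, "render" ) );
    // "animation" may not be registered yet; record the predecessor by name.
    queue.addPendingDependency( "animation", render );
    // Registering a task named "animation" later completes the edge animation -> render.
    queue.registerTask( std::make_unique<FunctionTask>( []() { /* animate */ }, "animation" ) );
}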

void TaskQueue::resolveDependencies() {
    for ( const auto& pre : m_pendingDepsPre ) {
        ON_ASSERT( bool result = ) addDependency( pre.first, pre.second );
        CORE_WARN_IF( !result,
                      "Pending dependency unresolved : " << m_tasks[pre.first]->getName()
                                                         << " -> (" << pre.second << ")" );
    }
    for ( const auto& pre : m_pendingDepsSucc ) {
        ON_ASSERT( bool result = ) addDependency( pre.first, pre.second );
        CORE_WARN_IF( !result,
                      "Pending dependency unresolved : (" << pre.first << ") -> "
                                                          << m_tasks[pre.second]->getName() );
    }
    {
        wlock lock( m_mutex );
        m_pendingDepsPre.clear();
        m_pendingDepsSucc.clear();
    }
}

// queueTask is always called with m_mutex locked.
void TaskQueue::queueTask( TaskQueue::TaskId task ) {
    CORE_ASSERT( m_remainingDependencies[task] == 0,
                 "Task " << m_tasks[task]->getName() << " has unmet dependencies" );

    m_taskQueue.push_front( task );
}

void TaskQueue::detectCycles() {
#if defined( CORE_DEBUG )
    // Do a depth-first search of the nodes.
    std::vector<bool> visited( m_tasks.size(), false );
    std::stack<TaskId> pending;
    rlock lock( m_mutex );
    for ( uint id = 0; id < m_tasks.size(); ++id ) {
        if ( m_dependencies[id].size() == 0 ) { pending.push( TaskId( id ) ); }
    }

    // If you hit this assert, there are tasks in the list but
    // all tasks have dependencies so no task can start.
    CORE_ASSERT( m_tasks.empty() || !pending.empty(), "No free tasks." );

    while ( !pending.empty() ) {
        TaskId id = pending.top();
        pending.pop();

        // The task has already been visited. It means there is a cycle in the task graph.
        CORE_ASSERT( !( visited[id] ), "Cycle detected in tasks !" );

        visited[id] = true;
        for ( const auto& dep : m_dependencies[id] ) {
            pending.push( dep );
        }
    }
#endif
}

void TaskQueue::startTasks() {
    using namespace Ra::Core::Utils;
    {
        rlock lock( m_mutex );
        if ( m_workerThreads.empty() ) {
            LOG( logERROR ) << "TaskQueue has 0 threads, could not start tasks in parallel. Either "
                               "create a task queue with more threads, or use runTasksInThisThread";
            return;
        }
    }
    // Add pending dependencies.
    resolveDependencies();

    // Do a debug check
    detectCycles();

    // Enqueue all tasks with no dependencies.
    {
        wlock lock( m_mutex );
        for ( uint t = 0; t < m_tasks.size(); ++t ) {
            // only queue non-null tasks
            if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask( TaskId { t } ); }
        }
    }
    // Wake up all threads.
    m_threadNotifier.notify_all();
}

void TaskQueue::runTasksInThisThread() {
    // This method should not be called between startTasks/waitForTasks; we do not lock anything
    // here.

    // Use a local task queue, preventing workers from picking up jobs.
    std::deque<TaskId> taskQueue;

    // Add pending dependencies.
    resolveDependencies();

    // Do a debug check
    detectCycles();
    {
        // Enqueue all tasks with no dependencies.
        for ( uint t = 0; t < m_tasks.size(); ++t ) {
            // only queue non-null tasks
            if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) {
                taskQueue.push_front( TaskId { t } );
            }
        }
    }
    {
        while ( !taskQueue.empty() ) {
            TaskId task;
            {
                task = taskQueue.back();
                taskQueue.pop_back();
            }
            // Run the task on the calling thread (thread id 0 in the timer data).
            m_timerData[task].start = Utils::Clock::now();
            m_timerData[task].threadId = 0;
            m_tasks[task]->process();
            m_timerData[task].end = Utils::Clock::now();

            for ( auto t : m_dependencies[task] ) {
                uint& nDepends = m_remainingDependencies[t];
                CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
                --nDepends;
                if ( nDepends == 0 ) { taskQueue.push_front( TaskId { t } ); }
            }
        }
    }
}
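
// Illustrative usage sketch (not part of the original file): minimal single-threaded
// use of runTasksInThisThread() above. The queue is created without worker threads,
// so the tasks run on the calling thread. Task names and bodies are hypothetical.
inline void runInThisThreadSketch() {
    TaskQueue queue { 0 }; // no worker threads: startTasks() would log an error instead
    auto load = queue.registerTask(
        std::make_unique<FunctionTask>( []() { /* load data */ }, "load" ) );
    auto process = queue.registerTask(
        std::make_unique<FunctionTask>( []() { /* process data */ }, "process" ) );
    queue.addDependency( load, process ); // "process" runs after "load"
    queue.runTasksInThisThread();         // both tasks execute here, with threadId 0
}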

void TaskQueue::waitForTasks() {
    rlock lock( m_mutex );
    m_waitForTasksNotifier.wait(
        lock, [this]() { return ( m_taskQueue.empty() && m_processingTasks == 0 ); } );
}

const std::vector<TaskQueue::TimerData>& TaskQueue::getTimerData() {
    return m_timerData;
}

void TaskQueue::flushTaskQueue() {
    m_threadNotifier.notify_all();
    wlock lock( m_mutex );

    CORE_ASSERT( m_processingTasks == 0, "You have tasks still in process" );
    CORE_ASSERT( m_taskQueue.empty(), "You have unprocessed tasks" );

    m_tasks.clear();
    m_dependencies.clear();
    m_timerData.clear();
    m_remainingDependencies.clear();
}

void TaskQueue::runThread( uint id ) {
    while ( true ) {
        TaskId task;

        // Acquire mutex.
        {
            wlock lock( m_mutex );
            // Wait for a new task.
            m_threadNotifier.wait( lock,
                                   [this]() { return m_shuttingDown || !m_taskQueue.empty(); } );
            // If the task queue is shutting down, we quit, releasing the lock.
            if ( m_shuttingDown ) { return; }

            // If we are here it means we got a task.
            task = m_taskQueue.back();
            m_taskQueue.pop_back();
            ++m_processingTasks;
            CORE_ASSERT( task.isValid() && task < m_tasks.size(), "Invalid task" );
        }
        // Release mutex.

        // Run the task, recording its start and end times.
        {
            {
                rlock lock( m_mutex );
                m_timerData[task].start = Utils::Clock::now();
                m_timerData[task].threadId = id;
            }
            m_tasks[task]->process();
            {
                rlock lock( m_mutex );
                m_timerData[task].end = Utils::Clock::now();
            }
        }
        // Critical section: mark the task as finished and enqueue its dependent tasks.
        uint newTasks = 0;
        {
            wlock lock( m_mutex );
            for ( auto t : m_dependencies[task] ) {
                uint& nDepends = m_remainingDependencies[t];
                CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
                --nDepends;
                if ( nDepends == 0 ) {
                    queueTask( t );
                    ++newTasks;
                }
            }
            --m_processingTasks;
            if ( m_processingTasks == 0 ) { m_waitForTasksNotifier.notify_one(); }
        }
        // If we added new tasks, wake up the threads to execute them.
        if ( newTasks > 0 ) { m_threadNotifier.notify_all(); }
    } // End of while ( true )
}

void TaskQueue::printTaskGraph( std::ostream& output ) const {
    output << "digraph tasks {" << std::endl;

    for ( const auto& t : m_tasks ) {
        output << "\"" << t->getName() << "\"" << std::endl;
    }

    for ( uint i = 0; i < m_dependencies.size(); ++i ) {
        const auto& task1 = m_tasks[i];
        for ( const auto& dep : m_dependencies[i] ) {
            const auto& task2 = m_tasks[dep];
            output << "\"" << task1->getName() << "\""
                   << " -> ";
            output << "\"" << task2->getName() << "\"" << std::endl;
        }
    }

    for ( const auto& preDep : m_pendingDepsPre ) {
        const auto& task1 = m_tasks[preDep.first];
        std::string t2name = preDep.second;

        if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=]( const auto& task ) {
                 return task->getName() == t2name;
             } ) == m_tasks.end() ) {
            t2name += "?";
        }
        output << "\"" << task1->getName() << "\""
               << " -> ";
        output << "\"" << t2name << "\"" << std::endl;
    }

    for ( const auto& postDep : m_pendingDepsSucc ) {
        std::string t1name = postDep.first;
        const auto& t2 = m_tasks[postDep.second];

        if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=]( const auto& task ) {
                 return task->getName() == t1name;
             } ) == m_tasks.end() ) {
            t1name += "?";
        }
        output << "\"" << t1name << "\""
               << " -> ";
        output << "\"" << t2->getName() << "\"" << std::endl;
    }

    output << "}" << std::endl;
}
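
// Illustrative usage sketch (not part of the original file): a typical parallel run
// with the API defined in this file. Thread count, task names and bodies are hypothetical.
inline void parallelUsageSketch() {
    TaskQueue queue { 4 }; // four worker threads, spawned in the constructor
    auto producer = queue.registerTask(
        std::make_unique<FunctionTask>( []() { /* produce data */ }, "producer" ) );
    auto consumer = queue.registerTask(
        std::make_unique<FunctionTask>( []() { /* consume data */ }, "consumer" ) );
    queue.addDependency( producer, consumer ); // "consumer" waits for "producer"

    queue.printTaskGraph( std::cout ); // dot output: "producer" -> "consumer"

    queue.startTasks();   // enqueue dependency-free tasks and wake the workers
    queue.waitForTasks(); // block until the queue is empty and no task is running
    for ( const auto& td : queue.getTimerData() ) {
        std::cout << td.taskName << " ran on thread " << td.threadId << "\n";
    }
    queue.flushTaskQueue(); // clear tasks, dependencies and timer data
}
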
} // namespace Core
} // namespace Ra