Radium Engine  1.5.20
Loading...
Searching...
No Matches
TaskQueue.cpp
1#include <Core/Tasks/Task.hpp>
2#include <Core/Tasks/TaskQueue.hpp>
3#include <Core/Utils/Log.hpp>
4
5#include <algorithm>
6#include <iostream>
7#include <memory>
8#include <mutex>
9#include <stack>
10
11namespace Ra {
12namespace Core {
13
14TaskQueue::TaskQueue( uint numThreads ) : m_processingTasks( 0 ), m_shuttingDown( false ) {
15 wlock lock( m_mutex );
16 m_workerThreads.reserve( numThreads );
17 for ( uint i = 0; i < numThreads; ++i ) {
18 m_workerThreads.emplace_back( &TaskQueue::runThread, this, i );
19 }
20}
21
23 {
24 wlock lock( m_mutex );
25 m_shuttingDown = true;
26 }
28 for ( auto& t : m_workerThreads ) {
29 t.join();
30 }
31}
32
34 wlock lock( m_mutex );
35 TimerData tdata;
36 // init tdata with task name before moving ownership
37 tdata.taskName = task->getName();
38 m_timerData.push_back( tdata );
39
40 m_tasks.push_back( std::move( task ) );
41 m_dependencies.push_back( std::vector<TaskId>() );
42 m_remainingDependencies.push_back( 0 );
43
44 CORE_ASSERT( m_tasks.size() == m_dependencies.size(), "Inconsistent task list" );
45 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(), "Inconsistent task list" );
46 CORE_ASSERT( m_tasks.size() == m_timerData.size(), "Inconsistent task list" );
47 return TaskId { m_tasks.size() - 1 };
48}
49
51
52 if ( taskId.isInvalid() || taskId > m_tasks.size() ) {
53 LOG( Utils::logDEBUG ) << "try to remove task " << taskId << " which is out of bounds "
54 << m_tasks.size();
55 return;
56 }
57 wlock lock( m_mutex );
58
59 // set task as dummy noop
60 m_tasks[taskId] = std::make_unique<FunctionTask>( []() {}, m_timerData[taskId].taskName );
61
62 CORE_ASSERT( m_tasks.size() == m_dependencies.size(), "Inconsistent task list" );
63 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(), "Inconsistent task list" );
64 CORE_ASSERT( m_tasks.size() == m_timerData.size(), "Inconsistent task list" );
65}
66
67TaskQueue::TaskId TaskQueue::getTaskId( const std::string& taskName ) const {
68 rlock lock( m_mutex );
69 auto itr = std::find_if( m_tasks.begin(), m_tasks.end(), [taskName]( const auto& task ) {
70 return task->getName() == taskName;
71 } );
72
73 if ( itr == m_tasks.end() ) return {};
74 return TaskId { itr - m_tasks.begin() };
75}
76
78 wlock lock( m_mutex );
79
80 CORE_ASSERT( predecessor.isValid() && ( predecessor < m_tasks.size() ),
81 "Invalid predecessor task" );
82 CORE_ASSERT( successor.isValid() && ( successor < m_tasks.size() ), "Invalid successor task" );
83 CORE_ASSERT( predecessor != successor, "Cannot add self-dependency" );
84
85 CORE_ASSERT( std::find( m_dependencies[predecessor].begin(),
86 m_dependencies[predecessor].end(),
87 successor ) == m_dependencies[predecessor].end(),
88 "Cannot add a dependency twice" );
89
90 m_dependencies[predecessor].push_back( successor );
91 ++m_remainingDependencies[successor];
92}
93
94bool TaskQueue::addDependency( const std::string& predecessor, TaskQueue::TaskId successor ) {
95 bool added = false;
96
97 auto predecessorId = getTaskId( predecessor );
98
99 if ( predecessorId.isValid() ) {
100 added = true;
101 addDependency( predecessorId, successor );
102 }
103 return added;
104}
105
106bool TaskQueue::addDependency( TaskQueue::TaskId predecessor, const std::string& successor ) {
107 bool added = false;
108
109 auto successorId = getTaskId( successor );
110 if ( successorId.isValid() ) {
111 added = true;
112 addDependency( predecessor, successorId );
113 }
114 return added;
115}
116
118 TaskQueue::TaskId successor ) {
119 wlock lock( m_mutex );
120 m_pendingDepsSucc.emplace_back( predecessors, successor );
121}
122
123void TaskQueue::addPendingDependency( TaskId predecessor, const std::string& successors ) {
124 wlock lock( m_mutex );
125 m_pendingDepsPre.emplace_back( predecessor, successors );
126}
127
128void TaskQueue::resolveDependencies() {
129 for ( const auto& pre : m_pendingDepsPre ) {
130 ON_ASSERT( bool result = ) addDependency( pre.first, pre.second );
131 CORE_WARN_IF( !result,
132 "Pending dependency unresolved : " << m_tasks[pre.first]->getName() << " -> ("
133 << pre.second << ")" );
134 }
135 for ( const auto& pre : m_pendingDepsSucc ) {
136 ON_ASSERT( bool result = ) addDependency( pre.first, pre.second );
137 CORE_WARN_IF( !result,
138 "Pending dependency unresolved : (" << pre.first << ") -> "
139 << m_tasks[pre.second]->getName() );
140 }
141 {
142 wlock lock( m_mutex );
143 m_pendingDepsPre.clear();
144 m_pendingDepsSucc.clear();
145 }
146}
147
148// queueTask is always called with m_taskQueueMutex locked
149void TaskQueue::queueTask( TaskQueue::TaskId task ) {
150 CORE_ASSERT( m_remainingDependencies[task] == 0,
151 " Task" << m_tasks[task]->getName() << "has unmet dependencies" );
152
153 m_taskQueue.push_front( task );
154}
155
156void TaskQueue::detectCycles() {
157#if defined( CORE_DEBUG )
158 // Do a depth-first search of the nodes.
159 std::vector<bool> visited( m_tasks.size(), false );
160 std::stack<TaskId> pending;
161 rlock lock( m_mutex );
162 for ( uint id = 0; id < m_tasks.size(); ++id ) {
163 if ( m_dependencies[id].size() == 0 ) { pending.push( TaskId( id ) ); }
164 }
165
166 // If you hit this assert, there are tasks in the list but
167 // all tasks have dependencies so no task can start.
168 CORE_ASSERT( m_tasks.empty() || !pending.empty(), "No free tasks." );
169
170 while ( !pending.empty() ) {
171 TaskId id = pending.top();
172 pending.pop();
173
174 // The task has already been visited. It means there is a cycle in the task graph.
175 CORE_ASSERT( !( visited[id] ), "Cycle detected in tasks !" );
176
177 visited[id] = true;
178 for ( const auto& dep : m_dependencies[id] ) {
179 pending.push( dep );
180 }
181 }
182#endif
183}
184
186 using namespace Ra::Core::Utils;
187 {
188 rlock lock( m_mutex );
189 if ( m_workerThreads.empty() ) {
190 LOG( logERROR ) << "TaskQueue as 0 threads, could not start tasks in parallel. Either "
191 "create a task queue with more threads, or use runTasksInThisThread";
192 return;
193 }
194 }
195 // Add pending dependencies.
196 resolveDependencies();
197
198 // Do a debug check
199 detectCycles();
200
201 // Enqueue all tasks with no dependencies.
202 {
203 wlock lock( m_mutex );
204 for ( uint t = 0; t < m_tasks.size(); ++t ) {
205 // only queue non null m_tasks
206
207 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask( TaskId { t } ); }
208 }
209 }
210 // Wake up all threads.
211 m_threadNotifier.notify_all();
212}
213
215 // this method should not be called between startTasks/waitForTasks, we do not lock anything
216 // here.
217
218 // use local task queue, preventing workers to pickup jobs.
219 std::deque<TaskId> taskQueue;
220
221 // Add pending dependencies.
222 resolveDependencies();
223
224 // Do a debug check
225 detectCycles();
226 {
227 // Enqueue all tasks with no dependencies.
228 for ( uint t = 0; t < m_tasks.size(); ++t ) {
229 // only queue non null m_tasks
230 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) {
231 taskQueue.push_front( TaskId { t } );
232 }
233 }
234 }
235 {
236 while ( !taskQueue.empty() ) {
237 TaskId task;
238 {
239 task = taskQueue.back();
240 taskQueue.pop_back();
241 }
242 // Run task
243 m_timerData[task].start = Utils::Clock::now();
244 m_timerData[task].threadId = 0;
245 m_tasks[task]->process();
246 m_timerData[task].end = Utils::Clock::now();
247
248 for ( auto t : m_dependencies[task] ) {
249 uint& nDepends = m_remainingDependencies[t];
250 CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
251 --nDepends;
252 if ( nDepends == 0 ) { taskQueue.push_front( TaskId { t } ); }
253 }
254 }
255 }
257}
258
260 rlock lock( m_mutex );
261 m_waitForTasksNotifier.wait(
262 lock, [this]() { return ( m_taskQueue.empty() && m_processingTasks == 0 ); } );
263}
264
266 return m_timerData;
267}
268
270 m_threadNotifier.notify_all();
271 wlock lock( m_mutex );
272
273 CORE_ASSERT( m_processingTasks == 0, "You have tasks still in process" );
274 CORE_ASSERT( m_taskQueue.empty(), " You have unprocessed tasks " );
275
276 m_tasks.clear();
277 m_dependencies.clear();
278 m_timerData.clear();
279 m_remainingDependencies.clear();
280}
281
282void TaskQueue::runThread( uint id ) {
283 while ( true ) {
284 TaskId task;
285
286 // Acquire mutex.
287 {
288 wlock lock( m_mutex );
289 // Wait for a new task
290 m_threadNotifier.wait( lock,
291 [this]() { return m_shuttingDown || !m_taskQueue.empty(); } );
292 // If the task queue is shutting down we quit, releasing
293 // the lock.
294 if ( m_shuttingDown ) { return; }
295
296 // If we are here it means we got a task
297 task = m_taskQueue.back();
298 m_taskQueue.pop_back();
299 ++m_processingTasks;
300 CORE_ASSERT( task.isValid() && task < m_tasks.size(), "Invalid task" );
301 }
302 // Release mutex.
303
304 // Run task
305 {
306 {
307 rlock lock( m_mutex );
308 m_timerData[task].start = Utils::Clock::now();
309 m_timerData[task].threadId = id;
310 }
311 m_tasks[task]->process();
312 {
313 rlock lock( m_mutex );
314 m_timerData[task].end = Utils::Clock::now();
315 }
316 }
317 // Critical section : mark task as finished and en-queue dependencies.
318 uint newTasks = 0;
319 {
320 wlock lock( m_mutex );
321 for ( auto t : m_dependencies[task] ) {
322 uint& nDepends = m_remainingDependencies[t];
323 CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
324 --nDepends;
325 if ( nDepends == 0 ) {
326 queueTask( t );
327 ++newTasks;
328 }
329 }
330 --m_processingTasks;
331 if ( m_processingTasks == 0 ) { m_waitForTasksNotifier.notify_one(); }
332 }
333 // If we added new tasks, we wake up threads to execute thems.
334 if ( newTasks > 0 ) { m_threadNotifier.notify_all(); }
335 } // End of while(true)
336}
337
339 output << "digraph tasks {" << std::endl;
340
341 for ( const auto& t : m_tasks ) {
342 output << "\"" << t->getName() << "\"" << std::endl;
343 }
344
345 for ( uint i = 0; i < m_dependencies.size(); ++i ) {
346 const auto& task1 = m_tasks[i];
347 for ( const auto& dep : m_dependencies[i] ) {
348 const auto& task2 = m_tasks[dep];
349 output << "\"" << task1->getName() << "\""
350 << " -> ";
351 output << "\"" << task2->getName() << "\"" << std::endl;
352 }
353 }
354
355 for ( const auto& preDep : m_pendingDepsPre ) {
356 const auto& task1 = m_tasks[preDep.first];
357 std::string t2name = preDep.second;
358
359 if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=]( const auto& task ) {
360 return task->getName() == t2name;
361 } ) == m_tasks.end() ) {
362 t2name += "?";
363 }
364 output << "\"" << task1->getName() << "\""
365 << " -> ";
366 output << "\"" << t2name << "\"" << std::endl;
367 }
368
369 for ( const auto& postDep : m_pendingDepsSucc ) {
370 std::string t1name = postDep.first;
371 const auto& t2 = m_tasks[postDep.second];
372
373 if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=]( const auto& task ) {
374 return task->getName() == t1name;
375 } ) == m_tasks.end() ) {
376 t1name += "?";
377 }
378 output << "\"" << t1name << "\""
379 << " -> ";
380 output << "\"" << t2->getName() << "\"" << std::endl;
381 }
382
383 output << "}" << std::endl;
384}
385} // namespace Core
386} // namespace Ra
T back(T... args)
void flushTaskQueue()
Erases all tasks. Will assert if tasks are unprocessed.
TaskQueue(uint numThreads)
Definition TaskQueue.cpp:14
void printTaskGraph(std::ostream &output) const
Prints the current task graph in dot format.
const std::vector< TimerData > & getTimerData()
Access the data from the last frame execution after processTaskQueue();.
void removeTask(TaskId taskId)
Definition TaskQueue.cpp:50
Utils::Index TaskId
Identifier for a task in the task queue.
Definition TaskQueue.hpp:51
void addPendingDependency(const std::string &predecessors, TaskId successor)
void addDependency(TaskId predecessor, TaskId successor)
Definition TaskQueue.cpp:77
TaskId registerTask(std::unique_ptr< Task > task)
Definition TaskQueue.cpp:33
void waitForTasks()
Blocks until all tasks and dependencies are finished.
~TaskQueue()
Destructor. Waits for all the threads and safely deletes them.
Definition TaskQueue.cpp:22
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T endl(T... args)
T find_if(T... args)
T lock(T... args)
T move(T... args)
hepler function to manage enum as underlying types in VariableSet
Definition Cage.cpp:3
T pop_back(T... args)
T pop(T... args)
T push_back(T... args)
T push_front(T... args)
T push(T... args)
T reserve(T... args)
T size(T... args)
Record of a task's start and end time.
Definition TaskQueue.hpp:54
T top(T... args)