1#include <Core/CoreMacros.hpp>
2#include <Core/Tasks/Task.hpp>
3#include <Core/Tasks/TaskQueue.hpp>
4#include <Core/Utils/Index.hpp>
5#include <Core/Utils/Log.hpp>
6#include <Core/Utils/Timer.hpp>
8#include <condition_variable>
22 wlock lock( m_mutex );
23 m_workerThreads.
reserve( numThreads );
24 for ( uint i = 0; i < numThreads; ++i ) {
25 m_workerThreads.
emplace_back( &TaskQueue::runThread,
this, i );
31 wlock lock( m_mutex );
32 m_shuttingDown =
true;
35 for (
auto& t : m_workerThreads ) {
41 wlock lock( m_mutex );
44 tdata.taskName = task->getName();
45 m_timerData.push_back( tdata );
51 CORE_ASSERT( m_tasks.size() == m_dependencies.
size(),
"Inconsistent task list" );
52 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.
size(),
"Inconsistent task list" );
53 CORE_ASSERT( m_tasks.size() == m_timerData.size(),
"Inconsistent task list" );
54 return TaskId { m_tasks.size() - 1 };
59 if ( taskId.isInvalid() || taskId > m_tasks.size() ) {
60 LOG( Utils::logDEBUG ) <<
"try to remove task " << taskId <<
" which is out of bounds "
64 wlock lock( m_mutex );
67 m_tasks[taskId] = std::make_unique<FunctionTask>( []() {}, m_timerData[taskId].taskName );
69 CORE_ASSERT( m_tasks.size() == m_dependencies.
size(),
"Inconsistent task list" );
70 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.
size(),
"Inconsistent task list" );
71 CORE_ASSERT( m_tasks.size() == m_timerData.size(),
"Inconsistent task list" );
75 rlock lock( m_mutex );
76 auto itr =
std::find_if( m_tasks.begin(), m_tasks.end(), [taskName](
const auto& task ) {
77 return task->getName() == taskName;
80 if ( itr == m_tasks.end() )
return {};
81 return TaskId { itr - m_tasks.begin() };
85 wlock lock( m_mutex );
87 CORE_ASSERT( predecessor.isValid() && ( predecessor < m_tasks.size() ),
88 "Invalid predecessor task" );
89 CORE_ASSERT( successor.isValid() && ( successor < m_tasks.size() ),
"Invalid successor task" );
90 CORE_ASSERT( predecessor != successor,
"Cannot add self-dependency" );
92 CORE_ASSERT(
std::find( m_dependencies[predecessor].begin(),
93 m_dependencies[predecessor].end(),
94 successor ) == m_dependencies[predecessor].end(),
95 "Cannot add a dependency twice" );
97 m_dependencies[predecessor].
push_back( successor );
98 ++m_remainingDependencies[successor];
104 auto predecessorId = getTaskId( predecessor );
106 if ( predecessorId.isValid() ) {
116 auto successorId = getTaskId( successor );
117 if ( successorId.isValid() ) {
126 wlock lock( m_mutex );
127 m_pendingDepsSucc.
emplace_back( predecessors, successor );
131 wlock lock( m_mutex );
132 m_pendingDepsPre.
emplace_back( predecessor, successors );
135void TaskQueue::resolveDependencies() {
136 for (
const auto& pre : m_pendingDepsPre ) {
137 ON_ASSERT(
bool result = )
addDependency( pre.first, pre.second );
138 CORE_WARN_IF( !result,
139 "Pending dependency unresolved : " << m_tasks[pre.first]->getName() << " -> ("
140 << pre.second << ")" );
142 for ( const auto& pre : m_pendingDepsSucc ) {
143 ON_ASSERT(
bool result = )
addDependency( pre.first, pre.second );
144 CORE_WARN_IF( !result,
145 "Pending dependency unresolved : (" << pre.first << ") -> "
146 << m_tasks[pre.second]->getName() );
149 wlock lock( m_mutex );
150 m_pendingDepsPre.clear();
151 m_pendingDepsSucc.clear();
157 CORE_ASSERT( m_remainingDependencies[task] == 0,
158 " Task" << m_tasks[task]->getName() <<
"has unmet dependencies" );
163void TaskQueue::detectCycles() {
164#if defined( CORE_DEBUG )
168 rlock
lock( m_mutex );
169 for ( uint
id = 0;
id < m_tasks.size(); ++id ) {
170 if ( m_dependencies[
id].size() == 0 ) { pending.
push(
TaskId(
id ) ); }
175 CORE_ASSERT( m_tasks.empty() || !pending.
empty(),
"No free tasks." );
177 while ( !pending.
empty() ) {
182 CORE_ASSERT( !( visited[
id] ),
"Cycle detected in tasks !" );
185 for (
const auto& dep : m_dependencies[
id] ) {
193 using namespace Ra::Core::Utils;
195 rlock lock( m_mutex );
196 if ( m_workerThreads.
empty() ) {
197 LOG( logERROR ) <<
"TaskQueue as 0 threads, could not start tasks in parallel. Either "
198 "create a task queue with more threads, or use runTasksInThisThread";
203 resolveDependencies();
210 wlock lock( m_mutex );
211 for ( uint t = 0; t < m_tasks.size(); ++t ) {
214 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask(
TaskId { t } ); }
229 resolveDependencies();
235 for ( uint t = 0; t < m_tasks.size(); ++t ) {
237 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) {
243 while ( !taskQueue.
empty() ) {
246 task = taskQueue.
back();
251 m_timerData[task].threadId = 0;
252 m_tasks[task]->process();
255 for (
auto t : m_dependencies[task] ) {
256 uint& nDepends = m_remainingDependencies[t];
257 CORE_ASSERT( nDepends > 0,
"Inconsistency in dependencies" );
267 rlock lock( m_mutex );
268 m_waitForTasksNotifier.
wait(
269 lock, [
this]() {
return ( m_taskQueue.
empty() && m_processingTasks == 0 ); } );
278 wlock lock( m_mutex );
280 CORE_ASSERT( m_processingTasks == 0,
"You have tasks still in process" );
281 CORE_ASSERT( m_taskQueue.
empty(),
" You have unprocessed tasks " );
284 m_dependencies.
clear();
286 m_remainingDependencies.
clear();
289void TaskQueue::runThread( uint
id ) {
295 wlock lock( m_mutex );
297 m_threadNotifier.
wait( lock,
298 [
this]() {
return m_shuttingDown || !m_taskQueue.
empty(); } );
301 if ( m_shuttingDown ) {
return; }
304 task = m_taskQueue.
back();
307 CORE_ASSERT( task.isValid() && task < m_tasks.size(),
"Invalid task" );
314 rlock
lock( m_mutex );
316 m_timerData[task].threadId = id;
318 m_tasks[task]->process();
320 rlock
lock( m_mutex );
327 wlock
lock( m_mutex );
328 for (
auto t : m_dependencies[task] ) {
329 uint& nDepends = m_remainingDependencies[t];
330 CORE_ASSERT( nDepends > 0,
"Inconsistency in dependencies" );
332 if ( nDepends == 0 ) {
338 if ( m_processingTasks == 0 ) { m_waitForTasksNotifier.
notify_one(); }
341 if ( newTasks > 0 ) { m_threadNotifier.
notify_all(); }
346 output <<
"digraph tasks {" <<
std::endl;
348 for (
const auto& t : m_tasks ) {
349 output <<
"\"" << t->getName() <<
"\"" <<
std::endl;
352 for ( uint i = 0; i < m_dependencies.
size(); ++i ) {
353 const auto& task1 = m_tasks[i];
354 for (
const auto& dep : m_dependencies[i] ) {
355 const auto& task2 = m_tasks[dep];
356 output <<
"\"" << task1->getName() <<
"\""
358 output <<
"\"" << task2->getName() <<
"\"" <<
std::endl;
362 for (
const auto& preDep : m_pendingDepsPre ) {
363 const auto& task1 = m_tasks[preDep.first];
366 if (
std::find_if( m_tasks.begin(), m_tasks.end(), [=](
const auto& task ) {
367 return task->getName() == t2name;
368 } ) == m_tasks.end() ) {
371 output <<
"\"" << task1->getName() <<
"\""
373 output <<
"\"" << t2name <<
"\"" <<
std::endl;
376 for (
const auto& postDep : m_pendingDepsSucc ) {
378 const auto& t2 = m_tasks[postDep.second];
380 if (
std::find_if( m_tasks.begin(), m_tasks.end(), [=](
const auto& task ) {
381 return task->getName() == t1name;
382 } ) == m_tasks.end() ) {
385 output <<
"\"" << t1name <<
"\""
387 output <<
"\"" << t2->getName() <<
"\"" <<
std::endl;
void flushTaskQueue()
Erases all tasks. Will assert if tasks are unprocessed.
TaskQueue(uint numThreads)
void printTaskGraph(std::ostream &output) const
Prints the current task graph in dot format.
const std::vector< TimerData > & getTimerData()
Access the data from the last frame execution after processTaskQueue().
void removeTask(TaskId taskId)
Utils::Index TaskId
Identifier for a task in the task queue.
void addPendingDependency(const std::string &predecessors, TaskId successor)
void addDependency(TaskId predecessor, TaskId successor)
TaskId registerTask(std::unique_ptr< Task > task)
void runTasksInThisThread()
void waitForTasks()
Blocks until all tasks and dependencies are finished.
~TaskQueue()
Destructor. Waits for all the threads and safely deletes them.
T emplace_back(T... args)
helper function to manage enum as underlying types in VariableSet
Record of a task's start and end time.