1#include <Core/Tasks/Task.hpp>
2#include <Core/Tasks/TaskQueue.hpp>
3#include <Core/Utils/Log.hpp>
15 wlock lock( m_mutex );
16 m_workerThreads.
reserve( numThreads );
17 for ( uint i = 0; i < numThreads; ++i ) {
18 m_workerThreads.
emplace_back( &TaskQueue::runThread,
this, i );
24 wlock lock( m_mutex );
25 m_shuttingDown =
true;
28 for (
auto& t : m_workerThreads ) {
34 wlock lock( m_mutex );
37 tdata.taskName = task->getName();
38 m_timerData.push_back( tdata );
44 CORE_ASSERT( m_tasks.size() == m_dependencies.
size(),
"Inconsistent task list" );
45 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.
size(),
"Inconsistent task list" );
46 CORE_ASSERT( m_tasks.size() == m_timerData.size(),
"Inconsistent task list" );
47 return TaskId { m_tasks.size() - 1 };
52 if ( taskId.isInvalid() || taskId > m_tasks.size() ) {
53 LOG( Utils::logDEBUG ) <<
"try to remove task " << taskId <<
" which is out of bounds "
57 wlock lock( m_mutex );
60 m_tasks[taskId] = std::make_unique<FunctionTask>( []() {}, m_timerData[taskId].taskName );
62 CORE_ASSERT( m_tasks.size() == m_dependencies.
size(),
"Inconsistent task list" );
63 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.
size(),
"Inconsistent task list" );
64 CORE_ASSERT( m_tasks.size() == m_timerData.size(),
"Inconsistent task list" );
68 rlock lock( m_mutex );
69 auto itr =
std::find_if( m_tasks.begin(), m_tasks.end(), [taskName](
const auto& task ) {
70 return task->getName() == taskName;
73 if ( itr == m_tasks.end() )
return {};
74 return TaskId { itr - m_tasks.begin() };
78 wlock lock( m_mutex );
80 CORE_ASSERT( predecessor.isValid() && ( predecessor < m_tasks.size() ),
81 "Invalid predecessor task" );
82 CORE_ASSERT( successor.isValid() && ( successor < m_tasks.size() ),
"Invalid successor task" );
83 CORE_ASSERT( predecessor != successor,
"Cannot add self-dependency" );
85 CORE_ASSERT(
std::find( m_dependencies[predecessor].begin(),
86 m_dependencies[predecessor].end(),
87 successor ) == m_dependencies[predecessor].end(),
88 "Cannot add a dependency twice" );
90 m_dependencies[predecessor].
push_back( successor );
91 ++m_remainingDependencies[successor];
97 auto predecessorId = getTaskId( predecessor );
99 if ( predecessorId.isValid() ) {
109 auto successorId = getTaskId( successor );
110 if ( successorId.isValid() ) {
119 wlock lock( m_mutex );
120 m_pendingDepsSucc.
emplace_back( predecessors, successor );
124 wlock lock( m_mutex );
125 m_pendingDepsPre.
emplace_back( predecessor, successors );
128void TaskQueue::resolveDependencies() {
129 for (
const auto& pre : m_pendingDepsPre ) {
130 ON_ASSERT(
bool result = )
addDependency( pre.first, pre.second );
131 CORE_WARN_IF( !result,
132 "Pending dependency unresolved : " << m_tasks[pre.first]->getName() << " -> ("
133 << pre.second << ")" );
135 for ( const auto& pre : m_pendingDepsSucc ) {
136 ON_ASSERT(
bool result = )
addDependency( pre.first, pre.second );
137 CORE_WARN_IF( !result,
138 "Pending dependency unresolved : (" << pre.first << ") -> "
139 << m_tasks[pre.second]->getName() );
142 wlock lock( m_mutex );
143 m_pendingDepsPre.clear();
144 m_pendingDepsSucc.clear();
150 CORE_ASSERT( m_remainingDependencies[task] == 0,
151 " Task" << m_tasks[task]->getName() <<
"has unmet dependencies" );
156void TaskQueue::detectCycles() {
157#if defined( CORE_DEBUG )
161 rlock
lock( m_mutex );
162 for ( uint
id = 0;
id < m_tasks.size(); ++id ) {
163 if ( m_dependencies[
id].size() == 0 ) { pending.
push(
TaskId(
id ) ); }
168 CORE_ASSERT( m_tasks.empty() || !pending.
empty(),
"No free tasks." );
170 while ( !pending.
empty() ) {
175 CORE_ASSERT( !( visited[
id] ),
"Cycle detected in tasks !" );
178 for (
const auto& dep : m_dependencies[
id] ) {
186 using namespace Ra::Core::Utils;
188 rlock lock( m_mutex );
189 if ( m_workerThreads.
empty() ) {
190 LOG( logERROR ) <<
"TaskQueue as 0 threads, could not start tasks in parallel. Either "
191 "create a task queue with more threads, or use runTasksInThisThread";
196 resolveDependencies();
203 wlock lock( m_mutex );
204 for ( uint t = 0; t < m_tasks.size(); ++t ) {
207 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask(
TaskId { t } ); }
222 resolveDependencies();
228 for ( uint t = 0; t < m_tasks.size(); ++t ) {
230 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) {
236 while ( !taskQueue.
empty() ) {
239 task = taskQueue.
back();
244 m_timerData[task].threadId = 0;
245 m_tasks[task]->process();
248 for (
auto t : m_dependencies[task] ) {
249 uint& nDepends = m_remainingDependencies[t];
250 CORE_ASSERT( nDepends > 0,
"Inconsistency in dependencies" );
260 rlock lock( m_mutex );
261 m_waitForTasksNotifier.
wait(
262 lock, [
this]() {
return ( m_taskQueue.
empty() && m_processingTasks == 0 ); } );
271 wlock lock( m_mutex );
273 CORE_ASSERT( m_processingTasks == 0,
"You have tasks still in process" );
274 CORE_ASSERT( m_taskQueue.
empty(),
" You have unprocessed tasks " );
277 m_dependencies.
clear();
279 m_remainingDependencies.
clear();
282void TaskQueue::runThread( uint
id ) {
288 wlock lock( m_mutex );
290 m_threadNotifier.
wait( lock,
291 [
this]() {
return m_shuttingDown || !m_taskQueue.
empty(); } );
294 if ( m_shuttingDown ) {
return; }
297 task = m_taskQueue.
back();
300 CORE_ASSERT( task.isValid() && task < m_tasks.size(),
"Invalid task" );
307 rlock
lock( m_mutex );
309 m_timerData[task].threadId = id;
311 m_tasks[task]->process();
313 rlock
lock( m_mutex );
320 wlock
lock( m_mutex );
321 for (
auto t : m_dependencies[task] ) {
322 uint& nDepends = m_remainingDependencies[t];
323 CORE_ASSERT( nDepends > 0,
"Inconsistency in dependencies" );
325 if ( nDepends == 0 ) {
331 if ( m_processingTasks == 0 ) { m_waitForTasksNotifier.
notify_one(); }
334 if ( newTasks > 0 ) { m_threadNotifier.
notify_all(); }
339 output <<
"digraph tasks {" <<
std::endl;
341 for (
const auto& t : m_tasks ) {
342 output <<
"\"" << t->getName() <<
"\"" <<
std::endl;
345 for ( uint i = 0; i < m_dependencies.
size(); ++i ) {
346 const auto& task1 = m_tasks[i];
347 for (
const auto& dep : m_dependencies[i] ) {
348 const auto& task2 = m_tasks[dep];
349 output <<
"\"" << task1->getName() <<
"\""
351 output <<
"\"" << task2->getName() <<
"\"" <<
std::endl;
355 for (
const auto& preDep : m_pendingDepsPre ) {
356 const auto& task1 = m_tasks[preDep.first];
359 if (
std::find_if( m_tasks.begin(), m_tasks.end(), [=](
const auto& task ) {
360 return task->getName() == t2name;
361 } ) == m_tasks.end() ) {
364 output <<
"\"" << task1->getName() <<
"\""
366 output <<
"\"" << t2name <<
"\"" <<
std::endl;
369 for (
const auto& postDep : m_pendingDepsSucc ) {
371 const auto& t2 = m_tasks[postDep.second];
373 if (
std::find_if( m_tasks.begin(), m_tasks.end(), [=](
const auto& task ) {
374 return task->getName() == t1name;
375 } ) == m_tasks.end() ) {
378 output <<
"\"" << t1name <<
"\""
380 output <<
"\"" << t2->getName() <<
"\"" <<
std::endl;
void flushTaskQueue()
Erases all tasks. Will assert if tasks are unprocessed.
TaskQueue(uint numThreads)
void printTaskGraph(std::ostream &output) const
Prints the current task graph in dot format.
const std::vector< TimerData > & getTimerData()
Access the data from the last frame execution after processTaskQueue().
void removeTask(TaskId taskId)
Utils::Index TaskId
Identifier for a task in the task queue.
void addPendingDependency(const std::string &predecessors, TaskId successor)
void addDependency(TaskId predecessor, TaskId successor)
TaskId registerTask(std::unique_ptr< Task > task)
void runTasksInThisThread()
void waitForTasks()
Blocks until all tasks and dependencies are finished.
~TaskQueue()
Destructor. Waits for all the threads and safely deletes them.
T emplace_back(T... args)
Helper function to manage enums as underlying types in VariableSet
Record of a task's start and end time.