1 #include <Core/Tasks/Task.hpp>
2 #include <Core/Tasks/TaskQueue.hpp>
3 #include <Core/Utils/Log.hpp>
15 m_workerThreads.reserve( numThreads );
16 for ( uint i = 0; i < numThreads; ++i ) {
17 m_workerThreads.emplace_back( &TaskQueue::runThread,
this, i );
23 m_shuttingDown =
true;
24 m_threadNotifier.notify_all();
25 for (
auto& t : m_workerThreads ) {
31 std::lock_guard<std::mutex> lock( m_taskMutex );
34 tdata.taskName = task->getName();
35 m_timerData.push_back( tdata );
37 m_tasks.push_back( std::move( task ) );
38 m_dependencies.push_back( std::vector<TaskId>() );
39 m_remainingDependencies.push_back( 0 );
41 CORE_ASSERT( m_tasks.size() == m_dependencies.size(),
"Inconsistent task list" );
42 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(),
"Inconsistent task list" );
43 CORE_ASSERT( m_tasks.size() == m_timerData.size(),
"Inconsistent task list" );
44 return TaskId { m_tasks.size() - 1 };
// removeTask (body): neutralises the task stored at taskId by replacing it with a no-op
// FunctionTask that keeps the recorded task name; none of the per-task containers is resized.
//
// Reject invalid ids and any id at or past the end of m_tasks. Valid slots are
// 0 .. size()-1, so the comparison must be '>=': with a strict '>' an id equal to
// m_tasks.size() passed this guard and reached the m_tasks[taskId] write below.
49 if ( taskId.isInvalid() || taskId >= m_tasks.size() ) {
50 LOG( Utils::logDEBUG ) <<
"try to remove task " << taskId <<
" which is out of bounds "
// NOTE(review): the bounds check above reads m_tasks.size() before m_taskMutex is
// acquired here — confirm this is acceptable when tasks are registered concurrently.
54 std::lock_guard<std::mutex> lock( m_taskMutex );
// Swap in an empty lambda wrapped in a FunctionTask, reusing the name kept in m_timerData.
57 m_tasks[taskId] = std::make_unique<FunctionTask>( []() {}, m_timerData[taskId].taskName );
// The per-task containers (tasks, dependencies, remaining dependencies, timer data)
// must still have the same length after the replacement.
59 CORE_ASSERT( m_tasks.size() == m_dependencies.size(),
"Inconsistent task list" );
60 CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(),
"Inconsistent task list" );
61 CORE_ASSERT( m_tasks.size() == m_timerData.size(),
"Inconsistent task list" );
65 auto itr = std::find_if( m_tasks.begin(), m_tasks.end(), [taskName](
const auto& task ) {
66 return task->getName() == taskName;
69 if ( itr == m_tasks.end() )
return {};
70 return TaskId { itr - m_tasks.begin() };
74 std::lock_guard<std::mutex> lock( m_taskMutex );
76 CORE_ASSERT( predecessor.isValid() && ( predecessor < m_tasks.size() ),
77 "Invalid predecessor task" );
78 CORE_ASSERT( successor.isValid() && ( successor < m_tasks.size() ),
"Invalid successor task" );
79 CORE_ASSERT( predecessor != successor,
"Cannot add self-dependency" );
81 CORE_ASSERT( std::find( m_dependencies[predecessor].begin(),
82 m_dependencies[predecessor].end(),
83 successor ) == m_dependencies[predecessor].end(),
84 "Cannot add a dependency twice" );
86 m_dependencies[predecessor].push_back( successor );
87 ++m_remainingDependencies[successor];
93 auto predecessorId = getTaskId( predecessor );
95 if ( predecessorId.isValid() ) {
105 auto successorId = getTaskId( successor );
106 if ( successorId.isValid() ) {
115 std::lock_guard<std::mutex> lock( m_taskMutex );
116 m_pendingDepsSucc.emplace_back( predecessors, successor );
120 std::lock_guard<std::mutex> lock( m_taskMutex );
121 m_pendingDepsPre.emplace_back( predecessor, successors );
// Replays every dependency stored in m_pendingDepsPre and m_pendingDepsSucc through
// addDependency(), warns about those that could not be added, then empties both lists.
124 void TaskQueue::resolveDependencies() {
// Entries of m_pendingDepsPre: `first` indexes m_tasks (the predecessor task),
// `second` is the still-unresolved successor that is streamed into the warning.
125 for (
const auto& pre : m_pendingDepsPre ) {
// The result of addDependency() is only captured inside ON_ASSERT( ... );
// presumably that macro expands to nothing when assertions are disabled — TODO confirm.
126 ON_ASSERT(
bool result = )
addDependency( pre.first, pre.second );
127 CORE_WARN_IF( !result,
128 "Pending dependency unresolved : " << m_tasks[pre.first]->getName() << " -> ("
129 << pre.second << ")" );
// Entries of m_pendingDepsSucc are mirrored: `first` is the unresolved predecessor,
// `second` indexes m_tasks (the successor task).
131 for ( const auto& pre : m_pendingDepsSucc ) {
132 ON_ASSERT(
bool result = )
addDependency( pre.first, pre.second );
133 CORE_WARN_IF( !result,
134 "Pending dependency unresolved : (" << pre.first << ") -> "
135 << m_tasks[pre.second]->getName() );
// m_taskMutex is acquired only from this point on, for the two clear() calls;
// the loops above read the pending lists without holding it in this function.
137 std::lock_guard<std::mutex> lock( m_taskMutex );
138 m_pendingDepsPre.clear();
139 m_pendingDepsSucc.clear();
144 CORE_ASSERT( m_remainingDependencies[task] == 0,
145 " Task" << m_tasks[task]->getName() <<
"has unmet dependencies" );
147 m_taskQueue.push_front( task );
// Consistency check over the task graph. The visible body is guarded by CORE_DEBUG and
// reports problems through CORE_ASSERT ("No free tasks." / "Cycle detected in tasks !").
// NOTE(review): several statements of this function are not visible in this excerpt.
150 void TaskQueue::detectCycles() {
151 #if defined( CORE_DEBUG )
// One visited flag per registered task, all initially false.
153 std::vector<bool> visited( m_tasks.size(),
false );
// Work stack of task ids still to be examined.
154 std::stack<TaskId> pending;
// Seed the stack with every task whose m_dependencies entry is empty.
156 for ( uint
id = 0;
id < m_tasks.size(); ++id ) {
157 if ( m_dependencies[
id].size() == 0 ) { pending.push(
TaskId(
id ) ); }
// A non-empty task list must have produced at least one seed.
162 CORE_ASSERT( m_tasks.empty() || !pending.empty(),
"No free tasks." );
164 while ( !pending.empty() ) {
165 TaskId id = pending.top();
// Asserts if the task taken from the top of the stack is already flagged as visited.
169 CORE_ASSERT( !( visited[
id] ),
"Cycle detected in tasks !" );
// Walks the entries of m_dependencies[id]; the loop body is not visible in this excerpt.
172 for (
const auto& dep : m_dependencies[
id] ) {
180 using namespace Ra::Core::Utils;
181 if ( m_workerThreads.empty() ) {
182 LOG( logERROR ) <<
"TaskQueue as 0 threads, could not start tasks in parallel. Either "
183 "create a task queue with more threads, or use runTasksInThisThread";
188 resolveDependencies();
194 for ( uint t = 0; t < m_tasks.size(); ++t ) {
196 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask(
TaskId { t } ); }
200 m_threadNotifier.notify_all();
206 std::lock_guard<std::mutex> lock( m_taskQueueMutex );
209 resolveDependencies();
215 for ( uint t = 0; t < m_tasks.size(); ++t ) {
217 if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask(
TaskId { t } ); }
219 while ( !m_taskQueue.empty() ) {
220 TaskId task { m_taskQueue.back() };
221 m_taskQueue.pop_back();
224 m_timerData[task].start = Utils::Clock::now();
225 m_timerData[task].threadId = 0;
226 m_tasks[task]->process();
227 m_timerData[task].end = Utils::Clock::now();
229 for (
auto t : m_dependencies[task] ) {
230 uint& nDepends = m_remainingDependencies[t];
231 CORE_ASSERT( nDepends > 0,
"Inconsistency in dependencies" );
233 if ( nDepends == 0 ) { queueTask( t ); }
240 bool isFinished =
false;
241 while ( !isFinished ) {
244 std::lock_guard<std::mutex> lock( m_taskQueueMutex );
245 isFinished = ( m_taskQueue.empty() && m_processingTasks == 0 );
247 if ( !isFinished ) { std::this_thread::yield(); }
256 std::lock_guard<std::mutex> lock( m_taskMutex );
258 CORE_ASSERT( m_processingTasks == 0,
"You have tasks still in process" );
259 CORE_ASSERT( m_taskQueue.empty(),
" You have unprocessed tasks " );
262 m_dependencies.clear();
264 m_remainingDependencies.clear();
// Worker-thread routine: waits for a queued task, runs it while recording timing data,
// then updates the remaining-dependency counters of the tasks listed in m_dependencies[task].
// `id` is stored as the threadId of each task this thread executes.
// NOTE(review): several statements (enclosing loop/scopes, declaration of `task`,
// computation of `newTasks`) are not visible in this excerpt.
267 void TaskQueue::runThread( uint
id ) {
273 std::unique_lock<std::mutex> lock( m_taskQueueMutex );
// Block on m_threadNotifier until shutdown is requested or m_taskQueue is non-empty.
276 m_threadNotifier.wait( lock,
277 [
this]() {
return m_shuttingDown || !m_taskQueue.empty(); } );
// Shutdown takes priority over any task still queued.
280 if ( m_shuttingDown ) {
return; }
// Take the task id stored at the back of m_taskQueue.
283 task = m_taskQueue.back();
284 m_taskQueue.pop_back();
286 CORE_ASSERT( task.isValid() && task < m_tasks.size(),
"Invalid task" );
// Record start time, executing thread id and end time around the call to process().
291 m_timerData[task].start = Utils::Clock::now();
292 m_timerData[task].threadId = id;
293 m_tasks[task]->process();
294 m_timerData[task].end = Utils::Clock::now();
// A second lock object on m_taskQueueMutex; presumably declared in a different scope
// than the one above (the scope braces are not visible here) — TODO confirm.
299 std::unique_lock<std::mutex> lock( m_taskQueueMutex );
300 for (
auto t : m_dependencies[task] ) {
// Reference into m_remainingDependencies, so changes made through nDepends are stored.
301 uint& nDepends = m_remainingDependencies[t];
302 CORE_ASSERT( nDepends > 0,
"Inconsistency in dependencies" );
304 if ( nDepends == 0 ) {
// Wakes a single waiting worker when newTasks is positive.
// NOTE(review): presumably newTasks counts the tasks queued above; only one waiter is
// notified regardless of its value — confirm this is intended.
313 if ( newTasks > 0 ) { m_threadNotifier.notify_one(); }
318 output <<
"digraph tasks {" << std::endl;
320 for (
const auto& t : m_tasks ) {
321 output <<
"\"" << t->getName() <<
"\"" << std::endl;
324 for ( uint i = 0; i < m_dependencies.size(); ++i ) {
325 const auto& task1 = m_tasks[i];
326 for (
const auto& dep : m_dependencies[i] ) {
327 const auto& task2 = m_tasks[dep];
328 output <<
"\"" << task1->getName() <<
"\""
330 output <<
"\"" << task2->getName() <<
"\"" << std::endl;
334 for (
const auto& preDep : m_pendingDepsPre ) {
335 const auto& task1 = m_tasks[preDep.first];
336 std::string t2name = preDep.second;
338 if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=](
const auto& task ) {
339 return task->getName() == t2name;
340 } ) == m_tasks.end() ) {
343 output <<
"\"" << task1->getName() <<
"\""
345 output <<
"\"" << t2name <<
"\"" << std::endl;
348 for (
const auto& postDep : m_pendingDepsSucc ) {
349 std::string t1name = postDep.first;
350 const auto& t2 = m_tasks[postDep.second];
352 if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=](
const auto& task ) {
353 return task->getName() == t1name;
354 } ) == m_tasks.end() ) {
357 output <<
"\"" << t1name <<
"\""
359 output <<
"\"" << t2->getName() <<
"\"" << std::endl;
362 output <<
"}" << std::endl;
void flushTaskQueue()
Erases all tasks. Will assert if tasks are unprocessed.
TaskQueue(uint numThreads)
void printTaskGraph(std::ostream &output) const
Prints the current task graph in dot format.
const std::vector< TimerData > & getTimerData()
Access the data from the last frame execution after processTaskQueue().
void removeTask(TaskId taskId)
Utils::Index TaskId
Identifier for a task in the task queue.
void addPendingDependency(const std::string &predecessors, TaskId successor)
void addDependency(TaskId predecessor, TaskId successor)
TaskId registerTask(std::unique_ptr< Task > task)
void runTasksInThisThread()
void waitForTasks()
Blocks until all tasks and dependencies are finished.
~TaskQueue()
Destructor. Waits for all the threads and safely deletes them.
Record of a task's start and end time.