Radium Engine  1.5.0
TaskQueue.cpp
1 #include <Core/Tasks/Task.hpp>
2 #include <Core/Tasks/TaskQueue.hpp>
3 #include <Core/Utils/Log.hpp>
4 
5 #include <algorithm>
6 #include <iostream>
7 #include <memory>
8 #include <mutex>
9 #include <stack>
10 
11 namespace Ra {
12 namespace Core {
13 
14 TaskQueue::TaskQueue( uint numThreads ) : m_processingTasks( 0 ), m_shuttingDown( false ) {
15  m_workerThreads.reserve( numThreads );
16  for ( uint i = 0; i < numThreads; ++i ) {
17  m_workerThreads.emplace_back( &TaskQueue::runThread, this, i );
18  }
19 }
20 
23  m_shuttingDown = true;
24  m_threadNotifier.notify_all();
25  for ( auto& t : m_workerThreads ) {
26  t.join();
27  }
28 }
29 
30 TaskQueue::TaskId TaskQueue::registerTask( std::unique_ptr<Task> task ) {
31  std::lock_guard<std::mutex> lock( m_taskMutex );
32  TimerData tdata;
33  // init tdata with task name before moving ownership
34  tdata.taskName = task->getName();
35  m_timerData.push_back( tdata );
36 
37  m_tasks.push_back( std::move( task ) );
38  m_dependencies.push_back( std::vector<TaskId>() );
39  m_remainingDependencies.push_back( 0 );
40 
41  CORE_ASSERT( m_tasks.size() == m_dependencies.size(), "Inconsistent task list" );
42  CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(), "Inconsistent task list" );
43  CORE_ASSERT( m_tasks.size() == m_timerData.size(), "Inconsistent task list" );
44  return TaskId { m_tasks.size() - 1 };
45 }
46 
47 void TaskQueue::removeTask( TaskId taskId ) {
48 
49  if ( taskId.isInvalid() || taskId > m_tasks.size() ) {
50  LOG( Utils::logDEBUG ) << "try to remove task " << taskId << " which is out of bounds "
51  << m_tasks.size();
52  return;
53  }
54  std::lock_guard<std::mutex> lock( m_taskMutex );
55 
56  // set task as dummy noop
57  m_tasks[taskId] = std::make_unique<FunctionTask>( []() {}, m_timerData[taskId].taskName );
58 
59  CORE_ASSERT( m_tasks.size() == m_dependencies.size(), "Inconsistent task list" );
60  CORE_ASSERT( m_tasks.size() == m_remainingDependencies.size(), "Inconsistent task list" );
61  CORE_ASSERT( m_tasks.size() == m_timerData.size(), "Inconsistent task list" );
62 }
63 
64 TaskQueue::TaskId TaskQueue::getTaskId( const std::string& taskName ) const {
65  auto itr = std::find_if( m_tasks.begin(), m_tasks.end(), [taskName]( const auto& task ) {
66  return task->getName() == taskName;
67  } );
68 
69  if ( itr == m_tasks.end() ) return {};
70  return TaskId { itr - m_tasks.begin() };
71 }
72 
74  std::lock_guard<std::mutex> lock( m_taskMutex );
75 
76  CORE_ASSERT( predecessor.isValid() && ( predecessor < m_tasks.size() ),
77  "Invalid predecessor task" );
78  CORE_ASSERT( successor.isValid() && ( successor < m_tasks.size() ), "Invalid successor task" );
79  CORE_ASSERT( predecessor != successor, "Cannot add self-dependency" );
80 
81  CORE_ASSERT( std::find( m_dependencies[predecessor].begin(),
82  m_dependencies[predecessor].end(),
83  successor ) == m_dependencies[predecessor].end(),
84  "Cannot add a dependency twice" );
85 
86  m_dependencies[predecessor].push_back( successor );
87  ++m_remainingDependencies[successor];
88 }
89 
90 bool TaskQueue::addDependency( const std::string& predecessor, TaskQueue::TaskId successor ) {
91  bool added = false;
92 
93  auto predecessorId = getTaskId( predecessor );
94 
95  if ( predecessorId.isValid() ) {
96  added = true;
97  addDependency( predecessorId, successor );
98  }
99  return added;
100 }
101 
102 bool TaskQueue::addDependency( TaskQueue::TaskId predecessor, const std::string& successor ) {
103  bool added = false;
104 
105  auto successorId = getTaskId( successor );
106  if ( successorId.isValid() ) {
107  added = true;
108  addDependency( predecessor, successorId );
109  }
110  return added;
111 }
112 
113 void TaskQueue::addPendingDependency( const std::string& predecessors,
114  TaskQueue::TaskId successor ) {
115  std::lock_guard<std::mutex> lock( m_taskMutex );
116  m_pendingDepsSucc.emplace_back( predecessors, successor );
117 }
118 
119 void TaskQueue::addPendingDependency( TaskId predecessor, const std::string& successors ) {
120  std::lock_guard<std::mutex> lock( m_taskMutex );
121  m_pendingDepsPre.emplace_back( predecessor, successors );
122 }
123 
124 void TaskQueue::resolveDependencies() {
125  for ( const auto& pre : m_pendingDepsPre ) {
126  ON_ASSERT( bool result = ) addDependency( pre.first, pre.second );
127  CORE_WARN_IF( !result,
128  "Pending dependency unresolved : " << m_tasks[pre.first]->getName() << " -> ("
129  << pre.second << ")" );
130  }
131  for ( const auto& pre : m_pendingDepsSucc ) {
132  ON_ASSERT( bool result = ) addDependency( pre.first, pre.second );
133  CORE_WARN_IF( !result,
134  "Pending dependency unresolved : (" << pre.first << ") -> "
135  << m_tasks[pre.second]->getName() );
136  }
137  std::lock_guard<std::mutex> lock( m_taskMutex );
138  m_pendingDepsPre.clear();
139  m_pendingDepsSucc.clear();
140 }
141 
142 // queueTask is always called with m_taskQueueMutex locked
143 void TaskQueue::queueTask( TaskQueue::TaskId task ) {
144  CORE_ASSERT( m_remainingDependencies[task] == 0,
145  " Task" << m_tasks[task]->getName() << "has unmet dependencies" );
146 
147  m_taskQueue.push_front( task );
148 }
149 
150 void TaskQueue::detectCycles() {
151 #if defined( CORE_DEBUG )
152  // Do a depth-first search of the nodes.
153  std::vector<bool> visited( m_tasks.size(), false );
154  std::stack<TaskId> pending;
155 
156  for ( uint id = 0; id < m_tasks.size(); ++id ) {
157  if ( m_dependencies[id].size() == 0 ) { pending.push( TaskId( id ) ); }
158  }
159 
160  // If you hit this assert, there are tasks in the list but
161  // all tasks have dependencies so no task can start.
162  CORE_ASSERT( m_tasks.empty() || !pending.empty(), "No free tasks." );
163 
164  while ( !pending.empty() ) {
165  TaskId id = pending.top();
166  pending.pop();
167 
168  // The task has already been visited. It means there is a cycle in the task graph.
169  CORE_ASSERT( !( visited[id] ), "Cycle detected in tasks !" );
170 
171  visited[id] = true;
172  for ( const auto& dep : m_dependencies[id] ) {
173  pending.push( dep );
174  }
175  }
176 #endif
177 }
178 
180  using namespace Ra::Core::Utils;
181  if ( m_workerThreads.empty() ) {
182  LOG( logERROR ) << "TaskQueue as 0 threads, could not start tasks in parallel. Either "
183  "create a task queue with more threads, or use runTasksInThisThread";
184  return;
185  }
186 
187  // Add pending dependencies.
188  resolveDependencies();
189 
190  // Do a debug check
191  detectCycles();
192 
193  // Enqueue all tasks with no dependencies.
194  for ( uint t = 0; t < m_tasks.size(); ++t ) {
195  // only queue non null m_tasks
196  if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask( TaskId { t } ); }
197  }
198 
199  // Wake up all threads.
200  m_threadNotifier.notify_all();
201 }
202 
204 
205  // lock task queue so no other worker can start working while this thread do the job.
206  std::lock_guard<std::mutex> lock( m_taskQueueMutex );
207 
208  // Add pending dependencies.
209  resolveDependencies();
210 
211  // Do a debug check
212  detectCycles();
213 
214  // Enqueue all tasks with no dependencies.
215  for ( uint t = 0; t < m_tasks.size(); ++t ) {
216  // only queue non null m_tasks
217  if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask( TaskId { t } ); }
218  }
219  while ( !m_taskQueue.empty() ) {
220  TaskId task { m_taskQueue.back() };
221  m_taskQueue.pop_back();
222 
223  // Run task
224  m_timerData[task].start = Utils::Clock::now();
225  m_timerData[task].threadId = 0;
226  m_tasks[task]->process();
227  m_timerData[task].end = Utils::Clock::now();
228 
229  for ( auto t : m_dependencies[task] ) {
230  uint& nDepends = m_remainingDependencies[t];
231  CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
232  --nDepends;
233  if ( nDepends == 0 ) { queueTask( t ); }
234  }
235  }
236  flushTaskQueue();
237 }
238 
240  bool isFinished = false;
241  while ( !isFinished ) {
242  // TODO : use a notifier for task queue empty.
243  {
244  std::lock_guard<std::mutex> lock( m_taskQueueMutex );
245  isFinished = ( m_taskQueue.empty() && m_processingTasks == 0 );
246  }
247  if ( !isFinished ) { std::this_thread::yield(); }
248  }
249 }
250 
251 const std::vector<TaskQueue::TimerData>& TaskQueue::getTimerData() {
252  return m_timerData;
253 }
254 
256  std::lock_guard<std::mutex> lock( m_taskMutex );
257 
258  CORE_ASSERT( m_processingTasks == 0, "You have tasks still in process" );
259  CORE_ASSERT( m_taskQueue.empty(), " You have unprocessed tasks " );
260 
261  m_tasks.clear();
262  m_dependencies.clear();
263  m_timerData.clear();
264  m_remainingDependencies.clear();
265 }
266 
267 void TaskQueue::runThread( uint id ) {
268  while ( true ) {
269  TaskId task;
270 
271  // Acquire mutex.
272  {
273  std::unique_lock<std::mutex> lock( m_taskQueueMutex );
274 
275  // Wait for a new task
276  m_threadNotifier.wait( lock,
277  [this]() { return m_shuttingDown || !m_taskQueue.empty(); } );
278  // If the task queue is shutting down we quit, releasing
279  // the lock.
280  if ( m_shuttingDown ) { return; }
281 
282  // If we are here it means we got a task
283  task = m_taskQueue.back();
284  m_taskQueue.pop_back();
285  ++m_processingTasks;
286  CORE_ASSERT( task.isValid() && task < m_tasks.size(), "Invalid task" );
287  }
288  // Release mutex.
289 
290  // Run task
291  m_timerData[task].start = Utils::Clock::now();
292  m_timerData[task].threadId = id;
293  m_tasks[task]->process();
294  m_timerData[task].end = Utils::Clock::now();
295 
296  // Critical section : mark task as finished and en-queue dependencies.
297  uint newTasks = 0;
298  {
299  std::unique_lock<std::mutex> lock( m_taskQueueMutex );
300  for ( auto t : m_dependencies[task] ) {
301  uint& nDepends = m_remainingDependencies[t];
302  CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
303  --nDepends;
304  if ( nDepends == 0 ) {
305  queueTask( t );
306  ++newTasks;
307  }
308  // TODO :Easy optimization : grab one of the new task and process it immediately.
309  }
310  --m_processingTasks;
311  }
312  // If we added new tasks, we wake up one thread to execute it.
313  if ( newTasks > 0 ) { m_threadNotifier.notify_one(); }
314  } // End of while(true)
315 }
316 
317 void TaskQueue::printTaskGraph( std::ostream& output ) const {
318  output << "digraph tasks {" << std::endl;
319 
320  for ( const auto& t : m_tasks ) {
321  output << "\"" << t->getName() << "\"" << std::endl;
322  }
323 
324  for ( uint i = 0; i < m_dependencies.size(); ++i ) {
325  const auto& task1 = m_tasks[i];
326  for ( const auto& dep : m_dependencies[i] ) {
327  const auto& task2 = m_tasks[dep];
328  output << "\"" << task1->getName() << "\""
329  << " -> ";
330  output << "\"" << task2->getName() << "\"" << std::endl;
331  }
332  }
333 
334  for ( const auto& preDep : m_pendingDepsPre ) {
335  const auto& task1 = m_tasks[preDep.first];
336  std::string t2name = preDep.second;
337 
338  if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=]( const auto& task ) {
339  return task->getName() == t2name;
340  } ) == m_tasks.end() ) {
341  t2name += "?";
342  }
343  output << "\"" << task1->getName() << "\""
344  << " -> ";
345  output << "\"" << t2name << "\"" << std::endl;
346  }
347 
348  for ( const auto& postDep : m_pendingDepsSucc ) {
349  std::string t1name = postDep.first;
350  const auto& t2 = m_tasks[postDep.second];
351 
352  if ( std::find_if( m_tasks.begin(), m_tasks.end(), [=]( const auto& task ) {
353  return task->getName() == t1name;
354  } ) == m_tasks.end() ) {
355  t1name += "?";
356  }
357  output << "\"" << t1name << "\""
358  << " -> ";
359  output << "\"" << t2->getName() << "\"" << std::endl;
360  }
361 
362  output << "}" << std::endl;
363 }
364 } // namespace Core
365 } // namespace Ra
void flushTaskQueue()
Erases all tasks. Will assert if tasks are unprocessed.
Definition: TaskQueue.cpp:255
TaskQueue(uint numThreads)
Definition: TaskQueue.cpp:14
void printTaskGraph(std::ostream &output) const
Prints the current task graph in dot format.
Definition: TaskQueue.cpp:317
const std::vector< TimerData > & getTimerData()
Access the data from the last frame execution after processTaskQueue();.
Definition: TaskQueue.cpp:251
void removeTask(TaskId taskId)
Definition: TaskQueue.cpp:47
Utils::Index TaskId
Identifier for a task in the task queue.
Definition: TaskQueue.hpp:35
void addPendingDependency(const std::string &predecessors, TaskId successor)
Definition: TaskQueue.cpp:113
void addDependency(TaskId predecessor, TaskId successor)
Definition: TaskQueue.cpp:73
TaskId registerTask(std::unique_ptr< Task > task)
Definition: TaskQueue.cpp:30
void runTasksInThisThread()
Definition: TaskQueue.cpp:203
void waitForTasks()
Blocks until all tasks and dependencies are finished.
Definition: TaskQueue.cpp:239
~TaskQueue()
Destructor. Waits for all the threads and safely deletes them.
Definition: TaskQueue.cpp:21
Definition: Cage.cpp:3
Record of a task's start and end time.
Definition: TaskQueue.hpp:38