Radium Engine  1.5.0
TaskQueue.hpp
1 #pragma once
2 
3 #include <Core/RaCore.hpp>
4 #include <Core/Utils/Index.hpp>
5 #include <Core/Utils/Timer.hpp> // Ra::Core::TimePoint
6 
7 #include <condition_variable>
8 #include <deque>
9 #include <memory>
10 #include <mutex>
11 #include <string>
12 #include <thread>
13 #include <vector>
14 
// Forward declaration of Ra::Core::Task.
// In this header Task is only named as the pointee of std::unique_ptr<Task>
// (TaskQueue::registerTask parameter and the m_tasks storage), so the complete
// type is not needed here.
// NOTE(review): translation units that destroy or dereference those pointers
// need the full Task definition -- presumably the TaskQueue implementation
// file includes it; confirm.
15 namespace Ra {
16 namespace Core {
17 class Task;
18 }
19 } // namespace Ra
20 
21 namespace Ra {
22 namespace Core {
/// Container of Task objects plus the dependency edges between them, together
/// with the worker threads, queue and synchronization primitives used to run
/// them.
///
/// NOTE(review): only the declarations are visible in this header. Anything
/// below that describes run-time behaviour is inferred from names and types
/// and is marked as such; confirm against the implementation file.
/// NOTE(review): printTaskGraph() uses std::ostream, but none of the standard
/// headers included directly by this file is <iosfwd>/<ostream>; it is
/// presumably provided transitively -- consider including <iosfwd> explicitly.
31 class RA_CORE_API TaskQueue
32 {
33  public:
    /// Identifier for a task in the task queue.
35  using TaskId = Utils::Index;
36 
    /// Record of a task's start and end time.
38  struct TimerData {
    /// Time point at which the task started.
39  Utils::TimePoint start;
    /// Time point at which the task ended.
40  Utils::TimePoint end;
    /// Presumably the id of the thread that ran the task (cf. runThread( uint id )) -- confirm.
41  uint threadId;
    /// Name of the task this record belongs to.
42  std::string taskName;
43  };
44 
45  public:
    /// Constructor. `explicit`, so a bare uint never converts implicitly to a TaskQueue.
    /// \param numThreads presumably the number of worker threads to create
    ///        (see m_workerThreads) -- confirm against the implementation.
48  explicit TaskQueue( uint numThreads );
49 
    /// User-declared destructor (defined out of line).
    /// NOTE(review): presumably signals shutdown (m_shuttingDown) and joins the
    /// worker threads -- confirm.
51  ~TaskQueue();
52 
53  //
54  // Task management
55  //
56 
    /// Hands a task over to the queue. The std::unique_ptr is taken by value, so
    /// ownership moves from the caller to the queue.
    /// \param task the task to register.
    /// \return the TaskId to use when referring to this task in later calls
    ///         (presumably its slot in m_tasks -- confirm).
60  TaskId registerTask( std::unique_ptr<Task> task );
61 
    /// Removes the task designated by \p taskId.
    /// NOTE(review): the effect on other tasks' ids and on already declared
    /// dependencies is not visible from this header.
65  void removeTask( TaskId taskId );
66 
    /// Looks a task up by name. Does not modify the queue (const).
    /// \param taskName name of the task to find.
    /// \return the matching TaskId; NOTE(review): the value returned when no task
    ///         has this name is not visible here (presumably an invalid Index).
67  TaskId getTaskId( const std::string& taskName ) const;
68 
    /// Declares a dependency between two tasks given by id. Going by the parameter
    /// names, \p predecessor is meant to run before \p successor.
71  void addDependency( TaskId predecessor, TaskId successor );
72 
    /// Same as addDependency( TaskId, TaskId ), with one side designated by name
    /// instead of id.
    /// \return NOTE(review): presumably true when the named task(s) could be found
    ///         and the dependency was added -- confirm.
75  bool addDependency( const std::string& predecessors, TaskId successor );
76  bool addDependency( TaskId predecessor, const std::string& successors );
77 
    /// Name-based dependency declaration with no result reported to the caller.
    /// NOTE(review): presumably recorded in m_pendingDepsSucc / m_pendingDepsPre and
    /// resolved later by resolveDependencies(), so the named task need not be
    /// registered yet -- confirm.
80  void addPendingDependency( const std::string& predecessors, TaskId successor );
81  void addPendingDependency( TaskId predecessor, const std::string& successors );
82 
83  //
84  // Task queue operations
85  //
86 
    /// Starts execution of the registered tasks.
    /// NOTE(review): presumably resolves pending dependencies, checks for cycles and
    /// wakes the worker threads; whether the call returns before the tasks finish
    /// is not visible here (see waitForTasks()).
89  void startTasks();
90 
    /// Runs the tasks on the calling thread, as the name states.
    /// NOTE(review): presumably an alternative to startTasks() that does not use the
    /// worker threads -- confirm.
95  void runTasksInThisThread();
96 
    /// Waits for the tasks to complete.
    /// NOTE(review): presumably blocks the caller until the queue is empty and no
    /// task is being processed -- confirm.
98  void waitForTasks();
99 
    /// Gives read-only access to the collected timing records.
    /// \return a const reference (no copy); presumably to m_timerData, in which case
    ///         it stays valid only as long as this TaskQueue does -- confirm.
101  const std::vector<TimerData>& getTimerData();
102 
    /// Flushes the task queue.
    /// NOTE(review): exactly which containers are emptied (tasks, dependencies,
    /// timer data) is not visible from this header.
104  void flushTaskQueue();
105 
    /// Writes a description of the tasks and their dependencies to \p output.
    /// Does not modify the queue (const). The output format is defined by the
    /// implementation.
107  void printTaskGraph( std::ostream& output ) const;
108 
109  private:
    /// NOTE(review): presumably the function executed by each worker thread, with
    /// \p id the worker's index -- confirm.
111  void runThread( uint id );
112 
    /// NOTE(review): presumably appends \p task to m_taskQueue once it is ready to
    /// run; locking expectations for the caller are not visible here.
115  void queueTask( TaskId task );
116 
    /// Checks the dependency graph for cycles.
    /// NOTE(review): how a detected cycle is reported is not visible here.
119  void detectCycles();
120 
    /// NOTE(review): presumably converts the name-based entries of m_pendingDepsPre /
    /// m_pendingDepsSucc into id-based dependencies -- confirm.
122  void resolveDependencies();
123 
124  private:
    /// Worker threads owned by the queue.
126  std::vector<std::thread> m_workerThreads;
    /// Owning storage for the registered tasks.
128  std::vector<std::unique_ptr<Task>> m_tasks;
    /// One list of TaskIds per entry; presumably indexed by task, holding the tasks
    /// that depend on it -- confirm the direction against the implementation.
130  std::vector<std::vector<TaskId>> m_dependencies;
131 
    /// Dependencies where one side is still known only by name: ( id, name ) pairs.
133  std::vector<std::pair<TaskId, std::string>> m_pendingDepsPre;
    /// Dependencies where one side is still known only by name: ( name, id ) pairs.
134  std::vector<std::pair<std::string, TaskId>> m_pendingDepsSucc;
135 
    /// Timing records, one TimerData per entry.
137  std::vector<TimerData> m_timerData;
138 
139  //
140  // mutex protected variables.
141  //
    // NOTE(review): which of the two mutexes below guards which variable is not
    // stated in this header.
142 
    /// Per-entry counters; presumably the number of unfinished predecessors of each
    /// task -- confirm.
144  std::vector<uint> m_remainingDependencies;
    /// Double-ended queue of TaskIds; presumably the tasks ready to be executed.
146  std::deque<TaskId> m_taskQueue;
    /// Presumably the number of tasks currently being executed.
    /// NOTE(review): no in-class initializer; must be set by the constructor.
148  uint m_processingTasks;
149 
    /// Presumably set to tell the worker threads to exit.
    /// NOTE(review): no in-class initializer; must be set by the constructor.
151  bool m_shuttingDown;
    /// Condition variable; going by its name, used to notify the worker threads.
153  std::condition_variable m_threadNotifier;
    /// Mutex; going by its name, guards m_taskQueue (and presumably the related counters).
155  std::mutex m_taskQueueMutex;
    /// Second mutex; NOTE(review): its exact scope (task registration/removal?) is
    /// not visible here.
158  std::mutex m_taskMutex;
159 };
160 
161 } // namespace Core
162 } // namespace Ra
Utils::Index TaskId
Identifier for a task in the task queue.
Definition: TaskQueue.hpp:35
Definition: Cage.cpp:3
Record of a task's start and end time.
Definition: TaskQueue.hpp:38