Loading [MathJax]/extensions/TeX/AMSmath.js
Radium Engine  1.5.29
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
taskqueue.cpp
1#include <Core/Tasks/Task.hpp>
2#include <Core/Tasks/TaskQueue.hpp>
3#include <Core/Utils/Index.hpp>
4#include <catch2/catch_test_macros.hpp>
5#include <chrono>
6#include <memory>
7#include <sstream>
8#include <string>
9#include <thread>
10#include <utility>
11
12using namespace Ra::Core;
13using namespace Ra::Core::Utils;
14
15// run some dummy task, to check that there is no deadlock
16TEST_CASE( "Core/TaskQueueInit", "[unittests][Core][TaskQueue]" ) {
17 for ( int i = 0; i < 5; ++i ) {
18 TaskQueue taskQueue1( 10 );
19 for ( int j = 0; j < 20; ++j ) {
20 taskQueue1.registerTask( std::make_unique<FunctionTask>(
22 }
23 taskQueue1.startTasks();
24 taskQueue1.waitForTasks();
25 taskQueue1.flushTaskQueue();
26 }
27 for ( int i = 0; i < 50; ++i ) {
28 TaskQueue taskQueue1( 10 );
29 TaskQueue taskQueue2( 20 );
30 TaskQueue taskQueue3( 30 );
31 taskQueue1.startTasks();
32 taskQueue1.waitForTasks();
33 taskQueue1.flushTaskQueue();
34 taskQueue2.startTasks();
35 taskQueue2.waitForTasks();
36 taskQueue2.flushTaskQueue();
37 taskQueue3.startTasks();
38 taskQueue3.waitForTasks();
39 taskQueue3.flushTaskQueue();
40 }
41}
42
43TEST_CASE( "Core/TaskQueue", "[unittests][Core][TaskQueue]" ) {
44 TaskQueue taskQueue( 4 );
45
46 const int arraySize = 7; // if changed, update test values also
47 int array[arraySize] = { -1, -1, -1, -1, -1, -1, -1 };
48
49 SECTION( "no dependency" ) {
50 for ( int tidx = 0; tidx < arraySize; ++tidx ) {
51 auto task =
52 std::make_unique<FunctionTask>( [&array, tidx]() { array[tidx] = tidx; },
53 std::string( "task " ) + std::to_string( tidx ) );
54 auto tid = taskQueue.registerTask( std::move( task ) );
55 REQUIRE( tid ==
56 taskQueue.getTaskId( std::string( "task " ) + std::to_string( tidx ) ) );
57 }
59 taskQueue.printTaskGraph( oss );
60 std::string s = oss.str();
61 REQUIRE( s == "digraph tasks {\n"
62 "\"task 0\"\n"
63 "\"task 1\"\n"
64 "\"task 2\"\n"
65 "\"task 3\"\n"
66 "\"task 4\"\n"
67 "\"task 5\"\n"
68 "\"task 6\"\n"
69 "}\n" );
70 SECTION( "parallel run" ) {
71 taskQueue.startTasks();
72 taskQueue.waitForTasks();
73 taskQueue.flushTaskQueue();
74 }
75 SECTION( "one thread run" ) {
76 taskQueue.runTasksInThisThread();
77 }
78 for ( int tidx = 0; tidx < arraySize; ++tidx ) {
79 REQUIRE( array[tidx] == tidx );
80 }
81 }
82 SECTION( "dependencies" ) {
83 TaskQueue::TaskId tids[arraySize];
84 for ( int tidx = 0; tidx < 4; ++tidx ) {
85 auto task =
86 std::make_unique<FunctionTask>( [&array, tidx]() { array[tidx] = tidx; },
87 std::string( "task " ) + std::to_string( tidx ) );
88 tids[tidx] = taskQueue.registerTask( std::move( task ) );
89 REQUIRE( tids[tidx] ==
90 taskQueue.getTaskId( std::string( "task " ) + std::to_string( tidx ) ) );
91 if ( tidx < 2 ) {
92 taskQueue.addPendingDependency( tids[tidx],
93 std::string( "task " ) + std::to_string( 4 ) );
94 }
95 }
96 for ( int tidx = 4; tidx < 6; ++tidx ) {
97 int pred1 = 2 * ( tidx - 4 );
98 int pred2 = 2 * ( tidx - 4 ) + 1;
99 auto task = std::make_unique<FunctionTask>(
100 [&array, tidx, pred1, pred2]() { array[tidx] = array[pred1] + array[pred2]; },
101 std::string( "task " ) + std::to_string( tidx ) );
102 auto tid = taskQueue.registerTask( std::move( task ) );
103 tids[tidx] = tid;
104 if ( tidx == 5 ) {
105 taskQueue.addDependency( std::string( "task " ) + std::to_string( pred1 ), tid );
106 taskQueue.addDependency( std::string( "task " ) + std::to_string( pred2 ), tid );
107 }
108 }
109 {
110 auto task =
111 std::make_unique<FunctionTask>( [&array]() { array[6] = array[4] + array[5]; },
112 std::string( "task " ) + std::to_string( 6 ) );
113 taskQueue.registerTask( std::move( task ) );
114 taskQueue.addDependency( tids[4], std::string( "task " ) + std::to_string( 6 ) );
115 taskQueue.addDependency( tids[5], std::string( "task " ) + std::to_string( 6 ) );
116 }
117
118 SECTION( "parallel run" ) {
119 taskQueue.startTasks();
120 taskQueue.waitForTasks();
121 taskQueue.flushTaskQueue();
122 }
123 SECTION( "one thread run" ) {
124 taskQueue.runTasksInThisThread();
125 }
126 REQUIRE( array[0] == 0 );
127 REQUIRE( array[1] == 1 );
128 REQUIRE( array[2] == 2 );
129 REQUIRE( array[3] == 3 );
130 REQUIRE( array[4] == 1 ); // 0+1
131 REQUIRE( array[5] == 5 ); // 2+3
132 REQUIRE( array[6] == 6 ); // 5+1
133 }
134
135 SECTION( "remove with dependencies" ) {
136 TaskQueue::TaskId tids[arraySize];
137 for ( int tidx = 0; tidx < 4; ++tidx ) {
138 auto task =
139 std::make_unique<FunctionTask>( [&array, tidx]() { array[tidx] = tidx; },
140 std::string( "task " ) + std::to_string( tidx ) );
141 tids[tidx] = taskQueue.registerTask( std::move( task ) );
142 REQUIRE( tids[tidx] ==
143 taskQueue.getTaskId( std::string( "task " ) + std::to_string( tidx ) ) );
144 if ( tidx < 2 ) {
145 taskQueue.addPendingDependency( tids[tidx],
146 std::string( "task " ) + std::to_string( 4 ) );
147 }
148 }
149 for ( int tidx = 4; tidx < 6; ++tidx ) {
150 int pred1 = 2 * ( tidx - 4 );
151 int pred2 = 2 * ( tidx - 4 ) + 1;
152 auto task = std::make_unique<FunctionTask>(
153 [&array, tidx, pred1, pred2]() { array[tidx] = array[pred1] + array[pred2]; },
154 std::string( "task " ) + std::to_string( tidx ) );
155 auto tid = taskQueue.registerTask( std::move( task ) );
156 tids[tidx] = tid;
157 if ( tidx == 5 ) {
158 taskQueue.addDependency( std::string( "task " ) + std::to_string( pred1 ), tid );
159 taskQueue.addDependency( std::string( "task " ) + std::to_string( pred2 ), tid );
160 }
161 }
162 {
163 auto task =
164 std::make_unique<FunctionTask>( [&array]() { array[6] = array[4] + array[5]; },
165 std::string( "task " ) + std::to_string( 6 ) );
166 tids[6] = taskQueue.registerTask( std::move( task ) );
167 taskQueue.addDependency( tids[4], std::string( "task " ) + std::to_string( 6 ) );
168 taskQueue.addDependency( tids[5], std::string( "task " ) + std::to_string( 6 ) );
169 }
170 taskQueue.removeTask( tids[0] );
171 taskQueue.removeTask( tids[6] );
172
173 // remove invalid task id, silently
174 taskQueue.removeTask( TaskQueue::TaskId {} );
175 taskQueue.removeTask( 100 );
176
177 SECTION( "parallel run" ) {
178 taskQueue.startTasks();
179 taskQueue.waitForTasks();
180 taskQueue.flushTaskQueue();
181 }
182
183 SECTION( "one thread run" ) {
184 taskQueue.runTasksInThisThread();
185 }
186
187 REQUIRE( array[0] == -1 ); // task 0 removed
188 REQUIRE( array[1] == 1 );
189 REQUIRE( array[2] == 2 );
190 REQUIRE( array[3] == 3 );
191 REQUIRE( array[4] == 0 ); // 0+1
192 REQUIRE( array[5] == 5 ); // 2+3
193 REQUIRE( array[6] == -1 ); // task 6 removed
194 }
195}
This class allows tasks to be registered and then executed in parallel on separate threads.
Definition TaskQueue.hpp:50
Utils::Index TaskId
Identifier for a task in the task queue.
Definition TaskQueue.hpp:53
T move(T... args)
This namespace contains everything "low level", related to data, data structures, and computation.
Definition Cage.cpp:5
T sleep_for(T... args)
T str(T... args)
T to_string(T... args)