INSTINCT Code Coverage Report


Directory: src/
File: internal/FlowExecutor.cpp
Date: 2025-02-07 16:54:41
            Exec   Total   Coverage
Lines:       112     128      87.5%
Functions:    10      11      90.9%
Branches:    136     256      53.1%

Line Branch Exec Source
1 // This file is part of INSTINCT, the INS Toolkit for Integrated
2 // Navigation Concepts and Training by the Institute of Navigation of
3 // the University of Stuttgart, Germany.
4 //
5 // This Source Code Form is subject to the terms of the Mozilla Public
6 // License, v. 2.0. If a copy of the MPL was not distributed with this
7 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9 #include "FlowExecutor.hpp"
10
11 #include "Navigation/GNSS/Positioning/AntexReader.hpp"
12 #include "util/Logger.hpp"
13 #include "Navigation/Time/InsTime.hpp"
14
15 #include "internal/Node/Node.hpp"
16
17 #include "internal/NodeManager.hpp"
18 namespace nm = NAV::NodeManager;
19 #include "internal/ConfigManager.hpp"
20 #include "util/Time/TimeBase.hpp"
21
22 #include <chrono>
23 #include <map>
24 #include <variant>
25 #include <memory>
26
27 #include <thread>
28 #include <atomic>
29 #include <mutex>
30 #include <condition_variable>
31
32 #ifdef TESTING
33 #include <catch2/catch_test_macros.hpp>
34 #endif
35
36 /* -------------------------------------------------------------------------------------------------------- */
37 /* Private Members */
38 /* -------------------------------------------------------------------------------------------------------- */
39
40 namespace NAV::FlowExecutor
41 {
42 namespace
43 {
44
45 std::mutex _mutex;
46 std::condition_variable _cv;
47
48 enum class State : uint8_t
49 {
50 Idle,
51 Starting,
52 Running,
53 Stopping,
54 };
55 State _state = State::Idle;
56
57 std::thread _thd;
58 std::atomic<size_t> _activeNodes{ 0 };
59 std::chrono::time_point<std::chrono::steady_clock> _startTime;
60
61 /* -------------------------------------------------------------------------------------------------------- */
62 /* Private Function Declarations */
63 /* -------------------------------------------------------------------------------------------------------- */
64
65 } // namespace
66
67 /// @brief Main task of the thread
68 void execute();
69
70 } // namespace NAV::FlowExecutor
71
72 /* -------------------------------------------------------------------------------------------------------- */
73 /* Function Definitions */
74 /* -------------------------------------------------------------------------------------------------------- */
75
76 223 bool NAV::FlowExecutor::isRunning() noexcept
77 {
78 223 std::scoped_lock<std::mutex> lk(_mutex);
79 446 return _state != State::Idle;
80 223 }
81
82 111 void NAV::FlowExecutor::start()
83 {
84 LOG_TRACE("called");
85
86 111 stop();
87
88 {
89
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
90 111 _state = State::Starting;
91 111 }
92
93
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 _thd = std::thread(execute);
94 111 }
95
96 111 void NAV::FlowExecutor::stop()
97 {
98 LOG_TRACE("called");
99
100
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 111 times.
111 if (isRunning())
101 {
102 {
103 std::scoped_lock<std::mutex> lk(_mutex);
104 if (_state == State::Running || _state == State::Starting)
105 {
106 _state = State::Stopping;
107 _cv.notify_all();
108 }
109 }
110
111 waitForFinish();
112 }
113
114
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 111 times.
111 if (_thd.joinable()) { _thd.join(); }
115 111 }
116
117 111 void NAV::FlowExecutor::waitForFinish()
118 {
119 LOG_TRACE("Waiting for finish of FlowExecutor...");
120 {
121
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::unique_lock lk(_mutex);
122
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
444 _cv.wait(lk, [] { return _state == State::Idle; });
123 111 }
124
125 {
126
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
127
2/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
111 if (_thd.joinable()) { _thd.join(); }
128 111 }
129 LOG_TRACE("FlowExecutor finished.");
130 111 }
131
132 339 void NAV::FlowExecutor::deregisterNode([[maybe_unused]] const Node* node)
133 {
134
2/4
✓ Branch 2 taken 339 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 340 times.
✗ Branch 7 not taken.
339 LOG_DEBUG("Node {} finished.", node->nameId());
135 340 _activeNodes--;
136
137
2/2
✓ Branch 1 taken 111 times.
✓ Branch 2 taken 229 times.
340 if (_activeNodes == 0)
138 {
139
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
140 111 _state = State::Stopping;
141 111 _cv.notify_all();
142 111 }
143 340 }
144
145 111 void NAV::FlowExecutor::execute()
146 {
147 LOG_TRACE("called");
148
149 111 AntexReader::Get().reset();
150
151
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 344 times.
✓ Branch 9 taken 111 times.
455 for (Node* node : nm::m_Nodes())
152 {
153
2/2
✓ Branch 4 taken 271 times.
✓ Branch 5 taken 344 times.
615 for (auto& inputPin : node->inputPins)
154 {
155 271 inputPin.queue.clear();
156 271 inputPin.queueBlocked = false;
157 }
158 344 node->pollEvents.clear();
159 }
160
161
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 111 times.
111 if (!nm::InitializeAllNodes()) // This wakes the threads
162 {
163 std::scoped_lock<std::mutex> lk(_mutex);
164 _state = State::Idle;
165 _cv.notify_all();
166 return;
167 }
168
169
1/2
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
222 LOG_INFO("=====================================================");
170 111 bool realTimeMode = std::any_of(nm::m_Nodes().begin(), nm::m_Nodes().end(), [](const Node* node) {
171
4/6
✓ Branch 0 taken 344 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 343 times.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 343 times.
344 return node && !node->isDisabled() && node->_onlyRealTime;
172 });
173
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 111 times.
✓ Branch 5 taken 111 times.
✗ Branch 6 not taken.
111 LOG_INFO("Executing in {} mode", realTimeMode ? "real-time" : "post-processing");
174
175
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 111 times.
111 util::time::SetMode(realTimeMode ? util::time::Mode::REAL_TIME : util::time::Mode::POST_PROCESSING);
176 111 _activeNodes = 0;
177
178
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 344 times.
✓ Branch 9 taken 111 times.
455 for (Node* node : nm::m_Nodes())
179 {
180
8/10
✓ Branch 0 taken 344 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 341 times.
✓ Branch 4 taken 3 times.
✓ Branch 6 taken 341 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 340 times.
✓ Branch 10 taken 4 times.
✓ Branch 11 taken 340 times.
344 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
181
182 {
183
1/2
✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
340 std::scoped_lock<std::mutex> lk(_mutex);
184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 340 times.
340 if (_state != State::Starting) { break; }
185 340 }
186
187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 340 times.
340 node->_mode = realTimeMode ? Node::Mode::REAL_TIME : Node::Mode::POST_PROCESSING;
188
1/2
✓ Branch 0 taken 340 times.
✗ Branch 1 not taken.
340 if (!realTimeMode)
189 {
190 340 _activeNodes += 1;
191 LOG_TRACE("Putting node '{}' into post-processing mode and adding to active nodes.", node->nameId());
192 }
193 {
194
1/2
✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
340 std::scoped_lock<std::mutex> guard(node->_configWindowMutex);
195
1/2
✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
340 node->resetNode();
196 340 }
197
2/2
✓ Branch 1 taken 266 times.
✓ Branch 2 taken 340 times.
606 for (size_t i = 0; i < node->outputPins.size(); i++) // for (auto& outputPin : node->outputPins)
198 {
199 266 auto& outputPin = node->outputPins[i];
200
7/8
✓ Branch 1 taken 217 times.
✓ Branch 2 taken 49 times.
✓ Branch 4 taken 217 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 211 times.
✓ Branch 7 taken 6 times.
✓ Branch 8 taken 211 times.
✓ Branch 9 taken 55 times.
266 if (outputPin.type == Pin::Type::Flow && outputPin.isPinLinked())
201 {
202 211 outputPin.noMoreDataAvailable = false;
203 LOG_TRACE(" Setting pin '{}' to hasDataAvailable", outputPin.name);
204 }
205
206
2/2
✓ Branch 1 taken 110 times.
✓ Branch 2 taken 156 times.
266 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin.data))
207 {
208 LOG_TRACE(" Adding pin '{}' to data poll event list.", outputPin.name);
209
1/2
✓ Branch 4 taken 110 times.
✗ Branch 5 not taken.
110 node->pollEvents.insert(std::make_pair(InsTime(), std::make_pair(&outputPin, i)));
210 }
211
2/2
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 130 times.
156 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin.data))
212 {
213 26 if (auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin.data);
214
2/4
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
46 !outputPin.noMoreDataAvailable && callback != nullptr && *callback != nullptr
215
6/8
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 6 times.
✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 20 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 20 times.
✓ Branch 8 taken 6 times.
46 && std::ranges::any_of(outputPin.links, [](const OutputPin::OutgoingLink& link) {
216 20 return link.connectedNode->isInitialized();
217 }))
218 {
219
3/6
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 20 times.
✗ Branch 7 not taken.
20 if (auto obs = (node->**callback)(i, true)) // Peek the data
220 {
221 LOG_TRACE(" Adding pin '{}' to data poll event list with time {}.", outputPin.name, obs->insTime);
222
1/2
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
20 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(&outputPin, i)));
223 20 }
224 }
225 }
226 }
227 }
228
229 {
230
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
231
1/2
✓ Branch 0 taken 111 times.
✗ Branch 1 not taken.
111 if (_state == State::Starting)
232 {
233
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 nm::EnableAllCallbacks();
234 111 _state = State::Running;
235 }
236 111 }
237
238 111 _startTime = std::chrono::steady_clock::now();
239
1/2
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
222 LOG_INFO("Execution started");
240
241 111 bool anyNodeRunning = false;
242
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 344 times.
✓ Branch 9 taken 111 times.
455 for (Node* node : nm::m_Nodes()) // Search for node pins with data callbacks
243 {
244
8/10
✓ Branch 0 taken 344 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 341 times.
✓ Branch 4 taken 3 times.
✓ Branch 6 taken 341 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 340 times.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 340 times.
✓ Branch 11 taken 4 times.
344 if (node != nullptr && node->kind != Node::Kind::GroupBox && node->isInitialized())
245 {
246 340 anyNodeRunning = true;
247
3/6
✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 340 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 340 times.
✗ Branch 9 not taken.
340 LOG_DEBUG("Waking up node {}", node->nameId());
248
1/2
✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
340 node->wakeWorker();
249 }
250 }
251
252
1/2
✓ Branch 0 taken 111 times.
✗ Branch 1 not taken.
111 if (anyNodeRunning)
253 {
254 // Wait for the nodes to finish
255 111 bool timeout = true;
256 111 auto timeoutDuration = std::chrono::minutes(1);
257
2/2
✓ Branch 0 taken 115 times.
✓ Branch 1 taken 111 times.
226 while (timeout)
258 {
259
1/2
✓ Branch 1 taken 115 times.
✗ Branch 2 not taken.
115 std::unique_lock lk(_mutex);
260
261
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 115 times.
115 if (realTimeMode)
262 {
263 _cv.wait(lk, [] { return _state == State::Stopping; });
264 break;
265 }
266
267
1/2
✓ Branch 1 taken 115 times.
✗ Branch 2 not taken.
345 timeout = !_cv.wait_for(lk, timeoutDuration, [] { return _state == State::Stopping; });
268
4/6
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 111 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 115 times.
115 if (timeout && _activeNodes == 0)
269 {
270 LOG_ERROR("FlowExecutor had a timeout, but all nodes finished already.");
271 #ifdef TESTING
272 FAIL("The FlowExecutor should not have a timeout when all nodes are finished already.");
273 #endif
274 break;
275 }
276 115 }
277 }
278
279 // Deinitialize
280
1/2
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
222 LOG_DEBUG("Stopping FlowExecutor...");
281 111 nm::DisableAllCallbacks();
282 111 nm::ClearAllNodeQueues();
283
284
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 344 times.
✓ Branch 9 taken 111 times.
455 for (Node* node : nm::m_Nodes())
285 {
286
8/10
✓ Branch 0 taken 344 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 341 times.
✓ Branch 4 taken 3 times.
✓ Branch 6 taken 341 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 340 times.
✓ Branch 10 taken 4 times.
✓ Branch 11 taken 340 times.
344 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
287
288 340 node->_mode = Node::Mode::REAL_TIME;
289
2/2
✓ Branch 5 taken 266 times.
✓ Branch 6 taken 340 times.
606 for (auto& outputPin : node->outputPins)
290 {
291 266 outputPin.noMoreDataAvailable = true;
292 }
293
1/2
✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
340 node->flush();
294 }
295
296
3/10
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 111 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
333 if (!ConfigManager::Get<bool>("nogui")
297
13/34
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 111 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 111 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 111 times.
✗ Branch 10 not taken.
✓ Branch 11 taken 111 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 111 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 111 times.
✗ Branch 18 not taken.
✓ Branch 19 taken 111 times.
✗ Branch 20 not taken.
✓ Branch 21 taken 111 times.
✗ Branch 22 not taken.
✓ Branch 24 taken 111 times.
✗ Branch 25 not taken.
✓ Branch 26 taken 111 times.
✗ Branch 27 not taken.
✓ Branch 29 taken 111 times.
✗ Branch 30 not taken.
✓ Branch 31 taken 111 times.
✗ Branch 32 not taken.
✗ Branch 33 not taken.
✗ Branch 34 not taken.
✗ Branch 36 not taken.
✗ Branch 37 not taken.
✗ Branch 38 not taken.
✗ Branch 39 not taken.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
666 || (!ConfigManager::Get<bool>("sigterm") && !ConfigManager::Get<size_t>("duration")))
298 {
299 111 auto finish = std::chrono::steady_clock::now();
300
2/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
111 [[maybe_unused]] std::chrono::duration<double> elapsed = finish - _startTime;
301
2/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 111 times.
✗ Branch 7 not taken.
111 LOG_INFO("Elapsed time: {} s", elapsed.count());
302 }
303
304 111 _activeNodes = 0;
305 LOG_TRACE("FlowExecutor deinitialized.");
306 {
307
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
308 111 _state = State::Idle;
309 111 _cv.notify_all();
310 111 }
311
312 LOG_TRACE("Execute thread finished.");
313 }
314