INSTINCT Code Coverage Report


Directory: src/
File: internal/FlowExecutor.cpp
Date: 2025-11-25 23:34:18
Exec Total Coverage
Lines: 120 128 93.8%
Functions: 11 11 100.0%
Branches: 148 256 57.8%

Line Branch Exec Source
1 // This file is part of INSTINCT, the INS Toolkit for Integrated
2 // Navigation Concepts and Training by the Institute of Navigation of
3 // the University of Stuttgart, Germany.
4 //
5 // This Source Code Form is subject to the terms of the Mozilla Public
6 // License, v. 2.0. If a copy of the MPL was not distributed with this
7 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9 #include "FlowExecutor.hpp"
10
11 // <boost/asio.hpp> needs to be included before <winsock.h> (even though not used in this file)
12 // https://stackoverflow.com/questions/9750344/boostasio-winsock-and-winsock-2-compatibility-issue
13 #ifdef _WIN32
14 // Set the proper SDK version before including boost/Asio
15 #include <SDKDDKVer.h>
16 // Note boost/ASIO includes Windows.h.
17 #include <boost/asio.hpp>
18 #endif //_WIN32
19
20 #include "Navigation/GNSS/Positioning/AntexReader.hpp"
21 #include "util/Logger.hpp"
22 #include "Navigation/Time/InsTime.hpp"
23
24 #include "internal/Node/Node.hpp"
25
26 #include "internal/ConfigManager.hpp"
27 #include "util/Time/TimeBase.hpp"
28
29 #include <chrono>
30 #include <map>
31 #include <variant>
32 #include <memory>
33
34 #include <thread>
35 #include <atomic>
36 #include <mutex>
37 #include <condition_variable>
38
39 #ifdef TESTING
40 #include <catch2/catch_test_macros.hpp>
41 #endif
42
43 /* -------------------------------------------------------------------------------------------------------- */
44 /* Private Members */
45 /* -------------------------------------------------------------------------------------------------------- */
46
namespace NAV::FlowExecutor
{
namespace
{

/// Mutex protecting #_state (and used together with #_cv)
std::mutex _mutex;
/// Condition variable which is notified on every change of #_state
std::condition_variable _cv;

/// @brief Lifecycle states of the executor
enum class State : uint8_t
{
    Idle,     ///< No execution in progress
    Starting, ///< start() was called, the nodes are being prepared
    Running,  ///< Callbacks are enabled and the nodes are working
    Stopping, ///< A stop was requested or all nodes finished
};
/// Current lifecycle state
State _state{ State::Idle };

/// Thread which runs execute()
std::thread _thd;
/// Counter of nodes which still have to call deregisterNode
std::atomic<size_t> _activeNodes{ 0 };
/// Time point taken when the execution started (used for the elapsed time log)
std::chrono::steady_clock::time_point _startTime;

/* -------------------------------------------------------------------------------------------------------- */
/*                                       Private Function Declarations                                      */
/* -------------------------------------------------------------------------------------------------------- */

} // namespace

/// @brief Main task of the thread
void execute();

} // namespace NAV::FlowExecutor
78
79 /* -------------------------------------------------------------------------------------------------------- */
80 /* Function Definitions */
81 /* -------------------------------------------------------------------------------------------------------- */
82
83 228 bool NAV::FlowExecutor::isRunning() noexcept
84 {
85 228 std::scoped_lock<std::mutex> lk(_mutex);
86 456 return _state != State::Idle;
87 228 }
88
89 113 void NAV::FlowExecutor::start()
90 {
91 LOG_TRACE("called");
92
93 113 stop();
94
95 {
96
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
113 std::scoped_lock<std::mutex> lk(_mutex);
97 113 _state = State::Starting;
98 113 }
99
100
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
113 _thd = std::thread(execute);
101 113 }
102
103 114 void NAV::FlowExecutor::stop()
104 {
105 LOG_TRACE("called");
106
107
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 113 times.
114 if (isRunning())
108 {
109 {
110
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 std::scoped_lock<std::mutex> lk(_mutex);
111
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if (_state == State::Running || _state == State::Starting)
112 {
113 1 _state = State::Stopping;
114 1 _cv.notify_all();
115 }
116 1 }
117
118 1 waitForFinish();
119 }
120
121
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 114 times.
114 if (_thd.joinable()) { _thd.join(); }
122 114 }
123
124 113 void NAV::FlowExecutor::waitForFinish()
125 {
126 LOG_TRACE("Waiting for finish of FlowExecutor...");
127 {
128
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
113 std::unique_lock lk(_mutex);
129
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
451 _cv.wait(lk, [] { return _state == State::Idle; });
130 113 }
131
132 {
133
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
113 std::scoped_lock<std::mutex> lk(_mutex);
134
2/4
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 113 times.
✗ Branch 5 not taken.
113 if (_thd.joinable()) { _thd.join(); }
135 113 }
136 LOG_TRACE("FlowExecutor finished.");
137 113 }
138
139 345 void NAV::FlowExecutor::deregisterNode([[maybe_unused]] const Node* node)
140 {
141
2/4
✓ Branch 2 taken 345 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 345 times.
✗ Branch 7 not taken.
345 LOG_DEBUG("Node {} finished.", node->nameId());
142 345 _activeNodes--;
143
144
2/2
✓ Branch 1 taken 112 times.
✓ Branch 2 taken 233 times.
345 if (_activeNodes == 0)
145 {
146
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 std::scoped_lock<std::mutex> lk(_mutex);
147 112 _state = State::Stopping;
148 112 _cv.notify_all();
149 112 }
150 345 }
151
152 113 void NAV::FlowExecutor::execute()
153 {
154 LOG_TRACE("called");
155
156 113 AntexReader::Get().reset();
157
158
3/4
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 355 times.
✓ Branch 9 taken 113 times.
468 for (Node* node : flow::m_Nodes())
159 {
160
2/2
✓ Branch 4 taken 301 times.
✓ Branch 5 taken 355 times.
656 for (auto& inputPin : node->inputPins)
161 {
162 301 inputPin.queue.clear();
163 301 inputPin.queueBlocked = false;
164 }
165 355 node->pollEvents.clear();
166 }
167
168
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 113 times.
113 if (!flow::InitializeAllNodes()) // This wakes the threads
169 {
170 std::scoped_lock<std::mutex> lk(_mutex);
171 _state = State::Idle;
172 _cv.notify_all();
173 return;
174 }
175
176
1/2
✓ Branch 4 taken 113 times.
✗ Branch 5 not taken.
226 LOG_INFO("=====================================================");
177 113 bool realTimeMode = std::any_of(flow::m_Nodes().begin(), flow::m_Nodes().end(), [](const Node* node) {
178
5/6
✓ Branch 0 taken 352 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 351 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 350 times.
352 return node && !node->isDisabled() && node->_onlyRealTime;
179 });
180
3/4
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 112 times.
✓ Branch 5 taken 113 times.
✗ Branch 6 not taken.
113 LOG_INFO("Executing in {} mode", realTimeMode ? "real-time" : "post-processing");
181
182
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 112 times.
113 util::time::SetMode(realTimeMode ? util::time::Mode::REAL_TIME : util::time::Mode::POST_PROCESSING);
183 113 _activeNodes = 0;
184
185
3/4
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 355 times.
✓ Branch 9 taken 113 times.
468 for (Node* node : flow::m_Nodes())
186 {
187
8/10
✓ Branch 0 taken 355 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 351 times.
✓ Branch 4 taken 4 times.
✓ Branch 6 taken 351 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 350 times.
✓ Branch 10 taken 5 times.
✓ Branch 11 taken 350 times.
355 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
188
189 {
190
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 std::scoped_lock<std::mutex> lk(_mutex);
191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 350 times.
350 if (_state != State::Starting) { break; }
192 350 }
193
194
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 344 times.
350 node->_mode = realTimeMode ? Node::Mode::REAL_TIME : Node::Mode::POST_PROCESSING;
195
2/2
✓ Branch 0 taken 344 times.
✓ Branch 1 taken 6 times.
350 if (!realTimeMode)
196 {
197 344 _activeNodes += 1;
198 LOG_TRACE("Putting node '{}' into post-processing mode and adding to active nodes.", node->nameId());
199 }
200 {
201
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 std::scoped_lock<std::mutex> guard(node->_configWindowMutex);
202
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 node->resetNode();
203 350 }
204
2/2
✓ Branch 1 taken 292 times.
✓ Branch 2 taken 350 times.
642 for (size_t i = 0; i < node->outputPins.size(); i++) // for (auto& outputPin : node->outputPins)
205 {
206 292 auto& outputPin = node->outputPins[i];
207
7/8
✓ Branch 1 taken 224 times.
✓ Branch 2 taken 68 times.
✓ Branch 4 taken 224 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 215 times.
✓ Branch 7 taken 9 times.
✓ Branch 8 taken 215 times.
✓ Branch 9 taken 77 times.
292 if (outputPin.type == Pin::Type::Flow && outputPin.isPinLinked())
208 {
209 215 outputPin.noMoreDataAvailable = false;
210 LOG_TRACE(" Setting pin '{}' to hasDataAvailable", outputPin.name);
211 }
212
213
2/2
✓ Branch 1 taken 116 times.
✓ Branch 2 taken 176 times.
292 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin.data))
214 {
215 LOG_TRACE(" Adding pin '{}' to data poll event list.", outputPin.name);
216
1/2
✓ Branch 4 taken 116 times.
✗ Branch 5 not taken.
116 node->pollEvents.insert(std::make_pair(InsTime(), std::make_pair(&outputPin, i)));
217 }
218
2/2
✓ Branch 1 taken 24 times.
✓ Branch 2 taken 152 times.
176 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin.data))
219 {
220 24 if (auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin.data);
221
2/4
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
42 !outputPin.noMoreDataAvailable && callback != nullptr && *callback != nullptr
222
6/8
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 6 times.
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 18 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 18 times.
✓ Branch 8 taken 6 times.
42 && std::ranges::any_of(outputPin.links, [](const OutputPin::OutgoingLink& link) {
223 18 return link.connectedNode->isInitialized();
224 }))
225 {
226
3/6
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 18 times.
✗ Branch 7 not taken.
18 if (auto obs = (node->**callback)(i, true)) // Peek the data
227 {
228 LOG_TRACE(" Adding pin '{}' to data poll event list with time {}.", outputPin.name, obs->insTime);
229
1/2
✓ Branch 4 taken 18 times.
✗ Branch 5 not taken.
18 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(&outputPin, i)));
230 18 }
231 }
232 }
233 }
234 }
235
236 {
237
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
113 std::scoped_lock<std::mutex> lk(_mutex);
238
1/2
✓ Branch 0 taken 113 times.
✗ Branch 1 not taken.
113 if (_state == State::Starting)
239 {
240
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
113 flow::EnableAllCallbacks();
241 113 _state = State::Running;
242 }
243 113 }
244
245 113 _startTime = std::chrono::steady_clock::now();
246
1/2
✓ Branch 4 taken 113 times.
✗ Branch 5 not taken.
226 LOG_INFO("Execution started");
247
248 113 bool anyNodeRunning = false;
249
3/4
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 355 times.
✓ Branch 9 taken 113 times.
468 for (Node* node : flow::m_Nodes()) // Search for node pins with data callbacks
250 {
251
8/10
✓ Branch 0 taken 355 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 351 times.
✓ Branch 4 taken 4 times.
✓ Branch 6 taken 351 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 350 times.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 350 times.
✓ Branch 11 taken 5 times.
355 if (node != nullptr && node->kind != Node::Kind::GroupBox && node->isInitialized())
252 {
253 350 anyNodeRunning = true;
254
3/6
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 350 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 350 times.
✗ Branch 9 not taken.
350 LOG_DEBUG("Waking up node {}", node->nameId());
255
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 node->wakeWorker();
256 }
257 }
258
259
1/2
✓ Branch 0 taken 113 times.
✗ Branch 1 not taken.
113 if (anyNodeRunning)
260 {
261 // Wait for the nodes to finish
262 113 bool timeout = true;
263 113 auto timeoutDuration = std::chrono::minutes(1);
264
2/2
✓ Branch 0 taken 119 times.
✓ Branch 1 taken 112 times.
231 while (timeout)
265 {
266
1/2
✓ Branch 1 taken 119 times.
✗ Branch 2 not taken.
119 std::unique_lock lk(_mutex);
267
268
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 118 times.
119 if (realTimeMode)
269 {
270
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
3 _cv.wait(lk, [] { return _state == State::Stopping; });
271 1 break;
272 }
273
274
1/2
✓ Branch 1 taken 118 times.
✗ Branch 2 not taken.
354 timeout = !_cv.wait_for(lk, timeoutDuration, [] { return _state == State::Stopping; });
275
4/6
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 112 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 6 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 118 times.
118 if (timeout && _activeNodes == 0)
276 {
277 LOG_ERROR("FlowExecutor had a timeout, but all nodes finished already.");
278 #ifdef TESTING
279 FAIL("The FlowExecutor should not have a timeout when all nodes are finished already.");
280 #endif
281 break;
282 }
283 119 }
284 }
285
286 // Deinitialize
287
1/2
✓ Branch 4 taken 113 times.
✗ Branch 5 not taken.
226 LOG_DEBUG("Stopping FlowExecutor...");
288 113 flow::DisableAllCallbacks();
289 113 flow::ClearAllNodeQueues();
290
291
3/4
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 355 times.
✓ Branch 9 taken 113 times.
468 for (Node* node : flow::m_Nodes())
292 {
293
8/10
✓ Branch 0 taken 355 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 351 times.
✓ Branch 4 taken 4 times.
✓ Branch 6 taken 351 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 350 times.
✓ Branch 10 taken 5 times.
✓ Branch 11 taken 350 times.
355 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
294
295 350 node->_mode = Node::Mode::REAL_TIME;
296
2/2
✓ Branch 5 taken 292 times.
✓ Branch 6 taken 350 times.
642 for (auto& outputPin : node->outputPins)
297 {
298 292 outputPin.noMoreDataAvailable = true;
299 }
300
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 node->flush();
301 }
302
303
4/10
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 113 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 112 times.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
339 if (!ConfigManager::Get<bool>("nogui")
304
14/34
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 113 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 113 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 113 times.
✗ Branch 10 not taken.
✓ Branch 11 taken 113 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 113 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 113 times.
✗ Branch 18 not taken.
✓ Branch 19 taken 112 times.
✓ Branch 20 taken 1 times.
✓ Branch 21 taken 113 times.
✗ Branch 22 not taken.
✓ Branch 24 taken 113 times.
✗ Branch 25 not taken.
✓ Branch 26 taken 113 times.
✗ Branch 27 not taken.
✓ Branch 29 taken 113 times.
✗ Branch 30 not taken.
✓ Branch 31 taken 113 times.
✗ Branch 32 not taken.
✗ Branch 33 not taken.
✗ Branch 34 not taken.
✗ Branch 36 not taken.
✗ Branch 37 not taken.
✗ Branch 38 not taken.
✗ Branch 39 not taken.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
678 || (!ConfigManager::Get<bool>("sigterm") && !ConfigManager::Get<size_t>("duration")))
305 {
306 112 auto finish = std::chrono::steady_clock::now();
307
2/4
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 112 times.
✗ Branch 5 not taken.
112 [[maybe_unused]] std::chrono::duration<double> elapsed = finish - _startTime;
308
2/4
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 112 times.
✗ Branch 7 not taken.
112 LOG_INFO("Elapsed time: {:.2f} s", elapsed.count());
309 }
310
311 113 _activeNodes = 0;
312 LOG_TRACE("FlowExecutor deinitialized.");
313 {
314
1/2
✓ Branch 1 taken 113 times.
✗ Branch 2 not taken.
113 std::scoped_lock<std::mutex> lk(_mutex);
315 113 _state = State::Idle;
316 113 _cv.notify_all();
317 113 }
318
319 LOG_TRACE("Execute thread finished.");
320 }
321