INSTINCT Code Coverage Report


Directory: src/
File: internal/FlowExecutor.cpp
Date: 2025-06-02 15:19:59
Exec Total Coverage
Lines: 120 128 93.8%
Functions: 11 11 100.0%
Branches: 145 256 56.6%

Line Branch Exec Source
1 // This file is part of INSTINCT, the INS Toolkit for Integrated
2 // Navigation Concepts and Training by the Institute of Navigation of
3 // the University of Stuttgart, Germany.
4 //
5 // This Source Code Form is subject to the terms of the Mozilla Public
6 // License, v. 2.0. If a copy of the MPL was not distributed with this
7 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9 #include "FlowExecutor.hpp"
10
11 // <boost/asio.hpp> needs to be included before <winsock.h> (even though not used in this file)
12 // https://stackoverflow.com/questions/9750344/boostasio-winsock-and-winsock-2-compatibility-issue
13 #ifdef _WIN32
14 // Set the proper SDK version before including boost/Asio
15 #include <SDKDDKVer.h>
16 // Note boost/ASIO includes Windows.h.
17 #include <boost/asio.hpp>
18 #endif //_WIN32
19
20 #include "Navigation/GNSS/Positioning/AntexReader.hpp"
21 #include "util/Logger.hpp"
22 #include "Navigation/Time/InsTime.hpp"
23
24 #include "internal/Node/Node.hpp"
25
26 #include "internal/NodeManager.hpp"
27 namespace nm = NAV::NodeManager;
28 #include "internal/ConfigManager.hpp"
29 #include "util/Time/TimeBase.hpp"
30
31 #include <chrono>
32 #include <map>
33 #include <variant>
34 #include <memory>
35
36 #include <thread>
37 #include <atomic>
38 #include <mutex>
39 #include <condition_variable>
40
41 #ifdef TESTING
42 #include <catch2/catch_test_macros.hpp>
43 #endif
44
45 /* -------------------------------------------------------------------------------------------------------- */
46 /* Private Members */
47 /* -------------------------------------------------------------------------------------------------------- */
48
49 namespace NAV::FlowExecutor
50 {
51 namespace
52 {
53
54 std::mutex _mutex;
55 std::condition_variable _cv;
56
57 enum class State : uint8_t
58 {
59 Idle,
60 Starting,
61 Running,
62 Stopping,
63 };
64 State _state = State::Idle;
65
66 std::thread _thd;
67 std::atomic<size_t> _activeNodes{ 0 };
68 std::chrono::time_point<std::chrono::steady_clock> _startTime;
69
70 /* -------------------------------------------------------------------------------------------------------- */
71 /* Private Function Declarations */
72 /* -------------------------------------------------------------------------------------------------------- */
73
74 } // namespace
75
76 /// @brief Main task of the thread
77 void execute();
78
79 } // namespace NAV::FlowExecutor
80
81 /* -------------------------------------------------------------------------------------------------------- */
82 /* Function Definitions */
83 /* -------------------------------------------------------------------------------------------------------- */
84
85 224 bool NAV::FlowExecutor::isRunning() noexcept
86 {
87 224 std::scoped_lock<std::mutex> lk(_mutex);
88 448 return _state != State::Idle;
89 224 }
90
91 111 void NAV::FlowExecutor::start()
92 {
93 LOG_TRACE("called");
94
95 111 stop();
96
97 {
98
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
99 111 _state = State::Starting;
100 111 }
101
102
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 _thd = std::thread(execute);
103 111 }
104
105 112 void NAV::FlowExecutor::stop()
106 {
107 LOG_TRACE("called");
108
109
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 111 times.
112 if (isRunning())
110 {
111 {
112
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 std::scoped_lock<std::mutex> lk(_mutex);
113
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if (_state == State::Running || _state == State::Starting)
114 {
115 1 _state = State::Stopping;
116 1 _cv.notify_all();
117 }
118 1 }
119
120 1 waitForFinish();
121 }
122
123
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 112 times.
112 if (_thd.joinable()) { _thd.join(); }
124 112 }
125
126 111 void NAV::FlowExecutor::waitForFinish()
127 {
128 LOG_TRACE("Waiting for finish of FlowExecutor...");
129 {
130
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::unique_lock lk(_mutex);
131
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
443 _cv.wait(lk, [] { return _state == State::Idle; });
132 111 }
133
134 {
135
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
136
2/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
111 if (_thd.joinable()) { _thd.join(); }
137 111 }
138 LOG_TRACE("FlowExecutor finished.");
139 111 }
140
141 332 void NAV::FlowExecutor::deregisterNode([[maybe_unused]] const Node* node)
142 {
143
2/4
✓ Branch 2 taken 331 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 332 times.
✗ Branch 7 not taken.
332 LOG_DEBUG("Node {} finished.", node->nameId());
144 332 _activeNodes--;
145
146
2/2
✓ Branch 1 taken 110 times.
✓ Branch 2 taken 222 times.
332 if (_activeNodes == 0)
147 {
148
1/2
✓ Branch 1 taken 110 times.
✗ Branch 2 not taken.
110 std::scoped_lock<std::mutex> lk(_mutex);
149 110 _state = State::Stopping;
150 110 _cv.notify_all();
151 110 }
152 332 }
153
154 111 void NAV::FlowExecutor::execute()
155 {
156 LOG_TRACE("called");
157
158 111 AntexReader::Get().reset();
159
160
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 338 times.
✓ Branch 9 taken 111 times.
449 for (Node* node : nm::m_Nodes())
161 {
162
2/2
✓ Branch 4 taken 282 times.
✓ Branch 5 taken 338 times.
620 for (auto& inputPin : node->inputPins)
163 {
164 282 inputPin.queue.clear();
165 282 inputPin.queueBlocked = false;
166 }
167 338 node->pollEvents.clear();
168 }
169
170
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 111 times.
111 if (!nm::InitializeAllNodes()) // This wakes the threads
171 {
172 std::scoped_lock<std::mutex> lk(_mutex);
173 _state = State::Idle;
174 _cv.notify_all();
175 return;
176 }
177
178
1/2
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
222 LOG_INFO("=====================================================");
179 111 bool realTimeMode = std::any_of(nm::m_Nodes().begin(), nm::m_Nodes().end(), [](const Node* node) {
180
5/6
✓ Branch 0 taken 335 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 334 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 333 times.
335 return node && !node->isDisabled() && node->_onlyRealTime;
181 });
182
3/4
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 110 times.
✓ Branch 5 taken 111 times.
✗ Branch 6 not taken.
111 LOG_INFO("Executing in {} mode", realTimeMode ? "real-time" : "post-processing");
183
184
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 110 times.
111 util::time::SetMode(realTimeMode ? util::time::Mode::REAL_TIME : util::time::Mode::POST_PROCESSING);
185 111 _activeNodes = 0;
186
187
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 338 times.
✓ Branch 9 taken 111 times.
449 for (Node* node : nm::m_Nodes())
188 {
189
7/10
✓ Branch 0 taken 338 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 338 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 338 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 337 times.
✓ Branch 10 taken 1 times.
✓ Branch 11 taken 337 times.
338 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
190
191 {
192
1/2
✓ Branch 1 taken 337 times.
✗ Branch 2 not taken.
337 std::scoped_lock<std::mutex> lk(_mutex);
193
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 337 times.
337 if (_state != State::Starting) { break; }
194 337 }
195
196
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 331 times.
337 node->_mode = realTimeMode ? Node::Mode::REAL_TIME : Node::Mode::POST_PROCESSING;
197
2/2
✓ Branch 0 taken 331 times.
✓ Branch 1 taken 6 times.
337 if (!realTimeMode)
198 {
199 331 _activeNodes += 1;
200 LOG_TRACE("Putting node '{}' into post-processing mode and adding to active nodes.", node->nameId());
201 }
202 {
203
1/2
✓ Branch 1 taken 337 times.
✗ Branch 2 not taken.
337 std::scoped_lock<std::mutex> guard(node->_configWindowMutex);
204
1/2
✓ Branch 1 taken 337 times.
✗ Branch 2 not taken.
337 node->resetNode();
205 337 }
206
2/2
✓ Branch 1 taken 280 times.
✓ Branch 2 taken 337 times.
617 for (size_t i = 0; i < node->outputPins.size(); i++) // for (auto& outputPin : node->outputPins)
207 {
208 280 auto& outputPin = node->outputPins[i];
209
7/8
✓ Branch 1 taken 215 times.
✓ Branch 2 taken 65 times.
✓ Branch 4 taken 215 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 206 times.
✓ Branch 7 taken 9 times.
✓ Branch 8 taken 206 times.
✓ Branch 9 taken 74 times.
280 if (outputPin.type == Pin::Type::Flow && outputPin.isPinLinked())
210 {
211 206 outputPin.noMoreDataAvailable = false;
212 LOG_TRACE(" Setting pin '{}' to hasDataAvailable", outputPin.name);
213 }
214
215
2/2
✓ Branch 1 taken 110 times.
✓ Branch 2 taken 170 times.
280 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin.data))
216 {
217 LOG_TRACE(" Adding pin '{}' to data poll event list.", outputPin.name);
218
1/2
✓ Branch 4 taken 110 times.
✗ Branch 5 not taken.
110 node->pollEvents.insert(std::make_pair(InsTime(), std::make_pair(&outputPin, i)));
219 }
220
2/2
✓ Branch 1 taken 24 times.
✓ Branch 2 taken 146 times.
170 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin.data))
221 {
222 24 if (auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin.data);
223
2/4
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
42 !outputPin.noMoreDataAvailable && callback != nullptr && *callback != nullptr
224
6/8
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 6 times.
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 18 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 18 times.
✓ Branch 8 taken 6 times.
42 && std::ranges::any_of(outputPin.links, [](const OutputPin::OutgoingLink& link) {
225 18 return link.connectedNode->isInitialized();
226 }))
227 {
228
3/6
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 18 times.
✗ Branch 7 not taken.
18 if (auto obs = (node->**callback)(i, true)) // Peek the data
229 {
230 LOG_TRACE(" Adding pin '{}' to data poll event list with time {}.", outputPin.name, obs->insTime);
231
1/2
✓ Branch 4 taken 18 times.
✗ Branch 5 not taken.
18 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(&outputPin, i)));
232 18 }
233 }
234 }
235 }
236 }
237
238 {
239
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
240
1/2
✓ Branch 0 taken 111 times.
✗ Branch 1 not taken.
111 if (_state == State::Starting)
241 {
242
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 nm::EnableAllCallbacks();
243 111 _state = State::Running;
244 }
245 111 }
246
247 111 _startTime = std::chrono::steady_clock::now();
248
1/2
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
222 LOG_INFO("Execution started");
249
250 111 bool anyNodeRunning = false;
251
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 338 times.
✓ Branch 9 taken 111 times.
449 for (Node* node : nm::m_Nodes()) // Search for node pins with data callbacks
252 {
253
7/10
✓ Branch 0 taken 338 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 338 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 338 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 337 times.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 337 times.
✓ Branch 11 taken 1 times.
338 if (node != nullptr && node->kind != Node::Kind::GroupBox && node->isInitialized())
254 {
255 337 anyNodeRunning = true;
256
3/6
✓ Branch 1 taken 337 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 337 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 337 times.
✗ Branch 9 not taken.
337 LOG_DEBUG("Waking up node {}", node->nameId());
257
1/2
✓ Branch 1 taken 337 times.
✗ Branch 2 not taken.
337 node->wakeWorker();
258 }
259 }
260
261
1/2
✓ Branch 0 taken 111 times.
✗ Branch 1 not taken.
111 if (anyNodeRunning)
262 {
263 // Wait for the nodes to finish
264 111 bool timeout = true;
265 111 auto timeoutDuration = std::chrono::minutes(1);
266
2/2
✓ Branch 0 taken 115 times.
✓ Branch 1 taken 110 times.
225 while (timeout)
267 {
268
1/2
✓ Branch 1 taken 115 times.
✗ Branch 2 not taken.
115 std::unique_lock lk(_mutex);
269
270
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 114 times.
115 if (realTimeMode)
271 {
272
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
3 _cv.wait(lk, [] { return _state == State::Stopping; });
273 1 break;
274 }
275
276
1/2
✓ Branch 1 taken 114 times.
✗ Branch 2 not taken.
342 timeout = !_cv.wait_for(lk, timeoutDuration, [] { return _state == State::Stopping; });
277
4/6
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 110 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 114 times.
114 if (timeout && _activeNodes == 0)
278 {
279 LOG_ERROR("FlowExecutor had a timeout, but all nodes finished already.");
280 #ifdef TESTING
281 FAIL("The FlowExecutor should not have a timeout when all nodes are finished already.");
282 #endif
283 break;
284 }
285 115 }
286 }
287
288 // Deinitialize
289
1/2
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
222 LOG_DEBUG("Stopping FlowExecutor...");
290 111 nm::DisableAllCallbacks();
291 111 nm::ClearAllNodeQueues();
292
293
3/4
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 338 times.
✓ Branch 9 taken 111 times.
449 for (Node* node : nm::m_Nodes())
294 {
295
7/10
✓ Branch 0 taken 338 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 338 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 338 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 337 times.
✓ Branch 10 taken 1 times.
✓ Branch 11 taken 337 times.
338 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
296
297 337 node->_mode = Node::Mode::REAL_TIME;
298
2/2
✓ Branch 5 taken 280 times.
✓ Branch 6 taken 337 times.
617 for (auto& outputPin : node->outputPins)
299 {
300 280 outputPin.noMoreDataAvailable = true;
301 }
302
1/2
✓ Branch 1 taken 337 times.
✗ Branch 2 not taken.
337 node->flush();
303 }
304
305
4/10
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 111 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 110 times.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
333 if (!ConfigManager::Get<bool>("nogui")
306
14/34
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 111 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 111 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 111 times.
✗ Branch 10 not taken.
✓ Branch 11 taken 111 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 111 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 111 times.
✗ Branch 18 not taken.
✓ Branch 19 taken 110 times.
✓ Branch 20 taken 1 times.
✓ Branch 21 taken 111 times.
✗ Branch 22 not taken.
✓ Branch 24 taken 111 times.
✗ Branch 25 not taken.
✓ Branch 26 taken 111 times.
✗ Branch 27 not taken.
✓ Branch 29 taken 111 times.
✗ Branch 30 not taken.
✓ Branch 31 taken 111 times.
✗ Branch 32 not taken.
✗ Branch 33 not taken.
✗ Branch 34 not taken.
✗ Branch 36 not taken.
✗ Branch 37 not taken.
✗ Branch 38 not taken.
✗ Branch 39 not taken.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
666 || (!ConfigManager::Get<bool>("sigterm") && !ConfigManager::Get<size_t>("duration")))
307 {
308 110 auto finish = std::chrono::steady_clock::now();
309
2/4
✓ Branch 1 taken 110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 110 times.
✗ Branch 5 not taken.
110 [[maybe_unused]] std::chrono::duration<double> elapsed = finish - _startTime;
310
2/4
✓ Branch 1 taken 110 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 110 times.
✗ Branch 7 not taken.
110 LOG_INFO("Elapsed time: {} s", elapsed.count());
311 }
312
313 111 _activeNodes = 0;
314 LOG_TRACE("FlowExecutor deinitialized.");
315 {
316
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 std::scoped_lock<std::mutex> lk(_mutex);
317 111 _state = State::Idle;
318 111 _cv.notify_all();
319 111 }
320
321 LOG_TRACE("Execute thread finished.");
322 }
323