0.5.1
Loading...
Searching...
No Matches
FlowExecutor.cpp
Go to the documentation of this file.
1// This file is part of INSTINCT, the INS Toolkit for Integrated
2// Navigation Concepts and Training by the Institute of Navigation of
3// the University of Stuttgart, Germany.
4//
5// This Source Code Form is subject to the terms of the Mozilla Public
6// License, v. 2.0. If a copy of the MPL was not distributed with this
7// file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9#include "FlowExecutor.hpp"
10
11// <boost/asio.hpp> needs to be included before <winsock.h> (even though not used in this file)
12// https://stackoverflow.com/questions/9750344/boostasio-winsock-and-winsock-2-compatibility-issue
13#ifdef _WIN32
14 // Set the proper SDK version before including boost/Asio
15 #include <SDKDDKVer.h>
16 // Note boost/ASIO includes Windows.h.
17 #include <boost/asio.hpp>
18#endif //_WIN32
19
21#include "util/Logger.hpp"
23
25
28
29#include <chrono>
30#include <map>
31#include <variant>
32#include <memory>
33
34#include <thread>
35#include <atomic>
36#include <mutex>
37#include <condition_variable>
38
39#ifdef TESTING
40 #include <catch2/catch_test_macros.hpp>
41#endif
42
43/* -------------------------------------------------------------------------------------------------------- */
44/* Private Members */
45/* -------------------------------------------------------------------------------------------------------- */
46
48{
49namespace
50{
51
52std::mutex _mutex;
53std::condition_variable _cv;
54
55enum class State : uint8_t
56{
57 Idle,
58 Starting,
59 Running,
60 Stopping,
61};
62State _state = State::Idle;
63
64std::thread _thd;
65std::atomic<size_t> _activeNodes{ 0 };
66std::chrono::time_point<std::chrono::steady_clock> _startTime;
67
68/* -------------------------------------------------------------------------------------------------------- */
69/* Private Function Declarations */
70/* -------------------------------------------------------------------------------------------------------- */
71
72} // namespace
73
74/// @brief Main task of the thread
75void execute();
76
77} // namespace NAV::FlowExecutor
78
79/* -------------------------------------------------------------------------------------------------------- */
80/* Function Definitions */
81/* -------------------------------------------------------------------------------------------------------- */
82
84{
85 std::scoped_lock<std::mutex> lk(_mutex);
86 return _state != State::Idle;
87}
88
90{
91 LOG_TRACE("called");
92
93 stop();
94
95 {
96 std::scoped_lock<std::mutex> lk(_mutex);
97 _state = State::Starting;
98 }
99
100 _thd = std::thread(execute);
101}
102
104{
105 LOG_TRACE("called");
106
107 if (isRunning())
108 {
109 {
110 std::scoped_lock<std::mutex> lk(_mutex);
111 if (_state == State::Running || _state == State::Starting)
112 {
113 _state = State::Stopping;
114 _cv.notify_all();
115 }
116 }
117
119 }
120
121 if (_thd.joinable()) { _thd.join(); }
122}
123
125{
126 LOG_TRACE("Waiting for finish of FlowExecutor...");
127 {
128 std::unique_lock lk(_mutex);
129 _cv.wait(lk, [] { return _state == State::Idle; });
130 }
131
132 {
133 std::scoped_lock<std::mutex> lk(_mutex);
134 if (_thd.joinable()) { _thd.join(); }
135 }
136 LOG_TRACE("FlowExecutor finished.");
137}
138
139void NAV::FlowExecutor::deregisterNode([[maybe_unused]] const Node* node)
140{
141 LOG_DEBUG("Node {} finished.", node->nameId());
142 _activeNodes--;
143
144 if (_activeNodes == 0)
145 {
146 std::scoped_lock<std::mutex> lk(_mutex);
147 _state = State::Stopping;
148 _cv.notify_all();
149 }
150}
151
153{
154 LOG_TRACE("called");
155
156 AntexReader::Get().reset();
157
158 for (Node* node : flow::m_Nodes())
159 {
160 for (auto& inputPin : node->inputPins)
161 {
162 inputPin.queue.clear();
163 inputPin.queueBlocked = false;
164 }
165 node->pollEvents.clear();
166 }
167
168 if (!flow::InitializeAllNodes()) // This wakes the threads
169 {
170 std::scoped_lock<std::mutex> lk(_mutex);
171 _state = State::Idle;
172 _cv.notify_all();
173 return;
174 }
175
176 LOG_INFO("=====================================================");
177 bool realTimeMode = std::any_of(flow::m_Nodes().begin(), flow::m_Nodes().end(), [](const Node* node) {
178 return node && !node->isDisabled() && node->_onlyRealTime;
179 });
180 LOG_INFO("Executing in {} mode", realTimeMode ? "real-time" : "post-processing");
181
182 util::time::SetMode(realTimeMode ? util::time::Mode::REAL_TIME : util::time::Mode::POST_PROCESSING);
183 _activeNodes = 0;
184
185 for (Node* node : flow::m_Nodes())
186 {
187 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
188
189 {
190 std::scoped_lock<std::mutex> lk(_mutex);
191 if (_state != State::Starting) { break; }
192 }
193
195 if (!realTimeMode)
196 {
197 _activeNodes += 1;
198 LOG_TRACE("Putting node '{}' into post-processing mode and adding to active nodes.", node->nameId());
199 }
200 {
201 std::scoped_lock<std::mutex> guard(node->_configWindowMutex);
202 node->resetNode();
203 }
204 for (size_t i = 0; i < node->outputPins.size(); i++) // for (auto& outputPin : node->outputPins)
205 {
206 auto& outputPin = node->outputPins[i];
207 if (outputPin.type == Pin::Type::Flow && outputPin.isPinLinked())
208 {
209 outputPin.noMoreDataAvailable = false;
210 LOG_TRACE(" Setting pin '{}' to hasDataAvailable", outputPin.name);
211 }
212
213 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin.data))
214 {
215 LOG_TRACE(" Adding pin '{}' to data poll event list.", outputPin.name);
216 node->pollEvents.insert(std::make_pair(InsTime(), std::make_pair(&outputPin, i)));
217 }
218 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin.data))
219 {
220 if (auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin.data);
221 !outputPin.noMoreDataAvailable && callback != nullptr && *callback != nullptr
222 && std::ranges::any_of(outputPin.links, [](const OutputPin::OutgoingLink& link) {
223 return link.connectedNode->isInitialized();
224 }))
225 {
226 if (auto obs = (node->**callback)(i, true)) // Peek the data
227 {
228 LOG_TRACE(" Adding pin '{}' to data poll event list with time {}.", outputPin.name, obs->insTime);
229 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(&outputPin, i)));
230 }
231 }
232 }
233 }
234 }
235
236 {
237 std::scoped_lock<std::mutex> lk(_mutex);
238 if (_state == State::Starting)
239 {
241 _state = State::Running;
242 }
243 }
244
245 _startTime = std::chrono::steady_clock::now();
246 LOG_INFO("Execution started");
247
248 bool anyNodeRunning = false;
249 for (Node* node : flow::m_Nodes()) // Search for node pins with data callbacks
250 {
251 if (node != nullptr && node->kind != Node::Kind::GroupBox && node->isInitialized())
252 {
253 anyNodeRunning = true;
254 LOG_DEBUG("Waking up node {}", node->nameId());
255 node->wakeWorker();
256 }
257 }
258
259 if (anyNodeRunning)
260 {
261 // Wait for the nodes to finish
262 bool timeout = true;
263 auto timeoutDuration = std::chrono::minutes(1);
264 while (timeout)
265 {
266 std::unique_lock lk(_mutex);
267
268 if (realTimeMode)
269 {
270 _cv.wait(lk, [] { return _state == State::Stopping; });
271 break;
272 }
273
274 timeout = !_cv.wait_for(lk, timeoutDuration, [] { return _state == State::Stopping; });
275 if (timeout && _activeNodes == 0)
276 {
277 LOG_ERROR("FlowExecutor had a timeout, but all nodes finished already.");
278#ifdef TESTING
279 FAIL("The FlowExecutor should not have a timeout when all nodes are finished already.");
280#endif
281 break;
282 }
283 }
284 }
285
286 // Deinitialize
287 LOG_DEBUG("Stopping FlowExecutor...");
290
291 for (Node* node : flow::m_Nodes())
292 {
293 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
294
296 for (auto& outputPin : node->outputPins)
297 {
298 outputPin.noMoreDataAvailable = true;
299 }
300 node->flush();
301 }
302
303 if (!ConfigManager::Get<bool>("nogui")
304 || (!ConfigManager::Get<bool>("sigterm") && !ConfigManager::Get<size_t>("duration")))
305 {
306 auto finish = std::chrono::steady_clock::now();
307 [[maybe_unused]] std::chrono::duration<double> elapsed = finish - _startTime;
308 LOG_INFO("Elapsed time: {:.2f} s", elapsed.count());
309 }
310
311 _activeNodes = 0;
312 LOG_TRACE("FlowExecutor deinitialized.");
313 {
314 std::scoped_lock<std::mutex> lk(_mutex);
315 _state = State::Idle;
316 _cv.notify_all();
317 }
318
319 LOG_TRACE("Execute thread finished.");
320}
ANTEX file reader.
Config management for the Project.
Flow Executor Thread.
The class is responsible for all time-related tasks.
Utility class for logging to console and file.
#define LOG_DEBUG
Debug information. Should not be called on functions which receive observations (spamming)
Definition Logger.hpp:67
#define LOG_ERROR
Error occurred, which stops part of the program to work, but not everything.
Definition Logger.hpp:73
#define LOG_INFO
Info to the user on the state of the program.
Definition Logger.hpp:69
#define LOG_TRACE
Detailled info to trace the execution of the program. Should not be called on functions which receive...
Definition Logger.hpp:65
Node Class.
Keeps track of the current real/simulation time.
static AntexReader & Get()
Get the static Instance of the reader.
The class is responsible for all time-related tasks.
Definition InsTime.hpp:710
Abstract parent class for all nodes.
Definition Node.hpp:92
bool isDisabled() const
Checks if the node is disabled.
Definition Node.cpp:569
bool isInitialized() const
Checks if the node is initialized.
Definition Node.cpp:574
void wakeWorker()
Wakes the worker thread.
Definition Node.cpp:560
std::mutex _configWindowMutex
Mutex to show the config window (prevents initialization to modify values within the config window)
Definition Node.hpp:550
virtual void flush()
Function called by the flow executer after finishing to flush out remaining data.
Definition Node.cpp:72
std::vector< OutputPin > outputPins
List of output pins.
Definition Node.hpp:511
std::multimap< InsTime, std::pair< OutputPin *, size_t > > pollEvents
Map with callback events (sorted by time)
Definition Node.hpp:517
Kind kind
Kind of the Node.
Definition Node.hpp:505
@ POST_PROCESSING
Node running in post-processing mode.
Definition Node.hpp:192
@ REAL_TIME
Node running in real-time mode.
Definition Node.hpp:191
std::atomic< Mode > _mode
Mode the node is currently running in.
Definition Node.hpp:538
virtual bool resetNode()
Resets the node. It is guaranteed that the node is initialized when this is called.
Definition Node.cpp:74
std::string nameId() const
Node name and id.
Definition Node.cpp:323
bool _onlyRealTime
Whether the node can run in post-processing or only real-time.
Definition Node.hpp:531
const T & Get(const std::string &key, const T &&defaultValue)
Retrieves the value of a corresponding key from the configuration, if one exists.
bool isRunning() noexcept
Checks if the thread is running.
void deregisterNode(const Node *node)
Called by nodes when they finished with sending data.
void start()
Starts the Thread.
void waitForFinish()
Waits for a thread to finish its execution.
void execute()
Main task of the thread.
void stop()
Stops the Thread.
const std::vector< Node * > & m_Nodes()
List of all registered Nodes.
void DisableAllCallbacks()
Disables all Node callbacks.
void ClearAllNodeQueues()
Clears all nodes queues.
void EnableAllCallbacks()
Enables all Node callbacks.
bool InitializeAllNodes()
Initializes all nodes.
@ GroupBox
Group box which can group other nodes and drag them together.
Definition Node.hpp:102
@ Flow
NodeData Trigger.
Definition Pin.hpp:52