0.4.1
Loading...
Searching...
No Matches
FlowExecutor.cpp
Go to the documentation of this file.
1// This file is part of INSTINCT, the INS Toolkit for Integrated
2// Navigation Concepts and Training by the Institute of Navigation of
3// the University of Stuttgart, Germany.
4//
5// This Source Code Form is subject to the terms of the Mozilla Public
6// License, v. 2.0. If a copy of the MPL was not distributed with this
7// file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9#include "FlowExecutor.hpp"
10
11// <boost/asio.hpp> needs to be included before <winsock.h> (even though not used in this file)
12// https://stackoverflow.com/questions/9750344/boostasio-winsock-and-winsock-2-compatibility-issue
13#ifdef _WIN32
14 // Set the proper SDK version before including boost/Asio
15 #include <SDKDDKVer.h>
16 // Note boost/ASIO includes Windows.h.
17 #include <boost/asio.hpp>
18#endif //_WIN32
19
21#include "util/Logger.hpp"
23
25
27namespace nm = NAV::NodeManager;
30
31#include <chrono>
32#include <map>
33#include <variant>
34#include <memory>
35
36#include <thread>
37#include <atomic>
38#include <mutex>
39#include <condition_variable>
40
41#ifdef TESTING
42 #include <catch2/catch_test_macros.hpp>
43#endif
44
45/* -------------------------------------------------------------------------------------------------------- */
46/* Private Members */
47/* -------------------------------------------------------------------------------------------------------- */
48
// NOTE(review): the line opening this scope (listing line 49) is missing from this rendering; the closing brace
// below is annotated "namespace NAV::FlowExecutor", so it was presumably `namespace NAV::FlowExecutor`.
50{
51namespace
52{
53
/// Mutex taken around every access to _state that is visible in this file
54std::mutex _mutex;
/// Condition variable notified whenever _state is changed to Stopping or Idle
55std::condition_variable _cv;
56
/// Life-cycle state of the executor
57enum class State : uint8_t
58{
59 Idle,     ///< Initial value; also set by execute() when it ends or when node initialization fails
60 Starting, ///< Set by the start routine right before the worker thread is created
61 Running,  ///< Set by execute() after the nodes were prepared, if the state is still Starting
62 Stopping, ///< Set by the stop routine, or by deregisterNode() once the active node counter reads zero
63};
/// Current state of the executor
64State _state = State::Idle;
65
/// Worker thread which runs execute()
66std::thread _thd;
/// Counter incremented by execute() for every prepared node in post-processing mode and decremented by deregisterNode()
67std::atomic<size_t> _activeNodes{ 0 };
/// Time point taken by execute() right before the nodes are woken up; used for the "Elapsed time" log message
68std::chrono::time_point<std::chrono::steady_clock> _startTime;
69
70/* -------------------------------------------------------------------------------------------------------- */
71/* Private Function Declarations */
72/* -------------------------------------------------------------------------------------------------------- */
73
74} // namespace
75
76/// @brief Main task of the thread
77void execute();
78
79} // namespace NAV::FlowExecutor
80
81/* -------------------------------------------------------------------------------------------------------- */
82/* Function Definitions */
83/* -------------------------------------------------------------------------------------------------------- */
84
// NOTE(review): the signature line (listing line 85) is missing from this rendering. Judging by the body this is
// presumably `bool NAV::FlowExecutor::isRunning() noexcept` - confirm against the repository.
// Returns true while the executor is in any state other than Idle (i.e. Starting, Running or Stopping).
86{
 // Read _state under the mutex, because the worker thread and the nodes modify it
87 std::scoped_lock<std::mutex> lk(_mutex);
88 return _state != State::Idle;
89}
90
// NOTE(review): the signature line (listing line 91) is missing from this rendering. Judging by the body this is
// presumably `void NAV::FlowExecutor::start()` - confirm against the repository.
// Stops a possibly running execution, sets the state to Starting and launches execute() on a new thread.
92{
93 LOG_TRACE("called");
94
 // stop() joins _thd if it is joinable. This has to happen before _thd is reassigned below,
 // because move-assigning to a joinable std::thread calls std::terminate.
95 stop();
96
97 {
98 std::scoped_lock<std::mutex> lk(_mutex);
 // Set before the thread exists, so execute() already sees Starting when it checks the state
99 _state = State::Starting;
100 }
101
102 _thd = std::thread(execute);
103}
104
// NOTE(review): the signature line (listing line 105) is missing from this rendering. Judging by the body this is
// presumably `void NAV::FlowExecutor::stop()` - confirm against the repository.
// Requests the worker thread to stop (if an execution is in progress) and joins it.
106{
107 LOG_TRACE("called");
108
109 if (isRunning())
110 {
111 {
112 std::scoped_lock<std::mutex> lk(_mutex);
 // The state is checked again under the lock, as it can have changed since isRunning() returned.
 // Only Running and Starting are switched to Stopping; Stopping and Idle are left untouched.
113 if (_state == State::Running || _state == State::Starting)
114 {
115 _state = State::Stopping;
 // Wakes the wait on _cv inside execute()
116 _cv.notify_all();
117 }
118 }
119
 // NOTE(review): listing line 120 is missing from this rendering; a statement may have been dropped here.
121 }
122
 // Blocks until execute() has returned
123 if (_thd.joinable()) { _thd.join(); }
124}
125
// NOTE(review): the signature line (listing line 126) is missing from this rendering. Judging by the body this is
// presumably `void NAV::FlowExecutor::waitForFinish()` - confirm against the repository.
// Blocks until the state is Idle and then joins the worker thread.
127{
128 LOG_TRACE("Waiting for finish of FlowExecutor...");
129 {
130 std::unique_lock lk(_mutex);
 // Returns immediately if the state already is Idle, otherwise sleeps until a notification arrives with the state being Idle
131 _cv.wait(lk, [] { return _state == State::Idle; });
132 }
133
134 {
 // The join is done while holding _mutex. In the code visible here execute() only logs
 // after it has set the state to Idle, so it does not need the mutex again at this point.
135 std::scoped_lock<std::mutex> lk(_mutex);
136 if (_thd.joinable()) { _thd.join(); }
137 }
138 LOG_TRACE("FlowExecutor finished.");
139}
140
141void NAV::FlowExecutor::deregisterNode([[maybe_unused]] const Node* node)
142{
143 LOG_DEBUG("Node {} finished.", node->nameId());
144 _activeNodes--;
145
146 if (_activeNodes == 0)
147 {
148 std::scoped_lock<std::mutex> lk(_mutex);
149 _state = State::Stopping;
150 _cv.notify_all();
151 }
152}
153
// NOTE(review): the signature line (listing line 154) is missing from this rendering. Judging by the declaration
// `void execute();` and the final log message this is presumably `void NAV::FlowExecutor::execute()`.
// Listing lines 196, 242, 290, 291 and 297 are missing inside the body as well, so statements may have been dropped.
//
// Main task of the worker thread. Visible steps:
//  1. Clear the input queues and poll events of all nodes and initialize all nodes
//  2. Select real-time or post-processing mode
//  3. Reset every initialized node and fill its poll event list
//  4. Wake the nodes and wait until the state becomes Stopping
//  5. Mark all output pins as finished, flush the nodes and set the state to Idle
155{
156 LOG_TRACE("called");
157
158 AntexReader::Get().reset();
159
 // Remove data left over in the nodes
 // NOTE(review): `node` is dereferenced without a null check here, while the later loops over nm::m_Nodes() check for nullptr
160 for (Node* node : nm::m_Nodes())
161 {
162 for (auto& inputPin : node->inputPins)
163 {
164 inputPin.queue.clear();
165 inputPin.queueBlocked = false;
166 }
167 node->pollEvents.clear();
168 }
169
170 if (!nm::InitializeAllNodes()) // This wakes the threads
171 {
 // Initialization failed: go back to Idle and wake everyone waiting on the state
172 std::scoped_lock<std::mutex> lk(_mutex);
173 _state = State::Idle;
174 _cv.notify_all();
175 return;
176 }
177
178 LOG_INFO("=====================================================");
 // Real-time mode is used as soon as one enabled node has the _onlyRealTime flag set
179 bool realTimeMode = std::any_of(nm::m_Nodes().begin(), nm::m_Nodes().end(), [](const Node* node) {
180 return node && !node->isDisabled() && node->_onlyRealTime;
181 });
182 LOG_INFO("Executing in {} mode", realTimeMode ? "real-time" : "post-processing");
183
184 util::time::SetMode(realTimeMode ? util::time::Mode::REAL_TIME : util::time::Mode::POST_PROCESSING);
185 _activeNodes = 0;
186
 // Prepare every initialized node (group boxes are skipped)
187 for (Node* node : nm::m_Nodes())
188 {
189 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
190
191 {
192 std::scoped_lock<std::mutex> lk(_mutex);
 // Abort the preparation if the state was changed in the meantime (the stop routine switches Starting to Stopping)
193 if (_state != State::Starting) { break; }
194 }
195
 // NOTE(review): listing line 196 is missing from this rendering
197 if (!realTimeMode)
198 {
 // Only counted in post-processing mode; deregisterNode() decrements the counter again
199 _activeNodes += 1;
200 LOG_TRACE("Putting node '{}' into post-processing mode and adding to active nodes.", node->nameId());
201 }
202 {
 // resetNode() is called while holding the config window mutex of the node
203 std::scoped_lock<std::mutex> guard(node->_configWindowMutex);
204 node->resetNode();
205 }
 // Index based loop, because the pin index is stored in the poll events and passed to the peek callback
206 for (size_t i = 0; i < node->outputPins.size(); i++) // for (auto& outputPin : node->outputPins)
207 {
208 auto& outputPin = node->outputPins[i];
209 if (outputPin.type == Pin::Type::Flow && outputPin.isPinLinked())
210 {
211 outputPin.noMoreDataAvailable = false;
212 LOG_TRACE(" Setting pin '{}' to hasDataAvailable", outputPin.name);
213 }
214
215 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin.data))
216 {
 // Poll callbacks are registered with a default constructed InsTime
217 LOG_TRACE(" Adding pin '{}' to data poll event list.", outputPin.name);
218 node->pollEvents.insert(std::make_pair(InsTime(), std::make_pair(&outputPin, i)));
219 }
220 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin.data))
221 {
 // Peek callbacks are only called if the pin has data available, the callback is set
 // and at least one of the linked nodes is initialized
222 if (auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin.data);
223 !outputPin.noMoreDataAvailable && callback != nullptr && *callback != nullptr
224 && std::ranges::any_of(outputPin.links, [](const OutputPin::OutgoingLink& link) {
225 return link.connectedNode->isInitialized();
226 }))
227 {
228 if (auto obs = (node->**callback)(i, true)) // Peek the data
229 {
 // The event is registered with the time of the peeked observation
230 LOG_TRACE(" Adding pin '{}' to data poll event list with time {}.", outputPin.name, obs->insTime);
231 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(&outputPin, i)));
232 }
233 }
234 }
235 }
236 }
237
238 {
239 std::scoped_lock<std::mutex> lk(_mutex);
 // Only switch to Running if nobody changed the state during the preparation
240 if (_state == State::Starting)
241 {
 // NOTE(review): listing line 242 is missing from this rendering
243 _state = State::Running;
244 }
245 }
246
247 _startTime = std::chrono::steady_clock::now();
248 LOG_INFO("Execution started");
249
250 bool anyNodeRunning = false;
251 for (Node* node : nm::m_Nodes()) // Search for node pins with data callbacks
252 {
253 if (node != nullptr && node->kind != Node::Kind::GroupBox && node->isInitialized())
254 {
255 anyNodeRunning = true;
256 LOG_DEBUG("Waking up node {}", node->nameId());
257 node->wakeWorker();
258 }
259 }
260
261 if (anyNodeRunning)
262 {
263 // Wait for the nodes to finish
264 bool timeout = true;
265 auto timeoutDuration = std::chrono::minutes(1);
266 while (timeout)
267 {
268 std::unique_lock lk(_mutex);
269
270 if (realTimeMode)
271 {
 // Real-time mode: wait without a timeout until the state is Stopping
272 _cv.wait(lk, [] { return _state == State::Stopping; });
273 break;
274 }
275
 // Post-processing mode: wait in slices of timeoutDuration. wait_for returns the value of the
 // predicate, so `timeout` is true if the state still is not Stopping after the slice.
276 timeout = !_cv.wait_for(lk, timeoutDuration, [] { return _state == State::Stopping; });
 // A timeout while the active node counter reads zero is reported as an error and ends the wait.
 // A timeout with active nodes left starts the next slice.
277 if (timeout && _activeNodes == 0)
278 {
279 LOG_ERROR("FlowExecutor had a timeout, but all nodes finished already.");
280#ifdef TESTING
281 FAIL("The FlowExecutor should not have a timeout when all nodes are finished already.");
282#endif
283 break;
284 }
285 }
286 }
287
288 // Deinitialize
289 LOG_DEBUG("Stopping FlowExecutor...");
 // NOTE(review): listing lines 290 and 291 are missing from this rendering
292
293 for (Node* node : nm::m_Nodes())
294 {
295 if (node == nullptr || node->kind == Node::Kind::GroupBox || !node->isInitialized()) { continue; }
296
 // NOTE(review): listing line 297 is missing from this rendering
298 for (auto& outputPin : node->outputPins)
299 {
300 outputPin.noMoreDataAvailable = true;
301 }
302 node->flush();
303 }
304
 // The elapsed time is logged if the "nogui" option is false, or if neither "sigterm" nor "duration" is set
305 if (!ConfigManager::Get<bool>("nogui")
306 || (!ConfigManager::Get<bool>("sigterm") && !ConfigManager::Get<size_t>("duration")))
307 {
308 auto finish = std::chrono::steady_clock::now();
309 [[maybe_unused]] std::chrono::duration<double> elapsed = finish - _startTime;
310 LOG_INFO("Elapsed time: {} s", elapsed.count());
311 }
312
313 _activeNodes = 0;
314 LOG_TRACE("FlowExecutor deinitialized.");
315 {
 // Going back to Idle releases the threads which wait on _cv for the state to become Idle
316 std::scoped_lock<std::mutex> lk(_mutex);
317 _state = State::Idle;
318 _cv.notify_all();
319 }
320
321 LOG_TRACE("Execute thread finished.");
322}
ANTEX file reader.
Config management for the Project.
Flow Executor Thread.
The class is responsible for all time-related tasks.
Utility class for logging to console and file.
#define LOG_DEBUG
Debug information. Should not be called on functions which receive observations (spamming)
Definition Logger.hpp:67
#define LOG_ERROR
An error occurred that stops part of the program from working, but not everything.
Definition Logger.hpp:73
#define LOG_INFO
Info to the user on the state of the program.
Definition Logger.hpp:69
#define LOG_TRACE
Detailed info to trace the execution of the program. Should not be called on functions which receive...
Definition Logger.hpp:65
Manages all Nodes.
Node Class.
Keeps track of the current real/simulation time.
static AntexReader & Get()
Get the static Instance of the reader.
The class is responsible for all time-related tasks.
Definition InsTime.hpp:710
Abstract parent class for all nodes.
Definition Node.hpp:92
bool isDisabled() const
Checks if the node is disabled.
Definition Node.cpp:499
bool isInitialized() const
Checks if the node is initialized.
Definition Node.cpp:504
void wakeWorker()
Wakes the worker thread.
Definition Node.cpp:490
std::mutex _configWindowMutex
Mutex to show the config window (prevents initialization to modify values within the config window)
Definition Node.hpp:438
virtual void flush()
Function called by the flow executor after finishing to flush out remaining data.
Definition Node.cpp:73
std::vector< OutputPin > outputPins
List of output pins.
Definition Node.hpp:399
std::multimap< InsTime, std::pair< OutputPin *, size_t > > pollEvents
Map with callback events (sorted by time)
Definition Node.hpp:405
Kind kind
Kind of the Node.
Definition Node.hpp:393
@ POST_PROCESSING
Node running in post-processing mode.
Definition Node.hpp:192
@ REAL_TIME
Node running in real-time mode.
Definition Node.hpp:191
std::atomic< Mode > _mode
Mode the node is currently running in.
Definition Node.hpp:426
virtual bool resetNode()
Resets the node. It is guaranteed that the node is initialized when this is called.
Definition Node.cpp:75
std::string nameId() const
Node name and id.
Definition Node.cpp:253
bool _onlyRealTime
Whether the node can run in post-processing or only real-time.
Definition Node.hpp:419
const T & Get(const std::string &key, const T &&defaultValue)
Retrieves the value of a corresponding key from the configuration, if one exists.
bool isRunning() noexcept
Checks if the thread is running.
void deregisterNode(const Node *node)
Called by nodes when they finished with sending data.
void start()
Starts the Thread.
void waitForFinish()
Waits for a thread to finish its execution.
void execute()
Main task of the thread.
void stop()
Stops the Thread.
void ClearAllNodeQueues()
Clears all nodes queues.
void EnableAllCallbacks()
Enables all Node callbacks.
void DisableAllCallbacks()
Disables all Node callbacks.
bool InitializeAllNodes()
Initializes all nodes.
const std::vector< Node * > & m_Nodes()
List of all registered Nodes.
@ GroupBox
Group box which can group other nodes and drag them together.
Definition Node.hpp:102
@ Flow
NodeData Trigger.
Definition Pin.hpp:52