15 #include <SDKDDKVer.h>
17 #include <boost/asio.hpp>
39#include <condition_variable>
42 #include <catch2/catch_test_macros.hpp>
55std::condition_variable _cv;
57enum class State : uint8_t
64State _state = State::Idle;
67std::atomic<size_t> _activeNodes{ 0 };
68std::chrono::time_point<std::chrono::steady_clock> _startTime;
87 std::scoped_lock<std::mutex> lk(_mutex);
88 return _state != State::Idle;
98 std::scoped_lock<std::mutex> lk(_mutex);
99 _state = State::Starting;
112 std::scoped_lock<std::mutex> lk(_mutex);
113 if (_state == State::Running || _state == State::Starting)
115 _state = State::Stopping;
123 if (_thd.joinable()) { _thd.join(); }
128 LOG_TRACE(
"Waiting for finish of FlowExecutor...");
130 std::unique_lock lk(_mutex);
131 _cv.wait(lk, [] {
return _state == State::Idle; });
135 std::scoped_lock<std::mutex> lk(_mutex);
136 if (_thd.joinable()) { _thd.join(); }
143 LOG_DEBUG(
"Node {} finished.", node->nameId());
146 if (_activeNodes == 0)
148 std::scoped_lock<std::mutex> lk(_mutex);
149 _state = State::Stopping;
162 for (
auto& inputPin : node->inputPins)
164 inputPin.queue.clear();
165 inputPin.queueBlocked =
false;
167 node->pollEvents.clear();
172 std::scoped_lock<std::mutex> lk(_mutex);
173 _state = State::Idle;
178 LOG_INFO(
"=====================================================");
182 LOG_INFO(
"Executing in {} mode", realTimeMode ?
"real-time" :
"post-processing");
184 util::time::SetMode(realTimeMode ? util::time::Mode::REAL_TIME : util::time::Mode::POST_PROCESSING);
192 std::scoped_lock<std::mutex> lk(_mutex);
193 if (_state != State::Starting) {
break; }
200 LOG_TRACE(
"Putting node '{}' into post-processing mode and adding to active nodes.", node->
nameId());
206 for (
size_t i = 0; i < node->
outputPins.size(); i++)
211 outputPin.noMoreDataAvailable =
false;
212 LOG_TRACE(
" Setting pin '{}' to hasDataAvailable", outputPin.name);
215 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin.data))
217 LOG_TRACE(
" Adding pin '{}' to data poll event list.", outputPin.name);
218 node->
pollEvents.insert(std::make_pair(
InsTime(), std::make_pair(&outputPin, i)));
220 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin.data))
222 if (
auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin.data);
223 !outputPin.noMoreDataAvailable && callback !=
nullptr && *callback !=
nullptr
225 return link.connectedNode->isInitialized();
228 if (
auto obs = (node->**callback)(i,
true))
230 LOG_TRACE(
" Adding pin '{}' to data poll event list with time {}.", outputPin.name, obs->insTime);
231 node->
pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(&outputPin, i)));
239 std::scoped_lock<std::mutex> lk(_mutex);
240 if (_state == State::Starting)
243 _state = State::Running;
247 _startTime = std::chrono::steady_clock::now();
250 bool anyNodeRunning =
false;
255 anyNodeRunning =
true;
265 auto timeoutDuration = std::chrono::minutes(1);
268 std::unique_lock lk(_mutex);
272 _cv.wait(lk, [] {
return _state == State::Stopping; });
276 timeout = !_cv.wait_for(lk, timeoutDuration, [] {
return _state == State::Stopping; });
277 if (timeout && _activeNodes == 0)
279 LOG_ERROR(
"FlowExecutor had a timeout, but all nodes finished already.");
281 FAIL(
"The FlowExecutor should not have a timeout when all nodes are finished already.");
300 outputPin.noMoreDataAvailable =
true;
308 auto finish = std::chrono::steady_clock::now();
309 [[maybe_unused]] std::chrono::duration<double> elapsed = finish - _startTime;
310 LOG_INFO(
"Elapsed time: {} s", elapsed.count());
314 LOG_TRACE(
"FlowExecutor deinitialized.");
316 std::scoped_lock<std::mutex> lk(_mutex);
317 _state = State::Idle;
Config management for the Project.
The class is responsible for all time-related tasks.
Utility class for logging to console and file.
#define LOG_DEBUG
Debug information. Should not be called on functions which receive observations (spamming)
#define LOG_ERROR
Error occurred, which stops part of the program to work, but not everything.
#define LOG_INFO
Info to the user on the state of the program.
#define LOG_TRACE
Detailled info to trace the execution of the program. Should not be called on functions which receive...
Keeps track of the current real/simulation time.
static AntexReader & Get()
Get the static Instance of the reader.
The class is responsible for all time-related tasks.
Abstract parent class for all nodes.
bool isDisabled() const
Checks if the node is disabled.
bool isInitialized() const
Checks if the node is initialized.
void wakeWorker()
Wakes the worker thread.
std::mutex _configWindowMutex
Mutex to show the config window (prevents initialization to modify values within the config window)
virtual void flush()
Function called by the flow executer after finishing to flush out remaining data.
std::vector< OutputPin > outputPins
List of output pins.
std::multimap< InsTime, std::pair< OutputPin *, size_t > > pollEvents
Map with callback events (sorted by time)
Kind kind
Kind of the Node.
@ POST_PROCESSING
Node running in post-processing mode.
@ REAL_TIME
Node running in real-time mode.
std::atomic< Mode > _mode
Mode the node is currently running in.
virtual bool resetNode()
Resets the node. It is guaranteed that the node is initialized when this is called.
std::string nameId() const
Node name and id.
bool _onlyRealTime
Whether the node can run in post-processing or only real-time.
const T & Get(const std::string &key, const T &&defaultValue)
Retrieves the value of a corresponding key from the configuration, if one exists.
bool isRunning() noexcept
Checks if the thread is running.
void deregisterNode(const Node *node)
Called by nodes when they finished with sending data.
void start()
Starts the Thread.
void waitForFinish()
Waits for a thread to finish its execution.
void execute()
Main task of the thread.
void stop()
Stops the Thread.
void ClearAllNodeQueues()
Clears all nodes queues.
void EnableAllCallbacks()
Enables all Node callbacks.
void DisableAllCallbacks()
Disables all Node callbacks.
bool InitializeAllNodes()
Initializes all nodes.
const std::vector< Node * > & m_Nodes()
List of all registered Nodes.
@ GroupBox
Group box which can group other nodes and drag them together.
Collection of information about the connected node and pin.