15 #include <SDKDDKVer.h>
17 #include <boost/asio.hpp>
37#include <condition_variable>
40 #include <catch2/catch_test_macros.hpp>
53std::condition_variable _cv;
55enum class State : uint8_t
62State _state = State::Idle;
65std::atomic<size_t> _activeNodes{ 0 };
66std::chrono::time_point<std::chrono::steady_clock> _startTime;
85 std::scoped_lock<std::mutex> lk(_mutex);
86 return _state != State::Idle;
96 std::scoped_lock<std::mutex> lk(_mutex);
97 _state = State::Starting;
110 std::scoped_lock<std::mutex> lk(_mutex);
111 if (_state == State::Running || _state == State::Starting)
113 _state = State::Stopping;
121 if (_thd.joinable()) { _thd.join(); }
126 LOG_TRACE(
"Waiting for finish of FlowExecutor...");
128 std::unique_lock lk(_mutex);
129 _cv.wait(lk, [] {
return _state == State::Idle; });
133 std::scoped_lock<std::mutex> lk(_mutex);
134 if (_thd.joinable()) { _thd.join(); }
141 LOG_DEBUG(
"Node {} finished.", node->nameId());
144 if (_activeNodes == 0)
146 std::scoped_lock<std::mutex> lk(_mutex);
147 _state = State::Stopping;
160 for (
auto& inputPin : node->inputPins)
162 inputPin.queue.clear();
163 inputPin.queueBlocked =
false;
165 node->pollEvents.clear();
170 std::scoped_lock<std::mutex> lk(_mutex);
171 _state = State::Idle;
176 LOG_INFO(
"=====================================================");
180 LOG_INFO(
"Executing in {} mode", realTimeMode ?
"real-time" :
"post-processing");
182 util::time::SetMode(realTimeMode ? util::time::Mode::REAL_TIME : util::time::Mode::POST_PROCESSING);
190 std::scoped_lock<std::mutex> lk(_mutex);
191 if (_state != State::Starting) {
break; }
198 LOG_TRACE(
"Putting node '{}' into post-processing mode and adding to active nodes.", node->
nameId());
204 for (
size_t i = 0; i < node->
outputPins.size(); i++)
209 outputPin.noMoreDataAvailable =
false;
210 LOG_TRACE(
" Setting pin '{}' to hasDataAvailable", outputPin.name);
213 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin.data))
215 LOG_TRACE(
" Adding pin '{}' to data poll event list.", outputPin.name);
216 node->
pollEvents.insert(std::make_pair(
InsTime(), std::make_pair(&outputPin, i)));
218 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin.data))
220 if (
auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin.data);
221 !outputPin.noMoreDataAvailable && callback !=
nullptr && *callback !=
nullptr
223 return link.connectedNode->isInitialized();
226 if (
auto obs = (node->**callback)(i,
true))
228 LOG_TRACE(
" Adding pin '{}' to data poll event list with time {}.", outputPin.name, obs->insTime);
229 node->
pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(&outputPin, i)));
237 std::scoped_lock<std::mutex> lk(_mutex);
238 if (_state == State::Starting)
241 _state = State::Running;
245 _startTime = std::chrono::steady_clock::now();
248 bool anyNodeRunning =
false;
253 anyNodeRunning =
true;
263 auto timeoutDuration = std::chrono::minutes(1);
266 std::unique_lock lk(_mutex);
270 _cv.wait(lk, [] {
return _state == State::Stopping; });
274 timeout = !_cv.wait_for(lk, timeoutDuration, [] {
return _state == State::Stopping; });
275 if (timeout && _activeNodes == 0)
277 LOG_ERROR(
"FlowExecutor had a timeout, but all nodes finished already.");
279 FAIL(
"The FlowExecutor should not have a timeout when all nodes are finished already.");
298 outputPin.noMoreDataAvailable =
true;
306 auto finish = std::chrono::steady_clock::now();
307 [[maybe_unused]] std::chrono::duration<double> elapsed = finish - _startTime;
308 LOG_INFO(
"Elapsed time: {:.2f} s", elapsed.count());
312 LOG_TRACE(
"FlowExecutor deinitialized.");
314 std::scoped_lock<std::mutex> lk(_mutex);
315 _state = State::Idle;
Config management for the Project.
The class is responsible for all time-related tasks.
Utility class for logging to console and file.
#define LOG_DEBUG
Debug information. Should not be called on functions which receive observations (spamming)
#define LOG_ERROR
Error occurred, which stops part of the program to work, but not everything.
#define LOG_INFO
Info to the user on the state of the program.
#define LOG_TRACE
Detailled info to trace the execution of the program. Should not be called on functions which receive...
Keeps track of the current real/simulation time.
static AntexReader & Get()
Get the static Instance of the reader.
The class is responsible for all time-related tasks.
Abstract parent class for all nodes.
bool isDisabled() const
Checks if the node is disabled.
bool isInitialized() const
Checks if the node is initialized.
void wakeWorker()
Wakes the worker thread.
std::mutex _configWindowMutex
Mutex to show the config window (prevents initialization to modify values within the config window)
virtual void flush()
Function called by the flow executer after finishing to flush out remaining data.
std::vector< OutputPin > outputPins
List of output pins.
std::multimap< InsTime, std::pair< OutputPin *, size_t > > pollEvents
Map with callback events (sorted by time)
Kind kind
Kind of the Node.
@ POST_PROCESSING
Node running in post-processing mode.
@ REAL_TIME
Node running in real-time mode.
std::atomic< Mode > _mode
Mode the node is currently running in.
virtual bool resetNode()
Resets the node. It is guaranteed that the node is initialized when this is called.
std::string nameId() const
Node name and id.
bool _onlyRealTime
Whether the node can run in post-processing or only real-time.
const T & Get(const std::string &key, const T &&defaultValue)
Retrieves the value of a corresponding key from the configuration, if one exists.
bool isRunning() noexcept
Checks if the thread is running.
void deregisterNode(const Node *node)
Called by nodes when they finished with sending data.
void start()
Starts the Thread.
void waitForFinish()
Waits for a thread to finish its execution.
void execute()
Main task of the thread.
void stop()
Stops the Thread.
const std::vector< Node * > & m_Nodes()
List of all registered Nodes.
void DisableAllCallbacks()
Disables all Node callbacks.
void ClearAllNodeQueues()
Clears all nodes queues.
void EnableAllCallbacks()
Enables all Node callbacks.
bool InitializeAllNodes()
Initializes all nodes.
@ GroupBox
Group box which can group other nodes and drag them together.
Collection of information about the connected node and pin.