23#include <imgui_node_editor.h>
26 #include <catch2/catch_test_macros.hpp>
98 if (!outputPin.isPinLinked()) {
return; }
100 for (
auto& link : outputPin.links)
102 auto* targetPin = link.getConnectedPin();
103 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
105 outputPin.dataAccessCounter++;
106 link.dataChangeNotification =
true;
107 LOG_DATA(
"{}: Increasing data access counter on output pin '{}'. Value now {}.",
nameId(), outputPin.name, outputPin.dataAccessCounter);
114 auto data = std::make_shared<NodeData>();
115 data->insTime = insTime;
117 targetPin->queue.push_back(data);
120 for (
const auto& link : outputPin.links)
122 auto* targetPin = link.getConnectedPin();
123 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
125 LOG_DATA(
"{}: Waking up worker of node '{}'. New data on pin '{}'",
nameId(), link.connectedNode->nameId(), targetPin->name);
126 link.connectedNode->wakeWorker();
136 std::unique_lock<std::mutex> lk(outputPin.dataAccessMutex);
137 if (outputPin.dataAccessCounter > 0)
139 LOG_DATA(
"{}: Requesting lock on output pin '{}', {} threads accessing still.",
nameId(), outputPin.name, outputPin.dataAccessCounter);
140 outputPin.dataAccessConditionVariable.wait(lk, [&outputPin]() {
return outputPin.dataAccessCounter == 0; });
141 LOG_DATA(
"{}: Lock on output pin '{}' acquired.",
nameId(), outputPin.name);
144 return std::scoped_lock(outputPin.dataAccessMutex);
151 std::scoped_lock<std::mutex> lk(outputPin->dataAccessMutex);
152 if (outputPin->dataAccessCounter > 0)
155 return link.connectedPinId == inputPins.at(portIndex).id;
157 if (outgoingLink != outputPin->links.end() && outgoingLink->dataChangeNotification)
159 outgoingLink->dataChangeNotification =
false;
160 outputPin->dataAccessCounter--;
162 if (outputPin->dataAccessCounter == 0)
164 LOG_DATA(
"{}: Notifying node '{}' connected to pin {} that all data is read.",
nameId(), outputPin->parentNode->nameId(), outputPin->name);
165 outputPin->dataAccessConditionVariable.notify_all();
185 LOG_DEBUG(
"{}: Tried to invokeCallbacks on pin {} with a nullptr, which is not allowed!!!",
nameId(), portIndex);
188 if (data->insTime.empty())
190 LOG_DATA(
"{}: Tried to invokeCallbacks on pin {} without a InsTime. The time is mandatory though!!! ",
nameId(), portIndex);
194 for (
const auto& link :
outputPins.at(portIndex).links)
196 auto* targetPin = link.getConnectedPin();
197 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
204 targetPin->queue.push_back(data);
205 LOG_DATA(
"{}: Waking up worker of node {}. New data on pin '{}'",
nameId(),
size_t(link.connectedNode->id), targetPin->name);
206 link.connectedNode->wakeWorker();
216 if (pinId == inputPin.id) {
return inputPin; }
219 throw std::runtime_error(fmt::format(
"{}: The Pin {} is not on this node.",
nameId(),
size_t(pinId)).c_str());
226 if (pinId == outputPin.id) {
return outputPin; }
229 throw std::runtime_error(fmt::format(
"{}: The Pin {} is not on this node.",
nameId(),
size_t(pinId)).c_str());
234 for (
size_t i = 0; i <
inputPins.size(); i++)
236 if (pinId ==
inputPins.at(i).id) {
return i; }
239 throw std::runtime_error(fmt::format(
"{}: The Pin {} is not on this node.",
nameId(),
size_t(pinId)).c_str());
244 for (
size_t i = 0; i <
outputPins.size(); i++)
246 if (pinId ==
outputPins.at(i).id) {
return i; }
249 throw std::runtime_error(fmt::format(
"{}: The Pin {} is not on this node.",
nameId(),
size_t(pinId)).c_str());
255 LOG_TRACE(
"called for pin ({}) of type ({}) for node [{}]",
name, std::string(pinType),
nameId());
258 idx =
static_cast<int>(
inputPins.size());
260 idx = std::min(idx,
static_cast<int>(
inputPins.size()));
261 auto iter = std::next(
inputPins.begin(), idx);
265 inputPins.at(
static_cast<size_t>(idx)).callback = callback;
266 if (firable !=
nullptr)
268 inputPins.at(
static_cast<size_t>(idx)).firable = firable;
270 inputPins.at(
static_cast<size_t>(idx)).dataIdentifier = dataIdentifier;
271 inputPins.at(
static_cast<size_t>(idx)).priority = priority;
275 return &
inputPins.at(
static_cast<size_t>(idx));
280 LOG_TRACE(
"called for pin ({}) of type ({}) for node [{}]",
name, std::string(pinType),
nameId());
285 idx = std::min(idx,
static_cast<int>(
outputPins.size()));
286 auto iter = std::next(
outputPins.begin(), idx);
290 outputPins.at(
static_cast<size_t>(idx)).data = data;
291 outputPins.at(
static_cast<size_t>(idx)).dataIdentifier = dataIdentifier;
295 return &
outputPins.at(
static_cast<size_t>(idx));
301 LOG_TRACE(
"called for pin ({})",
size_t(pin.id));
305 pin.parentNode->outputPins.erase(pin.parentNode->outputPins.begin() +
static_cast<int64_t
>(pinIndex));
313 LOG_TRACE(
"called for pin ({})",
size_t(pin.id));
317 LOG_DEBUG(
"Erasing pin at idx {}", pinIndex);
318 pin.parentNode->inputPins.erase(pin.parentNode->inputPins.begin() +
static_cast<int64_t
>(pinIndex));
340 return "Deinitialized";
342 return "DoInitialize";
344 return "Initializing";
346 return "Initialized";
348 return "DoDeinitialize";
350 return "Deinitializing";
608 std::unique_lock<std::mutex> lk(node->
_stateMutex);
611 if (lk.owns_lock()) { lk.unlock(); }
613 if (std::unique_lock<std::mutex> lock(node->
_stateMutex);
622 if (std::unique_lock<std::mutex> lock(node->
_stateMutex);
634 if (std::unique_lock<std::mutex> lock(node->
_stateMutex);
651 bool timeout =
false;
668 return inputPin.isPinLinked();
674 bool notifyTriggered =
false;
675 for (
size_t i = 0; i < node->
inputPins.size(); i++)
680 if (
auto callback = std::get<InputPin::DataChangedNotifyFunc>(inputPin.
callback))
682 LOG_DATA(
"{}: Invoking notify callback on input pin '{}'", node->
nameId(), inputPin.
name);
685 for (
const auto& watcherCallback : inputPin.watcherCallbacks)
687 if (
auto watcherCall = std::get<InputPin::DataChangedWatcherNotifyFunc>(watcherCallback))
689 std::invoke(watcherCall, node, insTime, i);
693 std::invoke(callback, node, insTime, i);
694 notifyTriggered =
true;
698 if (notifyTriggered) {
continue; }
708 bool allInputPinsHaveData = !node->
inputPins.empty();
709 for (
const auto& inputPin : node->
inputPins)
716 allInputPinsHaveData =
false;
721 if (!allInputPinsHaveData)
723 LOG_DATA(
"{}: Not all pins have data for temporal sorting", node->
nameId());
726 LOG_DATA(
"{}: All pins have data for temporal sorting", node->
nameId());
731 size_t earliestInputPinIdx = 0;
732 int earliestInputPinPriority = -1000;
733 for (
size_t i = 0; i < node->
inputPins.size(); i++)
737 && (earliestTime.
empty()
738 || inputPin.
queue.
front()->insTime < earliestTime
739 || (inputPin.
queue.
front()->insTime == earliestTime && inputPin.
priority > earliestInputPinPriority)))
741 earliestTime = inputPin.
queue.
front()->insTime;
742 earliestInputPinIdx = i;
743 earliestInputPinPriority = inputPin.
priority;
746 if (earliestInputPinPriority == -1000) {
break; }
748 auto& inputPin = node->
inputPins[earliestInputPinIdx];
751 if (
auto callback = std::get<InputPin::FlowFirableCallbackFunc>(inputPin.
callback))
755 for (
const auto& watcherCallback : inputPin.watcherCallbacks)
757 if (
auto watcherCall = std::get<InputPin::FlowFirableWatcherCallbackFunc>(watcherCallback))
759 std::invoke(watcherCall, node, inputPin.
queue, earliestInputPinIdx);
763 std::invoke(callback, node, inputPin.
queue, earliestInputPinIdx);
787 std::multimap<InsTime, std::pair<OutputPin*, size_t>>::iterator it;
791 size_t outputPinIdx = it->second.second;
794 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin->
data))
796 auto* callback = std::get_if<OutputPin::PollDataFunc>(&outputPin->
data);
797 if (callback !=
nullptr && *callback !=
nullptr)
800 if ((node->**callback)() ==
nullptr)
807 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin->
data))
809 auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin->
data);
810 if (callback !=
nullptr && *callback !=
nullptr)
812 if (!it->first.empty())
816 if ((node->**callback)(outputPinIdx,
false) ==
nullptr)
818 LOG_ERROR(
"{}: {} could not poll its observation despite being able to peek it.", node->
nameId(), outputPin->
name);
823 if (
auto obs = (node->**callback)(outputPinIdx,
true))
826 if (!obs->insTime.empty())
828 node->
pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(outputPin, outputPinIdx)));
832 (node->**callback)(outputPinIdx,
false);
840 for (
auto& link : outputPin->
links)
842 link.connectedNode->wakeWorker();
848 LOG_ERROR(
"{} - {}: Callback is not valid anymore", node->
nameId(),
size_t(outputPin->
id));
861 if (!outputPin.noMoreDataAvailable)
863 LOG_TRACE(
"{}: Output Pin finished: {}", node->
nameId(), outputPin.name);
864 outputPin.noMoreDataAvailable =
true;
865 for (
auto& link : outputPin.links)
867 link.connectedNode->wakeWorker();
881 return inputPin.type != Pin::Type::Flow || !inputPin.isPinLinked() || inputPin.link.connectedNode->isDisabled()
882 || !inputPin.link.getConnectedPin()->blocksConnectedNodeFromFinishing
883 || (inputPin.queue.empty() && inputPin.link.getConnectedPin()->noMoreDataAvailable);
890 LOG_TRACE(
"{}: Output Pin finished: {}", node->
nameId(), outputPin.name);
891 outputPin.noMoreDataAvailable =
true;
892 for (
auto& link : outputPin.links)
894 link.connectedNode->wakeWorker();
924 if (
Node* connectedNode = inputPin.link.connectedNode)
926 if (!connectedNode->isInitialized())
928 LOG_DEBUG(
"{}: Initializing connected Node '{}' on input Pin {}",
nameId(), connectedNode->nameId(),
size_t(inputPin.id));
929 if (!connectedNode->doInitialize(
true))
931 LOG_ERROR(
"{}: Could not initialize connected node {}",
nameId(), connectedNode->nameId());
948 bool initSucceeded =
false;
959 inputPin.queue.clear();
960 inputPin.queueBlocked =
false;
972 outputPin.noMoreDataAvailable =
true;
973 for (
auto& link : outputPin.links)
975 LOG_TRACE(
"{}: Waking connected node '{}'",
nameId(), link.connectedNode->nameId());
976 link.connectedNode->wakeWorker();
1016 for (
const auto& link : outputPin.links)
1018 if (link.connectedNode->isInitialized())
1021 _reinitialize ?
"Reinitializing" :
"Deinitializing", link.connectedNode->nameId(),
size_t(outputPin.id));
1022 if (
_reinitialize) { link.connectedNode->doReinitialize(); }
1023 else { link.connectedNode->doDeinitialize(); }
1062 ImVec2 realSize = ed::GetNodeSize(node.
id);
1064 realSize.y -= 38.0F;
1066 {
"id", size_t(node.
id) },
1067 {
"type", node.
type() },
1068 {
"kind", std::string(node.
kind) },
1069 {
"name", node.
name },
1070 {
"size", node.
_size.x == 0 && node.
_size.y == 0 ? node.
_size : realSize },
1071 {
"pos", ed::GetNodePosition(node.
id) },
1079 node.
id = j.at(
"id").get<
size_t>();
1080 if (j.contains(
"kind"))
1084 if (j.contains(
"name"))
1086 j.at(
"name").get_to(node.
name);
1088 if (j.contains(
"size"))
1090 j.at(
"size").get_to(node.
_size);
1096 if (j.contains(
"enabled"))
1098 bool enabled = j.at(
"enabled").get<
bool>();
1106 if (j.contains(
"inputPins"))
1108 auto inputPins = j.at(
"inputPins").get<std::vector<InputPin>>();
1109 for (
size_t i = 0; i < inputPins.size(); ++i)
1115 j.at(
"inputPins").at(i).get_to(node.
inputPins.at(i));
1119 if (j.contains(
"outputPins"))
1121 auto outputPins = j.at(
"outputPins").get<std::vector<OutputPin>>();
1122 for (
size_t i = 0; i < outputPins.size(); ++i)
1128 j.at(
"outputPins").at(i).get_to(node.
outputPins.at(i));
#define INS_ASSERT_USER_ERROR(_EXP, _MSG)
Assert function with message.
nlohmann::json json
json namespace
Defines how to save certain datatypes to json.
Utility class for logging to console and file.
#define LOG_DEBUG
Debug information. Should not be called on functions which receive observations (spamming)
#define LOG_DATA
All output which occurs repeatedly every time observations are received.
#define LOG_ERROR
Error occurred which stops part of the program from working, but not everything.
#define LOG_TRACE
Detailed info to trace the execution of the program. Should not be called on functions which receive...
Utility functions for working with std::strings.
The class is responsible for all time-related tasks.
constexpr bool empty() const
Checks if the Time object has a value.
Abstract parent class for all nodes.
bool _workerWakeup
Variable to prevent the worker from sleeping.
bool isDisabled() const
Checks if the node is disabled.
bool isOnlyRealtime() const
Checks if the node is only working in real time (sensors, network interfaces, ...)
bool isInitialized() const
Checks if the node is initialized.
bool doDeinitialize(bool wait=false)
Asks the node worker to deinitialize the node.
static void workerThread(Node *node)
Worker thread.
virtual void restore(const json &j)
Restores the node from a json object.
void releaseInputValue(size_t portIndex)
Unblocks the connected node. Has to be called when the input value should be released and getInputVal...
bool _reinitialize
Flag if the node should be reinitialized after deinitializing.
virtual void workerTimeoutHandler()
Handler which gets triggered if the worker runs into a periodic timeout.
bool _disable
Flag if the node should be disabled after deinitializing.
bool doDisable(bool wait=false)
Asks the node worker to disable the node.
State
Possible states of the node.
@ DoInitialize
Node should be initialized.
@ Shutdown
Node is shutting down.
@ DoShutdown
Node should shut down.
@ Initializing
Node is currently initializing.
@ Initialized
Node is initialized (green)
@ DoDeinitialize
Node should be deinitialized.
@ Disabled
Node is disabled and won't be initialized.
@ Deinitializing
Node is currently deinitializing.
@ Deinitialized
Node is deinitialized (red)
void wakeWorker()
Wakes the worker thread.
ImVec2 _size
Size of the node in pixels.
std::mutex _configWindowMutex
Mutex to show the config window (prevents initialization from modifying values within the config window)
State getState() const
Get the current state of the node.
bool workerInitializeNode()
Called by the worker to initialize the node.
void notifyOutputValueChanged(size_t pinIdx, const InsTime &insTime, const std::scoped_lock< std::mutex > &&guard)
Notifies connected nodes about the change.
const ImVec2 & getSize() const
Get the size of the node.
bool hasInputPinWithSameTime(const InsTime &insTime) const
Checks whether there is an input pin with the same time.
virtual void flush()
Function called by the flow executor after finishing to flush out remaining data.
std::vector< OutputPin > outputPins
List of output pins.
Node(std::string name)
Constructor.
bool DeleteOutputPin(size_t pinIndex)
Deletes the output pin. Invalidates the pin reference given.
bool workerDeinitializeNode()
Called by the worker to deinitialize the node.
std::multimap< InsTime, std::pair< OutputPin *, size_t > > pollEvents
Map with callback events (sorted by time)
Kind kind
Kind of the Node.
size_t outputPinIndexFromId(ax::NodeEditor::PinId pinId) const
Returns the index of the pin.
std::vector< InputPin > inputPins
List of input pins.
OutputPin * CreateOutputPin(const char *name, Pin::Type pinType, const std::vector< std::string > &dataIdentifier, OutputPin::PinData data=static_cast< void * >(nullptr), int idx=-1)
Create an Output Pin object.
OutputPin & outputPinFromId(ax::NodeEditor::PinId pinId)
Returns the pin with the given id.
virtual void deinitialize()
Deinitialize the Node.
bool doInitialize(bool wait=false)
Asks the node worker to initialize the node.
State _state
Current state of the node.
Mode
Different Modes the Node can work in.
@ POST_PROCESSING
Node running in post-processing mode.
@ REAL_TIME
Node running in real-time mode.
bool callbacksEnabled
Enables the callbacks.
std::atomic< Mode > _mode
Mode the node is currently running in.
virtual bool resetNode()
Resets the node. It is guaranteed that the node is initialized when this is called.
std::string nameId() const
Node name and id.
virtual void afterCreateLink(OutputPin &startPin, InputPin &endPin)
Called when a new link was established.
std::thread _worker
Worker handling initialization and processing of data.
size_t inputPinIndexFromId(ax::NodeEditor::PinId pinId) const
Returns the index of the pin.
virtual void onDeleteLink(OutputPin &startPin, InputPin &endPin)
Called when a link is to be deleted.
virtual ~Node()
Destructor.
Mode getMode() const
Get the current mode of the node.
std::mutex _workerMutex
Mutex to interact with the worker condition variable.
virtual void guiConfig()
ImGui config window which is shown on double click.
std::string name
Name of the Node.
bool _onlyRealTime
Whether the node can run in post-processing or only real-time.
bool doEnable()
Enable the node.
bool doReinitialize(bool wait=false)
Asks the node worker to reinitialize the node.
std::chrono::duration< int64_t > _workerTimeout
Periodic timeout of the worker to check if new data available.
std::condition_variable _workerConditionVariable
Condition variable to signal the worker thread to do something.
InputPin * CreateInputPin(const char *name, Pin::Type pinType, const std::vector< std::string > &dataIdentifier={}, InputPin::Callback callback=static_cast< InputPin::FlowFirableCallbackFunc >(nullptr), InputPin::FlowFirableCheckFunc firable=nullptr, int priority=0, int idx=-1)
Create an Input Pin object.
virtual void restoreAtferLink(const json &j)
Restores link related properties of the node from a json object.
virtual void afterDeleteLink(OutputPin &startPin, InputPin &endPin)
Called when a link was deleted.
InputPin & inputPinFromId(ax::NodeEditor::PinId pinId)
Returns the pin with the given id.
std::scoped_lock< std::mutex > requestOutputValueLock(size_t pinIdx)
Blocks the thread until the output value has been read by all connected nodes.
void invokeCallbacks(size_t portIndex, const std::shared_ptr< const NodeData > &data)
Calls all registered callbacks on the specified output port.
virtual json save() const
Saves the node into a json object.
static std::string toString(State state)
Converts the state into a printable text.
static bool _autostartWorker
Flag which prevents the worker from being autostarted if false.
ax::NodeEditor::NodeId id
Unique Id of the Node.
bool DeleteInputPin(size_t pinIndex)
Deletes the input pin. Invalidates the pin reference given.
virtual bool onCreateLink(OutputPin &startPin, InputPin &endPin)
Called when a new link is to be established.
virtual std::string type() const =0
String representation of the Class Type.
std::mutex _stateMutex
Mutex to interact with the worker state variable.
bool isTransient() const
Checks if the node is changing its state currently.
virtual bool initialize()
Initialize the Node.
std::variant< const void *, const bool *, const int *, const float *, const double *, const std::string *, PeekPollDataFunc, PollDataFunc > PinData
Possible Types represented by an output pin.
PinData data
Pointer to data (owned by this node) which is transferred over this pin.
std::vector< OutgoingLink > links
Info to identify the linked pins.
std::atomic< bool > noMoreDataAvailable
Flag set, when no more data is available on this pin.
Node * parentNode
Reference to the parent node.
std::string name
Name of the Pin.
ax::NodeEditor::PinId id
Unique Id of the Pin.
Type type
Type of the Pin.
auto & front()
Returns a reference to the first element in the container.
auto extract_front()
Returns a copy of the first element in the container and removes it from the container.
bool empty() const noexcept
Checks if the container has no elements, i.e. whether 'begin() == end()'.
void pop_front()
Removes the first element of the container. If there are no elements in the container,...
static bool showFlowWhenInvokingCallbacks
Flag if invokeCallbacks triggers a GUI Flow event.
static bool showFlowWhenNotifyingValueChange
Flag if notifyOutputValueChanged & notifyInputValueChanged triggers a GUI Flow event.
void Add(ax::NodeEditor::LinkId id, ax::NodeEditor::FlowDirection direction=ax::NodeEditor::FlowDirection::Forward)
Thread-safe flow animation of links.
void deregisterNode(const Node *node)
Called by nodes when they finished with sending data.
ax::NodeEditor::PinId GetNextPinId()
Generates a new pin id.
void ApplyChanges()
Signals that there have been changes to the flow.
static std::string replaceAll_copy(std::string str, const std::string &from, const std::string &to, CaseSensitivity cs)
Replaces all occurrences of a search pattern with another sequence.
void to_json(json &j, const Node &node)
Converts the provided node into a json object.
void move(std::vector< T > &v, size_t sourceIdx, size_t targetIdx)
Moves an element within a vector to a new position.
void from_json(const json &j, Node &node)
Converts the provided json object into a node object.
@ GroupBox
Group box which can group other nodes and drag them together.
Collection of information about the connected node and pin.
Type of the data on the Pin.