0.5.1
Loading...
Searching...
No Matches
Node.cpp
Go to the documentation of this file.
1// This file is part of INSTINCT, the INS Toolkit for Integrated
2// Navigation Concepts and Training by the Institute of Navigation of
3// the University of Stuttgart, Germany.
4//
5// This Source Code Form is subject to the terms of the Mozilla Public
6// License, v. 2.0. If a copy of the MPL was not distributed with this
7// file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9#include "Node.hpp"
10
11#include <stdexcept>
12
13#include "util/Logger.hpp"
14#include "util/StringUtil.hpp"
15#include "util/Assert.h"
16
20#include "util/Json.hpp"
22
23#include <imgui_node_editor.h>
24namespace ed = ax::NodeEditor;
25#ifdef TESTING
26 #include <catch2/catch_test_macros.hpp>
27#endif
28
30 : name(std::move(name))
31{
32 LOG_TRACE("{}: called", nameId());
34 {
35 _worker = std::thread(workerThread, this);
36 }
37}
38
40{
41 LOG_TRACE("{}: called", nameId());
42
44 {
46 wakeWorker();
47
48 // // wait for the worker
49 // {
50 // std::unique_lock lk(_workerMutex);
51 // _workerConditionVariable.wait(lk, [&, this] { return _state == State::Shutdown; });
52 // }
53 _worker.join();
54 }
55}
56
58
59json NAV::Node::save() const { return {}; }
60
61void NAV::Node::restore(const json& /*j*/) {}
62
63void NAV::Node::restoreAtferLink(const json& /*j*/) {}
64
66{
67 return true;
68}
69
71
73
75{
76 LOG_TRACE("{}: called", nameId());
77
78 return initialize();
79}
80
81bool NAV::Node::onCreateLink(OutputPin& /*startPin*/, InputPin& /*endPin*/)
82{
83 return true;
84}
85
86void NAV::Node::onDeleteLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
87
88void NAV::Node::afterCreateLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
89
90void NAV::Node::afterDeleteLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
91
92void NAV::Node::notifyOutputValueChanged(size_t pinIdx, const InsTime& insTime, const std::scoped_lock<std::mutex>&& /* guard */)
93{
95 {
96 auto& outputPin = outputPins.at(pinIdx);
97
98 if (!outputPin.isPinLinked()) { return; }
99
100 for (auto& link : outputPin.links)
101 {
102 auto* targetPin = link.getConnectedPin();
103 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
104 {
105 outputPin.dataAccessCounter++;
106 link.dataChangeNotification = true;
107 LOG_DATA("{}: Increasing data access counter on output pin '{}'. Value now {}.", nameId(), outputPin.name, outputPin.dataAccessCounter);
108
110 {
111 FlowAnimation::Add(link.linkId);
112 }
113
114 auto data = std::make_shared<NodeData>();
115 data->insTime = insTime;
116
117 targetPin->queue.push_back(data);
118 }
119 }
120 for (const auto& link : outputPin.links)
121 {
122 auto* targetPin = link.getConnectedPin();
123 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
124 {
125 LOG_DATA("{}: Waking up worker of node '{}'. New data on pin '{}'", nameId(), link.connectedNode->nameId(), targetPin->name);
126 link.connectedNode->wakeWorker();
127 }
128 }
129 }
130}
131
132std::scoped_lock<std::mutex> NAV::Node::requestOutputValueLock(size_t pinIdx)
133{
134 auto& outputPin = outputPins.at(pinIdx);
135 {
136 std::unique_lock<std::mutex> lk(outputPin.dataAccessMutex);
137 if (outputPin.dataAccessCounter > 0)
138 {
139 LOG_DATA("{}: Requesting lock on output pin '{}', {} threads accessing still.", nameId(), outputPin.name, outputPin.dataAccessCounter);
140 outputPin.dataAccessConditionVariable.wait(lk, [&outputPin]() { return outputPin.dataAccessCounter == 0; });
141 LOG_DATA("{}: Lock on output pin '{}' acquired.", nameId(), outputPin.name);
142 }
143 }
144 return std::scoped_lock(outputPin.dataAccessMutex);
145}
146
147void NAV::Node::releaseInputValue(size_t portIndex)
148{
149 if (OutputPin* outputPin = inputPins.at(portIndex).link.getConnectedPin())
150 {
151 std::scoped_lock<std::mutex> lk(outputPin->dataAccessMutex);
152 if (outputPin->dataAccessCounter > 0)
153 {
154 auto outgoingLink = std::ranges::find_if(outputPin->links, [&](const OutputPin::OutgoingLink& link) {
155 return link.connectedPinId == inputPins.at(portIndex).id;
156 });
157 if (outgoingLink != outputPin->links.end() && outgoingLink->dataChangeNotification)
158 {
159 outgoingLink->dataChangeNotification = false;
160 outputPin->dataAccessCounter--;
161
162 if (outputPin->dataAccessCounter == 0)
163 {
164 LOG_DATA("{}: Notifying node '{}' connected to pin {} that all data is read.", nameId(), outputPin->parentNode->nameId(), outputPin->name);
165 outputPin->dataAccessConditionVariable.notify_all();
166 }
167 }
168 }
169 }
170}
171
173{
174 return std::ranges::any_of(inputPins, [&insTime](const InputPin& pin) {
175 return !pin.queue.empty() && pin.queue.front()->insTime == insTime;
176 });
177}
178
179void NAV::Node::invokeCallbacks(size_t portIndex, const std::shared_ptr<const NAV::NodeData>& data)
180{
182 {
183 if (data == nullptr)
184 {
185 LOG_DEBUG("{}: Tried to invokeCallbacks on pin {} with a nullptr, which is not allowed!!!", nameId(), portIndex);
186 return;
187 }
188 if (data->insTime.empty())
189 {
190 LOG_DATA("{}: Tried to invokeCallbacks on pin {} without a InsTime. The time is mandatory though!!! ", nameId(), portIndex);
191 return;
192 }
193
194 for (const auto& link : outputPins.at(portIndex).links)
195 {
196 auto* targetPin = link.getConnectedPin();
197 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
198 {
200 {
201 FlowAnimation::Add(link.linkId);
202 }
203
204 targetPin->queue.push_back(data);
205 LOG_DATA("{}: Waking up worker of node {}. New data on pin '{}'", nameId(), size_t(link.connectedNode->id), targetPin->name);
206 link.connectedNode->wakeWorker();
207 }
208 }
209 }
210}
211
212NAV::InputPin& NAV::Node::inputPinFromId(ax::NodeEditor::PinId pinId)
213{
214 for (auto& inputPin : inputPins)
215 {
216 if (pinId == inputPin.id) { return inputPin; }
217 }
218
219 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
220}
221
222NAV::OutputPin& NAV::Node::outputPinFromId(ax::NodeEditor::PinId pinId)
223{
224 for (auto& outputPin : outputPins)
225 {
226 if (pinId == outputPin.id) { return outputPin; }
227 }
228
229 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
230}
231
232size_t NAV::Node::inputPinIndexFromId(ax::NodeEditor::PinId pinId) const
233{
234 for (size_t i = 0; i < inputPins.size(); i++)
235 {
236 if (pinId == inputPins.at(i).id) { return i; }
237 }
238
239 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
240}
241
242size_t NAV::Node::outputPinIndexFromId(ax::NodeEditor::PinId pinId) const
243{
244 for (size_t i = 0; i < outputPins.size(); i++)
245 {
246 if (pinId == outputPins.at(i).id) { return i; }
247 }
248
249 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
250}
251
252NAV::InputPin* NAV::Node::CreateInputPin(const char* name, NAV::Pin::Type pinType, const std::vector<std::string>& dataIdentifier,
253 InputPin::Callback callback, InputPin::FlowFirableCheckFunc firable, int priority, int idx)
254{
255 LOG_TRACE("called for pin ({}) of type ({}) for node [{}]", name, std::string(pinType), nameId());
256 if (idx < 0)
257 {
258 idx = static_cast<int>(inputPins.size());
259 }
260 idx = std::min(idx, static_cast<int>(inputPins.size()));
261 auto iter = std::next(inputPins.begin(), idx);
262
263 inputPins.emplace(iter, flow::GetNextPinId(), name, pinType, this);
264
265 inputPins.at(static_cast<size_t>(idx)).callback = callback;
266 if (firable != nullptr)
267 {
268 inputPins.at(static_cast<size_t>(idx)).firable = firable;
269 }
270 inputPins.at(static_cast<size_t>(idx)).dataIdentifier = dataIdentifier;
271 inputPins.at(static_cast<size_t>(idx)).priority = priority;
272
274
275 return &inputPins.at(static_cast<size_t>(idx));
276}
277
278NAV::OutputPin* NAV::Node::CreateOutputPin(const char* name, NAV::Pin::Type pinType, const std::vector<std::string>& dataIdentifier, OutputPin::PinData data, int idx)
279{
280 LOG_TRACE("called for pin ({}) of type ({}) for node [{}]", name, std::string(pinType), nameId());
281 if (idx < 0)
282 {
283 idx = static_cast<int>(outputPins.size());
284 }
285 idx = std::min(idx, static_cast<int>(outputPins.size()));
286 auto iter = std::next(outputPins.begin(), idx);
287
288 outputPins.emplace(iter, flow::GetNextPinId(), name, pinType, this);
289
290 outputPins.at(static_cast<size_t>(idx)).data = data;
291 outputPins.at(static_cast<size_t>(idx)).dataIdentifier = dataIdentifier;
292
294
295 return &outputPins.at(static_cast<size_t>(idx));
296}
297
298bool NAV::Node::DeleteOutputPin(size_t pinIndex)
299{
300 auto& pin = outputPins.at(pinIndex);
301 LOG_TRACE("called for pin ({})", size_t(pin.id));
302
303 pin.deleteLinks();
304
305 pin.parentNode->outputPins.erase(pin.parentNode->outputPins.begin() + static_cast<int64_t>(pinIndex));
306
307 return true;
308}
309
310bool NAV::Node::DeleteInputPin(size_t pinIndex)
311{
312 auto& pin = inputPins.at(pinIndex);
313 LOG_TRACE("called for pin ({})", size_t(pin.id));
314
315 pin.deleteLink();
316
317 LOG_DEBUG("Erasing pin at idx {}", pinIndex);
318 pin.parentNode->inputPins.erase(pin.parentNode->inputPins.begin() + static_cast<int64_t>(pinIndex));
319
320 return true;
321}
322
323std::string NAV::Node::nameId() const
324{
325 return fmt::format("{} ({})", str::replaceAll_copy(name, "\n", ""), size_t(id));
326}
327
328const ImVec2& NAV::Node::getSize() const
329{
330 return _size;
331}
332
333std::string NAV::Node::toString(State state)
334{
335 switch (state)
336 {
337 case State::Disabled:
338 return "Disabled";
340 return "Deinitialized";
342 return "DoInitialize";
344 return "Initializing";
346 return "Initialized";
348 return "DoDeinitialize";
350 return "Deinitializing";
352 return "DoShutdown";
353 case State::Shutdown:
354 return "Shutdown";
355 }
356 return "";
357}
358
360{
361 std::scoped_lock lk(_stateMutex);
362 return _state;
363}
364
366{
367 return _mode;
368}
369
371{
372 std::unique_lock<std::mutex> lk(_stateMutex);
373 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
374
375 switch (_state)
376 {
378 lk.unlock();
379 return true;
380 case State::Disabled:
382 case State::Shutdown:
383 lk.unlock();
384 return false;
387 if (_reinitialize)
388 {
389 lk.unlock();
390 break;
391 }
392 lk.unlock();
393 return false;
396 lk.unlock();
397 break;
399 {
401 lk.unlock();
402 wakeWorker();
403 break;
404 }
405 }
406
407 if (wait)
408 {
409 std::unique_lock lk(_workerMutex);
410 _workerConditionVariable.wait(lk, [&, this] {
411 std::scoped_lock lks(_stateMutex);
413 });
414 std::scoped_lock lks(_stateMutex);
415 return _state == State::Initialized;
416 }
417 return true;
418}
419
421{
422 std::unique_lock<std::mutex> lk(_stateMutex);
423 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
424
425 switch (_state)
426 {
427 case State::Disabled:
429 case State::Shutdown:
430 lk.unlock();
431 return false;
434 _reinitialize = true;
435 lk.unlock();
436 break;
439 lk.unlock();
440 break;
442 lk.unlock();
443 return doInitialize(wait);
446 _reinitialize = true;
447 lk.unlock();
448 wakeWorker();
449 break;
450 }
451
452 if (wait)
453 {
454 std::unique_lock lk(_workerMutex);
455 _workerConditionVariable.wait(lk, [&, this] {
456 std::scoped_lock lks(_stateMutex);
458 });
459 std::scoped_lock lks(_stateMutex);
460 return _state == State::Initialized;
461 }
462 return true;
463}
464
466{
467 std::unique_lock<std::mutex> lk(_stateMutex);
468 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
469
470 switch (_state)
471 {
473 lk.unlock();
474 return true;
475 case State::Disabled:
477 case State::Shutdown:
480 lk.unlock();
481 return false;
484 lk.unlock();
485 break;
487 {
489 lk.unlock();
490 wakeWorker();
491 break;
492 }
493 }
494
495 if (wait)
496 {
497 std::unique_lock lk(_workerMutex);
498 _workerConditionVariable.wait(lk, [&, this] {
499 std::scoped_lock lks(_stateMutex);
501 });
502 }
503 return true;
504}
505
506bool NAV::Node::doDisable(bool wait)
507{
508 std::unique_lock<std::mutex> lk(_stateMutex);
509 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
510
511 switch (_state)
512 {
513 case State::Disabled:
514 lk.unlock();
515 return true;
517 case State::Shutdown:
519 lk.unlock();
520 return false;
524 _disable = true;
525 lk.unlock();
527 break;
530 {
532 lk.unlock();
533 break;
534 }
535 }
536
537 if (wait)
538 {
539 std::unique_lock lk(_workerMutex);
540 _workerConditionVariable.wait(lk, [&, this] {
541 std::scoped_lock lks(_stateMutex);
543 });
544 }
545 return true;
546}
547
549{
550 std::scoped_lock lk(_stateMutex);
551 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
552
553 if (_state == State::Disabled)
554 {
556 }
557 return true;
558}
559
561{
562 {
563 std::scoped_lock lk(_workerMutex);
564 _workerWakeup = true;
565 }
566 _workerConditionVariable.notify_all();
567}
568
570{
571 std::scoped_lock lk(_stateMutex);
572 return _state == State::Disabled;
573}
575{
576 std::scoped_lock lk(_stateMutex);
577 return _state == State::Initialized;
578}
580{
581 std::scoped_lock lk(_stateMutex);
582 switch (_state)
583 {
584 case State::Disabled:
587 return false;
589 case State::Shutdown:
594 return true;
595 }
596
597 return true;
598}
600{
601 return _onlyRealTime;
602}
603
605{
606 LOG_TRACE("{}: Worker thread started.", node->nameId());
607
608 std::unique_lock<std::mutex> lk(node->_stateMutex);
609 while ([]() { return true; }() && node->_state != State::Shutdown)
610 {
611 if (lk.owns_lock()) { lk.unlock(); }
612
613 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
614 node->_state == State::DoShutdown)
615 {
616 LOG_TRACE("{}: Worker doing shutdown...", node->nameId());
617 node->_state = State::Shutdown;
618 lock.unlock();
619 node->_workerConditionVariable.notify_all();
620 break;
621 }
622 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
624 {
625 LOG_TRACE("{}: Worker doing initialization...", node->nameId());
626 lock.unlock();
627 node->workerInitializeNode();
628 LOG_TRACE("{}: Worker finished initialization, notifying all waiting threads (state = {})", node->nameId(), Node::toString(node->_state));
629 node->_workerConditionVariable.notify_all();
630
631 lk.lock();
632 continue;
633 }
634 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
636 {
637 LOG_TRACE("{}: Worker doing deinitialization...", node->nameId());
638 lock.unlock();
640 LOG_TRACE("{}: Worker finished deinitialization, notifying all waiting threads (state = {})", node->nameId(), Node::toString(node->_state));
641 node->_workerConditionVariable.notify_all();
642
643 lk.lock();
644 continue;
645 }
646
647 if (!node->isTransient())
648 {
649 // Wait for data or state change
650 LOG_DATA("{}: Worker going to sleep", node->nameId());
651 bool timeout = false;
652 {
653 std::unique_lock lk(node->_workerMutex);
654 timeout = !node->_workerConditionVariable.wait_for(lk, node->_workerTimeout, [node] { return node->_workerWakeup; });
655 node->_workerWakeup = false;
656 }
657 LOG_DATA("{}: Worker woke up", node->nameId());
658
659 if (node->isInitialized() && node->callbacksEnabled)
660 {
661 if (timeout && node->callbacksEnabled) // Timeout reached
662 {
663 node->workerTimeoutHandler();
664 }
665
666 // Check input pin for data and trigger callbacks
667 if (std::ranges::any_of(node->inputPins, [](const InputPin& inputPin) {
668 return inputPin.isPinLinked();
669 }))
670 {
671 while (node->isInitialized())
672 {
673 // -------------------------- Data processing on input non-flow pins -----------------------------
674 bool notifyTriggered = false;
675 for (size_t i = 0; i < node->inputPins.size(); i++)
676 {
677 auto& inputPin = node->inputPins[i];
678 if (inputPin.type != Pin::Type::Flow && !inputPin.queue.empty())
679 {
680 if (auto callback = std::get<InputPin::DataChangedNotifyFunc>(inputPin.callback))
681 {
682 LOG_DATA("{}: Invoking notify callback on input pin '{}'", node->nameId(), inputPin.name);
683 InsTime insTime = inputPin.queue.extract_front()->insTime;
684#ifdef TESTING
685 for (const auto& watcherCallback : inputPin.watcherCallbacks)
686 {
687 if (auto watcherCall = std::get<InputPin::DataChangedWatcherNotifyFunc>(watcherCallback))
688 {
689 std::invoke(watcherCall, node, insTime, i);
690 }
691 }
692#endif
693 std::invoke(callback, node, insTime, i);
694 notifyTriggered = true;
695 }
696 }
697 }
698 if (notifyTriggered) { continue; }
699
700 // ------------------------------ Process data on input flow pins --------------------------------
701 if (node->callbacksEnabled || node->_mode == Node::Mode::REAL_TIME)
702 {
703 LOG_DATA("{}: Checking for firable input pins", node->nameId());
704
705 if (node->_mode == Mode::POST_PROCESSING)
706 {
707 // Check if all input flow pins have data
708 bool allInputPinsHaveData = !node->inputPins.empty();
709 for (const auto& inputPin : node->inputPins)
710 {
711 if (inputPin.type == Pin::Type::Flow && inputPin.neededForTemporalQueueCheck && !inputPin.queueBlocked && inputPin.queue.empty())
712 {
713 if (auto* connectedPin = inputPin.link.getConnectedPin();
714 connectedPin && !connectedPin->noMoreDataAvailable)
715 {
716 allInputPinsHaveData = false;
717 break;
718 }
719 }
720 }
721 if (!allInputPinsHaveData)
722 {
723 LOG_DATA("{}: Not all pins have data for temporal sorting", node->nameId());
724 break;
725 }
726 LOG_DATA("{}: All pins have data for temporal sorting", node->nameId());
727 }
728
729 // Find pin with the earliest data
730 InsTime earliestTime;
731 size_t earliestInputPinIdx = 0;
732 int earliestInputPinPriority = -1000;
733 for (size_t i = 0; i < node->inputPins.size(); i++)
734 {
735 auto& inputPin = node->inputPins[i];
736 if (inputPin.type == Pin::Type::Flow && !inputPin.queue.empty()
737 && (earliestTime.empty()
738 || inputPin.queue.front()->insTime < earliestTime
739 || (inputPin.queue.front()->insTime == earliestTime && inputPin.priority > earliestInputPinPriority)))
740 {
741 earliestTime = inputPin.queue.front()->insTime;
742 earliestInputPinIdx = i;
743 earliestInputPinPriority = inputPin.priority;
744 }
745 }
746 if (earliestInputPinPriority == -1000) { break; }
747
748 auto& inputPin = node->inputPins[earliestInputPinIdx];
749 if (inputPin.firable && inputPin.firable(node, inputPin))
750 {
751 if (auto callback = std::get<InputPin::FlowFirableCallbackFunc>(inputPin.callback))
752 {
753 LOG_DATA("{}: Invoking callback on input pin '{}'", node->nameId(), inputPin.name);
754#ifdef TESTING
755 for (const auto& watcherCallback : inputPin.watcherCallbacks)
756 {
757 if (auto watcherCall = std::get<InputPin::FlowFirableWatcherCallbackFunc>(watcherCallback))
758 {
759 std::invoke(watcherCall, node, inputPin.queue, earliestInputPinIdx);
760 }
761 }
762#endif
763 std::invoke(callback, node, inputPin.queue, earliestInputPinIdx);
764 }
765 }
766 else if (inputPin.dropQueueIfNotFirable)
767 {
768 LOG_DATA("{}: Dropping message on input pin '{}'", node->nameId(), inputPin.name);
769 inputPin.queue.pop_front();
770 }
771 else
772 {
773 LOG_DATA("{}: Skipping message on input pin '{}'", node->nameId(), inputPin.name);
774 break; // Do not drop an item, but put the worker to sleep
775 }
776 }
777 else
778 {
779 break;
780 }
781 }
782 }
783
784 // Post-processing (FileReader/Simulator)
785 if (!node->pollEvents.empty())
786 {
787 std::multimap<InsTime, std::pair<OutputPin*, size_t>>::iterator it;
788 while (it = node->pollEvents.begin(), it != node->pollEvents.end() && node->isInitialized() && node->callbacksEnabled)
789 {
790 OutputPin* outputPin = it->second.first;
791 size_t outputPinIdx = it->second.second;
792 Node* node = outputPin->parentNode;
793
794 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin->data))
795 {
796 auto* callback = std::get_if<OutputPin::PollDataFunc>(&outputPin->data);
797 if (callback != nullptr && *callback != nullptr)
798 {
799 LOG_DATA("{}: Polling data from output pin '{}'", node->nameId(), str::replaceAll_copy(outputPin->name, "\n", ""));
800 if ((node->**callback)() == nullptr)
801 {
802 node->pollEvents.erase(it); // Delete the event if no more data on this pin
803 break;
804 }
805 }
806 }
807 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin->data))
808 {
809 auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin->data);
810 if (callback != nullptr && *callback != nullptr)
811 {
812 if (!it->first.empty())
813 {
814 LOG_DATA("{}: Polling data from output pin '{}'", node->nameId(), str::replaceAll_copy(outputPin->name, "\n", ""));
815 // Trigger the already peeked observation and invoke it's callbacks (peek = false)
816 if ((node->**callback)(outputPinIdx, false) == nullptr)
817 {
818 LOG_ERROR("{}: {} could not poll its observation despite being able to peek it.", node->nameId(), outputPin->name);
819 }
820 }
821
822 // Check if data available (peek = true)
823 if (auto obs = (node->**callback)(outputPinIdx, true))
824 {
825 // Check if data has a time
826 if (!obs->insTime.empty())
827 {
828 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(outputPin, outputPinIdx)));
829 }
830 else // If no time, call the object and remove it
831 {
832 (node->**callback)(outputPinIdx, false);
833 continue; // Do not erase the iterator, because this pin needs to be called again
834 }
835 }
836 else // nullptr -> no more data incoming on this pin
837 {
838 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin->name);
839 outputPin->noMoreDataAvailable = true;
840 for (auto& link : outputPin->links)
841 {
842 link.connectedNode->wakeWorker();
843 }
844 }
845 }
846 else
847 {
848 LOG_ERROR("{} - {}: Callback is not valid anymore", node->nameId(), size_t(outputPin->id));
849 }
850 node->pollEvents.erase(it);
851 }
852 }
853
854 if (node->pollEvents.empty())
855 {
856 LOG_TRACE("{}: Finished polling all pins.", node->nameId());
857
858 node->callbacksEnabled = false;
859 for (auto& outputPin : node->outputPins)
860 {
861 if (!outputPin.noMoreDataAvailable)
862 {
863 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin.name);
864 outputPin.noMoreDataAvailable = true;
865 for (auto& link : outputPin.links)
866 {
867 link.connectedNode->wakeWorker();
868 }
869 }
870 }
873 }
874 }
875 }
876
877 // Check if node finished
878 if (node->_mode == Mode::POST_PROCESSING)
879 {
880 if (std::ranges::all_of(node->inputPins, [](const InputPin& inputPin) {
881 return inputPin.type != Pin::Type::Flow || !inputPin.isPinLinked() || inputPin.link.connectedNode->isDisabled()
882 || !inputPin.link.getConnectedPin()->blocksConnectedNodeFromFinishing
883 || (inputPin.queue.empty() && inputPin.link.getConnectedPin()->noMoreDataAvailable);
884 }))
885 {
886 LOG_TRACE("{}: Node finished", node->nameId());
887 node->callbacksEnabled = false;
888 for (auto& outputPin : node->outputPins)
889 {
890 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin.name);
891 outputPin.noMoreDataAvailable = true;
892 for (auto& link : outputPin.links)
893 {
894 link.connectedNode->wakeWorker();
895 }
896 }
899 }
900 }
901 }
902 }
903
904 LOG_TRACE("{}: Worker thread ended.", node->nameId());
905}
906
908{
909 LOG_TRACE("{}: called", nameId());
910 {
911 std::scoped_lock lk(_stateMutex);
912 INS_ASSERT_USER_ERROR(_state == State::DoInitialize, fmt::format("Worker can only initialize the node if the state is set to DoInitialize, but it is {}.", toString(_state)).c_str());
914 }
916
917 LOG_DEBUG("{}: Initializing Node", nameId());
918
919 // Initialize Nodes connected to the input pins
920 for (const auto& inputPin : inputPins)
921 {
922 if (inputPin.type != Pin::Type::Flow)
923 {
924 if (Node* connectedNode = inputPin.link.connectedNode)
925 {
926 if (!connectedNode->isInitialized())
927 {
928 LOG_DEBUG("{}: Initializing connected Node '{}' on input Pin {}", nameId(), connectedNode->nameId(), size_t(inputPin.id));
929 if (!connectedNode->doInitialize(true))
930 {
931 LOG_ERROR("{}: Could not initialize connected node {}", nameId(), connectedNode->nameId());
932 std::scoped_lock lk(_stateMutex);
934 {
936 }
937 return false;
938 }
939 }
940 }
941 }
942 }
943
944 _reinitialize = false;
945
946 // Initialize the node itself
947 LOG_TRACE("{}: calling initialize()", nameId());
948 bool initSucceeded = false;
949 {
950 std::scoped_lock<std::mutex> guard(_configWindowMutex);
951 initSucceeded = initialize();
952 }
953 if (initSucceeded)
954 {
955 LOG_TRACE("{}: initialize() was successful", nameId());
956
957 for (auto& inputPin : inputPins)
958 {
959 inputPin.queue.clear();
960 inputPin.queueBlocked = false;
961 }
962
963 pollEvents.clear();
964 LOG_TRACE("{}: calling resetNode()", nameId());
965 {
966 std::scoped_lock<std::mutex> guard(_configWindowMutex);
967 resetNode();
968 }
969 LOG_TRACE("{}: resetNode() was successful", nameId());
970 for (auto& outputPin : outputPins)
971 {
972 outputPin.noMoreDataAvailable = true;
973 for (auto& link : outputPin.links)
974 {
975 LOG_TRACE("{}: Waking connected node '{}'", nameId(), link.connectedNode->nameId());
976 link.connectedNode->wakeWorker();
977 }
978 }
979
980 std::scoped_lock lk(_stateMutex);
982 {
984 }
985 return true;
986 }
987
988 LOG_TRACE("{}: initialize() failed", nameId());
989 std::scoped_lock lk(_stateMutex);
991 {
993 }
994 return false;
995}
996
998{
999 LOG_TRACE("{}: called", nameId());
1000 {
1001 std::scoped_lock lk(_stateMutex);
1002 INS_ASSERT_USER_ERROR(_state == State::DoDeinitialize, fmt::format("Worker can only deinitialize the node if the state is set to DoDeinitialize, but it is {}.", toString(_state)).c_str());
1003 {
1005 }
1006 }
1007 LOG_DEBUG("{}: Deinitializing Node", nameId());
1008
1009 callbacksEnabled = false;
1010
1011 // Re-/Deinitialize Nodes connected to the output pins
1012 for (const auto& outputPin : outputPins)
1013 {
1014 if (outputPin.type != Pin::Type::Flow)
1015 {
1016 for (const auto& link : outputPin.links)
1017 {
1018 if (link.connectedNode->isInitialized())
1019 {
1020 LOG_DEBUG("{}: {} connected Node '{}' on output Pin {}", nameId(),
1021 _reinitialize ? "Reinitializing" : "Deinitializing", link.connectedNode->nameId(), size_t(outputPin.id));
1022 if (_reinitialize) { link.connectedNode->doReinitialize(); }
1023 else { link.connectedNode->doDeinitialize(); }
1024 }
1025 }
1026 }
1027 }
1028
1029 // Deinitialize the node itself
1030 {
1031 std::scoped_lock<std::mutex> guard(_configWindowMutex);
1032 deinitialize();
1033 }
1034
1035 std::scoped_lock lk(_stateMutex);
1037 {
1038 if (_disable)
1039 {
1041 }
1042 else if (_reinitialize)
1043 {
1045 }
1046 else
1047 {
1049 }
1050 }
1051
1052 return true;
1053}
1054
1056{
1057 LOG_TRACE("{}: called", nameId());
1058}
1059
1060void NAV::to_json(json& j, const Node& node)
1061{
1062 ImVec2 realSize = ed::GetNodeSize(node.id);
1063 realSize.x -= 16;
1064 realSize.y -= 38.0F;
1065 j = json{
1066 { "id", size_t(node.id) },
1067 { "type", node.type() },
1068 { "kind", std::string(node.kind) },
1069 { "name", node.name },
1070 { "size", node._size.x == 0 && node._size.y == 0 ? node._size : realSize },
1071 { "pos", ed::GetNodePosition(node.id) },
1072 { "enabled", !node.isDisabled() },
1073 { "inputPins", node.inputPins },
1074 { "outputPins", node.outputPins },
1075 };
1076}
1077void NAV::from_json(const json& j, Node& node)
1078{
1079 node.id = j.at("id").get<size_t>();
1080 if (j.contains("kind"))
1081 {
1082 node.kind = Node::Kind(j.at("kind").get<std::string>());
1083 }
1084 if (j.contains("name"))
1085 {
1086 j.at("name").get_to(node.name);
1087 }
1088 if (j.contains("size"))
1089 {
1090 j.at("size").get_to(node._size);
1091 if (node.kind == Node::Kind::GroupBox && gui::NodeEditorApplication::isUsingBigDefaultFont())
1092 {
1093 node._size.y -= 20;
1094 }
1095 }
1096 if (j.contains("enabled"))
1097 {
1098 bool enabled = j.at("enabled").get<bool>();
1099 if (!enabled)
1100 {
1101 std::scoped_lock lk(node._stateMutex);
1103 }
1104 }
1105
1106 if (j.contains("inputPins"))
1107 {
1108 auto inputPins = j.at("inputPins").get<std::vector<InputPin>>();
1109 for (size_t i = 0; i < inputPins.size(); ++i)
1110 {
1111 if (node.inputPins.size() <= i)
1112 {
1113 break;
1114 }
1115 j.at("inputPins").at(i).get_to(node.inputPins.at(i));
1116 }
1117 }
1118
1119 if (j.contains("outputPins"))
1120 {
1121 auto outputPins = j.at("outputPins").get<std::vector<OutputPin>>();
1122 for (size_t i = 0; i < outputPins.size(); ++i)
1123 {
1124 if (node.outputPins.size() <= i)
1125 {
1126 break;
1127 }
1128 j.at("outputPins").at(i).get_to(node.outputPins.at(i));
1129 }
1130 }
1131}
Assertion helpers.
#define INS_ASSERT_USER_ERROR(_EXP, _MSG)
Assert function with message.
Definition Assert.h:21
Handles Flow animations.
Flow Executor Thread.
Save/Load the Nodes.
nlohmann::json json
json namespace
Defines how to save certain datatypes to json.
Utility class for logging to console and file.
#define LOG_DEBUG
Debug information. Should not be called on functions which receive observations (spamming)
Definition Logger.hpp:67
#define LOG_DATA
All output which occurs repeatedly every time observations are received.
Definition Logger.hpp:29
#define LOG_ERROR
An error occurred which stops part of the program from working, but not everything.
Definition Logger.hpp:73
#define LOG_TRACE
Detailed info to trace the execution of the program. Should not be called on functions which receive...
Definition Logger.hpp:65
Node Class.
Utility functions for working with std::strings.
Input pins of nodes.
Definition Pin.hpp:491
IncomingLink link
Info to identify the linked pin.
Definition Pin.hpp:704
Callback callback
Callback to call when the node is firable or when it should be notified of data change.
Definition Pin.hpp:722
FlowFirableCheckFunc firable
Function to check if the callback is firable.
Definition Pin.hpp:728
std::variant< FlowFirableCallbackFunc, DataChangedNotifyFunc > Callback
Callback function types.
Definition Pin.hpp:718
bool(*)(const Node *, const InputPin &) FlowFirableCheckFunc
Function type to call when checking if a pin is firable.
Definition Pin.hpp:725
bool queueBlocked
If true no more messages are accepted to the queue.
Definition Pin.hpp:740
bool neededForTemporalQueueCheck
Whether it should be checked for temporal ordering.
Definition Pin.hpp:734
int priority
Priority when checking firable condition related to other pins (higher priority gets triggered first)
Definition Pin.hpp:731
NodeDataQueue queue
Queue with received data.
Definition Pin.hpp:743
bool dropQueueIfNotFirable
If true, drops elements from the queue if not firable, otherwise sleeps the worker.
Definition Pin.hpp:737
The class is responsible for all time-related tasks.
Definition InsTime.hpp:710
constexpr bool empty() const
Checks if the Time object has a value.
Definition InsTime.hpp:1089
Abstract parent class for all nodes.
Definition Node.hpp:92
bool _workerWakeup
Variable to prevent the worker from sleeping.
Definition Node.hpp:569
bool isDisabled() const
Checks if the node is disabled.
Definition Node.cpp:569
bool isOnlyRealtime() const
Checks if the node is only working in real time (sensors, network interfaces, ...)
Definition Node.cpp:599
bool isInitialized() const
Checks if the node is initialized.
Definition Node.cpp:574
bool doDeinitialize(bool wait=false)
Asks the node worker to deinitialize the node.
Definition Node.cpp:465
static void workerThread(Node *node)
Worker thread.
Definition Node.cpp:604
virtual void restore(const json &j)
Restores the node from a json object.
Definition Node.cpp:61
void releaseInputValue(size_t portIndex)
Unblocks the connected node. Has to be called when the input value should be released and getInputVal...
Definition Node.cpp:147
bool _reinitialize
Flag if the node should be reinitialized after deinitializing.
Definition Node.hpp:541
virtual void workerTimeoutHandler()
Handler which gets triggered if the worker runs into a periodic timeout.
Definition Node.cpp:1055
bool _disable
Flag if the node should be disabled after deinitializing.
Definition Node.hpp:544
bool doDisable(bool wait=false)
Asks the node worker to disable the node.
Definition Node.cpp:506
State
Possible states of the node.
Definition Node.hpp:176
@ DoInitialize
Node should be initialized.
Definition Node.hpp:179
@ Shutdown
Node is shutting down.
Definition Node.hpp:185
@ DoShutdown
Node should shut down.
Definition Node.hpp:184
@ Initializing
Node is currently initializing.
Definition Node.hpp:180
@ Initialized
Node is initialized (green)
Definition Node.hpp:181
@ DoDeinitialize
Node should be deinitialized.
Definition Node.hpp:182
@ Disabled
Node is disabled and won't be initialized.
Definition Node.hpp:177
@ Deinitializing
Node is currently deinitializing.
Definition Node.hpp:183
@ Deinitialized
Node is deinitialized (red)
Definition Node.hpp:178
void wakeWorker()
Wakes the worker thread.
Definition Node.cpp:560
ImVec2 _size
Size of the node in pixels.
Definition Node.hpp:560
std::mutex _configWindowMutex
Mutex to show the config window (prevents initialization from modifying values within the config window)
Definition Node.hpp:550
State getState() const
Get the current state of the node.
Definition Node.cpp:359
bool workerInitializeNode()
Called by the worker to initialize the node.
Definition Node.cpp:907
void notifyOutputValueChanged(size_t pinIdx, const InsTime &insTime, const std::scoped_lock< std::mutex > &&guard)
Notifies connected nodes about the change.
Definition Node.cpp:92
const ImVec2 & getSize() const
Get the size of the node.
Definition Node.cpp:328
bool hasInputPinWithSameTime(const InsTime &insTime) const
Checks whether there is an input pin with the same time.
Definition Node.cpp:172
virtual void flush()
Function called by the flow executor after finishing to flush out remaining data.
Definition Node.cpp:72
std::vector< OutputPin > outputPins
List of output pins.
Definition Node.hpp:511
Node(std::string name)
Constructor.
Definition Node.cpp:29
bool DeleteOutputPin(size_t pinIndex)
Deletes the output pin. Invalidates the pin reference given.
Definition Node.cpp:298
bool workerDeinitializeNode()
Called by the worker to deinitialize the node.
Definition Node.cpp:997
std::multimap< InsTime, std::pair< OutputPin *, size_t > > pollEvents
Map with callback events (sorted by time)
Definition Node.hpp:517
Kind kind
Kind of the Node.
Definition Node.hpp:505
size_t outputPinIndexFromId(ax::NodeEditor::PinId pinId) const
Returns the index of the pin.
Definition Node.cpp:242
std::vector< InputPin > inputPins
List of input pins.
Definition Node.hpp:509
OutputPin * CreateOutputPin(const char *name, Pin::Type pinType, const std::vector< std::string > &dataIdentifier, OutputPin::PinData data=static_cast< void * >(nullptr), int idx=-1)
Create an Output Pin object.
Definition Node.cpp:278
OutputPin & outputPinFromId(ax::NodeEditor::PinId pinId)
Returns the pin with the given id.
Definition Node.cpp:222
virtual void deinitialize()
Deinitialize the Node.
Definition Node.cpp:70
bool doInitialize(bool wait=false)
Asks the node worker to initialize the node.
Definition Node.cpp:370
State _state
Current state of the node.
Definition Node.hpp:534
Mode
Different Modes the Node can work in.
Definition Node.hpp:190
@ POST_PROCESSING
Node running in post-processing mode.
Definition Node.hpp:192
@ REAL_TIME
Node running in real-time mode.
Definition Node.hpp:191
bool callbacksEnabled
Enables the callbacks.
Definition Node.hpp:514
std::atomic< Mode > _mode
Mode the node is currently running in.
Definition Node.hpp:538
virtual bool resetNode()
Resets the node. It is guaranteed that the node is initialized when this is called.
Definition Node.cpp:74
std::string nameId() const
Node name and id.
Definition Node.cpp:323
virtual void afterCreateLink(OutputPin &startPin, InputPin &endPin)
Called when a new link was established.
Definition Node.cpp:88
std::thread _worker
Worker handling initialization and processing of data.
Definition Node.hpp:566
size_t inputPinIndexFromId(ax::NodeEditor::PinId pinId) const
Returns the index of the pin.
Definition Node.cpp:232
virtual void onDeleteLink(OutputPin &startPin, InputPin &endPin)
Called when a link is to be deleted.
Definition Node.cpp:86
virtual ~Node()
Destructor.
Definition Node.cpp:39
Mode getMode() const
Get the current mode of the node.
Definition Node.cpp:365
std::mutex _workerMutex
Mutex to interact with the worker condition variable.
Definition Node.hpp:567
virtual void guiConfig()
ImGui config window which is shown on double click.
Definition Node.cpp:57
std::string name
Name of the Node.
Definition Node.hpp:507
bool _onlyRealTime
Whether the node can run in post-processing or only real-time.
Definition Node.hpp:531
bool doEnable()
Enable the node.
Definition Node.cpp:548
bool doReinitialize(bool wait=false)
Asks the node worker to reinitialize the node.
Definition Node.cpp:420
std::chrono::duration< int64_t > _workerTimeout
Periodic timeout of the worker to check if new data available.
Definition Node.hpp:565
std::condition_variable _workerConditionVariable
Condition variable to signal the worker thread to do something.
Definition Node.hpp:568
InputPin * CreateInputPin(const char *name, Pin::Type pinType, const std::vector< std::string > &dataIdentifier={}, InputPin::Callback callback=static_cast< InputPin::FlowFirableCallbackFunc >(nullptr), InputPin::FlowFirableCheckFunc firable=nullptr, int priority=0, int idx=-1)
Create an Input Pin object.
Definition Node.cpp:252
virtual void restoreAtferLink(const json &j)
Restores link related properties of the node from a json object.
Definition Node.cpp:63
virtual void afterDeleteLink(OutputPin &startPin, InputPin &endPin)
Called when a link was deleted.
Definition Node.cpp:90
InputPin & inputPinFromId(ax::NodeEditor::PinId pinId)
Returns the pin with the given id.
Definition Node.cpp:212
std::scoped_lock< std::mutex > requestOutputValueLock(size_t pinIdx)
Blocks the thread until the output value was read by all connected nodes.
Definition Node.cpp:132
void invokeCallbacks(size_t portIndex, const std::shared_ptr< const NodeData > &data)
Calls all registered callbacks on the specified output port.
Definition Node.cpp:179
virtual json save() const
Saves the node into a json object.
Definition Node.cpp:59
static std::string toString(State state)
Converts the state into a printable text.
Definition Node.cpp:333
static bool _autostartWorker
Flag which prevents the worker from being autostarted if false.
Definition Node.hpp:563
ax::NodeEditor::NodeId id
Unique Id of the Node.
Definition Node.hpp:503
bool DeleteInputPin(size_t pinIndex)
Deletes the input pin. Invalidates the pin reference given.
Definition Node.cpp:310
virtual bool onCreateLink(OutputPin &startPin, InputPin &endPin)
Called when a new link is to be established.
Definition Node.cpp:81
virtual std::string type() const =0
String representation of the Class Type.
std::mutex _stateMutex
Mutex to interact with the worker state variable.
Definition Node.hpp:535
bool isTransient() const
Checks if the node is changing its state currently.
Definition Node.cpp:579
virtual bool initialize()
Initialize the Node.
Definition Node.cpp:65
Output pins of nodes.
Definition Pin.hpp:338
std::variant< const void *, const bool *, const int *, const float *, const double *, const std::string *, PeekPollDataFunc, PollDataFunc > PinData
Possible Types represented by an output pin.
Definition Pin.hpp:448
PinData data
Pointer to data (owned by this node) which is transferred over this pin.
Definition Pin.hpp:458
std::vector< OutgoingLink > links
Info to identify the linked pins.
Definition Pin.hpp:433
std::atomic< bool > noMoreDataAvailable
Flag set, when no more data is available on this pin.
Definition Pin.hpp:470
Node * parentNode
Reference to the parent node.
Definition Pin.hpp:307
std::string name
Name of the Pin.
Definition Pin.hpp:299
ax::NodeEditor::PinId id
Unique Id of the Pin.
Definition Pin.hpp:297
Type type
Type of the Pin.
Definition Pin.hpp:301
auto & front()
Returns a reference to the first element in the container.
Definition TsDeque.hpp:198
auto extract_front()
Returns a copy of the first element in the container and removes it from the container.
Definition TsDeque.hpp:494
bool empty() const noexcept
Checks if the container has no elements, i.e. whether 'begin() == end()'.
Definition TsDeque.hpp:275
void pop_front()
Removes the first element of the container. If there are no elements in the container,...
Definition TsDeque.hpp:461
static bool showFlowWhenInvokingCallbacks
Flag if invokeCallbacks triggers a GUI Flow event.
static bool showFlowWhenNotifyingValueChange
Flag if notifyOutputValueChanged & notifyInputValueChanged triggers a GUI Flow event.
void Add(ax::NodeEditor::LinkId id, ax::NodeEditor::FlowDirection direction=ax::NodeEditor::FlowDirection::Forward)
Thread-safe flow animation of links.
void deregisterNode(const Node *node)
Called by nodes when they finished with sending data.
ax::NodeEditor::PinId GetNextPinId()
Generates a new pin id.
void ApplyChanges()
Signals that there have been changes to the flow.
static std::string replaceAll_copy(std::string str, const std::string &from, const std::string &to, CaseSensitivity cs)
Replaces all occurrences of a search pattern with another sequence.
void to_json(json &j, const Node &node)
Converts the provided node into a json object.
Definition Node.cpp:1060
void move(std::vector< T > &v, size_t sourceIdx, size_t targetIdx)
Moves an element within a vector to a new position.
Definition Vector.hpp:27
void from_json(const json &j, Node &node)
Converts the provided json object into a node object.
Definition Node.cpp:1077
Kind information class.
Definition Node.hpp:96
@ GroupBox
Group box which can group other nodes and drag them together.
Definition Node.hpp:102
Type of the data on the Pin.
Definition Pin.hpp:47
@ Flow
NodeData Trigger.
Definition Pin.hpp:52