0.3.0
Loading...
Searching...
No Matches
Node.cpp
Go to the documentation of this file.
1// This file is part of INSTINCT, the INS Toolkit for Integrated
2// Navigation Concepts and Training by the Institute of Navigation of
3// the University of Stuttgart, Germany.
4//
5// This Source Code Form is subject to the terms of the Mozilla Public
6// License, v. 2.0. If a copy of the MPL was not distributed with this
7// file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9#include "Node.hpp"
10
11#include <stdexcept>
12
13#include "util/Logger.hpp"
14#include "util/StringUtil.hpp"
15#include "util/Assert.h"
16
20namespace nm = NAV::NodeManager;
21#include "util/Json.hpp"
23
24#include <imgui_node_editor.h>
25namespace ed = ax::NodeEditor;
26#ifdef TESTING
27 #include <catch2/catch_test_macros.hpp>
28#endif
29
31 : name(std::move(name))
32{
33 LOG_TRACE("{}: called", nameId());
35 {
36 _worker = std::thread(workerThread, this);
37 }
38}
39
41{
42 LOG_TRACE("{}: called", nameId());
43
45 {
47 wakeWorker();
48
49 // // wait for the worker
50 // {
51 // std::unique_lock lk(_workerMutex);
52 // _workerConditionVariable.wait(lk, [&, this] { return _state == State::Shutdown; });
53 // }
54 _worker.join();
55 }
56}
57
59
60json NAV::Node::save() const { return {}; }
61
62void NAV::Node::restore(const json& /*j*/) {}
63
64void NAV::Node::restoreAtferLink(const json& /*j*/) {}
65
67{
68 return true;
69}
70
72
74
76{
77 LOG_TRACE("{}: called", nameId());
78
79 return initialize();
80}
81
82bool NAV::Node::onCreateLink(OutputPin& /*startPin*/, InputPin& /*endPin*/)
83{
84 return true;
85}
86
87void NAV::Node::onDeleteLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
88
89void NAV::Node::afterCreateLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
90
91void NAV::Node::afterDeleteLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
92
93void NAV::Node::notifyOutputValueChanged(size_t pinIdx, const InsTime& insTime, const std::scoped_lock<std::mutex>&& /* guard */)
94{
96 {
97 auto& outputPin = outputPins.at(pinIdx);
98
99 if (!outputPin.isPinLinked()) { return; }
100
101 for (auto& link : outputPin.links)
102 {
103 auto* targetPin = link.getConnectedPin();
104 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
105 {
106 outputPin.dataAccessCounter++;
107 link.dataChangeNotification = true;
108 LOG_DATA("{}: Increasing data access counter on output pin '{}'. Value now {}.", nameId(), outputPin.name, outputPin.dataAccessCounter);
109
111 {
112 FlowAnimation::Add(link.linkId);
113 }
114
115 auto data = std::make_shared<NodeData>();
116 data->insTime = insTime;
117
118 targetPin->queue.push_back(data);
119 }
120 }
121 for (const auto& link : outputPin.links)
122 {
123 auto* targetPin = link.getConnectedPin();
124 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
125 {
126 LOG_DATA("{}: Waking up worker of node '{}'. New data on pin '{}'", nameId(), link.connectedNode->nameId(), targetPin->name);
127 link.connectedNode->wakeWorker();
128 }
129 }
130 }
131}
132
133std::scoped_lock<std::mutex> NAV::Node::requestOutputValueLock(size_t pinIdx)
134{
135 auto& outputPin = outputPins.at(pinIdx);
136 {
137 std::unique_lock<std::mutex> lk(outputPin.dataAccessMutex);
138 if (outputPin.dataAccessCounter > 0)
139 {
140 LOG_DATA("{}: Requesting lock on output pin '{}', {} threads accessing still.", nameId(), outputPin.name, outputPin.dataAccessCounter);
141 outputPin.dataAccessConditionVariable.wait(lk, [&outputPin]() { return outputPin.dataAccessCounter == 0; });
142 LOG_DATA("{}: Lock on output pin '{}' acquired.", nameId(), outputPin.name);
143 }
144 }
145 return std::scoped_lock(outputPin.dataAccessMutex);
146}
147
148void NAV::Node::releaseInputValue(size_t portIndex)
149{
150 if (OutputPin* outputPin = inputPins.at(portIndex).link.getConnectedPin())
151 {
152 std::scoped_lock<std::mutex> lk(outputPin->dataAccessMutex);
153 if (outputPin->dataAccessCounter > 0)
154 {
155 auto outgoingLink = std::ranges::find_if(outputPin->links, [&](const OutputPin::OutgoingLink& link) {
156 return link.connectedPinId == inputPins.at(portIndex).id;
157 });
158 if (outgoingLink != outputPin->links.end() && outgoingLink->dataChangeNotification)
159 {
160 outgoingLink->dataChangeNotification = false;
161 outputPin->dataAccessCounter--;
162
163 if (outputPin->dataAccessCounter == 0)
164 {
165 LOG_DATA("{}: Notifying node '{}' connected to pin {} that all data is read.", nameId(), outputPin->parentNode->nameId(), outputPin->name);
166 outputPin->dataAccessConditionVariable.notify_all();
167 }
168 }
169 }
170 }
171}
172
174{
175 return std::ranges::any_of(inputPins, [&insTime](const InputPin& pin) {
176 return !pin.queue.empty() && pin.queue.front()->insTime == insTime;
177 });
178}
179
180void NAV::Node::invokeCallbacks(size_t portIndex, const std::shared_ptr<const NAV::NodeData>& data)
181{
183 {
184 if (data == nullptr)
185 {
186 LOG_DEBUG("{}: Tried to invokeCallbacks on pin {} with a nullptr, which is not allowed!!!", nameId(), portIndex);
187 return;
188 }
189 if (data->insTime.empty())
190 {
191 LOG_DATA("{}: Tried to invokeCallbacks on pin {} without a InsTime. The time is mandatory though!!! ", nameId(), portIndex);
192 return;
193 }
194
195 for (const auto& link : outputPins.at(portIndex).links)
196 {
197 auto* targetPin = link.getConnectedPin();
198 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
199 {
201 {
202 FlowAnimation::Add(link.linkId);
203 }
204
205 targetPin->queue.push_back(data);
206 LOG_DATA("{}: Waking up worker of node {}. New data on pin '{}'", nameId(), size_t(link.connectedNode->id), targetPin->name);
207 link.connectedNode->wakeWorker();
208 }
209 }
210 }
211}
212
213NAV::InputPin& NAV::Node::inputPinFromId(ax::NodeEditor::PinId pinId)
214{
215 for (auto& inputPin : inputPins)
216 {
217 if (pinId == inputPin.id) { return inputPin; }
218 }
219
220 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
221}
222
223NAV::OutputPin& NAV::Node::outputPinFromId(ax::NodeEditor::PinId pinId)
224{
225 for (auto& outputPin : outputPins)
226 {
227 if (pinId == outputPin.id) { return outputPin; }
228 }
229
230 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
231}
232
233size_t NAV::Node::inputPinIndexFromId(ax::NodeEditor::PinId pinId) const
234{
235 for (size_t i = 0; i < inputPins.size(); i++)
236 {
237 if (pinId == inputPins.at(i).id) { return i; }
238 }
239
240 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
241}
242
243size_t NAV::Node::outputPinIndexFromId(ax::NodeEditor::PinId pinId) const
244{
245 for (size_t i = 0; i < outputPins.size(); i++)
246 {
247 if (pinId == outputPins.at(i).id) { return i; }
248 }
249
250 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
251}
252
253std::string NAV::Node::nameId() const
254{
255 return fmt::format("{} ({})", str::replaceAll_copy(name, "\n", ""), size_t(id));
256}
257
258const ImVec2& NAV::Node::getSize() const
259{
260 return _size;
261}
262
263std::string NAV::Node::toString(State state)
264{
265 switch (state)
266 {
267 case State::Disabled:
268 return "Disabled";
270 return "Deinitialized";
272 return "DoInitialize";
274 return "Initializing";
276 return "Initialized";
278 return "DoDeinitialize";
280 return "Deinitializing";
282 return "DoShutdown";
283 case State::Shutdown:
284 return "Shutdown";
285 }
286 return "";
287}
288
290{
291 std::scoped_lock lk(_stateMutex);
292 return _state;
293}
294
296{
297 return _mode;
298}
299
301{
302 std::unique_lock<std::mutex> lk(_stateMutex);
303 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
304
305 switch (_state)
306 {
308 lk.unlock();
309 return true;
310 case State::Disabled:
312 case State::Shutdown:
313 lk.unlock();
314 return false;
317 if (_reinitialize)
318 {
319 lk.unlock();
320 break;
321 }
322 lk.unlock();
323 return false;
326 lk.unlock();
327 break;
329 {
331 lk.unlock();
332 wakeWorker();
333 break;
334 }
335 }
336
337 if (wait)
338 {
339 std::unique_lock lk(_workerMutex);
340 _workerConditionVariable.wait(lk, [&, this] {
341 std::scoped_lock lks(_stateMutex);
343 });
344 std::scoped_lock lks(_stateMutex);
345 return _state == State::Initialized;
346 }
347 return true;
348}
349
351{
352 std::unique_lock<std::mutex> lk(_stateMutex);
353 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
354
355 switch (_state)
356 {
357 case State::Disabled:
359 case State::Shutdown:
360 lk.unlock();
361 return false;
364 _reinitialize = true;
365 lk.unlock();
366 break;
369 lk.unlock();
370 break;
372 lk.unlock();
373 return doInitialize(wait);
376 _reinitialize = true;
377 lk.unlock();
378 wakeWorker();
379 break;
380 }
381
382 if (wait)
383 {
384 std::unique_lock lk(_workerMutex);
385 _workerConditionVariable.wait(lk, [&, this] {
386 std::scoped_lock lks(_stateMutex);
388 });
389 std::scoped_lock lks(_stateMutex);
390 return _state == State::Initialized;
391 }
392 return true;
393}
394
396{
397 std::unique_lock<std::mutex> lk(_stateMutex);
398 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
399
400 switch (_state)
401 {
403 lk.unlock();
404 return true;
405 case State::Disabled:
407 case State::Shutdown:
410 lk.unlock();
411 return false;
414 lk.unlock();
415 break;
417 {
419 lk.unlock();
420 wakeWorker();
421 break;
422 }
423 }
424
425 if (wait)
426 {
427 std::unique_lock lk(_workerMutex);
428 _workerConditionVariable.wait(lk, [&, this] {
429 std::scoped_lock lks(_stateMutex);
431 });
432 }
433 return true;
434}
435
436bool NAV::Node::doDisable(bool wait)
437{
438 std::unique_lock<std::mutex> lk(_stateMutex);
439 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
440
441 switch (_state)
442 {
443 case State::Disabled:
444 lk.unlock();
445 return true;
447 case State::Shutdown:
449 lk.unlock();
450 return false;
454 _disable = true;
455 lk.unlock();
457 break;
460 {
462 lk.unlock();
463 break;
464 }
465 }
466
467 if (wait)
468 {
469 std::unique_lock lk(_workerMutex);
470 _workerConditionVariable.wait(lk, [&, this] {
471 std::scoped_lock lks(_stateMutex);
473 });
474 }
475 return true;
476}
477
479{
480 std::scoped_lock lk(_stateMutex);
481 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
482
483 if (_state == State::Disabled)
484 {
486 }
487 return true;
488}
489
491{
492 {
493 std::scoped_lock lk(_workerMutex);
494 _workerWakeup = true;
495 }
496 _workerConditionVariable.notify_all();
497}
498
500{
501 std::scoped_lock lk(_stateMutex);
502 return _state == State::Disabled;
503}
505{
506 std::scoped_lock lk(_stateMutex);
507 return _state == State::Initialized;
508}
510{
511 std::scoped_lock lk(_stateMutex);
512 switch (_state)
513 {
514 case State::Disabled:
517 return false;
519 case State::Shutdown:
524 return true;
525 }
526
527 return true;
528}
530{
531 return _onlyRealTime;
532}
533
535{
536 LOG_TRACE("{}: Worker thread started.", node->nameId());
537
538 std::unique_lock<std::mutex> lk(node->_stateMutex);
539 while ([]() { return true; }() && node->_state != State::Shutdown)
540 {
541 if (lk.owns_lock()) { lk.unlock(); }
542
543 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
544 node->_state == State::DoShutdown)
545 {
546 LOG_TRACE("{}: Worker doing shutdown...", node->nameId());
547 node->_state = State::Shutdown;
548 lock.unlock();
549 node->_workerConditionVariable.notify_all();
550 break;
551 }
552 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
554 {
555 LOG_TRACE("{}: Worker doing initialization...", node->nameId());
556 lock.unlock();
557 node->workerInitializeNode();
558 LOG_TRACE("{}: Worker finished initialization, notifying all waiting threads (state = {})", node->nameId(), Node::toString(node->_state));
559 node->_workerConditionVariable.notify_all();
560
561 lk.lock();
562 continue;
563 }
564 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
566 {
567 LOG_TRACE("{}: Worker doing deinitialization...", node->nameId());
568 lock.unlock();
570 LOG_TRACE("{}: Worker finished deinitialization, notifying all waiting threads (state = {})", node->nameId(), Node::toString(node->_state));
571 node->_workerConditionVariable.notify_all();
572
573 lk.lock();
574 continue;
575 }
576
577 if (!node->isTransient())
578 {
579 // Wait for data or state change
580 LOG_DATA("{}: Worker going to sleep", node->nameId());
581 bool timeout = false;
582 {
583 std::unique_lock lk(node->_workerMutex);
584 timeout = !node->_workerConditionVariable.wait_for(lk, node->_workerTimeout, [node] { return node->_workerWakeup; });
585 node->_workerWakeup = false;
586 }
587 LOG_DATA("{}: Worker woke up", node->nameId());
588
589 if (node->isInitialized() && node->callbacksEnabled)
590 {
591 if (timeout && node->callbacksEnabled) // Timeout reached
592 {
593 node->workerTimeoutHandler();
594 }
595
596 // Check input pin for data and trigger callbacks
597 if (std::ranges::any_of(node->inputPins, [](const InputPin& inputPin) {
598 return inputPin.isPinLinked();
599 }))
600 {
601 while (node->isInitialized())
602 {
603 // -------------------------- Data processing on input non-flow pins -----------------------------
604 bool notifyTriggered = false;
605 for (size_t i = 0; i < node->inputPins.size(); i++)
606 {
607 auto& inputPin = node->inputPins[i];
608 if (inputPin.type != Pin::Type::Flow && !inputPin.queue.empty())
609 {
610 if (auto callback = std::get<InputPin::DataChangedNotifyFunc>(inputPin.callback))
611 {
612 LOG_DATA("{}: Invoking notify callback on input pin '{}'", node->nameId(), inputPin.name);
613 InsTime insTime = inputPin.queue.extract_front()->insTime;
614#ifdef TESTING
615 for (const auto& watcherCallback : inputPin.watcherCallbacks)
616 {
617 if (auto watcherCall = std::get<InputPin::DataChangedWatcherNotifyFunc>(watcherCallback))
618 {
619 std::invoke(watcherCall, node, insTime, i);
620 }
621 }
622#endif
623 std::invoke(callback, node, insTime, i);
624 notifyTriggered = true;
625 }
626 }
627 }
628 if (notifyTriggered) { continue; }
629
630 // ------------------------------ Process data on input flow pins --------------------------------
631 if (node->callbacksEnabled || node->_mode == Node::Mode::REAL_TIME)
632 {
633 LOG_DATA("{}: Checking for firable input pins", node->nameId());
634
635 if (node->_mode == Mode::POST_PROCESSING)
636 {
637 // Check if all input flow pins have data
638 bool allInputPinsHaveData = !node->inputPins.empty();
639 for (const auto& inputPin : node->inputPins)
640 {
641 if (inputPin.type == Pin::Type::Flow && inputPin.neededForTemporalQueueCheck && !inputPin.queueBlocked && inputPin.queue.empty())
642 {
643 if (auto* connectedPin = inputPin.link.getConnectedPin();
644 connectedPin && !connectedPin->noMoreDataAvailable)
645 {
646 allInputPinsHaveData = false;
647 break;
648 }
649 }
650 }
651 if (!allInputPinsHaveData)
652 {
653 LOG_DATA("{}: Not all pins have data for temporal sorting", node->nameId());
654 break;
655 }
656 LOG_DATA("{}: All pins have data for temporal sorting", node->nameId());
657 }
658
659 // Find pin with the earliest data
660 InsTime earliestTime;
661 size_t earliestInputPinIdx = 0;
662 int earliestInputPinPriority = -1000;
663 for (size_t i = 0; i < node->inputPins.size(); i++)
664 {
665 auto& inputPin = node->inputPins[i];
666 if (inputPin.type == Pin::Type::Flow && !inputPin.queue.empty()
667 && (earliestTime.empty()
668 || inputPin.queue.front()->insTime < earliestTime
669 || (inputPin.queue.front()->insTime == earliestTime && inputPin.priority > earliestInputPinPriority)))
670 {
671 earliestTime = inputPin.queue.front()->insTime;
672 earliestInputPinIdx = i;
673 earliestInputPinPriority = inputPin.priority;
674 }
675 }
676 if (earliestInputPinPriority == -1000) { break; }
677
678 auto& inputPin = node->inputPins[earliestInputPinIdx];
679 if (inputPin.firable && inputPin.firable(node, inputPin))
680 {
681 if (auto callback = std::get<InputPin::FlowFirableCallbackFunc>(inputPin.callback))
682 {
683 LOG_DATA("{}: Invoking callback on input pin '{}'", node->nameId(), inputPin.name);
684#ifdef TESTING
685 for (const auto& watcherCallback : inputPin.watcherCallbacks)
686 {
687 if (auto watcherCall = std::get<InputPin::FlowFirableWatcherCallbackFunc>(watcherCallback))
688 {
689 std::invoke(watcherCall, node, inputPin.queue, earliestInputPinIdx);
690 }
691 }
692#endif
693 std::invoke(callback, node, inputPin.queue, earliestInputPinIdx);
694 }
695 }
696 else if (inputPin.dropQueueIfNotFirable)
697 {
698 LOG_DATA("{}: Dropping message on input pin '{}'", node->nameId(), inputPin.name);
699 inputPin.queue.pop_front();
700 }
701 else
702 {
703 LOG_DATA("{}: Skipping message on input pin '{}'", node->nameId(), inputPin.name);
704 break; // Do not drop an item, but put the worker to sleep
705 }
706 }
707 else
708 {
709 break;
710 }
711 }
712 }
713
714 // Post-processing (FileReader/Simulator)
715 if (!node->pollEvents.empty())
716 {
717 std::multimap<InsTime, std::pair<OutputPin*, size_t>>::iterator it;
718 while (it = node->pollEvents.begin(), it != node->pollEvents.end() && node->isInitialized() && node->callbacksEnabled)
719 {
720 OutputPin* outputPin = it->second.first;
721 size_t outputPinIdx = it->second.second;
722 Node* node = outputPin->parentNode;
723
724 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin->data))
725 {
726 auto* callback = std::get_if<OutputPin::PollDataFunc>(&outputPin->data);
727 if (callback != nullptr && *callback != nullptr)
728 {
729 LOG_DATA("{}: Polling data from output pin '{}'", node->nameId(), str::replaceAll_copy(outputPin->name, "\n", ""));
730 if ((node->**callback)() == nullptr)
731 {
732 node->pollEvents.erase(it); // Delete the event if no more data on this pin
733 break;
734 }
735 }
736 }
737 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin->data))
738 {
739 auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin->data);
740 if (callback != nullptr && *callback != nullptr)
741 {
742 if (!it->first.empty())
743 {
744 LOG_DATA("{}: Polling data from output pin '{}'", node->nameId(), str::replaceAll_copy(outputPin->name, "\n", ""));
745 // Trigger the already peeked observation and invoke it's callbacks (peek = false)
746 if ((node->**callback)(outputPinIdx, false) == nullptr)
747 {
748 LOG_ERROR("{}: {} could not poll its observation despite being able to peek it.", node->nameId(), outputPin->name);
749 }
750 }
751
752 // Check if data available (peek = true)
753 if (auto obs = (node->**callback)(outputPinIdx, true))
754 {
755 // Check if data has a time
756 if (!obs->insTime.empty())
757 {
758 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(outputPin, outputPinIdx)));
759 }
760 else // If no time, call the object and remove it
761 {
762 (node->**callback)(outputPinIdx, false);
763 continue; // Do not erase the iterator, because this pin needs to be called again
764 }
765 }
766 else // nullptr -> no more data incoming on this pin
767 {
768 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin->name);
769 outputPin->noMoreDataAvailable = true;
770 for (auto& link : outputPin->links)
771 {
772 link.connectedNode->wakeWorker();
773 }
774 }
775 }
776 else
777 {
778 LOG_ERROR("{} - {}: Callback is not valid anymore", node->nameId(), size_t(outputPin->id));
779 }
780 node->pollEvents.erase(it);
781 }
782 }
783
784 if (node->pollEvents.empty())
785 {
786 LOG_TRACE("{}: Finished polling all pins.", node->nameId());
787
788 node->callbacksEnabled = false;
789 for (auto& outputPin : node->outputPins)
790 {
791 if (!outputPin.noMoreDataAvailable)
792 {
793 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin.name);
794 outputPin.noMoreDataAvailable = true;
795 for (auto& link : outputPin.links)
796 {
797 link.connectedNode->wakeWorker();
798 }
799 }
800 }
803 }
804 }
805 }
806
807 // Check if node finished
808 if (node->_mode == Mode::POST_PROCESSING)
809 {
810 if (std::ranges::all_of(node->inputPins, [](const InputPin& inputPin) {
811 return inputPin.type != Pin::Type::Flow || !inputPin.isPinLinked() || inputPin.link.connectedNode->isDisabled()
812 || !inputPin.link.getConnectedPin()->blocksConnectedNodeFromFinishing
813 || (inputPin.queue.empty() && inputPin.link.getConnectedPin()->noMoreDataAvailable);
814 }))
815 {
816 LOG_TRACE("{}: Node finished", node->nameId());
817 node->callbacksEnabled = false;
818 for (auto& outputPin : node->outputPins)
819 {
820 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin.name);
821 outputPin.noMoreDataAvailable = true;
822 for (auto& link : outputPin.links)
823 {
824 link.connectedNode->wakeWorker();
825 }
826 }
829 }
830 }
831 }
832 }
833
834 LOG_TRACE("{}: Worker thread ended.", node->nameId());
835}
836
838{
839 LOG_TRACE("{}: called", nameId());
840 {
841 std::scoped_lock lk(_stateMutex);
842 INS_ASSERT_USER_ERROR(_state == State::DoInitialize, fmt::format("Worker can only initialize the node if the state is set to DoInitialize, but it is {}.", toString(_state)).c_str());
844 }
846
847 LOG_DEBUG("{}: Initializing Node", nameId());
848
849 // Initialize Nodes connected to the input pins
850 for (const auto& inputPin : inputPins)
851 {
852 if (inputPin.type != Pin::Type::Flow)
853 {
854 if (Node* connectedNode = inputPin.link.connectedNode)
855 {
856 if (!connectedNode->isInitialized())
857 {
858 LOG_DEBUG("{}: Initializing connected Node '{}' on input Pin {}", nameId(), connectedNode->nameId(), size_t(inputPin.id));
859 if (!connectedNode->doInitialize(true))
860 {
861 LOG_ERROR("{}: Could not initialize connected node {}", nameId(), connectedNode->nameId());
862 std::scoped_lock lk(_stateMutex);
864 {
866 }
867 return false;
868 }
869 }
870 }
871 }
872 }
873
874 _reinitialize = false;
875
876 // Initialize the node itself
877 LOG_TRACE("{}: calling initialize()", nameId());
878 bool initSucceeded = false;
879 {
880 std::scoped_lock<std::mutex> guard(_configWindowMutex);
881 initSucceeded = initialize();
882 }
883 if (initSucceeded)
884 {
885 LOG_TRACE("{}: initialize() was successful", nameId());
886
887 for (auto& inputPin : inputPins)
888 {
889 inputPin.queue.clear();
890 inputPin.queueBlocked = false;
891 }
892
893 pollEvents.clear();
894 LOG_TRACE("{}: calling resetNode()", nameId());
895 {
896 std::scoped_lock<std::mutex> guard(_configWindowMutex);
897 resetNode();
898 }
899 LOG_TRACE("{}: resetNode() was successful", nameId());
900 for (auto& outputPin : outputPins)
901 {
902 outputPin.noMoreDataAvailable = true;
903 for (auto& link : outputPin.links)
904 {
905 LOG_TRACE("{}: Waking connected node '{}'", nameId(), link.connectedNode->nameId());
906 link.connectedNode->wakeWorker();
907 }
908 }
909
910 std::scoped_lock lk(_stateMutex);
912 {
914 }
915 return true;
916 }
917
918 LOG_TRACE("{}: initialize() failed", nameId());
919 std::scoped_lock lk(_stateMutex);
921 {
923 }
924 return false;
925}
926
928{
929 LOG_TRACE("{}: called", nameId());
930 {
931 std::scoped_lock lk(_stateMutex);
932 INS_ASSERT_USER_ERROR(_state == State::DoDeinitialize, fmt::format("Worker can only deinitialize the node if the state is set to DoDeinitialize, but it is {}.", toString(_state)).c_str());
933 {
935 }
936 }
937 LOG_DEBUG("{}: Deinitializing Node", nameId());
938
939 callbacksEnabled = false;
940
941 // Re-/Deinitialize Nodes connected to the output pins
942 for (const auto& outputPin : outputPins)
943 {
944 if (outputPin.type != Pin::Type::Flow)
945 {
946 for (const auto& link : outputPin.links)
947 {
948 if (link.connectedNode->isInitialized())
949 {
950 LOG_DEBUG("{}: {} connected Node '{}' on output Pin {}", nameId(),
951 _reinitialize ? "Reinitializing" : "Deinitializing", link.connectedNode->nameId(), size_t(outputPin.id));
952 if (_reinitialize) { link.connectedNode->doReinitialize(); }
953 else { link.connectedNode->doDeinitialize(); }
954 }
955 }
956 }
957 }
958
959 // Deinitialize the node itself
960 {
961 std::scoped_lock<std::mutex> guard(_configWindowMutex);
962 deinitialize();
963 }
964
965 std::scoped_lock lk(_stateMutex);
967 {
968 if (_disable)
969 {
971 }
972 else if (_reinitialize)
973 {
975 }
976 else
977 {
979 }
980 }
981
982 return true;
983}
984
986{
987 LOG_TRACE("{}: called", nameId());
988}
989
990void NAV::to_json(json& j, const Node& node)
991{
992 ImVec2 realSize = ed::GetNodeSize(node.id);
993 realSize.x -= 16;
994 realSize.y -= 38.0F;
995 j = json{
996 { "id", size_t(node.id) },
997 { "type", node.type() },
998 { "kind", std::string(node.kind) },
999 { "name", node.name },
1000 { "size", node._size.x == 0 && node._size.y == 0 ? node._size : realSize },
1001 { "pos", ed::GetNodePosition(node.id) },
1002 { "enabled", !node.isDisabled() },
1003 { "inputPins", node.inputPins },
1004 { "outputPins", node.outputPins },
1005 };
1006}
1007void NAV::from_json(const json& j, Node& node)
1008{
1009 node.id = j.at("id").get<size_t>();
1010 if (j.contains("kind"))
1011 {
1012 node.kind = Node::Kind(j.at("kind").get<std::string>());
1013 }
1014 if (j.contains("name"))
1015 {
1016 j.at("name").get_to(node.name);
1017 }
1018 if (j.contains("size"))
1019 {
1020 j.at("size").get_to(node._size);
1021 if (node.kind == Node::Kind::GroupBox && gui::NodeEditorApplication::isUsingBigDefaultFont())
1022 {
1023 node._size.y -= 20;
1024 }
1025 }
1026 if (j.contains("enabled"))
1027 {
1028 bool enabled = j.at("enabled").get<bool>();
1029 if (!enabled)
1030 {
1031 std::scoped_lock lk(node._stateMutex);
1033 }
1034 }
1035
1036 if (j.contains("inputPins"))
1037 {
1038 auto inputPins = j.at("inputPins").get<std::vector<InputPin>>();
1039 for (size_t i = 0; i < inputPins.size(); ++i)
1040 {
1041 if (node.inputPins.size() <= i)
1042 {
1043 break;
1044 }
1045 j.at("inputPins").at(i).get_to(node.inputPins.at(i));
1046 }
1047 }
1048
1049 if (j.contains("outputPins"))
1050 {
1051 auto outputPins = j.at("outputPins").get<std::vector<OutputPin>>();
1052 for (size_t i = 0; i < outputPins.size(); ++i)
1053 {
1054 if (node.outputPins.size() <= i)
1055 {
1056 break;
1057 }
1058 j.at("outputPins").at(i).get_to(node.outputPins.at(i));
1059 }
1060 }
1061}
Assertion helpers.
#define INS_ASSERT_USER_ERROR(_EXP, _MSG)
Assert function with message.
Definition Assert.h:21
Handles Flow animations.
Flow Executor Thread.
nlohmann::json json
json namespace
Defines how to save certain datatypes to json.
Utility class for logging to console and file.
#define LOG_DEBUG
Debug information. Should not be called on functions which receive observations (spamming)
Definition Logger.hpp:67
#define LOG_DATA
All output which occurs repeatedly every time observations are received.
Definition Logger.hpp:29
#define LOG_ERROR
Error occurred, which stops part of the program to work, but not everything.
Definition Logger.hpp:73
#define LOG_TRACE
Detailed info to trace the execution of the program. Should not be called on functions which receive...
Definition Logger.hpp:65
Manages all Nodes.
Node Class.
Utility functions for working with std::strings.
Input pins of nodes.
Definition Pin.hpp:491
IncomingLink link
Info to identify the linked pin.
Definition Pin.hpp:704
Callback callback
Callback to call when the node is firable or when it should be notified of data change.
Definition Pin.hpp:722
FlowFirableCheckFunc firable
Function to check if the callback is firable.
Definition Pin.hpp:728
bool queueBlocked
If true no more messages are accepted to the queue.
Definition Pin.hpp:740
bool neededForTemporalQueueCheck
Whether it should be checked for temporal ordering.
Definition Pin.hpp:734
int priority
Priority when checking firable condition related to other pins (higher priority gets triggered first)
Definition Pin.hpp:731
NodeDataQueue queue
Queue with received data.
Definition Pin.hpp:743
bool dropQueueIfNotFirable
If true, drops elements from the queue if not firable, otherwise sleeps the worker.
Definition Pin.hpp:737
The class is responsible for all time-related tasks.
Definition InsTime.hpp:710
constexpr bool empty() const
Checks if the Time object has a value.
Definition InsTime.hpp:1089
Abstract parent class for all nodes.
Definition Node.hpp:92
bool _workerWakeup
Variable to prevent the worker from sleeping.
Definition Node.hpp:457
bool isDisabled() const
Checks if the node is disabled.
Definition Node.cpp:499
bool isOnlyRealtime() const
Checks if the node is only working in real time (sensors, network interfaces, ...)
Definition Node.cpp:529
bool isInitialized() const
Checks if the node is initialized.
Definition Node.cpp:504
bool doDeinitialize(bool wait=false)
Asks the node worker to deinitialize the node.
Definition Node.cpp:395
static void workerThread(Node *node)
Worker thread.
Definition Node.cpp:534
virtual void restore(const json &j)
Restores the node from a json object.
Definition Node.cpp:62
void releaseInputValue(size_t portIndex)
Unblocks the connected node. Has to be called when the input value should be released and getInputVal...
Definition Node.cpp:148
bool _reinitialize
Flag if the node should be reinitialized after deinitializing.
Definition Node.hpp:429
virtual void workerTimeoutHandler()
Handler which gets triggered if the worker runs into a periodic timeout.
Definition Node.cpp:985
bool _disable
Flag if the node should be disabled after deinitializing.
Definition Node.hpp:432
bool doDisable(bool wait=false)
Asks the node worker to disable the node.
Definition Node.cpp:436
State
Possible states of the node.
Definition Node.hpp:176
@ DoInitialize
Node should be initialized.
Definition Node.hpp:179
@ Shutdown
Node is shutting down.
Definition Node.hpp:185
@ DoShutdown
Node should shut down.
Definition Node.hpp:184
@ Initializing
Node is currently initializing.
Definition Node.hpp:180
@ Initialized
Node is initialized (green)
Definition Node.hpp:181
@ DoDeinitialize
Node should be deinitialized.
Definition Node.hpp:182
@ Disabled
Node is disabled and won't be initialized.
Definition Node.hpp:177
@ Deinitializing
Node is currently deinitializing.
Definition Node.hpp:183
@ Deinitialized
Node is deinitialized (red)
Definition Node.hpp:178
void wakeWorker()
Wakes the worker thread.
Definition Node.cpp:490
ImVec2 _size
Size of the node in pixels.
Definition Node.hpp:448
std::mutex _configWindowMutex
Mutex to show the config window (prevents initialization to modify values within the config window)
Definition Node.hpp:438
State getState() const
Get the current state of the node.
Definition Node.cpp:289
bool workerInitializeNode()
Called by the worker to initialize the node.
Definition Node.cpp:837
void notifyOutputValueChanged(size_t pinIdx, const InsTime &insTime, const std::scoped_lock< std::mutex > &&guard)
Notifies connected nodes about the change.
Definition Node.cpp:93
const ImVec2 & getSize() const
Get the size of the node.
Definition Node.cpp:258
bool hasInputPinWithSameTime(const InsTime &insTime) const
Checks whether there is an input pin with the same time.
Definition Node.cpp:173
virtual void flush()
Function called by the flow executor after finishing to flush out remaining data.
Definition Node.cpp:73
std::vector< OutputPin > outputPins
List of output pins.
Definition Node.hpp:399
Node(std::string name)
Constructor.
Definition Node.cpp:30
bool workerDeinitializeNode()
Called by the worker to deinitialize the node.
Definition Node.cpp:927
std::multimap< InsTime, std::pair< OutputPin *, size_t > > pollEvents
Map with callback events (sorted by time)
Definition Node.hpp:405
Kind kind
Kind of the Node.
Definition Node.hpp:393
size_t outputPinIndexFromId(ax::NodeEditor::PinId pinId) const
Returns the index of the pin.
Definition Node.cpp:243
std::vector< InputPin > inputPins
List of input pins.
Definition Node.hpp:397
OutputPin & outputPinFromId(ax::NodeEditor::PinId pinId)
Returns the pin with the given id.
Definition Node.cpp:223
virtual void deinitialize()
Deinitialize the Node.
Definition Node.cpp:71
bool doInitialize(bool wait=false)
Asks the node worker to initialize the node.
Definition Node.cpp:300
State _state
Current state of the node.
Definition Node.hpp:422
Mode
Different Modes the Node can work in.
Definition Node.hpp:190
@ POST_PROCESSING
Node running in post-processing mode.
Definition Node.hpp:192
@ REAL_TIME
Node running in real-time mode.
Definition Node.hpp:191
bool callbacksEnabled
Enables the callbacks.
Definition Node.hpp:402
std::atomic< Mode > _mode
Mode the node is currently running in.
Definition Node.hpp:426
virtual bool resetNode()
Resets the node. It is guaranteed that the node is initialized when this is called.
Definition Node.cpp:75
std::string nameId() const
Node name and id.
Definition Node.cpp:253
virtual void afterCreateLink(OutputPin &startPin, InputPin &endPin)
Called when a new link was established.
Definition Node.cpp:89
std::thread _worker
Worker handling initialization and processing of data.
Definition Node.hpp:454
size_t inputPinIndexFromId(ax::NodeEditor::PinId pinId) const
Returns the index of the pin.
Definition Node.cpp:233
virtual void onDeleteLink(OutputPin &startPin, InputPin &endPin)
Called when a link is to be deleted.
Definition Node.cpp:87
virtual ~Node()
Destructor.
Definition Node.cpp:40
Mode getMode() const
Get the current mode of the node.
Definition Node.cpp:295
std::mutex _workerMutex
Mutex to interact with the worker condition variable.
Definition Node.hpp:455
virtual void guiConfig()
ImGui config window which is shown on double click.
Definition Node.cpp:58
std::string name
Name of the Node.
Definition Node.hpp:395
bool _onlyRealTime
Whether the node can run in post-processing or only real-time.
Definition Node.hpp:419
bool doEnable()
Enable the node.
Definition Node.cpp:478
bool doReinitialize(bool wait=false)
Asks the node worker to reinitialize the node.
Definition Node.cpp:350
std::chrono::duration< int64_t > _workerTimeout
Periodic timeout of the worker to check if new data available.
Definition Node.hpp:453
std::condition_variable _workerConditionVariable
Condition variable to signal the worker thread to do something.
Definition Node.hpp:456
virtual void restoreAtferLink(const json &j)
Restores link related properties of the node from a json object.
Definition Node.cpp:64
virtual void afterDeleteLink(OutputPin &startPin, InputPin &endPin)
Called when a link was deleted.
Definition Node.cpp:91
InputPin & inputPinFromId(ax::NodeEditor::PinId pinId)
Returns the pin with the given id.
Definition Node.cpp:213
std::scoped_lock< std::mutex > requestOutputValueLock(size_t pinIdx)
Blocks the thread until the output value has been read by all connected nodes.
Definition Node.cpp:133
void invokeCallbacks(size_t portIndex, const std::shared_ptr< const NodeData > &data)
Calls all registered callbacks on the specified output port.
Definition Node.cpp:180
virtual json save() const
Saves the node into a json object.
Definition Node.cpp:60
static std::string toString(State state)
Converts the state into a printable text.
Definition Node.cpp:263
static bool _autostartWorker
Flag which prevents the worker from being autostarted if false.
Definition Node.hpp:451
ax::NodeEditor::NodeId id
Unique Id of the Node.
Definition Node.hpp:391
virtual bool onCreateLink(OutputPin &startPin, InputPin &endPin)
Called when a new link is to be established.
Definition Node.cpp:82
virtual std::string type() const =0
String representation of the Class Type.
std::mutex _stateMutex
Mutex to interact with the worker state variable.
Definition Node.hpp:423
bool isTransient() const
Checks if the node is changing its state currently.
Definition Node.cpp:509
virtual bool initialize()
Initialize the Node.
Definition Node.cpp:66
Output pins of nodes.
Definition Pin.hpp:338
PinData data
Pointer to data (owned by this node) which is transferred over this pin.
Definition Pin.hpp:458
std::vector< OutgoingLink > links
Info to identify the linked pins.
Definition Pin.hpp:433
std::atomic< bool > noMoreDataAvailable
Flag set, when no more data is available on this pin.
Definition Pin.hpp:470
Node * parentNode
Reference to the parent node.
Definition Pin.hpp:307
std::string name
Name of the Pin.
Definition Pin.hpp:299
ax::NodeEditor::PinId id
Unique Id of the Pin.
Definition Pin.hpp:297
Type type
Type of the Pin.
Definition Pin.hpp:301
auto & front()
Returns a reference to the first element in the container.
Definition TsDeque.hpp:198
auto extract_front()
Returns a copy of the first element in the container and removes it from the container.
Definition TsDeque.hpp:494
bool empty() const noexcept
Checks if the container has no elements, i.e. whether 'begin() == end()'.
Definition TsDeque.hpp:275
void pop_front()
Removes the first element of the container. If there are no elements in the container,...
Definition TsDeque.hpp:461
void Add(ax::NodeEditor::LinkId id, ax::NodeEditor::FlowDirection direction=ax::NodeEditor::FlowDirection::Forward)
Thread-safe flow animation of links.
void deregisterNode(const Node *node)
Called by nodes when they finished with sending data.
bool showFlowWhenNotifyingValueChange
Flag if notifyOutputValueChanged & notifyInputValueChanged triggers a GUI Flow event.
static std::string replaceAll_copy(std::string str, const std::string &from, const std::string &to, CaseSensitivity cs)
Replaces all occurrences of a search pattern with another sequence.
void to_json(json &j, const Node &node)
Converts the provided node into a json object.
Definition Node.cpp:990
void move(std::vector< T > &v, size_t sourceIdx, size_t targetIdx)
Moves an element within a vector to a new position.
Definition Vector.hpp:27
void from_json(const json &j, Node &node)
Converts the provided json object into a node object.
Definition Node.cpp:1007
Kind information class.
Definition Node.hpp:96
@ GroupBox
Group box which can group other nodes and drag them together.
Definition Node.hpp:102
@ Flow
NodeData Trigger.
Definition Pin.hpp:52