INSTINCT Code Coverage Report


Directory: src/
File: internal/Node/Node.cpp
Date: 2025-11-25 23:34:18
Exec Total Coverage
Lines: 372 576 64.6%
Functions: 38 59 64.4%
Branches: 391 891 43.9%

Line Branch Exec Source
1 // This file is part of INSTINCT, the INS Toolkit for Integrated
2 // Navigation Concepts and Training by the Institute of Navigation of
3 // the University of Stuttgart, Germany.
4 //
5 // This Source Code Form is subject to the terms of the Mozilla Public
6 // License, v. 2.0. If a copy of the MPL was not distributed with this
7 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
8
9 #include "Node.hpp"
10
11 #include <stdexcept>
12
13 #include "util/Logger.hpp"
14 #include "util/StringUtil.hpp"
15 #include "util/Assert.h"
16
17 #include "internal/FlowManager.hpp"
18 #include "internal/FlowExecutor.hpp"
19 #include "internal/gui/FlowAnimation.hpp"
20 #include "util/Json.hpp"
21 #include "internal/gui/NodeEditorApplication.hpp"
22
23 #include <imgui_node_editor.h>
24 namespace ed = ax::NodeEditor;
25 #ifdef TESTING
26 #include <catch2/catch_test_macros.hpp>
27 #endif
28
29 6969 NAV::Node::Node(std::string name)
30
1/2
✓ Branch 14 taken 6969 times.
✗ Branch 15 not taken.
6969 : name(std::move(name))
31 {
32 LOG_TRACE("{}: called", nameId());
33
2/2
✓ Branch 0 taken 357 times.
✓ Branch 1 taken 6612 times.
6969 if (_autostartWorker)
34 {
35
1/2
✓ Branch 1 taken 357 times.
✗ Branch 2 not taken.
357 _worker = std::thread(workerThread, this);
36 }
37 6969 }
38
39 13934 NAV::Node::~Node()
40 {
41 LOG_TRACE("{}: called", nameId());
42
43
2/2
✓ Branch 0 taken 355 times.
✓ Branch 1 taken 6612 times.
13934 if (_autostartWorker)
44 {
45 710 _state = State::DoShutdown;
46 710 wakeWorker();
47
48 // // wait for the worker
49 // {
50 // std::unique_lock lk(_workerMutex);
51 // _workerConditionVariable.wait(lk, [&, this] { return _state == State::Shutdown; });
52 // }
53 710 _worker.join();
54 }
55 13934 }
56
57 void NAV::Node::guiConfig() {}
58
59 json NAV::Node::save() const { return {}; }
60
61 53 void NAV::Node::restore(const json& /*j*/) {}
62
63 315 void NAV::Node::restoreAtferLink(const json& /*j*/) {}
64
65 158 bool NAV::Node::initialize()
66 {
67 158 return true;
68 }
69
70 92 void NAV::Node::deinitialize() {}
71
72 334 void NAV::Node::flush() {}
73
74 333 bool NAV::Node::resetNode()
75 {
76 LOG_TRACE("{}: called", nameId());
77
78 333 return initialize();
79 }
80
81 554 bool NAV::Node::onCreateLink(OutputPin& /*startPin*/, InputPin& /*endPin*/)
82 {
83 554 return true;
84 }
85
86 583 void NAV::Node::onDeleteLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
87
88 490 void NAV::Node::afterCreateLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
89
90 538 void NAV::Node::afterDeleteLink(OutputPin& /*startPin*/, InputPin& /*endPin*/) {}
91
92 void NAV::Node::notifyOutputValueChanged(size_t pinIdx, const InsTime& insTime, const std::scoped_lock<std::mutex>&& /* guard */)
93 {
94 if (callbacksEnabled && isInitialized())
95 {
96 auto& outputPin = outputPins.at(pinIdx);
97
98 if (!outputPin.isPinLinked()) { return; }
99
100 for (auto& link : outputPin.links)
101 {
102 auto* targetPin = link.getConnectedPin();
103 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
104 {
105 outputPin.dataAccessCounter++;
106 link.dataChangeNotification = true;
107 LOG_DATA("{}: Increasing data access counter on output pin '{}'. Value now {}.", nameId(), outputPin.name, outputPin.dataAccessCounter);
108
109 if (gui::NodeEditorApplication::showFlowWhenNotifyingValueChange)
110 {
111 FlowAnimation::Add(link.linkId);
112 }
113
114 auto data = std::make_shared<NodeData>();
115 data->insTime = insTime;
116
117 targetPin->queue.push_back(data);
118 }
119 }
120 for (const auto& link : outputPin.links)
121 {
122 auto* targetPin = link.getConnectedPin();
123 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
124 {
125 LOG_DATA("{}: Waking up worker of node '{}'. New data on pin '{}'", nameId(), link.connectedNode->nameId(), targetPin->name);
126 link.connectedNode->wakeWorker();
127 }
128 }
129 }
130 }
131
132 150 std::scoped_lock<std::mutex> NAV::Node::requestOutputValueLock(size_t pinIdx)
133 {
134 150 auto& outputPin = outputPins.at(pinIdx);
135 {
136
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 std::unique_lock<std::mutex> lk(outputPin.dataAccessMutex);
137
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 150 times.
150 if (outputPin.dataAccessCounter > 0)
138 {
139 LOG_DATA("{}: Requesting lock on output pin '{}', {} threads accessing still.", nameId(), outputPin.name, outputPin.dataAccessCounter);
140 outputPin.dataAccessConditionVariable.wait(lk, [&outputPin]() { return outputPin.dataAccessCounter == 0; });
141 LOG_DATA("{}: Lock on output pin '{}' acquired.", nameId(), outputPin.name);
142 }
143 150 }
144 150 return std::scoped_lock(outputPin.dataAccessMutex);
145 }
146
147 void NAV::Node::releaseInputValue(size_t portIndex)
148 {
149 if (OutputPin* outputPin = inputPins.at(portIndex).link.getConnectedPin())
150 {
151 std::scoped_lock<std::mutex> lk(outputPin->dataAccessMutex);
152 if (outputPin->dataAccessCounter > 0)
153 {
154 auto outgoingLink = std::ranges::find_if(outputPin->links, [&](const OutputPin::OutgoingLink& link) {
155 return link.connectedPinId == inputPins.at(portIndex).id;
156 });
157 if (outgoingLink != outputPin->links.end() && outgoingLink->dataChangeNotification)
158 {
159 outgoingLink->dataChangeNotification = false;
160 outputPin->dataAccessCounter--;
161
162 if (outputPin->dataAccessCounter == 0)
163 {
164 LOG_DATA("{}: Notifying node '{}' connected to pin {} that all data is read.", nameId(), outputPin->parentNode->nameId(), outputPin->name);
165 outputPin->dataAccessConditionVariable.notify_all();
166 }
167 }
168 }
169 }
170 }
171
172 bool NAV::Node::hasInputPinWithSameTime(const InsTime& insTime) const
173 {
174 return std::ranges::any_of(inputPins, [&insTime](const InputPin& pin) {
175 return !pin.queue.empty() && pin.queue.front()->insTime == insTime;
176 });
177 }
178
179 370876 void NAV::Node::invokeCallbacks(size_t portIndex, const std::shared_ptr<const NAV::NodeData>& data)
180 {
181
2/2
✓ Branch 0 taken 370867 times.
✓ Branch 1 taken 9 times.
370876 if (callbacksEnabled)
182 {
183
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 370799 times.
370867 if (data == nullptr)
184 {
185 LOG_DEBUG("{}: Tried to invokeCallbacks on pin {} with a nullptr, which is not allowed!!!", nameId(), portIndex);
186 return;
187 }
188
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 370731 times.
370799 if (data->insTime.empty())
189 {
190 LOG_DATA("{}: Tried to invokeCallbacks on pin {} without a InsTime. The time is mandatory though!!! ", nameId(), portIndex);
191 return;
192 }
193
194
3/4
✓ Branch 1 taken 370629 times.
✗ Branch 2 not taken.
✓ Branch 8 taken 555997 times.
✓ Branch 9 taken 370879 times.
927254 for (const auto& link : outputPins.at(portIndex).links)
195 {
196
1/2
✓ Branch 1 taken 556461 times.
✗ Branch 2 not taken.
556045 auto* targetPin = link.getConnectedPin();
197
7/8
✓ Branch 1 taken 556504 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 508061 times.
✓ Branch 4 taken 48443 times.
✓ Branch 5 taken 481271 times.
✓ Branch 6 taken 26790 times.
✓ Branch 7 taken 481271 times.
✓ Branch 8 taken 75233 times.
556461 if (link.connectedNode->isInitialized() && !targetPin->queueBlocked)
198 {
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 481271 times.
481271 if (gui::NodeEditorApplication::showFlowWhenInvokingCallbacks)
200 {
201 FlowAnimation::Add(link.linkId);
202 }
203
204
1/2
✓ Branch 1 taken 481158 times.
✗ Branch 2 not taken.
481271 targetPin->queue.push_back(data);
205 LOG_DATA("{}: Waking up worker of node {}. New data on pin '{}'", nameId(), size_t(link.connectedNode->id), targetPin->name);
206
1/2
✓ Branch 1 taken 481290 times.
✗ Branch 2 not taken.
481158 link.connectedNode->wakeWorker();
207 }
208 }
209 }
210 }
211
212 NAV::InputPin& NAV::Node::inputPinFromId(ax::NodeEditor::PinId pinId)
213 {
214 for (auto& inputPin : inputPins)
215 {
216 if (pinId == inputPin.id) { return inputPin; }
217 }
218
219 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
220 }
221
222 NAV::OutputPin& NAV::Node::outputPinFromId(ax::NodeEditor::PinId pinId)
223 {
224 for (auto& outputPin : outputPins)
225 {
226 if (pinId == outputPin.id) { return outputPin; }
227 }
228
229 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
230 }
231
232 80 size_t NAV::Node::inputPinIndexFromId(ax::NodeEditor::PinId pinId) const
233 {
234
1/2
✓ Branch 1 taken 176 times.
✗ Branch 2 not taken.
176 for (size_t i = 0; i < inputPins.size(); i++)
235 {
236
2/2
✓ Branch 2 taken 80 times.
✓ Branch 3 taken 96 times.
176 if (pinId == inputPins.at(i).id) { return i; }
237 }
238
239 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
240 }
241
242 size_t NAV::Node::outputPinIndexFromId(ax::NodeEditor::PinId pinId) const
243 {
244 for (size_t i = 0; i < outputPins.size(); i++)
245 {
246 if (pinId == outputPins.at(i).id) { return i; }
247 }
248
249 throw std::runtime_error(fmt::format("{}: The Pin {} is not on this node.", nameId(), size_t(pinId)).c_str());
250 }
251
252 5207 NAV::InputPin* NAV::Node::CreateInputPin(const char* name, NAV::Pin::Type pinType, const std::vector<std::string>& dataIdentifier,
253 InputPin::Callback callback, InputPin::FlowFirableCheckFunc firable, int priority, int idx)
254 {
255 LOG_TRACE("called for pin ({}) of type ({}) for node [{}]", name, std::string(pinType), nameId());
256
2/2
✓ Branch 0 taken 5058 times.
✓ Branch 1 taken 149 times.
5207 if (idx < 0)
257 {
258 5058 idx = static_cast<int>(inputPins.size());
259 }
260 5207 idx = std::min(idx, static_cast<int>(inputPins.size()));
261 5207 auto iter = std::next(inputPins.begin(), idx);
262
263
2/4
✓ Branch 1 taken 5207 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 5207 times.
✗ Branch 6 not taken.
5207 inputPins.emplace(iter, flow::GetNextPinId(), name, pinType, this);
264
265
1/2
✓ Branch 1 taken 5207 times.
✗ Branch 2 not taken.
5207 inputPins.at(static_cast<size_t>(idx)).callback = callback;
266
2/2
✓ Branch 0 taken 377 times.
✓ Branch 1 taken 4830 times.
5207 if (firable != nullptr)
267 {
268
1/2
✓ Branch 1 taken 377 times.
✗ Branch 2 not taken.
377 inputPins.at(static_cast<size_t>(idx)).firable = firable;
269 }
270
2/4
✓ Branch 1 taken 5207 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5207 times.
✗ Branch 5 not taken.
5207 inputPins.at(static_cast<size_t>(idx)).dataIdentifier = dataIdentifier;
271
1/2
✓ Branch 1 taken 5207 times.
✗ Branch 2 not taken.
5207 inputPins.at(static_cast<size_t>(idx)).priority = priority;
272
273
1/2
✓ Branch 1 taken 5207 times.
✗ Branch 2 not taken.
5207 flow::ApplyChanges();
274
275
1/2
✓ Branch 1 taken 5207 times.
✗ Branch 2 not taken.
10414 return &inputPins.at(static_cast<size_t>(idx));
276 }
277
278 7027 NAV::OutputPin* NAV::Node::CreateOutputPin(const char* name, NAV::Pin::Type pinType, const std::vector<std::string>& dataIdentifier, OutputPin::PinData data, int idx)
279 {
280 LOG_TRACE("called for pin ({}) of type ({}) for node [{}]", name, std::string(pinType), nameId());
281
2/2
✓ Branch 0 taken 7004 times.
✓ Branch 1 taken 23 times.
7027 if (idx < 0)
282 {
283 7004 idx = static_cast<int>(outputPins.size());
284 }
285 7027 idx = std::min(idx, static_cast<int>(outputPins.size()));
286 7027 auto iter = std::next(outputPins.begin(), idx);
287
288
2/4
✓ Branch 1 taken 7027 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 7027 times.
✗ Branch 6 not taken.
7027 outputPins.emplace(iter, flow::GetNextPinId(), name, pinType, this);
289
290
1/2
✓ Branch 1 taken 7027 times.
✗ Branch 2 not taken.
7027 outputPins.at(static_cast<size_t>(idx)).data = data;
291
2/4
✓ Branch 1 taken 7027 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 7027 times.
✗ Branch 5 not taken.
7027 outputPins.at(static_cast<size_t>(idx)).dataIdentifier = dataIdentifier;
292
293
1/2
✓ Branch 1 taken 7027 times.
✗ Branch 2 not taken.
7027 flow::ApplyChanges();
294
295
1/2
✓ Branch 1 taken 7027 times.
✗ Branch 2 not taken.
14054 return &outputPins.at(static_cast<size_t>(idx));
296 }
297
298 7 bool NAV::Node::DeleteOutputPin(size_t pinIndex)
299 {
300 7 auto& pin = outputPins.at(pinIndex);
301 LOG_TRACE("called for pin ({})", size_t(pin.id));
302
303 7 pin.deleteLinks();
304
305
1/2
✓ Branch 4 taken 7 times.
✗ Branch 5 not taken.
7 pin.parentNode->outputPins.erase(pin.parentNode->outputPins.begin() + static_cast<int64_t>(pinIndex));
306
307 7 return true;
308 }
309
310 3 bool NAV::Node::DeleteInputPin(size_t pinIndex)
311 {
312 3 auto& pin = inputPins.at(pinIndex);
313 LOG_TRACE("called for pin ({})", size_t(pin.id));
314
315 3 pin.deleteLink();
316
317
1/2
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
3 LOG_DEBUG("Erasing pin at idx {}", pinIndex);
318
1/2
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
3 pin.parentNode->inputPins.erase(pin.parentNode->inputPins.begin() + static_cast<int64_t>(pinIndex));
319
320 3 return true;
321 }
322
323 134393 std::string NAV::Node::nameId() const
324 {
325
5/10
✓ Branch 1 taken 134390 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 134388 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 134395 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 134393 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 134389 times.
✗ Branch 14 not taken.
806346 return fmt::format("{} ({})", str::replaceAll_copy(name, "\n", ""), size_t(id));
326 }
327
328 const ImVec2& NAV::Node::getSize() const
329 {
330 return _size;
331 }
332
333 696 std::string NAV::Node::toString(State state)
334 {
335
2/10
✗ Branch 0 not taken.
✗ Branch 1 not taken.
✓ Branch 2 taken 351 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 350 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
696 switch (state)
336 {
337 case State::Disabled:
338 return "Disabled";
339 case State::Deinitialized:
340 return "Deinitialized";
341 351 case State::DoInitialize:
342
1/2
✓ Branch 1 taken 351 times.
✗ Branch 2 not taken.
702 return "DoInitialize";
343 case State::Initializing:
344 return "Initializing";
345 case State::Initialized:
346 return "Initialized";
347 350 case State::DoDeinitialize:
348
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
700 return "DoDeinitialize";
349 case State::Deinitializing:
350 return "Deinitializing";
351 case State::DoShutdown:
352 return "DoShutdown";
353 case State::Shutdown:
354 return "Shutdown";
355 }
356 return "";
357 }
358
359 NAV::Node::State NAV::Node::getState() const
360 {
361 std::scoped_lock lk(_stateMutex);
362 return _state;
363 }
364
365 8 NAV::Node::Mode NAV::Node::getMode() const
366 {
367 8 return _mode;
368 }
369
370 522 bool NAV::Node::doInitialize(bool wait)
371 {
372
1/2
✓ Branch 1 taken 522 times.
✗ Branch 2 not taken.
522 std::unique_lock<std::mutex> lk(_stateMutex);
373 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
374
375
2/6
✗ Branch 0 not taken.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 170 times.
✓ Branch 4 taken 352 times.
✗ Branch 5 not taken.
522 switch (_state)
376 {
377 case State::Initialized:
378 lk.unlock();
379 return true;
380 case State::Disabled:
381 case State::DoShutdown:
382 case State::Shutdown:
383 lk.unlock();
384 return false;
385 case State::DoDeinitialize:
386 case State::Deinitializing:
387 if (_reinitialize)
388 {
389 lk.unlock();
390 break;
391 }
392 lk.unlock();
393 return false;
394 170 case State::DoInitialize:
395 case State::Initializing:
396
1/2
✓ Branch 1 taken 170 times.
✗ Branch 2 not taken.
170 lk.unlock();
397 170 break;
398 352 case State::Deinitialized:
399 {
400 352 _state = State::DoInitialize;
401
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 lk.unlock();
402
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 wakeWorker();
403 352 break;
404 }
405 }
406
407
2/2
✓ Branch 0 taken 170 times.
✓ Branch 1 taken 352 times.
522 if (wait)
408 {
409
1/2
✓ Branch 1 taken 170 times.
✗ Branch 2 not taken.
170 std::unique_lock lk(_workerMutex);
410
1/2
✓ Branch 1 taken 170 times.
✗ Branch 2 not taken.
170 _workerConditionVariable.wait(lk, [&, this] {
411
1/2
✓ Branch 1 taken 367 times.
✗ Branch 2 not taken.
367 std::scoped_lock lks(_stateMutex);
412
4/4
✓ Branch 0 taken 198 times.
✓ Branch 1 taken 169 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 197 times.
734 return _state == State::Initialized || _state == State::Deinitialized;
413 367 });
414
1/2
✓ Branch 1 taken 170 times.
✗ Branch 2 not taken.
170 std::scoped_lock lks(_stateMutex);
415 170 return _state == State::Initialized;
416 170 }
417 352 return true;
418 522 }
419
420 bool NAV::Node::doReinitialize(bool wait)
421 {
422 std::unique_lock<std::mutex> lk(_stateMutex);
423 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
424
425 switch (_state)
426 {
427 case State::Disabled:
428 case State::DoShutdown:
429 case State::Shutdown:
430 lk.unlock();
431 return false;
432 case State::DoDeinitialize:
433 case State::Deinitializing:
434 _reinitialize = true;
435 lk.unlock();
436 break;
437 case State::DoInitialize:
438 case State::Initializing:
439 lk.unlock();
440 break;
441 case State::Deinitialized:
442 lk.unlock();
443 return doInitialize(wait);
444 case State::Initialized:
445 _state = State::DoDeinitialize;
446 _reinitialize = true;
447 lk.unlock();
448 wakeWorker();
449 break;
450 }
451
452 if (wait)
453 {
454 std::unique_lock lk(_workerMutex);
455 _workerConditionVariable.wait(lk, [&, this] {
456 std::scoped_lock lks(_stateMutex);
457 return _state == State::Initialized || _state == State::Deinitialized;
458 });
459 std::scoped_lock lks(_stateMutex);
460 return _state == State::Initialized;
461 }
462 return true;
463 }
464
465 378 bool NAV::Node::doDeinitialize(bool wait)
466 {
467
1/2
✓ Branch 1 taken 378 times.
✗ Branch 2 not taken.
378 std::unique_lock<std::mutex> lk(_stateMutex);
468 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
469
470
3/5
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 350 times.
✗ Branch 4 not taken.
378 switch (_state)
471 {
472 22 case State::Deinitialized:
473
1/2
✓ Branch 1 taken 22 times.
✗ Branch 2 not taken.
22 lk.unlock();
474 22 return true;
475 6 case State::Disabled:
476 case State::DoShutdown:
477 case State::Shutdown:
478 case State::DoInitialize:
479 case State::Initializing:
480
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 lk.unlock();
481 6 return false;
482 case State::DoDeinitialize:
483 case State::Deinitializing:
484 lk.unlock();
485 break;
486 350 case State::Initialized:
487 {
488 350 _state = State::DoDeinitialize;
489
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 lk.unlock();
490
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 wakeWorker();
491 350 break;
492 }
493 }
494
495
2/2
✓ Branch 0 taken 347 times.
✓ Branch 1 taken 3 times.
350 if (wait)
496 {
497
1/2
✓ Branch 1 taken 347 times.
✗ Branch 2 not taken.
347 std::unique_lock lk(_workerMutex);
498
1/2
✓ Branch 1 taken 347 times.
✗ Branch 2 not taken.
347 _workerConditionVariable.wait(lk, [&, this] {
499
1/2
✓ Branch 1 taken 694 times.
✗ Branch 2 not taken.
694 std::scoped_lock lks(_stateMutex);
500 694 return _state == State::Deinitialized;
501 694 });
502 347 }
503 350 return true;
504 378 }
505
506 bool NAV::Node::doDisable(bool wait)
507 {
508 std::unique_lock<std::mutex> lk(_stateMutex);
509 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
510
511 switch (_state)
512 {
513 case State::Disabled:
514 lk.unlock();
515 return true;
516 case State::DoShutdown:
517 case State::Shutdown:
518 case State::Initializing:
519 lk.unlock();
520 return false;
521 case State::Initialized:
522 case State::DoDeinitialize:
523 case State::Deinitializing:
524 _disable = true;
525 lk.unlock();
526 doDeinitialize();
527 break;
528 case State::DoInitialize:
529 case State::Deinitialized:
530 {
531 _state = State::Disabled;
532 lk.unlock();
533 break;
534 }
535 }
536
537 if (wait)
538 {
539 std::unique_lock lk(_workerMutex);
540 _workerConditionVariable.wait(lk, [&, this] {
541 std::scoped_lock lks(_stateMutex);
542 return _state == State::Deinitialized;
543 });
544 }
545 return true;
546 }
547
548 bool NAV::Node::doEnable()
549 {
550 std::scoped_lock lk(_stateMutex);
551 LOG_TRACE("{}: Current state = {}", nameId(), toString(_state));
552
553 if (_state == State::Disabled)
554 {
555 _state = State::Deinitialized;
556 }
557 return true;
558 }
559
560 483098 void NAV::Node::wakeWorker()
561 {
562 {
563
1/2
✓ Branch 1 taken 483188 times.
✗ Branch 2 not taken.
483098 std::scoped_lock lk(_workerMutex);
564 483188 _workerWakeup = true;
565 483188 }
566 483163 _workerConditionVariable.notify_all();
567 483276 }
568
569 363223 bool NAV::Node::isDisabled() const
570 {
571
1/2
✓ Branch 1 taken 363288 times.
✗ Branch 2 not taken.
363223 std::scoped_lock lk(_stateMutex);
572 363288 return _state == State::Disabled;
573 363288 }
574 1734005 bool NAV::Node::isInitialized() const
575 {
576
1/2
✓ Branch 1 taken 1733593 times.
✗ Branch 2 not taken.
1734005 std::scoped_lock lk(_stateMutex);
577 1733468 return _state == State::Initialized;
578 1733593 }
579 284491 bool NAV::Node::isTransient() const
580 {
581
1/2
✓ Branch 1 taken 284504 times.
✗ Branch 2 not taken.
284491 std::scoped_lock lk(_stateMutex);
582
1/3
✓ Branch 0 taken 284511 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
284504 switch (_state)
583 {
584 284511 case State::Disabled:
585 case State::Initialized:
586 case State::Deinitialized:
587 284511 return false;
588 case State::DoShutdown:
589 case State::Shutdown:
590 case State::Initializing:
591 case State::DoDeinitialize:
592 case State::Deinitializing:
593 case State::DoInitialize:
594 return true;
595 }
596
597 return true;
598 284504 }
599 bool NAV::Node::isOnlyRealtime() const
600 {
601 return _onlyRealTime;
602 }
603
604 357 void NAV::Node::workerThread(Node* node)
605 {
606 LOG_TRACE("{}: Worker thread started.", node->nameId());
607
608
1/2
✓ Branch 1 taken 357 times.
✗ Branch 2 not taken.
357 std::unique_lock<std::mutex> lk(node->_stateMutex);
609
4/6
✓ Branch 1 taken 285555 times.
✓ Branch 2 taken 10 times.
✓ Branch 3 taken 285565 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 285572 times.
✗ Branch 6 not taken.
571118 while ([]() { return true; }() && node->_state != State::Shutdown)
610 {
611
3/4
✓ Branch 1 taken 1059 times.
✓ Branch 2 taken 284502 times.
✓ Branch 4 taken 1059 times.
✗ Branch 5 not taken.
285572 if (lk.owns_lock()) { lk.unlock(); }
612
613
1/2
✓ Branch 1 taken 285525 times.
✗ Branch 2 not taken.
285561 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
614
2/2
✓ Branch 0 taken 355 times.
✓ Branch 1 taken 285170 times.
285525 node->_state == State::DoShutdown)
615 {
616 LOG_TRACE("{}: Worker doing shutdown...", node->nameId());
617 355 node->_state = State::Shutdown;
618
1/2
✓ Branch 1 taken 355 times.
✗ Branch 2 not taken.
355 lock.unlock();
619 355 node->_workerConditionVariable.notify_all();
620 355 break;
621 285525 }
622
1/2
✓ Branch 1 taken 285199 times.
✗ Branch 2 not taken.
285191 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
623
2/2
✓ Branch 0 taken 347 times.
✓ Branch 1 taken 284852 times.
285199 node->_state == State::DoInitialize)
624 {
625 LOG_TRACE("{}: Worker doing initialization...", node->nameId());
626
1/2
✓ Branch 1 taken 342 times.
✗ Branch 2 not taken.
347 lock.unlock();
627
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
342 node->workerInitializeNode();
628 LOG_TRACE("{}: Worker finished initialization, notifying all waiting threads (state = {})", node->nameId(), Node::toString(node->_state));
629 352 node->_workerConditionVariable.notify_all();
630
631
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 lk.lock();
632 352 continue;
633 285204 }
634
1/2
✓ Branch 1 taken 284845 times.
✗ Branch 2 not taken.
284834 if (std::unique_lock<std::mutex> lock(node->_stateMutex);
635
2/2
✓ Branch 0 taken 350 times.
✓ Branch 1 taken 284495 times.
284845 node->_state == State::DoDeinitialize)
636 {
637 LOG_TRACE("{}: Worker doing deinitialization...", node->nameId());
638
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 lock.unlock();
639
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 node->workerDeinitializeNode();
640 LOG_TRACE("{}: Worker finished deinitialization, notifying all waiting threads (state = {})", node->nameId(), Node::toString(node->_state));
641 350 node->_workerConditionVariable.notify_all();
642
643
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 lk.lock();
644 350 continue;
645 284845 }
646
647
2/4
✓ Branch 1 taken 284500 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 284505 times.
✗ Branch 4 not taken.
284502 if (!node->isTransient())
648 {
649 // Wait for data or state change
650 LOG_DATA("{}: Worker going to sleep", node->nameId());
651 284505 bool timeout = false;
652 {
653
1/2
✓ Branch 1 taken 284493 times.
✗ Branch 2 not taken.
284505 std::unique_lock lk(node->_workerMutex);
654
1/2
✓ Branch 1 taken 284236 times.
✗ Branch 2 not taken.
845217 timeout = !node->_workerConditionVariable.wait_for(lk, node->_workerTimeout, [node] { return node->_workerWakeup; });
655 284236 node->_workerWakeup = false;
656 284236 }
657 LOG_DATA("{}: Worker woke up", node->nameId());
658
659
7/8
✓ Branch 1 taken 284443 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 283209 times.
✓ Branch 4 taken 1234 times.
✓ Branch 5 taken 283091 times.
✓ Branch 6 taken 118 times.
✓ Branch 7 taken 283046 times.
✓ Branch 8 taken 1397 times.
284256 if (node->isInitialized() && node->callbacksEnabled)
660 {
661
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 283046 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
283046 if (timeout && node->callbacksEnabled) // Timeout reached
662 {
663 node->workerTimeoutHandler();
664 }
665
666 // Check input pin for data and trigger callbacks
667
3/4
✓ Branch 1 taken 282433 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 282250 times.
✓ Branch 4 taken 183 times.
283046 if (std::ranges::any_of(node->inputPins, [](const InputPin& inputPin) {
668 282075 return inputPin.isPinLinked();
669 }))
670 {
671
2/4
✓ Branch 1 taken 763594 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 763663 times.
✗ Branch 4 not taken.
763350 while (node->isInitialized())
672 {
673 // -------------------------- Data processing on input non-flow pins -----------------------------
674 763663 bool notifyTriggered = false;
675
2/2
✓ Branch 1 taken 2237512 times.
✓ Branch 2 taken 764107 times.
3002390 for (size_t i = 0; i < node->inputPins.size(); i++)
676 {
677 2237512 auto& inputPin = node->inputPins[i];
678
4/6
✓ Branch 1 taken 4443 times.
✓ Branch 2 taken 2234284 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 4443 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 2238727 times.
2238686 if (inputPin.type != Pin::Type::Flow && !inputPin.queue.empty())
679 {
680 if (auto callback = std::get<InputPin::DataChangedNotifyFunc>(inputPin.callback))
681 {
682 LOG_DATA("{}: Invoking notify callback on input pin '{}'", node->nameId(), inputPin.name);
683 InsTime insTime = inputPin.queue.extract_front()->insTime;
684 #ifdef TESTING
685 for (const auto& watcherCallback : inputPin.watcherCallbacks)
686 {
687 if (auto watcherCall = std::get<InputPin::DataChangedWatcherNotifyFunc>(watcherCallback))
688 {
689 std::invoke(watcherCall, node, insTime, i);
690 }
691 }
692 #endif
693 std::invoke(callback, node, insTime, i);
694 notifyTriggered = true;
695 }
696 }
697 }
698
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 764107 times.
764107 if (notifyTriggered) { continue; }
699
700 // ------------------------------ Process data on input flow pins --------------------------------
701
3/6
✗ Branch 0 not taken.
✓ Branch 1 taken 764107 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 764044 times.
✓ Branch 6 taken 63 times.
764107 if (node->callbacksEnabled || node->_mode == Node::Mode::REAL_TIME)
702 {
703 LOG_DATA("{}: Checking for firable input pins", node->nameId());
704
705
2/2
✓ Branch 1 taken 763299 times.
✓ Branch 2 taken 25 times.
764044 if (node->_mode == Mode::POST_PROCESSING)
706 {
707 // Check if all input flow pins have data
708 763299 bool allInputPinsHaveData = !node->inputPins.empty();
709
2/2
✓ Branch 5 taken 2089027 times.
✓ Branch 6 taken 481388 times.
2572205 for (const auto& inputPin : node->inputPins)
710 {
711
9/10
✓ Branch 1 taken 2085604 times.
✓ Branch 2 taken 5483 times.
✓ Branch 3 taken 2085916 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2008867 times.
✓ Branch 6 taken 77049 times.
✓ Branch 8 taken 518188 times.
✓ Branch 9 taken 1491217 times.
✓ Branch 10 taken 518189 times.
✓ Branch 11 taken 1573436 times.
2089893 if (inputPin.type == Pin::Type::Flow && inputPin.neededForTemporalQueueCheck && !inputPin.queueBlocked && inputPin.queue.empty())
712 {
713
1/2
✓ Branch 1 taken 518172 times.
✗ Branch 2 not taken.
518189 if (auto* connectedPin = inputPin.link.getConnectedPin();
714
6/6
✓ Branch 0 taken 517975 times.
✓ Branch 1 taken 197 times.
✓ Branch 3 taken 282784 times.
✓ Branch 4 taken 235155 times.
✓ Branch 5 taken 282785 times.
✓ Branch 6 taken 235351 times.
518172 connectedPin && !connectedPin->noMoreDataAvailable)
715 {
716 282785 allInputPinsHaveData = false;
717 282785 break;
718 }
719 }
720 }
721
2/2
✓ Branch 0 taken 282782 times.
✓ Branch 1 taken 481391 times.
764173 if (!allInputPinsHaveData)
722 {
723 LOG_DATA("{}: Not all pins have data for temporal sorting", node->nameId());
724 282969 break;
725 }
726 LOG_DATA("{}: All pins have data for temporal sorting", node->nameId());
727 }
728
729 // Find pin with the earliest data
730 481416 InsTime earliestTime;
731 481416 size_t earliestInputPinIdx = 0;
732 481416 int earliestInputPinPriority = -1000;
733
2/2
✓ Branch 1 taken 1453646 times.
✓ Branch 2 taken 481427 times.
1935448 for (size_t i = 0; i < node->inputPins.size(); i++)
734 {
735 1453646 auto& inputPin = node->inputPins[i];
736
2/2
✓ Branch 2 taken 1217812 times.
✓ Branch 3 taken 232194 times.
2904399 if (inputPin.type == Pin::Type::Flow && !inputPin.queue.empty()
737
6/6
✓ Branch 0 taken 1450106 times.
✓ Branch 1 taken 4068 times.
✓ Branch 3 taken 736874 times.
✓ Branch 4 taken 480805 times.
✓ Branch 5 taken 710095 times.
✓ Branch 6 taken 744249 times.
3641457 && (earliestTime.empty()
738
2/2
✓ Branch 3 taken 507720 times.
✓ Branch 4 taken 228923 times.
736874 || inputPin.queue.front()->insTime < earliestTime
739
4/4
✓ Branch 3 taken 415804 times.
✓ Branch 4 taken 92550 times.
✓ Branch 5 taken 442 times.
✓ Branch 6 taken 415362 times.
507720 || (inputPin.queue.front()->insTime == earliestTime && inputPin.priority > earliestInputPinPriority)))
740 {
741 710095 earliestTime = inputPin.queue.front()->insTime;
742 709783 earliestInputPinIdx = i;
743 709783 earliestInputPinPriority = inputPin.priority;
744 }
745 }
746
2/2
✓ Branch 0 taken 188 times.
✓ Branch 1 taken 481239 times.
481427 if (earliestInputPinPriority == -1000) { break; }
747
748 481239 auto& inputPin = node->inputPins[earliestInputPinIdx];
749
5/8
✓ Branch 0 taken 481226 times.
✓ Branch 1 taken 2 times.
✓ Branch 3 taken 481199 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 481201 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 481202 times.
✗ Branch 8 not taken.
481228 if (inputPin.firable && inputPin.firable(node, inputPin))
750 {
751
2/4
✓ Branch 1 taken 480942 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 480955 times.
✗ Branch 4 not taken.
481202 if (auto callback = std::get<InputPin::FlowFirableCallbackFunc>(inputPin.callback))
752 {
753 LOG_DATA("{}: Invoking callback on input pin '{}'", node->nameId(), inputPin.name);
754 #ifdef TESTING
755
2/2
✓ Branch 5 taken 63644 times.
✓ Branch 6 taken 480670 times.
544603 for (const auto& watcherCallback : inputPin.watcherCallbacks)
756 {
757
3/6
✓ Branch 1 taken 63644 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 63647 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 63647 times.
✗ Branch 8 not taken.
63643 if (auto watcherCall = std::get<InputPin::FlowFirableWatcherCallbackFunc>(watcherCallback))
758 {
759
1/2
✓ Branch 1 taken 63649 times.
✗ Branch 2 not taken.
63647 std::invoke(watcherCall, node, inputPin.queue, earliestInputPinIdx);
760 63649 }
761 }
762 #endif
763
1/2
✓ Branch 1 taken 481223 times.
✗ Branch 2 not taken.
480670 std::invoke(callback, node, inputPin.queue, earliestInputPinIdx);
764 }
765 }
766 else if (inputPin.dropQueueIfNotFirable)
767 {
768 LOG_DATA("{}: Dropping message on input pin '{}'", node->nameId(), inputPin.name);
769 inputPin.queue.pop_front();
770 }
771 else
772 {
773 LOG_DATA("{}: Skipping message on input pin '{}'", node->nameId(), inputPin.name);
774 break; // Do not drop an item, but put the worker to sleep
775 }
776 }
777 else
778 {
779 63 break;
780 }
781 }
782 }
783
784 // Post-processing (FileReader/Simulator)
785
2/2
✓ Branch 1 taken 125 times.
✓ Branch 2 taken 283008 times.
283146 if (!node->pollEvents.empty())
786 {
787 125 std::multimap<InsTime, std::pair<OutputPin*, size_t>>::iterator it;
788
8/10
✓ Branch 3 taken 126977 times.
✓ Branch 4 taken 7 times.
✓ Branch 6 taken 126988 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 126989 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 126986 times.
✓ Branch 11 taken 3 times.
✓ Branch 12 taken 126985 times.
✓ Branch 13 taken 10 times.
127002 while (it = node->pollEvents.begin(), it != node->pollEvents.end() && node->isInitialized() && node->callbacksEnabled)
789 {
790 126985 OutputPin* outputPin = it->second.first;
791 126977 size_t outputPinIdx = it->second.second;
792 126984 Node* node = outputPin->parentNode;
793
794
2/2
✓ Branch 1 taken 60772 times.
✓ Branch 2 taken 66212 times.
126984 if (std::holds_alternative<OutputPin::PollDataFunc>(outputPin->data))
795 {
796 60772 auto* callback = std::get_if<OutputPin::PollDataFunc>(&outputPin->data);
797
2/4
✓ Branch 0 taken 60763 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 60763 times.
✗ Branch 3 not taken.
60762 if (callback != nullptr && *callback != nullptr)
798 {
799 LOG_DATA("{}: Polling data from output pin '{}'", node->nameId(), str::replaceAll_copy(outputPin->name, "\n", ""));
800
4/6
✗ Branch 0 not taken.
✓ Branch 1 taken 60763 times.
✓ Branch 3 taken 60763 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 116 times.
✓ Branch 8 taken 60656 times.
60763 if ((node->**callback)() == nullptr)
801 {
802
1/2
✓ Branch 1 taken 116 times.
✗ Branch 2 not taken.
116 node->pollEvents.erase(it); // Delete the event if no more data on this pin
803 116 break;
804 }
805 }
806 }
807
1/2
✓ Branch 1 taken 66222 times.
✗ Branch 2 not taken.
66212 else if (std::holds_alternative<OutputPin::PeekPollDataFunc>(outputPin->data))
808 {
809 66222 auto* callback = std::get_if<OutputPin::PeekPollDataFunc>(&outputPin->data);
810
3/4
✓ Branch 0 taken 66217 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 66216 times.
✓ Branch 3 taken 1 times.
66217 if (callback != nullptr && *callback != nullptr)
811 {
812
2/2
✓ Branch 2 taken 66219 times.
✓ Branch 3 taken 1 times.
66216 if (!it->first.empty())
813 {
814 LOG_DATA("{}: Polling data from output pin '{}'", node->nameId(), str::replaceAll_copy(outputPin->name, "\n", ""));
815 // Trigger the already peeked observation and invoke it's callbacks (peek = false)
816
3/6
✗ Branch 0 not taken.
✓ Branch 1 taken 66219 times.
✓ Branch 3 taken 66227 times.
✗ Branch 4 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 66230 times.
66219 if ((node->**callback)(outputPinIdx, false) == nullptr)
817 {
818 LOG_ERROR("{}: {} could not poll its observation despite being able to peek it.", node->nameId(), outputPin->name);
819 }
820 }
821
822 // Check if data available (peek = true)
823
4/6
✗ Branch 0 not taken.
✓ Branch 1 taken 66231 times.
✓ Branch 3 taken 66226 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 66207 times.
✓ Branch 7 taken 16 times.
66231 if (auto obs = (node->**callback)(outputPinIdx, true))
824 {
825 // Check if data has a time
826
1/2
✓ Branch 2 taken 66205 times.
✗ Branch 3 not taken.
66207 if (!obs->insTime.empty())
827 {
828
1/2
✓ Branch 4 taken 66211 times.
✗ Branch 5 not taken.
66205 node->pollEvents.insert(std::make_pair(obs->insTime, std::make_pair(outputPin, outputPinIdx)));
829 }
830 else // If no time, call the object and remove it
831 {
832 (node->**callback)(outputPinIdx, false);
833 continue; // Do not erase the iterator, because this pin needs to be called again
834 }
835 }
836 else // nullptr -> no more data incoming on this pin
837 {
838 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin->name);
839 16 outputPin->noMoreDataAvailable = true;
840
2/2
✓ Branch 5 taken 25 times.
✓ Branch 6 taken 18 times.
43 for (auto& link : outputPin->links)
841 {
842
1/2
✓ Branch 1 taken 25 times.
✗ Branch 2 not taken.
25 link.connectedNode->wakeWorker();
843 }
844 66229 }
845 66222 }
846 else
847 {
848
0/8
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
1 LOG_ERROR("{} - {}: Callback is not valid anymore", node->nameId(), size_t(outputPin->id));
849 }
850
1/2
✓ Branch 1 taken 66222 times.
✗ Branch 2 not taken.
66222 node->pollEvents.erase(it);
851 }
852 }
853
854
1/2
✓ Branch 1 taken 125 times.
✗ Branch 2 not taken.
126 if (node->pollEvents.empty())
855 {
856 LOG_TRACE("{}: Finished polling all pins.", node->nameId());
857
858 125 node->callbacksEnabled = false;
859
2/2
✓ Branch 5 taken 141 times.
✓ Branch 6 taken 125 times.
266 for (auto& outputPin : node->outputPins)
860 {
861
2/2
✓ Branch 1 taken 116 times.
✓ Branch 2 taken 25 times.
141 if (!outputPin.noMoreDataAvailable)
862 {
863 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin.name);
864 116 outputPin.noMoreDataAvailable = true;
865
2/2
✓ Branch 5 taken 122 times.
✓ Branch 6 taken 116 times.
238 for (auto& link : outputPin.links)
866 {
867
1/2
✓ Branch 1 taken 122 times.
✗ Branch 2 not taken.
122 link.connectedNode->wakeWorker();
868 }
869 }
870 }
871 125 node->_mode = Node::Mode::REAL_TIME;
872
1/2
✓ Branch 1 taken 125 times.
✗ Branch 2 not taken.
125 FlowExecutor::deregisterNode(node);
873 }
874 }
875 }
876
877 // Check if node finished
878
2/2
✓ Branch 1 taken 282983 times.
✓ Branch 2 taken 1491 times.
284530 if (node->_mode == Mode::POST_PROCESSING)
879 {
880
3/4
✓ Branch 1 taken 283028 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 220 times.
✓ Branch 4 taken 282808 times.
282983 if (std::ranges::all_of(node->inputPins, [](const InputPin& inputPin) {
881
6/8
✓ Branch 2 taken 361113 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 361112 times.
✓ Branch 5 taken 1 times.
✓ Branch 7 taken 361163 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 283083 times.
✓ Branch 10 taken 78080 times.
1083399 return inputPin.type != Pin::Type::Flow || !inputPin.isPinLinked() || inputPin.link.connectedNode->isDisabled()
882
2/4
✓ Branch 1 taken 283081 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 283072 times.
✗ Branch 5 not taken.
283083 || !inputPin.link.getConnectedPin()->blocksConnectedNodeFromFinishing
883
7/8
✓ Branch 0 taken 361116 times.
✓ Branch 1 taken 7 times.
✓ Branch 3 taken 168201 times.
✓ Branch 4 taken 114864 times.
✓ Branch 6 taken 168233 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 283 times.
✓ Branch 10 taken 167949 times.
1083460 || (inputPin.queue.empty() && inputPin.link.getConnectedPin()->noMoreDataAvailable);
884 }))
885 {
886 LOG_TRACE("{}: Node finished", node->nameId());
887 220 node->callbacksEnabled = false;
888
2/2
✓ Branch 5 taken 130 times.
✓ Branch 6 taken 220 times.
350 for (auto& outputPin : node->outputPins)
889 {
890 LOG_TRACE("{}: Output Pin finished: {}", node->nameId(), outputPin.name);
891 130 outputPin.noMoreDataAvailable = true;
892
2/2
✓ Branch 5 taken 129 times.
✓ Branch 6 taken 130 times.
259 for (auto& link : outputPin.links)
893 {
894
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 link.connectedNode->wakeWorker();
895 }
896 }
897 220 node->_mode = Node::Mode::REAL_TIME;
898
1/2
✓ Branch 1 taken 212 times.
✗ Branch 2 not taken.
220 FlowExecutor::deregisterNode(node);
899 }
900 }
901 }
902 }
903
904 LOG_TRACE("{}: Worker thread ended.", node->nameId());
905 343 }
906
907 336 bool NAV::Node::workerInitializeNode()
908 {
909 LOG_TRACE("{}: called", nameId());
910 {
911
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
336 std::scoped_lock lk(_stateMutex);
912
6/14
✓ Branch 0 taken 343 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 352 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 348 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 348 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 341 times.
✓ Branch 14 taken 1 times.
✗ Branch 16 not taken.
✗ Branch 17 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
694 INS_ASSERT_USER_ERROR(_state == State::DoInitialize, fmt::format("Worker can only initialize the node if the state is set to DoInitialize, but it is {}.", toString(_state)).c_str());
913 348 _state = Node::State::Initializing;
914 348 }
915 346 _mode = Node::Mode::REAL_TIME;
916
917
3/6
✓ Branch 1 taken 342 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 349 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 352 times.
✗ Branch 9 not taken.
346 LOG_DEBUG("{}: Initializing Node", nameId());
918
919 // Initialize Nodes connected to the input pins
920
2/2
✓ Branch 5 taken 300 times.
✓ Branch 6 taken 352 times.
652 for (const auto& inputPin : inputPins)
921 {
922
2/2
✓ Branch 1 taken 30 times.
✓ Branch 2 taken 270 times.
300 if (inputPin.type != Pin::Type::Flow)
923 {
924
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 8 times.
30 if (Node* connectedNode = inputPin.link.connectedNode)
925 {
926
3/4
✓ Branch 1 taken 22 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 13 times.
✓ Branch 4 taken 9 times.
22 if (!connectedNode->isInitialized())
927 {
928
5/10
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 13 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 13 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 13 times.
✗ Branch 15 not taken.
13 LOG_DEBUG("{}: Initializing connected Node '{}' on input Pin {}", nameId(), connectedNode->nameId(), size_t(inputPin.id));
929
2/4
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 13 times.
13 if (!connectedNode->doInitialize(true))
930 {
931 LOG_ERROR("{}: Could not initialize connected node {}", nameId(), connectedNode->nameId());
932 std::scoped_lock lk(_stateMutex);
933 if (_state == State::Initializing)
934 {
935 _state = Node::State::Deinitialized;
936 }
937 return false;
938 }
939 }
940 }
941 }
942 }
943
944 352 _reinitialize = false;
945
946 // Initialize the node itself
947 LOG_TRACE("{}: calling initialize()", nameId());
948 352 bool initSucceeded = false;
949 {
950
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 std::scoped_lock<std::mutex> guard(_configWindowMutex);
951
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 initSucceeded = initialize();
952 352 }
953
2/2
✓ Branch 0 taken 351 times.
✓ Branch 1 taken 1 times.
352 if (initSucceeded)
954 {
955 LOG_TRACE("{}: initialize() was successful", nameId());
956
957
2/2
✓ Branch 4 taken 299 times.
✓ Branch 5 taken 349 times.
650 for (auto& inputPin : inputPins)
958 {
959 299 inputPin.queue.clear();
960 300 inputPin.queueBlocked = false;
961 }
962
963 349 pollEvents.clear();
964 LOG_TRACE("{}: calling resetNode()", nameId());
965 {
966
1/2
✓ Branch 1 taken 351 times.
✗ Branch 2 not taken.
350 std::scoped_lock<std::mutex> guard(_configWindowMutex);
967
1/2
✓ Branch 1 taken 351 times.
✗ Branch 2 not taken.
351 resetNode();
968 351 }
969 LOG_TRACE("{}: resetNode() was successful", nameId());
970
2/2
✓ Branch 5 taken 291 times.
✓ Branch 6 taken 350 times.
642 for (auto& outputPin : outputPins)
971 {
972 291 outputPin.noMoreDataAvailable = true;
973
2/2
✓ Branch 5 taken 287 times.
✓ Branch 6 taken 292 times.
579 for (auto& link : outputPin.links)
974 {
975 LOG_TRACE("{}: Waking connected node '{}'", nameId(), link.connectedNode->nameId());
976
1/2
✓ Branch 1 taken 287 times.
✗ Branch 2 not taken.
287 link.connectedNode->wakeWorker();
977 }
978 }
979
980
1/2
✓ Branch 1 taken 351 times.
✗ Branch 2 not taken.
350 std::scoped_lock lk(_stateMutex);
981
1/2
✓ Branch 0 taken 351 times.
✗ Branch 1 not taken.
351 if (_state == State::Initializing)
982 {
983 351 _state = Node::State::Initialized;
984 }
985 351 return true;
986 351 }
987
988 LOG_TRACE("{}: initialize() failed", nameId());
989
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 std::scoped_lock lk(_stateMutex);
990
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (_state == State::Initializing)
991 {
992 1 _state = Node::State::Deinitialized;
993 }
994 1 return false;
995 1 }
996
997 350 bool NAV::Node::workerDeinitializeNode()
998 {
999 LOG_TRACE("{}: called", nameId());
1000 {
1001
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 std::scoped_lock lk(_stateMutex);
1002
5/14
✓ Branch 0 taken 350 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 350 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 350 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 350 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 350 times.
✗ Branch 14 not taken.
✗ Branch 16 not taken.
✗ Branch 17 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
700 INS_ASSERT_USER_ERROR(_state == State::DoDeinitialize, fmt::format("Worker can only deinitialize the node if the state is set to DoDeinitialize, but it is {}.", toString(_state)).c_str());
1003 {
1004 350 _state = Node::State::Deinitializing;
1005 }
1006 350 }
1007
3/6
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 350 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 350 times.
✗ Branch 9 not taken.
350 LOG_DEBUG("{}: Deinitializing Node", nameId());
1008
1009 350 callbacksEnabled = false;
1010
1011 // Re-/Deinitialize Nodes connected to the output pins
1012
2/2
✓ Branch 5 taken 292 times.
✓ Branch 6 taken 350 times.
642 for (const auto& outputPin : outputPins)
1013 {
1014
2/2
✓ Branch 1 taken 68 times.
✓ Branch 2 taken 224 times.
292 if (outputPin.type != Pin::Type::Flow)
1015 {
1016
2/2
✓ Branch 5 taken 7 times.
✓ Branch 6 taken 68 times.
75 for (const auto& link : outputPin.links)
1017 {
1018
3/4
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
✓ Branch 4 taken 4 times.
7 if (link.connectedNode->isInitialized())
1019 {
1020
6/12
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 3 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 3 times.
✓ Branch 12 taken 3 times.
✗ Branch 13 not taken.
✓ Branch 16 taken 3 times.
✗ Branch 17 not taken.
3 LOG_DEBUG("{}: {} connected Node '{}' on output Pin {}", nameId(),
1021 _reinitialize ? "Reinitializing" : "Deinitializing", link.connectedNode->nameId(), size_t(outputPin.id));
1022
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
3 if (_reinitialize) { link.connectedNode->doReinitialize(); }
1023
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 else { link.connectedNode->doDeinitialize(); }
1024 }
1025 }
1026 }
1027 }
1028
1029 // Deinitialize the node itself
1030 {
1031
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 std::scoped_lock<std::mutex> guard(_configWindowMutex);
1032
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 deinitialize();
1033 350 }
1034
1035
1/2
✓ Branch 1 taken 350 times.
✗ Branch 2 not taken.
350 std::scoped_lock lk(_stateMutex);
1036
1/2
✓ Branch 0 taken 350 times.
✗ Branch 1 not taken.
350 if (_state == State::Deinitializing)
1037 {
1038
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 350 times.
350 if (_disable)
1039 {
1040 _state = State::Disabled;
1041 }
1042
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 350 times.
350 else if (_reinitialize)
1043 {
1044 _state = State::DoInitialize;
1045 }
1046 else
1047 {
1048 350 _state = State::Deinitialized;
1049 }
1050 }
1051
1052 350 return true;
1053 350 }
1054
1055 void NAV::Node::workerTimeoutHandler()
1056 {
1057 LOG_TRACE("{}: called", nameId());
1058 }
1059
1060 void NAV::to_json(json& j, const Node& node)
1061 {
1062 ImVec2 realSize = ed::GetNodeSize(node.id);
1063 realSize.x -= 16;
1064 realSize.y -= 38.0F;
1065 j = json{
1066 { "id", size_t(node.id) },
1067 { "type", node.type() },
1068 { "kind", std::string(node.kind) },
1069 { "name", node.name },
1070 { "size", node._size.x == 0 && node._size.y == 0 ? node._size : realSize },
1071 { "pos", ed::GetNodePosition(node.id) },
1072 { "enabled", !node.isDisabled() },
1073 { "inputPins", node.inputPins },
1074 { "outputPins", node.outputPins },
1075 };
1076 }
1077 714 void NAV::from_json(const json& j, Node& node)
1078 {
1079
3/6
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 714 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 714 times.
✗ Branch 8 not taken.
714 node.id = j.at("id").get<size_t>();
1080
1/2
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
714 if (j.contains("kind"))
1081 {
1082
3/6
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 714 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 714 times.
✗ Branch 8 not taken.
714 node.kind = Node::Kind(j.at("kind").get<std::string>());
1083 }
1084
1/2
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
714 if (j.contains("name"))
1085 {
1086 714 j.at("name").get_to(node.name);
1087 }
1088
1/2
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
714 if (j.contains("size"))
1089 {
1090 714 j.at("size").get_to(node._size);
1091
6/8
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 706 times.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 8 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
✓ Branch 9 taken 706 times.
714 if (node.kind == Node::Kind::GroupBox && gui::NodeEditorApplication::isUsingBigDefaultFont())
1092 {
1093 8 node._size.y -= 20;
1094 }
1095 }
1096
1/2
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
714 if (j.contains("enabled"))
1097 {
1098 714 bool enabled = j.at("enabled").get<bool>();
1099
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 712 times.
714 if (!enabled)
1100 {
1101
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 std::scoped_lock lk(node._stateMutex);
1102 2 node._state = Node::State::Disabled;
1103 2 }
1104 }
1105
1106
1/2
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
714 if (j.contains("inputPins"))
1107 {
1108
2/4
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 714 times.
✗ Branch 5 not taken.
714 auto inputPins = j.at("inputPins").get<std::vector<InputPin>>();
1109
2/2
✓ Branch 1 taken 566 times.
✓ Branch 2 taken 664 times.
1230 for (size_t i = 0; i < inputPins.size(); ++i)
1110 {
1111
2/2
✓ Branch 1 taken 50 times.
✓ Branch 2 taken 516 times.
566 if (node.inputPins.size() <= i)
1112 {
1113 50 break;
1114 }
1115
4/8
✓ Branch 1 taken 516 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 516 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 516 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 516 times.
✗ Branch 11 not taken.
516 j.at("inputPins").at(i).get_to(node.inputPins.at(i));
1116 }
1117 714 }
1118
1119
1/2
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
714 if (j.contains("outputPins"))
1120 {
1121
2/4
✓ Branch 1 taken 714 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 714 times.
✗ Branch 5 not taken.
714 auto outputPins = j.at("outputPins").get<std::vector<OutputPin>>();
1122
2/2
✓ Branch 1 taken 571 times.
✓ Branch 2 taken 710 times.
1281 for (size_t i = 0; i < outputPins.size(); ++i)
1123 {
1124
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 567 times.
571 if (node.outputPins.size() <= i)
1125 {
1126 4 break;
1127 }
1128
4/8
✓ Branch 1 taken 567 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 567 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 567 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 567 times.
✗ Branch 11 not taken.
567 j.at("outputPins").at(i).get_to(node.outputPins.at(i));
1129 }
1130 714 }
1131 714 }
1132