2#include <Dataflow/Core/DataflowGraph.hpp>
4#include <Dataflow/Core/Port.hpp>
9#include <Core/Utils/Log.hpp>
16RA_SINGLETON_IMPLEMENTATION( PortFactory );
18using namespace Ra::Core::Utils;
23 Node( instanceName, typeName ) {}
28 std::for_each( m_nodes_by_level.begin(), m_nodes_by_level.end(), [](
const auto& level ) {
29 std::for_each( level.begin(), level.end(), []( auto node ) {
30 if ( !node->is_initialized() ) { node->init(); }
36bool DataflowGraph::execute() {
37 if ( m_inputs.size() > 0 || m_outputs.size() > 0 )
return true;
40 if ( !compile() ) {
return false; }
44 m_nodes_by_level.begin(), m_nodes_by_level.end(), [&result](
const auto& level ) {
45 std::for_each( level.begin(), level.end(), [&result]( auto node ) {
46 bool executed = node->execute();
48 LOG( logERROR ) <<
"Execution failed with node " << node->instance_name()
49 <<
" (" << node->model_name() <<
").";
51 result = result && executed;
36bool DataflowGraph::execute() {
…}
57void DataflowGraph::destroy() {
59 m_nodes_by_level.begin(), m_nodes_by_level.end(), [](
auto& level ) { level.clear(); } );
60 m_nodes_by_level.clear();
57void DataflowGraph::destroy() {
…}
66void DataflowGraph::saveToJson(
const std::string& jsonFilePath ) {
67 if ( !jsonFilePath.
empty() ) {
72 m_should_save =
false;
66void DataflowGraph::saveToJson(
const std::string& jsonFilePath ) {
…}
76void DataflowGraph::toJsonInternal( nlohmann::json& data )
const {
77 nlohmann::json nodes = nlohmann::json::array();
78 nlohmann::json connections = nlohmann::json::array();
81 for (
const auto& n : m_nodes ) {
82 nlohmann::json nodeData;
83 n->toJson( nodeData );
84 nodes.push_back( nodeData );
86 if ( n != m_input_node ) {
87 for (
const auto& input : n->inputs() ) {
88 if ( input->is_linked() ) {
89 nlohmann::json link = nlohmann::json::object();
90 auto portOut = input->link();
91 auto nodeOut = portOut->node();
92 if (
auto casted =
dynamic_cast<GraphOutputNode*
>( nodeOut ); casted ) {
93 nodeOut = casted->graph();
96 link[
"out_node"] = nodeOut->instance_name();
97 link[
"out_port"] = portOut->name();
98 link[
"in_node"] = n->instance_name();
99 link[
"in_port"] = input->name();
100 connections.push_back( link );
107 graph[
"nodes"] = nodes;
108 graph[
"connections"] = connections;
110 data.emplace(
"graph", graph );
76void DataflowGraph::toJsonInternal( nlohmann::json& data )
const {
…}
113std::optional<nlohmann::json> read_json(
const std::string& jsonFilePath ) {
115 nlohmann::json j = nlohmann::json::parse( f,
nullptr,
false );
117 if ( j.is_discarded() ) {
118 LOG( logERROR ) << jsonFilePath <<
" is not a valid json file !!";
124bool DataflowGraph::loadFromJson(
const std::string& jsonFilePath ) {
125 auto j = read_json( jsonFilePath );
126 if ( !j )
return false;
128 m_should_save =
false;
129 return fromJson( *j );
134 const nlohmann::json& linkData,
139 auto itNode = nodeByName.find( linkData[field] );
140 if ( itNode != nodeByName.end() ) { node = itNode->second; }
144 std::string( linkData[field] ) +
" not found in cache " +
" : " +
146 return {
nullptr, msg };
152 field = which +
"_port";
153 if ( linkData.contains( field ) ) {
154 auto p = node->port_by_name( which, linkData[field] ).second;
155 if ( p !=
nullptr ) { port = p->name(); }
156 else { err = linkData[field]; }
159 field = which +
"_index";
160 if ( linkData.contains( field ) ) {
161 auto p = node->port_by_index( which, Node::PortIndex {
int { linkData[field] } } );
162 if ( p !=
nullptr ) { port = p->name(); }
166 if ( port.
empty() ) {
168 node->instance_name() +
" : " + linkData.dump();
169 return {
nullptr, msg };
171 return { node, port };
174bool DataflowGraph::fromJsonInternal(
const nlohmann::json& data ) {
175 if ( data.contains(
"graph" ) ) {
179 auto factories = NodeFactoriesManager::factory_manager();
182 if (
auto nodes_itr = data[
"graph"].find(
"nodes" ); nodes_itr != data[
"graph"].end() ) {
183 auto nodes = *nodes_itr;
184 for (
auto& n : nodes ) {
185 if ( !n[
"model"].contains(
"name" ) ) {
186 LOG( logERROR ) <<
"Found a node without model description." << n.dump()
187 <<
"Unable to build an instance.";
193 if (
auto instance_itr = n.find(
"instance" ); instance_itr != n.end() ) {
194 instanceName = *instance_itr;
198 <<
"Found a node of type " << nodeTypeName <<
" without identification ";
202 auto newNode = factories.create_node( nodeTypeName, n,
this );
204 if ( !instanceName.
empty() ) {
205 auto [it, inserted] = nodeByName.
insert( { instanceName, newNode } );
207 LOG( logERROR ) <<
"DataflowGraph::loadFromJson : duplicated node name "
212 if ( nodeTypeName == GraphInputNode::node_typename() ) {
214 m_input_node->set_graph(
this );
216 if ( nodeTypeName == GraphOutputNode::node_typename() ) {
218 m_output_node->set_graph(
this );
222 LOG( logERROR ) <<
"Unable to create the node " << nodeTypeName;
227 if (
auto links_itr = data[
"graph"].find(
"connections" );
228 links_itr != data[
"graph"].end() ) {
229 auto links = *links_itr;
230 for (
auto& l : links ) {
231 auto [nodeFrom, fromOutput] = getLinkInfo(
"out", l, nodeByName );
232 if ( nodeFrom ==
nullptr ) {
233 LOG( logERROR ) <<
"DataflowGraph::loadFromJson: error when parsing JSON."
234 <<
" Could not find the link source (" << fromOutput
235 <<
"). Link not added.";
238 auto [nodeTo, toInput] = getLinkInfo(
"in", l, nodeByName );
239 if ( nodeTo ==
nullptr ) {
241 <<
"DataflowGraph::loadFromJson: error when parsing JSON."
242 <<
" Could not find the link target (" << toInput <<
"). Link not added.";
245 if ( !add_link( nodeFrom, fromOutput, nodeTo, toInput ) ) {
247 <<
"DataflowGraph::loadFromJson: error when parsing JSON"
248 <<
": Could not add a link (missing or wrong information, please refer to "
249 "the previous error messages). Link not added.";
174bool DataflowGraph::fromJsonInternal(
const nlohmann::json& data ) {
…}
261 if ( !has_node_by_name( newNode->instance_name(), newNode->model_name() ) ) {
262 m_nodes.emplace_back(
std::move( newNode ) );
266 else {
return false; }
271 if ( m_nodesAndLinksProtected ) {
return false; }
273 if (
auto itr =
std::find( m_nodes.begin(), m_nodes.end(), node ); itr != m_nodes.end() ) {
274 m_nodes.erase( itr );
281bool DataflowGraph::are_ports_compatible(
const Node* nodeFrom,
282 const PortBaseOut* portOut,
284 const PortBaseIn* portIn ) {
286 if ( !( portIn->
type() == portOut->
type() ) ) {
287 Log::link_type_mismatch( nodeFrom, portOut, nodeTo, portIn );
292 if ( portIn->is_linked() ) {
293 Log::already_linked( nodeTo, portIn );
300 LOG( logERROR ) <<
"DataflowGraph::add_link Unable to find " << type <<
"input port " << name
301 <<
" from destination node " << node->instance_name() <<
" ("
302 << node->model_name() <<
")";
309 if ( !are_nodes_valids( nodeFrom.
get(), nodeTo.
get(),
true ) ) {
return false; }
311 auto [inputIdx, inputPort] = nodeTo->input_by_name( nodeToInputName );
313 nodeNotFoundMessage(
"input", nodeToInputName, nodeTo.
get() );
316 auto [outputIdx, outputPort] = nodeFrom->output_by_name( nodeFromOutputName );
318 nodeNotFoundMessage(
"output", nodeFromOutputName, nodeFrom.
get() );
322 return add_link( outputPort, inputPort );
326 Node::PortIndex portOutIdx,
328 Node::PortIndex portInIdx ) {
330 if ( !are_nodes_valids( nodeFrom.
get(), nodeTo.
get(),
true ) ) {
return false; }
331 if ( check_last_port_io_nodes( nodeFrom.
get(), portOutIdx, nodeTo.
get(), portInIdx ) ) {
332 if ( m_input_node && nodeFrom == m_input_node &&
333 portOutIdx == m_input_node->outputs().size() ) {
334 auto portIn = nodeTo->input_by_index( portInIdx );
336 Log::bad_port_index(
"input", nodeTo->model_name(), portInIdx );
339 auto idx = m_input_node->add_output_port( portIn );
340 return idx.isValid();
342 if ( nodeTo && nodeTo == m_output_node && portInIdx == m_output_node->inputs().size() ) {
343 auto portOut = nodeFrom->output_by_index( portOutIdx );
345 Log::bad_port_index(
"output", nodeFrom->model_name(), portOutIdx );
348 auto idx = m_output_node->add_input_port( portOut );
349 return idx.isValid();
353 auto portOut = nodeFrom->output_by_index( portOutIdx );
354 auto portIn = nodeTo->input_by_index( portInIdx );
357 Log::bad_port_index(
"output", nodeFrom->model_name(), portOutIdx );
361 Log::bad_port_index(
"input", nodeTo->model_name(), portInIdx );
365 return add_link( portOut, portIn );
368bool DataflowGraph::add_link( Node::PortBaseOutRawPtr outputPort,
369 Node::PortBaseInRawPtr inputPort ) {
370 auto nodeFrom = outputPort->node();
371 auto nodeTo = inputPort->node();
372 if ( !are_nodes_valids( nodeFrom, nodeTo ) ) {
return false; }
374 if ( !are_ports_compatible( nodeFrom, outputPort, nodeTo, inputPort ) ) {
return false; }
376 inputPort->connect( outputPort );
368bool DataflowGraph::add_link( Node::PortBaseOutRawPtr outputPort, {
…}
383 auto [idx, port] = node->input_by_name( nodeInputName );
384 return remove_link( node, idx );
389 if ( m_nodesAndLinksProtected ) {
return false; }
394 contains_node_recursive( node.
get() ) &&
396 in_port_index.isValid() &&
398 static_cast<size_t>( in_port_index ) < node->inputs().size() ) {
399 ret = node->inputs()[in_port_index]->disconnect();
400 if ( ret ) needs_recompile();
407 return std::find_if( m_nodes.begin(), m_nodes.end(), [instance, model](
const auto& p ) {
408 return p->model_name() == model && p->instance_name() == instance;
409 } ) != m_nodes.end();
412bool DataflowGraph::contains_node_recursive(
const Node* node )
const {
413 if ( !node )
return false;
414 for (
const auto& n : m_nodes ) {
415 if ( n.get() == node )
return true;
418 if ( g->contains_node_recursive( node ) )
return true;
412bool DataflowGraph::contains_node_recursive(
const Node* node )
const {
…}
424void DataflowGraph::generate_ports() {
425 if ( m_input_node ) m_inputs = m_input_node->inputs();
426 if ( m_output_node ) m_outputs = m_output_node->outputs();
424void DataflowGraph::generate_ports() {
…}
429bool DataflowGraph::compile() {
436 if ( m_output_node ) {
437 backtrack_graph( m_output_node.get(), infoNodes );
440 for (
auto const& n : m_nodes ) {
442 if ( n->is_output() && n != m_output_node ) {
444 if (
std::any_of( n->inputs().begin(), n->inputs().end(), [](
const auto& p ) {
445 return p->is_linked();
450 backtrack_graph( n.get(), infoNodes );
453 LOG( logWARNING ) <<
"Sink Node " << n->instance_name()
454 <<
" is inactive (belongs to the graph but not connected)";
460 for (
auto& infNode : infoNodes ) {
461 auto n = infNode.first;
463 if ( n->is_input() || n == m_input_node.get() ) {
466 infNode.second.first = 0;
468 maxLevel =
std::max( maxLevel, traverse_graph( n, infoNodes ) );
471 m_nodes_by_level.
clear();
472 m_nodes_by_level.resize( infoNodes.
size() != 0 ? maxLevel + 1 : 0 );
473 for (
auto& infNode : infoNodes ) {
474 CORE_ASSERT(
size_t( infNode.second.first ) < m_nodes_by_level.size(),
475 std::string(
"Node " ) + infNode.first->instance_name() +
" is at level " +
479 m_nodes_by_level[infNode.second.first].push_back( infNode.first );
483 for (
auto& lvl : m_nodes_by_level ) {
485 for (
size_t j = 0; j < lvl.size(); j++ ) {
486 if ( !lvl[j]->compile() ) {
return m_ready =
false; }
488 for (
size_t k = 0; k < lvl[j]->inputs().size(); k++ ) {
489 if ( lvl[j] != m_input_node.get() && lvl[j]->inputs()[k]->is_link_mandatory() &&
490 !lvl[j]->inputs()[k]->is_linked() ) {
492 <<
"Node <" << lvl[j]->instance_name() <<
"> is not ready" <<
std::endl;
493 return m_ready =
false;
429bool DataflowGraph::compile() {
…}
504void DataflowGraph::clear_nodes() {
505 for (
size_t i = 0; i < m_nodes_by_level.size(); i++ ) {
506 m_nodes_by_level[i].clear();
507 m_nodes_by_level[i].shrink_to_fit();
509 m_nodes_by_level.clear();
510 m_nodes_by_level.shrink_to_fit();
511 m_nodes.erase( m_nodes.begin(), m_nodes.end() );
512 m_nodes.shrink_to_fit();
513 m_inputs.erase( m_inputs.begin(), m_inputs.end() );
514 m_inputs.shrink_to_fit();
515 m_outputs.erase( m_outputs.begin(), m_outputs.end() );
516 m_outputs.shrink_to_fit();
517 m_should_save =
true;
504void DataflowGraph::clear_nodes() {
…}
520void DataflowGraph::backtrack_graph(
523 for (
auto& input : current->
inputs() ) {
524 if ( input->link() ) {
525 Node* previous = input->link()->node();
526 if ( previous && previous != m_input_node.get() ) {
527 auto previousInInfoNodes = infoNodes.find( previous );
528 if ( previousInInfoNodes != infoNodes.end() ) {
531 auto& previousSuccessors = previousInInfoNodes->second.second;
532 bool foundCurrent =
std::any_of( previousSuccessors.begin(),
533 previousSuccessors.end(),
534 [current](
auto c ) { return c == current; } );
535 if ( !foundCurrent ) {
537 previousSuccessors.push_back( current );
547 backtrack_graph( previous, infoNodes );
554int DataflowGraph::traverse_graph(
559 if ( infoNodes.find( current ) != infoNodes.end() ) {
561 for (
auto const& successor : infoNodes[current].second ) {
563 infoNodes[successor].first =
564 std::max( infoNodes[successor].first, infoNodes[current].first + 1 );
567 std::max( infoNodes[successor].first, traverse_graph( successor, infoNodes ) ) );
576 std::find_if( m_nodes.begin(), m_nodes.end(), [instanceNameNode](
const auto& n ) {
577 return n->instance_name() == instanceNameNode;
579 if ( nodeIt != m_nodes.end() ) {
return *nodeIt; }
580 LOG( logERROR ) <<
"DataflowGraph::node : The node with the instance name \""
581 << instanceNameNode <<
"\" has not been found";
587 auto oj = read_json( filename );
588 if ( !oj )
return nullptr;
593 if ( j.contains(
"instance" ) && j.contains(
"model" ) ) {
594 valid = j[
"model"].contains(
"name" );
597 LOG( logERROR ) <<
"loadGraphFromJsonFile :" << filename
598 <<
" does not contain a valid json NodeGraph\n";
605 LOG( logINFO ) <<
"Loading the graph " << instanceName <<
", with type " << graphType <<
"\n";
608 auto node = factories.create_node( graphType, j );
610 if ( node ==
nullptr ) {
611 LOG( logERROR ) <<
"Unable to load a graph with type " << graphType <<
"\n";
616 if ( graph !=
nullptr ) {
617 graph->m_should_save =
false;
621 LOG( logERROR ) <<
"Loaded graph failed (not derived from DataflowGraph) " << graphType <<
"\n";
626bool DataflowGraph::are_nodes_valids(
const Node* nodeFrom,
628 bool verbose )
const {
629 using namespace Ra::Core::Utils;
631 if ( !contains_node_recursive( nodeFrom ) ) {
632 if ( verbose ) Log::unable_to_find(
"initial node", nodeFrom->
instance_name() );
637 if ( !contains_node_recursive( nodeTo ) ) {
638 if ( verbose ) Log::unable_to_find(
"destination node", nodeTo->
instance_name() );
642 if ( ( nodeFrom == m_input_node.get() || nodeFrom == m_output_node.get() ) &&
643 ( nodeTo == m_input_node.get() || nodeTo == m_output_node.get() ) ) {
644 if ( verbose ) Log::try_to_link_input_to_output();
650void DataflowGraph::Log::already_linked(
const Node* node,
const PortBase* port ) {
652 <<
"DataflowGraph::add_link destination port not available (already linked) for "
653 << node->instance_name() <<
" (" << node->model_name() <<
"), port " << port->name();
656void DataflowGraph::Log::link_type_mismatch(
const Node* nodeFrom,
657 const PortBase* portOut,
659 const PortBase* portIn ) {
660 LOG( logERROR ) <<
"DataflowGraph link type mismatch from " << nodeFrom->display_name() <<
" ("
661 << nodeFrom->model_name() <<
") / " << portOut->name() <<
" with type "
662 << portOut->port_typename() <<
")"
663 <<
" to " << nodeTo->display_name() <<
" (" << nodeTo->model_name() <<
") / "
664 << portIn->name() <<
" ( with type " << portIn->port_typename() <<
") ";
667void DataflowGraph::Log::unable_to_find(
const std::string& type,
669 LOG( logERROR ) <<
"DataflowGraph::add_link Unable to find " << type <<
" " << instanceName;
672void DataflowGraph::Log::bad_port_index(
const std::string& type,
674 Node::PortIndex idx ) {
675 LOG( logERROR ) <<
"DataflowGraph::add_link node " << instanceName <<
" as no " << type
676 <<
" port with index " << idx;
679void DataflowGraph::Log::try_to_link_input_to_output() {
680 LOG( logERROR ) <<
"DataflowGraph could not link input to ouput directrly";
Represents a set of connected nodes that define a Directed Acyclic Computational Graph. Ownership of node...
void init() override
Initializes the node content.
DataflowGraph(const std::string &name)
Base abstract class for all the nodes added and used by the node system.
const std::string & instance_name() const
Gets the instance name of the node.
virtual void init()
Initializes the node content.
const PortBaseInCollection & inputs() const
Gets the in ports of the node.
std::type_index type() const
Gets the type of the data (efficient for comparisons).
auto factory_manager() -> NodeFactorySet &
Allow static initialization without init order problems.
Helper function to manage enums as their underlying types in VariableSet
T dynamic_pointer_cast(T... args)