Node

Abstraction of a task.

In Dagrs, the Node trait represents a user's task. A Node is the basic scheduling unit of a Graph.

Node's interfaces


///# The [`Node`] trait
///
/// Nodes are the basic scheduling units of Graph. They can be identified by
/// a globally assigned [`NodeId`] and a user-provided name.
///
/// Nodes can communicate with others asynchronously through [`InChannels`] and [`OutChannels`].
///
/// In addition to the above properties, users can also customize some other attributes.
#[async_trait]
pub trait Node: Send + Sync {
    /// The unique identifier of this node. It is assigned by the [`NodeTable`]
    /// when the node is created, and you can use it to look the node up.
    fn id(&self) -> NodeId;
    /// The node's name.
    fn name(&self) -> NodeName;
    /// Input Channels of this node.
    fn input_channels(&mut self) -> &mut InChannels;
    /// Output Channels of this node.
    fn output_channels(&mut self) -> &mut OutChannels;
    /// Execute a run of this node.
    async fn run(&mut self, env: Arc<EnvVar>) -> Output;
    /// Returns true if this node is a conditional node. By default, it returns false.
    fn is_condition(&self) -> bool {
        false
    }
    /// Returns the list of nodes that are part of this node's loop structure, if any.
    ///
    /// This method is used to identify nodes that are part of a loop-like structure, such as a loop subgraph.
    /// When this method returns Some(nodes), the loop detection check will skip checking these nodes for cycles.
    ///
    /// Returns None by default, indicating this is not a loop-containing node.
    fn loop_structure(&self) -> Option<Vec<Arc<Mutex<dyn Node>>>> {
        None
    }

    /// Returns true if this node has TypedContent input.
    /// By default, it returns false.
    fn has_typed_input(&self) -> bool {
        false
    }

    /// Returns true if this node has TypedContent output.
    /// By default, it returns false.
    fn has_typed_output(&self) -> bool {
        false
    }
}

Together, these methods give the Graph what it needs to connect nodes for communication and to execute them when it schedules tasks.
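Several methods of the trait (is_condition, loop_structure, has_typed_input, has_typed_output) come with default bodies, so an implementor overrides only the ones it cares about. The sketch below illustrates that pattern with a trimmed-down, hypothetical trait. SketchNode, PlainNode, and GateNode are illustrative names and are not part of Dagrs.

```rust
/// Hypothetical, trimmed-down analogue of the `Node` trait: one required
/// method and one provided method with a default body.
pub trait SketchNode {
    /// Required: every implementor must supply a name.
    fn name(&self) -> String;

    /// Provided: mirrors the default shown above, where a node is not
    /// conditional unless it says so.
    fn is_condition(&self) -> bool {
        false
    }
}

/// A node that accepts every default.
pub struct PlainNode;

impl SketchNode for PlainNode {
    fn name(&self) -> String {
        "plain".to_string()
    }
}

/// A node that opts in to being treated as a conditional node.
pub struct GateNode;

impl SketchNode for GateNode {
    fn name(&self) -> String {
        "gate".to_string()
    }

    fn is_condition(&self) -> bool {
        true
    }
}
```

Here PlainNode.is_condition() yields false through the default body, while GateNode.is_condition() yields true because it overrides the method.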

Dynamic execution logic

The same data layout can be paired with different execution logic, so Dagrs provides the Action trait as the abstraction of a task's logic.

Here is an example Action implementation that simply returns Output::Out containing the String "Hello world".

use std::sync::Arc;
use async_trait::async_trait;
use dagrs::{Action, Content, EnvVar, InChannels, OutChannels, Output};
/// An implementation of [`Action`] that returns [`Output::Out`] containing a String "Hello world".
#[derive(Default)]
pub struct HelloAction;
#[async_trait]
impl Action for HelloAction {
    async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
        Output::Out(Some(Content::new("Hello world".to_string())))
    }
}
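To make the idea of "different logic on the same data layout" concrete, here is a minimal, synchronous sketch that uses only the standard library. It is not the Dagrs API: SketchAction, Upper, Reverse, and SwappableNode are hypothetical names, and the real Action trait is async and works with channels and an EnvVar as shown above.

```rust
/// Hypothetical, synchronous stand-in for an action: logic that turns an
/// input string into an output string.
pub trait SketchAction {
    fn run(&self, input: &str) -> String;
}

/// One piece of logic: upper-case the input.
pub struct Upper;

impl SketchAction for Upper {
    fn run(&self, input: &str) -> String {
        input.to_uppercase()
    }
}

/// Another piece of logic: reverse the input.
pub struct Reverse;

impl SketchAction for Reverse {
    fn run(&self, input: &str) -> String {
        input.chars().rev().collect()
    }
}

/// A single data layout. Its behavior comes from whichever action it holds.
pub struct SwappableNode {
    pub name: String,
    action: Box<dyn SketchAction>,
}

impl SwappableNode {
    /// Build a node around any action; the struct definition never changes.
    pub fn with_action(name: &str, action: impl SketchAction + 'static) -> Self {
        Self {
            name: name.to_string(),
            action: Box::new(action),
        }
    }

    /// Running the node delegates to the stored action.
    pub fn run(&self, input: &str) -> String {
        self.action.run(input)
    }
}
```

Two SwappableNode values built with Upper and Reverse share one struct definition but behave differently when run.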

Create a node

Get a node id

Before creating a new node, let's get to know the NodeTable, a mapping from node names to their ids. At runtime, a node must be able to identify the specific node it sends messages to or receives messages from. Therefore, each node is assigned a unique id before the run.

You can call the alloc_id_for method of NodeTable with a name to allocate an id, as in the following example.

use dagrs::NodeTable;
let mut node_table = NodeTable::default();
let name = "Node Name";
let id = node_table.alloc_id_for(name);
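
As a mental model of the name-to-id mapping, the following standard-library sketch shows one way such a table could work. It is an assumption-laden illustration, not the Dagrs implementation: SketchNodeId, SketchNodeTable, and the global counter are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for `NodeId`: a plain numeric identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SketchNodeId(pub usize);

/// Process-wide counter, so every allocated id is distinct.
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for `NodeTable`: a map from node names to ids.
#[derive(Default)]
pub struct SketchNodeTable {
    ids: HashMap<String, SketchNodeId>,
}

impl SketchNodeTable {
    /// Allocate a fresh id and record it under `name`.
    /// The table is modified, hence `&mut self`.
    pub fn alloc_id_for(&mut self, name: &str) -> SketchNodeId {
        let id = SketchNodeId(NEXT_ID.fetch_add(1, Ordering::Relaxed));
        self.ids.insert(name.to_string(), id);
        id
    }

    /// Look up the id registered for `name`, if any.
    pub fn get(&self, name: &str) -> Option<&SketchNodeId> {
        self.ids.get(name)
    }
}
```

In this sketch, allocating ids for two different names yields two different ids, and get returns the id recorded for a name or None if the name was never registered.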

Create a default node

If you don't need a Node to hold any extra information, DefaultNode is the best choice.

We can create a DefaultNode with a custom Action and the other necessary fields. The example below creates a DefaultNode named "My Node" with the HelloAction defined above, a NodeId assigned by node_table, and empty input_channels and output_channels.

use std::sync::Arc;
use dagrs::{DefaultNode, EnvVar, Node, NodeName, NodeTable};

let node_name = "My Node";
let mut node_table = NodeTable::new();
let mut node = DefaultNode::with_action(
    NodeName::from(node_name),
    HelloAction::default(),
    &mut node_table,
);

// Check if node table has key-value pair (node.name, node.id)
assert_eq!(node_table.get(node_name).unwrap(), &node.id());

let env = Arc::new(EnvVar::new(node_table));
// `run` is async, so it must be awaited from within an async context.
let out = node.run(env).await.get_out().unwrap();
let out: &String = out.get().unwrap();
assert_eq!(out, "Hello world");

Implement a customized node

If DefaultNode doesn't meet your needs, you are free to implement a customized Node, as in the following example:

use std::sync::Arc;
use async_trait::async_trait;
use dagrs::{Content, EnvVar, InChannels, Node, NodeId, NodeName, OutChannels, Output};

struct MessageNode {
    id: NodeId,
    name: NodeName,
    in_channels: InChannels,
    out_channels: OutChannels,
    /*Put your custom fields here.*/
    message: String,
}

#[async_trait]
impl Node for MessageNode {
    fn id(&self) -> NodeId {
        self.id
    }

    fn name(&self) -> NodeName {
        self.name.clone()
    }

    fn input_channels(&mut self) -> &mut InChannels {
        &mut self.in_channels
    }

    fn output_channels(&mut self) -> &mut OutChannels {
        &mut self.out_channels
    }

    async fn run(&mut self, _: Arc<EnvVar>) -> Output {
        Output::Out(Some(Content::new(self.message.clone())))
    }
}

The MessageNode above simply returns its message field when run. If needed, the execution logic of this node can also be delegated to an Action, just as DefaultNode does.

If you need many custom Nodes like this, duplicating the fields and methods required by the Node trait quickly becomes tedious. The auto_node macro can help you out.