Node
Abstraction of a task.
In Dagrs, we provide the trait Node
as the representation of
user's task. Node
is the basic scheduling unit of Graph
.
Node's interfaces
///# The [`Node`] trait
///
/// Nodes are the basic scheduling units of Graph. They can be identified by
/// a globally assigned [`NodeId`] and a user-provided name.
///
/// Nodes can communicate with others asynchronously through [`InChannels`] and [`OutChannels`].
///
/// In addition to the above properties, users can also customize some other attributes.
#[async_trait]
pub trait Node: Send + Sync {
/// id is the unique identifier of each node, it will be assigned by the [`NodeTable`]
/// when creating a new node, you can find this node through this identifier.
fn id(&self) -> NodeId;
/// The node's name.
fn name(&self) -> NodeName;
/// Input Channels of this node.
fn input_channels(&mut self) -> &mut InChannels;
/// Output Channels of this node.
fn output_channels(&mut self) -> &mut OutChannels;
/// Execute a run of this node.
async fn run(&mut self, env: Arc<EnvVar>) -> Output;
/// Return true if this node is conditional node. By default, it returns false.
fn is_condition(&self) -> bool {
false
}
/// Returns the list of nodes that are part of this node's loop structure, if any.
///
/// This method is used to identify nodes that are part of a loop-like structure, such as a loop subgraph.
/// When this method returns Some(nodes), the loop detection check will skip checking these nodes for cycles.
///
/// Returns None by default, indicating this is not a loop-containing node.
fn loop_structure(&self) -> Option<Vec<Arc<Mutex<dyn Node>>>> {
None
}
/// Returns true if this node has TypedContent input.
/// By default, it returns false.
fn has_typed_input(&self) -> bool {
false
}
/// Returns true if this node has TypedContent output.
/// By default, it returns false.
fn has_typed_output(&self) -> bool {
false
}
}
The above methods ensure the communication and execution function when Graph
schedules Tasks.
Dynamic execution logics
It is possible to execute different logic on the same data layout. So we provide trait
Action
as the abstraction of a task's logic.
Here is an example of implementing Action
, that simply returns Output::Out
containing a String "Hello world".
use dagrs::{Action, InChannels, OutChannels, EnvVar, Output};
/// An implementation of [`Action`] that returns [`Output::Out`] containing a String "Hello world".
#[derive(Default)]
pub struct HelloAction;
#[async_trait]
impl Action for HelloAction {
async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
Output::Out(Some(Content::new("Hello world".to_string())))
}
}
Create a node
Get a node id
Before creating a new node, let's first get to know the NodeTable
- a mapping of nodes' names to their ids.
During runtime, we need to figure out the specific node to send/receive messages to/from it. Therefore, each node will be assigned
a unique id before the run.
You can call the alloc_id_for
method of NodeTable
to alloc an id by providing a name, just like the following example.
use dagrs::NodeTable;
let node_table = NodeTable::default();
let name = "Node Name";
let id = node_table.alloc_id_for(name);
Create a default node
If you don't need a Node
to hold any extra information, DefaultNode
is the best choice.
We can create a DefaultNode
with a
custom Action
and other necceray fields. The example below creates a DefaultNode
with a name of
"My Node", an action of HelloAction
(defined above), a NodeId
assigned by the node_table
, and empty
input_channels and output_channels.
use dagrs::{NodeName, NodeTable, DefaultNode, EmptyAction};
let node_name = "My Node";
let mut node_table = NodeTable::new();
let mut node = DefaultNode::with_action(
NodeName::from(node_name),
HelloAction::default(),
&mut node_table,
);
// Check if node table has key-value pair (node.name, node.id)
assert_eq!(node_table.get(node_name).unwrap(), &node.id());
let env = Arc::new(EnvVar::new(node_table));
let out = node.run(env).get_out().unwrap();
let out: &String = out.get().unwrap();
assert_eq!(out, "Hello world");
Implement a customized node
Don't worry if the DefaultNode
can't satisfy your need.
You are free to implement a customized Node
like the following example:
use std::sync::Arc;
use async_trait::async_trait;
use dagrs::{Content, EnvVar, InChannels, Node, NodeId, NodeName, OutChannels, Output};
struct MessageNode {
id: NodeId,
name: NodeName,
in_channels: InChannels,
out_channels: OutChannels,
/*Put your custom fields here.*/
message: String,
}
#[async_trait]
impl Node for MessageNode {
fn id(&self) -> NodeId {
self.id
}
fn name(&self) -> NodeName {
self.name.clone()
}
fn input_channels(&mut self) -> &mut InChannels {
&mut self.in_channels
}
fn output_channels(&mut self) -> &mut OutChannels {
&mut self.out_channels
}
async fn run(&mut self, _: Arc<EnvVar>) -> Output {
Output::Out(Some(Content::new(self.message.clone())))
}
}
The MessageNode
above simply returns its field message
when run. If needed,
the execution logic of this node can also be replaced with an Action
, just like the
DefaultNode
.
If you need many custom Nodes like this, it can be annoying to duplicate these necceray fields
and methods required by the Node
trait. The auto_node
macro can help you out.