use std::{env, fmt::Display, sync::Arc};
use async_trait::async_trait;
use dagrs::{
graph::loop_subgraph::LoopSubgraph, Action, Content, DefaultNode, EnvVar, Graph, InChannels,
Node, NodeId, NodeTable, OutChannels, Output,
};
struct InAction;
#[async_trait]
impl Action for InAction {
async fn run(
&self,
_in_channels: &mut InChannels,
out_channels: &mut OutChannels,
_env: Arc<EnvVar>,
) -> Output {
log::info!("`In` send start signal to INTER node");
out_channels.broadcast(Content::new(())).await;
Output::Out(None)
}
}
struct InterAction {
in_id: NodeId,
proc_id: NodeId,
limit: usize,
}
#[async_trait]
impl Action for InterAction {
async fn run(
&self,
in_channels: &mut InChannels,
out_channels: &mut OutChannels,
_env: Arc<EnvVar>,
) -> Output {
let content = in_channels.recv_from(&self.in_id).await.unwrap();
in_channels.close_async(&self.in_id).await;
log::info!("`Inter` Received start signal from IN node");
let mut times = 0usize;
out_channels.send_to(&self.proc_id, content).await.unwrap();
log::info!("`Inter` send start signal to PROC node");
while let Ok(content) = in_channels.recv_from(&self.proc_id).await {
log::info!(
"`Inter` Displaying input: [{}]",
content.get::<Arc<dyn Display + Send + Sync>>().unwrap()
);
out_channels.send_to(&self.proc_id, content).await.unwrap();
log::info!("`Inter` send output to PROC node");
times += 1;
if times >= self.limit {
log::info!("`Inter` reached iter limit {}, exit", times);
out_channels.close(&self.proc_id);
break;
}
}
log::info!("`Inter` exit");
Output::empty()
}
}
struct ProcAction {
inter_node: NodeId,
}
#[async_trait]
impl Action for ProcAction {
async fn run(
&self,
in_channels: &mut InChannels,
out_channels: &mut OutChannels,
_env: Arc<EnvVar>,
) -> Output {
let mut times = 0usize;
while let Ok(_) = in_channels.recv_from(&self.inter_node).await {
log::info!("`Proc` send {} to INTER node", times);
out_channels
.send_to(
&self.inter_node,
Content::new(Arc::new(times) as Arc<dyn Display + Send + Sync>),
)
.await
.unwrap();
times += 1;
}
log::info!("`Proc` exit");
Output::empty()
}
}
fn main() {
env::set_var("RUST_LOG", "info");
env_logger::init();
let mut node_table = NodeTable::default();
let in_node = DefaultNode::with_action("IN".to_string(), InAction, &mut node_table);
let in_id = in_node.id();
let mut inter = DefaultNode::new("Inter".to_string(), &mut node_table);
let inter_id = inter.id();
let mut proc = DefaultNode::new("Proc".to_string(), &mut node_table);
let proc_id = proc.id();
inter.set_action(InterAction {
in_id,
proc_id,
limit: 10,
});
proc.set_action(ProcAction {
inter_node: inter_id,
});
let mut inter_proc = LoopSubgraph::new("inter_proc".to_string(), &mut node_table);
inter_proc.add_node(inter);
inter_proc.add_node(proc);
let mut graph = Graph::new();
graph.add_node(in_node);
graph.add_node(inter_proc);
graph.add_edge(in_id, vec![inter_id]);
graph.add_edge(inter_id, vec![proc_id]);
graph.add_edge(proc_id, vec![inter_id]);
match graph.start() {
Ok(_) => println!("Graph executed successfully"),
Err(e) => panic!("Graph execution failed: {:?}", e),
}
}