//! # State Subscription
//!
//! Example of subscribing to graph events.
//!
//! This example shows how to use the subscription mechanism to monitor graph
//! execution in real time.
use std::sync::Arc;
use tokio::time::Duration;
use dagrs::{
Action, DefaultNode, EnvVar, Graph, InChannels, NodeTable, OutChannels, Output,
};
/// Example action whose `run` implementation sleeps for 100 ms and then
/// returns an empty output.
struct SleepAction;
#[async_trait::async_trait]
impl Action for SleepAction {
    /// Waits asynchronously for 100 ms, then reports an empty output.
    ///
    /// The input channels, output channels, and environment are accepted to
    /// satisfy the `Action` signature but are not used.
    async fn run(
        &self,
        _in_channels: &mut InChannels,
        _out_channels: &mut OutChannels,
        _env: Arc<EnvVar>,
    ) -> Output {
        let pause = Duration::from_millis(100);
        // Non-blocking sleep: yields to the runtime instead of parking the thread.
        tokio::time::sleep(pause).await;
        Output::empty()
    }
}
/// Builds a single-node graph, subscribes to its events, and runs it.
///
/// The subscription is taken before `start()` is called, and a spawned task
/// prints every received event using its `Debug` representation.
#[tokio::main]
async fn main() {
    let mut node_table = NodeTable::new();
    let mut graph = Graph::new();

    let sleep_node = DefaultNode::with_action(String::from("SleepNode"), SleepAction, &mut node_table);
    graph.add_node(sleep_node);

    // Subscribe BEFORE starting the graph.
    let mut events = graph.subscribe();

    // Listener task: print each event until `recv` returns an error.
    // NOTE(review): the task is detached (its handle is discarded), so events
    // still pending when `main` returns may never be printed — confirm whether
    // that matters for this example.
    tokio::spawn(async move {
        loop {
            match events.recv().await {
                // Match on `event` here to handle specific cases such as
                // NodeSuccess, NodeFailed, etc.
                Ok(event) => println!("Received event: {:?}", event),
                Err(_) => break,
            }
        }
    });

    graph.start().unwrap();
}