Receive Any & Broadcast & Typed Action
Use `recv_any` and `broadcast` to simplify the process of sending & receiving packets, and use typed action to limit the type of package.
This example shows how to use recv_any
and broadcast
to simplify the process of sending & receiving packets, and use typed action to limit the type of package.
This example creates a graph with two senders and one receiver:
- A normal sender that sends messages immediately
- A slow sender that delays 500ms before sending messages
- A receiver that uses
recv_any
to receive messages from either sender
When running this example, you will see output similar to:
Received message 'Hello from Sender' from node NodeId(1)
Received message 'Hello from SlowSender' from node NodeId(2)
The first message comes from the normal sender, and the second message comes from the slow sender after a 500ms delay.
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use dagrs::{
connection::{in_channel::TypedInChannels, out_channel::TypedOutChannels},
node::typed_action::TypedAction,
DefaultNode, EnvVar, Graph, Node, NodeTable, Output,
};
use tokio::time::sleep;
/// An action that sends a message to its output channel
#[derive(Default)]
pub struct SenderAction {
message: String,
}
impl SenderAction {
pub fn new(message: String) -> Self {
Self { message }
}
}
#[async_trait]
impl TypedAction for SenderAction {
type I = ();
type O = String;
async fn run(
&self,
_: TypedInChannels<Self::I>,
out: TypedOutChannels<Self::O>,
_: Arc<EnvVar>,
) -> Output {
// Send the message to all receivers
out.broadcast(self.message.clone()).await;
Output::Out(None)
}
}
/// An action that sends a message to its output channel after a delay
#[derive(Default)]
pub struct SlowSenderAction {
message: String,
}
impl SlowSenderAction {
pub fn new(message: String) -> Self {
Self { message }
}
}
#[async_trait]
impl TypedAction for SlowSenderAction {
type I = ();
type O = String;
async fn run(
&self,
_: TypedInChannels<Self::I>,
out: TypedOutChannels<Self::O>,
_: Arc<EnvVar>,
) -> Output {
// Wait for 500ms before sending
sleep(Duration::from_millis(500)).await;
// Send the message to all receivers
out.broadcast(self.message.clone()).await;
Output::Out(None)
}
}
/// An action that receives messages from any available channel
#[derive(Default)]
pub struct ReceiverAction;
#[async_trait]
impl TypedAction for ReceiverAction {
type I = String;
type O = ();
async fn run(
&self,
mut input: TypedInChannels<Self::I>,
_: TypedOutChannels<Self::O>,
_: Arc<EnvVar>,
) -> Output {
// Receive from any available channel
match input.recv_any().await {
Ok((sender_id, content)) => {
let message = content.unwrap();
println!("Received message '{}' from node {:?}", message, sender_id);
}
Err(e) => {
eprintln!("Error receiving message: {:?}", e);
}
}
match input.recv_any().await {
Ok((sender_id, content)) => {
let message = content.unwrap();
println!("Received message '{}' from node {:?}", message, sender_id);
}
Err(e) => {
eprintln!("Error receiving message: {:?}", e);
}
}
Output::Out(None)
}
}
fn main() {
// Create a node table
let mut node_table = NodeTable::new();
// Create sender nodes
let sender1 = DefaultNode::with_action(
"Sender1".to_string(),
SenderAction::new("Hello from Sender".to_string()),
&mut node_table,
);
let sender2 = DefaultNode::with_action(
"Sender2".to_string(),
SlowSenderAction::new("Hello from SlowSender".to_string()),
&mut node_table,
);
// Create receiver node
let receiver = DefaultNode::with_action(
"Receiver".to_string(),
ReceiverAction::default(),
&mut node_table,
);
// Get node IDs before adding nodes to the graph
let sender1_id = sender1.id();
let sender2_id = sender2.id();
let receiver_id = receiver.id();
// Create a graph
let mut graph = Graph::new();
// Add nodes to the graph
graph.add_node(sender1);
graph.add_node(sender2);
graph.add_node(receiver);
// Add edges: both senders connect to the receiver
graph.add_edge(sender1_id, vec![receiver_id]);
graph.add_edge(sender2_id, vec![receiver_id]);
// Run the graph
match graph.start() {
Ok(_) => (),
Err(e) => {
eprintln!("Graph execution failed: {:?}", e);
}
}
}