Previously, we built a WebSocket server that receives and routes messages to local Actors — a post office that can accept mail but can't send any out. Today we're adding the outbound flow: connecting to remote nodes so our Actors can talk to Actors elsewhere. Simply put, we're upgrading from "can only answer phone calls" to "can both answer and make phone calls."

Implementation Goals

Here’s what we want to build:

  • A WebSocket client manager that connects to those remote “friend” nodes we know
  • A simple, easy-to-use API that lets us send messages like this:
cluster.send("printer@node2", "Hello!").await;
  • The cluster automatically routes messages via WebSocket to remote nodes

Just like WeChat group chats — you send a message and the system automatically delivers it to the right person.

Message Format Review

Remember what our messages look like? Just like this:

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message {
    pub to: String,    // Recipient address, like "printer@node2"
    pub from: String,  // Sender address
    pub body: String,  // Message content
}

Simple, right? Just like writing a letter — you have the recipient, sender, and content.
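For reference, here's what that struct actually looks like on the wire. This is a std-only sketch that hand-builds the string `serde_json::to_string(&message)` produces (serde emits struct fields in declaration order; note the sketch skips escaping, which serde handles for real):

```rust
// Hand-rolled sketch of the wire format, so we can see it without pulling in serde.
// The real code uses serde_json::to_string(&message), which emits the fields in
// declaration order: to, from, body. (No escaping here — serde does that for us.)
fn encode(to: &str, from: &str, body: &str) -> String {
    format!(r#"{{"to":"{}","from":"{}","body":"{}"}}"#, to, from, body)
}

fn main() {
    let json = encode("printer", "sender@node_b", "Hello!");
    assert_eq!(json, r#"{"to":"printer","from":"sender@node_b","body":"Hello!"}"#);
    println!("{}", json);
}
```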

Step 1: Define the Cluster Client

First, we need a manager to handle all our remote connections:

use tokio_tungstenite::connect_async;
use futures_util::SinkExt;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

type PeerMap = Arc<RwLock<HashMap<String, tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>;

#[derive(Clone)]
pub struct ClusterClient {
    peers: PeerMap,  // Store all connected remote nodes
}

impl ClusterClient {
    pub fn new() -> Self {
        Self {
            peers: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

This ClusterClient is like a contact book manager, keeping track of all our connected remote friends. Using the Arc<RwLock<HashMap>> structure is like putting a lock on our contact book — multiple threads can access it simultaneously without causing chaos.
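To see the sharing pattern in isolation, here's a self-contained sketch using std's blocking `RwLock` and threads in place of tokio's async versions (the `Arc<RwLock<HashMap>>` idea is the same; `register` and the `String` values standing in for live sockets are inventions of this sketch):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Stand-in for PeerMap: node name -> address (a String instead of a live socket).
type Peers = Arc<RwLock<HashMap<String, String>>>;

// Taking the write lock briefly lets many threads update the "contact book" safely.
fn register(peers: &Peers, name: &str, addr: &str) {
    peers.write().unwrap().insert(name.to_string(), addr.to_string());
}

fn main() {
    let peers: Peers = Arc::new(RwLock::new(HashMap::new()));
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let peers = Arc::clone(&peers);
            thread::spawn(move || register(&peers, &format!("node{}", i), "127.0.0.1:8081"))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // All four writers landed; any number of readers can now look up peers concurrently.
    assert_eq!(peers.read().unwrap().len(), 4);
}
```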

Step 2: Connect to Remote Nodes

Now we need a method to “make friends” — connect to other nodes:

impl ClusterClient {
    pub async fn connect_to(&self, node_name: &str, address: &str) -> Result<(), Box<dyn std::error::Error>> {
        println!("Connecting to node: {} ({})", node_name, address);
        
        let url = format!("ws://{}", address);
        let (ws_stream, _) = connect_async(url).await?;
        
        // Add our new friend to the contact book
        self.peers.write().await.insert(node_name.to_string(), ws_stream);
        
        println!("Successfully connected to {}", node_name);
        Ok(())
    }
}

This is like wanting to add someone on WeChat — you enter their WeChat ID, the system helps you establish a connection, then adds them to your contact list.

Pro tip: Right now we’re directly storing WebSocketStream, which is a bit crude. In future tutorials, we’ll upgrade to a task and channel-based system for better reliability.
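Here's roughly what that upgrade will look like, sketched with std threads and channels standing in for tokio tasks and `tokio::sync::mpsc` (the name `spawn_peer` is made up for this sketch): each peer gets a writer task that exclusively owns its connection, and callers just drop messages into a channel instead of locking a shared socket.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical sketch of the channel-based design: the writer thread is the only
// owner of the "connection", so nobody holds a lock while a send is in flight.
fn spawn_peer(name: &str) -> Sender<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let name = name.to_string();
    thread::spawn(move || {
        // In the real version this loop would call ws.send(msg).await on the socket.
        for msg in rx {
            println!("[{}] sending: {}", name, msg);
        }
    });
    tx
}

fn main() {
    let mut peers: HashMap<String, Sender<String>> = HashMap::new();
    peers.insert("node_a".to_string(), spawn_peer("node_a"));

    // Sending never waits on the network; it just queues the message.
    peers["node_a"].send("Hello!".to_string()).unwrap();
    assert!(peers.contains_key("node_a"));
}
```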

Step 3: Send Messages to Remote Nodes

Now for the most exciting part — sending messages!

impl ClusterClient {
    pub async fn send_to_remote(&self, to: &str, from: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
        // Parse the target address, e.g., "printer@node2" -> ("printer", "node2")
        let (actor_name, node_name) = split_address(to);
        
        let message = Message {
            to: actor_name.to_string(),
            from: from.to_string(),
            body: body.to_string(),
        };
        
        // Pack the message into a JSON delivery package
        let json = serde_json::to_string(&message)?;
        
        // Find the corresponding connection in our contact book.
        // (Note: this holds the write lock across the send — fine for a demo,
        // and one of the things the channel-based design will fix later.)
        if let Some(ws) = self.peers.write().await.get_mut(node_name) {
            use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
            ws.send(TungsteniteMessage::Text(json.into())).await?;
            println!("Message sent to the {} Actor on node {}", actor_name, node_name);
        } else {
            println!("Node not found: {}", node_name);
        }
        
        Ok(())
    }
}

Helper Function: Address Parsing

We need a little utility to parse those actor@node format addresses:

fn split_address(addr: &str) -> (&str, &str) {
    match addr.find('@') {
        Some(pos) => (&addr[..pos], &addr[pos + 1..]),
        None => (addr, "local"), // Default to local if no @
    }
}

Just like parsing email addresses — splitting user@domain.com into username and domain parts.
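A few quick checks of how the helper behaves, including the edge case of multiple `@` signs:

```rust
fn split_address(addr: &str) -> (&str, &str) {
    match addr.find('@') {
        Some(pos) => (&addr[..pos], &addr[pos + 1..]),
        None => (addr, "local"), // Default to local if no @
    }
}

fn main() {
    assert_eq!(split_address("printer@node2"), ("printer", "node2"));
    assert_eq!(split_address("logger"), ("logger", "local"));
    // find('@') returns the FIRST '@', so any extras end up in the node part:
    assert_eq!(split_address("a@b@c"), ("a", "b@c"));
}
```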

Practical Testing

Let’s set up two nodes to test this out:

Node A (Printer Node):

// main.rs
#[tokio::main]
async fn main() {
    let cluster = Arc::new(ClusterSystem::new());
    
    // Start server listening on port 8081
    let server_cluster = cluster.clone();
    tokio::spawn(async move {
        start_websocket_server("127.0.0.1:8081", server_cluster).await;
    });
    
    // Register a printer Actor
    cluster.spawn_actor("printer", PrinterActor).await;
    
    println!("Node A (printer node) started, listening on port 8081");
    tokio::signal::ctrl_c().await.unwrap();
}

Node B (Sender Node):

#[tokio::main]
async fn main() {
    let cluster = Arc::new(ClusterSystem::new());
    
    // Connect to Node A
    cluster.client.connect_to("node_a", "127.0.0.1:8081").await.unwrap();
    
    // Send a message after 5 seconds (needs `use std::time::Duration;`)
    tokio::time::sleep(Duration::from_secs(5)).await;
    
    cluster.client.send_to_remote("printer@node_a", "sender@node_b", "Hello, remote printer!").await.unwrap();
    
    println!("Node B sent the message to the remote node");
    tokio::signal::ctrl_c().await.unwrap();
}

Architecture Summary

Now our system is like a mini distributed WeChat:

| Component        | Function                            | Analogy                 |
| ---------------- | ----------------------------------- | ----------------------- |
| ClusterClient    | Manages remote node connections     | Contact book manager    |
| connect_to()     | Connects to other WebSocket nodes   | Adding friends          |
| send_to_remote() | Sends JSON messages across the wire | Sending WeChat messages |

Summary

Today we added “mail sending” capabilities to our distributed Actor system:

  1. ClusterClient - A thoughtful contact book manager
  2. Connection Management - Simple and reliable node connection approach
  3. Message Routing - Automatically delivers messages to the right place

Now our Actors can not only chat locally but also communicate across nodes! It’s like upgrading from landline phones to mobile phones — the communication range has expanded dramatically.

Next up, we’ll make this system more robust — adding reconnection mechanisms, error handling, and more elegant concurrency models. After all, real-world distributed systems are much more complex than demo environments!

Remember, programming is like building with LEGO blocks — piece by piece, you can eventually build amazing systems.


If this article helped you, don’t forget to like and share! What other Rust distributed systems topics would you like to learn about? Let me know in the comments — your suggestion might be the next article!

Want to get timely updates on more Rust and distributed systems content? Follow our WeChat account “DreamBeast Programming” where I regularly share practical programming tips and system design insights.