之前我们建了一个能接收和路由消息到本地 Actor 的 WebSocket 服务器。就像建了一个邮局,但只能收信,不能寄信。今天我们要给它加上出站流程——连接到远程节点,让我们的 Actor 能跟其他节点上的 Actor 对话。

想象一下,之前我们的系统就像一个只有收件功能的邮局,现在我们要给它加上投递功能,让它既能收信,也能往外寄信。说白了,就是从"只能接电话"升级到"既能接电话又能打电话"。

实现目标

说白了,我们想要:

  • 一个WebSocket 客户端管理器,专门连接那些我们认识的"朋友"节点
  • 一个简单好用的 API,让我们能这样发消息:
cluster.send("printer@node2", "Hello!").await;
  • 集群能自动把消息通过 WebSocket 路由到远程节点

就像微信群聊一样,你发个消息,系统自动帮你送到对应的人那里。

消息格式回顾

还记得我们的消息长啥样吗?就像这样:

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message {
    pub to: String,    // 收件人地址,比如 "printer@node2"
    pub from: String,  // 发件人地址
    pub body: String,  // 消息内容
}

简单吧?就像写信一样,有收件人、发件人和内容。

第一步:定义集群客户端

首先,我们需要一个管家来管理所有的远程连接:

use tokio_tungstenite::connect_async;
use futures_util::SinkExt;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

type PeerMap = Arc<RwLock<HashMap<String, tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>>>>;

#[derive(Clone)]
pub struct ClusterClient {
    peers: PeerMap,  // 存储所有连接的远程节点
}

impl ClusterClient {
    pub fn new() -> Self {
        Self {
            peers: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

这个 ClusterClient 就像一个通讯录管理器,记录着我们连接的所有远程朋友。用 Arc<RwLock<HashMap>> 这种结构,就像给通讯录加了把锁,多个线程同时访问也不会乱套。

第二步:连接到远程节点

现在我们需要一个方法来"交朋友"——连接到其他节点:

impl ClusterClient {
    pub async fn connect_to(&self, node_name: &str, address: &str) -> Result<(), Box<dyn std::error::Error>> {
        println!("正在连接到节点: {} ({})", node_name, address);
        
        let url = format!("ws://{}", address);
        let (ws_stream, _) = connect_async(url).await?;
        
        // 把新朋友加到通讯录里
        self.peers.write().await.insert(node_name.to_string(), ws_stream);
        
        println!("成功连接到 {}", node_name);
        Ok(())
    }
}

这就像你想加某个人微信,输入他的微信号,系统帮你建立连接,然后把他加到你的联系人列表里。

小贴士:现在我们直接存储 WebSocketStream,有点简单粗暴。在后续的教程里,我们会升级成基于任务和通道的系统,更加可靠。

第三步:向远程节点发送消息

现在到了最激动人心的部分——发消息!

impl ClusterClient {
    pub async fn send_to_remote(&self, to: &str, from: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
        // 解析目标地址,比如 "printer@node2" -> ("printer", "node2")
        let (actor_name, node_name) = split_address(to);
        
        let message = Message {
            to: actor_name.to_string(),
            from: from.to_string(),
            body: body.to_string(),
        };
        
        // 把消息包装成 JSON 快递包裹
        let json = serde_json::to_string(&message).unwrap();
        
        // 从通讯录里找到对应的连接
        if let Some(mut ws) = self.peers.write().await.get_mut(node_name) {
            use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
            ws.send(TungsteniteMessage::Text(json)).await?;
            println!("消息已发送到 {} 节点的 {} Actor", node_name, actor_name);
        } else {
            println!("找不到节点: {}", node_name);
        }
        
        Ok(())
    }
}

辅助函数:解析地址

我们需要一个小工具来解析那些 actor@node 格式的地址:

fn split_address(addr: &str) -> (&str, &str) {
    match addr.find('@') {
        Some(pos) => (&addr[..pos], &addr[pos + 1..]),
        None => (addr, "local"), // 没有 @ 就默认是本地
    }
}

就像解析邮箱地址一样,把 user@domain.com 分成用户名和域名两部分。

实际测试

让我们来搭建两个节点测试一下:

节点 A(打印机节点):

// main.rs
#[tokio::main]
async fn main() {
    let cluster = Arc::new(ClusterSystem::new());
    
    // 启动服务器监听 8081 端口
    let server_cluster = cluster.clone();
    tokio::spawn(async move {
        start_websocket_server("127.0.0.1:8081", server_cluster).await;
    });
    
    // 注册一个打印机 Actor
    cluster.spawn_actor("printer", PrinterActor).await;
    
    println!("节点 A (打印机节点) 已启动,监听端口 8081");
    tokio::signal::ctrl_c().await.unwrap();
}

节点 B(发送方节点):

#[tokio::main]
async fn main() {
    let cluster = Arc::new(ClusterSystem::new());
    
    // 连接到节点 A
    cluster.client.connect_to("node_a", "127.0.0.1:8081").await.unwrap();
    
    // 5秒后发送一条消息
    tokio::time::sleep(Duration::from_secs(5)).await;
    
    cluster.send_to_remote("printer@node_a", "sender@node_b", "你好,远程打印机!").await.unwrap();
    
    println!("节点 B 已发送消息给远程节点");
    tokio::signal::ctrl_c().await.unwrap();
}

架构总结

现在我们的系统就像一个小型的分布式微信:

组件功能类比
ClusterClient管理远程节点连接通讯录管理器
connect_to()连接到其他 WebSocket 节点加好友
send_to_remote()向远程发送 JSON 消息发微信消息

总结

今天我们给分布式 Actor 系统加上了"发信"功能:

  1. ClusterClient - 一个贴心的通讯录管理器
  2. 连接管理 - 简单可靠的节点连接方式
  3. 消息路由 - 自动把消息送到正确的地方

现在我们的 Actor 不仅能在本地聊天,还能跨节点交流了!就像从座机升级到了手机,通讯范围一下子扩大了。

下一步,我们会让这个系统更加健壮——加上重连机制、错误处理,还有更优雅的并发模型。毕竟,现实世界的分布式系统可比演示环境复杂多了!

记住,编程就像搭积木,一块一块地建,最终能构建出令人惊叹的系统。


如果这篇文章对你有帮助,别忘了点赞和分享。还有什么想了解的 Rust 分布式系统话题?欢迎在评论区告诉我,下期内容可能就是你想看的!

想要及时获取更多 Rust 和分布式系统的干货内容,可以关注我的公众号"梦兽编程",我会定期分享实用的编程技巧和系统设计经验。