之前我们建了一个能接收和路由消息到本地 Actor 的 WebSocket 服务器。就像建了一个邮局,但只能收信,不能寄信。今天我们要给它加上出站流程——连接到远程节点,让我们的 Actor 能跟其他节点上的 Actor 对话。
想象一下,之前我们的系统就像一个只有收件功能的邮局,现在我们要给它加上投递功能,让它既能收信,也能往外寄信。说白了,就是从"只能接电话"升级到"既能接电话又能打电话"。
实现目标
说白了,我们想要:
- 一个WebSocket 客户端管理器,专门连接那些我们认识的"朋友"节点
- 一个简单好用的 API,让我们能这样发消息:
cluster.send("printer@node2", "Hello!").await;
- 集群能自动把消息通过 WebSocket 路由到远程节点
就像微信群聊一样,你发个消息,系统自动帮你送到对应的人那里。
消息格式回顾
还记得我们的消息长啥样吗?就像这样:
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message {
pub to: String, // 收件人地址,比如 "printer@node2"
pub from: String, // 发件人地址
pub body: String, // 消息内容
}
简单吧?就像写信一样,有收件人、发件人和内容。
第一步:定义集群客户端
首先,我们需要一个管家来管理所有的远程连接:
use tokio_tungstenite::connect_async;
use futures_util::SinkExt;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
type PeerMap = Arc<RwLock<HashMap<String, tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>>>>;
#[derive(Clone)]
pub struct ClusterClient {
peers: PeerMap, // 存储所有连接的远程节点
}
impl ClusterClient {
pub fn new() -> Self {
Self {
peers: Arc::new(RwLock::new(HashMap::new())),
}
}
}
这个 ClusterClient
就像一个通讯录管理器,记录着我们连接的所有远程朋友。用 Arc<RwLock<HashMap>>
这种结构,就像给通讯录加了把锁,多个线程同时访问也不会乱套。
第二步:连接到远程节点
现在我们需要一个方法来"交朋友"——连接到其他节点:
impl ClusterClient {
pub async fn connect_to(&self, node_name: &str, address: &str) -> Result<(), Box<dyn std::error::Error>> {
println!("正在连接到节点: {} ({})", node_name, address);
let url = format!("ws://{}", address);
let (ws_stream, _) = connect_async(url).await?;
// 把新朋友加到通讯录里
self.peers.write().await.insert(node_name.to_string(), ws_stream);
println!("成功连接到 {}", node_name);
Ok(())
}
}
这就像你想加某个人微信,输入他的微信号,系统帮你建立连接,然后把他加到你的联系人列表里。
小贴士:现在我们直接存储
WebSocketStream
,有点简单粗暴。在后续的教程里,我们会升级成基于任务和通道的系统,更加可靠。
第三步:向远程节点发送消息
现在到了最激动人心的部分——发消息!
impl ClusterClient {
pub async fn send_to_remote(&self, to: &str, from: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
// 解析目标地址,比如 "printer@node2" -> ("printer", "node2")
let (actor_name, node_name) = split_address(to);
let message = Message {
to: actor_name.to_string(),
from: from.to_string(),
body: body.to_string(),
};
// 把消息包装成 JSON 快递包裹
let json = serde_json::to_string(&message).unwrap();
// 从通讯录里找到对应的连接
if let Some(mut ws) = self.peers.write().await.get_mut(node_name) {
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
ws.send(TungsteniteMessage::Text(json)).await?;
println!("消息已发送到 {} 节点的 {} Actor", node_name, actor_name);
} else {
println!("找不到节点: {}", node_name);
}
Ok(())
}
}
辅助函数:解析地址
我们需要一个小工具来解析那些 actor@node
格式的地址:
fn split_address(addr: &str) -> (&str, &str) {
match addr.find('@') {
Some(pos) => (&addr[..pos], &addr[pos + 1..]),
None => (addr, "local"), // 没有 @ 就默认是本地
}
}
就像解析邮箱地址一样,把 user@domain.com
分成用户名和域名两部分。
实际测试
让我们来搭建两个节点测试一下:
节点 A(打印机节点):
// main.rs
#[tokio::main]
async fn main() {
let cluster = Arc::new(ClusterSystem::new());
// 启动服务器监听 8081 端口
let server_cluster = cluster.clone();
tokio::spawn(async move {
start_websocket_server("127.0.0.1:8081", server_cluster).await;
});
// 注册一个打印机 Actor
cluster.spawn_actor("printer", PrinterActor).await;
println!("节点 A (打印机节点) 已启动,监听端口 8081");
tokio::signal::ctrl_c().await.unwrap();
}
节点 B(发送方节点):
#[tokio::main]
async fn main() {
let cluster = Arc::new(ClusterSystem::new());
// 连接到节点 A
cluster.client.connect_to("node_a", "127.0.0.1:8081").await.unwrap();
// 5秒后发送一条消息
tokio::time::sleep(Duration::from_secs(5)).await;
cluster.send_to_remote("printer@node_a", "sender@node_b", "你好,远程打印机!").await.unwrap();
println!("节点 B 已发送消息给远程节点");
tokio::signal::ctrl_c().await.unwrap();
}
架构总结
现在我们的系统就像一个小型的分布式微信:
组件 | 功能 | 类比 |
---|---|---|
ClusterClient | 管理远程节点连接 | 通讯录管理器 |
connect_to() | 连接到其他 WebSocket 节点 | 加好友 |
send_to_remote() | 向远程发送 JSON 消息 | 发微信消息 |
总结
今天我们给分布式 Actor 系统加上了"发信"功能:
- ClusterClient - 一个贴心的通讯录管理器
- 连接管理 - 简单可靠的节点连接方式
- 消息路由 - 自动把消息送到正确的地方
现在我们的 Actor 不仅能在本地聊天,还能跨节点交流了!就像从座机升级到了手机,通讯范围一下子扩大了。
下一步,我们会让这个系统更加健壮——加上重连机制、错误处理,还有更优雅的并发模型。毕竟,现实世界的分布式系统可比演示环境复杂多了!
记住,编程就像搭积木,一块一块地建,最终能构建出令人惊叹的系统。
如果这篇文章对你有帮助,别忘了点赞和分享。还有什么想了解的 Rust 分布式系统话题?欢迎在评论区告诉我,下期内容可能就是你想看的!
想要及时获取更多 Rust 和分布式系统的干货内容,可以关注我的公众号"梦兽编程",我会定期分享实用的编程技巧和系统设计经验。