你花了好几周时间,终于搭起来一个漂亮的分布式 Actor 系统。Actor 会说话,会路由消息,会同步文件,甚至还能分配任务。代码跑起来行云流水,在本地测试时爽得飞起。
然后周一上午,生产环境来了个"惊喜"——某个工作节点因为网络抖动掉线了。
结果呢?十几个任务卡在半路,客户端傻等,日志里全是超时错误。老板过来拍桌子:“这系统怎么这么脆弱?”
别慌,这不是你的系统太菜,而是现实世界本来就这么残酷。
墨菲定律:会出错的东西一定会出错
在分布式系统里,故障不是"如果会发生",而是"什么时候发生"的问题。就像开车一样,你不能假设路上永远不会有人突然变道,而是要随时准备好刹车。
常见的故障有这些:
工作节点崩溃了 - 进程被 kill 掉,内存爆了,主机直接断电
网络出问题 - 延迟突然飙到几秒,丢包率暴增,甚至整个机房断网
任务执行超时 - 某个节点卡住了,一直不返回结果
消息丢失 - WebSocket 断开重连的那一刻,正好有消息在传输
面对这些情况,你的系统需要像个经验丰富的老司机——知道什么时候该踩刹车,什么时候该换条路,什么时候该重新发起尝试。
今天我们就来给系统装上这套"主动安全系统"。
我们要实现什么
说白了,就是让系统学会这几招:
重试队列 - 任务失败了自动重试,就像快递没送到会再送一次
故障检测 - 及时发现哪个节点挂了,别再给它派活儿
自动清理 - 把死掉的节点踢出去,别占着位置不干活
督导机制 - 设置个"班长",专门盯着出问题的任务
掌握了这些,你的系统就能像那些打不死的服务一样,即使遇到故障也能自己恢复过来。
第一步:建立重试队列
我们先搞个"待重试任务表"。就像快递公司的"派件失败清单",记录哪些任务没完成、发给了谁、已经试了几次。
use std::collections::HashMap;
use std::time::{Instant, Duration};
use tokio::sync::Mutex;
use std::sync::Arc;
type JobId = String;
#[derive(Clone)]
struct PendingJob {
job: Job, // 任务本身
assigned_to: String, // 分配给谁了
sent_at: Instant, // 啥时候发的
retries: u32, // 已经重试了几次
}
type RetryQueue = Arc<Mutex<HashMap<JobId, PendingJob>>>;
这个结构很简单,就是用一个 HashMap 来存放所有"在路上"的任务。每个任务都带着时间戳和重试次数,方便我们后面判断要不要重发。
第二步:监控和重试失败任务
接下来我们需要一个后台任务,定期检查这个清单,看看有没有超时的任务需要重试。就像快递站每天晚上盘点"今天没送出去的件":
async fn retry_loop(retry_queue: RetryQueue, cluster: ClusterRouter) {
loop {
{
let mut q = retry_queue.lock().await;
let now = Instant::now();
for (job_id, pending) in q.clone() {
// 超过 5 秒还没收到结果?那可能出事了
if now.duration_since(pending.sent_at) > Duration::from_secs(5) {
// 已经试了 3 次了?放弃吧
if pending.retries >= 3 {
println!("任务 {} 重试 3 次都失败了,放弃治疗", job_id);
q.remove(&job_id);
continue;
}
println!("任务 {} 超时了,准备重试第 {} 次", job_id, pending.retries + 1);
// 重新发送任务
let payload = serde_json::to_string(&pending.job).unwrap();
cluster.send(
&format!("{}@{}", "worker", pending.assigned_to),
&payload
).await;
// 更新重试记录
q.insert(
job_id.clone(),
PendingJob {
retries: pending.retries + 1,
sent_at: Instant::now(),
..pending
},
);
}
}
}
// 每 2 秒检查一次
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
这个循环做的事情很直白:每隔两秒醒来看一眼,哪些任务发出去超过 5 秒了还没回音,就再发一次。如果一个任务已经试了 3 次还不行,那就真的是没救了,直接标记为失败。
这就像快递员第一次送不到,会再送一次;第二次还不行,再送一次;第三次还是找不到人,就直接退回去了。
第三步:任务成功后的清理工作
当任务执行完毕,结果返回时,我们得把它从"待重试清单"里删掉,别让它再被重试:
// 收到 JobResult 消息时
retry_queue.lock().await.remove(&msg.id);
这一行代码虽然简单,但非常重要。就像快递签收后,系统会自动标记为"已送达",下次就不会再派这个单了。
第四步:检测挂掉的节点
光重试任务还不够,如果某个节点彻底挂了,你再怎么重试也没用,还不如把它踢出去,别浪费时间。
我们需要一个"健康检查"机制,定期看看哪些节点已经断线了:
async fn health_check(cluster_map: ClusterMap, cluster: ClusterClient) {
loop {
let peers = cluster.peers.read().unwrap().clone();
for (id, conn) in peers.iter() {
// 检查连接是否还活着
if conn.is_disconnected().await {
println!("节点 {} 已经掉线,从集群里移除", id);
// 从集群地图中删除
cluster_map.write().unwrap().remove(id);
// 从连接池中删除
cluster.peers.write().unwrap().remove(id);
}
}
// 每 10 秒检查一次
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
这就像公司早会点名一样,发现谁连续缺勤好几天,就把他从花名册上划掉。这样后面派任务的时候,就不会再派给这些"失联人员"了。
更精细的做法是给每个节点加个"最后活跃时间"或者"心跳时间戳",这样判断起来更准确。
第五步:设置督导 Actor
在真正的生产环境里,任务失败后你可能需要做很多事情:发报警邮件、记录日志、触发备用流程、甚至自动重启某些服务。
这些逻辑如果都写在重试循环里,代码会变得一团糟。更好的办法是用一个专门的"督导 Actor"来处理:
struct Supervisor {
retry_queue: RetryQueue,
}
#[async_trait::async_trait]
impl Actor for Supervisor {
type Message = String;
async fn handle(&mut self, msg: String) {
println!("督导收到故障报告: {}", msg);
// 这里可以做很多事情:
// - 发送告警到 Slack 或邮件
// - 记录到监控系统
// - 触发备用恢复流程
// - 自动调整资源配置
}
}
当某个任务重试 3 次都失败后,不要只是打印日志了事,而是通知这个 Supervisor Actor。它可以根据故障类型采取不同的应对措施。
就像生产线上有个质检员,发现某个环节老出问题,不是简单地扔掉次品,而是会分析原因、调整工艺、甚至叫停整条生产线。
总结一下整套机制
现在我们的系统有了这些防护:
| 机制 | 作用 |
|---|---|
| RetryQueue | 记录所有在途任务和重试状态 |
| retry_loop | 自动重试超时或失败的任务 |
| health_check | 检测并移除掉线的节点 |
| Supervisor Actor | 接收故障事件并采取应对措施 |
| JobResult 清理 | 标记任务完成,避免重复处理 |
这套机制就像给你的系统配了"安全气囊、ABS 防抱死、车道偏离预警"这些主动安全装置。平时可能感觉不到,但关键时刻能救命。
可以进一步优化的地方
上面的代码已经能用了,但如果你想做得更专业,还可以考虑这些:
指数退避策略 - 第一次重试等 1 秒,第二次等 2 秒,第三次等 4 秒,避免"雪崩式重试"
指标收集 - 统计重试次数、失败率、平均延迟等数据,用 Prometheus 这类工具监控
熔断器模式 - 某个节点连续失败多次后,暂时不再给它派任务,给它喘息的时间
告警通知 - 失败次数超过阈值时,自动发邮件或打电话叫醒值班工程师
持久化重试队列 - 把重试队列存到 Redis 或磁盘,这样即使主节点重启也不会丢任务
完整的督导树 - 参考 Erlang/OTP 的 Supervisor 模式,构建多层级的故障监控和恢复机制
动态调整重试策略 - 根据当前系统负载和网络状况,自动调整超时时间和重试次数
你做到了
到这里,你的分布式 Actor 系统已经相当完整了:
- 本地 Actor 通信
- 基于 WebSocket 的跨节点通信
- 集群路由和消息转发
- 基于 Gossip 的节点发现
- 命令控制层
- 文件同步和热更新
- 分布式任务队列
- 容错和自愈机制
这不只是个学习项目,而是一个可以用来搞事情的基础框架。你可以基于它做:
游戏服务器 - 玩家在不同区服之间跳转,状态自动同步
聊天系统 - 消息跨节点路由,离线消息自动重试
AI Agent 集群 - 多个 AI 节点协作处理任务,某个节点挂了自动转移
机器人集群 - 多台机器人之间协调动作,容忍个别机器人掉线
边缘计算平台 - 把计算任务分发到边缘节点,网络抖动时自动重试
这些场景的核心逻辑都已经在你手里了。剩下的只是根据具体需求做些调整和优化。
系统总会出故障,但一个好的系统知道如何优雅地应对故障。这就是"高可用"的真正含义——不是永远不出问题,而是出了问题能自己扛过去。
现在,你的系统已经有了这个能力。
如果这篇文章对你有帮助,可以关注"梦兽编程"公众号,我会持续分享 Rust 和分布式系统的实战经验。遇到问题也欢迎留言交流,咱们一起进步。
