你花了好几周时间,终于搭起来一个漂亮的分布式 Actor 系统。Actor 会说话,会路由消息,会同步文件,甚至还能分配任务。代码跑起来行云流水,在本地测试时爽得飞起。

然后周一上午,生产环境来了个"惊喜"——某个工作节点因为网络抖动掉线了。

结果呢?十几个任务卡在半路,客户端傻等,日志里全是超时错误。老板过来拍桌子:“这系统怎么这么脆弱?”

别慌,这不是你的系统太菜,而是现实世界本来就这么残酷。

墨菲定律:会出错的东西一定会出错

在分布式系统里,故障不是"如果会发生",而是"什么时候发生"的问题。就像开车一样,你不能假设路上永远不会有人突然变道,而是要随时准备好刹车。

常见的故障有这些:

工作节点崩溃了 - 进程被 kill 掉,内存爆了,主机直接断电

网络出问题 - 延迟突然飙到几秒,丢包率暴增,甚至整个机房断网

任务执行超时 - 某个节点卡住了,一直不返回结果

消息丢失 - WebSocket 断开重连的那一刻,正好有消息在传输

面对这些情况,你的系统需要像个经验丰富的老司机——知道什么时候该踩刹车,什么时候该换条路,什么时候该重新发起尝试。

今天我们就来给系统装上这套"主动安全系统"。

我们要实现什么

说白了,就是让系统学会这几招:

重试队列 - 任务失败了自动重试,就像快递没送到会再送一次

故障检测 - 及时发现哪个节点挂了,别再给它派活儿

自动清理 - 把死掉的节点踢出去,别占着位置不干活

督导机制 - 设置个"班长",专门盯着出问题的任务

掌握了这些,你的系统就能像那些打不死的服务一样,即使遇到故障也能自己恢复过来。

第一步:建立重试队列

我们先搞个"待重试任务表"。就像快递公司的"派件失败清单",记录哪些任务没完成、发给了谁、已经试了几次。

use std::collections::HashMap;
use std::time::{Instant, Duration};
use tokio::sync::Mutex;
use std::sync::Arc;

type JobId = String;

#[derive(Clone)]
struct PendingJob {
    job: Job,                // 任务本身
    assigned_to: String,     // 分配给谁了
    sent_at: Instant,        // 啥时候发的
    retries: u32,            // 已经重试了几次
}

type RetryQueue = Arc<Mutex<HashMap<JobId, PendingJob>>>;

这个结构很简单,就是用一个 HashMap 来存放所有"在路上"的任务。每个任务都带着时间戳和重试次数,方便我们后面判断要不要重发。

第二步:监控和重试失败任务

接下来我们需要一个后台任务,定期检查这个清单,看看有没有超时的任务需要重试。就像快递站每天晚上盘点"今天没送出去的件":

async fn retry_loop(retry_queue: RetryQueue, cluster: ClusterRouter) {
    loop {
        {
            let mut q = retry_queue.lock().await;
            let now = Instant::now();

            for (job_id, pending) in q.clone() {
                // 超过 5 秒还没收到结果?那可能出事了
                if now.duration_since(pending.sent_at) > Duration::from_secs(5) {
                    // 已经试了 3 次了?放弃吧
                    if pending.retries >= 3 {
                        println!("任务 {} 重试 3 次都失败了,放弃治疗", job_id);
                        q.remove(&job_id);
                        continue;
                    }

                    println!("任务 {} 超时了,准备重试第 {} 次", job_id, pending.retries + 1);

                    // 重新发送任务
                    let payload = serde_json::to_string(&pending.job).unwrap();
                    cluster.send(
                        &format!("{}@{}", "worker", pending.assigned_to),
                        &payload
                    ).await;

                    // 更新重试记录
                    q.insert(
                        job_id.clone(),
                        PendingJob {
                            retries: pending.retries + 1,
                            sent_at: Instant::now(),
                            ..pending
                        },
                    );
                }
            }
        }
        // 每 2 秒检查一次
        tokio::time::sleep(Duration::from_secs(2)).await;
    }
}

这个循环做的事情很直白:每隔两秒醒来看一眼,哪些任务发出去超过 5 秒了还没回音,就再发一次。如果一个任务已经试了 3 次还不行,那就真的是没救了,直接标记为失败。

这就像快递员第一次送不到,会再送一次;第二次还不行,再送一次;第三次还是找不到人,就直接退回去了。

第三步:任务成功后的清理工作

当任务执行完毕,结果返回时,我们得把它从"待重试清单"里删掉,别让它再被重试:

// 收到 JobResult 消息时
retry_queue.lock().await.remove(&msg.id);

这一行代码虽然简单,但非常重要。就像快递签收后,系统会自动标记为"已送达",下次就不会再派这个单了。

第四步:检测挂掉的节点

光重试任务还不够,如果某个节点彻底挂了,你再怎么重试也没用,还不如把它踢出去,别浪费时间。

我们需要一个"健康检查"机制,定期看看哪些节点已经断线了:

async fn health_check(cluster_map: ClusterMap, cluster: ClusterClient) {
    loop {
        let peers = cluster.peers.read().unwrap().clone();

        for (id, conn) in peers.iter() {
            // 检查连接是否还活着
            if conn.is_disconnected().await {
                println!("节点 {} 已经掉线,从集群里移除", id);

                // 从集群地图中删除
                cluster_map.write().unwrap().remove(id);

                // 从连接池中删除
                cluster.peers.write().unwrap().remove(id);
            }
        }

        // 每 10 秒检查一次
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}

这就像公司早会点名一样,发现谁连续缺勤好几天,就把他从花名册上划掉。这样后面派任务的时候,就不会再派给这些"失联人员"了。

更精细的做法是给每个节点加个"最后活跃时间"或者"心跳时间戳",这样判断起来更准确。

第五步:设置督导 Actor

在真正的生产环境里,任务失败后你可能需要做很多事情:发报警邮件、记录日志、触发备用流程、甚至自动重启某些服务。

这些逻辑如果都写在重试循环里,代码会变得一团糟。更好的办法是用一个专门的"督导 Actor"来处理:

struct Supervisor {
    retry_queue: RetryQueue,
}

#[async_trait::async_trait]
impl Actor for Supervisor {
    type Message = String;

    async fn handle(&mut self, msg: String) {
        println!("督导收到故障报告: {}", msg);

        // 这里可以做很多事情:
        // - 发送告警到 Slack 或邮件
        // - 记录到监控系统
        // - 触发备用恢复流程
        // - 自动调整资源配置
    }
}

当某个任务重试 3 次都失败后,不要只是打印日志了事,而是通知这个 Supervisor Actor。它可以根据故障类型采取不同的应对措施。

就像生产线上有个质检员,发现某个环节老出问题,不是简单地扔掉次品,而是会分析原因、调整工艺、甚至叫停整条生产线。

总结一下整套机制

现在我们的系统有了这些防护:

机制作用
RetryQueue记录所有在途任务和重试状态
retry_loop自动重试超时或失败的任务
health_check检测并移除掉线的节点
Supervisor Actor接收故障事件并采取应对措施
JobResult 清理标记任务完成,避免重复处理

这套机制就像给你的系统配了"安全气囊、ABS 防抱死、车道偏离预警"这些主动安全装置。平时可能感觉不到,但关键时刻能救命。

可以进一步优化的地方

上面的代码已经能用了,但如果你想做得更专业,还可以考虑这些:

指数退避策略 - 第一次重试等 1 秒,第二次等 2 秒,第三次等 4 秒,避免"雪崩式重试"

指标收集 - 统计重试次数、失败率、平均延迟等数据,用 Prometheus 这类工具监控

熔断器模式 - 某个节点连续失败多次后,暂时不再给它派任务,给它喘息的时间

告警通知 - 失败次数超过阈值时,自动发邮件或打电话叫醒值班工程师

持久化重试队列 - 把重试队列存到 Redis 或磁盘,这样即使主节点重启也不会丢任务

完整的督导树 - 参考 Erlang/OTP 的 Supervisor 模式,构建多层级的故障监控和恢复机制

动态调整重试策略 - 根据当前系统负载和网络状况,自动调整超时时间和重试次数

你做到了

到这里,你的分布式 Actor 系统已经相当完整了:

  • 本地 Actor 通信
  • 基于 WebSocket 的跨节点通信
  • 集群路由和消息转发
  • 基于 Gossip 的节点发现
  • 命令控制层
  • 文件同步和热更新
  • 分布式任务队列
  • 容错和自愈机制

这不只是个学习项目,而是一个可以用来搞事情的基础框架。你可以基于它做:

游戏服务器 - 玩家在不同区服之间跳转,状态自动同步

聊天系统 - 消息跨节点路由,离线消息自动重试

AI Agent 集群 - 多个 AI 节点协作处理任务,某个节点挂了自动转移

机器人集群 - 多台机器人之间协调动作,容忍个别机器人掉线

边缘计算平台 - 把计算任务分发到边缘节点,网络抖动时自动重试

这些场景的核心逻辑都已经在你手里了。剩下的只是根据具体需求做些调整和优化。

系统总会出故障,但一个好的系统知道如何优雅地应对故障。这就是"高可用"的真正含义——不是永远不出问题,而是出了问题能自己扛过去。

现在,你的系统已经有了这个能力。


如果这篇文章对你有帮助,可以关注"梦兽编程"公众号,我会持续分享 Rust 和分布式系统的实战经验。遇到问题也欢迎留言交流,咱们一起进步。