用 Rust 从零构建 AI Agent(四):状态机与多步任务

为什么单循环不够
Part 1 的循环处理的是「一口气能说完」的问题:读这个文件,列那个目录,回答用户。两三回合,结束。只要模型能在一次对话内规划和执行,循环就够了。
但真实工作更复杂。有些任务分阶段:探索,然后计划,然后执行,然后验证。有些阶段有审批门:写入数据库前需要人类签字。有些步骤以可重试的方式失败:瞬态网络抖动、速率限制、片刻死锁。有些任务需要 survive 进程重启:用户让 Agent 通宵做研究爬取,机器重启了,第二天早上想从断点继续而不是从头开始。
循环无法优雅处理这些。状态机可以:节点间共享类型化状态,节点间有命名转换,每步后有持久化检查点,任何节点都能暂停并给出原因。Claude Code CLI 的 EnterPlanMode 和 ExitPlanMode 工具就是这个思想的生产版本:Agent 进入和退出的状态模式,实现阶段由用户审批门控。LangGraph(Python 领域最常引用的 Agent 框架)也是同一思想的不同外衣。两者都归结为同一个核心。
本文用 Rust 实现。Eugene v0.4 引入 eugene-state:Node<S> trait、Graph<S> 运行器、Checkpointer<S>(含 SQLite 实现)、以及暂停图直到调用者恢复它的类型化中断。三个节点,一个审批门, durable 到崩溃。Part 1 的循环还在那里,只是现在住在其中一个节点里。
Node trait:一个阶段
节点是一个阶段。它接收类型化状态,修改它,然后告诉运行器下一步做什么:
#[async_trait]
trait Node<S>: Send + Sync + 'static
where
S: Send + Sync + 'static,
{
fn name(&self) -> &'static str;
async fn run(&self, state: &mut S) -> Result<NextStep, GraphError>;
}
enum NextStep {
Goto(&'static str),
Halt,
Interrupt { reason: String },
}
Goto 命名下一节点。Halt 结束运行。Interrupt 暂停,等待外部确认。没有隐式顺序:节点只通过 Goto 显式跳转。这看起来比隐式顺序更啰嗦,但它是让图可维护的原因——每个转换都是显式的,可以在源码中搜索,可以在运行时可视化。
Rust AI Agent 状态机工作流:Draft → Review → [Interrupt] → Revise → Halt,每次转换后检查点持久化到 SQLite。
状态 S 是图的全局共享状态。每个阶段读取它需要的,写入它产生的。下一阶段在前一阶段放置的地方找到输入。没有节点间消息传递,没有事件总线,没有全局共享。可变状态,显式交接。
Graph 运行器
运行器很短。它按名持有节点,一个入口节点,以及一个最大步数上限,防止有 bug 的 Goto 无限循环:
struct Graph<S> {
nodes: HashMap<&'static str, Box<dyn Node<S>>>,
entry: &'static str,
max_steps: usize,
}
impl<S: Send + Sync + 'static> Graph<S> {
async fn run_with_checkpoint<C: Checkpointer<S>>(
&self,
run_id: &str,
initial: S,
ckpt: &C,
) -> Result<Outcome<S>, GraphError> {
let (mut state, mut current) = match ckpt.load(run_id).await? {
Some(ck) => (ck.state, ck.next_node),
None => (initial, self.entry),
};
for _ in 0..self.max_steps {
let node = self.nodes.get(current)
.ok_or(GraphError::UnknownNode(current))?;
let next = node.run(&mut state).await?;
match next {
NextStep::Goto(name) => {
ckpt.save(run_id, name, &state).await?;
current = name;
}
NextStep::Halt => {
ckpt.delete(run_id).await?;
return Ok(Outcome::Success(state));
}
NextStep::Interrupt { reason } => {
ckpt.save(run_id, current, &state).await?;
return Ok(Outcome::Interrupted { state, reason });
}
}
}
Err(GraphError::MaxStepsExceeded)
}
}
持久化节奏是承重部分。运行器在每次转换后保存。两个节点间的崩溃丢失零工作量;节点内的崩溃最多丢失一个节点的计算。下次运行加载最近的检查点,从上一运行即将进入的节点恢复。
Halt 时的 delete 虽小但重要:它清理成功后的状态,使未来用相同 run_id 的运行从头开始。没有它,检查点表会无限增长。有了它,表只保存进行中或暂停的运行。
中断就是带原因的暂停
Interrupt 变体是人机协作的原语。节点决定没有确认无法继续,返回 Interrupt { reason },运行器序列化状态,保存下一节点,返回 Outcome::Interrupted 给调用者。
调用者接下来做什么由 Agent shell 决定。在交互式 CLI 中,它打印原因,等待 stdin 的 y/n,然后再次用相同 run_id 调用 run_with_checkpoint 恢复。在 Web 应用中,它推送通知,让人在 UI 中批准,然后通过 HTTP 端点恢复。在自动化流水线中,它可能把中断排队等待审查,几小时后恢复。图不关心。检查点是持久的;恢复是一次函数调用。
Part 4 的 gist 用 stdin 演示:
Outcome::Interrupted { state, reason, .. } => {
println!("
=== Draft ===
{}", state.draft.as_deref().unwrap_or(""));
println!("
=== Editor's notes ===
{}",
state.critique.as_deref().unwrap_or(""));
if prompt_yes_no(&reason) {
let outcome = graph
.run_with_checkpoint(RUN_ID, state, &ckpt)
.await?;
// ... print final answer ...
} else {
eprintln!("[abort] Checkpoint kept; rerun to resume.");
}
}
用户回答 no,gist 退出而不删除检查点。再次运行相同命令加载检查点,找到已存在的草稿和评论,直接从 revise 恢复。取消用户审查明天再来,成本为零。
Plan Mode 的血统
这就是 Claude Code 用于其 Plan Mode 的相同形状。用户调用 EnterPlanMode 时,CLI 进入每个工具都限制为只读操作的状态,Agent 的工作是生成实现计划。用户审查计划;如果批准,ExitPlanMode 翻转状态回来,实现工具变得可调用。状态住在 PermissionMode 里。中断是隐式的:Agent 有了连贯计划后停止调用工具,等待用户。
这里的机制更通用。任何节点都能返回 Interrupt,不只是指定的「计划」节点。原因是自由文本,所以调用者可以按 UI 需要格式化它。暂停时的状态完全序列化,所以恢复不需要相同进程甚至相同机器。
SQLite 作为默认存储
Checkpointer<S> 是 trait。实现决定状态住在哪里。默认的 InMemoryCheckpointer<S> 是 Mutex<HashMap<String, Checkpoint<S>>>,适合测试和短运行。SqliteCheckpointer 是真实 Agent 想要的:
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS checkpoints (
run_id TEXT PRIMARY KEY,
next_node TEXT NOT NULL,
state_json TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
"#;
一张表。每个进行中运行一行,以 run_id 为键。状态以 JSON 存于一列,意味着表对任何实现 Serialize 和 DeserializeOwned 的 S 都是通用的。没有 schema 迁移——添加字段只需在 Rust struct 上加 #[serde(default)]。
SQLite 在 Rust 中通过 rusqlite 使用,写操作通过 tokio::task::spawn_blocking 包装,因为 rusqlite 是同步的。编译时间只占一小部分,而它的工作(每节点一次 INSERT,恢复时一次 SELECT)小到阻塞池的往返都不可见。eugene-state crate 提供两者:rusqlite 是默认 sqlite 后端;用户可以换入自己的 Checkpointer<S> 实现,比如 Postgres、Redis 或任何适合其部署的存储。
同一张表支持多个并发运行。run_id 区分它们。工作池可以有一百个图在飞行,每个写到同一张表,行级锁处理竞争。一个有一千次 read_file 检查点的长期研究 Agent 插入一千行,然后最终 Halt 全部删除。SQLite 毫无怨言地处理这些。
重试属于图,不属于节点
瞬态错误是状态机持久化的第三条腿。来自模型的 429 Too Many Requests。Agent 调用的第三方 API 的连接重置。文件系统的片刻不可写。每种都值得重试,但在不同层面。
Part 3 的 with_retry helper 包装单个 HTTP 调用。当一个节点只做一个外部请求时,这是正确的范围。图层面增加第二个范围:当节点以 GraphError::Transient 失败时,图在传播错误前将整个节点重试最多 retry_attempts 次。
let mut graph = Graph::<State>::new("draft").retry_attempts(3);
两个范围做不同的事。HTTP 重试处理那种你不应该放弃函数已完成工作的失败。图重试处理那种整个阶段需要从头开始的失败:也许模型生成了无法解析的无效 JSON 计划,你想用新上下文再次询问。选择正确的范围意味着思考什么状态在尝试间存活。函数内的重试保留函数的局部变量。图层面的重试让节点从头开始。
三节点图实战
Part 4 的 gist 演示最小的有趣图:draft、review、revise。每个节点做一次 Claude 调用。中断坐在 review 和 revise 之间,让用户在授权重写前看到草稿和编辑笔记。
struct Draft { http: reqwest::Client, api_key: String }
#[async_trait]
impl Node<State> for Draft {
fn name(&self) -> &'static str { "draft" }
async fn run(&self, s: &mut State) -> Result<NextStep, GraphError> {
let system = "You are Eugene, a careful technical writer. Write a single short answer to the user's question. Plain prose, no bullet lists, fewer than 200 words.";
let answer = call_claude(&self.http, &self.api_key, system,
&format!("Write a draft answer to: {}", s.request)).await?;
s.draft = Some(answer);
Ok(NextStep::Goto("review"))
}
}
struct Review { http: reqwest::Client, api_key: String }
#[async_trait]
impl Node<State> for Review {
fn name(&self) -> &'static str { "review" }
async fn run(&self, s: &mut State) -> Result<NextStep, GraphError> {
let system = "You are a sharp editor. Read the draft and list the three biggest weaknesses in one sentence each.";
let critique = call_claude(&self.http, &self.api_key, system,
s.draft.as_deref().unwrap_or("")).await?;
s.critique = Some(critique);
Ok(NextStep::Interrupt {
reason: "Draft and critique ready. Approve revision?".into(),
})
}
}
struct Revise { http: reqwest::Client, api_key: String }
#[async_trait]
impl Node<State> for Revise {
fn name(&self) -> &'static str { "revise" }
async fn run(&self, s: &mut State) -> Result<NextStep, GraphError> {
let system = "You are Eugene. Rewrite the draft using the editor's notes. Keep the same tone. Plain prose, fewer than 200 words.";
let improved = call_claude(&self.http, &self.api_key, system,
&format!("Draft:
{}
Notes:
{}",
s.draft.as_deref().unwrap_or(""),
s.critique.as_deref().unwrap_or(""))).await?;
s.draft = Some(improved);
Ok(NextStep::Halt)
}
}
运行器在 review 后暂停。用户看到草稿和评论,回答 yes,运行器进入 revise,然后 Halt。用户回答 no,程序退出,保留检查点。再次运行相同命令直接走到中断,无需重新计算草稿或评论。检查点 survive SIGINT、断电、kill -9。
可视化:图比代码更诚实
状态机的真正价值不在运行时,在可读性。一张图告诉你任务的结构,而无需读任何一行 Rust。draft → review → [interrupt] → revise → halt。这个画面服务于两个读者:你,规划下一次迭代;以及模型,当你最终想让它通过添加验证节点来扩展图时。清晰的图是模型能读和修改的东西。清晰的图是让状态机可维护的东西。
Hook 泛化了中断
Part 4 的 gist 用手工方式门控 revise 步骤:review 节点返回 NextStep::Interrupt,主函数读 stdin 决定是否恢复。当一个节点特殊时这有效。对于有许多破坏性节点、想把相同门控规则应用到所有节点的 Agent,在每个节点里硬编码中断会变得繁琐且容易遗漏。
Claude Code 用 Hook 解决这个问题。它的工具执行器在每个分发周围运行 pre-tool 和 post-tool Hook 链。Hook 检查权限规则、运行绑定到生命周期事件的用户定义 shell 命令、记录分析、修改工具的输入或输出。新工具自动继承 Hook 链;策略住在一个地方。
相同模式可以移植到状态机。eugene-state 定义 NodeHook<S> trait,有 before 和 after 方法:
#[async_trait]
pub trait NodeHook<S>: Send + Sync + 'static {
async fn before(
&self,
node_name: &str,
required: PermissionMode,
state: &S,
) -> Result<HookOutcome, GraphError> {
Ok(HookOutcome::Proceed)
}
async fn after(
&self,
node_name: &str,
state: &S,
result: NextStep,
) -> Result<NextStep, GraphError> {
Ok(result)
}
}
enum HookOutcome {
Proceed,
Modify(Value),
Deny(String),
Replace(String),
}
before 可以拒绝、修改或替换节点的输入。after 可以重写节点的下一步,这就是共享后处理节点如何在不编辑节点本身的情况下被缝合到图上。
权限模式
Hook 的主要用途是权限门控。Claude Code 的 ToolPermissionContext 携带一个来自小枚举的 PermissionMode:plan(只读)、default(破坏性操作前询问)、acceptEdits(预批准)、bypassPermissions(无门控)。每个工具声明最小模式;执行器检查当前模式,然后运行、询问用户或拒绝。
eugene-state 中的 Rust 等价物是一个枚举、Node 上的一个方法、以及一个内置 Hook:
pub enum PermissionMode {
Plan,
Default,
AcceptEdits,
BypassPermissions,
}
impl PermissionMode {
pub fn level(self) -> u8 { /* Plan=0 < Default=1 < AcceptEdits=2 < Bypass=3 */ }
pub fn permits(self, required: PermissionMode) -> bool {
self.level() >= required.level()
}
}
#[async_trait]
pub trait Node<S> {
fn name(&self) -> &'static str;
fn required_mode(&self) -> PermissionMode { PermissionMode::Plan }
async fn run(&self, state: &mut S) -> Result<NextStep, GraphError>;
}
ModeGate hook 在 before 中检查 required_mode:
impl<S: Send + Sync + 'static> NodeHook<S> for ModeGate {
async fn before(&self, node_name: &str, required: PermissionMode, state: &S)
-> Result<HookOutcome, GraphError> {
if self.current_mode.permits(required) {
Ok(HookOutcome::Proceed)
} else {
Ok(HookOutcome::Deny(format!(
"Node '{}' requires {:?} mode; current mode is {:?}.",
node_name, required, self.current_mode
)))
}
}
}
权限模式可以动态切换。Plan 模式禁止所有写操作,Agent 被迫生成计划而非执行。AcceptEdits 模式让已审查的 Agent 无人值守运行。BypassPermissions 用于测试和自动化流水线。切换模式不需要接触任何节点;它是 Graph 上的一次方法调用,修改 ModeGate hook 持有的状态。
这揭示了什么
Part 1 的循环、Part 2 的提示词、Part 3 的注册表:每个都是节点可以使用的工具。图是包装它们的东西。当你有一个阶段要运行时,节点是杀鸡用牛刀。当你有三个阶段时,图就是可信任代码与调试代码之间的区别。
图买的三个属性:
- 可恢复性:崩溃只丢失一个节点,不是整个任务。
- 门控:节点可以暂停并等待人类,无需保持进程打开。
- 可组合性:阶段可以添加或重排,无需接触运行器,因为节点只通过类型化状态和命名转换交互。
trait 形状是标准 Rust 工具包:async_trait 用于异步方法,关联状态类型由图携带,通过 Box<dyn Node<S>> 的对象安全分发,通过检查点 trait 的 serde 驱动持久化,以及 tokio::task::spawn_blocking 让 sqlite 写操作远离异步运行时。这些都不 exotic。当你用心组装时,这就是生产 Rust 的样子。
下一步:多 Agent 协作
图一次运行一个阶段,在一个进程里。下一个问题是跨阶段并行:一个研究员和一个作者应该同时看一个主题,然后汇合;一个规划者和一个批评者争论直到达成共识;一队相同的调查员各自处理不同问题。Part 5 引入多 Agent 团队:一个路由器分发给专家,一个辩论协议让两个 Agent 争论,以及 tokio-native 并行执行,其中编排是 join_all 而不是定制消息总线。Part 3 的注册表和 Part 4 的图都会跟来。
相关代码与参考
eugene/crates/eugene-state:trait、图、检查点存储的完整实现,包含 12 个单元测试,覆盖运行器(halt、interrupt、retry、max-steps、unknown-node)、内存检查点、SQLite 检查点(往返和重新打开)、权限模式算术、以及ModeGate/ post-hook 链重写 next-steps。SQLite 后端在sqlitefeature 后,不需要依赖的项目可以省略它。 ⭐ Star on GitHub- Claude Code 的 EnterPlanMode / ExitPlanMode 和背后的权限模式状态机
- LangGraph 和类型化状态 Agent 图的血统
觉得有帮助?
如果这篇内容对你有用,欢迎:
- 点赞 / 转发 / 收藏,让更多 Rust + AI 开发者看到;
- 关注公众号「全栈之巅-梦兽编程」,每周更新 Rust / AI 工程实践;
- 在评论区留下你的问题,关于状态机设计、检查点策略或权限模式都可以聊;
- 了解 AI 编程助手服务,用 Claude Code 级能力提升你的团队效率。
