Loop 工作流深度解析:迭代优化、终止条件与收敛性保障
深入剖析 ADK Go Loop 工作流的迭代机制、终止条件设计、收敛性分析、状态演化与生产级循环控制策略。
Loop 工作流深度解析:迭代优化、终止条件与收敛性保障
Loop 工作流是 Agent Team 中处理迭代优化类任务的核心模式。与 Sequential 和 Parallel 的一次性执行不同,Loop 通过循环-评估-优化的闭环机制,让 Agent 自主改进输出质量。然而,循环带来的风险——无限循环、收敛震荡、状态膨胀——需要严谨的工程控制。本文将深入 Loop 的数学模型、终止理论、状态演化与生产实践。
迭代优化的数学模型
收敛性分析
Loop 工作流可以形式化为一个迭代函数系统:
x_{n+1} = f(x_n, e_n)
其中:
x_n:第 n 次迭代的输出f:Agent 的改进函数e_n:第 n 次迭代的评估反馈
收敛条件:当存在某个质量度量 Q(x),使得 Q(x_{n+1}) >= Q(x_n) 对所有 n 成立,且 Q(x) 有上界,则序列 {x_n} 必然收敛。
// 收敛性分析器
type ConvergenceAnalyzer struct {
qualityHistory []float64
windowSize int
threshold float64
}
func (ca *ConvergenceAnalyzer) IsConverged() (bool, float64) {
if len(ca.qualityHistory) < ca.windowSize {
return false, 0
}
// 取最近 windowSize 次的质量值
recent := ca.qualityHistory[len(ca.qualityHistory)-ca.windowSize:]
// 计算方差
mean := calculateMean(recent)
variance := calculateVariance(recent, mean)
// 方差小于阈值认为已收敛
converged := variance < ca.threshold
return converged, variance
}
func (ca *ConvergenceAnalyzer) DetectOscillation() bool {
if len(ca.qualityHistory) < 4 {
return false
}
// 检测震荡模式:质量值反复升降
last4 := ca.qualityHistory[len(ca.qualityHistory)-4:]
upDown := (last4[1] > last4[0]) && (last4[2] < last4[1]) && (last4[3] > last4[2])
downUp := (last4[1] < last4[0]) && (last4[2] > last4[1]) && (last4[3] < last4[2])
return upDown || downUp
}
质量度量设计
// 多维度质量评估体系
type QualityMetrics struct {
// 内容质量(0-1)
Relevance float64 // 相关性
Coherence float64 // 连贯性
Completeness float64 // 完整性
Accuracy float64 // 准确性
// 结构质量(0-1)
Structure float64 // 结构合理性
Formatting float64 // 格式规范
// 语言质量(0-1)
Grammar float64 // 语法正确性
Style float64 // 风格一致性
// 业务质量(0-1)
Requirements float64 // 需求满足度
Constraints float64 // 约束满足度
}
func (qm *QualityMetrics) OverallScore() float64 {
// 加权综合评分
weights := map[string]float64{
"Relevance": 0.20,
"Coherence": 0.15,
"Completeness": 0.15,
"Accuracy": 0.20,
"Structure": 0.10,
"Formatting": 0.05,
"Grammar": 0.05,
"Style": 0.05,
"Requirements": 0.03,
"Constraints": 0.02,
}
score := 0.0
score += weights["Relevance"] * qm.Relevance
score += weights["Coherence"] * qm.Coherence
score += weights["Completeness"] * qm.Completeness
score += weights["Accuracy"] * qm.Accuracy
score += weights["Structure"] * qm.Structure
score += weights["Formatting"] * qm.Formatting
score += weights["Grammar"] * qm.Grammar
score += weights["Style"] * qm.Style
score += weights["Requirements"] * qm.Requirements
score += weights["Constraints"] * qm.Constraints
return score
}
// 通过 LLM 评估质量
func evaluateWithLLM(ctx context.Context, model agent.Model, content string, criteria []string) (*QualityMetrics, error) {
prompt := fmt.Sprintf(`请评估以下内容的质量,按以下维度打分(0-1):
%s
评估维度:
%s
请以 JSON 格式输出评分。`, content, strings.Join(criteria, "\n"))
response, err := model.GenerateContent(ctx, prompt)
if err != nil {
return nil, err
}
var metrics QualityMetrics
if err := json.Unmarshal([]byte(response), &metrics); err != nil {
return nil, err
}
return &metrics, nil
}
Loop 工作流架构设计
核心组件
type LoopWorkflow struct {
agent *agent.Agent // 迭代 Agent
evaluator Evaluator // 质量评估器
exitCondition ExitCondition // 退出条件
maxIterations int // 最大迭代次数
minIterations int // 最小迭代次数
convergenceCfg *ConvergenceConfig // 收敛配置
stateManager StateManager // 状态管理器
feedbackBuilder FeedbackBuilder // 反馈构建器
strategy IterationStrategy // 迭代策略
}
// 评估器接口
type Evaluator interface {
Evaluate(ctx context.Context, output string, target string) (*EvaluationResult, error)
}
// 退出条件接口
type ExitCondition interface {
ShouldExit(state *LoopState) (bool, string)
}
// 迭代状态
type LoopState struct {
Iteration int // 当前迭代次数
CurrentOutput string // 当前输出
PreviousOutput string // 上一次输出
QualityHistory []float64 // 质量历史
BestOutput string // 最佳输出
BestQuality float64 // 最佳质量
Feedback string // 当前反馈
Context map[string]interface{} // 上下文状态
StartTime time.Time // 开始时间
TokenUsed int // Token 消耗
}
// 评估结果
type EvaluationResult struct {
Quality float64 // 质量评分
Feedback string // 改进反馈
Metrics *QualityMetrics // 详细指标
Passed bool // 是否通过
Criteria map[string]bool // 各标准通过情况
}
执行引擎
func (lw *LoopWorkflow) Execute(ctx context.Context, input string) (*LoopResult, error) {
// 1. 初始化状态
state := &LoopState{
Iteration: 0,
Context: make(map[string]interface{}),
StartTime: time.Now(),
BestQuality: -1,
}
state.Context["original_input"] = input
// 2. 初始执行
output, err := lw.agent.Run(ctx, input)
if err != nil {
return nil, fmt.Errorf("initial execution failed: %w", err)
}
state.CurrentOutput = output
state.BestOutput = output
// 3. 评估初始输出
eval, err := lw.evaluator.Evaluate(ctx, output, input)
if err != nil {
return nil, fmt.Errorf("initial evaluation failed: %w", err)
}
state.QualityHistory = append(state.QualityHistory, eval.Quality)
state.BestQuality = eval.Quality
state.Feedback = eval.Feedback
// 4. 迭代优化循环
for {
state.Iteration++
// 检查退出条件
shouldExit, reason := lw.exitCondition.ShouldExit(state)
if shouldExit {
return lw.buildResult(state, reason), nil
}
// 检查最大迭代次数
if state.Iteration >= lw.maxIterations {
return lw.buildResult(state, "max_iterations_reached"), nil
}
// 构建迭代输入
iterationInput := lw.feedbackBuilder.Build(state, eval)
// 执行迭代
newOutput, err := lw.agent.Run(ctx, iterationInput)
if err != nil {
// 迭代失败,使用最佳历史结果
log.Printf("Iteration %d failed: %v", state.Iteration, err)
return lw.buildResult(state, "iteration_failed"), nil
}
state.PreviousOutput = state.CurrentOutput
state.CurrentOutput = newOutput
// 评估新输出
eval, err = lw.evaluator.Evaluate(ctx, newOutput, input)
if err != nil {
log.Printf("Evaluation failed at iteration %d: %v", state.Iteration, err)
continue
}
state.QualityHistory = append(state.QualityHistory, eval.Quality)
state.Feedback = eval.Feedback
state.TokenUsed += estimateTokens(newOutput)
// 更新最佳结果
if eval.Quality > state.BestQuality {
state.BestQuality = eval.Quality
state.BestOutput = newOutput
}
// 检测收敛
if lw.convergenceCfg != nil {
analyzer := &ConvergenceAnalyzer{
qualityHistory: state.QualityHistory,
windowSize: lw.convergenceCfg.WindowSize,
threshold: lw.convergenceCfg.Threshold,
}
if converged, variance := analyzer.IsConverged(); converged {
state.Context["convergence_variance"] = variance
return lw.buildResult(state, "converged"), nil
}
if analyzer.DetectOscillation() {
return lw.buildResult(state, "oscillation_detected"), nil
}
}
// 检查 Token 预算
if lw.tokenBudget != nil && !lw.tokenBudget.CanAllocate(1000) {
return lw.buildResult(state, "token_budget_exhausted"), nil
}
}
}
func (lw *LoopWorkflow) buildResult(state *LoopState, reason string) *LoopResult {
return &LoopResult{
Output: state.BestOutput,
Quality: state.BestQuality,
Iterations: state.Iteration,
ExitReason: reason,
QualityHistory: state.QualityHistory,
TokenUsed: state.TokenUsed,
Duration: time.Since(state.StartTime),
}
}
终止条件设计
复合退出条件
// 组合多个退出条件
type CompositeExitCondition struct {
conditions []ExitCondition
mode CompositeMode // ANY | ALL
}
type CompositeMode int
const (
ModeAny CompositeMode = iota // 任一条件满足即退出
ModeAll // 所有条件满足才退出
)
func (c *CompositeExitCondition) ShouldExit(state *LoopState) (bool, string) {
if c.mode == ModeAny {
for _, cond := range c.conditions {
if exit, reason := cond.ShouldExit(state); exit {
return true, reason
}
}
return false, ""
}
// ModeAll
reasons := make([]string, 0)
for _, cond := range c.conditions {
exit, reason := cond.ShouldExit(state)
if !exit {
return false, ""
}
reasons = append(reasons, reason)
}
return true, strings.Join(reasons, " + ")
}
// 具体退出条件实现
// 1. 质量阈值条件
type QualityThresholdCondition struct {
Threshold float64
}
func (c *QualityThresholdCondition) ShouldExit(state *LoopState) (bool, string) {
if state.BestQuality >= c.Threshold {
return true, fmt.Sprintf("quality_threshold_reached: %.3f >= %.3f",
state.BestQuality, c.Threshold)
}
return false, ""
}
// 2. 最大迭代次数条件
type MaxIterationCondition struct {
MaxIterations int
}
func (c *MaxIterationCondition) ShouldExit(state *LoopState) (bool, string) {
if state.Iteration >= c.MaxIterations {
return true, fmt.Sprintf("max_iterations: %d", c.MaxIterations)
}
return false, ""
}
// 3. 质量提升停滞条件
type ImprovementStallCondition struct {
WindowSize int
MinImprovement float64
}
func (c *ImprovementStallCondition) ShouldExit(state *LoopState) (bool, string) {
if len(state.QualityHistory) < c.WindowSize+1 {
return false, ""
}
recent := state.QualityHistory[len(state.QualityHistory)-c.WindowSize:]
best := state.QualityHistory[len(state.QualityHistory)-c.WindowSize-1]
for _, q := range recent {
if q-best >= c.MinImprovement {
return false, ""
}
}
return true, fmt.Sprintf("improvement_stalled: no improvement > %.3f in last %d iterations",
c.MinImprovement, c.WindowSize)
}
// 4. 时间预算条件
type TimeBudgetCondition struct {
MaxDuration time.Duration
}
func (c *TimeBudgetCondition) ShouldExit(state *LoopState) (bool, string) {
if time.Since(state.StartTime) >= c.MaxDuration {
return true, fmt.Sprintf("time_budget_exhausted: %v", c.MaxDuration)
}
return false, ""
}
// 5. 质量下降条件(防止过优化)
type QualityRegressionCondition struct {
Threshold float64
}
func (c *QualityRegressionCondition) ShouldExit(state *LoopState) (bool, string) {
if len(state.QualityHistory) < 2 {
return false, ""
}
last := state.QualityHistory[len(state.QualityHistory)-1]
prev := state.QualityHistory[len(state.QualityHistory)-2]
if prev-last > c.Threshold {
return true, fmt.Sprintf("quality_regression: %.3f -> %.3f", prev, last)
}
return false, ""
}
迭代策略设计
反馈构建策略
// 反馈构建器接口
type FeedbackBuilder interface {
Build(state *LoopState, eval *EvaluationResult) string
}
// 详细反馈构建器
type DetailedFeedbackBuilder struct{}
func (b *DetailedFeedbackBuilder) Build(state *LoopState, eval *EvaluationResult) string {
var feedback strings.Builder
feedback.WriteString(fmt.Sprintf("原始需求: %s\n\n", state.Context["original_input"]))
feedback.WriteString(fmt.Sprintf("当前迭代: %d\n", state.Iteration))
feedback.WriteString(fmt.Sprintf("当前质量评分: %.3f\n", eval.Quality))
feedback.WriteString(fmt.Sprintf("历史最佳: %.3f\n\n", state.BestQuality))
feedback.WriteString("评估反馈:\n")
feedback.WriteString(eval.Feedback)
feedback.WriteString("\n\n")
// 添加具体改进建议
feedback.WriteString("需要改进的方面:\n")
for criterion, passed := range eval.Criteria {
if !passed {
feedback.WriteString(fmt.Sprintf("- %s: 未达标\n", criterion))
}
}
feedback.WriteString("\n请基于以上反馈改进内容,输出优化后的版本。")
return feedback.String()
}
// 对比反馈构建器(提供前后对比)
type DiffFeedbackBuilder struct{}
func (b *DiffFeedbackBuilder) Build(state *LoopState, eval *EvaluationResult) string {
var feedback strings.Builder
feedback.WriteString("请优化以下内容。\n\n")
feedback.WriteString("【当前版本】\n")
feedback.WriteString(state.CurrentOutput)
feedback.WriteString("\n\n")
if state.PreviousOutput != "" {
feedback.WriteString("【上一版本】\n")
feedback.WriteString(state.PreviousOutput)
feedback.WriteString("\n\n")
feedback.WriteString(fmt.Sprintf("质量变化: %.3f -> %.3f\n",
state.QualityHistory[len(state.QualityHistory)-2], eval.Quality))
}
feedback.WriteString("\n改进方向: ")
feedback.WriteString(eval.Feedback)
return feedback.String()
}
自适应迭代策略
// 根据当前状态动态调整迭代策略
type AdaptiveIterationStrategy struct {
baseStrategy IterationStrategy
qualityTargets []float64 // 分阶段质量目标
currentPhase int
}
func (s *AdaptiveIterationStrategy) GetNextInput(state *LoopState, eval *EvaluationResult) string {
// 根据当前质量选择阶段
for i, target := range s.qualityTargets {
if state.BestQuality < target {
s.currentPhase = i
break
}
}
// 不同阶段使用不同策略
switch s.currentPhase {
case 0:
// 第一阶段:关注结构和完整性
return s.buildStructureFocusPrompt(state, eval)
case 1:
// 第二阶段:关注内容和准确性
return s.buildContentFocusPrompt(state, eval)
case 2:
// 第三阶段:关注语言和风格
return s.buildStyleFocusPrompt(state, eval)
default:
return s.baseStrategy.GetNextInput(state, eval)
}
}
func (s *AdaptiveIterationStrategy) buildStructureFocusPrompt(state *LoopState, eval *EvaluationResult) string {
return fmt.Sprintf(`当前质量: %.3f,重点关注结构优化。
当前内容:
%s
请优化以下方面:
1. 确保有清晰的标题和段落结构
2. 添加适当的小标题
3. 确保逻辑流程顺畅
4. 检查开头和结尾的完整性
输出优化后的完整版本。`, state.BestQuality, state.CurrentOutput)
}
实战场景:文案多轮优化系统
完整实现
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/google/adk-go/agent"
"github.com/google/adk-go/team"
)
// CopywritingOptimizer 文案优化系统
type CopywritingOptimizer struct {
workflow *team.LoopWorkflow
}
func NewCopywritingOptimizer(model agent.Model) (*CopywritingOptimizer, error) {
// 创建写作 Agent
writerAgent, err := agent.New(agent.Config{
Name: "copywriter",
Model: model,
Instruction: `你是资深文案策划。根据需求和反馈创作或优化文案。
要求:
1. 语言生动有感染力
2. 突出核心卖点
3. 符合目标受众语言习惯
4. 控制字数在要求范围内`,
Timeout: 30 * time.Second,
})
if err != nil {
return nil, err
}
// 构建评估器
evaluator := &CopywritingEvaluator{
model: model,
criteria: []EvaluationCriterion{
{Name: "吸引力", Weight: 0.25, Prompt: "评估文案的吸引力和 hooks 强度"},
{Name: "清晰度", Weight: 0.20, Prompt: "评估信息传达的清晰度"},
{Name: "说服力", Weight: 0.25, Prompt: "评估说服力和转化率潜力"},
{Name: "品牌调性", Weight: 0.15, Prompt: "评估是否符合品牌调性"},
{Name: "创意度", Weight: 0.15, Prompt: "评估创意和差异化程度"},
},
}
// 构建复合退出条件
exitCondition := team.NewCompositeExitCondition(team.ModeAny,
&team.QualityThresholdCondition{Threshold: 0.85}, // 质量达标
&team.MaxIterationCondition{MaxIterations: 5}, // 最多 5 轮
&team.ImprovementStallCondition{ // 提升停滞
WindowSize: 2,
MinImprovement: 0.05,
},
&team.TimeBudgetCondition{MaxDuration: 2 * time.Minute}, // 时间预算
&team.QualityRegressionCondition{Threshold: 0.10}, // 质量回退
)
// 构建工作流
workflow := team.NewLoopWorkflow(
team.WithAgent(writerAgent),
team.WithEvaluator(evaluator),
team.WithExitCondition(exitCondition),
team.WithFeedbackBuilder(&team.DiffFeedbackBuilder{}),
team.WithConvergenceConfig(&team.ConvergenceConfig{
WindowSize: 3,
Threshold: 0.01,
}),
team.WithTokenBudget(30000),
)
return &CopywritingOptimizer{workflow: workflow}, nil
}
// CopywritingEvaluator 文案评估器
type CopywritingEvaluator struct {
model agent.Model
criteria []EvaluationCriterion
}
type EvaluationCriterion struct {
Name string
Weight float64
Prompt string
}
func (e *CopywritingEvaluator) Evaluate(ctx context.Context, output string, target string) (*team.EvaluationResult, error) {
// 构建评估提示
prompt := fmt.Sprintf(`请评估以下文案,按维度打分(0-1,保留3位小数)。
目标: %s
文案内容:
%s
评估维度:
`, target, output)
for _, c := range e.criteria {
prompt += fmt.Sprintf("- %s (权重%.0f%%): %s\n", c.Name, c.Weight*100, c.Prompt)
}
prompt += `
请以 JSON 格式输出:
{
"scores": {"维度名": 分数},
"overall": 综合评分,
"feedback": "具体改进建议",
"passed": true/false
}`
response, err := e.model.GenerateContent(ctx, prompt)
if err != nil {
return nil, fmt.Errorf("evaluation failed: %w", err)
}
var evalData struct {
Scores map[string]float64 `json:"scores"`
Overall float64 `json:"overall"`
Feedback string `json:"feedback"`
Passed bool `json:"passed"`
}
if err := json.Unmarshal([]byte(response), &evalData); err != nil {
return nil, fmt.Errorf("parse evaluation: %w", err)
}
criteria := make(map[string]bool)
for name, score := range evalData.Scores {
criteria[name] = score >= 0.7
}
return &team.EvaluationResult{
Quality: evalData.Overall,
Feedback: evalData.Feedback,
Passed: evalData.Passed,
Criteria: criteria,
Metrics: &team.QualityMetrics{
// 映射到通用指标
},
}, nil
}
func main() {
ctx := context.Background()
optimizer, err := NewCopywritingOptimizer(model)
if err != nil {
log.Fatalf("Failed to create optimizer: %v", err)
}
result, err := optimizer.workflow.Execute(ctx,
`为新款智能手表创作社交媒体推广文案。
目标受众:25-35岁都市白领
核心卖点:7天续航、健康监测、时尚设计
字数要求:100-150字`)
if err != nil {
log.Fatalf("Optimization failed: %v", err)
}
fmt.Printf("最终文案(质量: %.3f):\n%s\n\n", result.Quality, result.Output)
fmt.Printf("迭代次数: %d\n", result.Iterations)
fmt.Printf("退出原因: %s\n", result.ExitReason)
fmt.Printf("质量历史: %v\n", result.QualityHistory)
fmt.Printf("Token 消耗: %d\n", result.TokenUsed)
fmt.Printf("总耗时: %v\n", result.Duration)
}
防死循环机制
多层防护
type AntiLoopProtection struct {
maxIterations int // 硬限制
timeBudget time.Duration // 时间限制
tokenBudget int // Token 限制
similarityThreshold float64 // 输出相似度阈值
history []string // 输出历史
}
func (p *AntiLoopProtection) Check(state *LoopState) (bool, string) {
// 1. 迭代次数检查
if state.Iteration >= p.maxIterations {
return true, "max_iterations"
}
// 2. 时间检查
if time.Since(state.StartTime) >= p.timeBudget {
return true, "time_budget"
}
// 3. Token 检查
if state.TokenUsed >= p.tokenBudget {
return true, "token_budget"
}
// 4. 相似度检查(检测循环)
if len(state.QualityHistory) >= 3 {
current := state.CurrentOutput
for i, hist := range p.history {
similarity := calculateSimilarity(current, hist)
if similarity > p.similarityThreshold {
return true, fmt.Sprintf("repeated_output (similarity %.3f with iteration %d)",
similarity, i)
}
}
}
p.history = append(p.history, state.CurrentOutput)
// 5. 质量回退检查
if len(state.QualityHistory) >= 2 {
last := state.QualityHistory[len(state.QualityHistory)-1]
prev := state.QualityHistory[len(state.QualityHistory)-2]
if prev-last > 0.2 {
return true, "significant_regression"
}
}
return false, ""
}
// 文本相似度计算(简化版 Jaccard)
func calculateSimilarity(a, b string) float64 {
setA := tokenize(a)
setB := tokenize(b)
intersection := 0
for token := range setA {
if setB[token] {
intersection++
}
}
union := len(setA) + len(setB) - intersection
if union == 0 {
return 1.0
}
return float64(intersection) / float64(union)
}
func tokenize(text string) map[string]bool {
tokens := make(map[string]bool)
words := strings.Fields(text)
for _, word := range words {
tokens[strings.ToLower(word)] = true
}
return tokens
}
状态演化与历史管理
压缩存储
type CompressedLoopHistory struct {
iterations int
qualityTrend []float64
bestOutputs []OutputSnapshot
compressionCfg *CompressionConfig
}
type OutputSnapshot struct {
Iteration int
Quality float64
Hash string // 内容哈希,用于去重
Summary string // 摘要(替代完整内容)
}
func (h *CompressedLoopHistory) Add(output string, quality float64, iteration int) {
// 只保留关键迭代的结果
if h.shouldKeep(iteration, quality) {
hash := sha256.Sum256([]byte(output))
h.bestOutputs = append(h.bestOutputs, OutputSnapshot{
Iteration: iteration,
Quality: quality,
Hash: fmt.Sprintf("%x", hash[:8]),
Summary: summarize(output, 200),
})
}
h.qualityTrend = append(h.qualityTrend, quality)
h.iterations = iteration
}
func (h *CompressedLoopHistory) shouldKeep(iteration int, quality float64) bool {
// 保留:首次、最佳、最近
if iteration == 1 {
return true
}
if quality > h.getBestQuality() {
return true
}
if iteration > h.iterations-2 {
return true
}
return false
}
常见问题深度解析
Q:Loop 工作流如何确保收敛?
A:收敛性无法绝对保证,但可以通过以下手段提高概率:
- 质量单调性:确保评估器给出的反馈确实指向改进方向
- 反馈质量:使用详细的、可操作的反馈,而非笼统的"不够好"
- 迭代降温:后期迭代降低改动幅度,避免震荡
- 多起点:从不同初始输出启动多个 Loop,选择最佳结果
Q:评估器本身不准确怎么办?
A:三层保障:
- 多评估器投票:使用 3 个不同评估器,取中位数
- 人类反馈强化:关键节点引入人工审核
- 评估器校准:定期用标注数据校准评估标准
Q:Loop 的 Token 消耗如何控制?
A:策略组合:
- 设置硬预算上限
- 使用轻量级模型进行评估
- 压缩历史上下文(只保留摘要)
- 早期终止(质量提升 < 0.01 时提前退出)
下一步
Loop 工作流的迭代优化与收敛控制已深入掌握。接下来探索 Custom Workflow——自定义工作流的灵活编排、动态调度与复杂场景适配。
← Parallel 工作流 | Custom Workflow →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
