Loop 工作流深度解析:迭代优化、终止条件与收敛性保障

Loop 工作流是 Agent Team 中处理迭代优化类任务的核心模式。与 Sequential 和 Parallel 的一次性执行不同,Loop 通过循环-评估-优化的闭环机制,让 Agent 自主改进输出质量。然而,循环带来的风险——无限循环、收敛震荡、状态膨胀——需要严谨的工程控制。本文将深入 Loop 的数学模型、终止理论、状态演化与生产实践。

迭代优化的数学模型

收敛性分析

Loop 工作流可以形式化为一个迭代函数系统:

x_{n+1} = f(x_n, e_n)

其中:

  • x_n:第 n 次迭代的输出
  • f:Agent 的改进函数
  • e_n:第 n 次迭代的评估反馈

收敛条件:当存在某个质量度量 Q(x),使得 Q(x_{n+1}) >= Q(x_n) 对所有 n 成立,且 Q(x) 有上界,则序列 {x_n} 必然收敛。

// 收敛性分析器
type ConvergenceAnalyzer struct {
    qualityHistory []float64
    windowSize     int
    threshold      float64
}

func (ca *ConvergenceAnalyzer) IsConverged() (bool, float64) {
    if len(ca.qualityHistory) < ca.windowSize {
        return false, 0
    }
    
    // 取最近 windowSize 次的质量值
    recent := ca.qualityHistory[len(ca.qualityHistory)-ca.windowSize:]
    
    // 计算方差
    mean := calculateMean(recent)
    variance := calculateVariance(recent, mean)
    
    // 方差小于阈值认为已收敛
    converged := variance < ca.threshold
    
    return converged, variance
}

func (ca *ConvergenceAnalyzer) DetectOscillation() bool {
    if len(ca.qualityHistory) < 4 {
        return false
    }
    
    // 检测震荡模式:质量值反复升降
    last4 := ca.qualityHistory[len(ca.qualityHistory)-4:]
    
    upDown := (last4[1] > last4[0]) && (last4[2] < last4[1]) && (last4[3] > last4[2])
    downUp := (last4[1] < last4[0]) && (last4[2] > last4[1]) && (last4[3] < last4[2])
    
    return upDown || downUp
}

质量度量设计

// 多维度质量评估体系
type QualityMetrics struct {
    // 内容质量(0-1)
    Relevance     float64 // 相关性
    Coherence     float64 // 连贯性
    Completeness  float64 // 完整性
    Accuracy      float64 // 准确性
    
    // 结构质量(0-1)
    Structure     float64 // 结构合理性
    Formatting    float64 // 格式规范
    
    // 语言质量(0-1)
    Grammar       float64 // 语法正确性
    Style         float64 // 风格一致性
    
    // 业务质量(0-1)
    Requirements  float64 // 需求满足度
    Constraints   float64 // 约束满足度
}

func (qm *QualityMetrics) OverallScore() float64 {
    // 加权综合评分
    weights := map[string]float64{
        "Relevance":    0.20,
        "Coherence":    0.15,
        "Completeness": 0.15,
        "Accuracy":     0.20,
        "Structure":    0.10,
        "Formatting":   0.05,
        "Grammar":      0.05,
        "Style":        0.05,
        "Requirements": 0.03,
        "Constraints":  0.02,
    }
    
    score := 0.0
    score += weights["Relevance"] * qm.Relevance
    score += weights["Coherence"] * qm.Coherence
    score += weights["Completeness"] * qm.Completeness
    score += weights["Accuracy"] * qm.Accuracy
    score += weights["Structure"] * qm.Structure
    score += weights["Formatting"] * qm.Formatting
    score += weights["Grammar"] * qm.Grammar
    score += weights["Style"] * qm.Style
    score += weights["Requirements"] * qm.Requirements
    score += weights["Constraints"] * qm.Constraints
    
    return score
}

// 通过 LLM 评估质量
func evaluateWithLLM(ctx context.Context, model agent.Model, content string, criteria []string) (*QualityMetrics, error) {
    prompt := fmt.Sprintf(`请评估以下内容的质量,按以下维度打分(0-1):

%s

评估维度:
%s

请以 JSON 格式输出评分。`, content, strings.Join(criteria, "\n"))
    
    response, err := model.GenerateContent(ctx, prompt)
    if err != nil {
        return nil, err
    }
    
    var metrics QualityMetrics
    if err := json.Unmarshal([]byte(response), &metrics); err != nil {
        return nil, err
    }
    
    return &metrics, nil
}

Loop 工作流架构设计

核心组件

type LoopWorkflow struct {
    agent           *agent.Agent        // 迭代 Agent
    evaluator       Evaluator           // 质量评估器
    exitCondition   ExitCondition       // 退出条件
    maxIterations   int                 // 最大迭代次数
    minIterations   int                 // 最小迭代次数
    convergenceCfg  *ConvergenceConfig  // 收敛配置
    stateManager    StateManager        // 状态管理器
    feedbackBuilder FeedbackBuilder     // 反馈构建器
    strategy        IterationStrategy   // 迭代策略
}

// 评估器接口
type Evaluator interface {
    Evaluate(ctx context.Context, output string, target string) (*EvaluationResult, error)
}

// 退出条件接口
type ExitCondition interface {
    ShouldExit(state *LoopState) (bool, string)
}

// 迭代状态
type LoopState struct {
    Iteration      int                    // 当前迭代次数
    CurrentOutput  string                 // 当前输出
    PreviousOutput string                 // 上一次输出
    QualityHistory []float64              // 质量历史
    BestOutput     string                 // 最佳输出
    BestQuality    float64                // 最佳质量
    Feedback       string                 // 当前反馈
    Context        map[string]interface{} // 上下文状态
    StartTime      time.Time              // 开始时间
    TokenUsed      int                    // Token 消耗
}

// 评估结果
type EvaluationResult struct {
    Quality    float64            // 质量评分
    Feedback   string             // 改进反馈
    Metrics    *QualityMetrics    // 详细指标
    Passed     bool               // 是否通过
    Criteria   map[string]bool    // 各标准通过情况
}

执行引擎

func (lw *LoopWorkflow) Execute(ctx context.Context, input string) (*LoopResult, error) {
    // 1. 初始化状态
    state := &LoopState{
        Iteration: 0,
        Context:   make(map[string]interface{}),
        StartTime: time.Now(),
        BestQuality: -1,
    }
    
    state.Context["original_input"] = input
    
    // 2. 初始执行
    output, err := lw.agent.Run(ctx, input)
    if err != nil {
        return nil, fmt.Errorf("initial execution failed: %w", err)
    }
    
    state.CurrentOutput = output
    state.BestOutput = output
    
    // 3. 评估初始输出
    eval, err := lw.evaluator.Evaluate(ctx, output, input)
    if err != nil {
        return nil, fmt.Errorf("initial evaluation failed: %w", err)
    }
    
    state.QualityHistory = append(state.QualityHistory, eval.Quality)
    state.BestQuality = eval.Quality
    state.Feedback = eval.Feedback
    
    // 4. 迭代优化循环
    for {
        state.Iteration++
        
        // 检查退出条件
        shouldExit, reason := lw.exitCondition.ShouldExit(state)
        if shouldExit {
            return lw.buildResult(state, reason), nil
        }
        
        // 检查最大迭代次数
        if state.Iteration >= lw.maxIterations {
            return lw.buildResult(state, "max_iterations_reached"), nil
        }
        
        // 构建迭代输入
        iterationInput := lw.feedbackBuilder.Build(state, eval)
        
        // 执行迭代
        newOutput, err := lw.agent.Run(ctx, iterationInput)
        if err != nil {
            // 迭代失败,使用最佳历史结果
            log.Printf("Iteration %d failed: %v", state.Iteration, err)
            return lw.buildResult(state, "iteration_failed"), nil
        }
        
        state.PreviousOutput = state.CurrentOutput
        state.CurrentOutput = newOutput
        
        // 评估新输出
        eval, err = lw.evaluator.Evaluate(ctx, newOutput, input)
        if err != nil {
            log.Printf("Evaluation failed at iteration %d: %v", state.Iteration, err)
            continue
        }
        
        state.QualityHistory = append(state.QualityHistory, eval.Quality)
        state.Feedback = eval.Feedback
        state.TokenUsed += estimateTokens(newOutput)
        
        // 更新最佳结果
        if eval.Quality > state.BestQuality {
            state.BestQuality = eval.Quality
            state.BestOutput = newOutput
        }
        
        // 检测收敛
        if lw.convergenceCfg != nil {
            analyzer := &ConvergenceAnalyzer{
                qualityHistory: state.QualityHistory,
                windowSize:     lw.convergenceCfg.WindowSize,
                threshold:      lw.convergenceCfg.Threshold,
            }
            
            if converged, variance := analyzer.IsConverged(); converged {
                state.Context["convergence_variance"] = variance
                return lw.buildResult(state, "converged"), nil
            }
            
            if analyzer.DetectOscillation() {
                return lw.buildResult(state, "oscillation_detected"), nil
            }
        }
        
        // 检查 Token 预算
        if lw.tokenBudget != nil && !lw.tokenBudget.CanAllocate(1000) {
            return lw.buildResult(state, "token_budget_exhausted"), nil
        }
    }
}

func (lw *LoopWorkflow) buildResult(state *LoopState, reason string) *LoopResult {
    return &LoopResult{
        Output:        state.BestOutput,
        Quality:       state.BestQuality,
        Iterations:    state.Iteration,
        ExitReason:    reason,
        QualityHistory: state.QualityHistory,
        TokenUsed:     state.TokenUsed,
        Duration:      time.Since(state.StartTime),
    }
}

终止条件设计

复合退出条件

// 组合多个退出条件
type CompositeExitCondition struct {
    conditions []ExitCondition
    mode       CompositeMode // ANY | ALL
}

type CompositeMode int
const (
    ModeAny CompositeMode = iota // 任一条件满足即退出
    ModeAll                       // 所有条件满足才退出
)

func (c *CompositeExitCondition) ShouldExit(state *LoopState) (bool, string) {
    if c.mode == ModeAny {
        for _, cond := range c.conditions {
            if exit, reason := cond.ShouldExit(state); exit {
                return true, reason
            }
        }
        return false, ""
    }
    
    // ModeAll
    reasons := make([]string, 0)
    for _, cond := range c.conditions {
        exit, reason := cond.ShouldExit(state)
        if !exit {
            return false, ""
        }
        reasons = append(reasons, reason)
    }
    return true, strings.Join(reasons, " + ")
}

// 具体退出条件实现

// 1. 质量阈值条件
type QualityThresholdCondition struct {
    Threshold float64
}

func (c *QualityThresholdCondition) ShouldExit(state *LoopState) (bool, string) {
    if state.BestQuality >= c.Threshold {
        return true, fmt.Sprintf("quality_threshold_reached: %.3f >= %.3f", 
            state.BestQuality, c.Threshold)
    }
    return false, ""
}

// 2. 最大迭代次数条件
type MaxIterationCondition struct {
    MaxIterations int
}

func (c *MaxIterationCondition) ShouldExit(state *LoopState) (bool, string) {
    if state.Iteration >= c.MaxIterations {
        return true, fmt.Sprintf("max_iterations: %d", c.MaxIterations)
    }
    return false, ""
}

// 3. 质量提升停滞条件
type ImprovementStallCondition struct {
    WindowSize    int
    MinImprovement float64
}

func (c *ImprovementStallCondition) ShouldExit(state *LoopState) (bool, string) {
    if len(state.QualityHistory) < c.WindowSize+1 {
        return false, ""
    }
    
    recent := state.QualityHistory[len(state.QualityHistory)-c.WindowSize:]
    best := state.QualityHistory[len(state.QualityHistory)-c.WindowSize-1]
    
    for _, q := range recent {
        if q-best >= c.MinImprovement {
            return false, ""
        }
    }
    
    return true, fmt.Sprintf("improvement_stalled: no improvement > %.3f in last %d iterations",
        c.MinImprovement, c.WindowSize)
}

// 4. 时间预算条件
type TimeBudgetCondition struct {
    MaxDuration time.Duration
}

func (c *TimeBudgetCondition) ShouldExit(state *LoopState) (bool, string) {
    if time.Since(state.StartTime) >= c.MaxDuration {
        return true, fmt.Sprintf("time_budget_exhausted: %v", c.MaxDuration)
    }
    return false, ""
}

// 5. 质量下降条件(防止过优化)
type QualityRegressionCondition struct {
    Threshold float64
}

func (c *QualityRegressionCondition) ShouldExit(state *LoopState) (bool, string) {
    if len(state.QualityHistory) < 2 {
        return false, ""
    }
    
    last := state.QualityHistory[len(state.QualityHistory)-1]
    prev := state.QualityHistory[len(state.QualityHistory)-2]
    
    if prev-last > c.Threshold {
        return true, fmt.Sprintf("quality_regression: %.3f -> %.3f", prev, last)
    }
    return false, ""
}

迭代策略设计

反馈构建策略

// 反馈构建器接口
type FeedbackBuilder interface {
    Build(state *LoopState, eval *EvaluationResult) string
}

// 详细反馈构建器
type DetailedFeedbackBuilder struct{}

func (b *DetailedFeedbackBuilder) Build(state *LoopState, eval *EvaluationResult) string {
    var feedback strings.Builder
    
    feedback.WriteString(fmt.Sprintf("原始需求: %s\n\n", state.Context["original_input"]))
    feedback.WriteString(fmt.Sprintf("当前迭代: %d\n", state.Iteration))
    feedback.WriteString(fmt.Sprintf("当前质量评分: %.3f\n", eval.Quality))
    feedback.WriteString(fmt.Sprintf("历史最佳: %.3f\n\n", state.BestQuality))
    
    feedback.WriteString("评估反馈:\n")
    feedback.WriteString(eval.Feedback)
    feedback.WriteString("\n\n")
    
    // 添加具体改进建议
    feedback.WriteString("需要改进的方面:\n")
    for criterion, passed := range eval.Criteria {
        if !passed {
            feedback.WriteString(fmt.Sprintf("- %s: 未达标\n", criterion))
        }
    }
    
    feedback.WriteString("\n请基于以上反馈改进内容,输出优化后的版本。")
    
    return feedback.String()
}

// 对比反馈构建器(提供前后对比)
type DiffFeedbackBuilder struct{}

func (b *DiffFeedbackBuilder) Build(state *LoopState, eval *EvaluationResult) string {
    var feedback strings.Builder
    
    feedback.WriteString("请优化以下内容。\n\n")
    feedback.WriteString("【当前版本】\n")
    feedback.WriteString(state.CurrentOutput)
    feedback.WriteString("\n\n")
    
    if state.PreviousOutput != "" {
        feedback.WriteString("【上一版本】\n")
        feedback.WriteString(state.PreviousOutput)
        feedback.WriteString("\n\n")
        
        feedback.WriteString(fmt.Sprintf("质量变化: %.3f -> %.3f\n", 
            state.QualityHistory[len(state.QualityHistory)-2], eval.Quality))
    }
    
    feedback.WriteString("\n改进方向: ")
    feedback.WriteString(eval.Feedback)
    
    return feedback.String()
}

自适应迭代策略

// 根据当前状态动态调整迭代策略
type AdaptiveIterationStrategy struct {
    baseStrategy    IterationStrategy
    qualityTargets  []float64 // 分阶段质量目标
    currentPhase    int
}

func (s *AdaptiveIterationStrategy) GetNextInput(state *LoopState, eval *EvaluationResult) string {
    // 根据当前质量选择阶段
    for i, target := range s.qualityTargets {
        if state.BestQuality < target {
            s.currentPhase = i
            break
        }
    }
    
    // 不同阶段使用不同策略
    switch s.currentPhase {
    case 0:
        // 第一阶段:关注结构和完整性
        return s.buildStructureFocusPrompt(state, eval)
    case 1:
        // 第二阶段:关注内容和准确性
        return s.buildContentFocusPrompt(state, eval)
    case 2:
        // 第三阶段:关注语言和风格
        return s.buildStyleFocusPrompt(state, eval)
    default:
        return s.baseStrategy.GetNextInput(state, eval)
    }
}

func (s *AdaptiveIterationStrategy) buildStructureFocusPrompt(state *LoopState, eval *EvaluationResult) string {
    return fmt.Sprintf(`当前质量: %.3f,重点关注结构优化。

当前内容:
%s

请优化以下方面:
1. 确保有清晰的标题和段落结构
2. 添加适当的小标题
3. 确保逻辑流程顺畅
4. 检查开头和结尾的完整性

输出优化后的完整版本。`, state.BestQuality, state.CurrentOutput)
}

实战场景:文案多轮优化系统

完整实现

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "strings"
    "time"
    
    "github.com/google/adk-go/agent"
    "github.com/google/adk-go/team"
)

// CopywritingOptimizer 文案优化系统
type CopywritingOptimizer struct {
    workflow *team.LoopWorkflow
}

func NewCopywritingOptimizer(model agent.Model) (*CopywritingOptimizer, error) {
    // 创建写作 Agent
    writerAgent, err := agent.New(agent.Config{
        Name:        "copywriter",
        Model:       model,
        Instruction: `你是资深文案策划。根据需求和反馈创作或优化文案。
要求:
1. 语言生动有感染力
2. 突出核心卖点
3. 符合目标受众语言习惯
4. 控制字数在要求范围内`,
        Timeout: 30 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    
    // 构建评估器
    evaluator := &CopywritingEvaluator{
        model: model,
        criteria: []EvaluationCriterion{
            {Name: "吸引力", Weight: 0.25, Prompt: "评估文案的吸引力和 hooks 强度"},
            {Name: "清晰度", Weight: 0.20, Prompt: "评估信息传达的清晰度"},
            {Name: "说服力", Weight: 0.25, Prompt: "评估说服力和转化率潜力"},
            {Name: "品牌调性", Weight: 0.15, Prompt: "评估是否符合品牌调性"},
            {Name: "创意度", Weight: 0.15, Prompt: "评估创意和差异化程度"},
        },
    }
    
    // 构建复合退出条件
    exitCondition := team.NewCompositeExitCondition(team.ModeAny,
        &team.QualityThresholdCondition{Threshold: 0.85},      // 质量达标
        &team.MaxIterationCondition{MaxIterations: 5},          // 最多 5 轮
        &team.ImprovementStallCondition{                         // 提升停滞
            WindowSize:     2,
            MinImprovement: 0.05,
        },
        &team.TimeBudgetCondition{MaxDuration: 2 * time.Minute}, // 时间预算
        &team.QualityRegressionCondition{Threshold: 0.10},      // 质量回退
    )
    
    // 构建工作流
    workflow := team.NewLoopWorkflow(
        team.WithAgent(writerAgent),
        team.WithEvaluator(evaluator),
        team.WithExitCondition(exitCondition),
        team.WithFeedbackBuilder(&team.DiffFeedbackBuilder{}),
        team.WithConvergenceConfig(&team.ConvergenceConfig{
            WindowSize: 3,
            Threshold:  0.01,
        }),
        team.WithTokenBudget(30000),
    )
    
    return &CopywritingOptimizer{workflow: workflow}, nil
}

// CopywritingEvaluator 文案评估器
type CopywritingEvaluator struct {
    model    agent.Model
    criteria []EvaluationCriterion
}

type EvaluationCriterion struct {
    Name   string
    Weight float64
    Prompt string
}

func (e *CopywritingEvaluator) Evaluate(ctx context.Context, output string, target string) (*team.EvaluationResult, error) {
    // 构建评估提示
    prompt := fmt.Sprintf(`请评估以下文案,按维度打分(0-1,保留3位小数)。

目标: %s

文案内容:
%s

评估维度:
`, target, output)
    
    for _, c := range e.criteria {
        prompt += fmt.Sprintf("- %s (权重%.0f%%): %s\n", c.Name, c.Weight*100, c.Prompt)
    }
    
    prompt += `
请以 JSON 格式输出:
{
  "scores": {"维度名": 分数},
  "overall": 综合评分,
  "feedback": "具体改进建议",
  "passed": true/false
}`
    
    response, err := e.model.GenerateContent(ctx, prompt)
    if err != nil {
        return nil, fmt.Errorf("evaluation failed: %w", err)
    }
    
    var evalData struct {
        Scores  map[string]float64 `json:"scores"`
        Overall float64            `json:"overall"`
        Feedback string           `json:"feedback"`
        Passed  bool              `json:"passed"`
    }
    
    if err := json.Unmarshal([]byte(response), &evalData); err != nil {
        return nil, fmt.Errorf("parse evaluation: %w", err)
    }
    
    criteria := make(map[string]bool)
    for name, score := range evalData.Scores {
        criteria[name] = score >= 0.7
    }
    
    return &team.EvaluationResult{
        Quality:  evalData.Overall,
        Feedback: evalData.Feedback,
        Passed:   evalData.Passed,
        Criteria: criteria,
        Metrics: &team.QualityMetrics{
            // 映射到通用指标
        },
    }, nil
}

func main() {
    ctx := context.Background()
    
    optimizer, err := NewCopywritingOptimizer(model)
    if err != nil {
        log.Fatalf("Failed to create optimizer: %v", err)
    }
    
    result, err := optimizer.workflow.Execute(ctx, 
        `为新款智能手表创作社交媒体推广文案。
目标受众:25-35岁都市白领
核心卖点:7天续航、健康监测、时尚设计
字数要求:100-150字`)
    
    if err != nil {
        log.Fatalf("Optimization failed: %v", err)
    }
    
    fmt.Printf("最终文案(质量: %.3f):\n%s\n\n", result.Quality, result.Output)
    fmt.Printf("迭代次数: %d\n", result.Iterations)
    fmt.Printf("退出原因: %s\n", result.ExitReason)
    fmt.Printf("质量历史: %v\n", result.QualityHistory)
    fmt.Printf("Token 消耗: %d\n", result.TokenUsed)
    fmt.Printf("总耗时: %v\n", result.Duration)
}

防死循环机制

多层防护

type AntiLoopProtection struct {
    maxIterations    int           // 硬限制
    timeBudget       time.Duration // 时间限制
    tokenBudget      int           // Token 限制
    similarityThreshold float64    // 输出相似度阈值
    history          []string      // 输出历史
}

func (p *AntiLoopProtection) Check(state *LoopState) (bool, string) {
    // 1. 迭代次数检查
    if state.Iteration >= p.maxIterations {
        return true, "max_iterations"
    }
    
    // 2. 时间检查
    if time.Since(state.StartTime) >= p.timeBudget {
        return true, "time_budget"
    }
    
    // 3. Token 检查
    if state.TokenUsed >= p.tokenBudget {
        return true, "token_budget"
    }
    
    // 4. 相似度检查(检测循环)
    if len(state.QualityHistory) >= 3 {
        current := state.CurrentOutput
        for i, hist := range p.history {
            similarity := calculateSimilarity(current, hist)
            if similarity > p.similarityThreshold {
                return true, fmt.Sprintf("repeated_output (similarity %.3f with iteration %d)", 
                    similarity, i)
            }
        }
    }
    p.history = append(p.history, state.CurrentOutput)
    
    // 5. 质量回退检查
    if len(state.QualityHistory) >= 2 {
        last := state.QualityHistory[len(state.QualityHistory)-1]
        prev := state.QualityHistory[len(state.QualityHistory)-2]
        if prev-last > 0.2 {
            return true, "significant_regression"
        }
    }
    
    return false, ""
}

// 文本相似度计算(简化版 Jaccard)
func calculateSimilarity(a, b string) float64 {
    setA := tokenize(a)
    setB := tokenize(b)
    
    intersection := 0
    for token := range setA {
        if setB[token] {
            intersection++
        }
    }
    
    union := len(setA) + len(setB) - intersection
    if union == 0 {
        return 1.0
    }
    
    return float64(intersection) / float64(union)
}

func tokenize(text string) map[string]bool {
    tokens := make(map[string]bool)
    words := strings.Fields(text)
    for _, word := range words {
        tokens[strings.ToLower(word)] = true
    }
    return tokens
}

状态演化与历史管理

压缩存储

type CompressedLoopHistory struct {
    iterations     int
    qualityTrend   []float64
    bestOutputs    []OutputSnapshot
    compressionCfg *CompressionConfig
}

type OutputSnapshot struct {
    Iteration int
    Quality   float64
    Hash      string // 内容哈希,用于去重
    Summary   string // 摘要(替代完整内容)
}

func (h *CompressedLoopHistory) Add(output string, quality float64, iteration int) {
    // 只保留关键迭代的结果
    if h.shouldKeep(iteration, quality) {
        hash := sha256.Sum256([]byte(output))
        
        h.bestOutputs = append(h.bestOutputs, OutputSnapshot{
            Iteration: iteration,
            Quality:   quality,
            Hash:      fmt.Sprintf("%x", hash[:8]),
            Summary:   summarize(output, 200),
        })
    }
    
    h.qualityTrend = append(h.qualityTrend, quality)
    h.iterations = iteration
}

func (h *CompressedLoopHistory) shouldKeep(iteration int, quality float64) bool {
    // 保留:首次、最佳、最近
    if iteration == 1 {
        return true
    }
    
    if quality > h.getBestQuality() {
        return true
    }
    
    if iteration > h.iterations-2 {
        return true
    }
    
    return false
}

常见问题深度解析

Q:Loop 工作流如何确保收敛?

A:收敛性无法绝对保证,但可以通过以下手段提高概率:

  1. 质量单调性:确保评估器给出的反馈确实指向改进方向
  2. 反馈质量:使用详细的、可操作的反馈,而非笼统的"不够好"
  3. 迭代降温:后期迭代降低改动幅度,避免震荡
  4. 多起点:从不同初始输出启动多个 Loop,选择最佳结果

Q:评估器本身不准确怎么办?

A:三层保障:

  1. 多评估器投票:使用 3 个不同评估器,取中位数
  2. 人类反馈强化:关键节点引入人工审核
  3. 评估器校准:定期用标注数据校准评估标准

Q:Loop 的 Token 消耗如何控制?

A:策略组合:

  • 设置硬预算上限
  • 使用轻量级模型进行评估
  • 压缩历史上下文(只保留摘要)
  • 早期终止(质量提升 < 0.01 时提前退出)

下一步

Loop 工作流的迭代优化与收敛控制已深入掌握。接下来探索 Custom Workflow——自定义工作流的灵活编排、动态调度与复杂场景适配。

Parallel 工作流 | Custom Workflow →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。