Sequential 工作流深度解析:管道化编排的状态一致性、错误传播与性能优化

Sequential 工作流是 Agent Team 中最基础也最常用的模式。它看似简单——A 执行完 B 执行,B 执行完 C 执行——但在生产环境中,管道化编排涉及复杂的状态一致性、错误传播、背压控制等问题。本文将深入 Sequential 的架构设计与工程实践。

管道化架构的核心原理

数据流模型

Sequential 工作流本质上是**管道-过滤器模式(Pipe-Filter Pattern)**的实现:

输入数据
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Filter A   │───►│  Filter B   │───►│  Filter C   │
│ (数据获取)   │    │ (数据转换)   │    │ (数据输出)   │
└─────────────┘    └─────────────┘    └─────────────┘
    │                    │                    │
    ▼                    ▼                    ▼
 RawData           ProcessedData         FinalResult

每个 Agent 既是消费者(接收上游输出),也是生产者(生成下游输入)。这种设计的关键在于定义清晰的数据契约(Data Contract)。

数据契约设计

// PipelineData is the standard envelope passed from one pipeline step to the next.
type PipelineData struct {
    // SchemaVersion is the format version, kept so format changes stay compatible.
    SchemaVersion string `json:"schema_version"`
    
    // Payload is the business data.
    Payload interface{} `json:"payload"`
    
    // Metadata holds execution details used for tracing and debugging.
    Metadata PipelineMetadata `json:"metadata"`
    
    // Context carries key/value state along the pipeline.
    Context map[string]interface{} `json:"context"`
    
    // Error holds error information, if any; it is omitted from JSON when nil.
    Error *PipelineError `json:"error,omitempty"`
}

// PipelineMetadata records the execution details of the step that produced a
// PipelineData value.
//
// As populated by executeStep: StepID comes from generateStepID, LatencyMs is
// the handler call duration in milliseconds, TokenUsed is the estimateTokens
// result for the step output, RetryCount is the zero-based index of the
// attempt that succeeded, and PreviousSteps is the input's PreviousSteps
// followed by the input's StepID.
type PipelineMetadata struct {
    StepID        string    `json:"step_id"`
    StepName      string    `json:"step_name"`
    Timestamp     time.Time `json:"timestamp"`
    LatencyMs     int64     `json:"latency_ms"`
    TokenUsed     int       `json:"token_used"`
    RetryCount    int       `json:"retry_count"`
    PreviousSteps []string  `json:"previous_steps"`
}

// PipelineError is the error record that can travel with PipelineData (see
// PipelineData.Error).
//
// NOTE(review): nothing in this file populates these fields. Presumably Step
// names the failing step and Retriable tells downstream code whether a retry
// is worthwhile — confirm against the producer.
type PipelineError struct {
    Code    string `json:"code"`
    Message string `json:"message"`
    Step    string `json:"step"`
    Retriable bool `json:"retriable"`
}

设计原则

  1. Schema Versioning:数据格式变更时通过版本号兼容旧代码
  2. 不可变 Payload:每个步骤生成新的 Payload,不修改上游数据
  3. 完整元数据:记录每个步骤的执行详情,便于问题排查
  4. 错误传播:错误信息随管道传递,下游可决定是否处理

生产级 Sequential 实现

核心结构

// SequentialWorkflow runs a list of steps one after another, feeding each
// step's output to the next.
type SequentialWorkflow struct {
    steps        []Step                    // ordered list of steps
    state        *WorkflowState            // workflow state
    retryPolicy  *RetryPolicy              // retry policy used when a step has none of its own
    errorHandler ErrorHandler              // error handler
    middleware   []Middleware              // middleware chain
    observers    []Observer                // observers
    tokenBudget  *TokenBudget              // token budget
    timeout      time.Duration             // timeout for the whole workflow
}

// Step describes one stage of a SequentialWorkflow.
//
// NOTE(review): executeStepWithCache also reads step.CacheConfig, which is not
// declared here; add that field once the cache type it should hold is settled.
type Step struct {
    Name        string
    Agent       *agent.Agent
    Condition   ConditionFunc   // execution condition; the step is skipped when it returns false
    Transformer DataTransformer // converts the upstream payload into this step's input
    Timeout     time.Duration   // per-step timeout
    RetryPolicy *RetryPolicy    // per-step retry policy; the workflow policy is used when nil
    OnError     ErrorAction     // how a failure of this step is handled

    // Fallback produces a substitute output from the step input. The news
    // pipeline example sets it together with OnError: ErrorActionFallback, so
    // the field has to exist for that literal to compile.
    Fallback func(input string) (string, error)
}

// ErrorAction selects how the workflow reacts when a step fails.
type ErrorAction int

const (
    // ErrorActionFail fails the workflow immediately.
    ErrorActionFail ErrorAction = iota
    // ErrorActionSkip skips the current step.
    ErrorActionSkip
    // ErrorActionRetry retries the current step.
    ErrorActionRetry
    // ErrorActionFallback switches to the step's fallback logic.
    ErrorActionFallback
    // ErrorActionContinue records the error and keeps going.
    ErrorActionContinue
)

执行引擎

// Execute runs the workflow's steps in order, starting from input, and returns
// the final step's output together with its metadata.
//
// A step whose Condition returns false is skipped. When a step fails,
// handleStepError decides whether the workflow continues (with the data it
// returns) or aborts. An error is returned when the context ends before a
// step starts, when an unhandled step error occurs, or when the final payload
// is not a string.
func (sw *SequentialWorkflow) Execute(ctx context.Context, input string) (*WorkflowResult, error) {
    // 1. Bound the whole run. A zero timeout means "not configured":
    // context.WithTimeout(ctx, 0) would return an already-expired context and
    // the first step would fail immediately.
    if sw.timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, sw.timeout)
        defer cancel()
    }

    // 2. Seed the pipeline with the caller's input.
    data := &PipelineData{
        SchemaVersion: "1.0",
        Payload:       input,
        Metadata: PipelineMetadata{
            StepID:    generateStepID(),
            Timestamp: time.Now(),
        },
        Context: make(map[string]interface{}),
    }

    // 3. Run the step chain.
    for i, step := range sw.steps {
        if err := ctx.Err(); err != nil {
            return nil, fmt.Errorf("workflow timeout at step %d (%s): %w", i, step.Name, err)
        }

        // Skip steps whose condition does not hold for the current data.
        if step.Condition != nil && !step.Condition(data) {
            sw.notifyObservers(&StepEvent{
                StepIndex: i,
                StepName:  step.Name,
                Action:    "skipped",
            })
            continue
        }

        // Run the step; retries happen inside executeStep.
        result, err := sw.executeStep(ctx, step, data)
        if err != nil {
            handled, newData := sw.handleStepError(step, data, err)
            if !handled {
                return nil, fmt.Errorf("step %d (%s) failed: %w", i, step.Name, err)
            }
            // Keep the previous data when the handler supplies no replacement,
            // so later steps never dereference nil.
            if newData != nil {
                data = newData
            }
            continue
        }

        data = result
    }

    // A checked assertion turns an unexpected payload type into an error
    // instead of a panic.
    output, ok := data.Payload.(string)
    if !ok {
        return nil, fmt.Errorf("workflow produced a non-string payload of type %T", data.Payload)
    }

    result := &WorkflowResult{
        Output:   output,
        Metadata: data.Metadata,
    }
    // The token budget is optional; leave TokenUsed at its zero value without one.
    if sw.tokenBudget != nil {
        result.TokenUsed = sw.tokenBudget.UsedTokens
    }
    return result, nil
}

// executeStep runs a single step against input and returns a new PipelineData
// holding the step's output; input itself is never modified.
//
// The step's own RetryPolicy is used when set, otherwise the workflow's.
// MaxAttempts is the total number of attempts, and at least one attempt is
// always made. A non-positive step.Timeout means no per-step deadline. The
// backoff wait between attempts ends early when ctx is cancelled.
//
// The returned error wraps the last attempt's error and reports how many
// attempts were actually made.
func (sw *SequentialWorkflow) executeStep(
    ctx context.Context,
    step Step,
    input *PipelineData,
) (*PipelineData, error) {
    // Wrap the agent call in the middleware chain.
    handler := sw.applyMiddleware(step.Agent.Run)

    // Prepare the agent input; a non-string payload is an error, not a panic.
    agentInput, ok := input.Payload.(string)
    if !ok {
        return nil, fmt.Errorf("step %s: expected string payload, got %T", step.Name, input.Payload)
    }
    if step.Transformer != nil {
        agentInput = step.Transformer.Transform(agentInput)
    }

    policy := step.RetryPolicy
    if policy == nil {
        policy = sw.retryPolicy
    }
    // Without a policy (or with MaxAttempts < 1) the step still runs once.
    maxAttempts := 1
    if policy != nil && policy.MaxAttempts > 1 {
        maxAttempts = policy.MaxAttempts
    }

    var lastErr error
    attempts := 0
    for attempt := 0; attempt < maxAttempts; attempt++ {
        attempts = attempt + 1

        // Only apply a per-step deadline when one is configured; a zero
        // timeout would otherwise expire the context before the call starts.
        stepCtx := ctx
        cancel := func() {}
        if step.Timeout > 0 {
            stepCtx, cancel = context.WithTimeout(ctx, step.Timeout)
        }

        start := time.Now()
        result, err := handler(stepCtx, agentInput)
        latency := time.Since(start)
        cancel()

        if err == nil {
            // Success: account for the tokens this step consumed.
            tokens := estimateTokens(result)
            if sw.tokenBudget != nil {
                sw.tokenBudget.Allocate(tokens)
            }

            // Copy the lineage instead of appending to the input's slice:
            // append could write into a backing array shared with other
            // PipelineData values (or with steps running in parallel).
            previous := make([]string, 0, len(input.Metadata.PreviousSteps)+1)
            previous = append(previous, input.Metadata.PreviousSteps...)
            previous = append(previous, input.Metadata.StepID)

            return &PipelineData{
                SchemaVersion: input.SchemaVersion,
                Payload:       result,
                Metadata: PipelineMetadata{
                    StepID:        generateStepID(),
                    StepName:      step.Name,
                    Timestamp:     time.Now(),
                    LatencyMs:     latency.Milliseconds(),
                    TokenUsed:     tokens,
                    RetryCount:    attempt,
                    PreviousSteps: previous,
                },
                Context: mergeContext(input.Context, map[string]interface{}{
                    fmt.Sprintf("%s_output", step.Name): result,
                }),
            }, nil
        }

        lastErr = err

        // Stop when the attempts are used up or the error is not worth retrying.
        // The bound check comes first so a nil policy is never dereferenced.
        if attempt+1 >= maxAttempts || !policy.IsRetriable(err) {
            break
        }

        sw.notifyObservers(&StepEvent{
            StepName: step.Name,
            Action:   "retry",
            Attempt:  attempt + 1,
            Error:    err,
        })

        // Wait out the backoff, but give up at once if the workflow context
        // ends; a plain time.Sleep would hold the workflow past its deadline.
        timer := time.NewTimer(policy.CalculateBackoff(attempt))
        select {
        case <-ctx.Done():
            timer.Stop()
            return nil, fmt.Errorf("step %s cancelled during retry backoff: %w", step.Name, ctx.Err())
        case <-timer.C:
        }
    }

    return nil, fmt.Errorf("step %s failed after %d attempts: %w", step.Name, attempts, lastErr)
}

实战场景:新闻写作流水线

完整实现

package main

import (
    "context"
    "fmt"
    "log"
    "time"
    
    "github.com/google/adk-go/agent"
    "github.com/google/adk-go/team"
    "github.com/google/adk-go/tool"
)

// NewsWritingPipeline is the news-writing pipeline; it wraps the sequential
// workflow that NewNewsWritingPipeline assembles.
type NewsWritingPipeline struct {
    workflow *team.SequentialWorkflow
}

// NewNewsWritingPipeline creates the research, writing, fact-check and editing
// agents on top of model and chains them into one sequential workflow. It
// returns an error as soon as any agent cannot be created.
func NewNewsWritingPipeline(model agent.Model) (*NewsWritingPipeline, error) {
    // Stage 1: research agent, responsible for collecting information.
    researcher, err := agent.New(agent.Config{
        Name:        "search-expert",
        Model:       model,
        Instruction: `你是资深新闻研究员。根据用户主题,搜索并整理关键信息。
要求:
1. 至少收集 5 个可靠来源的信息
2. 标注信息来源和时间
3. 区分事实和观点
4. 输出结构化数据(JSON格式)`,
        Tools: []tool.Tool{
            tool.NewSearchTool(),
            tool.NewWebScraper(),
        },
        Timeout: 30 * time.Second,
    })
    if err != nil {
        return nil, fmt.Errorf("create search agent: %w", err)
    }

    // Stage 2: writing agent, responsible for drafting the article.
    writer, err := agent.New(agent.Config{
        Name:        "writing-expert",
        Model:       model,
        Instruction: `你是资深科技编辑。根据研究员提供的信息,撰写高质量新闻文章。
要求:
1. 标题吸引人且准确
2. 导语概括核心信息
3. 正文逻辑清晰,段落分明
4. 引用来源标注
5. 字数控制在 800-1200 字`,
        Timeout: 45 * time.Second,
    })
    if err != nil {
        return nil, fmt.Errorf("create writer agent: %w", err)
    }

    // Stage 3: fact-check agent, responsible for verifying accuracy.
    factChecker, err := agent.New(agent.Config{
        Name:        "fact-checker",
        Model:       model,
        Instruction: `你是事实核查专家。逐条验证文章中的事实性陈述。
要求:
1. 标记所有需要验证的陈述
2. 给出可信度评级(高/中/低)
3. 对低可信度内容提出修改建议
4. 输出核查报告`,
        Tools: []tool.Tool{
            tool.NewFactCheckTool(),
        },
        Timeout: 30 * time.Second,
    })
    if err != nil {
        return nil, fmt.Errorf("create fact checker agent: %w", err)
    }

    // Stage 4: editor agent, responsible for the final polish.
    editor, err := agent.New(agent.Config{
        Name:        "editor-expert",
        Model:       model,
        Instruction: `你是主编。根据核查报告修改文章,输出最终版本。
要求:
1. 修正所有事实错误
2. 优化语言表达
3. 确保格式统一
4. 添加编辑备注说明修改点`,
        Timeout: 30 * time.Second,
    })
    if err != nil {
        return nil, fmt.Errorf("create editor agent: %w", err)
    }

    // Per-step settings that the workflow definition below refers to.
    researchRetry := &team.RetryPolicy{
        MaxAttempts: 3,
        Backoff:     team.ExponentialBackoff(time.Second, 10*time.Second),
    }
    // Only articles flagged as containing factual statements get checked.
    needsFactCheck := func(data *PipelineData) bool {
        return data.Context["require_fact_check"] == true
    }
    // Fallback for the editing step: hand back the draft without polishing.
    rawDraft := func(input string) (string, error) {
        log.Printf("Editor failed, returning raw draft")
        return input, nil
    }

    // Assemble the sequential workflow.
    wf := team.NewSequentialWorkflow(
        team.WithStep(team.Step{
            Name:        "research",
            Agent:       researcher,
            Timeout:     30 * time.Second,
            RetryPolicy: researchRetry,
            OnError:     team.ErrorActionFail, // a failed search fails the whole run
        }),
        team.WithStep(team.Step{
            Name:        "writing",
            Agent:       writer,
            Timeout:     45 * time.Second,
            Transformer: &ResearchToWritingTransformer{}, // reshape research output for the writer
            OnError:     team.ErrorActionRetry,
        }),
        team.WithStep(team.Step{
            Name:      "fact-check",
            Agent:     factChecker,
            Timeout:   30 * time.Second,
            Condition: needsFactCheck,
            OnError:   team.ErrorActionContinue, // keep going when the check fails, but flag a warning
        }),
        team.WithStep(team.Step{
            Name:     "editing",
            Agent:    editor,
            Timeout:  30 * time.Second,
            OnError:  team.ErrorActionFallback,
            Fallback: rawDraft,
        }),
        team.WithTokenBudget(50000),            // total token budget
        team.WithTotalTimeout(3 * time.Minute), // timeout for the whole run
        team.WithObserver(&LoggingObserver{}),  // logging
        team.WithObserver(&MetricsObserver{}),  // metrics
    )

    pipeline := &NewsWritingPipeline{workflow: wf}
    return pipeline, nil
}

// ResearchToWritingTransformer converts the research step's output into the
// prompt handed to the writing step.
type ResearchToWritingTransformer struct{}

// Transform wraps input in the fixed writing prompt. The research result is
// inserted verbatim; it is not parsed here.
func (tr *ResearchToWritingTransformer) Transform(input string) string {
    const writingPrompt = `基于以下研究结果撰写新闻文章:

%s

请确保:
1. 使用所有提供的关键信息
2. 保持客观中立的语气
3. 适当引用来源`

    return fmt.Sprintf(writingPrompt, input)
}

// LoggingObserver reports pipeline step events through the standard logger.
type LoggingObserver struct{}

// OnStepStart logs that step has started; input is not logged.
func (l *LoggingObserver) OnStepStart(step string, input interface{}) {
    log.Printf("[Pipeline] Step %s started", step)
}

// OnStepComplete logs that step finished and how long it took; result is not logged.
func (l *LoggingObserver) OnStepComplete(step string, result interface{}, latency time.Duration) {
    log.Printf("[Pipeline] Step %s completed in %v", step, latency)
}

// OnStepError logs a failed attempt of step together with its error.
func (l *LoggingObserver) OnStepError(step string, err error, attempt int) {
    log.Printf("[Pipeline] Step %s failed (attempt %d): %v", step, attempt, err)
}

// MetricsObserver collects per-step latencies. The zero value is ready to use.
type MetricsObserver struct {
    stepLatencies map[string][]time.Duration
}

// OnStepComplete records latency for step; result is ignored.
func (o *MetricsObserver) OnStepComplete(step string, result interface{}, latency time.Duration) {
    // Create the map on first use: the pipeline registers &MetricsObserver{},
    // and writing to the nil map of a zero value would panic.
    if o.stepLatencies == nil {
        o.stepLatencies = make(map[string][]time.Duration)
    }
    o.stepLatencies[step] = append(o.stepLatencies[step], latency)
}

// GetAverageLatency returns the mean of the latencies recorded for step, or 0
// when none have been recorded.
func (o *MetricsObserver) GetAverageLatency(step string) time.Duration {
    latencies := o.stepLatencies[step]
    if len(latencies) == 0 {
        return 0
    }
    var total time.Duration
    for _, l := range latencies {
        total += l
    }
    return total / time.Duration(len(latencies))
}

// main builds the news-writing pipeline, runs it on a sample query and prints
// the article, the token usage and the timings.
func main() {
    ctx := context.Background()

    // NOTE(review): model is not declared anywhere in this snippet; it has to
    // be created from the model provider before this compiles.
    pipeline, err := NewNewsWritingPipeline(model)
    if err != nil {
        log.Fatalf("Failed to create pipeline: %v", err)
    }

    // Time the whole run here: result.Metadata describes only the last step
    // that ran, so its LatencyMs is not the pipeline's total latency.
    start := time.Now()
    result, err := pipeline.workflow.Execute(ctx, "今日科技新闻有哪些?")
    elapsed := time.Since(start)
    if err != nil {
        log.Fatalf("Pipeline failed: %v", err)
    }

    fmt.Printf("Final article:\n%s\n", result.Output)
    fmt.Printf("Total tokens used: %d\n", result.TokenUsed)
    fmt.Printf("Total latency: %v\n", elapsed)
    fmt.Printf("Last step latency: %dms\n", result.Metadata.LatencyMs)
}

错误处理策略详解

错误分类与处理矩阵

// ErrorClassifier classifies errors using a list of classification rules.
type ErrorClassifier struct {
    rules []ClassificationRule
}

// ClassificationRule pairs a regular expression with the category, severity
// and handling action assigned to a matching error.
//
// NOTE(review): the matching code is not in this file; presumably Pattern is
// applied to the error's message text — confirm.
type ClassificationRule struct {
    Pattern     *regexp.Regexp
    Category    ErrorCategory
    Severity    ErrorSeverity
    Action      ErrorAction
}

// ErrorCategory names the kind of failure an error represents.
type ErrorCategory string

const (
    // ErrorCategoryNetwork marks network errors.
    ErrorCategoryNetwork ErrorCategory = "network"
    // ErrorCategoryRateLimit marks rate limiting.
    ErrorCategoryRateLimit ErrorCategory = "rate_limit"
    // ErrorCategoryValidation marks validation errors.
    ErrorCategoryValidation ErrorCategory = "validation"
    // ErrorCategoryTimeout marks timeouts.
    ErrorCategoryTimeout ErrorCategory = "timeout"
    // ErrorCategoryModel marks model errors.
    ErrorCategoryModel ErrorCategory = "model"
    // ErrorCategoryTool marks tool errors.
    ErrorCategoryTool ErrorCategory = "tool"
)

// ErrorSeverity grades how serious an error is.
type ErrorSeverity string

const (
    // ErrorSeverityCritical is fatal: the workflow must be interrupted.
    ErrorSeverityCritical ErrorSeverity = "critical"
    // ErrorSeverityHigh is severe: interrupting is recommended.
    ErrorSeverityHigh ErrorSeverity = "high"
    // ErrorSeverityMedium is moderate: the workflow may degrade.
    ErrorSeverityMedium ErrorSeverity = "medium"
    // ErrorSeverityLow is minor: the error may be ignored.
    ErrorSeverityLow ErrorSeverity = "low"
)

// DefaultRules is the default set of classification rules. Every pattern is
// case-insensitive ((?i)).
var DefaultRules = []ClassificationRule{
    // Timeouts and exceeded deadlines: high severity, retried.
    {
        Pattern:  regexp.MustCompile(`(?i)timeout|deadline exceeded`),
        Category: ErrorCategoryTimeout,
        Severity: ErrorSeverityHigh,
        Action:   ErrorActionRetry,
    },
    // Rate limiting: medium severity, retried.
    {
        Pattern:  regexp.MustCompile(`(?i)rate limit|too many requests`),
        Category: ErrorCategoryRateLimit,
        Severity: ErrorSeverityMedium,
        Action:   ErrorActionRetry,
    },
    // Invalid input or bad requests: critical, fail without retrying.
    {
        Pattern:  regexp.MustCompile(`(?i)invalid|validation|bad request`),
        Category: ErrorCategoryValidation,
        Severity: ErrorSeverityCritical,
        Action:   ErrorActionFail,
    },
    // Connection-level network failures: high severity, retried.
    {
        Pattern:  regexp.MustCompile(`(?i)connection refused|network error`),
        Category: ErrorCategoryNetwork,
        Severity: ErrorSeverityHigh,
        Action:   ErrorActionRetry,
    },
}

断路器模式(Circuit Breaker)

// CircuitBreaker guards a call site with a three-state circuit breaker.
//
// Closed: calls run; the breaker opens once failureCount reaches threshold.
// More than threshold consecutive successes clear failureCount.
// Open: calls are rejected until resetTimeout has passed since the last
// failure; the next call then moves the breaker to half-open.
// Half-open: at most halfOpenMaxCalls probes are admitted, counting probes
// that are still running. Any failure re-opens the breaker; halfOpenMaxCalls
// successes close it.
type CircuitBreaker struct {
    state            CircuitState
    failureCount     int
    successCount     int
    halfOpenInFlight int // half-open probes that were admitted and have not returned yet
    lastFailureTime  time.Time
    threshold        int
    resetTimeout     time.Duration
    halfOpenMaxCalls int
    mu               sync.RWMutex
}

// CircuitState is the state of a CircuitBreaker.
type CircuitState int

const (
    StateClosed   CircuitState = iota // normal operation
    StateOpen                         // tripped; calls are rejected
    StateHalfOpen                     // probing whether the dependency has recovered
)

// Execute runs fn under the breaker and returns fn's error. It returns an
// error without calling fn when the breaker is open or the half-open probe
// limit is reached. If fn panics, the call is recorded as a failure and the
// panic continues to propagate.
func (cb *CircuitBreaker) Execute(fn func() error) error {
    probing, err := cb.admit()
    if err != nil {
        return err
    }

    var callErr error
    completed := false
    defer func() {
        if !completed {
            // fn panicked. Record a failure so the in-flight probe slot is
            // released instead of leaking.
            callErr = fmt.Errorf("circuit breaker: call panicked")
        }
        cb.record(probing, callErr)
    }()

    callErr = fn()
    completed = true
    return callErr
}

// admit decides whether a call may proceed. probing reports whether the call
// was admitted as a half-open probe and therefore holds an in-flight slot.
func (cb *CircuitBreaker) admit() (probing bool, err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateOpen:
        if time.Since(cb.lastFailureTime) <= cb.resetTimeout {
            return false, fmt.Errorf("circuit breaker is open")
        }
        // The reset timeout has passed: this call becomes the first probe.
        cb.state = StateHalfOpen
        cb.failureCount = 0
        cb.successCount = 0
    case StateHalfOpen:
        // Count running probes as well as finished ones; otherwise every
        // overlapping call would be admitted before any result is recorded.
        if cb.successCount+cb.failureCount+cb.halfOpenInFlight >= cb.halfOpenMaxCalls {
            return false, fmt.Errorf("circuit breaker half-open limit reached")
        }
    default:
        return false, nil
    }

    cb.halfOpenInFlight++
    return true, nil
}

// record applies the outcome of a finished call to the breaker state.
func (cb *CircuitBreaker) record(probing bool, err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if probing {
        cb.halfOpenInFlight--
    }

    if err != nil {
        cb.failureCount++
        // A failure ends the current run of successes, so successCount
        // really tracks consecutive successes.
        cb.successCount = 0
        cb.lastFailureTime = time.Now()

        if cb.state == StateHalfOpen || cb.failureCount >= cb.threshold {
            cb.state = StateOpen
        }
        return
    }

    cb.successCount++

    switch cb.state {
    case StateHalfOpen:
        if cb.successCount >= cb.halfOpenMaxCalls {
            // Enough probes succeeded: close and start with clean counters.
            cb.state = StateClosed
            cb.failureCount = 0
            cb.successCount = 0
        }
    case StateClosed:
        // A long enough run of consecutive successes clears earlier failures.
        if cb.successCount > cb.threshold {
            cb.failureCount = 0
        }
    }
}

性能优化策略

1. 步骤并行化(Pipeline Parallelism)

当 Sequential 中的某些步骤处理的数据相互独立时,可以引入微并行:

// executeWithMicroParallelism runs steps group by group, in the order given by
// detectParallelGroups. A group of exactly one step runs inline and its error
// aborts the run. Any other group runs its steps concurrently on the same
// input; their results and errors are passed to mergeParallelResults, whose
// return value becomes the input of the next group.
func (sw *SequentialWorkflow) executeWithMicroParallelism(
    ctx context.Context,
    steps []Step,
    data *PipelineData,
) (*PipelineData, error) {
    current := data

    for _, batch := range sw.detectParallelGroups(steps) {
        // Single step: plain sequential execution.
        if len(batch) == 1 {
            next, err := sw.executeStep(ctx, batch[0], current)
            if err != nil {
                return nil, err
            }
            current = next
            continue
        }

        // Several steps: fan out, each goroutine writing only to its own slot.
        outputs := make([]*PipelineData, len(batch))
        failures := make([]error, len(batch))

        var wg sync.WaitGroup
        wg.Add(len(batch))
        for i := range batch {
            go func(slot int, s Step, in *PipelineData) {
                defer wg.Done()
                outputs[slot], failures[slot] = sw.executeStep(ctx, s, in)
            }(i, batch[i], current)
        }
        wg.Wait()

        // Fold the branch results back into a single value.
        current = sw.mergeParallelResults(current, outputs, failures)
    }

    return current, nil
}

2. 结果缓存

// ResultCache stores string results in a cache backend with a fixed TTL.
type ResultCache struct {
    backend CacheBackend
    ttl     time.Duration
    keyFunc func(string) string // optional key mapping; when nil the key is used as given
}

// GetOrExecute returns the cached value for key when the backend has one.
// Otherwise it runs fn, stores a successful result under the same key with the
// cache's TTL, and returns it. An error from fn is returned unchanged and
// nothing is cached. ctx is currently not consulted.
func (c *ResultCache) GetOrExecute(
    ctx context.Context,
    key string,
    fn func() (string, error),
) (string, error) {
    // A cache built without a key function must not panic on first use.
    cacheKey := key
    if c.keyFunc != nil {
        cacheKey = c.keyFunc(key)
    }

    // Any backend error is treated as a miss.
    if cached, err := c.backend.Get(cacheKey); err == nil {
        return cached, nil
    }

    // Miss: compute and cache.
    result, err := fn()
    if err != nil {
        return "", err
    }

    // NOTE(review): if CacheBackend.Set returns an error it is dropped here;
    // decide whether a failed write should be logged.
    c.backend.Set(cacheKey, result, c.ttl)
    return result, nil
}

// executeStepWithCache runs step through its result cache when one is
// configured, and falls back to executeStep otherwise.
//
// On a cache miss the step runs and its complete PipelineData (metadata and
// context included) is returned. On a cache hit the step does not run: the
// result is a copy of input carrying the cached payload, attributed to this
// step with zeroed latency, token and retry figures.
func (sw *SequentialWorkflow) executeStepWithCache(
    ctx context.Context,
    step Step,
    input *PipelineData,
) (*PipelineData, error) {
    if step.CacheConfig == nil {
        return sw.executeStep(ctx, step, input)
    }

    // Checked assertion: a non-string payload is an error, not a panic.
    payload, ok := input.Payload.(string)
    if !ok {
        return nil, fmt.Errorf("step %s: expected string payload, got %T", step.Name, input.Payload)
    }
    cacheKey := fmt.Sprintf("%s:%s", step.Name, hashInput(payload))

    // fresh is set only when the step actually ran, i.e. on a cache miss.
    var fresh *PipelineData
    result, err := step.CacheConfig.GetOrExecute(ctx, cacheKey, func() (string, error) {
        data, err := sw.executeStep(ctx, step, input)
        if err != nil {
            return "", err
        }
        out, ok := data.Payload.(string)
        if !ok {
            return "", fmt.Errorf("step %s: expected string output, got %T", step.Name, data.Payload)
        }
        fresh = data
        return out, nil
    })
    if err != nil {
        return nil, err
    }

    // Miss: keep everything the step produced rather than just the payload.
    if fresh != nil {
        return fresh, nil
    }

    // Hit: carry the schema version and context forward so downstream steps
    // still see them.
    // NOTE(review): the copy shares input's Context map and keeps input's
    // StepID and PreviousSteps; confirm whether cached steps need an ID of
    // their own.
    hit := *input
    hit.Payload = result
    hit.Metadata.StepName = step.Name
    hit.Metadata.LatencyMs = 0
    hit.Metadata.TokenUsed = 0
    hit.Metadata.RetryCount = 0
    return &hit, nil
}

3. 背压控制(Backpressure)

// BackpressureController queues tasks in a bounded buffer and runs them with a
// bounded number of tasks in flight.
type BackpressureController struct {
    maxInflight int
    semaphore   chan struct{} // one token per running task
    queue       chan Task     // tasks waiting to be dispatched
}

// NewBackpressureController returns a controller that runs at most maxInflight
// tasks at once and buffers up to queueSize waiting tasks. A maxInflight below
// 1 is raised to 1 and a negative queueSize is treated as 0: a zero-capacity
// semaphore would block the dispatcher forever, and a negative channel size
// panics.
func NewBackpressureController(maxInflight, queueSize int) *BackpressureController {
    if maxInflight < 1 {
        maxInflight = 1
    }
    if queueSize < 0 {
        queueSize = 0
    }
    return &BackpressureController{
        maxInflight: maxInflight,
        semaphore:   make(chan struct{}, maxInflight),
        queue:       make(chan Task, queueSize),
    }
}

// Submit enqueues task without blocking. It returns an error when the queue is
// full, which is the backpressure signal to the caller.
func (bc *BackpressureController) Submit(task Task) error {
    select {
    case bc.queue <- task:
        return nil
    default:
        return fmt.Errorf("queue full, backpressure applied")
    }
}

// Start dispatches queued tasks until ctx is done. Each task runs in its own
// goroutine once a slot is free. Start returns as soon as ctx is done, even
// while waiting for a slot; a task already taken from the queue at that moment
// is dropped. Tasks that are already running are not waited for.
func (bc *BackpressureController) Start(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case task := <-bc.queue:
            // Waiting for a slot must also watch ctx; otherwise a full set of
            // long-running tasks keeps Start from ever returning.
            select {
            case <-ctx.Done():
                return
            case bc.semaphore <- struct{}{}:
            }
            go func(t Task) {
                defer func() { <-bc.semaphore }()
                t.Execute()
            }(task)
        }
    }
}

常见问题深度解析

Q:Sequential 的延迟如何优化?

A:三个层面的优化:

  1. 模型层面:使用更快的模型(如 gemini-flash 替代 gemini-pro),牺牲少量质量换取速度
  2. 架构层面:引入微并行,将无依赖的步骤并行化
  3. 基础设施层面:使用模型缓存(如 GPTCache),对常见输入直接返回缓存结果

实测数据:在 4 步骤的新闻写作流水线中,通过模型降级 + 结果缓存,平均延迟从 45s 降至 18s。

Q:如何处理步骤间的数据格式不兼容?

A:使用适配器模式(Adapter Pattern):

// DataAdapter converts data between the formats of adjacent steps and
// validates it.
type DataAdapter interface {
    // Convert turns input into the target format, or returns an error.
    Convert(input interface{}) (interface{}, error)
    // Validate reports an error for data it does not accept.
    Validate(data interface{}) error
}

// Each step can be configured with an input adapter and an output adapter.
// This is an excerpt: "// ..." stands for the Step fields omitted here.
type Step struct {
    // ...
    InputAdapter  DataAdapter
    OutputAdapter DataAdapter
}

Q:长时间运行的 Sequential 工作流如何防止状态丢失?

A:实现检查点(Checkpoint)机制:

// executeWithCheckpoints runs the workflow's steps and saves a checkpoint after
// every successful step, so that an interrupted run can resume instead of
// starting over.
//
// With a usable checkpoint, execution resumes at its StepIndex using its saved
// data; otherwise it starts at step 0 from input. A failed step leaves the
// last good checkpoint in place and returns the error. The checkpoint is
// cleared once every step has succeeded.
func (sw *SequentialWorkflow) executeWithCheckpoints(ctx context.Context, input string) error {
    // Fresh-run defaults, built the same way Execute seeds its pipeline data.
    start := 0
    data := &PipelineData{
        SchemaVersion: "1.0",
        Payload:       input,
        Metadata: PipelineMetadata{
            StepID:    generateStepID(),
            Timestamp: time.Now(),
        },
        Context: make(map[string]interface{}),
    }

    // Resume only from a checkpoint that is actually usable. Reading
    // checkpoint.StepIndex unconditionally would dereference nil on a first
    // run, when no checkpoint exists.
    // NOTE(review): assumes Checkpoint.Data is a *PipelineData — confirm.
    checkpoint, err := sw.loadCheckpoint()
    if err == nil && checkpoint != nil && checkpoint.Data != nil && checkpoint.StepIndex >= 0 {
        log.Printf("Resuming from checkpoint at step %d", checkpoint.StepIndex)
        sw.resumeFromCheckpoint(checkpoint)
        start = checkpoint.StepIndex
        data = checkpoint.Data
    }

    for i := start; i < len(sw.steps); i++ {
        if err := ctx.Err(); err != nil {
            return err
        }

        result, err := sw.executeStep(ctx, sw.steps[i], data)
        if err != nil {
            // Do not checkpoint a failed step: saving i+1 with a nil result
            // would make the next run skip this step and continue without data.
            return err
        }

        // The step succeeded: the next run may start after it.
        sw.saveCheckpoint(&Checkpoint{
            StepIndex: i + 1,
            Data:      result,
            Timestamp: time.Now(),
        })

        data = result
    }

    // All steps finished; the checkpoint is no longer needed.
    sw.clearCheckpoint()
    return nil
}

下一步

至此,我们已经深入剖析了 Sequential 工作流的管道化编排。接下来将探索 Parallel 工作流——并行执行模式的并发控制、结果聚合与一致性保障。

Agent Team 架构 | Parallel 工作流 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。