Sequential 工作流深度解析:管道化编排的状态一致性、错误传播与性能优化
深入剖析 ADK Go Sequential 工作流的内部实现机制,涵盖管道设计模式、状态传递协议、错误处理策略、背压控制与生产环境调优。
Table of Contents
Sequential 工作流深度解析:管道化编排的状态一致性、错误传播与性能优化
Sequential 工作流是 Agent Team 中最基础也最常用的模式。它看似简单——A 执行完 B 执行,B 执行完 C 执行——但在生产环境中,管道化编排涉及复杂的状态一致性、错误传播、背压控制等问题。本文将深入 Sequential 的架构设计与工程实践。
管道化架构的核心原理
数据流模型
Sequential 工作流本质上是**管道-过滤器模式(Pipes and Filters Pattern)**的一种实现:
输入数据
│
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Filter A │───►│ Filter B │───►│ Filter C │
│ (数据获取) │ │ (数据转换) │ │ (数据输出) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
RawData ProcessedData FinalResult
每个 Agent 既是消费者(接收上游输出),也是生产者(生成下游输入)。这种设计的关键在于定义清晰的数据契约(Data Contract)。
数据契约设计
// PipelineData is the standard envelope for data passed between pipeline steps.
type PipelineData struct {
// SchemaVersion is for version control, so format changes can stay compatible.
SchemaVersion string `json:"schema_version"`
// Payload is the business data.
Payload interface{} `json:"payload"`
// Metadata is used for tracing and debugging.
Metadata PipelineMetadata `json:"metadata"`
// Context carries key/value state along the pipeline.
Context map[string]interface{} `json:"context"`
// Error holds error information, if any; it is omitted from JSON when nil.
Error *PipelineError `json:"error,omitempty"`
}
// PipelineMetadata records execution details of the step that produced a
// PipelineData value.
type PipelineMetadata struct {
// StepID is the identifier generated for the producing step.
StepID string `json:"step_id"`
// StepName is the configured name of the producing step.
StepName string `json:"step_name"`
// Timestamp is when the data was produced.
Timestamp time.Time `json:"timestamp"`
// LatencyMs is the step latency in milliseconds.
LatencyMs int64 `json:"latency_ms"`
// TokenUsed is the token count attributed to the step.
TokenUsed int `json:"token_used"`
// RetryCount is the number of retries; executeStep stores the zero-based
// index of the successful attempt here.
RetryCount int `json:"retry_count"`
// PreviousSteps lists the StepIDs of the upstream data this value derives from.
PreviousSteps []string `json:"previous_steps"`
}
// PipelineError is the error information carried inside PipelineData.
type PipelineError struct {
// Code is an error code. NOTE(review): the set of valid codes is not
// defined in the visible source.
Code string `json:"code"`
// Message is the error message.
Message string `json:"message"`
// Step names the step the error is associated with.
Step string `json:"step"`
// Retriable presumably tells consumers whether retrying may help; confirm
// against the code that populates it.
Retriable bool `json:"retriable"`
}
设计原则:
- Schema Versioning:数据格式变更时通过版本号兼容旧代码
- 不可变 Payload:每个步骤生成新的 Payload,不修改上游数据
- 完整元数据:记录每个步骤的执行详情,便于问题排查
- 错误传播:错误信息随管道传递,下游可决定是否处理
生产级 Sequential 实现
核心结构
// SequentialWorkflow holds the configuration and runtime state of a workflow
// whose steps are executed one after another.
type SequentialWorkflow struct {
steps []Step // list of steps, executed in slice order
state *WorkflowState // workflow state
retryPolicy *RetryPolicy // retry policy, used when a step does not set its own
errorHandler ErrorHandler // error handler
middleware []Middleware // middleware chain
observers []Observer // observers
tokenBudget *TokenBudget // token budget
timeout time.Duration // overall timeout
}
// Step describes a single stage of a SequentialWorkflow.
type Step struct {
Name string
Agent *agent.Agent
Condition ConditionFunc // execution condition; a nil condition always runs the step
Transformer DataTransformer // data transformer applied to the input before the agent runs
Timeout time.Duration // per-step timeout
RetryPolicy *RetryPolicy // per-step retry policy
OnError ErrorAction // how an error of this step is handled
}
// ErrorAction is the error-handling strategy of a step.
type ErrorAction int
const (
ErrorActionFail ErrorAction = iota // fail immediately
ErrorActionSkip // skip the current step
ErrorActionRetry // retry the current step
ErrorActionFallback // use fallback logic
ErrorActionContinue // continue executing (and record the error)
)
执行引擎
// Execute runs the configured steps in order, feeding each step's output into
// the next one, and returns the final payload together with its metadata.
//
// The run is bounded by sw.timeout when that value is positive; a
// non-positive timeout means "no overall limit" (context.WithTimeout would
// otherwise return a context that is already expired). A step whose Condition
// returns false is skipped and reported to the observers. When a step fails,
// handleStepError decides whether the workflow continues; if it does, the
// data it returns replaces the current pipeline data.
//
// An error is returned when the context ends before a step starts, when a
// step fails and the failure is not handled, or when the final payload is not
// a string.
func (sw *SequentialWorkflow) Execute(ctx context.Context, input string) (*WorkflowResult, error) {
	// 1. Create the overall deadline, if one is configured.
	if sw.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, sw.timeout)
		defer cancel()
	}

	// 2. Initialise the pipeline data with the raw input.
	data := &PipelineData{
		SchemaVersion: "1.0",
		Payload:       input,
		Metadata: PipelineMetadata{
			StepID:    generateStepID(),
			Timestamp: time.Now(),
		},
		Context: make(map[string]interface{}),
	}

	// 3. Run the chain of steps.
	for i, step := range sw.steps {
		// Stop before starting a new step once the context has ended.
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("workflow timeout at step %d (%s): %w", i, step.Name, err)
		}

		// Check the execution condition.
		if step.Condition != nil && !step.Condition(data) {
			sw.notifyObservers(&StepEvent{
				StepIndex: i,
				StepName:  step.Name,
				Action:    "skipped",
			})
			continue
		}

		// Run the step (retries happen inside executeStep).
		result, err := sw.executeStep(ctx, step, data)
		if err != nil {
			handled, newData := sw.handleStepError(step, data, err)
			if !handled {
				return nil, fmt.Errorf("step %d (%s) failed: %w", i, step.Name, err)
			}
			// Keep the previous data if the handler supplies no replacement,
			// so later steps never dereference a nil pointer.
			if newData != nil {
				data = newData
			}
			continue
		}
		data = result
	}

	// A checked assertion turns an unexpected payload type into an error
	// instead of a panic.
	output, ok := data.Payload.(string)
	if !ok {
		return nil, fmt.Errorf("workflow produced a non-string payload of type %T", data.Payload)
	}

	out := &WorkflowResult{
		Output:   output,
		Metadata: data.Metadata,
	}
	// The token budget is optional; without one the usage stays at its zero value.
	if sw.tokenBudget != nil {
		out.TokenUsed = sw.tokenBudget.UsedTokens
	}
	return out, nil
}
// executeStep runs a single step against input and returns the data it
// produced.
//
// The step's agent is wrapped by the workflow middleware. The input payload
// must be a string; it is passed through step.Transformer when one is set.
// The call is retried according to step.RetryPolicy, falling back to the
// workflow-level policy; with no policy at all the step is attempted once.
// As in the original loop, up to MaxAttempts+1 calls are made. Each attempt
// is limited by step.Timeout when that value is positive.
//
// The returned error wraps the last agent error and reports how many attempts
// were actually made, or wraps the context error if the context ends while
// waiting for a retry.
func (sw *SequentialWorkflow) executeStep(
	ctx context.Context,
	step Step,
	input *PipelineData,
) (*PipelineData, error) {
	// Apply middleware.
	handler := sw.applyMiddleware(step.Agent.Run)

	// Data transformation. The assertion is checked so that a non-string
	// payload is reported as an error rather than a panic.
	agentInput, ok := input.Payload.(string)
	if !ok {
		return nil, fmt.Errorf("step %s: payload is %T, want string", step.Name, input.Payload)
	}
	if step.Transformer != nil {
		agentInput = step.Transformer.Transform(agentInput)
	}

	// Resolve the retry policy: step-level first, then workflow-level.
	policy := step.RetryPolicy
	if policy == nil {
		policy = sw.retryPolicy
	}
	maxAttempts := 0
	if policy != nil {
		maxAttempts = policy.MaxAttempts
	}

	var lastErr error
	attempts := 0
	for attempt := 0; attempt <= maxAttempts; attempt++ {
		attempts = attempt + 1

		// A non-positive step timeout means "inherit the workflow deadline";
		// context.WithTimeout(ctx, 0) would expire before the agent runs.
		stepCtx, cancel := ctx, context.CancelFunc(func() {})
		if step.Timeout > 0 {
			stepCtx, cancel = context.WithTimeout(ctx, step.Timeout)
		}
		start := time.Now()
		var result string
		result, lastErr = handler(stepCtx, agentInput)
		latency := time.Since(start)
		cancel()

		if lastErr == nil {
			// Success: charge the token budget, if there is one.
			tokens := estimateTokens(result)
			if sw.tokenBudget != nil {
				sw.tokenBudget.Allocate(tokens)
			}

			// Build the history in a fresh slice. Appending to the upstream
			// slice directly could write into a backing array shared with
			// sibling steps that received the same input.
			prev := input.Metadata.PreviousSteps
			history := make([]string, 0, len(prev)+1)
			history = append(history, prev...)
			history = append(history, input.Metadata.StepID)

			return &PipelineData{
				SchemaVersion: input.SchemaVersion,
				Payload:       result,
				Metadata: PipelineMetadata{
					StepID:        generateStepID(),
					StepName:      step.Name,
					Timestamp:     time.Now(),
					LatencyMs:     latency.Milliseconds(),
					TokenUsed:     tokens,
					RetryCount:    attempt,
					PreviousSteps: history,
				},
				Context: mergeContext(input.Context, map[string]interface{}{
					fmt.Sprintf("%s_output", step.Name): result,
				}),
			}, nil
		}

		// Failure: stop unless another attempt is allowed and worthwhile.
		if policy == nil || attempt >= maxAttempts || !policy.IsRetriable(lastErr) {
			break
		}

		backoff := policy.CalculateBackoff(attempt)
		sw.notifyObservers(&StepEvent{
			StepName: step.Name,
			Action:   "retry",
			Attempt:  attempt + 1,
			Error:    lastErr,
		})

		// Wait for the backoff, but give up at once if the context ends;
		// a plain sleep would hold the workflow past its deadline.
		timer := time.NewTimer(backoff)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return nil, fmt.Errorf("step %s cancelled while waiting to retry (last error: %v): %w",
				step.Name, lastErr, ctx.Err())
		}
	}

	return nil, fmt.Errorf("step %s failed after %d attempts: %w", step.Name, attempts, lastErr)
}
实战场景:新闻写作流水线
完整实现
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/google/adk-go/agent"
"github.com/google/adk-go/team"
"github.com/google/adk-go/tool"
)
// NewsWritingPipeline is a news-writing pipeline; it wraps the sequential
// workflow that runs the pipeline's steps.
type NewsWritingPipeline struct {
workflow *team.SequentialWorkflow
}
// NewNewsWritingPipeline creates the four agents of the news pipeline (search,
// writing, fact-check, editing), all backed by model, and wires them into a
// sequential workflow with a token budget, an overall timeout and two
// observers.
//
// It returns an error, wrapped with the name of the failing agent, as soon as
// any agent cannot be created.
func NewNewsWritingPipeline(model agent.Model) (*NewsWritingPipeline, error) {
	// 1. Search agent: responsible for collecting information.
	searchAgent, err := agent.New(agent.Config{
		Name:  "search-expert",
		Model: model,
		Instruction: `你是资深新闻研究员。根据用户主题,搜索并整理关键信息。
要求:
1. 至少收集 5 个可靠来源的信息
2. 标注信息来源和时间
3. 区分事实和观点
4. 输出结构化数据(JSON格式)`,
		Tools: []tool.Tool{
			tool.NewSearchTool(),
			tool.NewWebScraper(),
		},
		Timeout: 30 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("create search agent: %w", err)
	}

	// 2. Writing agent: responsible for drafting the article.
	writerAgent, err := agent.New(agent.Config{
		Name:  "writing-expert",
		Model: model,
		Instruction: `你是资深科技编辑。根据研究员提供的信息,撰写高质量新闻文章。
要求:
1. 标题吸引人且准确
2. 导语概括核心信息
3. 正文逻辑清晰,段落分明
4. 引用来源标注
5. 字数控制在 800-1200 字`,
		Timeout: 45 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("create writer agent: %w", err)
	}

	// 3. Fact-check agent: responsible for verifying accuracy.
	factCheckerAgent, err := agent.New(agent.Config{
		Name:  "fact-checker",
		Model: model,
		Instruction: `你是事实核查专家。逐条验证文章中的事实性陈述。
要求:
1. 标记所有需要验证的陈述
2. 给出可信度评级(高/中/低)
3. 对低可信度内容提出修改建议
4. 输出核查报告`,
		Tools: []tool.Tool{
			tool.NewFactCheckTool(),
		},
		Timeout: 30 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("create fact checker agent: %w", err)
	}

	// 4. Editor agent: responsible for the final polish.
	editorAgent, err := agent.New(agent.Config{
		Name:  "editor-expert",
		Model: model,
		Instruction: `你是主编。根据核查报告修改文章,输出最终版本。
要求:
1. 修正所有事实错误
2. 优化语言表达
3. 确保格式统一
4. 添加编辑备注说明修改点`,
		Timeout: 30 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("create editor agent: %w", err)
	}

	// Build the sequential workflow.
	workflow := team.NewSequentialWorkflow(
		team.WithStep(team.Step{
			Name:    "research",
			Agent:   searchAgent,
			Timeout: 30 * time.Second,
			RetryPolicy: &team.RetryPolicy{
				MaxAttempts: 3,
				Backoff:     team.ExponentialBackoff(time.Second, 10*time.Second),
			},
			OnError: team.ErrorActionFail, // a failed search fails the whole run
		}),
		team.WithStep(team.Step{
			Name:        "writing",
			Agent:       writerAgent,
			Timeout:     45 * time.Second,
			Transformer: &ResearchToWritingTransformer{}, // data format conversion
			OnError:     team.ErrorActionRetry,
		}),
		team.WithStep(team.Step{
			Name:    "fact-check",
			Agent:   factCheckerAgent,
			Timeout: 30 * time.Second,
			// The pipeline data type lives in package team, like Step and
			// RetryPolicy, so it must be qualified here.
			Condition: func(data *team.PipelineData) bool {
				// Only articles flagged as containing factual statements
				// need to be checked.
				return data.Context["require_fact_check"] == true
			},
			OnError: team.ErrorActionContinue, // keep going if the check fails, but flag a warning
		}),
		team.WithStep(team.Step{
			Name:    "editing",
			Agent:   editorAgent,
			Timeout: 30 * time.Second,
			OnError: team.ErrorActionFallback,
			Fallback: func(input string) (string, error) {
				// Fallback: return the draft as-is, without polishing.
				log.Printf("Editor failed, returning raw draft")
				return input, nil
			},
		}),
		team.WithTokenBudget(50000),          // total token budget
		team.WithTotalTimeout(3*time.Minute), // overall timeout
		team.WithObserver(&LoggingObserver{}), // logging observer
		team.WithObserver(&MetricsObserver{}), // metrics observer
	)

	return &NewsWritingPipeline{workflow: workflow}, nil
}
// ResearchToWritingTransformer turns the research step's output into the
// prompt handed to the writing step.
type ResearchToWritingTransformer struct{}

// Transform embeds input verbatim in a fixed writing prompt and returns the
// resulting text. The input is not parsed or validated here.
func (*ResearchToWritingTransformer) Transform(input string) string {
	// The single %s verb marks where the research result is inserted.
	const promptTemplate = `基于以下研究结果撰写新闻文章:
%s
请确保:
1. 使用所有提供的关键信息
2. 保持客观中立的语气
3. 适当引用来源`

	prompt := fmt.Sprintf(promptTemplate, input)
	return prompt
}
// LoggingObserver reports step lifecycle events through the standard logger.
type LoggingObserver struct{}

// OnStepStart logs that the named step has started. The input value is not
// inspected.
func (*LoggingObserver) OnStepStart(step string, input interface{}) {
	log.Printf("[Pipeline] Step %s started", step)
}

// OnStepComplete logs that the named step finished and how long it took. The
// result value is not inspected.
func (*LoggingObserver) OnStepComplete(step string, result interface{}, latency time.Duration) {
	log.Printf("[Pipeline] Step %s completed in %v", step, latency)
}

// OnStepError logs a failed attempt of the named step together with the
// attempt number and the error.
func (*LoggingObserver) OnStepError(step string, err error, attempt int) {
	log.Printf("[Pipeline] Step %s failed (attempt %d): %v", step, attempt, err)
}
// MetricsObserver collects the latency of every completed step, grouped by
// step name. The zero value is ready to use.
//
// NOTE(review): the map is not guarded by a lock; add one if steps may
// complete concurrently.
type MetricsObserver struct {
	stepLatencies map[string][]time.Duration
}

// OnStepComplete records latency under the given step name. The result value
// is not inspected.
func (o *MetricsObserver) OnStepComplete(step string, result interface{}, latency time.Duration) {
	// Allocate the map on first use: the observer is constructed as
	// &MetricsObserver{}, and writing to a nil map panics.
	if o.stepLatencies == nil {
		o.stepLatencies = make(map[string][]time.Duration)
	}
	o.stepLatencies[step] = append(o.stepLatencies[step], latency)
}

// GetAverageLatency returns the mean of the latencies recorded for step, or 0
// when nothing has been recorded for it.
func (o *MetricsObserver) GetAverageLatency(step string) time.Duration {
	latencies := o.stepLatencies[step]
	if len(latencies) == 0 {
		return 0
	}
	var total time.Duration
	for _, l := range latencies {
		total += l
	}
	return total / time.Duration(len(latencies))
}
// main builds the news-writing pipeline, runs it once for a fixed prompt and
// prints the article, the token usage and the wall-clock time of the run.
func main() {
	ctx := context.Background()

	// NOTE(review): model is not declared anywhere in the visible source; it
	// must be created (an agent.Model) before this call for the program to
	// compile.
	pipeline, err := NewNewsWritingPipeline(model)
	if err != nil {
		log.Fatalf("Failed to create pipeline: %v", err)
	}

	// Time the whole run here: result.Metadata describes only the step that
	// produced the final data, so its LatencyMs is not a total.
	started := time.Now()
	result, err := pipeline.workflow.Execute(ctx, "今日科技新闻有哪些?")
	elapsed := time.Since(started)
	if err != nil {
		log.Fatalf("Pipeline failed: %v", err)
	}

	fmt.Printf("Final article:\n%s\n", result.Output)
	fmt.Printf("Total tokens used: %d\n", result.TokenUsed)
	fmt.Printf("Total latency: %v\n", elapsed)
}
错误处理策略详解
错误分类与处理矩阵
// ErrorClassifier classifies errors using a list of rules.
// NOTE(review): how the rules are evaluated (e.g. first match wins) is not
// shown in the visible source.
type ErrorClassifier struct {
rules []ClassificationRule
}
// ClassificationRule associates a regular expression with a category, a
// severity and the action to take.
// NOTE(review): presumably Pattern is matched against the error text; confirm
// against the classifier implementation.
type ClassificationRule struct {
Pattern *regexp.Regexp
Category ErrorCategory
Severity ErrorSeverity
Action ErrorAction
}
// ErrorCategory names the kind of failure an error represents.
type ErrorCategory string
const (
ErrorCategoryNetwork ErrorCategory = "network" // network error
ErrorCategoryRateLimit ErrorCategory = "rate_limit" // rate limiting
ErrorCategoryValidation ErrorCategory = "validation" // validation error
ErrorCategoryTimeout ErrorCategory = "timeout" // timeout
ErrorCategoryModel ErrorCategory = "model" // model error
ErrorCategoryTool ErrorCategory = "tool" // tool error
)
// ErrorSeverity grades how serious an error is.
type ErrorSeverity string
const (
ErrorSeverityCritical ErrorSeverity = "critical" // fatal, must abort
ErrorSeverityHigh ErrorSeverity = "high" // severe, aborting is recommended
ErrorSeverityMedium ErrorSeverity = "medium" // moderate, can degrade
ErrorSeverityLow ErrorSeverity = "low" // minor, can be ignored
)
// DefaultRules is the default set of classification rules. Every pattern is
// case-insensitive (the (?i) flag) and is compiled once at package
// initialisation.
var DefaultRules = []ClassificationRule{
// Timeouts and exceeded deadlines: high severity, retried.
{
Pattern: regexp.MustCompile(`(?i)timeout|deadline exceeded`),
Category: ErrorCategoryTimeout,
Severity: ErrorSeverityHigh,
Action: ErrorActionRetry,
},
// Rate limiting: medium severity, retried.
{
Pattern: regexp.MustCompile(`(?i)rate limit|too many requests`),
Category: ErrorCategoryRateLimit,
Severity: ErrorSeverityMedium,
Action: ErrorActionRetry,
},
// Invalid input or bad requests: critical, fails immediately.
{
Pattern: regexp.MustCompile(`(?i)invalid|validation|bad request`),
Category: ErrorCategoryValidation,
Severity: ErrorSeverityCritical,
Action: ErrorActionFail,
},
// Connection-level network failures: high severity, retried.
{
Pattern: regexp.MustCompile(`(?i)connection refused|network error`),
Category: ErrorCategoryNetwork,
Severity: ErrorSeverityHigh,
Action: ErrorActionRetry,
},
}
断路器模式(Circuit Breaker)
// CircuitBreaker guards calls to an unreliable dependency. It has three
// states: closed (calls pass), open (calls are rejected) and half-open (a
// limited number of probe calls is let through to test recovery).
type CircuitBreaker struct {
	state            CircuitState
	failureCount     int // failures counted in the current state
	successCount     int // successes since the last failure or state reset
	lastFailureTime  time.Time
	threshold        int           // failures that open the breaker
	resetTimeout     time.Duration // how long the breaker stays open
	halfOpenMaxCalls int           // probe budget while half-open
	halfOpenInflight int           // admitted probes that have not finished yet
	mu               sync.RWMutex
}

// CircuitState is the state of a CircuitBreaker.
type CircuitState int

const (
	StateClosed   CircuitState = iota // normal state
	StateOpen                         // tripped; calls are rejected
	StateHalfOpen                     // probing for recovery
)

// Execute runs fn if the breaker admits the call and records the outcome.
//
// While open, calls are rejected with an error until resetTimeout has passed
// since the last failure; the first call after that moves the breaker to
// half-open and is admitted as a probe. While half-open, a call is rejected
// once running probes plus recorded probe results reach halfOpenMaxCalls. fn
// runs without the lock held, and its error is returned unchanged.
//
// A failure opens the breaker when it happens in half-open state or when the
// failure count reaches threshold. In closed state the failure count is
// cleared after more than threshold consecutive successes.
func (cb *CircuitBreaker) Execute(fn func() error) error {
	probe, err := cb.admit()
	if err != nil {
		return err
	}

	err = fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()

	if probe {
		cb.halfOpenInflight--
	}

	if err != nil {
		cb.failureCount++
		// A failure ends the success streak, so that only consecutive
		// successes can clear the failure count.
		cb.successCount = 0
		cb.lastFailureTime = time.Now()
		if cb.state == StateHalfOpen || cb.failureCount >= cb.threshold {
			cb.state = StateOpen
		}
		return err
	}

	cb.successCount++
	switch cb.state {
	case StateHalfOpen:
		if cb.successCount >= cb.halfOpenMaxCalls {
			// Enough successful probes: close and start with clean counters.
			cb.state = StateClosed
			cb.failureCount = 0
			cb.successCount = 0
		}
	case StateClosed:
		// Consecutive successes: reset the failure count.
		if cb.successCount > cb.threshold {
			cb.failureCount = 0
		}
	}
	return nil
}

// admit decides under the lock whether a call may run. probe reports whether
// the call was admitted as a half-open probe and must release its slot.
func (cb *CircuitBreaker) admit() (probe bool, err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateOpen:
		if time.Since(cb.lastFailureTime) <= cb.resetTimeout {
			return false, fmt.Errorf("circuit breaker is open")
		}
		// The open period is over: this call becomes the first probe.
		cb.state = StateHalfOpen
		cb.failureCount = 0
		cb.successCount = 0
		cb.halfOpenInflight++
		return true, nil
	case StateHalfOpen:
		// Count running probes too; otherwise concurrent callers all see
		// zero finished calls and exceed the probe budget together.
		if cb.halfOpenInflight+cb.successCount+cb.failureCount >= cb.halfOpenMaxCalls {
			return false, fmt.Errorf("circuit breaker half-open limit reached")
		}
		cb.halfOpenInflight++
		return true, nil
	}
	return false, nil
}
性能优化策略
1. 步骤并行化(Pipeline Parallelism)
当 Sequential 中某些相邻步骤之间没有数据依赖(后一步不需要前一步的输出)时,可以把它们划为一组并行执行,即引入微并行:
// executeWithMicroParallelism runs steps group by group, in the order given by
// detectParallelGroups. A group with exactly one step runs inline and its
// error aborts the run. A larger group runs one goroutine per step, all
// reading the same input data; once every goroutine has finished, the results
// and errors are handed to mergeParallelResults, whose return value becomes
// the input of the next group.
func (sw *SequentialWorkflow) executeWithMicroParallelism(
	ctx context.Context,
	steps []Step,
	data *PipelineData,
) (*PipelineData, error) {
	// Work out which steps can run side by side.
	batches := sw.detectParallelGroups(steps)

	currentData := data
	for _, batch := range batches {
		// Single step: plain sequential execution.
		if len(batch) == 1 {
			next, err := sw.executeStep(ctx, batch[0], currentData)
			if err != nil {
				return nil, err
			}
			currentData = next
			continue
		}

		// Several steps: fan out; each goroutine writes only its own slot.
		outputs := make([]*PipelineData, len(batch))
		failures := make([]error, len(batch))

		var pending sync.WaitGroup
		for slot, member := range batch {
			pending.Add(1)
			go func(at int, st Step) {
				defer pending.Done()
				outputs[at], failures[at] = sw.executeStep(ctx, st, currentData)
			}(slot, member)
		}
		pending.Wait()

		// Merge the results.
		currentData = sw.mergeParallelResults(currentData, outputs, failures)
	}
	return currentData, nil
}
2. 结果缓存
// ResultCache memoises string results in a cache backend.
type ResultCache struct {
	backend CacheBackend        // storage for cached results
	ttl     time.Duration       // lifetime passed to the backend on every Set
	keyFunc func(string) string // maps the caller's key to the backend key
}

// GetOrExecute returns the value stored under keyFunc(key) when the backend
// lookup succeeds. Otherwise it calls fn, stores the result with the
// configured ttl and returns it. An error from fn is returned unchanged and
// nothing is stored. Any backend lookup error is treated as a cache miss.
func (c *ResultCache) GetOrExecute(
	ctx context.Context,
	key string,
	fn func() (string, error),
) (string, error) {
	lookup := c.keyFunc(key)

	// Try the cache first.
	cached, getErr := c.backend.Get(lookup)
	if getErr == nil {
		return cached, nil
	}

	// Miss: compute the value, then remember it.
	fresh, runErr := fn()
	if runErr != nil {
		return "", runErr
	}
	c.backend.Set(lookup, fresh, c.ttl)
	return fresh, nil
}
// executeStepWithCache applies result caching to a step of the sequential
// workflow. Without a CacheConfig the step is executed directly. Otherwise
// the cache key is built from the step name and a hash of the string input
// payload.
//
// On a cache miss the step runs, and the complete data it produced (metadata
// and context included) is returned. On a cache hit the step does not run;
// the returned data carries the cached payload, the input's schema version, a
// copy of the input context extended with "<step>_output", and the step name
// and current time as metadata.
//
// An error is returned when a payload is not a string or when the step fails.
func (sw *SequentialWorkflow) executeStepWithCache(
	ctx context.Context,
	step Step,
	input *PipelineData,
) (*PipelineData, error) {
	if step.CacheConfig == nil {
		return sw.executeStep(ctx, step, input)
	}

	// Checked assertion: a non-string payload is an error, not a panic.
	payload, ok := input.Payload.(string)
	if !ok {
		return nil, fmt.Errorf("step %s: cannot build cache key from %T payload, want string", step.Name, input.Payload)
	}
	cacheKey := fmt.Sprintf("%s:%s", step.Name, hashInput(payload))

	// executed stays nil on a cache hit and holds the full step output on a miss.
	var executed *PipelineData
	result, err := step.CacheConfig.GetOrExecute(ctx, cacheKey, func() (string, error) {
		data, err := sw.executeStep(ctx, step, input)
		if err != nil {
			return "", err
		}
		out, ok := data.Payload.(string)
		if !ok {
			return "", fmt.Errorf("step %s: produced %T payload, want string", step.Name, data.Payload)
		}
		executed = data
		return out, nil
	})
	if err != nil {
		return nil, err
	}
	if executed != nil {
		// Cache miss: keep everything the step produced.
		return executed, nil
	}

	// Cache hit: rebuild the envelope around the cached payload. The context
	// is copied so the upstream data is left untouched.
	cachedCtx := make(map[string]interface{}, len(input.Context)+1)
	for k, v := range input.Context {
		cachedCtx[k] = v
	}
	cachedCtx[fmt.Sprintf("%s_output", step.Name)] = result

	hit := &PipelineData{
		SchemaVersion: input.SchemaVersion,
		Payload:       result,
		Context:       cachedCtx,
	}
	// NOTE(review): no StepID is assigned and no tokens are charged on a hit;
	// decide whether cached results need either.
	hit.Metadata.StepName = step.Name
	hit.Metadata.Timestamp = time.Now()
	return hit, nil
}
3. 背压控制(Backpressure)
// BackpressureController limits how much work is accepted and how much of it
// runs at once: tasks wait in a bounded queue and at most maxInflight of them
// execute concurrently.
type BackpressureController struct {
	maxInflight int
	semaphore   chan struct{} // one token per running task
	queue       chan Task     // accepted tasks waiting to run
}

// NewBackpressureController returns a controller that runs at most
// maxInflight tasks concurrently and buffers up to queueSize waiting tasks.
// maxInflight values below 1 are raised to 1 (a zero-capacity semaphore could
// never be acquired); negative queueSize values are treated as 0.
func NewBackpressureController(maxInflight, queueSize int) *BackpressureController {
	if maxInflight < 1 {
		maxInflight = 1
	}
	if queueSize < 0 {
		queueSize = 0
	}
	return &BackpressureController{
		maxInflight: maxInflight,
		semaphore:   make(chan struct{}, maxInflight),
		queue:       make(chan Task, queueSize),
	}
}

// Submit enqueues task without blocking. It returns an error when the queue
// cannot take the task right now, which is the backpressure signal to the
// caller.
func (bc *BackpressureController) Submit(task Task) error {
	select {
	case bc.queue <- task:
		return nil
	default:
		return fmt.Errorf("queue full, backpressure applied")
	}
}

// Start dispatches queued tasks until ctx is done. Each task runs in its own
// goroutine once a concurrency slot is free. Start returns as soon as ctx
// ends, even while waiting for a slot; a task already taken from the queue at
// that moment is dropped. Tasks that are already running are not waited for.
func (bc *BackpressureController) Start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-bc.queue:
			// Acquire a slot, but keep watching ctx: a bare send would block
			// shutdown for as long as all slots stay busy.
			select {
			case bc.semaphore <- struct{}{}:
			case <-ctx.Done():
				return
			}
			go func(t Task) {
				defer func() { <-bc.semaphore }()
				t.Execute()
			}(task)
		}
	}
}
常见问题深度解析
Q:Sequential 的延迟如何优化?
A:三个层面的优化:
- 模型层面:使用更快的模型(如 gemini-flash 替代 gemini-pro),牺牲少量质量换取速度
- 架构层面:引入微并行,将无依赖的步骤并行化
- 基础设施层面:使用模型缓存(如 GPTCache),对常见输入直接返回缓存结果
实测数据:在 4 步骤的新闻写作流水线中,通过模型降级 + 结果缓存,平均延迟从 45s 降至 18s。
Q:如何处理步骤间的数据格式不兼容?
A:使用适配器模式(Adapter Pattern):
// DataAdapter converts and validates data exchanged between steps whose
// formats are not directly compatible.
type DataAdapter interface {
// Convert turns input into the target format, or returns an error.
Convert(input interface{}) (interface{}, error)
// Validate reports whether data is acceptable; a nil error means valid.
Validate(data interface{}) error
}
// Each step can be configured with an input adapter and an output adapter.
// This is an abridged view of Step: the "..." stands for its other fields.
type Step struct {
// ...
InputAdapter DataAdapter
OutputAdapter DataAdapter
}
Q:长时间运行的 Sequential 工作流如何防止状态丢失?
A:实现检查点(Checkpoint)机制:
// executeWithCheckpoints runs the workflow's steps in order and saves a
// checkpoint after every step that succeeds, so an interrupted run can be
// resumed later.
//
// If a checkpoint can be loaded, execution resumes at its StepIndex with its
// Data as the current pipeline data. Otherwise execution starts at the first
// step with input as the payload. A failed step returns its error and saves
// no checkpoint, so a later resume retries that step. The checkpoint is
// cleared once every step has completed.
func (sw *SequentialWorkflow) executeWithCheckpoints(ctx context.Context, input string) error {
	// Default starting point: the first step, fed with the raw input.
	data := &PipelineData{
		SchemaVersion: "1.0",
		Payload:       input,
		Context:       make(map[string]interface{}),
	}
	startIndex := 0

	checkpoint, err := sw.loadCheckpoint()
	switch {
	case err != nil:
		// An unreadable checkpoint is not fatal: start over.
		log.Printf("Could not load checkpoint, starting from step 0: %v", err)
	case checkpoint != nil:
		if checkpoint.StepIndex < 0 || checkpoint.StepIndex > len(sw.steps) {
			return fmt.Errorf("checkpoint step index %d out of range [0, %d]", checkpoint.StepIndex, len(sw.steps))
		}
		// Resume from the checkpoint.
		log.Printf("Resuming from checkpoint at step %d", checkpoint.StepIndex)
		sw.resumeFromCheckpoint(checkpoint)
		startIndex = checkpoint.StepIndex
		if checkpoint.Data != nil {
			data = checkpoint.Data
		}
	}

	for i := startIndex; i < len(sw.steps); i++ {
		// Run the step.
		result, err := sw.executeStep(ctx, sw.steps[i], data)
		if err != nil {
			// Do not checkpoint a failed step: saving index i+1 with nil data
			// would make a resume skip this step with nothing to work on.
			return err
		}

		// Save a checkpoint that points at the next step.
		sw.saveCheckpoint(&Checkpoint{
			StepIndex: i + 1,
			Data:      result,
			Timestamp: time.Now(),
		})
		data = result
	}

	// Clear the checkpoint once the run is complete.
	sw.clearCheckpoint()
	return nil
}
下一步
至此,我们已经深入了解了 Sequential 工作流的管道化编排。接下来将探索 Parallel 工作流——并行执行模式下的并发控制、结果聚合与一致性保障。
← Agent Team 架构 | Parallel 工作流 →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
