Parallel 工作流深度解析:并发控制、结果聚合与一致性保障

Parallel 工作流是 Agent Team 中提升吞吐量的核心模式。与 Sequential 的管道化不同,Parallel 通过**扇出-扇入(Fan-Out/Fan-In)**模式实现多 Agent 并发执行。然而,并发带来的复杂性——竞态条件、部分失败、结果乱序、资源竞争——需要系统性的工程方案。本文将深入 Parallel 的架构设计与生产实践。

扇出-扇入架构模型

核心原理

                    输入数据
                ┌───────────────┐
                │   Dispatcher   │ ← 任务分发器
                │   (扇出阶段)    │
                └───────┬───────┘
        ┌───────────────┼───────────────┐
        │               │               │
        ▼               ▼               ▼
   ┌─────────┐    ┌─────────┐    ┌─────────┐
   │ Agent A │    │ Agent B │    │ Agent C │
   │(并发执行) │    │(并发执行) │    │(并发执行) │
   └────┬────┘    └────┬────┘    └────┬────┘
        │               │               │
        └───────────────┼───────────────┘
                ┌───────────────┐
                │   Aggregator   │ ← 结果聚合器
                │   (扇入阶段)    │
                └───────┬───────┘
                    聚合结果

关键设计决策

  1. 分发策略:轮询、随机、哈希、负载感知
  2. 并发限制:防止资源耗尽
  3. 超时控制:避免慢 Agent 拖垮整体
  4. 结果聚合:全等待、部分等待、超时截断
  5. 错误处理:全失败、部分失败、降级

并发模型选择

// ConcurrencyModel selects how a parallel workflow bounds the number of
// agents that run at the same time.
type ConcurrencyModel int

// Supported concurrency models, numbered from zero in declaration order.
const (
	// ModelUnlimited starts every agent at once; suited to a handful (<5) of agents.
	ModelUnlimited ConcurrencyModel = iota
	// ModelFixedPool runs agents on a fixed-size pool; suited to medium fan-out (5-20).
	ModelFixedPool
	// ModelDynamic resizes the worker pool automatically according to load.
	ModelDynamic
	// ModelPriorityQueue runs more important tasks first.
	ModelPriorityQueue
)

// DynamicConcurrencyController resizes a worker pool between minWorkers and
// maxWorkers; adjustWorkers decides when to scale from the task-queue backlog
// and the CPU utilization / idle duration reported by metrics.
type DynamicConcurrencyController struct {
    // Lower and upper bounds for the worker count.
    minWorkers    int
    maxWorkers    int
    // Number of workers the controller believes are running; updated by
    // adjustWorkers while holding mu.
    currentWorkers int
    // Pending tasks; len(taskQueue) (the buffered backlog) drives scale-up.
    taskQueue     chan Task
    // Source of the CPU-utilization and idle-duration readings.
    metrics       *ConcurrencyMetrics
    // Guards the worker bookkeeping; the code shown here only uses the
    // write lock (in adjustWorkers).
    mu            sync.RWMutex
}

// adjustWorkers re-evaluates the pool size once, under dcc.mu.
// NOTE(review): presumably called periodically by a background loop — confirm.
//
// Scale-up: when the queued backlog exceeds twice the current worker count
// and CPU utilization is below 70%, the pool doubles, capped at maxWorkers.
// It also grows to at least max(minWorkers, 1) so an empty pool can start.
//
// Scale-down: when the queue is empty and the controller has been idle for
// more than 5 minutes, the pool halves, but never below minWorkers.
//
// scaleUp/scaleDown are only invoked with a strictly positive delta. When the
// target equals the current size, nothing happens. currentWorkers therefore
// never changes without a matching scale call.
func (dcc *DynamicConcurrencyController) adjustWorkers() {
	dcc.mu.Lock()
	defer dcc.mu.Unlock()

	queueDepth := len(dcc.taskQueue)
	utilization := dcc.metrics.GetCPUUtilization()

	// Scale up: backlog building up and CPU headroom available (< 70%).
	if queueDepth > dcc.currentWorkers*2 && utilization < 0.7 {
		// Doubling zero stays zero, so start from at least max(minWorkers, 1).
		target := min(max(dcc.currentWorkers*2, dcc.minWorkers, 1), dcc.maxWorkers)
		if delta := target - dcc.currentWorkers; delta > 0 {
			dcc.scaleUp(delta)
			dcc.currentWorkers = target
		}
	}

	// Scale down: queue empty and idle for more than 5 minutes.
	if queueDepth == 0 && dcc.metrics.GetIdleDuration() > 5*time.Minute {
		target := max(dcc.currentWorkers/2, dcc.minWorkers)
		// When currentWorkers is already at or below minWorkers the delta is
		// <= 0; skip rather than calling scaleDown with a negative count.
		if delta := dcc.currentWorkers - target; delta > 0 {
			dcc.scaleDown(delta)
			dcc.currentWorkers = target
		}
	}
}

生产级 Parallel 实现

核心结构

// ParallelWorkflow fans a single input out to several agents, runs them
// concurrently and fans their results back in through an Aggregator.
type ParallelWorkflow struct {
	agents          []*agent.Agent        // agents to run
	dispatcher      Dispatcher            // fan-out stage: turns the input into tasks
	aggregator      Aggregator            // fan-in stage: combines per-agent results
	concurrencyCtrl ConcurrencyController // limits how many agents run at once
	timeout         time.Duration         // deadline for the whole run
	partialFailure  bool                  // whether some agents may fail without failing the run
	resultTimeout   time.Duration         // deadline for a single agent
	errorStrategy   ErrorStrategy         // error-handling strategy
	middleware      []ParallelMiddleware  // middleware hooks

	// sharedState is state shared across agents. executeWithIsolation gives
	// each agent a deep copy and merges the agent's state back, namespaced by
	// agent name, via mergeStateSafely. Guarded by stateMu.
	sharedState map[string]interface{}
	stateMu     sync.Mutex
}

// Dispatcher is the fan-out stage of a ParallelWorkflow: it turns the
// workflow input into the Tasks to run for the given agents (the
// LoadAwareDispatcher in this file builds exactly one Task per agent).
type Dispatcher interface {
    // Dispatch returns the tasks to execute. A non-nil error makes Execute
    // return immediately, before any agent is started.
    Dispatch(ctx context.Context, input string, agents []*agent.Agent) ([]*Task, error)
}

// Aggregator is the fan-in stage of a ParallelWorkflow: Execute calls it once
// with the collected per-agent results and returns its WorkflowResult and
// error directly to the caller.
type Aggregator interface {
    Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error)
}

// AgentResult is the outcome of running a single agent.
type AgentResult struct {
    // Name of the agent that produced this result.
    AgentName string
    // Agent output; the aggregators only use it when Error is nil.
    Output    string
    // Non-nil when the agent failed (Execute also uses it to report a
    // failure to acquire a concurrency slot); aggregators count it as a failure.
    Error     error
    // Wall-clock duration of the agent run, as measured in executeAgent.
    Latency   time.Duration
    // Token count estimated from Output (estimateTokens in executeAgent).
    Tokens    int
    // Time the result was recorded.
    Timestamp time.Time
}

// WorkflowResult is the aggregated result of one ParallelWorkflow run.
type WorkflowResult struct {
    // Final, aggregator-formatted output.
    Output        string
    // Per-agent results keyed by agent name. Only populated by aggregators
    // that choose to; the degradation levels rely on it.
    PartialResults map[string]*AgentResult
    // Number of agents that succeeded / failed.
    SuccessCount  int
    FailureCount  int
    // For a parallel run, the latency of the slowest agent
    // (AllWaitAggregator sets it to the maximum AgentResult.Latency).
    TotalLatency  time.Duration
    // Total tokens attributed to the agents included in the result.
    TokenUsage    int
}

执行引擎

// Execute runs the parallel workflow end to end:
//  1. the dispatcher fans the input out into tasks,
//  2. each task runs in its own goroutine, gated by concurrencyCtrl,
//  3. the aggregator fans the per-agent results back in.
//
// The whole run is bounded by pw.timeout. A non-positive value means "no
// workflow-level deadline" beyond ctx's own, instead of an already-expired
// context. Collection stops when that deadline passes, even if an agent
// ignores cancellation. Agents that have not reported by then are passed to
// the aggregator as failed results carrying the context error.
//
// Results reach the aggregator in dispatch order, not completion order, so
// aggregation output is deterministic.
func (pw *ParallelWorkflow) Execute(ctx context.Context, input string) (*WorkflowResult, error) {
	// 1. Apply the workflow-level deadline.
	if pw.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, pw.timeout)
		defer cancel()
	}

	// 2. Fan out.
	tasks, err := pw.dispatcher.Dispatch(ctx, input, pw.agents)
	if err != nil {
		return nil, fmt.Errorf("dispatch failed: %w", err)
	}

	// 3. Run every task concurrently. Each result is tagged with its task
	// index. The channel is buffered for every task, so a worker that finishes
	// after collection has stopped can still send without blocking forever.
	type indexedResult struct {
		idx    int
		result *AgentResult
	}
	resultChan := make(chan indexedResult, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(idx int, t *Task) {
			defer wg.Done()

			// Concurrency control: wait for a slot, or give up when ctx ends.
			if err := pw.concurrencyCtrl.Acquire(ctx); err != nil {
				resultChan <- indexedResult{idx, &AgentResult{
					AgentName: t.Agent.Name(),
					Error:     fmt.Errorf("acquire concurrency slot: %w", err),
					Timestamp: time.Now(),
				}}
				return
			}
			defer pw.concurrencyCtrl.Release()

			resultChan <- indexedResult{idx, pw.executeAgent(ctx, t)}
		}(i, task)
	}
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// 4. Fan in: wait until every task reports or the deadline passes.
	results := make([]*AgentResult, len(tasks))
	received := 0
collect:
	for received < len(tasks) {
		select {
		case r, ok := <-resultChan:
			if !ok {
				break collect
			}
			results[r.idx] = r.result
			received++
		case <-ctx.Done():
			break collect
		}
	}

	// select picks randomly among ready cases. Results that were already
	// buffered when the deadline fired may have been skipped, so take them
	// now without blocking.
drain:
	for received < len(tasks) {
		select {
		case r, ok := <-resultChan:
			if !ok {
				break drain
			}
			results[r.idx] = r.result
			received++
		default:
			break drain
		}
	}

	// 5. Anything still missing was cut off by the workflow deadline.
	for i, r := range results {
		if r == nil {
			results[i] = &AgentResult{
				AgentName: tasks[i].Agent.Name(),
				Error:     fmt.Errorf("agent did not finish before workflow deadline: %w", ctx.Err()),
				Timestamp: time.Now(),
			}
		}
	}

	// 6. Aggregate.
	return pw.aggregator.Aggregate(ctx, results)
}

// executeAgent runs one task's agent and packages the outcome as an
// AgentResult. It never returns nil: a failed run is reported through
// AgentResult.Error.
//
// The run gets its own deadline of pw.resultTimeout, derived from ctx. A
// non-positive resultTimeout means "no per-agent deadline" beyond ctx's own,
// rather than a context that is already expired.
func (pw *ParallelWorkflow) executeAgent(ctx context.Context, task *Task) *AgentResult {
	agentCtx := ctx
	if pw.resultTimeout > 0 {
		var cancel context.CancelFunc
		agentCtx, cancel = context.WithTimeout(ctx, pw.resultTimeout)
		defer cancel()
	}

	start := time.Now()
	output, err := task.Agent.Run(agentCtx, task.Input)
	finished := time.Now()

	return &AgentResult{
		AgentName: task.Agent.Name(),
		Output:    output,
		Error:     err,
		Latency:   finished.Sub(start),
		Tokens:    estimateTokens(output),
		Timestamp: finished,
	}
}

结果聚合策略

1. 全等待聚合(All-Wait)

等待所有 Agent 完成,无论成功失败:

// AllWaitAggregator takes every agent result, successful or not, and
// concatenates them into one Markdown document (one "## <agent>" section each).
type AllWaitAggregator struct {
	formatStrategy FormatStrategy

	// AllowPartialFailure controls whether failed agents are tolerated. When
	// false (the zero value), any failure makes Aggregate return an error.
	// Set it to match the owning workflow's partial-failure setting; an
	// aggregator has no access to the workflow itself.
	AllowPartialFailure bool
}

// Aggregate builds the combined report.
//
//   - Failed agents appear as "(失败)" sections with their error.
//   - PartialResults maps each agent name to its result.
//   - TotalLatency is the slowest agent's latency, since agents run in parallel.
//   - TokenUsage sums only successful agents.
//   - nil entries in results are ignored.
//
// It returns an error when at least one agent failed and AllowPartialFailure
// is false.
func (a *AllWaitAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	var output strings.Builder
	var successCount, failureCount, totalTokens int
	var maxLatency time.Duration
	partialResults := make(map[string]*AgentResult, len(results))

	for _, result := range results {
		if result == nil {
			continue
		}
		partialResults[result.AgentName] = result

		if result.Error != nil {
			failureCount++
			fmt.Fprintf(&output, "\n## %s (失败)\n错误: %v\n", result.AgentName, result.Error)
		} else {
			successCount++
			fmt.Fprintf(&output, "\n## %s\n%s\n", result.AgentName, result.Output)
			totalTokens += result.Tokens
		}

		maxLatency = max(maxLatency, result.Latency)
	}

	if failureCount > 0 && !a.AllowPartialFailure {
		return nil, fmt.Errorf("%d agents failed", failureCount)
	}

	return &WorkflowResult{
		Output:         output.String(),
		PartialResults: partialResults,
		SuccessCount:   successCount,
		FailureCount:   failureCount,
		TotalLatency:   maxLatency, // parallel run: total latency = slowest agent
		TokenUsage:     totalTokens,
	}, nil
}

2. 超时截断聚合(Timeout-Truncate)

在指定时间内返回已完成的 Agent 结果:

// TimeoutTruncateAggregator keeps only agent results that completed within a
// latency budget. A successful result that took longer is replaced by an
// "aggregation timeout" failure, so a slow agent cannot dominate the report.
type TimeoutTruncateAggregator struct {
	// waitTimeout is the per-result latency budget. Zero or negative
	// disables truncation and every result is kept.
	waitTimeout time.Duration
}

// Aggregate applies the latency cut-off and formats the remaining results
// with formatResult. Results keep their input order; nil entries are skipped.
//
// Every result in the slice has already finished by the time Aggregate runs.
// The cut-off is therefore applied to each result's recorded Latency, not to
// the time spent inside Aggregate. That time is negligible, and using it
// would make the outcome depend on goroutine scheduling.
func (a *TimeoutTruncateAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	collected := make([]*AgentResult, 0, len(results))
	for _, r := range results {
		switch {
		case r == nil:
			// Nothing to report for a missing entry.
		case a.waitTimeout > 0 && r.Error == nil && r.Latency > a.waitTimeout:
			collected = append(collected, &AgentResult{
				AgentName: r.AgentName,
				Error:     fmt.Errorf("aggregation timeout"),
				Latency:   r.Latency,
				Timestamp: r.Timestamp,
			})
		default:
			collected = append(collected, r)
		}
	}

	return a.formatResult(collected)
}

3. 智能聚合(Smart Aggregation)

基于结果质量的智能选择:

// SmartAggregator picks the best agent output by a quality score, or merges
// the top few outputs when none is good enough on its own.
type SmartAggregator struct {
	// qualityThreshold is the minimum score for one result to be returned alone.
	qualityThreshold float64
	selector         ResultSelector
}

// Aggregate scores every successful result with evaluateQuality.
//
//   - If the best score reaches qualityThreshold, that result's output is
//     returned alone.
//   - Otherwise the top 3 results are merged via mergeResults.
//   - It returns an error when no agent succeeded.
//
// Both paths fill the WorkflowResult the same way:
//   - PartialResults holds every input result keyed by agent name.
//   - SuccessCount / FailureCount count successful and failed agents.
//   - TotalLatency is the slowest agent's latency.
//   - TokenUsage sums the successful agents' tokens.
//
// nil entries in results are ignored.
func (a *SmartAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	partial := make(map[string]*AgentResult, len(results))
	successful := make([]*AgentResult, 0, len(results))
	var (
		failures   int
		tokens     int
		maxLatency time.Duration
	)
	for _, r := range results {
		if r == nil {
			continue
		}
		partial[r.AgentName] = r
		maxLatency = max(maxLatency, r.Latency)
		if r.Error != nil {
			failures++
			continue
		}
		successful = append(successful, r)
		tokens += r.Tokens
	}

	if len(successful) == 0 {
		return nil, fmt.Errorf("all agents failed")
	}

	scored := make([]*ScoredResult, len(successful))
	for i, r := range successful {
		scored[i] = &ScoredResult{Result: r, Score: a.evaluateQuality(r)}
	}
	// Stable sort: equal scores keep input order, so ties resolve identically
	// on every run.
	sort.SliceStable(scored, func(i, j int) bool {
		return scored[i].Score > scored[j].Score
	})

	res := &WorkflowResult{
		PartialResults: partial,
		SuccessCount:   len(successful),
		FailureCount:   failures,
		TotalLatency:   maxLatency,
		TokenUsage:     tokens,
	}

	if scored[0].Score >= a.qualityThreshold {
		// A single result is good enough: return it as-is.
		res.Output = scored[0].Result.Output
		return res, nil
	}

	// Otherwise merge the top 3 results.
	res.Output = a.mergeResults(a.selectTopResults(scored, 3))
	return res, nil
}

// evaluateQuality returns a weighted quality score for one result:
//
//   - 20% length: len(Output)/1000, capped at 1. Longer output scores higher
//     up to 1000 bytes, with no penalty beyond that. Note: bytes, not
//     characters, so CJK text reaches the cap sooner.
//   - 30% latency: 1/(1+seconds), so faster results score higher.
//   - 50% content: assessContentQuality (intended to be an LLM-based judgement).
//
// The result lies in [0, 1] if assessContentQuality does.
// NOTE(review): confirm assessContentQuality's range.
func (a *SmartAggregator) evaluateQuality(result *AgentResult) float64 {
	lengthScore := min(float64(len(result.Output))/1000.0, 1.0)
	latencyScore := 1.0 / (1.0 + result.Latency.Seconds())
	contentScore := a.assessContentQuality(result.Output)

	return 0.2*lengthScore + 0.3*latencyScore + 0.5*contentScore
}

实战场景:多数据源实时分析

金融数据分析系统

package main

import (
	"context"
	"fmt"
	"log"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/google/adk-go/agent"
	"github.com/google/adk-go/team"
	"github.com/google/adk-go/tool"
)

// FinancialAnalysisSystem is the financial data analysis system: it holds the
// parallel workflow (four analyst agents) built by NewFinancialAnalysisSystem.
type FinancialAnalysisSystem struct {
    workflow *team.ParallelWorkflow
}

// NewFinancialAnalysisSystem builds four analyst agents on the given model.
// The analysts cover stock technicals, news sentiment, macro economy and
// competitors. It wires them into a parallel workflow that runs all four at
// once (30s overall, 25s per result), tolerates partial failure and renders
// a combined report. The first agent-construction error is returned as-is.
func NewFinancialAnalysisSystem(model agent.Model) (*FinancialAnalysisSystem, error) {
	// Analyst prompts. Their text is sent to the model verbatim.
	const (
		stockPrompt = `你是股票分析专家。分析给定股票的技术指标和趋势。
输出格式:
- 当前价格与涨跌幅
- 关键技术指标(MA、RSI、MACD)
- 短期趋势判断(上涨/下跌/震荡)
- 风险提示`
		newsPrompt = `你是新闻情绪分析专家。分析近期相关新闻的市场情绪。
输出格式:
- 新闻摘要(最近 5 条)
- 情绪评分(-1 到 +1)
- 关键事件影响分析
- 情绪趋势变化`
		macroPrompt = `你是宏观经济分析师。分析影响市场的宏观因素。
输出格式:
- 利率政策影响
- 通胀数据解读
- 行业政策变化
- 国际市场联动`
		competitorPrompt = `你是行业竞争分析师。分析同行业竞争对手动态。
输出格式:
- 主要竞争对手近期动作
- 市场份额变化
- 产品/服务对比
- 竞争格局评估`
	)

	// analyst fills in the agent.Config shape shared by every analyst; only
	// the name, prompt, per-agent timeout and tool set differ.
	analyst := func(name, instruction string, timeout time.Duration, tools ...tool.Tool) agent.Config {
		return agent.Config{
			Name:        name,
			Model:       model,
			Instruction: instruction,
			Tools:       tools,
			Timeout:     timeout,
		}
	}

	// Stock technical analysis.
	stockAgent, err := agent.New(analyst("stock-analyst", stockPrompt, 20*time.Second,
		tool.NewStockPriceTool(),
		tool.NewTechnicalIndicatorTool(),
	))
	if err != nil {
		return nil, err
	}

	// News sentiment; gets a slightly longer budget than the others.
	newsAgent, err := agent.New(analyst("news-analyst", newsPrompt, 25*time.Second,
		tool.NewNewsSearchTool(),
		tool.NewSentimentAnalysisTool(),
	))
	if err != nil {
		return nil, err
	}

	// Macro-economic factors.
	macroAgent, err := agent.New(analyst("macro-analyst", macroPrompt, 20*time.Second,
		tool.NewEconomicDataTool(),
		tool.NewPolicyTrackerTool(),
	))
	if err != nil {
		return nil, err
	}

	// Competitive landscape.
	competitorAgent, err := agent.New(analyst("competitor-analyst", competitorPrompt, 20*time.Second,
		tool.NewCompetitorTrackerTool(),
	))
	if err != nil {
		return nil, err
	}

	return &FinancialAnalysisSystem{
		workflow: team.NewParallelWorkflow(
			team.WithAgents(stockAgent, newsAgent, macroAgent, competitorAgent),
			team.WithConcurrencyLimit(4),             // all four analysts at once
			team.WithTimeout(30*time.Second),         // whole-run deadline
			team.WithResultTimeout(25*time.Second),   // per-analyst deadline
			team.WithPartialFailure(true),            // a failed analyst does not fail the report
			team.WithAggregator(&FinancialReportAggregator{}),
			team.WithDispatcher(&LoadAwareDispatcher{}),
		),
	}, nil
}

// FinancialReportAggregator renders the four analysts' outputs into one
// Markdown financial report.
type FinancialReportAggregator struct{}

// Aggregate builds the report from results, placing each analyst's section
// by AgentName. A failed analyst is rendered as a "【数据获取失败】" section
// instead of aborting the report, so partial failure is always tolerated.
// nil entries are ignored.
//
// The returned WorkflowResult carries the same bookkeeping as the other
// aggregators, so consumers that inspect it (such as the degradation
// strategy, which compares counts against len(PartialResults)) see a
// complete picture:
//   - PartialResults and success/failure counts,
//   - TotalLatency = slowest analyst (also printed as the report's total time),
//   - TokenUsage of the successful analysts.
func (a *FinancialReportAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	report := &FinancialReport{
		GeneratedAt: time.Now(),
		Sections:    make(map[string]string),
	}

	partial := make(map[string]*AgentResult, len(results))
	var (
		successes, failures, tokens int
		maxLatency                  time.Duration
	)
	for _, result := range results {
		if result == nil {
			continue
		}
		partial[result.AgentName] = result
		maxLatency = max(maxLatency, result.Latency)

		if result.Error != nil {
			failures++
			report.Sections[result.AgentName] = fmt.Sprintf("【数据获取失败】%v", result.Error)
			continue
		}
		successes++
		tokens += result.Tokens
		report.Sections[result.AgentName] = result.Output
	}

	// Cross-analyst synthesis.
	report.Synthesis = a.generateSynthesis(report.Sections)

	output := fmt.Sprintf(`# 金融数据分析报告
生成时间: %s

## 综合分析
%s

## 详细数据

### 股票技术分析
%s

### 新闻情绪分析
%s

### 宏观经济分析
%s

### 竞品分析
%s

---
*报告由多 Agent 并行分析生成,总耗时: %v*`,
		report.GeneratedAt.Format("2006-01-02 15:04:05"),
		report.Synthesis,
		report.Sections["stock-analyst"],
		report.Sections["news-analyst"],
		report.Sections["macro-analyst"],
		report.Sections["competitor-analyst"],
		maxLatency,
	)

	return &WorkflowResult{
		Output:         output,
		PartialResults: partial,
		SuccessCount:   successes,
		FailureCount:   failures,
		TotalLatency:   maxLatency,
		TokenUsage:     tokens,
	}, nil
}

// generateSynthesis turns the per-analyst sections into a short bullet list
// of key signals (one "- <signal>" line per signal from extractSignals).
// A real implementation could ask another LLM agent to write the summary.
func (a *FinancialReportAggregator) generateSynthesis(sections map[string]string) string {
	var synthesis strings.Builder
	synthesis.WriteString("基于多维度分析,当前市场呈现以下特征:\n\n")

	for _, signal := range a.extractSignals(sections) {
		fmt.Fprintf(&synthesis, "- %s\n", signal)
	}

	return synthesis.String()
}

// LoadAwareDispatcher builds one Task per agent. When an agent's reported
// load is above 0.8, it hands that agent a simplified input to reduce its work.
type LoadAwareDispatcher struct {
	// loadBalancer reports per-agent load. It may be nil, which is the zero
	// value as constructed by &LoadAwareDispatcher{}. In that case every
	// agent is treated as not overloaded.
	loadBalancer *LoadBalancer
}

// Dispatch returns one Task per agent, in the same order as agents. Each
// task input is "分析标的: <input>" followed by the agent's own instruction.
// It never returns an error.
func (d *LoadAwareDispatcher) Dispatch(ctx context.Context, input string, agents []*agent.Agent) ([]*Task, error) {
	tasks := make([]*Task, len(agents))

	// The loop variable is named ag so it does not shadow the agent package.
	for i, ag := range agents {
		taskInput := input
		// Simplify the input for overloaded agents. A missing load balancer
		// means there is no load information, not a nil dereference.
		if d.loadBalancer != nil && d.loadBalancer.GetLoad(ag.Name()) > 0.8 {
			taskInput = d.simplifyInput(input)
		}

		tasks[i] = &Task{
			Agent: ag,
			Input: fmt.Sprintf("分析标的: %s\n\n%s", taskInput, ag.Instruction()),
		}
	}

	return tasks, nil
}

// main analyses one stock with the financial analysis system and prints the
// report followed by the success ratio. Any setup or analysis error is fatal.
func main() {
	// NOTE(review): `model` is not declared in this snippet; it must be an
	// agent.Model provided elsewhere (e.g. a package-level variable).
	sys, err := NewFinancialAnalysisSystem(model)
	if err != nil {
		log.Fatalf("Failed to create system: %v", err)
	}

	res, err := sys.workflow.Execute(context.Background(), "贵州茅台(600519)")
	if err != nil {
		log.Fatalf("Analysis failed: %v", err)
	}

	total := res.SuccessCount + res.FailureCount
	fmt.Println(res.Output)
	fmt.Printf("\n成功率: %d/%d\n", res.SuccessCount, total)
}

部分失败处理策略

优雅降级模式

// DegradationStrategy is an ordered list of fallback levels for a
// WorkflowResult produced under partial failure.
// NOTE(review): the code that applies a strategy is not shown here. Confirm
// whether levels are evaluated first-match or all-matching.
type DegradationStrategy struct {
    levels []DegradationLevel
}

// DegradationLevel is one fallback step of a DegradationStrategy.
type DegradationLevel struct {
    // Human-readable level name.
    Name        string
    // Reports whether this level applies to the given result.
    Condition   func(*WorkflowResult) bool
    // Transforms the result. The levels in DefaultDegradationStrategy modify
    // it in place and return the same pointer.
    Action      func(*WorkflowResult) *WorkflowResult
}

// DefaultDegradationStrategy is a three-level fallback:
//   - Level 1 (at most half of the agents failed): back-fill failed agents
//     from cache.
//   - Level 2 (more than half failed, at least one succeeded): keep only the
//     successful outputs in a simplified report.
//   - Level 3 (nothing succeeded): replace the output with a
//     "try again later" message.
//
// The conditions compare FailureCount against len(PartialResults), so they
// only make sense for aggregators that populate PartialResults.
var DefaultDegradationStrategy = &DegradationStrategy{
	levels: []DegradationLevel{
		{
			Name: "Level 1 - 数据补全",
			Condition: func(r *WorkflowResult) bool {
				return r.FailureCount > 0 && r.FailureCount <= len(r.PartialResults)/2
			},
			Action: func(r *WorkflowResult) *WorkflowResult {
				// Back-fill failed agents from cache. This mutates the shared
				// *AgentResult values in place.
				for name, result := range r.PartialResults {
					if result == nil || result.Error == nil {
						continue
					}
					if cached := getCachedResult(name); cached != "" {
						result.Output = cached
						result.Error = nil
						r.SuccessCount++
						r.FailureCount--
					}
				}
				return r
			},
		},
		{
			Name: "Level 2 - 简化输出",
			Condition: func(r *WorkflowResult) bool {
				return r.SuccessCount > 0 && r.FailureCount > len(r.PartialResults)/2
			},
			Action: func(r *WorkflowResult) *WorkflowResult {
				// Map iteration order is random. Sort agent names so the
				// simplified report lists sections identically on every run.
				names := make([]string, 0, len(r.PartialResults))
				for name := range r.PartialResults {
					names = append(names, name)
				}
				sort.Strings(names)

				var output strings.Builder
				output.WriteString("【部分服务不可用,以下为可用数据】\n\n")
				for _, name := range names {
					if result := r.PartialResults[name]; result != nil && result.Error == nil {
						fmt.Fprintf(&output, "## %s\n%s\n\n", name, result.Output)
					}
				}
				r.Output = output.String()
				return r
			},
		},
		{
			Name: "Level 3 - 完全降级",
			Condition: func(r *WorkflowResult) bool {
				return r.SuccessCount == 0
			},
			Action: func(r *WorkflowResult) *WorkflowResult {
				r.Output = "【系统暂时不可用,请稍后重试】"
				return r
			},
		},
	},
}

竞态条件与数据安全

状态隔离

// IsolatedContext describes a per-agent execution context: the agent's ID,
// its input, a private state map and the function that cancels its run.
// The intent is that each agent gets its own context rather than sharing
// mutable state.
// NOTE(review): this type is not referenced by the code shown here
// (executeWithIsolation builds its isolation inline). Confirm it is used.
type IsolatedContext struct {
    AgentID     string
    Input       string
    State       map[string]interface{}
    CancelFunc  context.CancelFunc
}

// executeWithIsolation runs one agent on a private deep copy of the shared
// state. On success it merges the agent's (possibly modified) copy back via
// mergeStateSafely, where keys are namespaced by agent name.
//
// It always returns a non-nil *AgentResult describing the run. Its Error
// field equals the returned error, so callers may use either.
//
// The parameter is named ag, not agent, so that agent.WithState refers to
// the agent package rather than to the parameter.
func (pw *ParallelWorkflow) executeWithIsolation(ctx context.Context, ag *agent.Agent, input string) (*AgentResult, error) {
	isolatedCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Snapshot the shared state under the lock: other agents may be merging
	// into it concurrently through mergeStateSafely.
	pw.stateMu.Lock()
	isolatedState := deepCopy(pw.sharedState)
	pw.stateMu.Unlock()

	start := time.Now()
	output, err := ag.Run(isolatedCtx, input, agent.WithState(isolatedState))
	finished := time.Now()

	result := &AgentResult{
		AgentName: ag.Name(),
		Output:    output,
		Error:     err,
		Latency:   finished.Sub(start),
		Timestamp: finished,
	}
	if err != nil {
		// Do not publish state from a failed run.
		return result, err
	}

	pw.mergeStateSafely(ag.Name(), isolatedState)
	return result, nil
}

// mergeStateSafely copies an agent's post-run state into the shared state
// while holding pw.stateMu. Every key is stored as "<agentName>.<key>", so
// entries from different agents do not collide (assuming unique agent names).
//
// The shared map is created on first use, so merging into a workflow whose
// sharedState was never initialized does not panic on a nil-map write.
func (pw *ParallelWorkflow) mergeStateSafely(agentName string, state map[string]interface{}) {
	pw.stateMu.Lock()
	defer pw.stateMu.Unlock()

	if pw.sharedState == nil {
		pw.sharedState = make(map[string]interface{}, len(state))
	}
	for key, value := range state {
		pw.sharedState[agentName+"."+key] = value
	}
}

性能调优指南

1. 并发数调优

// calculateOptimalConcurrency estimates a concurrency limit for model calls
// from the CPU count and the model API rate limit.
//
// The constants are illustrative assumptions:
//   - a 100 req/s API limit shared by 4 agents,
//   - an I/O multiplier of 3, because agent calls mostly wait on the network,
//     so concurrency may exceed the core count.
func calculateOptimalConcurrency() int {
	const (
		apiRateLimit = 100 // assumed API rate limit, requests per second
		agentCount   = 4
		ioMultiplier = 3
	)
	return optimalConcurrency(runtime.NumCPU(), apiRateLimit, agentCount, ioMultiplier)
}

// optimalConcurrency returns min(cpuCores*ioMultiplier, apiRateLimit/agentCount),
// clamped to at least 1.
//
// The clamp matters when the rate limit is smaller than the agent count:
// integer division would otherwise yield a zero-sized pool that blocks every
// task forever. A non-positive apiRateLimit or agentCount disables the
// API-based cap, which also avoids dividing by zero.
func optimalConcurrency(cpuCores, apiRateLimit, agentCount, ioMultiplier int) int {
	limit := cpuCores * ioMultiplier
	if apiRateLimit > 0 && agentCount > 0 {
		limit = min(limit, apiRateLimit/agentCount)
	}
	return max(limit, 1)
}

2. 连接池优化

// ModelConnectionPool is a fixed-capacity pool of model clients backed by a
// buffered channel: receiving from clients borrows one, sending returns it.
type ModelConnectionPool struct {
	clients chan *ModelClient
	maxSize int
}

// NewConnectionPool creates a pool with room for size clients and eagerly
// creates (warms up) up to size of them.
//
// A client that fails to be created is logged and skipped, so the pool may
// hold fewer than size clients. Acquire then waits until one is released or
// its ctx ends. A negative size is treated as 0 instead of panicking in make.
func NewConnectionPool(size int) *ModelConnectionPool {
	size = max(size, 0)
	pool := &ModelConnectionPool{
		clients: make(chan *ModelClient, size),
		maxSize: size,
	}

	// Warm up the connections.
	for i := 0; i < size; i++ {
		client, err := createModelClient()
		if err != nil {
			log.Printf("Failed to create client %d: %v", i, err)
			continue
		}
		pool.clients <- client
	}
	if size > 0 && len(pool.clients) == 0 {
		log.Printf("connection pool: none of the %d clients could be created; Acquire will block until ctx is done", size)
	}

	return pool
}

// Acquire borrows a client, waiting until one is available or ctx is done.
// An already-cancelled ctx fails immediately even if a client is idle, so
// cancellation is honoured deterministically.
func (p *ModelConnectionPool) Acquire(ctx context.Context) (*ModelClient, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	select {
	case client := <-p.clients:
		return client, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Release returns a client to the pool. If the pool is already full (for
// example the client was not borrowed from it, or was released twice), the
// client is closed instead. A nil client is ignored.
func (p *ModelConnectionPool) Release(client *ModelClient) {
	if client == nil {
		return
	}
	select {
	case p.clients <- client:
	default:
		// Pool is full: close the surplus connection.
		client.Close()
	}
}

常见问题深度解析

Q:Parallel 工作流中某个 Agent 慢怎么办?

A:三层防护:

  1. 单 Agent 超时:设置 resultTimeout,防止单个 Agent 拖垮整体
  2. 熔断机制:对持续失败的 Agent 触发熔断,快速失败
  3. 异步回退:对超时 Agent 返回占位符,后台继续执行,完成后通过回调更新

Q:结果顺序如何保证?

A:使用有序聚合器:

// OrderedAggregator presents results in a fixed agent order (agentOrder),
// regardless of the order in which the agents finished.
type OrderedAggregator struct {
	agentOrder []string
}

// Aggregate reorders results to follow agentOrder and passes them to
// formatOrdered.
//
//   - Every name in agentOrder gets an entry. An agent that produced no
//     result appears as a failed AgentResult rather than a nil pointer, so
//     the formatter never dereferences nil.
//   - Results from agents not listed in agentOrder are appended afterwards,
//     in their input order, instead of being silently dropped.
//   - When several results share an AgentName, the last one fills that
//     agent's ordered slot.
func (a *OrderedAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	byName := make(map[string]*AgentResult, len(results))
	for _, r := range results {
		if r != nil {
			byName[r.AgentName] = r
		}
	}

	listed := make(map[string]bool, len(a.agentOrder))
	ordered := make([]*AgentResult, 0, max(len(a.agentOrder), len(results)))
	for _, name := range a.agentOrder {
		listed[name] = true
		r, ok := byName[name]
		if !ok {
			r = &AgentResult{
				AgentName: name,
				Error:     fmt.Errorf("no result from agent %q", name),
			}
		}
		ordered = append(ordered, r)
	}
	for _, r := range results {
		if r != nil && !listed[r.AgentName] {
			ordered = append(ordered, r)
		}
	}

	return a.formatOrdered(ordered)
}

Q:如何避免 thundering herd(惊群效应)?

A:引入抖动(Jitter)错开各 Agent 的发起时间,并配合令牌桶限流(例如 golang.org/x/time/rate)控制整体请求速率。下面的示例只演示抖动部分:

// executeWithJitter staggers task start times to avoid a thundering herd.
// Task i starts after i*100ms plus 0-999ms of random jitter, so the agents
// do not all hit the model API at the same instant.
//
// It returns once every task has either finished or been skipped. A task
// whose delay is still pending when ctx is cancelled is not started, and no
// goroutine outlives the call. The results of executeAgent are discarded.
func (pw *ParallelWorkflow) executeWithJitter(ctx context.Context, tasks []*Task) {
	var wg sync.WaitGroup
	for i, task := range tasks {
		// Random jitter on top of a fixed per-task stagger.
		jitter := time.Duration(rand.Intn(1000)) * time.Millisecond
		delay := time.Duration(i)*100*time.Millisecond + jitter

		wg.Add(1)
		go func(t *Task, delay time.Duration) {
			defer wg.Done()

			// Sleep for the delay, but wake up early on cancellation.
			timer := time.NewTimer(delay)
			defer timer.Stop()
			select {
			case <-timer.C:
			case <-ctx.Done():
				return
			}

			pw.executeAgent(ctx, t)
		}(task, delay)
	}
	wg.Wait()
}

下一步

至此,我们已经深入剖析了 Parallel 工作流的并发控制与结果聚合。接下来将探索 Loop 工作流——循环执行模式的迭代优化、终止条件与收敛性保障。

Sequential 工作流 | Loop 工作流 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。