Parallel 工作流深度解析:并发控制、结果聚合与一致性保障
深入剖析 ADK Go Parallel 工作流的并发模型、结果聚合算法、一致性协议与生产级性能调优,涵盖扇出-扇入模式、超时控制与部分失败处理。
Table of Contents
Parallel 工作流深度解析:并发控制、结果聚合与一致性保障
Parallel 工作流是 Agent Team 中提升吞吐量的核心模式。与 Sequential 的管道化不同,Parallel 通过**扇出-扇入(Fan-Out/Fan-In)**模式实现多 Agent 并发执行。然而,并发带来的复杂性——竞态条件、部分失败、结果乱序、资源竞争——需要系统性的工程方案。本文将深入 Parallel 的架构设计与生产实践。
扇出-扇入架构模型
核心原理
                 输入数据
                     │
                     ▼
             ┌───────────────┐
             │  Dispatcher   │ ← 任务分发器
             │  (扇出阶段)   │
             └───────┬───────┘
                     │
     ┌───────────────┼───────────────┐
     │               │               │
     ▼               ▼               ▼
┌─────────┐     ┌─────────┐     ┌─────────┐
│ Agent A │     │ Agent B │     │ Agent C │
│(并发执行)│    │(并发执行)│    │(并发执行)│
└────┬────┘     └────┬────┘     └────┬────┘
     │               │               │
     └───────────────┼───────────────┘
                     │
                     ▼
             ┌───────────────┐
             │  Aggregator   │ ← 结果聚合器
             │  (扇入阶段)   │
             └───────┬───────┘
                     │
                     ▼
                 聚合结果
关键设计决策:
- 分发策略:轮询、随机、哈希、负载感知
- 并发限制:防止资源耗尽
- 超时控制:避免慢 Agent 拖垮整体
- 结果聚合:全等待、部分等待、超时截断
- 错误处理:全失败、部分失败、降级
并发模型选择
// ConcurrencyModel enumerates the concurrency models a parallel workflow can use.
type ConcurrencyModel int
const (
// ModelUnlimited places no limit on concurrency; suited to a small number of agents (<5).
ModelUnlimited ConcurrencyModel = iota
// ModelFixedPool uses a fixed-size concurrency pool; suited to medium scale (5-20 agents).
ModelFixedPool
// ModelDynamic adjusts concurrency automatically according to load.
ModelDynamic
// ModelPriorityQueue uses a priority queue so important tasks run first.
ModelPriorityQueue
)
// DynamicConcurrencyController is the dynamic concurrency controller. Its
// adjustWorkers method resizes the worker count between minWorkers and
// maxWorkers based on queue depth and the values reported by metrics.
type DynamicConcurrencyController struct {
minWorkers int // floor applied when scaling down
maxWorkers int // cap applied when scaling up
currentWorkers int // worker count last recorded by adjustWorkers
taskQueue chan Task // pending tasks; its length is read as the queue depth
metrics *ConcurrencyMetrics // supplies CPU utilization and idle duration
mu sync.RWMutex // write-locked by adjustWorkers for the whole adjustment
}
// adjustWorkers resizes the worker pool from the current queue depth and the
// readings of dcc.metrics. dcc.mu is write-locked for the whole adjustment.
//
// Scale up: when the backlog exceeds twice the current worker count and CPU
// utilization is below 70%, the worker count doubles, capped at maxWorkers.
// A pool that currently has zero workers grows to one, because doubling zero
// would otherwise never leave zero.
//
// Scale down: when the queue is empty and the idle duration exceeds five
// minutes, the worker count halves, floored at minWorkers.
//
// scaleUp and scaleDown are only invoked with a strictly positive delta, so a
// pool already at (or beyond) a bound is left untouched instead of receiving a
// zero or negative adjustment.
func (dcc *DynamicConcurrencyController) adjustWorkers() {
	dcc.mu.Lock()
	defer dcc.mu.Unlock()

	queueDepth := len(dcc.taskQueue)
	utilization := dcc.metrics.GetCPUUtilization()

	// Scale-up condition: backlog in the queue and CPU utilization < 70%.
	if queueDepth > dcc.currentWorkers*2 && utilization < 0.7 {
		target := min(max(dcc.currentWorkers*2, 1), dcc.maxWorkers)
		if target > dcc.currentWorkers {
			dcc.scaleUp(target - dcc.currentWorkers)
			dcc.currentWorkers = target
		}
	}

	// Scale-down condition: queue empty and idle for more than 5 minutes.
	if queueDepth == 0 && dcc.metrics.GetIdleDuration() > 5*time.Minute {
		target := max(dcc.currentWorkers/2, dcc.minWorkers)
		if target < dcc.currentWorkers {
			dcc.scaleDown(dcc.currentWorkers - target)
			dcc.currentWorkers = target
		}
	}
}
生产级 Parallel 实现
核心结构
// ParallelWorkflow holds the configuration of a fan-out/fan-in workflow:
// the agents to run, how input is dispatched to them, how their results are
// aggregated, and the limits applied while they run.
type ParallelWorkflow struct {
agents []*agent.Agent // list of agents
dispatcher Dispatcher // task dispatcher
aggregator Aggregator // result aggregator
concurrencyCtrl ConcurrencyController // concurrency controller
timeout time.Duration // overall timeout for one Execute call
partialFailure bool // whether partial failure is allowed
resultTimeout time.Duration // timeout for a single agent result
errorStrategy ErrorStrategy // error-handling strategy
middleware []ParallelMiddleware // middleware
}
// Dispatcher is the dispatcher interface: it turns one input and a set of
// agents into the tasks to be executed.
type Dispatcher interface {
Dispatch(ctx context.Context, input string, agents []*agent.Agent) ([]*Task, error)
}
// Aggregator is the aggregator interface: it combines the per-agent results
// into a single workflow result.
type Aggregator interface {
Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error)
}
// AgentResult is the outcome of running a single agent.
type AgentResult struct {
AgentName string // name of the agent that produced this result
Output string // output returned by the agent run
Error error // non-nil when the run (or slot acquisition) failed
Latency time.Duration // elapsed time of the agent run
Tokens int // token count estimated from the output
Timestamp time.Time // time at which the result was created
}
// WorkflowResult is the aggregated outcome of a workflow run.
type WorkflowResult struct {
Output string // aggregated output text
PartialResults map[string]*AgentResult // per-agent results keyed by agent name
SuccessCount int // number of agents that succeeded
FailureCount int // number of agents that failed
TotalLatency time.Duration // latency reported for the whole run
TokenUsage int // total tokens attributed to the run
}
执行引擎
// Execute dispatches input to the workflow's agents, runs the resulting tasks
// concurrently, and passes every collected result to the aggregator.
//
// The run uses a context derived from ctx with pw.timeout as its deadline.
// Each task contributes exactly one AgentResult: either the agent's result or
// an error result when a concurrency slot could not be acquired. Results are
// collected in completion order.
//
// It returns an error when dispatching fails; otherwise it returns whatever
// the aggregator returns.
//
// NOTE(review): collection ends only once every worker has reported, so
// returning at the deadline presumably relies on agents honoring context
// cancellation — confirm.
func (pw *ParallelWorkflow) Execute(ctx context.Context, input string) (*WorkflowResult, error) {
	// Bound the whole run with the configured overall timeout.
	ctx, stop := context.WithTimeout(ctx, pw.timeout)
	defer stop()

	// Fan-out: ask the dispatcher for the tasks to run.
	tasks, dispatchErr := pw.dispatcher.Dispatch(ctx, input, pw.agents)
	if dispatchErr != nil {
		return nil, fmt.Errorf("dispatch failed: %w", dispatchErr)
	}

	// Buffered to len(tasks): each worker sends exactly once, so a send never blocks.
	out := make(chan *AgentResult, len(tasks))

	var pending sync.WaitGroup
	pending.Add(len(tasks))
	for i := range tasks {
		go func(t *Task) {
			defer pending.Done()

			// Respect the concurrency limit before running the agent.
			acquireErr := pw.concurrencyCtrl.Acquire(ctx)
			if acquireErr != nil {
				out <- &AgentResult{
					AgentName: t.Agent.Name(),
					Error:     fmt.Errorf("acquire concurrency slot: %w", acquireErr),
				}
				return
			}
			defer pw.concurrencyCtrl.Release()

			out <- pw.executeAgent(ctx, t)
		}(tasks[i])
	}

	// Close the channel once every worker has reported, ending the collection loop.
	go func() {
		pending.Wait()
		close(out)
	}()

	// Fan-in: drain results until the channel is closed.
	collected := make([]*AgentResult, 0, len(tasks))
	for {
		r, open := <-out
		if !open {
			break
		}
		collected = append(collected, r)
	}

	return pw.aggregator.Aggregate(ctx, collected)
}
// executeAgent runs a single task's agent under a per-agent timeout of
// pw.resultTimeout and packages the outcome as an AgentResult.
//
// The returned result always carries the agent name, the output and error
// returned by the run, the measured latency, a token estimate computed from
// the output, and the time the result was built.
func (pw *ParallelWorkflow) executeAgent(ctx context.Context, task *Task) *AgentResult {
	begin := time.Now()

	// Per-agent timeout, nested inside the caller's context.
	runCtx, release := context.WithTimeout(ctx, pw.resultTimeout)
	defer release()

	text, runErr := task.Agent.Run(runCtx, task.Input)
	elapsed := time.Since(begin)

	res := new(AgentResult)
	res.AgentName = task.Agent.Name()
	res.Output = text
	res.Error = runErr
	res.Latency = elapsed
	res.Tokens = estimateTokens(text)
	res.Timestamp = time.Now()
	return res
}
结果聚合策略
1. 全等待聚合(All-Wait)
等待所有 Agent 完成,无论成功失败:
// AllWaitAggregator aggregates the results of every agent, successful or not,
// into one report.
type AllWaitAggregator struct {
	formatStrategy FormatStrategy
	// partialFailure reports whether a run with some failed agents still
	// yields a result. The zero value (false) turns any failure into an error.
	// The aggregator carries its own flag because it has no access to the
	// workflow that invoked it.
	partialFailure bool
}

// Aggregate builds a WorkflowResult from results.
//
// Every result is recorded in PartialResults under its agent name. Failed
// agents are rendered as an error section; successful agents contribute their
// output and token count. TotalLatency is the largest latency among the
// results, since agents run in parallel.
//
// It returns an error when at least one agent failed and partial failure is
// not allowed.
func (a *AllWaitAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	var (
		output       strings.Builder
		successCount int
		failureCount int
		totalTokens  int
		maxLatency   time.Duration
	)
	partialResults := make(map[string]*AgentResult, len(results))

	for _, result := range results {
		partialResults[result.AgentName] = result

		if result.Error != nil {
			failureCount++
			fmt.Fprintf(&output, "\n## %s (失败)\n错误: %v\n",
				result.AgentName, result.Error)
		} else {
			successCount++
			fmt.Fprintf(&output, "\n## %s\n%s\n",
				result.AgentName, result.Output)
			totalTokens += result.Tokens
		}

		if result.Latency > maxLatency {
			maxLatency = result.Latency
		}
	}

	// Fail the whole aggregation when failures occurred and are not tolerated.
	if failureCount > 0 && !a.partialFailure {
		return nil, fmt.Errorf("%d agents failed", failureCount)
	}

	return &WorkflowResult{
		Output:         output.String(),
		PartialResults: partialResults,
		SuccessCount:   successCount,
		FailureCount:   failureCount,
		TotalLatency:   maxLatency, // total latency of a parallel run = the slowest agent
		TokenUsage:     totalTokens,
	}, nil
}
2. 超时截断聚合(Timeout-Truncate)
在指定时间内返回已完成的 Agent 结果:
// TimeoutTruncateAggregator aggregates whatever results it can collect
// within waitTimeout.
type TimeoutTruncateAggregator struct {
waitTimeout time.Duration // deadline applied to the collection step in Aggregate
}
// Aggregate collects results until they are exhausted or the wait deadline
// (ctx bounded by a.waitTimeout) is reached, then formats what it has.
//
// Any agent whose name was not collected is represented by a placeholder
// result carrying an "aggregation timeout" error.
//
// NOTE(review): results is already fully materialized when this is called, so
// the deadline can only take effect if the context is done on entry, in which
// case select picks between the two ready cases at random — confirm this is
// the intended behavior.
func (a *TimeoutTruncateAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	waitCtx, stop := context.WithTimeout(ctx, a.waitTimeout)
	defer stop()

	// Load every result into a closed, buffered channel to read from.
	queue := make(chan *AgentResult, len(results))
	for i := range results {
		queue <- results[i]
	}
	close(queue)

	// Gather as many results as possible before the deadline.
	gathered := make([]*AgentResult, 0)
drain:
	for {
		select {
		case r, open := <-queue:
			if !open {
				break drain
			}
			gathered = append(gathered, r)
		case <-waitCtx.Done():
			break drain
		}
	}

	// Mark every agent that was not gathered as timed out.
	seen := make(map[string]bool)
	for _, r := range gathered {
		seen[r.AgentName] = true
	}
	for _, r := range results {
		if seen[r.AgentName] {
			continue
		}
		gathered = append(gathered, &AgentResult{
			AgentName: r.AgentName,
			Error:     fmt.Errorf("aggregation timeout"),
		})
	}

	return a.formatResult(gathered)
}
3. 智能聚合(Smart Aggregation)
基于结果质量的智能选择:
// SmartAggregator picks or merges results based on a quality score.
type SmartAggregator struct {
qualityThreshold float64 // minimum score at which the single best result is returned as-is
selector ResultSelector
}
// Aggregate scores every successful result and returns either the single best
// one or a merge of the top results.
//
// Failed results are ignored; if none succeeded an error is returned. When the
// highest score reaches a.qualityThreshold that result's output is returned
// directly. Otherwise the top results (up to 3 are requested) are merged.
func (a *SmartAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	// Keep only the results that completed without error.
	succeeded := make([]*AgentResult, 0, len(results))
	for _, r := range results {
		if r.Error != nil {
			continue
		}
		succeeded = append(succeeded, r)
	}
	if len(succeeded) == 0 {
		return nil, fmt.Errorf("all agents failed")
	}

	// Attach a quality score to each surviving result.
	ranked := make([]*ScoredResult, 0, len(succeeded))
	for _, r := range succeeded {
		ranked = append(ranked, &ScoredResult{
			Result: r,
			Score:  a.evaluateQuality(r),
		})
	}

	// Highest score first.
	sort.Slice(ranked, func(x, y int) bool {
		return ranked[y].Score < ranked[x].Score
	})

	// A single result that is good enough is returned on its own.
	best := ranked[0]
	if best.Score >= a.qualityThreshold {
		return &WorkflowResult{
			Output:       best.Result.Output,
			SuccessCount: 1,
			TotalLatency: best.Result.Latency,
		}, nil
	}

	// Otherwise merge the top N results.
	top := a.selectTopResults(ranked, 3)
	combined := a.mergeResults(top)
	return &WorkflowResult{
		Output:       combined,
		SuccessCount: len(top),
	}, nil
}
// evaluateQuality returns a weighted quality score for result, combining
// three components:
//   - length: output length in bytes divided by 1000, capped at 1.0 (weight 0.2)
//   - latency: 1 / (1 + latency in seconds), so lower latency scores higher (weight 0.3)
//   - content: the value returned by a.assessContentQuality (weight 0.5)
func (a *SmartAggregator) evaluateQuality(result *AgentResult) float64 {
	const (
		lengthWeight  = 0.2
		latencyWeight = 0.3
		contentWeight = 0.5
	)

	lengthScore := float64(len(result.Output)) / 1000.0
	if lengthScore > 1.0 {
		lengthScore = 1.0
	}

	latencyScore := 1.0 / (1.0 + result.Latency.Seconds())

	// Content quality is delegated to the aggregator's content assessor.
	contentScore := a.assessContentQuality(result.Output)

	return lengthWeight*lengthScore + latencyWeight*latencyScore + contentWeight*contentScore
}
实战场景:多数据源实时分析
金融数据分析系统
package main
import (
	"context"
	"fmt"
	"log"
	"sort"
	"strings"
	"time"

	"github.com/google/adk-go/agent"
	"github.com/google/adk-go/team"
	"github.com/google/adk-go/tool"
)
// FinancialAnalysisSystem is the financial data analysis system; it wraps the
// parallel workflow that runs the analyst agents.
type FinancialAnalysisSystem struct {
workflow *team.ParallelWorkflow
}
// NewFinancialAnalysisSystem builds the four analyst agents (stock, news
// sentiment, macro economy, competitors) on top of model and wires them into
// a parallel workflow.
//
// It returns an error, wrapped with the name of the agent being created, as
// soon as any agent cannot be constructed.
func NewFinancialAnalysisSystem(model agent.Model) (*FinancialAnalysisSystem, error) {
	// 1. Stock data agent.
	stockAgent, err := agent.New(agent.Config{
		Name:  "stock-analyst",
		Model: model,
		Instruction: `你是股票分析专家。分析给定股票的技术指标和趋势。
输出格式:
- 当前价格与涨跌幅
- 关键技术指标(MA、RSI、MACD)
- 短期趋势判断(上涨/下跌/震荡)
- 风险提示`,
		Tools: []tool.Tool{
			tool.NewStockPriceTool(),
			tool.NewTechnicalIndicatorTool(),
		},
		Timeout: 20 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("creating stock-analyst agent: %w", err)
	}

	// 2. News sentiment agent.
	newsAgent, err := agent.New(agent.Config{
		Name:  "news-analyst",
		Model: model,
		Instruction: `你是新闻情绪分析专家。分析近期相关新闻的市场情绪。
输出格式:
- 新闻摘要(最近 5 条)
- 情绪评分(-1 到 +1)
- 关键事件影响分析
- 情绪趋势变化`,
		Tools: []tool.Tool{
			tool.NewNewsSearchTool(),
			tool.NewSentimentAnalysisTool(),
		},
		Timeout: 25 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("creating news-analyst agent: %w", err)
	}

	// 3. Macro economy agent.
	macroAgent, err := agent.New(agent.Config{
		Name:  "macro-analyst",
		Model: model,
		Instruction: `你是宏观经济分析师。分析影响市场的宏观因素。
输出格式:
- 利率政策影响
- 通胀数据解读
- 行业政策变化
- 国际市场联动`,
		Tools: []tool.Tool{
			tool.NewEconomicDataTool(),
			tool.NewPolicyTrackerTool(),
		},
		Timeout: 20 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("creating macro-analyst agent: %w", err)
	}

	// 4. Competitor analysis agent.
	competitorAgent, err := agent.New(agent.Config{
		Name:  "competitor-analyst",
		Model: model,
		Instruction: `你是行业竞争分析师。分析同行业竞争对手动态。
输出格式:
- 主要竞争对手近期动作
- 市场份额变化
- 产品/服务对比
- 竞争格局评估`,
		Tools: []tool.Tool{
			tool.NewCompetitorTrackerTool(),
		},
		Timeout: 20 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("creating competitor-analyst agent: %w", err)
	}

	// Assemble the parallel workflow.
	workflow := team.NewParallelWorkflow(
		team.WithAgents(stockAgent, newsAgent, macroAgent, competitorAgent),
		team.WithConcurrencyLimit(4),             // at most 4 concurrent agents
		team.WithTimeout(30*time.Second),         // overall timeout of 30 seconds
		team.WithResultTimeout(25*time.Second),   // 25-second timeout per result
		team.WithPartialFailure(true),            // partial failure is allowed
		team.WithAggregator(&FinancialReportAggregator{}),
		team.WithDispatcher(&LoadAwareDispatcher{}),
	)

	return &FinancialAnalysisSystem{workflow: workflow}, nil
}
// FinancialReportAggregator is the aggregator that renders agent results as a
// financial report.
type FinancialReportAggregator struct{}
// Aggregate renders the agent results as a single financial report.
//
// Each agent's output becomes a report section keyed by agent name; a failed
// agent's section holds a failure notice with its error instead. A synthesis
// generated from all sections is placed at the top of the report.
//
// The returned WorkflowResult also carries PartialResults (keyed by agent
// name), TotalLatency (the largest agent latency) and TokenUsage (tokens of
// successful agents), matching what AllWaitAggregator reports, so that
// consumers relying on those fields — such as degradation conditions that
// compare FailureCount with len(PartialResults) — see consistent data.
func (a *FinancialReportAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	report := &FinancialReport{
		GeneratedAt: time.Now(),
		Sections:    make(map[string]string),
	}

	partialResults := make(map[string]*AgentResult, len(results))
	var (
		totalTokens int
		maxLatency  time.Duration
	)

	for _, result := range results {
		partialResults[result.AgentName] = result
		if result.Latency > maxLatency {
			maxLatency = result.Latency
		}

		if result.Error != nil {
			report.Sections[result.AgentName] = fmt.Sprintf("【数据获取失败】%v", result.Error)
			continue
		}
		report.Sections[result.AgentName] = result.Output
		totalTokens += result.Tokens
	}

	// Generate the cross-agent synthesis.
	synthesis := a.generateSynthesis(report.Sections)
	report.Synthesis = synthesis

	output := fmt.Sprintf(`# 金融数据分析报告
生成时间: %s
## 综合分析
%s
## 详细数据
### 股票技术分析
%s
### 新闻情绪分析
%s
### 宏观经济分析
%s
### 竞品分析
%s
---
*报告由多 Agent 并行分析生成,总耗时: %v*`,
		report.GeneratedAt.Format("2006-01-02 15:04:05"),
		report.Synthesis,
		report.Sections["stock-analyst"],
		report.Sections["news-analyst"],
		report.Sections["macro-analyst"],
		report.Sections["competitor-analyst"],
		calculateMaxLatency(results),
	)

	return &WorkflowResult{
		Output:         output,
		PartialResults: partialResults,
		SuccessCount:   countSuccesses(results),
		FailureCount:   countFailures(results),
		TotalLatency:   maxLatency,
		TokenUsage:     totalTokens,
	}, nil
}
// generateSynthesis produces the synthesis text from the per-agent sections:
// a fixed introductory line followed by one bullet per signal returned by
// a.extractSignals.
//
// A real implementation could instead call another LLM agent to summarize.
func (a *FinancialReportAggregator) generateSynthesis(sections map[string]string) string {
	var b strings.Builder
	b.WriteString("基于多维度分析,当前市场呈现以下特征:\n\n")

	// One bullet line per extracted key signal.
	for _, s := range a.extractSignals(sections) {
		fmt.Fprintf(&b, "- %s\n", s)
	}

	return b.String()
}
// LoadAwareDispatcher is the load-aware dispatcher: it consults loadBalancer
// for each agent's load when building tasks.
type LoadAwareDispatcher struct {
loadBalancer *LoadBalancer
}
// Dispatch creates one task per agent, in the same order as agents.
//
// Each task's input is the analysis target followed by the agent's own
// instruction. When the load balancer reports a load above 0.8 for an agent,
// the target text is first passed through d.simplifyInput to reduce the work
// for that agent.
//
// A dispatcher without a load balancer (for example the zero value
// &LoadAwareDispatcher{}) treats every agent as not loaded instead of
// dereferencing a nil balancer.
func (d *LoadAwareDispatcher) Dispatch(ctx context.Context, input string, agents []*agent.Agent) ([]*Task, error) {
	tasks := make([]*Task, len(agents))

	// The loop variable is named ag so it does not shadow the agent package.
	for i, ag := range agents {
		taskInput := input
		if d.loadBalancer != nil && d.loadBalancer.GetLoad(ag.Name()) > 0.8 {
			// Under high load, simplify the input to reduce processing.
			taskInput = d.simplifyInput(input)
		}

		tasks[i] = &Task{
			Agent: ag,
			Input: fmt.Sprintf("分析标的: %s\n\n%s", taskInput, ag.Instruction()),
		}
	}

	return tasks, nil
}
// main builds the analysis system, runs one analysis for a single stock and
// prints the report followed by the success ratio. Any failure is fatal.
//
// NOTE(review): model is not defined in the visible snippet — it must be
// provided elsewhere for this program to compile.
func main() {
	system, err := NewFinancialAnalysisSystem(model)
	if err != nil {
		log.Fatalf("Failed to create system: %v", err)
	}

	result, err := system.workflow.Execute(context.Background(), "贵州茅台(600519)")
	if err != nil {
		log.Fatalf("Analysis failed: %v", err)
	}

	total := result.SuccessCount + result.FailureCount
	fmt.Println(result.Output)
	fmt.Printf("\n成功率: %d/%d\n", result.SuccessCount, total)
}
部分失败处理策略
优雅降级模式
// DegradationStrategy is a list of degradation levels.
type DegradationStrategy struct {
levels []DegradationLevel
}
// DegradationLevel pairs a condition on a workflow result with the action
// that produces the degraded result.
type DegradationLevel struct {
Name string // human-readable level name
Condition func(*WorkflowResult) bool // reports whether this level applies to a result
Action func(*WorkflowResult) *WorkflowResult // transforms the result for this level
}
// DefaultDegradationStrategy is the three-level degradation strategy.
//
//   - Level 1 (data completion): applies when some, but at most half, of the
//     agents failed. Failed results are replaced by a cached value when one
//     exists, and the success/failure counters are adjusted accordingly.
//   - Level 2 (simplified output): applies when more than half failed but at
//     least one succeeded. The output is rebuilt from the successful results
//     only, in agent-name order so the report is deterministic (map iteration
//     order is random).
//   - Level 3 (full degradation): applies when no agent succeeded. The output
//     is replaced by a fixed "temporarily unavailable" message.
var DefaultDegradationStrategy = &DegradationStrategy{
	levels: []DegradationLevel{
		{
			Name: "Level 1 - 数据补全",
			Condition: func(r *WorkflowResult) bool {
				return r.FailureCount > 0 && r.FailureCount <= len(r.PartialResults)/2
			},
			Action: func(r *WorkflowResult) *WorkflowResult {
				// Fill in missing data from the cache where possible.
				for name, result := range r.PartialResults {
					if result.Error == nil {
						continue
					}
					cached := getCachedResult(name)
					if cached == "" {
						continue
					}
					result.Output = cached
					result.Error = nil
					r.SuccessCount++
					r.FailureCount--
				}
				return r
			},
		},
		{
			Name: "Level 2 - 简化输出",
			Condition: func(r *WorkflowResult) bool {
				return r.SuccessCount > 0 && r.FailureCount > len(r.PartialResults)/2
			},
			Action: func(r *WorkflowResult) *WorkflowResult {
				// Sort the agent names so sections appear in a stable order.
				names := make([]string, 0, len(r.PartialResults))
				for name := range r.PartialResults {
					names = append(names, name)
				}
				sort.Strings(names)

				// Keep only the successful results in a simplified report.
				var output strings.Builder
				output.WriteString("【部分服务不可用,以下为可用数据】\n\n")
				for _, name := range names {
					result := r.PartialResults[name]
					if result.Error == nil {
						fmt.Fprintf(&output, "## %s\n%s\n\n", name, result.Output)
					}
				}
				r.Output = output.String()
				return r
			},
		},
		{
			Name: "Level 3 - 完全降级",
			Condition: func(r *WorkflowResult) bool {
				return r.SuccessCount == 0
			},
			Action: func(r *WorkflowResult) *WorkflowResult {
				r.Output = "【系统暂时不可用,请稍后重试】"
				return r
			},
		},
	},
}
竞态条件与数据安全
状态隔离
// IsolatedContext is the independent execution context given to each agent.
type IsolatedContext struct {
AgentID string
Input string
State map[string]interface{}
CancelFunc context.CancelFunc
}
// executeWithIsolation runs one agent against a private copy of the shared
// state and, on success, merges that copy back under the agent's name.
//
// The agent parameter is named ag so that agent.WithState below refers to the
// agent package rather than to the parameter. The shared state is copied
// while holding pw.stateMu, the same lock mergeStateSafely takes for writing,
// so the copy cannot race with a concurrent merge.
//
// On failure it returns a nil result and the agent's error. On success it
// returns an AgentResult built from the run's output, mirroring the fields
// populated by executeAgent.
func (pw *ParallelWorkflow) executeWithIsolation(ctx context.Context, ag *agent.Agent, input string) (*AgentResult, error) {
	// Derived context so the run is cancelled when this call returns.
	isolatedCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// State isolation: deep-copy under the lock to avoid sharing.
	pw.stateMu.Lock()
	isolatedState := deepCopy(pw.sharedState)
	pw.stateMu.Unlock()

	start := time.Now()
	output, err := ag.Run(isolatedCtx, input, agent.WithState(isolatedState))
	latency := time.Since(start)
	if err != nil {
		return nil, err
	}

	// Merge the isolated state back into the shared state.
	pw.mergeStateSafely(ag.Name(), isolatedState)

	return &AgentResult{
		AgentName: ag.Name(),
		Output:    output,
		Latency:   latency,
		Tokens:    estimateTokens(output),
		Timestamp: time.Now(),
	}, nil
}
// mergeStateSafely copies every entry of state into pw.sharedState while
// holding pw.stateMu. Each key is stored as "<agentName>.<key>".
func (pw *ParallelWorkflow) mergeStateSafely(agentName string, state map[string]interface{}) {
	pw.stateMu.Lock()
	defer pw.stateMu.Unlock()

	for k := range state {
		pw.sharedState[agentName+"."+k] = state[k]
	}
}
性能调优指南
1. 并发数调优
// calculateOptimalConcurrency computes the concurrency level from the number
// of CPU cores and the model API rate limit.
//
// The result is the smaller of two bounds: CPU cores times an I/O multiplier
// of 3 (work that waits on network I/O can exceed the core count), and the
// assumed API rate limit of 100 req/s divided across 4 agents.
func calculateOptimalConcurrency() int {
	const (
		apiRateLimit = 100 // assumed API rate limit, in requests per second
		agentCount   = 4
		ioMultiplier = 3
	)

	ioBound := runtime.NumCPU() * ioMultiplier
	apiBound := apiRateLimit / agentCount

	if apiBound < ioBound {
		return apiBound
	}
	return ioBound
}
2. 连接池优化
// ModelConnectionPool is a pool of model clients backed by a buffered channel.
type ModelConnectionPool struct {
clients chan *ModelClient // idle clients available to Acquire
maxSize int // size the pool was created with
}
// NewConnectionPool creates a pool with room for size clients and warms it up
// by creating clients eagerly.
//
// A client that fails to be created is logged and skipped, so the returned
// pool may hold fewer than size clients.
func NewConnectionPool(size int) *ModelConnectionPool {
	pool := new(ModelConnectionPool)
	pool.maxSize = size
	pool.clients = make(chan *ModelClient, size)

	// Pre-warm the connections.
	for idx := 0; idx < size; idx++ {
		client, err := createModelClient()
		if err == nil {
			pool.clients <- client
			continue
		}
		log.Printf("Failed to create client %d: %v", idx, err)
	}

	return pool
}
// Acquire takes a client from the pool, blocking until one is available or
// ctx is done. When the context finishes first it returns a nil client and
// ctx.Err().
func (p *ModelConnectionPool) Acquire(ctx context.Context) (*ModelClient, error) {
select {
case client := <-p.clients:
return client, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Release hands client back to the pool without blocking. If the pool cannot
// accept it, the client is closed instead.
func (p *ModelConnectionPool) Release(client *ModelClient) {
select {
case p.clients <- client:
default:
// The pool is full: close the connection.
client.Close()
}
}
常见问题深度解析
Q:Parallel 工作流中某个 Agent 慢怎么办?
A:三层防护:
- 单 Agent 超时:设置 `resultTimeout`,防止单个 Agent 拖垮整体
- 熔断机制:对持续失败的 Agent 触发熔断,快速失败
- 异步回退:对超时 Agent 返回占位符,后台继续执行,完成后通过回调更新
Q:结果顺序如何保证?
A:使用有序聚合器:
// OrderedAggregator aggregates results in a predefined agent order.
type OrderedAggregator struct {
agentOrder []string // agent names in the order their results are emitted
}
// Aggregate arranges results in the order given by a.agentOrder and formats
// them with a.formatOrdered.
//
// Results are matched to the order by agent name. An agent listed in
// a.agentOrder that produced no result is represented by a placeholder result
// carrying an error, so the formatter never receives a nil entry. Results
// whose agent name is not listed in a.agentOrder are not included.
func (a *OrderedAggregator) Aggregate(ctx context.Context, results []*AgentResult) (*WorkflowResult, error) {
	byName := make(map[string]*AgentResult, len(results))
	for _, r := range results {
		byName[r.AgentName] = r
	}

	ordered := make([]*AgentResult, 0, len(a.agentOrder))
	for _, name := range a.agentOrder {
		r, ok := byName[name]
		if !ok {
			r = &AgentResult{
				AgentName: name,
				Error:     fmt.Errorf("no result for agent %q", name),
			}
		}
		ordered = append(ordered, r)
	}

	return a.formatOrdered(ordered)
}
Q:如何避免 thundering herd(惊群效应)?
A:引入抖动(Jitter)和令牌桶限流:
// executeWithJitter starts one goroutine per task, staggering the starts to
// avoid triggering every agent at the same moment.
//
// Task i is delayed by i*100ms plus a random jitter of up to one second. The
// delay is cancellable: if ctx is done before the delay elapses, the task is
// not started at all, instead of sleeping through the full delay and then
// running against an already-cancelled context.
//
// The function returns immediately after launching the goroutines; results of
// the agent runs are discarded, as before.
func (pw *ParallelWorkflow) executeWithJitter(ctx context.Context, tasks []*Task) {
	for i, task := range tasks {
		// Random jitter so tasks do not fire simultaneously.
		jitter := time.Duration(rand.Intn(1000)) * time.Millisecond

		go func(t *Task, delay time.Duration) {
			timer := time.NewTimer(delay)
			defer timer.Stop()

			select {
			case <-timer.C:
			case <-ctx.Done():
				return
			}

			pw.executeAgent(ctx, t)
		}(task, time.Duration(i)*100*time.Millisecond+jitter)
	}
}
下一步
Parallel 工作流的并发控制与结果聚合已深入掌握。接下来探索 Loop 工作流——循环执行模式的迭代优化、终止条件与收敛性保障。
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
