Agent Team 架构深度解析:从单体到分布式协作的设计哲学

在构建复杂 AI 应用时,单体 Agent 的能力边界很快会显现。当任务涉及多领域知识、长流程编排或高并发处理时,Agent Team(多智能体团队)成为必然选择。本文将从架构层面深入剖析 ADK Go 中 Agent Team 的设计哲学、协作模型与生产实践。

单体 Agent 的局限性分析

能力边界与上下文瓶颈

单个 LLM Agent 面临的核心约束包括:

约束维度具体表现影响
上下文窗口模型 token 上限(4K-128K)长流程中历史信息被截断
领域专注度指令过多导致注意力分散多领域任务时表现下降
工具复杂度工具过多增加决策负担工具选择准确率降低
并发处理单线程顺序执行无法并行处理独立子任务
容错能力单点故障一处错误导致整体失败

生产经验:在电商客服场景中,单体 Agent 同时处理订单查询、物流追踪、售后政策三个领域时,工具调用准确率从 92% 下降至 67%。拆分为三个领域 Agent 后,整体准确率回升至 89%。

什么时候必须引入 Agent Team

决策树:是否需要 Agent Team?
        ┌───────────┴───────────┐
        ▼                       ▼
   任务步骤 > 3?            涉及多领域?
        │                       │
   是 ──┤                  是 ──┤
        ▼                       ▼
   需要状态传递?          领域间需要协作?
        │                       │
   是 ──┼──→ 需要 Team      是 ──┼──→ 需要 Team
   否 ──┘                  否 ──┘
        ▼                       ▼
   可用 Sequential          可用 Parallel

Agent Team 的拓扑架构模型

三种核心拓扑

ADK Go 支持三种 Agent Team 拓扑结构,每种适用于不同的业务场景:

1. 星型拓扑(Star Topology)

                    ┌─────────────┐
                    │ Orchestrator │ ← 中央调度器
                    │   (主控 Agent) │
                    └──────┬──────┘
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
      ┌─────────┐    ┌─────────┐    ┌─────────┐
      │ Agent A │    │ Agent B │    │ Agent C │
      │(查询专家)│    │(写作专家)│    │(审核专家)│
      └─────────┘    └─────────┘    └─────────┘

适用场景:任务明确需要分阶段处理,Orchestrator 负责状态管理与流程控制。

生产要点

  • Orchestrator 不应包含业务逻辑,只负责调度
  • 避免 Orchestrator 成为性能瓶颈——状态操作必须是 O(1)
  • 建议 Orchestrator 使用轻量级模型(如 gemini-flash),Worker 使用能力更强的模型

2. 网状拓扑(Mesh Topology)

      ┌─────────┐         ┌─────────┐
      │ Agent A │◄───────►│ Agent B │
      │(数据源A) │         │(数据源B) │
      └────┬────┘         └────┬────┘
           │                   │
           └─────────┬─────────┘
              ┌─────────┐
              │ Agent C │
              │(聚合分析) │
              └────┬────┘
              ┌─────────┐
              │ Agent D │
              │(结果输出) │
              └─────────┘

适用场景:数据流复杂、Agent 间需要频繁交换信息的场景,如多源数据分析。

注意事项

  • 网状拓扑的复杂度随 Agent 数量呈 O(n²) 增长
  • 必须定义清晰的消息协议,避免循环依赖
  • 建议引入消息总线(Message Bus)解耦 Agent 间通信

3. 分层拓扑(Hierarchical Topology)

              ┌─────────────┐
              │ 顶层 Orchestrator │
              │  (战略调度)      │
              └──────┬──────┘
        ┌────────────┼────────────┐
        ▼            ▼            ▼
   ┌─────────┐  ┌─────────┐  ┌─────────┐
   │ Team A  │  │ Team B  │  │ Team C  │
   │子Orchestrator│ │子Orchestrator│ │子Orchestrator│
   └────┬────┘  └────┬────┘  └────┬────┘
        │            │            │
   ┌────┴────┐  ┌────┴────┐  ┌────┴────┐
   │A1 │ A2 │  │B1 │ B2 │  │C1 │ C2 │
   └───┴────┘  └───┴────┘  └───┴────┘

适用场景:超大规模 Agent 系统(>10 个 Agent),如企业级智能客服平台。

架构优势

  • 每层只管理 3-5 个子节点,符合认知负荷理论
  • 子团队可独立部署、独立扩展
  • 故障隔离——一个子团队故障不影响其他团队

状态管理与数据一致性

Shared State 架构

ADK Go 的 Agent Team 通过 Shared State 实现数据共享:

// State 的层次结构
type TeamState struct {
    // 全局状态 - 所有 Agent 可读
    Global  map[string]interface{} `json:"global"`
    
    // 局部状态 - 仅特定 Agent 可访问
    Local   map[string]*AgentState `json:"local"`
    
    // 会话状态 - 跨轮次持久化
    Session *SessionState          `json:"session"`
    
    // 元数据 - 调试与监控用
    Meta    *StateMeta             `json:"meta"`
}

type AgentState struct {
    Output    string                 `json:"output"`
    ToolCalls []ToolCallRecord       `json:"tool_calls"`
    Metrics   *ExecutionMetrics      `json:"metrics"`
    Timestamp time.Time              `json:"timestamp"`
}

状态传递的最佳实践

// 生产级状态管理示例
func orchestrateWithState(ctx context.Context, team *Team, input string) (*Result, error) {
    // 1. 初始化状态 - 使用不可变快照避免竞态
    state := team.NewState()
    state.SetGlobal("input", input)
    state.SetGlobal("start_time", time.Now())
    state.SetGlobal("request_id", generateRequestID())
    
    // 2. 执行 Worker Agent - 每个 Worker 获得状态的副本
    for _, worker := range team.Workers {
        workerState := state.Fork() // 创建分支,避免直接修改全局状态
        
        result, err := worker.Run(ctx, workerState)
        if err != nil {
            // 3. 错误处理 - 记录失败但不中断整体流程
            state.SetLocal(worker.Name(), "error", err)
            state.SetLocal(worker.Name(), "status", "failed")
            
            if worker.IsCritical() {
                return nil, fmt.Errorf("critical worker %s failed: %w", worker.Name(), err)
            }
            continue
        }
        
        // 4. 合并结果 - 使用两阶段提交避免部分更新
        if err := state.Merge(worker.Name(), result); err != nil {
            return nil, fmt.Errorf("state merge failed: %w", err)
        }
    }
    
    // 5. 最终校验
    if err := validateFinalState(state); err != nil {
        return nil, fmt.Errorf("validation failed: %w", err)
    }
    
    return state.ToResult(), nil
}

关键设计原则

  1. 不可变状态(Immutable State):Worker 操作状态副本,通过显式 Merge 提交变更
  2. 版本控制:每次状态变更递增版本号,便于追踪和回滚
  3. TTL 管理:状态设置过期时间,防止内存泄漏
  4. 序列化安全:所有状态值必须可 JSON 序列化,便于持久化和调试

容错与重试机制

生产级重试策略

type RetryPolicy struct {
    MaxAttempts     int           // 最大重试次数(建议 3-5)
    InitialBackoff  time.Duration // 初始退避时间(建议 1s)
    MaxBackoff      time.Duration // 最大退避时间(建议 30s)
    BackoffMultiplier float64     // 退避倍数(建议 2.0)
    RetryableErrors []string      // 可重试的错误类型
    CircuitBreaker  *CircuitBreakerConfig // 熔断配置
}

type CircuitBreakerConfig struct {
    FailureThreshold int           // 熔断触发阈值
    ResetTimeout     time.Duration // 熔断后重置时间
    HalfOpenRequests int           // 半开状态测试请求数
}

// 实际应用
retryPolicy := &RetryPolicy{
    MaxAttempts:       3,
    InitialBackoff:    time.Second,
    MaxBackoff:        30 * time.Second,
    BackoffMultiplier: 2.0,
    RetryableErrors:   []string{"rate_limit", "timeout", "service_unavailable"},
    CircuitBreaker: &CircuitBreakerConfig{
        FailureThreshold: 5,
        ResetTimeout:     60 * time.Second,
        HalfOpenRequests: 2,
    },
}

降级策略(Fallback)

当 Agent 持续失败时,系统应优雅降级:

func (t *Team) executeWithFallback(ctx context.Context, agent Agent, input string) (string, error) {
    // 1. 尝试主 Agent
    result, err := t.executeWithRetry(ctx, agent, input, t.primaryPolicy)
    if err == nil {
        return result, nil
    }
    
    // 2. 主 Agent 失败,尝试备用 Agent(简化版)
    if t.fallbackAgent != nil {
        log.Warnf("Primary agent %s failed, falling back to %s", agent.Name(), t.fallbackAgent.Name())
        result, err = t.fallbackAgent.Run(ctx, input)
        if err == nil {
            // 标记降级状态,用于监控告警
            t.metrics.RecordDegradation(agent.Name(), t.fallbackAgent.Name())
            return result, nil
        }
    }
    
    // 3. 备用也失败,返回缓存结果(如果可用)
    if cached := t.cache.Get(input); cached != nil {
        log.Warnf("All agents failed, returning cached result")
        t.metrics.RecordCacheHit(input)
        return cached.(string), nil
    }
    
    // 4. 全部失败,返回友好错误
    return "", fmt.Errorf("service temporarily unavailable, please retry later")
}

性能优化与资源管理

并发控制

// 使用信号量限制并发数,防止资源耗尽
type ConcurrencyLimiter struct {
    sem chan struct{}
}

func NewConcurrencyLimiter(maxConcurrent int) *ConcurrencyLimiter {
    return &ConcurrencyLimiter{
        sem: make(chan struct{}, maxConcurrent),
    }
}

func (c *ConcurrencyLimiter) Execute(ctx context.Context, fn func() error) error {
    select {
    case c.sem <- struct{}{}:
        defer func() { <-c.sem }()
        return fn()
    case <-ctx.Done():
        return ctx.Err()
    }
}

// 应用示例
limiter := NewConcurrencyLimiter(5) // 最多 5 个 Agent 同时运行

var wg sync.WaitGroup
for _, agent := range agents {
    wg.Add(1)
    go func(a Agent) {
        defer wg.Done()
        err := limiter.Execute(ctx, func() error {
            _, err := a.Run(ctx, input)
            return err
        })
        if err != nil {
            log.Errorf("Agent %s failed: %v", a.Name(), err)
        }
    }(agent)
}
wg.Wait()

Token 预算管理

type TokenBudget struct {
    TotalBudget    int // 总预算
    UsedTokens     int // 已使用
    ReservedTokens int // 为后续步骤预留
}

func (tb *TokenBudget) CanAllocate(requested int) bool {
    available := tb.TotalBudget - tb.UsedTokens - tb.ReservedTokens
    return requested <= available
}

func (tb *TokenBudget) Allocate(tokens int) error {
    if !tb.CanAllocate(tokens) {
        return fmt.Errorf("token budget exceeded: need %d, available %d", 
            tokens, tb.TotalBudget-tb.UsedTokens-tb.ReservedTokens)
    }
    tb.UsedTokens += tokens
    return nil
}

// 在 Team 执行中应用
func (t *Team) runWithBudget(ctx context.Context, budget *TokenBudget) error {
    for i, agent := range t.agents {
        // 为后续 Agent 预留预算
        remainingSteps := len(t.agents) - i - 1
        budget.ReservedTokens = remainingSteps * t.avgTokensPerStep
        
        if !budget.CanAllocate(t.avgTokensPerStep) {
            return fmt.Errorf("insufficient token budget for agent %s", agent.Name())
        }
        
        result, err := agent.Run(ctx, input)
        if err != nil {
            return err
        }
        
        // 实际扣减(基于结果估算)
        actualTokens := estimateTokens(result)
        budget.Allocate(actualTokens)
    }
    return nil
}

监控与可观测性

关键指标

type TeamMetrics struct {
    // 延迟指标
    TotalLatency    prometheus.Histogram // 端到端延迟
    AgentLatency    prometheus.Histogram // 单个 Agent 延迟
    
    // 成功率
    SuccessRate     prometheus.Gauge     // 整体成功率
    AgentErrorRate  prometheus.CounterVec // 各 Agent 错误率
    
    // 资源使用
    TokenUsage      prometheus.Counter   // Token 消耗
    ActiveAgents    prometheus.Gauge     // 当前活跃 Agent 数
    
    // 业务指标
    StepCompletion  prometheus.CounterVec // 各步骤完成数
    RetryCount      prometheus.Counter   // 重试次数
    FallbackCount   prometheus.Counter   // 降级次数
}

分布式追踪

func (t *Team) runWithTracing(ctx context.Context, input string) (*Result, error) {
    // 创建根 Span
    ctx, span := tracer.Start(ctx, "team.execution",
        trace.WithAttributes(
            attribute.String("team.name", t.name),
            attribute.String("team.size", strconv.Itoa(len(t.agents))),
            attribute.String("input.hash", hashInput(input)),
        ),
    )
    defer span.End()
    
    for _, agent := range t.agents {
        // 每个 Agent 一个子 Span
        ctx, agentSpan := tracer.Start(ctx, fmt.Sprintf("agent.%s", agent.Name()))
        
        result, err := agent.Run(ctx, input)
        
        agentSpan.SetAttributes(
            attribute.String("agent.output_hash", hashOutput(result)),
            attribute.Int("agent.output_length", len(result)),
        )
        
        if err != nil {
            agentSpan.RecordError(err)
            agentSpan.SetStatus(codes.Error, err.Error())
        } else {
            agentSpan.SetStatus(codes.Ok, "success")
        }
        agentSpan.End()
        
        if err != nil {
            return nil, err
        }
        
        input = result // 传递结果给下一个 Agent
    }
    
    return &Result{Output: input}, nil
}

生产环境部署建议

1. 配置分离

# team-config.yaml
team:
  name: "customer-service-team"
  topology: "star"  # star | mesh | hierarchical
  
  orchestrator:
    model: "gemini-flash"
    timeout: 30s
    max_tokens: 2048
    
  workers:
    - name: "order-agent"
      model: "gemini-pro"
      timeout: 45s
      tools: ["order_query", "order_modify", "refund_process"]
      retry_policy:
        max_attempts: 3
        backoff: "exponential"
        
    - name: "logistics-agent"
      model: "gemini-flash"
      timeout: 30s
      tools: ["track_package", "estimate_delivery"]
      
    - name: "policy-agent"
      model: "gemini-flash"
      timeout: 20s
      tools: ["search_policy", "check_eligibility"]
      
  state:
    backend: "redis"  # memory | redis | etcd
    ttl: 3600s
    persistence: true
    
  limits:
    max_concurrent_agents: 10
    total_token_budget: 100000
    max_execution_time: 120s

2. 健康检查与自愈

func (t *Team) healthCheck() *HealthStatus {
    status := &HealthStatus{}
    
    for _, agent := range t.agents {
        agentStatus := agent.HealthCheck()
        status.Agents = append(status.Agents, agentStatus)
        
        if agentStatus.State != "healthy" {
            // 自动重启不健康的 Agent
            if agentStatus.State == "degraded" {
                go t.restartAgent(agent)
            }
            status.Overall = "degraded"
        }
    }
    
    if status.Overall == "" {
        status.Overall = "healthy"
    }
    
    return status
}

常见问题深度解析

Q:Agent 间通信该用同步还是异步?

A:取决于场景:

  • 同步调用:Sequential 工作流,需要等待结果才能继续。优点是实现简单、状态一致;缺点是阻塞等待增加延迟。
  • 异步调用:Parallel 工作流,通过消息队列解耦。优点是吞吐高、容错好;缺点是复杂度增加,需要处理乱序和超时。

生产建议:默认同步,当 Agent 数量 >5 或需要高吞吐时改用异步。

Q:如何防止 Orchestrator 成为瓶颈?

A:三个策略:

  1. 状态外置:将 State 存储在 Redis 等外部存储,Orchestrator 只保留引用
  2. 轻量级模型:Orchestrator 使用轻量模型,复杂判断下放给 Worker
  3. 缓存路由决策:对常见输入类型缓存路由结果,避免重复 LLM 调用

Q:Agent Team 的扩展性如何设计?

A:遵循以下原则:

  • 水平扩展:Worker Agent 无状态化,可随意增加实例
  • 负载均衡:基于输入特征哈希路由到不同实例
  • 动态扩缩容:根据队列深度自动调整 Worker 数量
  • 服务发现:使用 Consul/etcd 管理 Agent 注册与发现

下一步

理解了 Agent Team 的架构设计后,接下来深入 Sequential 工作流——顺序执行模式的实现细节与性能优化。

Rewind Sessions | Sequential 工作流 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。