Agent Team 架构深度解析:从单体到分布式协作的设计哲学
深入剖析 ADK Go 中 Agent Team 的架构设计原理,涵盖多 Agent 协作的拓扑模型、状态一致性、容错机制与生产环境最佳实践。
Agent Team 架构深度解析:从单体到分布式协作的设计哲学
在构建复杂 AI 应用时,单体 Agent 的能力边界很快会显现。当任务涉及多领域知识、长流程编排或高并发处理时,Agent Team(多智能体团队)成为必然选择。本文将从架构层面深入剖析 ADK Go 中 Agent Team 的设计哲学、协作模型与生产实践。
单体 Agent 的局限性分析
能力边界与上下文瓶颈
单个 LLM Agent 面临的核心约束包括:
| 约束维度 | 具体表现 | 影响 |
|---|---|---|
| 上下文窗口 | 模型 token 上限(4K-128K) | 长流程中历史信息被截断 |
| 领域专注度 | 指令过多导致注意力分散 | 多领域任务时表现下降 |
| 工具复杂度 | 工具过多增加决策负担 | 工具选择准确率降低 |
| 并发处理 | 单线程顺序执行 | 无法并行处理独立子任务 |
| 容错能力 | 单点故障 | 一处错误导致整体失败 |
生产经验:在电商客服场景中,单体 Agent 同时处理订单查询、物流追踪、售后政策三个领域时,工具调用准确率从 92% 下降至 67%。拆分为三个领域 Agent 后,整体准确率回升至 89%。
什么时候必须引入 Agent Team
决策树:是否需要 Agent Team?
│
┌───────────┴───────────┐
▼ ▼
任务步骤 > 3? 涉及多领域?
│ │
是 ──┤ 是 ──┤
▼ ▼
需要状态传递? 领域间需要协作?
│ │
是 ──┼──→ 需要 Team 是 ──┼──→ 需要 Team
否 ──┘ 否 ──┘
▼ ▼
可用 Sequential 可用 Parallel
Agent Team 的拓扑架构模型
三种核心拓扑
ADK Go 支持三种 Agent Team 拓扑结构,每种适用于不同的业务场景:
1. 星型拓扑(Star Topology)
┌─────────────┐
│ Orchestrator │ ← 中央调度器
│ (主控 Agent) │
└──────┬──────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│(查询专家)│ │(写作专家)│ │(审核专家)│
└─────────┘ └─────────┘ └─────────┘
适用场景:任务明确需要分阶段处理,Orchestrator 负责状态管理与流程控制。
生产要点:
- Orchestrator 不应包含业务逻辑,只负责调度
- 避免 Orchestrator 成为性能瓶颈——状态操作必须是 O(1)
- 建议 Orchestrator 使用轻量级模型(如 gemini-flash),Worker 使用能力更强的模型
2. 网状拓扑(Mesh Topology)
┌─────────┐ ┌─────────┐
│ Agent A │◄───────►│ Agent B │
│(数据源A) │ │(数据源B) │
└────┬────┘ └────┬────┘
│ │
└─────────┬─────────┘
▼
┌─────────┐
│ Agent C │
│(聚合分析) │
└────┬────┘
│
▼
┌─────────┐
│ Agent D │
│(结果输出) │
└─────────┘
适用场景:数据流复杂、Agent 间需要频繁交换信息的场景,如多源数据分析。
注意事项:
- 网状拓扑的复杂度随 Agent 数量呈 O(n²) 增长
- 必须定义清晰的消息协议,避免循环依赖
- 建议引入消息总线(Message Bus)解耦 Agent 间通信
3. 分层拓扑(Hierarchical Topology)
┌─────────────┐
│ 顶层 Orchestrator │
│ (战略调度) │
└──────┬──────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Team A │ │ Team B │ │ Team C │
│子Orchestrator│ │子Orchestrator│ │子Orchestrator│
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
┌────┴────┐ ┌────┴────┐ ┌────┴────┐
│A1 │ A2 │ │B1 │ B2 │ │C1 │ C2 │
└───┴────┘ └───┴────┘ └───┴────┘
适用场景:超大规模 Agent 系统(>10 个 Agent),如企业级智能客服平台。
架构优势:
- 每层只管理 3-5 个子节点,符合认知负荷理论
- 子团队可独立部署、独立扩展
- 故障隔离——一个子团队故障不影响其他团队
状态管理与数据一致性
Shared State 架构
ADK Go 的 Agent Team 通过 Shared State 实现数据共享:
// State 的层次结构
type TeamState struct {
// 全局状态 - 所有 Agent 可读
Global map[string]interface{} `json:"global"`
// 局部状态 - 仅特定 Agent 可访问
Local map[string]*AgentState `json:"local"`
// 会话状态 - 跨轮次持久化
Session *SessionState `json:"session"`
// 元数据 - 调试与监控用
Meta *StateMeta `json:"meta"`
}
type AgentState struct {
Output string `json:"output"`
ToolCalls []ToolCallRecord `json:"tool_calls"`
Metrics *ExecutionMetrics `json:"metrics"`
Timestamp time.Time `json:"timestamp"`
}
状态传递的最佳实践
// 生产级状态管理示例
func orchestrateWithState(ctx context.Context, team *Team, input string) (*Result, error) {
// 1. 初始化状态 - 使用不可变快照避免竞态
state := team.NewState()
state.SetGlobal("input", input)
state.SetGlobal("start_time", time.Now())
state.SetGlobal("request_id", generateRequestID())
// 2. 执行 Worker Agent - 每个 Worker 获得状态的副本
for _, worker := range team.Workers {
workerState := state.Fork() // 创建分支,避免直接修改全局状态
result, err := worker.Run(ctx, workerState)
if err != nil {
// 3. 错误处理 - 记录失败但不中断整体流程
state.SetLocal(worker.Name(), "error", err)
state.SetLocal(worker.Name(), "status", "failed")
if worker.IsCritical() {
return nil, fmt.Errorf("critical worker %s failed: %w", worker.Name(), err)
}
continue
}
// 4. 合并结果 - 使用两阶段提交避免部分更新
if err := state.Merge(worker.Name(), result); err != nil {
return nil, fmt.Errorf("state merge failed: %w", err)
}
}
// 5. 最终校验
if err := validateFinalState(state); err != nil {
return nil, fmt.Errorf("validation failed: %w", err)
}
return state.ToResult(), nil
}
关键设计原则:
- 不可变状态(Immutable State):Worker 操作状态副本,通过显式 Merge 提交变更
- 版本控制:每次状态变更递增版本号,便于追踪和回滚
- TTL 管理:状态设置过期时间,防止内存泄漏
- 序列化安全:所有状态值必须可 JSON 序列化,便于持久化和调试
容错与重试机制
生产级重试策略
type RetryPolicy struct {
MaxAttempts int // 最大重试次数(建议 3-5)
InitialBackoff time.Duration // 初始退避时间(建议 1s)
MaxBackoff time.Duration // 最大退避时间(建议 30s)
BackoffMultiplier float64 // 退避倍数(建议 2.0)
RetryableErrors []string // 可重试的错误类型
CircuitBreaker *CircuitBreakerConfig // 熔断配置
}
type CircuitBreakerConfig struct {
FailureThreshold int // 熔断触发阈值
ResetTimeout time.Duration // 熔断后重置时间
HalfOpenRequests int // 半开状态测试请求数
}
// 实际应用
retryPolicy := &RetryPolicy{
MaxAttempts: 3,
InitialBackoff: time.Second,
MaxBackoff: 30 * time.Second,
BackoffMultiplier: 2.0,
RetryableErrors: []string{"rate_limit", "timeout", "service_unavailable"},
CircuitBreaker: &CircuitBreakerConfig{
FailureThreshold: 5,
ResetTimeout: 60 * time.Second,
HalfOpenRequests: 2,
},
}
降级策略(Fallback)
当 Agent 持续失败时,系统应优雅降级:
func (t *Team) executeWithFallback(ctx context.Context, agent Agent, input string) (string, error) {
// 1. 尝试主 Agent
result, err := t.executeWithRetry(ctx, agent, input, t.primaryPolicy)
if err == nil {
return result, nil
}
// 2. 主 Agent 失败,尝试备用 Agent(简化版)
if t.fallbackAgent != nil {
log.Warnf("Primary agent %s failed, falling back to %s", agent.Name(), t.fallbackAgent.Name())
result, err = t.fallbackAgent.Run(ctx, input)
if err == nil {
// 标记降级状态,用于监控告警
t.metrics.RecordDegradation(agent.Name(), t.fallbackAgent.Name())
return result, nil
}
}
// 3. 备用也失败,返回缓存结果(如果可用)
if cached := t.cache.Get(input); cached != nil {
log.Warnf("All agents failed, returning cached result")
t.metrics.RecordCacheHit(input)
return cached.(string), nil
}
// 4. 全部失败,返回友好错误
return "", fmt.Errorf("service temporarily unavailable, please retry later")
}
性能优化与资源管理
并发控制
// 使用信号量限制并发数,防止资源耗尽
type ConcurrencyLimiter struct {
sem chan struct{}
}
func NewConcurrencyLimiter(maxConcurrent int) *ConcurrencyLimiter {
return &ConcurrencyLimiter{
sem: make(chan struct{}, maxConcurrent),
}
}
func (c *ConcurrencyLimiter) Execute(ctx context.Context, fn func() error) error {
select {
case c.sem <- struct{}{}:
defer func() { <-c.sem }()
return fn()
case <-ctx.Done():
return ctx.Err()
}
}
// 应用示例
limiter := NewConcurrencyLimiter(5) // 最多 5 个 Agent 同时运行
var wg sync.WaitGroup
for _, agent := range agents {
wg.Add(1)
go func(a Agent) {
defer wg.Done()
err := limiter.Execute(ctx, func() error {
_, err := a.Run(ctx, input)
return err
})
if err != nil {
log.Errorf("Agent %s failed: %v", a.Name(), err)
}
}(agent)
}
wg.Wait()
Token 预算管理
type TokenBudget struct {
TotalBudget int // 总预算
UsedTokens int // 已使用
ReservedTokens int // 为后续步骤预留
}
func (tb *TokenBudget) CanAllocate(requested int) bool {
available := tb.TotalBudget - tb.UsedTokens - tb.ReservedTokens
return requested <= available
}
func (tb *TokenBudget) Allocate(tokens int) error {
if !tb.CanAllocate(tokens) {
return fmt.Errorf("token budget exceeded: need %d, available %d",
tokens, tb.TotalBudget-tb.UsedTokens-tb.ReservedTokens)
}
tb.UsedTokens += tokens
return nil
}
// 在 Team 执行中应用
func (t *Team) runWithBudget(ctx context.Context, budget *TokenBudget) error {
for i, agent := range t.agents {
// 为后续 Agent 预留预算
remainingSteps := len(t.agents) - i - 1
budget.ReservedTokens = remainingSteps * t.avgTokensPerStep
if !budget.CanAllocate(t.avgTokensPerStep) {
return fmt.Errorf("insufficient token budget for agent %s", agent.Name())
}
result, err := agent.Run(ctx, input)
if err != nil {
return err
}
// 实际扣减(基于结果估算)
actualTokens := estimateTokens(result)
budget.Allocate(actualTokens)
}
return nil
}
监控与可观测性
关键指标
type TeamMetrics struct {
// 延迟指标
TotalLatency prometheus.Histogram // 端到端延迟
AgentLatency prometheus.Histogram // 单个 Agent 延迟
// 成功率
SuccessRate prometheus.Gauge // 整体成功率
AgentErrorRate prometheus.CounterVec // 各 Agent 错误率
// 资源使用
TokenUsage prometheus.Counter // Token 消耗
ActiveAgents prometheus.Gauge // 当前活跃 Agent 数
// 业务指标
StepCompletion prometheus.CounterVec // 各步骤完成数
RetryCount prometheus.Counter // 重试次数
FallbackCount prometheus.Counter // 降级次数
}
分布式追踪
func (t *Team) runWithTracing(ctx context.Context, input string) (*Result, error) {
// 创建根 Span
ctx, span := tracer.Start(ctx, "team.execution",
trace.WithAttributes(
attribute.String("team.name", t.name),
attribute.String("team.size", strconv.Itoa(len(t.agents))),
attribute.String("input.hash", hashInput(input)),
),
)
defer span.End()
for _, agent := range t.agents {
// 每个 Agent 一个子 Span
ctx, agentSpan := tracer.Start(ctx, fmt.Sprintf("agent.%s", agent.Name()))
result, err := agent.Run(ctx, input)
agentSpan.SetAttributes(
attribute.String("agent.output_hash", hashOutput(result)),
attribute.Int("agent.output_length", len(result)),
)
if err != nil {
agentSpan.RecordError(err)
agentSpan.SetStatus(codes.Error, err.Error())
} else {
agentSpan.SetStatus(codes.Ok, "success")
}
agentSpan.End()
if err != nil {
return nil, err
}
input = result // 传递结果给下一个 Agent
}
return &Result{Output: input}, nil
}
生产环境部署建议
1. 配置分离
# team-config.yaml
team:
name: "customer-service-team"
topology: "star" # star | mesh | hierarchical
orchestrator:
model: "gemini-flash"
timeout: 30s
max_tokens: 2048
workers:
- name: "order-agent"
model: "gemini-pro"
timeout: 45s
tools: ["order_query", "order_modify", "refund_process"]
retry_policy:
max_attempts: 3
backoff: "exponential"
- name: "logistics-agent"
model: "gemini-flash"
timeout: 30s
tools: ["track_package", "estimate_delivery"]
- name: "policy-agent"
model: "gemini-flash"
timeout: 20s
tools: ["search_policy", "check_eligibility"]
state:
backend: "redis" # memory | redis | etcd
ttl: 3600s
persistence: true
limits:
max_concurrent_agents: 10
total_token_budget: 100000
max_execution_time: 120s
2. 健康检查与自愈
func (t *Team) healthCheck() *HealthStatus {
status := &HealthStatus{}
for _, agent := range t.agents {
agentStatus := agent.HealthCheck()
status.Agents = append(status.Agents, agentStatus)
if agentStatus.State != "healthy" {
// 自动重启不健康的 Agent
if agentStatus.State == "degraded" {
go t.restartAgent(agent)
}
status.Overall = "degraded"
}
}
if status.Overall == "" {
status.Overall = "healthy"
}
return status
}
常见问题深度解析
Q:Agent 间通信该用同步还是异步?
A:取决于场景:
- 同步调用:Sequential 工作流,需要等待结果才能继续。优点是实现简单、状态一致;缺点是阻塞等待增加延迟。
- 异步调用:Parallel 工作流,通过消息队列解耦。优点是吞吐高、容错好;缺点是复杂度增加,需要处理乱序和超时。
生产建议:默认同步,当 Agent 数量 >5 或需要高吞吐时改用异步。
Q:如何防止 Orchestrator 成为瓶颈?
A:三个策略:
- 状态外置:将 State 存储在 Redis 等外部存储,Orchestrator 只保留引用
- 轻量级模型:Orchestrator 使用轻量模型,复杂判断下放给 Worker
- 缓存路由决策:对常见输入类型缓存路由结果,避免重复 LLM 调用
Q:Agent Team 的扩展性如何设计?
A:遵循以下原则:
- 水平扩展:Worker Agent 无状态化,可随意增加实例
- 负载均衡:基于输入特征哈希路由到不同实例
- 动态扩缩容:根据队列深度自动调整 Worker 数量
- 服务发现:使用 Consul/etcd 管理 Agent 注册与发现
下一步
理解了 Agent Team 的架构设计后,接下来深入 Sequential 工作流——顺序执行模式的实现细节与性能优化。
← Rewind Sessions | Sequential 工作流 →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
