Agent 路由深度解析:动态分发策略、负载均衡与智能调度

Agent 路由是多 Agent 系统的"交通指挥中心"——它决定用户请求应该由哪个 Agent 处理。一个设计良好的路由系统能够显著提升系统吞吐量、降低延迟、提高准确率。本文将深入 ADK Go 中路由机制的架构设计、策略选择与生产实践。

路由系统的架构定位

在系统中的位置

                    用户请求
                ┌───────────────┐
                │     Router     │ ← 路由层(本文重点)
                │   (调度中心)    │
                └───────┬───────┘
        ┌───────────────┼───────────────┐
        │               │               │
        ▼               ▼               ▼
   ┌─────────┐    ┌─────────┐    ┌─────────┐
   │ Agent A │    │ Agent B │    │ Agent C │
   │(天气专家) │    │(订单专家) │    │(通用助手) │
   └─────────┘    └─────────┘    └─────────┘

路由系统的核心职责:

  1. 意图识别:理解用户请求的类型
  2. Agent 匹配:找到最合适的处理 Agent
  3. 负载均衡:避免单点过载
  4. 降级容错:失败时快速切换
  5. 缓存优化:避免重复路由决策

路由策略体系

1. 规则路由(Rule-Based Routing)

基于预定义规则进行匹配,确定性最高:

// 规则路由引擎
type RuleRouter struct {
    rules      []RoutingRule
    defaultAgent *agent.Agent
    cache      *RouteCache
    metrics    *RouterMetrics
}

type RoutingRule struct {
    Name        string
    Priority    int                    // 优先级,数字越大越优先
    Matcher     RuleMatcher            // 匹配器
    TargetAgent *agent.Agent           // 目标 Agent
    Preprocessors []Preprocessor       // 预处理链
    Transformers  []Transformer        // 输入转换器
}

// 规则匹配器接口
type RuleMatcher interface {
    Match(input string, context *RoutingContext) (bool, float64)
}

// 关键词匹配器
type KeywordMatcher struct {
    Keywords    []string
    MatchMode   MatchMode // ANY | ALL | EXACT
    CaseSensitive bool
}

func (m *KeywordMatcher) Match(input string, context *RoutingContext) (bool, float64) {
    inputLower := strings.ToLower(input)
    matched := 0
    
    for _, keyword := range m.Keywords {
        kw := keyword
        if !m.CaseSensitive {
            kw = strings.ToLower(kw)
        }
        
        if strings.Contains(inputLower, kw) {
            matched++
            if m.MatchMode == MatchModeAny {
                return true, 1.0
            }
        }
    }
    
    switch m.MatchMode {
    case MatchModeAll:
        return matched == len(m.Keywords), float64(matched) / float64(len(m.Keywords))
    case MatchModeExact:
        return matched == 1 && len(m.Keywords) == 1, float64(matched)
    default:
        return matched > 0, float64(matched) / float64(len(m.Keywords))
    }
}

// 正则匹配器
type RegexMatcher struct {
    Patterns []*regexp.Regexp
    Logic    LogicMode // AND | OR
}

func (m *RegexMatcher) Match(input string, context *RoutingContext) (bool, float64) {
    matched := 0
    
    for _, pattern := range m.Patterns {
        if pattern.MatchString(input) {
            matched++
            if m.Logic == LogicModeOR {
                return true, 1.0
            }
        }
    }
    
    if m.Logic == LogicModeAND {
        return matched == len(m.Patterns), float64(matched) / float64(len(m.Patterns))
    }
    
    return matched > 0, float64(matched) / float64(len(m.Patterns))
}

// 语义匹配器(基于 Embedding)
type SemanticMatcher struct {
    embeddings  map[string][]float64    // Agent 的语义向量
    threshold   float64                 // 相似度阈值
    model       EmbeddingModel          // Embedding 模型
}

func (m *SemanticMatcher) Match(input string, context *RoutingContext) (bool, float64) {
    inputVec, err := m.model.Embed(input)
    if err != nil {
        return false, 0
    }
    
    bestScore := 0.0
    for _, agentVec := range m.embeddings {
        score := cosineSimilarity(inputVec, agentVec)
        if score > bestScore {
            bestScore = score
        }
    }
    
    return bestScore >= m.threshold, bestScore
}

2. LLM 路由(LLM-Based Routing)

利用 LLM 的语义理解能力进行智能路由:

type LLMRouter struct {
    model         agent.Model
    agentRegistry map[string]*AgentDescriptor
    promptTemplate string
    cache         *RouteCache
}

type AgentDescriptor struct {
    Agent       *agent.Agent
    Name        string
    Description string
    Capabilities []string
    Examples    []string
}

func (r *LLMRouter) Route(ctx context.Context, input string) (*agent.Agent, error) {
    // 1. 检查缓存
    if cached := r.cache.Get(input); cached != nil {
        return cached.(*agent.Agent), nil
    }
    
    // 2. 构建路由提示
    prompt := r.buildRoutingPrompt(input)
    
    // 3. 调用 LLM 决策
    response, err := r.model.GenerateContent(ctx, prompt)
    if err != nil {
        return nil, fmt.Errorf("llm routing failed: %w", err)
    }
    
    // 4. 解析决策结果
    decision, err := r.parseRoutingDecision(response)
    if err != nil {
        return nil, fmt.Errorf("parse routing decision: %w", err)
    }
    
    // 5. 获取目标 Agent
    target, ok := r.agentRegistry[decision.AgentName]
    if !ok {
        return nil, fmt.Errorf("unknown agent: %s", decision.AgentName)
    }
    
    // 6. 缓存决策
    r.cache.Set(input, target.Agent, 5*time.Minute)
    
    return target.Agent, nil
}

func (r *LLMRouter) buildRoutingPrompt(input string) string {
    var sb strings.Builder
    sb.WriteString("你是一个智能路由系统。根据用户输入,选择最合适的 Agent 处理。\n\n")
    sb.WriteString("可用 Agent 列表:\n")
    
    for name, desc := range r.agentRegistry {
        sb.WriteString(fmt.Sprintf("\n【%s】\n", name))
        sb.WriteString(fmt.Sprintf("描述: %s\n", desc.Description))
        sb.WriteString(fmt.Sprintf("能力: %s\n", strings.Join(desc.Capabilities, ", ")))
        if len(desc.Examples) > 0 {
            sb.WriteString(fmt.Sprintf("示例: %s\n", strings.Join(desc.Examples, "; ")))
        }
    }
    
    sb.WriteString(fmt.Sprintf("\n用户输入: %s\n", input))
    sb.WriteString("\n请输出 JSON 格式:{\"agent\": \"Agent名称\", \"confidence\": 0.95, \"reason\": \"选择原因\"}")
    
    return sb.String()
}

3. 混合路由(Hybrid Routing)

结合规则路由的速度和 LLM 路由的准确性:

type HybridRouter struct {
    ruleRouter    *RuleRouter      // 快速路径
    llmRouter     *LLMRouter       // 智能路径
    fallbackAgent *agent.Agent     // 兜底 Agent
    
    // 策略配置
    ruleThreshold    float64       // 规则匹配置信度阈值
    llmThreshold     float64       // LLM 路由置信度阈值
    useLLMForUnknown bool          // 对未知输入使用 LLM
}

func (r *HybridRouter) Route(ctx context.Context, input string) (*agent.Agent, error) {
    // 第一层:规则路由(快速路径)
    ruleResult, confidence, err := r.ruleRouter.Match(input)
    if err == nil && confidence >= r.ruleThreshold {
        return ruleResult, nil
    }
    
    // 第二层:LLM 路由(智能路径)
    if r.useLLMForUnknown || confidence > 0 {
        llmResult, err := r.llmRouter.Route(ctx, input)
        if err == nil {
            return llmResult, nil
        }
    }
    
    // 兜底
    if r.fallbackAgent != nil {
        return r.fallbackAgent, nil
    }
    
    return nil, fmt.Errorf("no suitable agent found for input: %s", input)
}

负载感知调度

动态负载均衡

type LoadAwareRouter struct {
    agents      []*LoadAwareAgent
    strategy    LoadBalanceStrategy
    healthChecker *HealthChecker
}

type LoadAwareAgent struct {
    Agent        *agent.Agent
    CurrentLoad  float64           // 当前负载 (0-1)
    AvgLatency   time.Duration     // 平均延迟
    ErrorRate    float64           // 错误率
    LastChecked  time.Time         // 最后检查时间
    Weight       float64           // 权重
}

type LoadBalanceStrategy interface {
    Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent
}

// 加权轮询
func (s *WeightedRoundRobin) Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent {
    var best *LoadAwareAgent
    bestScore := -1.0
    
    for _, a := range agents {
        // 跳过不健康或超载的 Agent
        if a.ErrorRate > 0.5 || a.CurrentLoad > 0.9 {
            continue
        }
        
        // 计算综合分数:权重 / (负载 + 延迟归一化 + 错误率)
        latencyNorm := float64(a.AvgLatency.Milliseconds()) / 1000.0
        score := a.Weight / (a.CurrentLoad + latencyNorm + a.ErrorRate + 0.1)
        
        if score > bestScore {
            bestScore = score
            best = a
        }
    }
    
    return best
}

// 一致性哈希(确保相同输入路由到相同 Agent)
type ConsistentHashStrategy struct {
    ring *consistent.Consistent
}

func (s *ConsistentHashStrategy) Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent {
    // 使用输入的哈希值选择 Agent
    agentName, err := s.ring.Get(input)
    if err != nil {
        return nil
    }
    
    for _, a := range agents {
        if a.Agent.Name() == agentName {
            return a
        }
    }
    
    return nil
}

// 最少连接
func (s *LeastConnections) Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent {
    var best *LoadAwareAgent
    minConnections := int(^uint(0) >> 1) // MaxInt
    
    for _, a := range agents {
        connections := a.Agent.ActiveConnections()
        if connections < minConnections && a.CurrentLoad < 0.9 {
            minConnections = connections
            best = a
        }
    }
    
    return best
}

健康检查与自动剔除

type HealthChecker struct {
    checkInterval time.Duration
    agents        map[string]*AgentHealth
}

type AgentHealth struct {
    State         HealthState
    LastCheck     time.Time
    SuccessCount  int
    FailureCount  int
    ConsecutiveFailures int
}

type HealthState int
const (
    HealthStateHealthy   HealthState = iota
    HealthStateDegraded
    HealthStateUnhealthy
)

func (hc *HealthChecker) Start(ctx context.Context) {
    ticker := time.NewTicker(hc.checkInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            hc.checkAll()
        }
    }
}

func (hc *HealthChecker) checkAll() {
    for id, health := range hc.agents {
        // 执行健康检查
        err := hc.performCheck(id)
        
        if err != nil {
            health.FailureCount++
            health.ConsecutiveFailures++
            
            if health.ConsecutiveFailures >= 3 {
                health.State = HealthStateUnhealthy
            } else if health.ConsecutiveFailures >= 1 {
                health.State = HealthStateDegraded
            }
        } else {
            health.SuccessCount++
            health.ConsecutiveFailures = 0
            health.State = HealthStateHealthy
        }
        
        health.LastCheck = time.Now()
    }
}

路由缓存与优化

多级缓存架构

type MultiLevelRouteCache struct {
    l1 *LRUCache      // L1:内存缓存,O(1) 访问
    l2 *RedisCache    // L2:分布式缓存,跨实例共享
    l3 *PersistentCache // L3:持久化缓存,重启不丢失
}

func (c *MultiLevelRouteCache) Get(input string) (*agent.Agent, error) {
    // L1 查找
    if agent := c.l1.Get(input); agent != nil {
        return agent, nil
    }
    
    // L2 查找
    if agent := c.l2.Get(input); agent != nil {
        c.l1.Set(input, agent, time.Minute) // 回填 L1
        return agent, nil
    }
    
    // L3 查找
    if agent := c.l3.Get(input); agent != nil {
        c.l2.Set(input, agent, time.Hour)   // 回填 L2
        c.l1.Set(input, agent, time.Minute) // 回填 L1
        return agent, nil
    }
    
    return nil, fmt.Errorf("cache miss")
}

func (c *MultiLevelRouteCache) Set(input string, agent *agent.Agent, ttl time.Duration) {
    c.l1.Set(input, agent, ttl/10)        // L1 缓存 1/10 TTL
    c.l2.Set(input, agent, ttl)           // L2 缓存完整 TTL
    c.l3.Set(input, agent, ttl*24)        // L3 缓存 24 倍 TTL
}

缓存预热

func (r *Router) PreheatCache(ctx context.Context, sampleInputs []string) error {
    for _, input := range sampleInputs {
        agent, err := r.Route(ctx, input)
        if err != nil {
            log.Printf("Preheating failed for '%s': %v", input, err)
            continue
        }
        
        // 预缓存结果
        r.cache.Set(input, agent, 24*time.Hour)
    }
    
    return nil
}

实战场景:多功能智能助手

完整实现

package main

import (
    "context"
    "fmt"
    "log"
    "regexp"
    "strings"
    "time"
    
    "github.com/google/adk-go/agent"
    "github.com/google/adk-go/team"
    "github.com/google/adk-go/tool"
)

// IntelligentAssistant 多功能智能助手
type IntelligentAssistant struct {
    router *team.HybridRouter
}

func NewIntelligentAssistant(model agent.Model) (*IntelligentAssistant, error) {
    // 1. 创建各个领域的 Agent
    
    // 天气 Agent
    weatherAgent, err := agent.New(agent.Config{
        Name:        "weather-expert",
        Model:       model,
        Instruction: `你是天气专家。提供准确的天气预报、穿衣建议、出行提醒。`,
        Tools: []tool.Tool{
            tool.NewWeatherQueryTool(),
            tool.NewAirQualityTool(),
        },
        Timeout: 10 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    
    // 订单 Agent
    orderAgent, err := agent.New(agent.Config{
        Name:        "order-expert",
        Model:       model,
        Instruction: `你是订单处理专家。帮助用户查询、修改、取消订单。`,
        Tools: []tool.Tool{
            tool.NewOrderQueryTool(),
            tool.NewOrderModifyTool(),
        },
        Timeout: 15 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    
    // 物流 Agent
    logisticsAgent, err := agent.New(agent.Config{
        Name:        "logistics-expert",
        Model:       model,
        Instruction: `你是物流查询专家。追踪包裹、预估送达时间。`,
        Tools: []tool.Tool{
            tool.NewPackageTrackingTool(),
        },
        Timeout: 10 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    
    // 写作 Agent
    writingAgent, err := agent.New(agent.Config{
        Name:        "writing-expert",
        Model:       model,
        Instruction: `你是写作助手。帮助用户撰写、润色各类文案。`,
        Timeout: 20 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    
    // 通用 Agent
    generalAgent, err := agent.New(agent.Config{
        Name:        "general-assistant",
        Model:       model,
        Instruction: `你是通用助手。回答用户的各类问题,保持友好和专业。`,
        Timeout: 15 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    
    // 2. 构建规则路由
    ruleRouter := team.NewRuleRouter()
    
    // 天气规则(高优先级)
    ruleRouter.AddRule(team.RoutingRule{
        Name:     "weather-rule",
        Priority: 100,
        Matcher: team.NewKeywordMatcher([]string{
            "天气", "温度", "下雨", "下雪", "空气质量", "穿衣", "带伞",
        }, team.MatchModeAny, false),
        TargetAgent: weatherAgent,
    })
    
    // 订单规则
    ruleRouter.AddRule(team.RoutingRule{
        Name:     "order-rule",
        Priority: 90,
        Matcher: team.NewCompositeMatcher(team.LogicModeOR,
            team.NewKeywordMatcher([]string{"订单", "购买", "付款", "退款", "取消"}, team.MatchModeAny, false),
            team.NewRegexMatcher([]*regexp.Regexp{
                regexp.MustCompile(`订单号[::]?\s*\d+`),
            }, team.LogicModeOR),
        ),
        TargetAgent: orderAgent,
    })
    
    // 物流规则
    ruleRouter.AddRule(team.RoutingRule{
        Name:     "logistics-rule",
        Priority: 90,
        Matcher: team.NewKeywordMatcher([]string{
            "快递", "物流", "包裹", "发货", "送到", "在哪",
        }, team.MatchModeAny, false),
        TargetAgent: logisticsAgent,
    })
    
    // 写作规则
    ruleRouter.AddRule(team.RoutingRule{
        Name:     "writing-rule",
        Priority: 80,
        Matcher: team.NewKeywordMatcher([]string{
            "写", "创作", "润色", "修改", "文案", "文章", "邮件",
        }, team.MatchModeAny, false),
        TargetAgent: writingAgent,
    })
    
    // 3. 构建 LLM 路由(用于模糊匹配)
    llmRouter := team.NewLLMRouter(model, map[string]*team.AgentDescriptor{
        "weather-expert": {
            Agent:        weatherAgent,
            Description:  "处理天气相关查询",
            Capabilities: []string{"天气预报", "空气质量", "穿衣建议"},
            Examples:     []string{"今天天气怎么样", "明天需要带伞吗"},
        },
        "order-expert": {
            Agent:        orderAgent,
            Description:  "处理订单相关操作",
            Capabilities: []string{"订单查询", "订单修改", "退款处理"},
            Examples:     []string{"查一下我的订单", "我想取消订单"},
        },
        // ... 其他 Agent
    })
    
    // 4. 构建混合路由
    hybridRouter := team.NewHybridRouter(
        team.WithRuleRouter(ruleRouter),
        team.WithLLMRouter(llmRouter),
        team.WithFallbackAgent(generalAgent),
        team.WithRuleThreshold(0.8),
        team.WithLLMThreshold(0.7),
        team.WithCache(team.NewMultiLevelRouteCache()),
    )
    
    return &IntelligentAssistant{router: hybridRouter}, nil
}

func (a *IntelligentAssistant) Handle(ctx context.Context, userInput string) (string, error) {
    // 路由到合适的 Agent
    selectedAgent, err := a.router.Route(ctx, userInput)
    if err != nil {
        return "", fmt.Errorf("routing failed: %w", err)
    }
    
    log.Printf("[Router] Input routed to %s", selectedAgent.Name())
    
    // 执行 Agent
    result, err := selectedAgent.Run(ctx, userInput)
    if err != nil {
        return "", fmt.Errorf("agent execution failed: %w", err)
    }
    
    return result, nil
}

func main() {
    ctx := context.Background()
    
    assistant, err := NewIntelligentAssistant(model)
    if err != nil {
        log.Fatalf("Failed to create assistant: %v", err)
    }
    
    // 测试不同场景
    testCases := []string{
        "今天北京天气怎么样",
        "查一下我的订单 12345",
        "我的快递到哪了",
        "帮我写一封辞职信",
        "什么是量子计算",
    }
    
    for _, input := range testCases {
        response, err := assistant.Handle(ctx, input)
        if err != nil {
            log.Printf("Error: %v", err)
            continue
        }
        fmt.Printf("\n用户: %s\n助手: %s\n", input, response)
    }
}

路由决策的可观测性

追踪与审计

type RoutingDecision struct {
    RequestID     string
    Timestamp     time.Time
    Input         string
    InputHash     string
    SelectedAgent string
    RouteType     string // "rule" | "llm" | "fallback"
    Confidence    float64
    LatencyMs     int64
    RulesChecked  []string
    CacheHit      bool
}

func (r *Router) logDecision(decision *RoutingDecision) {
    // 记录到日志
    logData, _ := json.Marshal(decision)
    log.Printf("[RoutingDecision] %s", logData)
    
    // 记录到指标
    r.metrics.RoutingLatency.WithLabelValues(decision.RouteType).Observe(float64(decision.LatencyMs))
    r.metrics.RoutingCounter.WithLabelValues(decision.SelectedAgent, decision.RouteType).Inc()
    
    if decision.CacheHit {
        r.metrics.CacheHitCounter.Inc()
    } else {
        r.metrics.CacheMissCounter.Inc()
    }
}

常见问题深度解析

Q:规则路由和 LLM 路由如何选择?

A:决策矩阵:

维度规则路由LLM 路由
延迟<1ms500ms-2s
准确性高(确定性)高(语义理解)
成本高(每次 LLM 调用)
维护需更新规则自动适应
适用明确分类模糊语义

生产建议:混合使用,规则路由处理 80% 的明确请求,LLM 路由处理 20% 的模糊请求。

Q:路由缓存多久失效?

A:多级失效策略:

  • L1 内存:1-5 分钟,基于访问频率
  • L2 Redis:1 小时,基于时间
  • L3 持久化:24 小时,基于 LRU

对于动态内容(如"今天天气"),禁用缓存或使用极短 TTL。

Q:如何处理路由错误(选错 Agent)?

A:三层防护:

  1. 置信度阈值:低置信度时返回通用 Agent
  2. 用户确认:对模糊请求,询问用户意图
  3. 自动纠错:Agent 处理失败时,自动尝试其他 Agent
func (r *Router) RouteWithFallback(ctx context.Context, input string) (*agent.Agent, error) {
    agent, confidence, err := r.route(ctx, input)
    if err != nil {
        return r.fallbackAgent, nil
    }
    
    if confidence < 0.5 {
        // 低置信度,可能需要用户确认
        return r.clarificationAgent, nil
    }
    
    return agent, nil
}

下一步

Agent 路由的动态分发与智能调度已深入掌握。模块 5 全部完成,接下来进入模块 6:流式交互——Streaming 原理、Event 处理与实时通信。

Custom Workflow | Streaming 原理 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。