Agent 路由深度解析:动态分发策略、负载均衡与智能调度
深入剖析 ADK Go Agent 路由机制的设计原理,涵盖规则路由、LLM 路由、混合路由策略、负载感知调度与生产级流量管理。
Agent 路由深度解析:动态分发策略、负载均衡与智能调度
Agent 路由是多 Agent 系统的"交通指挥中心"——它决定用户请求应该由哪个 Agent 处理。一个设计良好的路由系统能够显著提升系统吞吐量、降低延迟、提高准确率。本文将深入 ADK Go 中路由机制的架构设计、策略选择与生产实践。
路由系统的架构定位
在系统中的位置
用户请求
│
▼
┌───────────────┐
│ Router │ ← 路由层(本文重点)
│ (调度中心) │
└───────┬───────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│(天气专家) │ │(订单专家) │ │(通用助手) │
└─────────┘ └─────────┘ └─────────┘
路由系统的核心职责:
- 意图识别:理解用户请求的类型
- Agent 匹配:找到最合适的处理 Agent
- 负载均衡:避免单点过载
- 降级容错:失败时快速切换
- 缓存优化:避免重复路由决策
路由策略体系
1. 规则路由(Rule-Based Routing)
基于预定义规则进行匹配,确定性最高:
// 规则路由引擎
type RuleRouter struct {
rules []RoutingRule
defaultAgent *agent.Agent
cache *RouteCache
metrics *RouterMetrics
}
type RoutingRule struct {
Name string
Priority int // 优先级,数字越大越优先
Matcher RuleMatcher // 匹配器
TargetAgent *agent.Agent // 目标 Agent
Preprocessors []Preprocessor // 预处理链
Transformers []Transformer // 输入转换器
}
// 规则匹配器接口
type RuleMatcher interface {
Match(input string, context *RoutingContext) (bool, float64)
}
// 关键词匹配器
type KeywordMatcher struct {
Keywords []string
MatchMode MatchMode // ANY | ALL | EXACT
CaseSensitive bool
}
func (m *KeywordMatcher) Match(input string, context *RoutingContext) (bool, float64) {
inputLower := strings.ToLower(input)
matched := 0
for _, keyword := range m.Keywords {
kw := keyword
if !m.CaseSensitive {
kw = strings.ToLower(kw)
}
if strings.Contains(inputLower, kw) {
matched++
if m.MatchMode == MatchModeAny {
return true, 1.0
}
}
}
switch m.MatchMode {
case MatchModeAll:
return matched == len(m.Keywords), float64(matched) / float64(len(m.Keywords))
case MatchModeExact:
return matched == 1 && len(m.Keywords) == 1, float64(matched)
default:
return matched > 0, float64(matched) / float64(len(m.Keywords))
}
}
// 正则匹配器
type RegexMatcher struct {
Patterns []*regexp.Regexp
Logic LogicMode // AND | OR
}
func (m *RegexMatcher) Match(input string, context *RoutingContext) (bool, float64) {
matched := 0
for _, pattern := range m.Patterns {
if pattern.MatchString(input) {
matched++
if m.Logic == LogicModeOR {
return true, 1.0
}
}
}
if m.Logic == LogicModeAND {
return matched == len(m.Patterns), float64(matched) / float64(len(m.Patterns))
}
return matched > 0, float64(matched) / float64(len(m.Patterns))
}
// 语义匹配器(基于 Embedding)
type SemanticMatcher struct {
embeddings map[string][]float64 // Agent 的语义向量
threshold float64 // 相似度阈值
model EmbeddingModel // Embedding 模型
}
func (m *SemanticMatcher) Match(input string, context *RoutingContext) (bool, float64) {
inputVec, err := m.model.Embed(input)
if err != nil {
return false, 0
}
bestScore := 0.0
for _, agentVec := range m.embeddings {
score := cosineSimilarity(inputVec, agentVec)
if score > bestScore {
bestScore = score
}
}
return bestScore >= m.threshold, bestScore
}
2. LLM 路由(LLM-Based Routing)
利用 LLM 的语义理解能力进行智能路由:
type LLMRouter struct {
model agent.Model
agentRegistry map[string]*AgentDescriptor
promptTemplate string
cache *RouteCache
}
type AgentDescriptor struct {
Agent *agent.Agent
Name string
Description string
Capabilities []string
Examples []string
}
func (r *LLMRouter) Route(ctx context.Context, input string) (*agent.Agent, error) {
// 1. 检查缓存
if cached := r.cache.Get(input); cached != nil {
return cached.(*agent.Agent), nil
}
// 2. 构建路由提示
prompt := r.buildRoutingPrompt(input)
// 3. 调用 LLM 决策
response, err := r.model.GenerateContent(ctx, prompt)
if err != nil {
return nil, fmt.Errorf("llm routing failed: %w", err)
}
// 4. 解析决策结果
decision, err := r.parseRoutingDecision(response)
if err != nil {
return nil, fmt.Errorf("parse routing decision: %w", err)
}
// 5. 获取目标 Agent
target, ok := r.agentRegistry[decision.AgentName]
if !ok {
return nil, fmt.Errorf("unknown agent: %s", decision.AgentName)
}
// 6. 缓存决策
r.cache.Set(input, target.Agent, 5*time.Minute)
return target.Agent, nil
}
func (r *LLMRouter) buildRoutingPrompt(input string) string {
var sb strings.Builder
sb.WriteString("你是一个智能路由系统。根据用户输入,选择最合适的 Agent 处理。\n\n")
sb.WriteString("可用 Agent 列表:\n")
for name, desc := range r.agentRegistry {
sb.WriteString(fmt.Sprintf("\n【%s】\n", name))
sb.WriteString(fmt.Sprintf("描述: %s\n", desc.Description))
sb.WriteString(fmt.Sprintf("能力: %s\n", strings.Join(desc.Capabilities, ", ")))
if len(desc.Examples) > 0 {
sb.WriteString(fmt.Sprintf("示例: %s\n", strings.Join(desc.Examples, "; ")))
}
}
sb.WriteString(fmt.Sprintf("\n用户输入: %s\n", input))
sb.WriteString("\n请输出 JSON 格式:{\"agent\": \"Agent名称\", \"confidence\": 0.95, \"reason\": \"选择原因\"}")
return sb.String()
}
3. 混合路由(Hybrid Routing)
结合规则路由的速度和 LLM 路由的准确性:
type HybridRouter struct {
ruleRouter *RuleRouter // 快速路径
llmRouter *LLMRouter // 智能路径
fallbackAgent *agent.Agent // 兜底 Agent
// 策略配置
ruleThreshold float64 // 规则匹配置信度阈值
llmThreshold float64 // LLM 路由置信度阈值
useLLMForUnknown bool // 对未知输入使用 LLM
}
func (r *HybridRouter) Route(ctx context.Context, input string) (*agent.Agent, error) {
// 第一层:规则路由(快速路径)
ruleResult, confidence, err := r.ruleRouter.Match(input)
if err == nil && confidence >= r.ruleThreshold {
return ruleResult, nil
}
// 第二层:LLM 路由(智能路径)
if r.useLLMForUnknown || confidence > 0 {
llmResult, err := r.llmRouter.Route(ctx, input)
if err == nil {
return llmResult, nil
}
}
// 兜底
if r.fallbackAgent != nil {
return r.fallbackAgent, nil
}
return nil, fmt.Errorf("no suitable agent found for input: %s", input)
}
负载感知调度
动态负载均衡
type LoadAwareRouter struct {
agents []*LoadAwareAgent
strategy LoadBalanceStrategy
healthChecker *HealthChecker
}
type LoadAwareAgent struct {
Agent *agent.Agent
CurrentLoad float64 // 当前负载 (0-1)
AvgLatency time.Duration // 平均延迟
ErrorRate float64 // 错误率
LastChecked time.Time // 最后检查时间
Weight float64 // 权重
}
type LoadBalanceStrategy interface {
Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent
}
// 加权轮询
func (s *WeightedRoundRobin) Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent {
var best *LoadAwareAgent
bestScore := -1.0
for _, a := range agents {
// 跳过不健康或超载的 Agent
if a.ErrorRate > 0.5 || a.CurrentLoad > 0.9 {
continue
}
// 计算综合分数:权重 / (负载 + 延迟归一化 + 错误率)
latencyNorm := float64(a.AvgLatency.Milliseconds()) / 1000.0
score := a.Weight / (a.CurrentLoad + latencyNorm + a.ErrorRate + 0.1)
if score > bestScore {
bestScore = score
best = a
}
}
return best
}
// 一致性哈希(确保相同输入路由到相同 Agent)
type ConsistentHashStrategy struct {
ring *consistent.Consistent
}
func (s *ConsistentHashStrategy) Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent {
// 使用输入的哈希值选择 Agent
agentName, err := s.ring.Get(input)
if err != nil {
return nil
}
for _, a := range agents {
if a.Agent.Name() == agentName {
return a
}
}
return nil
}
// 最少连接
func (s *LeastConnections) Select(agents []*LoadAwareAgent, input string) *LoadAwareAgent {
var best *LoadAwareAgent
minConnections := int(^uint(0) >> 1) // MaxInt
for _, a := range agents {
connections := a.Agent.ActiveConnections()
if connections < minConnections && a.CurrentLoad < 0.9 {
minConnections = connections
best = a
}
}
return best
}
健康检查与自动剔除
type HealthChecker struct {
checkInterval time.Duration
agents map[string]*AgentHealth
}
type AgentHealth struct {
State HealthState
LastCheck time.Time
SuccessCount int
FailureCount int
ConsecutiveFailures int
}
type HealthState int
const (
HealthStateHealthy HealthState = iota
HealthStateDegraded
HealthStateUnhealthy
)
func (hc *HealthChecker) Start(ctx context.Context) {
ticker := time.NewTicker(hc.checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
hc.checkAll()
}
}
}
func (hc *HealthChecker) checkAll() {
for id, health := range hc.agents {
// 执行健康检查
err := hc.performCheck(id)
if err != nil {
health.FailureCount++
health.ConsecutiveFailures++
if health.ConsecutiveFailures >= 3 {
health.State = HealthStateUnhealthy
} else if health.ConsecutiveFailures >= 1 {
health.State = HealthStateDegraded
}
} else {
health.SuccessCount++
health.ConsecutiveFailures = 0
health.State = HealthStateHealthy
}
health.LastCheck = time.Now()
}
}
路由缓存与优化
多级缓存架构
type MultiLevelRouteCache struct {
l1 *LRUCache // L1:内存缓存,O(1) 访问
l2 *RedisCache // L2:分布式缓存,跨实例共享
l3 *PersistentCache // L3:持久化缓存,重启不丢失
}
func (c *MultiLevelRouteCache) Get(input string) (*agent.Agent, error) {
// L1 查找
if agent := c.l1.Get(input); agent != nil {
return agent, nil
}
// L2 查找
if agent := c.l2.Get(input); agent != nil {
c.l1.Set(input, agent, time.Minute) // 回填 L1
return agent, nil
}
// L3 查找
if agent := c.l3.Get(input); agent != nil {
c.l2.Set(input, agent, time.Hour) // 回填 L2
c.l1.Set(input, agent, time.Minute) // 回填 L1
return agent, nil
}
return nil, fmt.Errorf("cache miss")
}
func (c *MultiLevelRouteCache) Set(input string, agent *agent.Agent, ttl time.Duration) {
c.l1.Set(input, agent, ttl/10) // L1 缓存 1/10 TTL
c.l2.Set(input, agent, ttl) // L2 缓存完整 TTL
c.l3.Set(input, agent, ttl*24) // L3 缓存 24 倍 TTL
}
缓存预热
func (r *Router) PreheatCache(ctx context.Context, sampleInputs []string) error {
for _, input := range sampleInputs {
agent, err := r.Route(ctx, input)
if err != nil {
log.Printf("Preheating failed for '%s': %v", input, err)
continue
}
// 预缓存结果
r.cache.Set(input, agent, 24*time.Hour)
}
return nil
}
实战场景:多功能智能助手
完整实现
package main
import (
"context"
"fmt"
"log"
"regexp"
"strings"
"time"
"github.com/google/adk-go/agent"
"github.com/google/adk-go/team"
"github.com/google/adk-go/tool"
)
// IntelligentAssistant 多功能智能助手
type IntelligentAssistant struct {
router *team.HybridRouter
}
func NewIntelligentAssistant(model agent.Model) (*IntelligentAssistant, error) {
// 1. 创建各个领域的 Agent
// 天气 Agent
weatherAgent, err := agent.New(agent.Config{
Name: "weather-expert",
Model: model,
Instruction: `你是天气专家。提供准确的天气预报、穿衣建议、出行提醒。`,
Tools: []tool.Tool{
tool.NewWeatherQueryTool(),
tool.NewAirQualityTool(),
},
Timeout: 10 * time.Second,
})
if err != nil {
return nil, err
}
// 订单 Agent
orderAgent, err := agent.New(agent.Config{
Name: "order-expert",
Model: model,
Instruction: `你是订单处理专家。帮助用户查询、修改、取消订单。`,
Tools: []tool.Tool{
tool.NewOrderQueryTool(),
tool.NewOrderModifyTool(),
},
Timeout: 15 * time.Second,
})
if err != nil {
return nil, err
}
// 物流 Agent
logisticsAgent, err := agent.New(agent.Config{
Name: "logistics-expert",
Model: model,
Instruction: `你是物流查询专家。追踪包裹、预估送达时间。`,
Tools: []tool.Tool{
tool.NewPackageTrackingTool(),
},
Timeout: 10 * time.Second,
})
if err != nil {
return nil, err
}
// 写作 Agent
writingAgent, err := agent.New(agent.Config{
Name: "writing-expert",
Model: model,
Instruction: `你是写作助手。帮助用户撰写、润色各类文案。`,
Timeout: 20 * time.Second,
})
if err != nil {
return nil, err
}
// 通用 Agent
generalAgent, err := agent.New(agent.Config{
Name: "general-assistant",
Model: model,
Instruction: `你是通用助手。回答用户的各类问题,保持友好和专业。`,
Timeout: 15 * time.Second,
})
if err != nil {
return nil, err
}
// 2. 构建规则路由
ruleRouter := team.NewRuleRouter()
// 天气规则(高优先级)
ruleRouter.AddRule(team.RoutingRule{
Name: "weather-rule",
Priority: 100,
Matcher: team.NewKeywordMatcher([]string{
"天气", "温度", "下雨", "下雪", "空气质量", "穿衣", "带伞",
}, team.MatchModeAny, false),
TargetAgent: weatherAgent,
})
// 订单规则
ruleRouter.AddRule(team.RoutingRule{
Name: "order-rule",
Priority: 90,
Matcher: team.NewCompositeMatcher(team.LogicModeOR,
team.NewKeywordMatcher([]string{"订单", "购买", "付款", "退款", "取消"}, team.MatchModeAny, false),
team.NewRegexMatcher([]*regexp.Regexp{
regexp.MustCompile(`订单号[::]?\s*\d+`),
}, team.LogicModeOR),
),
TargetAgent: orderAgent,
})
// 物流规则
ruleRouter.AddRule(team.RoutingRule{
Name: "logistics-rule",
Priority: 90,
Matcher: team.NewKeywordMatcher([]string{
"快递", "物流", "包裹", "发货", "送到", "在哪",
}, team.MatchModeAny, false),
TargetAgent: logisticsAgent,
})
// 写作规则
ruleRouter.AddRule(team.RoutingRule{
Name: "writing-rule",
Priority: 80,
Matcher: team.NewKeywordMatcher([]string{
"写", "创作", "润色", "修改", "文案", "文章", "邮件",
}, team.MatchModeAny, false),
TargetAgent: writingAgent,
})
// 3. 构建 LLM 路由(用于模糊匹配)
llmRouter := team.NewLLMRouter(model, map[string]*team.AgentDescriptor{
"weather-expert": {
Agent: weatherAgent,
Description: "处理天气相关查询",
Capabilities: []string{"天气预报", "空气质量", "穿衣建议"},
Examples: []string{"今天天气怎么样", "明天需要带伞吗"},
},
"order-expert": {
Agent: orderAgent,
Description: "处理订单相关操作",
Capabilities: []string{"订单查询", "订单修改", "退款处理"},
Examples: []string{"查一下我的订单", "我想取消订单"},
},
// ... 其他 Agent
})
// 4. 构建混合路由
hybridRouter := team.NewHybridRouter(
team.WithRuleRouter(ruleRouter),
team.WithLLMRouter(llmRouter),
team.WithFallbackAgent(generalAgent),
team.WithRuleThreshold(0.8),
team.WithLLMThreshold(0.7),
team.WithCache(team.NewMultiLevelRouteCache()),
)
return &IntelligentAssistant{router: hybridRouter}, nil
}
func (a *IntelligentAssistant) Handle(ctx context.Context, userInput string) (string, error) {
// 路由到合适的 Agent
selectedAgent, err := a.router.Route(ctx, userInput)
if err != nil {
return "", fmt.Errorf("routing failed: %w", err)
}
log.Printf("[Router] Input routed to %s", selectedAgent.Name())
// 执行 Agent
result, err := selectedAgent.Run(ctx, userInput)
if err != nil {
return "", fmt.Errorf("agent execution failed: %w", err)
}
return result, nil
}
func main() {
ctx := context.Background()
assistant, err := NewIntelligentAssistant(model)
if err != nil {
log.Fatalf("Failed to create assistant: %v", err)
}
// 测试不同场景
testCases := []string{
"今天北京天气怎么样",
"查一下我的订单 12345",
"我的快递到哪了",
"帮我写一封辞职信",
"什么是量子计算",
}
for _, input := range testCases {
response, err := assistant.Handle(ctx, input)
if err != nil {
log.Printf("Error: %v", err)
continue
}
fmt.Printf("\n用户: %s\n助手: %s\n", input, response)
}
}
路由决策的可观测性
追踪与审计
type RoutingDecision struct {
RequestID string
Timestamp time.Time
Input string
InputHash string
SelectedAgent string
RouteType string // "rule" | "llm" | "fallback"
Confidence float64
LatencyMs int64
RulesChecked []string
CacheHit bool
}
func (r *Router) logDecision(decision *RoutingDecision) {
// 记录到日志
logData, _ := json.Marshal(decision)
log.Printf("[RoutingDecision] %s", logData)
// 记录到指标
r.metrics.RoutingLatency.WithLabelValues(decision.RouteType).Observe(float64(decision.LatencyMs))
r.metrics.RoutingCounter.WithLabelValues(decision.SelectedAgent, decision.RouteType).Inc()
if decision.CacheHit {
r.metrics.CacheHitCounter.Inc()
} else {
r.metrics.CacheMissCounter.Inc()
}
}
常见问题深度解析
Q:规则路由和 LLM 路由如何选择?
A:决策矩阵:
| 维度 | 规则路由 | LLM 路由 |
|---|---|---|
| 延迟 | <1ms | 500ms-2s |
| 准确性 | 高(确定性) | 高(语义理解) |
| 成本 | 低 | 高(每次 LLM 调用) |
| 维护 | 需更新规则 | 自动适应 |
| 适用 | 明确分类 | 模糊语义 |
生产建议:混合使用,规则路由处理 80% 的明确请求,LLM 路由处理 20% 的模糊请求。
Q:路由缓存多久失效?
A:多级失效策略:
- L1 内存:1-5 分钟,基于访问频率
- L2 Redis:1 小时,基于时间
- L3 持久化:24 小时,基于 LRU
对于动态内容(如"今天天气"),禁用缓存或使用极短 TTL。
Q:如何处理路由错误(选错 Agent)?
A:三层防护:
- 置信度阈值:低置信度时返回通用 Agent
- 用户确认:对模糊请求,询问用户意图
- 自动纠错:Agent 处理失败时,自动尝试其他 Agent
func (r *Router) RouteWithFallback(ctx context.Context, input string) (*agent.Agent, error) {
agent, confidence, err := r.route(ctx, input)
if err != nil {
return r.fallbackAgent, nil
}
if confidence < 0.5 {
// 低置信度,可能需要用户确认
return r.clarificationAgent, nil
}
return agent, nil
}
下一步
Agent 路由的动态分发与智能调度已深入掌握。模块 5 全部完成,接下来进入模块 6:流式交互——Streaming 原理、Event 处理与实时通信。
← Custom Workflow | Streaming 原理 →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
