Consuming:消费外部 Agent

消费外部 Agent 是多 Agent 系统的核心能力。与调用普通 HTTP API 不同,消费 A2A Agent 需要处理异步任务、流式响应、错误重试、超时控制等复杂语义。一个健壮的 A2A Client 不仅要能发送请求,还要能管理任务生命周期、处理网络分区、实现熔断降级。

本文将深入讲解如何构建生产级的 A2A 消费端,从基础调用到高级模式。


创建 Client:连接管理的基础

基础 Client 配置

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "log"
    "net"
    "net/http"
    "time"
    
    "google.golang.org/adk/a2a/client"
)

func createClient() (*a2aclient.Client, error) {
    // Dialer: bound TCP connect time and keep TCP connections alive.
    dialer := &net.Dialer{
        Timeout:   5 * time.Second,
        KeepAlive: 30 * time.Second,
    }

    // TLS: require TLS 1.2+ and keep server certificate verification enabled.
    tlsCfg := &tls.Config{
        MinVersion:         tls.VersionTLS12,
        InsecureSkipVerify: false,
    }

    httpClient := &http.Client{
        Timeout: 30 * time.Second, // overall cap on each HTTP exchange
        Transport: &http.Transport{
            // Connection pool sizing.
            MaxIdleConns:        100,
            MaxIdleConnsPerHost: 10,
            MaxConnsPerHost:     50,
            IdleConnTimeout:     90 * time.Second,

            TLSClientConfig:       tlsCfg,
            DialContext:           dialer.DialContext,
            ResponseHeaderTimeout: 10 * time.Second,
            ExpectContinueTimeout: 1 * time.Second,

            // Supplying DialContext/TLSClientConfig turns HTTP/2 off by
            // default; this re-enables the HTTP/2 attempt (it does not force it).
            ForceAttemptHTTP2: true,
        },
    }

    // NOTE(review): the API key is hard-coded for illustration; load it from
    // configuration or a secret store in real deployments.
    c, err := a2aclient.New(context.Background(),
        a2aclient.WithURL("https://python-agent.example.com/a2a"),
        a2aclient.WithHTTPClient(httpClient),
        a2aclient.WithAPIKey("production-api-key"),
        a2aclient.WithRetry(3, time.Second), // retry failures up to 3 times, 1s apart
        a2aclient.WithRequestTimeout(30*time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }
    return c, nil
}

连接池优化

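A2A Client 的连接池配置直接影响性能。`http.Transport` 默认的 `MaxIdleConnsPerHost` 只有 2:高并发调用同一个 Agent 时,超出的空闲连接会被直接关闭,后续请求又得重新建立 TCP/TLS 连接。下面是针对高并发场景的调整: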

// transport is an http.Transport tuned for high-concurrency A2A traffic: a
// larger connection pool and longer idle reuse so repeated calls to the same
// agents skip TCP/TLS setup.
var transport = &http.Transport{
    // Enlarge the pool for many concurrent calls. The net/http default
    // MaxIdleConnsPerHost is only 2, which causes connection churn.
    MaxIdleConns:        500,
    MaxIdleConnsPerHost: 100,
    MaxConnsPerHost:     200,

    // Keep idle connections longer; A2A calls benefit from connection reuse.
    IdleConnTimeout: 5 * time.Minute,

    // false = compression stays ENABLED (gzip is negotiated). Set it to true
    // only if profiling shows decompressing large text responses is CPU-bound.
    DisableCompression: false,

    // A Transport built from scratch has no TLS handshake timeout (zero means
    // none); bound it so a stalled handshake cannot hang a call.
    TLSHandshakeTimeout: 10 * time.Second,

    // Supplying a custom DialContext disables HTTP/2 unless re-enabled here.
    ForceAttemptHTTP2: true,

    // Dual-stack (IPv4 + IPv6 fast fallback) is the net.Dialer default since
    // Go 1.12, so the deprecated DualStack field is no longer set.
    DialContext: (&net.Dialer{
        Timeout:   5 * time.Second,
        KeepAlive: 30 * time.Second,
    }).DialContext,
}

调用外部 Agent:任务生命周期管理

同步调用(简单场景)

// callAgentSync submits a "weather-query" task to the remote agent and blocks
// until the task reaches a terminal state, polling its status once per second.
//
// It returns the task's "result" output on success. It returns an error if the
// task cannot be submitted or polled, if it fails or is cancelled remotely, if
// it does not finish within 30s ("task timeout"), or if ctx is done first
// (ctx.Err()). In the last two cases it also asks the agent to cancel the task
// so it does not keep running unobserved.
func callAgentSync(ctx context.Context, client *a2aclient.Client, input string) (string, error) {
    const (
        pollInterval  = time.Second
        callTimeout   = 30 * time.Second
        cancelTimeout = 5 * time.Second // budget for the best-effort remote cancel
    )

    task, err := client.SendTask(ctx, &a2a.Task{
        Input: map[string]interface{}{
            "skill": "weather-query",
            "query": input,
        },
    })
    if err != nil {
        return "", fmt.Errorf("failed to send task: %w", err)
    }
    taskID := task.ID

    log.Printf("task created: %s (status: %s)", taskID, task.Status)

    // cancelRemote asks the agent to stop the task. It uses a fresh context
    // because ctx may already be done, which would make the request fail
    // before it is ever sent.
    cancelRemote := func() {
        cancelCtx, cancel := context.WithTimeout(context.Background(), cancelTimeout)
        defer cancel()
        if err := client.CancelTask(cancelCtx, taskID); err != nil {
            log.Printf("failed to cancel task: %v", err)
        }
    }

    ticker := time.NewTicker(pollInterval)
    defer ticker.Stop()
    timeout := time.NewTimer(callTimeout)
    defer timeout.Stop()

    for {
        // Inspect the status we already have before waiting: SendTask may
        // return a task that is already terminal.
        switch task.Status {
        case a2a.TaskStatusCompleted:
            output, _ := task.Output["result"].(string)
            return output, nil
        case a2a.TaskStatusFailed:
            errMsg, _ := task.Output["error"].(string)
            return "", fmt.Errorf("task failed: %s", errMsg)
        case a2a.TaskStatusCancelled:
            return "", fmt.Errorf("task was cancelled")
        }

        select {
        case <-ticker.C:
            task, err = client.GetTask(ctx, taskID)
            if err != nil {
                return "", fmt.Errorf("failed to get task: %w", err)
            }
            log.Printf("task status: %s", task.Status)

        case <-timeout.C:
            cancelRemote()
            return "", fmt.Errorf("task timeout")

        case <-ctx.Done():
            cancelRemote()
            return "", ctx.Err()
        }
    }
}

异步调用(生产推荐)

// AsyncResult is the value delivered on the channel returned by
// callAgentAsync: the remote task ID plus either the task's result or the
// error that ended it.
type AsyncResult struct {
    TaskID, Result string // remote task ID and, on success, its "result" output
    Error          error  // non-nil if the task failed, was cancelled, timed out, or polling failed
}

// callAgentAsync submits a "data-analysis" task and returns immediately with a
// channel that later receives exactly one AsyncResult and is then closed.
//
// A background goroutine polls the task every 2s until it reaches a terminal
// state, 5 minutes elapse, or ctx is done; on timeout or cancellation it also
// asks the remote agent to cancel the task. The channel has a buffer of one,
// so the goroutine's single send never blocks even if the caller stops
// reading. An error is returned directly only when submitting the task fails.
func callAgentAsync(ctx context.Context, client *a2aclient.Client, input string) (<-chan AsyncResult, error) {
    const (
        pollInterval  = 2 * time.Second
        callTimeout   = 5 * time.Minute
        cancelTimeout = 5 * time.Second // budget for the best-effort remote cancel
    )

    task, err := client.SendTask(ctx, &a2a.Task{
        Input: map[string]interface{}{
            "skill": "data-analysis",
            "query": input,
        },
    })
    if err != nil {
        return nil, err
    }
    taskID := task.ID

    resultCh := make(chan AsyncResult, 1)

    go func() {
        defer close(resultCh)

        // cancelRemote uses a fresh context: once ctx is done, issuing the
        // cancel with ctx would fail immediately and leave the task running.
        cancelRemote := func() {
            cancelCtx, cancel := context.WithTimeout(context.Background(), cancelTimeout)
            defer cancel()
            if err := client.CancelTask(cancelCtx, taskID); err != nil {
                log.Printf("failed to cancel task %s: %v", taskID, err)
            }
        }

        ticker := time.NewTicker(pollInterval)
        defer ticker.Stop()
        timeout := time.NewTimer(callTimeout)
        defer timeout.Stop()

        current := task
        for {
            // Check the known status first: the task may already be terminal.
            switch current.Status {
            case a2a.TaskStatusCompleted:
                output, _ := current.Output["result"].(string)
                resultCh <- AsyncResult{TaskID: taskID, Result: output}
                return
            case a2a.TaskStatusFailed:
                errMsg, _ := current.Output["error"].(string)
                resultCh <- AsyncResult{
                    TaskID: taskID,
                    Error:  fmt.Errorf("task failed: %s", errMsg),
                }
                return
            case a2a.TaskStatusCancelled:
                resultCh <- AsyncResult{
                    TaskID: taskID,
                    Error:  fmt.Errorf("task cancelled"),
                }
                return
            }

            select {
            case <-ticker.C:
                next, err := client.GetTask(ctx, taskID)
                if err != nil {
                    resultCh <- AsyncResult{TaskID: taskID, Error: err}
                    return
                }
                current = next

            case <-timeout.C:
                cancelRemote()
                resultCh <- AsyncResult{
                    TaskID: taskID,
                    Error:  fmt.Errorf("task timeout after 5m"),
                }
                return

            case <-ctx.Done():
                cancelRemote()
                resultCh <- AsyncResult{TaskID: taskID, Error: ctx.Err()}
                return
            }
        }
    }()

    return resultCh, nil
}

// main is a usage example: start an asynchronous analysis task, do other
// work, then block on the result channel.
func main() {
    ctx := context.Background()

    client, err := createClient()
    if err != nil {
        log.Fatal(err)
    }

    resultCh, err := callAgentAsync(ctx, client, "分析 Q3 销售数据")
    if err != nil {
        log.Fatal(err)
    }

    // ... do other work here ...

    // Block until the task's result arrives.
    result := <-resultCh
    if result.Error != nil {
        log.Printf("task %s failed: %v", result.TaskID, result.Error)
        return
    }

    log.Printf("task %s completed: %s", result.TaskID, result.Result)
}

流式调用(实时响应)

// callAgentStreaming starts a "code-generation" task in streaming mode and
// handles updates as they arrive:
//   - status updates are logged,
//   - output updates print their "chunk" to stdout immediately,
//   - artifact updates are logged with their name and size,
//   - an error update ends the call with an error.
//
// It returns nil once the update channel is closed, or ctx.Err() if ctx is
// done first. The stream is closed on every return path.
func callAgentStreaming(ctx context.Context, client *a2aclient.Client, input string) error {
    sub, err := client.SendTaskStreaming(ctx, &a2a.Task{
        Input: map[string]interface{}{
            "skill": "code-generation",
            "query": input,
        },
    })
    if err != nil {
        return fmt.Errorf("failed to start streaming: %w", err)
    }
    defer sub.Close()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()

        case u, open := <-sub.Updates():
            if !open {
                log.Println("stream closed")
                return nil
            }

            switch u.Type {
            case a2a.StreamUpdateTypeError:
                log.Printf("stream error: %s", u.Error)
                return fmt.Errorf("stream error: %s", u.Error)

            case a2a.StreamUpdateTypeStatus:
                log.Printf("status: %s", u.Status)

            case a2a.StreamUpdateTypeOutput:
                // Incremental output (e.g. a code fragment), printed as it arrives.
                text, _ := u.Data["chunk"].(string)
                fmt.Print(text)

            case a2a.StreamUpdateTypeArtifact:
                // A complete artifact (e.g. a generated file).
                log.Printf("artifact received: %s (%d bytes)", u.Artifact.Name, len(u.Artifact.Data))
            }
        }
    }
}

封装为 Tool:无缝接入本地 Agent

将外部 A2A Agent 封装为本地 Tool,可以让 orchestrator Agent 像调用本地函数一样调用远程 Agent:

package tools

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    
    "google.golang.org/adk/a2a/client"
    "google.golang.org/adk/tool"
)

// ExternalAgentTool wraps a remote A2A agent so it can be registered as a
// local tool: each Call sends one task for the configured skill and waits for
// its result.
type ExternalAgentTool struct {
    name, description string // returned by Name() and Description()
    client            *a2aclient.Client
    skill             string        // skill sent in every task's input
    timeout           time.Duration // per-Call deadline
    maxRetries        int           // NOTE(review): stored but not read by Call; retries are configured on the client
}

// NewExternalAgentTool creates an ExternalAgentTool backed by a new A2A client
// for config.URL.
//
// A non-positive config.Timeout is replaced by a 30s default: the tool's Call
// derives its deadline from this value, so zero would make every call time out
// immediately. A negative config.MaxRetries is treated as 0. It returns an
// error if config.URL is empty or the client cannot be created.
func NewExternalAgentTool(config ExternalAgentConfig) (*ExternalAgentTool, error) {
    const defaultTimeout = 30 * time.Second

    if config.URL == "" {
        return nil, fmt.Errorf("external agent %q: URL is required", config.Name)
    }
    if config.Timeout <= 0 {
        config.Timeout = defaultTimeout
    }
    if config.MaxRetries < 0 {
        config.MaxRetries = 0
    }

    client, err := a2aclient.New(context.Background(),
        a2aclient.WithURL(config.URL),
        a2aclient.WithAPIKey(config.APIKey),
        a2aclient.WithTimeout(config.Timeout),
        a2aclient.WithRetry(config.MaxRetries, time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }

    return &ExternalAgentTool{
        name:        config.Name,
        description: config.Description,
        client:      client,
        skill:       config.Skill,
        timeout:     config.Timeout,
        maxRetries:  config.MaxRetries,
    }, nil
}

// ExternalAgentConfig describes one remote A2A agent to be exposed as a local
// tool through NewExternalAgentTool.
type ExternalAgentConfig struct {
    // Tool name and description, returned by the tool's Name/Description.
    Name, Description string
    // Agent endpoint, its API key, and the skill requested in every task.
    URL, APIKey, Skill string
    Timeout            time.Duration // per-call deadline; also passed to the client
    MaxRetries         int           // passed to the client's retry option
}

// Name returns the tool name configured for this external agent.
func (t *ExternalAgentTool) Name() string { return t.name }

// Description returns the tool description configured for this external agent.
func (t *ExternalAgentTool) Description() string { return t.description }

// Schema declares the tool's input: a JSON object with a single required
// string property, "query", which Call forwards to the remote agent.
func (t *ExternalAgentTool) Schema() tool.Schema {
    props := map[string]tool.Property{
        "query": {
            Type:        "string",
            Description: "要发送给外部 Agent 的查询",
        },
    }
    return tool.Schema{
        Type:       "object",
        Required:   []string{"query"},
        Properties: props,
    }
}

// Call implements the tool. It decodes input as JSON {"query": "..."}, sends a
// task for t.skill to the remote agent, and polls once per second until the
// task finishes or t.timeout elapses.
//
// It returns the task's "result" output on success. It returns an error if
// the input is not valid JSON or the query is empty, if sending or polling
// fails, or if the task fails or is cancelled remotely. If the deadline passes
// or ctx is cancelled, it cancels the remote task and returns an
// "external agent call timeout" error. That error wraps the context error
// (context.DeadlineExceeded or context.Canceled), so callers can distinguish
// the two with errors.Is.
func (t *ExternalAgentTool) Call(ctx context.Context, input string) (string, error) {
    var params struct {
        Query string `json:"query"`
    }
    if err := json.Unmarshal([]byte(input), &params); err != nil {
        return "", fmt.Errorf("invalid input: %w", err)
    }
    // The schema marks "query" as required; fail fast instead of sending an
    // empty task to the remote agent.
    if params.Query == "" {
        return "", fmt.Errorf("invalid input: query is required")
    }

    callCtx, cancel := context.WithTimeout(ctx, t.timeout)
    defer cancel()

    task, err := t.client.SendTask(callCtx, &a2a.Task{
        Input: map[string]interface{}{
            "skill": t.skill,
            "query": params.Query,
        },
    })
    if err != nil {
        return "", fmt.Errorf("failed to send task: %w", err)
    }
    taskID := task.ID

    // abort cancels the remote task and builds the error for a call that ran
    // out of time or was cancelled. It uses a fresh context because callCtx
    // (and possibly ctx) is already done, which would make the cancel request
    // fail before it is sent.
    abort := func() error {
        cancelCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
        defer stop()
        if cerr := t.client.CancelTask(cancelCtx, taskID); cerr != nil {
            return fmt.Errorf("external agent call timeout: %w (cancel task %v: %v)", callCtx.Err(), taskID, cerr)
        }
        return fmt.Errorf("external agent call timeout: %w", callCtx.Err())
    }

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    current := task
    for {
        // Check the known status first: the task may already be terminal.
        switch current.Status {
        case a2a.TaskStatusCompleted:
            result, _ := current.Output["result"].(string)
            return result, nil
        case a2a.TaskStatusFailed:
            errMsg, _ := current.Output["error"].(string)
            return "", fmt.Errorf("external agent failed: %s", errMsg)
        case a2a.TaskStatusCancelled:
            return "", fmt.Errorf("task was cancelled")
        }

        select {
        case <-ticker.C:
            next, err := t.client.GetTask(callCtx, taskID)
            if err != nil {
                // A poll that failed because the deadline hit must still
                // cancel the remote task.
                if callCtx.Err() != nil {
                    return "", abort()
                }
                return "", fmt.Errorf("failed to get task: %w", err)
            }
            current = next

        case <-callCtx.Done():
            return "", abort()
        }
    }
}

// setupOrchestratorAgent (usage example) builds an LLM "orchestrator" agent
// whose tools are two remote A2A agents: a weather agent (10s timeout, 2
// retries) and a data-analysis agent (60s timeout, 3 retries). It returns an
// error if either tool or the agent itself cannot be created.
//
// NOTE(review): model is assumed to be defined elsewhere; the API keys are
// hard-coded for illustration and should come from configuration or a secret
// store in real deployments.
func setupOrchestratorAgent() (*llmagent.Agent, error) {
    weatherTool, err := NewExternalAgentTool(ExternalAgentConfig{
        Name:        "weather_query",
        Description: "查询指定城市的天气信息",
        URL:         "https://weather-agent.example.com/a2a",
        APIKey:      "weather-api-key",
        Skill:       "current-weather",
        Timeout:     10 * time.Second,
        MaxRetries:  2,
    })
    if err != nil {
        return nil, fmt.Errorf("create weather tool: %w", err)
    }

    dataTool, err := NewExternalAgentTool(ExternalAgentConfig{
        Name:        "data_analysis",
        Description: "分析数据集并返回统计摘要",
        URL:         "https://data-agent.example.com/a2a",
        APIKey:      "data-api-key",
        Skill:       "csv-analysis",
        Timeout:     60 * time.Second,
        MaxRetries:  3,
    })
    if err != nil {
        return nil, fmt.Errorf("create data analysis tool: %w", err)
    }

    agent, err := llmagent.New(llmagent.Config{
        Name:        "orchestrator",
        Model:       model,
        Instruction: `你是一个任务调度专家。根据用户需求,调用合适的工具完成任务。
可用工具:
- weather_query: 查询天气
- data_analysis: 数据分析

如果用户请求涉及多个步骤,请依次调用工具。`,
        Tools: []tool.Tool{weatherTool, dataTool},
    })
    if err != nil {
        return nil, fmt.Errorf("create orchestrator agent: %w", err)
    }

    return agent, nil
}

高级模式:Agent 编排

并行调用多个 Agent

// parallelAgentCalls sends one task each to the weather, news and stock agents
// concurrently, using inputs["weather"], inputs["news"] and inputs["stock"] as
// their queries, and waits for all of them.
//
// It returns a map from agent name to result for every call that succeeded.
// If any call fails, that same partial map is returned together with an error
// listing each failure. Every goroutine sends exactly one value into a channel
// buffered for all calls, so none of them can block or leak.
func parallelAgentCalls(ctx context.Context, inputs map[string]string) (map[string]string, error) {
    type agentCall struct {
        name   string
        client *a2aclient.Client
        input  string
    }
    type callResult struct {
        name   string
        result string
        err    error
    }

    calls := []agentCall{
        {"weather", weatherClient, inputs["weather"]},
        {"news", newsClient, inputs["news"]},
        {"stock", stockClient, inputs["stock"]},
    }

    resultCh := make(chan callResult, len(calls))
    for _, call := range calls {
        go func(c agentCall) {
            task, err := c.client.SendTask(ctx, &a2a.Task{
                Input: map[string]interface{}{
                    "query": c.input,
                },
            })
            if err != nil {
                resultCh <- callResult{name: c.name, err: err}
                return
            }

            // NOTE(review): waitForTask is defined elsewhere; presumably it
            // polls until the task is terminal — confirm.
            result, err := waitForTask(ctx, c.client, task.ID)
            resultCh <- callResult{name: c.name, result: result, err: err}
        }(call)
    }

    results := make(map[string]string, len(calls))
    var errs []error
    for range calls {
        r := <-resultCh
        if r.err != nil {
            errs = append(errs, fmt.Errorf("%s: %w", r.name, r.err))
            continue
        }
        results[r.name] = r.result
    }

    if len(errs) > 0 {
        return results, fmt.Errorf("partial failures: %v", errs)
    }
    return results, nil
}

流水线调用(链式处理)

// pipelineAgentCalls runs three agents in sequence, feeding each stage's output
// into the next: fetch-data, then process-data, then format-result. It returns
// the final stage's output, or stops at the first failing stage with an error
// naming it ("step N failed: ...").
func pipelineAgentCalls(ctx context.Context, initialInput string) (string, error) {
    stages := []func(string) (string, error){
        func(in string) (string, error) { return callAgent(ctx, dataClient, "fetch-data", in) },
        func(in string) (string, error) { return callAgent(ctx, processClient, "process-data", in) },
        func(in string) (string, error) { return callAgent(ctx, formatClient, "format-result", in) },
    }

    out := initialInput
    for i, stage := range stages {
        next, err := stage(out)
        if err != nil {
            return "", fmt.Errorf("step %d failed: %w", i+1, err)
        }
        out = next
    }
    return out, nil
}

错误处理与重试策略

智能重试

// RetryPolicy retries an operation with capped exponential backoff plus random
// jitter, but only for errors whose message matches one of RetryableErrors.
type RetryPolicy struct {
    MaxRetries      int           // retries after the first attempt; negative is treated as 0
    BaseDelay       time.Duration // delay before the first retry
    MaxDelay        time.Duration // cap on the backoff before jitter; <= 0 means no cap
    Multiplier      float64       // backoff growth per retry; <= 0 is treated as 1 (constant backoff)
    RetryableErrors []string      // case-insensitive substrings marking an error as retryable
}

// Execute runs operation until it succeeds, returns a non-retryable error, the
// retry budget is exhausted, or ctx is done while waiting between attempts.
//
// It always makes at least one attempt. Before retry n (1-based) it waits
// BaseDelay*Multiplier^(n-1), capped at MaxDelay when MaxDelay > 0, plus up to
// 50% random jitter so that many clients do not retry in lockstep. A
// non-retryable error is returned as-is; exhaustion returns an error wrapping
// the last failure ("max retries exceeded: ..."); cancellation during a wait
// returns ctx.Err().
func (p *RetryPolicy) Execute(ctx context.Context, operation func() error) error {
    maxRetries := p.MaxRetries
    if maxRetries < 0 {
        maxRetries = 0
    }

    var lastErr error
    for attempt := 0; attempt <= maxRetries; attempt++ {
        if attempt > 0 {
            delay := p.backoff(attempt)
            log.Printf("retry attempt %d/%d after %v", attempt, maxRetries, delay)

            timer := time.NewTimer(delay)
            select {
            case <-timer.C:
            case <-ctx.Done():
                timer.Stop()
                return ctx.Err()
            }
        }

        err := operation()
        if err == nil {
            return nil
        }
        lastErr = err

        if !p.isRetryable(err) {
            return err
        }

        log.Printf("attempt %d failed: %v", attempt+1, err)
    }

    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

// backoff returns the jittered delay to wait before retry number attempt (>= 1).
// The result is never negative and never overflows time.Duration.
func (p *RetryPolicy) backoff(attempt int) time.Duration {
    mult := p.Multiplier
    if mult <= 0 {
        mult = 1
    }

    // Work in float64 so a large exponent saturates instead of overflowing
    // time.Duration (int64) into a negative value.
    d := float64(p.BaseDelay) * math.Pow(mult, float64(attempt-1))
    if math.IsNaN(d) || d < 0 {
        d = 0 // e.g. zero BaseDelay times an infinite power, or a negative BaseDelay
    }
    if p.MaxDelay > 0 && d > float64(p.MaxDelay) {
        d = float64(p.MaxDelay)
    }
    // Leave headroom so the jitter added below cannot overflow.
    if limit := float64(math.MaxInt64 / 2); d > limit {
        d = limit
    }

    delay := time.Duration(d)
    // rand.Int63n panics for a bound <= 0, so delays under 2ns get no jitter.
    if half := int64(delay) / 2; half > 0 {
        delay += time.Duration(rand.Int63n(half))
    }
    return delay
}

// isRetryable reports whether err's message contains any non-empty pattern
// from RetryableErrors, ignoring case (so "service unavailable" also matches
// "503 Service Unavailable"). Empty patterns are skipped because they would
// match every error.
func (p *RetryPolicy) isRetryable(err error) bool {
    if err == nil {
        return false
    }
    msg := strings.ToLower(err.Error())
    for _, pattern := range p.RetryableErrors {
        if pattern != "" && strings.Contains(msg, strings.ToLower(pattern)) {
            return true
        }
    }
    return false
}

// Usage example: retry SendTask on transient failures - up to 3 retries,
// starting at 1s and doubling each time, capped at 30s (plus jitter).
policy := RetryPolicy{
    MaxRetries: 3,
    BaseDelay:  time.Second,
    MaxDelay:   30 * time.Second,
    Multiplier: 2.0,
    // Substrings of error messages treated as transient.
    RetryableErrors: []string{
        "connection refused",
        "timeout",
        "temporary",
        "rate limit",
        "service unavailable",
    },
}

err := policy.Execute(ctx, func() error {
    _, err := client.SendTask(ctx, task)
    return err
})
if err != nil {
    log.Printf("send task failed: %v", err)
}

常见问题深度解析

Q:外部 Agent 无响应怎么办?

根本原因

  1. 网络分区或延迟
  2. 外部 Agent 过载
  3. 外部 Agent 崩溃
  4. 请求本身导致死循环或长执行

解决方案

// 1. Configure sensible timeouts.
// NOTE(review): no WithURL/WithAPIKey options here; add the agent endpoint
// and credentials as in createClient.
client, err := a2aclient.New(ctx,
    a2aclient.WithTimeout(30*time.Second),       // per-request timeout
    a2aclient.WithRequestTimeout(5*time.Minute), // overall task timeout
)
if err != nil {
    log.Fatalf("create a2a client: %v", err)
}

// 2. Wrap the client in a circuit breaker.
cbClient := NewCircuitBreakerClient(client)

// 3. Graceful degradation.
//
// callWithFallback tries the primary agent, then fallbackClient, then a
// locally cached answer. It returns the first result obtained. If both agents
// fail and there is no cached answer, it returns an error that wraps the
// primary agent's error and mentions the fallback agent's error. Previously
// this case returned an empty result with a nil error.
func callWithFallback(ctx context.Context, client *a2aclient.Client, input string) (string, error) {
    result, err := client.Call(ctx, input)
    if err == nil {
        return result, nil
    }
    log.Printf("primary agent failed: %v", err)

    // Try the fallback agent.
    result, fbErr := fallbackClient.Call(ctx, input)
    if fbErr == nil {
        return result, nil
    }
    log.Printf("fallback agent failed: %v", fbErr)

    // Last resort: a cached answer.
    // NOTE(review): assumes getCachedResult returns "" on a cache miss — confirm.
    if cached := getCachedResult(input); cached != "" {
        return cached, nil
    }
    return "", fmt.Errorf("primary agent failed: %w; fallback agent failed: %v; no cached result", err, fbErr)
}

// 4. Asynchronous submission + callback.
//
// callAsyncWithCallback submits a task whose input carries callbackURL under
// the "callback_url" key and returns as soon as the submission completes.
// Presumably the remote agent posts the result to that URL; confirm this
// against the agent's contract. Only the submission error is returned; the
// created task value is discarded.
func callAsyncWithCallback(ctx context.Context, client *a2aclient.Client, input string, callbackURL string) error {
    payload := map[string]interface{}{
        "query":        input,
        "callback_url": callbackURL,
    }
    task := &a2a.Task{Input: payload}

    _, err := client.SendTask(ctx, task)
    return err
}

Q:多个外部 Agent 怎么管理?

Agent Registry 模式

// AgentRegistry holds the remote agent connections known to this process.
//
// mu guards the agents map. It is also held while GetHealthyAgent reads, and
// HealthCheck updates, the fields of the individual *AgentConnection values.
type AgentRegistry struct {
    agents map[string]*AgentConnection // NOTE(review): key is presumably the agent name — confirm
    mu     sync.RWMutex                // guards agents and the fields of its connections
}

// AgentConnection is one remote agent tracked by an AgentRegistry, together
// with the routing bookkeeping the registry keeps for it. HealthCheck writes
// its mutable fields while holding the registry's mutex.
type AgentConnection struct {
    Name   string
    Client *a2aclient.Client

    // Health is set to healthy after a successful health probe and to
    // unhealthy after more than 5 consecutive failed probes.
    Health HealthStatus

    // LastUsed is refreshed by HealthCheck on every probe.
    LastUsed time.Time

    // ErrorCount counts consecutive failed health probes (reset on success).
    // NOTE(review): CallCount is never updated in the code shown — confirm
    // where it is maintained.
    CallCount, ErrorCount int64

    // AvgLatency is a running average of probe latency; GetHealthyAgent
    // prefers the healthy agent with the lowest value.
    AvgLatency time.Duration
}

// GetHealthyAgent returns the healthy agent with the lowest AvgLatency among
// those for which HasSkill(skill) is true. It returns an error if none
// qualifies. Ties are broken arbitrarily, following map iteration order.
//
// The returned *AgentConnection is shared: HealthCheck updates its fields
// under r.mu, so callers should not read those fields without holding r.mu.
func (r *AgentRegistry) GetHealthyAgent(skill string) (*AgentConnection, error) {
    r.mu.RLock()
    defer r.mu.RUnlock()

    // One linear scan for the minimum replaces collecting candidates and
    // sorting them: O(n) and no allocation.
    var best *AgentConnection
    for _, agent := range r.agents {
        if !agent.HasSkill(skill) || agent.Health != HealthHealthy {
            continue
        }
        if best == nil || agent.AvgLatency < best.AvgLatency {
            best = agent
        }
    }

    if best == nil {
        return nil, fmt.Errorf("no healthy agent found for skill: %s", skill)
    }
    return best, nil
}

// HealthCheck probes every registered agent every 30s by fetching its agent
// card, until ctx is done. It blocks for that whole time, so callers typically
// run it in its own goroutine.
//
// Each probe records the probe time in LastUsed.
//   - A successful probe resets ErrorCount, marks the agent healthy, and
//     updates AvgLatency. The first sample is used as-is rather than averaged
//     with zero.
//   - A failed probe increments ErrorCount and marks the agent unhealthy once
//     the count exceeds 5.
//
// Every probe is bounded by a 10s timeout, shorter than the interval, so a
// hung agent cannot accumulate another stuck goroutine on every tick.
func (r *AgentRegistry) HealthCheck(ctx context.Context) {
    const (
        interval       = 30 * time.Second
        probeTimeout   = 10 * time.Second
        unhealthyAfter = 5 // consecutive failures tolerated before marking unhealthy
    )

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }

        // Snapshot the connections under the read lock so the probes
        // (network I/O) run without holding r.mu.
        r.mu.RLock()
        agents := make([]*AgentConnection, 0, len(r.agents))
        for _, agent := range r.agents {
            agents = append(agents, agent)
        }
        r.mu.RUnlock()

        for _, agent := range agents {
            go func(a *AgentConnection) {
                probeCtx, cancel := context.WithTimeout(ctx, probeTimeout)
                defer cancel()

                start := time.Now()
                _, err := a.Client.GetAgentCard(probeCtx)
                latency := time.Since(start)

                r.mu.Lock()
                defer r.mu.Unlock()

                a.LastUsed = time.Now()

                if err != nil {
                    a.ErrorCount++
                    if a.ErrorCount > unhealthyAfter {
                        a.Health = HealthUnhealthy
                    }
                    return
                }

                a.ErrorCount = 0
                a.Health = HealthHealthy
                // Only successful probes feed the latency estimate. A fast
                // failure (e.g. connection refused) or a timed-out probe
                // would otherwise skew routing.
                if a.AvgLatency == 0 {
                    a.AvgLatency = latency
                } else {
                    a.AvgLatency = (a.AvgLatency + latency) / 2
                }
            }(agent)
        }
    }
}

下一步

消费搞定了,接下来看跨语言协作——Python + Go 的实战案例。

Exposing | 跨语言协作实战 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。