Consuming:消费外部 Agent
详解如何在 Go Agent 中调用其他语言暴露的 Agent——通过 A2A Client 接入外部 Agent。
Consuming:消费外部 Agent
消费外部 Agent 是多 Agent 系统的核心能力。与调用普通 HTTP API 不同,消费 A2A Agent 需要处理异步任务、流式响应、错误重试、超时控制等复杂语义。一个健壮的 A2A Client 不仅要能发送请求,还要能管理任务生命周期、处理网络分区、实现熔断降级。
本文将深入讲解如何构建生产级的 A2A 消费端,从基础调用到高级模式。
创建 Client:连接管理的基础
基础 Client 配置
package main
import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"time"

	a2aclient "google.golang.org/adk/a2a/client"
)
// createClient builds an A2A client for the Python agent using a
// production-oriented HTTP transport: a bounded connection pool, TLS 1.2+
// with certificate verification, and dial / response-header timeouts.
//
// The API key is read from the A2A_API_KEY environment variable instead of
// being hard-coded, so credentials stay out of source control. An error is
// returned if the variable is unset or the client cannot be constructed.
func createClient() (*a2aclient.Client, error) {
	const apiKeyEnv = "A2A_API_KEY"

	apiKey := os.Getenv(apiKeyEnv)
	if apiKey == "" {
		return nil, fmt.Errorf("failed to create client: environment variable %s is not set", apiKeyEnv)
	}

	ctx := context.Background()

	transport := &http.Transport{
		// Connection pool.
		MaxIdleConns:        100,              // idle connections across all hosts
		MaxIdleConnsPerHost: 10,               // idle connections kept per host
		MaxConnsPerHost:     50,               // hard cap on connections per host
		IdleConnTimeout:     90 * time.Second, // close idle connections after this long

		TLSClientConfig: &tls.Config{
			MinVersion: tls.VersionTLS12,
			// Spelled out for readers: the server certificate is always verified.
			InsecureSkipVerify: false,
		},

		DialContext: (&net.Dialer{
			Timeout:   5 * time.Second,  // TCP connect timeout
			KeepAlive: 30 * time.Second, // TCP keep-alive period
		}).DialContext,

		ResponseHeaderTimeout: 10 * time.Second, // wait for response headers after the request is written
		ExpectContinueTimeout: 1 * time.Second,  // wait for "100 Continue" on requests that send Expect

		// A custom DialContext/TLSClientConfig disables HTTP/2 by default; this
		// opts back in. It is an attempt, not a requirement: servers that do
		// not negotiate h2 still get HTTP/1.1.
		ForceAttemptHTTP2: true,
	}

	client, err := a2aclient.New(ctx,
		a2aclient.WithURL("https://python-agent.example.com/a2a"),
		a2aclient.WithHTTPClient(&http.Client{
			Transport: transport,
			Timeout:   30 * time.Second, // total time limit for one HTTP exchange, body included
		}),
		a2aclient.WithAPIKey(apiKey),
		a2aclient.WithRetry(3, time.Second), // presumably 3 retries, 1s apart — confirm against a2aclient docs
		a2aclient.WithRequestTimeout(30*time.Second),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create client: %w", err)
	}
	return client, nil
}
连接池优化
A2A Client 的连接池配置直接影响性能:
// Transport tuned for high-concurrency A2A traffic. Declared with var so the
// snippet is valid both at package scope and inside a function.
var transport = &http.Transport{
	// Larger pools for many concurrent calls.
	MaxIdleConns:        500,
	MaxIdleConnsPerHost: 100,
	MaxConnsPerHost:     200,

	// Keep idle connections longer so repeated calls to the same agent reuse them.
	IdleConnTimeout: 5 * time.Minute,

	// false keeps Go's transparent gzip enabled (the default). Set it to true
	// only if profiling shows decompression CPU outweighs the bandwidth saved.
	DisableCompression: false,

	DialContext: (&net.Dialer{
		Timeout:   5 * time.Second,
		KeepAlive: 30 * time.Second,
		// IPv4/IPv6 fast fallback is on by default; the old DualStack field is
		// deprecated and has no effect, so it is not set.
	}).DialContext,

	// Supplying a custom DialContext disables automatic HTTP/2; opt back in.
	ForceAttemptHTTP2: true,
}
调用外部 Agent:任务生命周期管理
同步调用(简单场景)
// callAgentSync submits a "weather-query" task and blocks until it reaches a
// terminal state, polling the task once per second.
//
// It gives up after 30 seconds or when ctx is done. In both cases it asks the
// remote agent to cancel the task so the work is not left running.
func callAgentSync(ctx context.Context, client *a2aclient.Client, input string) (string, error) {
	task, err := client.SendTask(ctx, &a2a.Task{
		Input: map[string]interface{}{
			"skill": "weather-query",
			"query": input,
		},
	})
	if err != nil {
		return "", fmt.Errorf("failed to send task: %w", err)
	}
	log.Printf("task created: %s (status: %s)", task.ID, task.Status)
	taskID := task.ID

	// cancelRemote is best effort. It uses its own short-lived context because
	// ctx may already be cancelled, and a request bound to a cancelled context
	// fails before it ever reaches the server.
	cancelRemote := func() {
		cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := client.CancelTask(cleanupCtx, taskID); err != nil {
			log.Printf("failed to cancel task: %v", err)
		}
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	timeout := time.After(30 * time.Second)

	for {
		select {
		case <-ticker.C:
			task, err = client.GetTask(ctx, taskID)
			if err != nil {
				return "", fmt.Errorf("failed to get task: %w", err)
			}
			log.Printf("task status: %s", task.Status)
			switch task.Status {
			case a2a.TaskStatusCompleted:
				output, _ := task.Output["result"].(string)
				return output, nil
			case a2a.TaskStatusFailed:
				errMsg, _ := task.Output["error"].(string)
				return "", fmt.Errorf("task failed: %s", errMsg)
			case a2a.TaskStatusCancelled:
				return "", fmt.Errorf("task was cancelled")
			}
		case <-timeout:
			cancelRemote()
			return "", fmt.Errorf("task timeout")
		case <-ctx.Done():
			// Caller gave up; don't leave the remote task running.
			cancelRemote()
			return "", ctx.Err()
		}
	}
}
异步调用(生产推荐)
// AsyncResult is the single value delivered by callAgentAsync: the remote
// task ID plus either the task's string result or the error that ended the wait.
type AsyncResult struct {
	TaskID, Result string
	Error          error
}
// callAgentAsync submits a "data-analysis" task and returns immediately with a
// channel that receives exactly one AsyncResult and is then closed.
//
// A background goroutine polls the task every 2 seconds until it completes,
// fails, is cancelled, 5 minutes elapse, or ctx is done. On timeout or ctx
// cancellation the remote task is cancelled on a best-effort basis.
func callAgentAsync(ctx context.Context, client *a2aclient.Client, input string) (<-chan AsyncResult, error) {
	task, err := client.SendTask(ctx, &a2a.Task{
		Input: map[string]interface{}{
			"skill": "data-analysis",
			"query": input,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to send task: %w", err)
	}
	taskID := task.ID

	// Buffered so the goroutine can deliver its result and exit even if the
	// caller never reads from the channel.
	resultCh := make(chan AsyncResult, 1)

	go func() {
		defer close(resultCh)

		// cancelRemote uses its own short-lived context: once ctx is done, a
		// request bound to it would fail before reaching the remote agent.
		cancelRemote := func() {
			cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			if err := client.CancelTask(cleanupCtx, taskID); err != nil {
				log.Printf("failed to cancel task %s: %v", taskID, err)
			}
		}

		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		timeout := time.After(5 * time.Minute)

		for {
			select {
			case <-ticker.C:
				currentTask, err := client.GetTask(ctx, taskID)
				if err != nil {
					resultCh <- AsyncResult{TaskID: taskID, Error: err}
					return
				}
				switch currentTask.Status {
				case a2a.TaskStatusCompleted:
					output, _ := currentTask.Output["result"].(string)
					resultCh <- AsyncResult{TaskID: taskID, Result: output}
					return
				case a2a.TaskStatusFailed:
					errMsg, _ := currentTask.Output["error"].(string)
					resultCh <- AsyncResult{
						TaskID: taskID,
						Error:  fmt.Errorf("task failed: %s", errMsg),
					}
					return
				case a2a.TaskStatusCancelled:
					resultCh <- AsyncResult{
						TaskID: taskID,
						Error:  fmt.Errorf("task cancelled"),
					}
					return
				}
			case <-timeout:
				cancelRemote()
				resultCh <- AsyncResult{
					TaskID: taskID,
					Error:  fmt.Errorf("task timeout after 5m"),
				}
				return
			case <-ctx.Done():
				cancelRemote()
				resultCh <- AsyncResult{TaskID: taskID, Error: ctx.Err()}
				return
			}
		}
	}()

	return resultCh, nil
}
// Example: start the analysis asynchronously, do other work, then wait for the
// single AsyncResult delivered on the channel.
func main() {
	ctx := context.Background()

	client, err := createClient()
	if err != nil {
		log.Fatal(err)
	}

	resultCh, err := callAgentAsync(ctx, client, "分析 Q3 销售数据")
	if err != nil {
		log.Fatal(err)
	}

	// ... do other work here ...

	result := <-resultCh
	if result.Error != nil {
		log.Printf("task %s failed: %v", result.TaskID, result.Error)
		return
	}
	log.Printf("task %s completed: %s", result.TaskID, result.Result)
}
流式调用(实时响应)
// callAgentStreaming starts a streaming "code-generation" task and consumes
// its updates until the update channel closes (returns nil), an error update
// arrives (returns that error), or ctx is done (returns ctx.Err()).
// Output chunks are printed to stdout as they arrive.
func callAgentStreaming(ctx context.Context, client *a2aclient.Client, input string) error {
	req := &a2a.Task{
		Input: map[string]interface{}{
			"skill": "code-generation",
			"query": input,
		},
	}
	stream, err := client.SendTaskStreaming(ctx, req)
	if err != nil {
		return fmt.Errorf("failed to start streaming: %w", err)
	}
	defer stream.Close()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case update, open := <-stream.Updates():
			if !open {
				log.Println("stream closed")
				return nil
			}

			// Error updates terminate the stream; handle them before the rest.
			if update.Type == a2a.StreamUpdateTypeError {
				log.Printf("stream error: %s", update.Error)
				return fmt.Errorf("stream error: %s", update.Error)
			}

			switch update.Type {
			case a2a.StreamUpdateTypeStatus:
				log.Printf("status: %s", update.Status)
			case a2a.StreamUpdateTypeOutput:
				// Incremental output (e.g. a code fragment), printed immediately.
				chunk, _ := update.Data["chunk"].(string)
				fmt.Print(chunk)
			case a2a.StreamUpdateTypeArtifact:
				// A complete artifact (e.g. a generated file).
				log.Printf("artifact received: %s (%d bytes)", update.Artifact.Name, len(update.Artifact.Data))
			}
		}
	}
}
封装为 Tool:无缝接入本地 Agent
将外部 A2A Agent 封装为本地 Tool,可以让 orchestrator Agent 像调用本地函数一样调用远程 Agent:
package tools
import (
"context"
"encoding/json"
"fmt"
"time"
"google.golang.org/adk/a2a/client"
"google.golang.org/adk/tool"
)
// ExternalAgentTool exposes a remote A2A agent as a local tool: each Call sends
// one task for a fixed skill to the remote agent and waits for its result.
type ExternalAgentTool struct {
	name, description string // returned by Name and Description

	client *a2aclient.Client // connection to the remote agent
	skill  string            // skill requested in every task

	timeout    time.Duration // deadline applied to each Call
	maxRetries int           // retry count copied from the config
}
// NewExternalAgentTool creates an ExternalAgentTool backed by a new A2A client
// pointed at config.URL.
//
// config.URL is required. A non-positive config.Timeout falls back to 30s:
// Call derives its deadline from this value, so a zero timeout would make
// every call fail immediately. A negative config.MaxRetries is treated as 0.
func NewExternalAgentTool(config ExternalAgentConfig) (*ExternalAgentTool, error) {
	const defaultTimeout = 30 * time.Second

	if config.URL == "" {
		return nil, fmt.Errorf("failed to create client: external agent %q has no URL", config.Name)
	}
	// config is a copy, so normalizing it here does not affect the caller.
	if config.Timeout <= 0 {
		config.Timeout = defaultTimeout
	}
	if config.MaxRetries < 0 {
		config.MaxRetries = 0
	}

	client, err := a2aclient.New(context.Background(),
		a2aclient.WithURL(config.URL),
		a2aclient.WithAPIKey(config.APIKey),
		a2aclient.WithTimeout(config.Timeout),
		a2aclient.WithRetry(config.MaxRetries, time.Second),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create client: %w", err)
	}

	return &ExternalAgentTool{
		name:        config.Name,
		description: config.Description,
		client:      client,
		skill:       config.Skill,
		timeout:     config.Timeout,
		maxRetries:  config.MaxRetries,
	}, nil
}
// ExternalAgentConfig holds everything NewExternalAgentTool needs to build an
// ExternalAgentTool for one remote agent.
type ExternalAgentConfig struct {
	// Tool identity, reported by the tool's Name and Description methods.
	Name, Description string

	// Remote endpoint and its credential.
	URL, APIKey string

	// Skill requested in every task sent through the tool.
	Skill string

	// Timeout bounds each Call; MaxRetries is passed to the client's retry option.
	Timeout    time.Duration
	MaxRetries int
}
// Name returns the tool name this tool was configured with.
func (t *ExternalAgentTool) Name() string { return t.name }

// Description returns the tool description this tool was configured with.
func (t *ExternalAgentTool) Description() string { return t.description }
// Schema describes the tool's JSON input: an object with a single required
// string field "query", which Call forwards to the remote agent.
func (t *ExternalAgentTool) Schema() tool.Schema {
	queryProp := tool.Property{
		Type:        "string",
		Description: "要发送给外部 Agent 的查询",
	}
	return tool.Schema{
		Type:       "object",
		Properties: map[string]tool.Property{"query": queryProp},
		Required:   []string{"query"},
	}
}
func (t *ExternalAgentTool) Call(ctx context.Context, input string) (string, error) {
// 解析输入
var params struct {
Query string `json:"query"`
}
if err := json.Unmarshal([]byte(input), ¶ms); err != nil {
return "", fmt.Errorf("invalid input: %w", err)
}
// 创建带超时的上下文
callCtx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
// 调用外部 Agent
task, err := t.client.SendTask(callCtx, &a2a.Task{
Input: map[string]interface{}{
"skill": t.skill,
"query": params.Query,
},
})
if err != nil {
return "", fmt.Errorf("failed to send task: %w", err)
}
// 轮询等待结果
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
currentTask, err := t.client.GetTask(callCtx, task.ID)
if err != nil {
return "", fmt.Errorf("failed to get task: %w", err)
}
switch currentTask.Status {
case a2a.TaskStatusCompleted:
result, _ := currentTask.Output["result"].(string)
return result, nil
case a2a.TaskStatusFailed:
errMsg, _ := currentTask.Output["error"].(string)
return "", fmt.Errorf("external agent failed: %s", errMsg)
case a2a.TaskStatusCancelled:
return "", fmt.Errorf("task was cancelled")
}
case <-callCtx.Done():
// 超时取消任务
t.client.CancelTask(ctx, task.ID)
return "", fmt.Errorf("external agent call timeout")
}
}
}
// setupOrchestratorAgent builds an LLM "orchestrator" agent whose tools are two
// remote A2A agents (weather lookup and data analysis) wrapped as
// ExternalAgentTools.
func setupOrchestratorAgent() (*llmagent.Agent, error) {
	// NOTE(review): API keys are inlined for the example only; real code should
	// load them from the environment or a secret manager.
	weatherTool, err := NewExternalAgentTool(ExternalAgentConfig{
		Name:        "weather_query",
		Description: "查询指定城市的天气信息",
		URL:         "https://weather-agent.example.com/a2a",
		APIKey:      "weather-api-key",
		Skill:       "current-weather",
		Timeout:     10 * time.Second,
		MaxRetries:  2,
	})
	if err != nil {
		return nil, err
	}

	dataTool, err := NewExternalAgentTool(ExternalAgentConfig{
		Name:        "data_analysis",
		Description: "分析数据集并返回统计摘要",
		URL:         "https://data-agent.example.com/a2a",
		APIKey:      "data-api-key",
		Skill:       "csv-analysis",
		Timeout:     60 * time.Second,
		MaxRetries:  3,
	})
	if err != nil {
		return nil, err
	}

	// NOTE(review): `model` is not defined in this snippet; presumably a
	// package-level model instance — confirm.
	agent, err := llmagent.New(llmagent.Config{
		Name:  "orchestrator",
		Model: model,
		Instruction: `你是一个任务调度专家。根据用户需求,调用合适的工具完成任务。
可用工具:
- weather_query: 查询天气
- data_analysis: 数据分析
如果用户请求涉及多个步骤,请依次调用工具。`,
		Tools: []tool.Tool{weatherTool, dataTool},
	})
	if err != nil {
		return nil, err
	}
	return agent, nil
}
高级模式:Agent 编排
并行调用多个 Agent
// parallelAgentCalls sends each input to its agent (weather, news, stock)
// concurrently and waits for every call to finish.
//
// Successful results are returned keyed by agent name even when some calls
// fail; in that case the returned error lists every failed agent.
func parallelAgentCalls(ctx context.Context, inputs map[string]string) (map[string]string, error) {
	type agentCall struct {
		name   string
		client *a2aclient.Client
		input  string
	}
	type callResult struct {
		name   string
		result string
		err    error
	}

	calls := []agentCall{
		{"weather", weatherClient, inputs["weather"]},
		{"news", newsClient, inputs["news"]},
		{"stock", stockClient, inputs["stock"]},
	}

	// Buffered to len(calls) so every goroutine can send its result and exit
	// without waiting on the collector below.
	resultCh := make(chan callResult, len(calls))

	for _, call := range calls {
		go func(c agentCall) {
			task, err := c.client.SendTask(ctx, &a2a.Task{
				Input: map[string]interface{}{
					"query": c.input,
				},
			})
			if err != nil {
				resultCh <- callResult{name: c.name, err: err}
				return
			}
			// waitForTask is defined elsewhere; presumably it polls until the
			// task reaches a terminal state — confirm it honors ctx.
			result, err := waitForTask(ctx, c.client, task.ID)
			resultCh <- callResult{name: c.name, result: result, err: err}
		}(call)
	}

	// Exactly one result arrives per call.
	results := make(map[string]string, len(calls))
	var errs []error
	for range calls {
		r := <-resultCh
		if r.err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", r.name, r.err))
			continue
		}
		results[r.name] = r.result
	}

	if len(errs) > 0 {
		return results, fmt.Errorf("partial failures: %v", errs)
	}
	return results, nil
}
流水线调用(链式处理)
// pipelineAgentCalls chains three agents, feeding each step's output into the
// next: fetch-data → process-data → format-result. The first failing step
// aborts the pipeline and is reported by its step number.
func pipelineAgentCalls(ctx context.Context, initialInput string) (string, error) {
	out := initialInput
	var err error

	if out, err = callAgent(ctx, dataClient, "fetch-data", out); err != nil {
		return "", fmt.Errorf("step 1 failed: %w", err)
	}
	if out, err = callAgent(ctx, processClient, "process-data", out); err != nil {
		return "", fmt.Errorf("step 2 failed: %w", err)
	}
	if out, err = callAgent(ctx, formatClient, "format-result", out); err != nil {
		return "", fmt.Errorf("step 3 failed: %w", err)
	}

	return out, nil
}
错误处理与重试策略
智能重试
// RetryPolicy retries an operation with capped exponential backoff plus random
// jitter, but only for errors whose message contains one of RetryableErrors.
type RetryPolicy struct {
	MaxRetries      int           // retries after the first attempt; negative is treated as 0
	BaseDelay       time.Duration // wait before the first retry
	MaxDelay        time.Duration // cap on the exponential part of the wait
	Multiplier      float64       // growth factor between consecutive retries
	RetryableErrors []string      // substrings that mark an error as retryable
}

// Execute runs operation until it succeeds, returns a non-retryable error, or
// MaxRetries retries have been used (so it always runs at least once).
//
// It returns nil on success, the first non-retryable error unchanged,
// ctx.Err() if ctx ends while waiting between attempts, or the last error
// wrapped as "max retries exceeded".
func (p *RetryPolicy) Execute(ctx context.Context, operation func() error) error {
	maxRetries := p.MaxRetries
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			delay := p.backoff(attempt)
			log.Printf("retry attempt %d/%d after %v", attempt, p.MaxRetries, delay)

			// A stoppable timer, so an early ctx cancellation releases it immediately.
			timer := time.NewTimer(delay)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			}
		}

		err := operation()
		if err == nil {
			return nil
		}
		lastErr = err

		if !p.isRetryable(err) {
			return err
		}
		log.Printf("attempt %d failed: %v", attempt+1, err)
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}

// backoff returns the wait before retry number attempt (1-based):
// BaseDelay*Multiplier^(attempt-1), capped at MaxDelay, plus random jitter in
// [0, delay/2) to avoid synchronized retries. Jitter is added after the cap,
// so a wait may exceed MaxDelay by up to 50%.
//
// The power is computed in floating point, so fractional multipliers such as
// 1.5 take effect. Zero, negative, and NaN delays collapse to 0. Jitter is
// skipped when delay/2 is 0, because rand.Int63n panics for n <= 0.
func (p *RetryPolicy) backoff(attempt int) time.Duration {
	d := float64(p.BaseDelay) * math.Pow(p.Multiplier, float64(attempt-1))

	var delay time.Duration
	switch {
	case !(d > 0): // zero, negative or NaN
		delay = 0
	case d >= float64(p.MaxDelay): // also catches +Inf and huge values before conversion
		delay = p.MaxDelay
	default:
		delay = time.Duration(d)
	}
	if delay < 0 { // negative MaxDelay
		delay = 0
	}

	if half := int64(delay) / 2; half > 0 {
		delay += time.Duration(rand.Int63n(half))
	}
	return delay
}

// isRetryable reports whether err's message contains any of the configured
// RetryableErrors substrings (case-sensitive).
func (p *RetryPolicy) isRetryable(err error) bool {
	errStr := err.Error()
	for _, retryable := range p.RetryableErrors {
		if strings.Contains(errStr, retryable) {
			return true
		}
	}
	return false
}
// Example: retry SendTask on transient failures — up to 3 retries with
// roughly 1s, 2s, 4s waits (plus jitter), each capped at 30s.
// NOTE(review): if the server accepted a request whose response was lost, a
// retried SendTask may create a duplicate task — confirm the agent
// deduplicates before retrying non-idempotent calls.
policy := RetryPolicy{
	MaxRetries: 3,
	BaseDelay:  time.Second,
	MaxDelay:   30 * time.Second,
	Multiplier: 2.0,
	RetryableErrors: []string{
		"connection refused", "timeout", "temporary", "rate limit", "service unavailable",
	},
}
sendOnce := func() error {
	_, sendErr := client.SendTask(ctx, task)
	return sendErr
}
err := policy.Execute(ctx, sendOnce)
常见问题深度解析
Q:外部 Agent 无响应怎么办?
根本原因:
- 网络分区或延迟
- 外部 Agent 过载
- 外部 Agent 崩溃
- 请求本身导致死循环或长执行
解决方案:
// 1. Configure explicit timeouts.
client, err := a2aclient.New(ctx,
	a2aclient.WithTimeout(30*time.Second),       // per-request timeout
	a2aclient.WithRequestTimeout(5*time.Minute), // overall task timeout
)
if err != nil {
	// Never discard this error: a nil client would only fail later, far from the cause.
	log.Fatalf("failed to create client: %v", err)
}
// 2. Wrap the client in a circuit breaker.
cbClient := NewCircuitBreakerClient(client)
// 3. Graceful degradation.
//
// callWithFallback asks the primary agent first, then fallbackClient, and as a
// last resort serves a cached answer for input. It returns an error when ctx
// is already done after the primary fails, or when both agents fail and
// getCachedResult has nothing for input (returns "").
func callWithFallback(ctx context.Context, client *a2aclient.Client, input string) (string, error) {
	result, err := client.Call(ctx, input)
	if err == nil {
		return result, nil
	}
	log.Printf("primary agent failed: %v", err)

	// If the caller has given up, don't spend another remote call.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return "", ctxErr
	}

	result, fallbackErr := fallbackClient.Call(ctx, input)
	if fallbackErr == nil {
		return result, nil
	}
	log.Printf("fallback agent failed: %v", fallbackErr)

	if cached := getCachedResult(input); cached != "" {
		return cached, nil
	}
	// Previously a cache miss returned "" with a nil error, hiding the outage.
	return "", fmt.Errorf("primary and fallback agents failed and no cached result (primary: %v): %w", err, fallbackErr)
}
// 4. Async processing + callback.
//
// callAsyncWithCallback submits a task whose input carries callbackURL under
// the "callback_url" key; presumably the remote agent reports completion to
// that URL instead of being polled (confirm the agent supports it). Only the
// submission error is returned; the created task is discarded.
func callAsyncWithCallback(ctx context.Context, client *a2aclient.Client, input string, callbackURL string) error {
	payload := map[string]interface{}{
		"query":        input,
		"callback_url": callbackURL,
	}
	if _, err := client.SendTask(ctx, &a2a.Task{Input: payload}); err != nil {
		return err
	}
	return nil
}
Q:多个外部 Agent 怎么管理?
Agent Registry 模式:
// AgentRegistry holds the known remote agents. mu guards the map and, in the
// registry methods shown here, the mutable fields of each AgentConnection.
type AgentRegistry struct {
	agents map[string]*AgentConnection // presumably keyed by agent name — confirm
	mu     sync.RWMutex
}
// AgentConnection is the registry's record for one remote agent: its client,
// last known health, and usage/latency statistics. The registry's health check
// updates LastUsed, AvgLatency, ErrorCount and Health while holding
// AgentRegistry.mu.
type AgentConnection struct {
	Name   string
	Client *a2aclient.Client
	Health HealthStatus

	LastUsed              time.Time
	CallCount, ErrorCount int64
	AvgLatency            time.Duration
}
// GetHealthyAgent returns the healthy agent with the lowest AvgLatency among
// those that have the given skill. Ties are broken arbitrarily, by map order.
// It returns an error if no healthy agent has the skill.
//
// A single pass replaces sorting the whole candidate list just to take its
// first element.
func (r *AgentRegistry) GetHealthyAgent(skill string) (*AgentConnection, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	var best *AgentConnection
	for _, agent := range r.agents {
		if !agent.HasSkill(skill) || agent.Health != HealthHealthy {
			continue
		}
		if best == nil || agent.AvgLatency < best.AvgLatency {
			best = agent
		}
	}

	if best == nil {
		return nil, fmt.Errorf("no healthy agent found for skill: %s", skill)
	}
	return best, nil
}
// HealthCheck probes every registered agent every 30 seconds by fetching its
// agent card, until ctx is done. It blocks, so run it in its own goroutine.
//
// Each probe is bounded by a 10-second timeout, so a hung agent cannot pile up
// goroutines across ticks.
//   - A successful probe marks the agent healthy, resets ErrorCount, and folds
//     the probe latency into AvgLatency (the first sample seeds it directly).
//   - A failed probe increments ErrorCount and marks the agent unhealthy once
//     ErrorCount exceeds 5. Its duration is not counted as latency.
func (r *AgentRegistry) HealthCheck(ctx context.Context) {
	const (
		interval     = 30 * time.Second
		probeTimeout = 10 * time.Second
		maxFailures  = 5
	)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Snapshot the agents so probes run without holding the read lock.
			r.mu.RLock()
			agents := make([]*AgentConnection, 0, len(r.agents))
			for _, agent := range r.agents {
				agents = append(agents, agent)
			}
			r.mu.RUnlock()

			for _, agent := range agents {
				go func(a *AgentConnection) {
					probeCtx, cancel := context.WithTimeout(ctx, probeTimeout)
					defer cancel()

					start := time.Now()
					_, err := a.Client.GetAgentCard(probeCtx)
					latency := time.Since(start)

					r.mu.Lock()
					defer r.mu.Unlock()

					// NOTE(review): a probe also bumps LastUsed, so it reflects the last
					// health check rather than the last real call — confirm intended.
					a.LastUsed = time.Now()

					if err != nil {
						a.ErrorCount++
						if a.ErrorCount > maxFailures {
							a.Health = HealthUnhealthy
						}
						return
					}

					// Only successful probes feed the average; a failed probe's
					// duration (often just the timeout) is not service latency.
					if a.AvgLatency == 0 {
						a.AvgLatency = latency
					} else {
						a.AvgLatency = (a.AvgLatency + latency) / 2
					}
					a.ErrorCount = 0
					a.Health = HealthHealthy
				}(agent)
			}
		case <-ctx.Done():
			return
		}
	}
}
下一步
消费搞定了,接下来看跨语言协作——Python + Go 的实战案例。
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
