Cross-Language Collaboration in Practice: Python + Go Agents

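In real-world agent systems, different components are often built by different teams on different technology stacks. Python has a mature data science and machine learning ecosystem, while Go is well suited to highly concurrent network services. With the A2A protocol, agents written in the two languages can call each other through a shared interface, so each language does the work it is best at.

This article walks through a complete example, an intelligent data analysis platform, to show how a Python Agent (data processing) and Go Agents (API gateway and report generation) collaborate over the A2A protocol.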


Architecture Design: An Agent Network with a Clear Division of Labor

System Architecture

User request
┌─────────────────────────────────────────────────────────────┐
│                  Go Agent (API gateway)                     │
│                  Role: routing, auth, rate limiting         │
│                  Stack: Go + ADK + A2A Client               │
└────────┬────────────────────────────┬───────────────────────┘
         │                            │
         ▼                            ▼
┌────────────────────────┐    ┌────────────────────────┐
│ Python Agent           │    │ Go Agent               │
│ (data processing)      │    │ (report generator)     │
│ Role: data cleaning,   │    │ Role: formatting,      │
│       statistics       │    │       file export      │
│ Stack: Python +        │    │ Stack: Go + ADK        │
│        Pandas + NumPy  │    │                        │
└────────┬───────────────┘    └───────┬────────────────┘
         │                            │
         ▼                            ▼
┌────────────────────────┐    ┌────────────────────────┐
│ Data store             │    │ File store             │
│ (PostgreSQL)           │    │ (S3 / MinIO)           │
└────────────────────────┘    └────────────────────────┘

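Collaboration Flow

1. User: "Analyze sales_data.csv and generate a quarterly report"
2. Go Gateway Agent
   - Verifies the user's permissions
   - Parses the intent of the request
   - Routes the request to the Python Data Agent
3. Python Data Agent
   - Downloads sales_data.csv from S3
   - Cleans the data with Pandas
   - Computes the quarterly statistics
   - Stores the results in PostgreSQL
   - Returns a summary of the processing results
4. Go Gateway Agent
   - Receives the data processing results
   - Calls the Go Report Agent
5. Go Report Agent
   - Reads the statistics from PostgreSQL
   - Generates a PDF report
   - Uploads it to S3
   - Returns the report download link
6. Go Gateway Agent
   - Aggregates all results
   - Returns them to the user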
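The six steps above boil down to a sequential pipeline with an early exit on failure. The following is a minimal sketch of that control flow, not the gateway's actual implementation: the two agents are stand-in callables, and the `report_url`, `url`, and `stage` keys are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-in for a remote agent: takes a JSON-like request dict
# and returns a JSON-like response dict with a "status" field.
AgentCall = Callable[[Dict[str, Any]], Dict[str, Any]]


def handle_request(file_path: str, title: str,
                   data_agent: AgentCall,
                   report_agent: AgentCall) -> Dict[str, Any]:
    """Run steps 3 to 6 of the flow: analyze, then report, then aggregate."""
    analysis = data_agent({"skill": "csv-analysis",
                           "input": {"file_path": file_path}})
    if analysis.get("status") != "completed":
        # Nothing to report on if data processing failed, so stop here.
        return {"status": "failed", "stage": "data",
                "error": analysis.get("error")}

    report = report_agent({
        "skill": "generate-pdf",
        "input": {"data_path": analysis["result"]["raw_data_path"],
                  "title": title},
    })
    if report.get("status") != "completed":
        return {"status": "failed", "stage": "report",
                "error": report.get("error")}

    # Step 6: aggregate both results into one response for the user.
    return {"status": "completed",
            "summary": analysis["result"],
            "report_url": report["result"]["url"]}
```

Passing the agents in as callables keeps the ordering logic testable without any network access; in the real system each callable would wrap an A2A client call.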

Python Agent: The Data Processing Specialist

Project Structure

python-data-agent/
├── Dockerfile
├── requirements.txt
├── src/
│   ├── __init__.py
│   ├── agent.py          # Agent core logic
│   ├── data_processor.py  # Data processing module
│   ├── storage.py        # Storage interface
│   └── a2a_server.py     # A2A server
├── config/
│   └── config.yaml
└── tests/
    └── test_processor.py

Core Implementation

# src/agent.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

@dataclass
class AnalysisResult:
    total_records: int
    date_range: tuple
    revenue_stats: Dict[str, float]
    top_products: List[Dict[str, Any]]
    monthly_trend: List[Dict[str, Any]]
    anomalies: List[Dict[str, Any]]
    raw_data_path: str

class DataProcessor:
    """Core data processing class."""
    
    def __init__(self, storage_client):
        self.storage = storage_client
        self.required_columns = ['date', 'product', 'quantity', 'price', 'revenue']
    
    def process(self, file_path: str, analysis_type: str = 'full') -> AnalysisResult:
        """Process a data file and return the analysis result."""
        
        # 1. Read the data
        logger.info(f"Reading data from {file_path}")
        df = self._read_data(file_path)
        
        # 2. Clean the data
        logger.info("Cleaning data")
        df = self._clean_data(df)
        
        # 3. Run the analysis that matches the requested type
        if analysis_type == 'full':
            result = self._full_analysis(df)
        elif analysis_type == 'summary':
            # _summary_analysis is not shown in this article
            result = self._summary_analysis(df)
        else:
            raise ValueError(f"Unknown analysis type: {analysis_type}")
        
        # 4. Save the cleaned data to the database
        db_path = self.storage.save_dataframe(df, f"processed_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
        result.raw_data_path = db_path
        
        return result
    
    def _read_data(self, file_path: str) -> pd.DataFrame:
        """Read a CSV or Excel file."""
        if file_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif file_path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")
        
        # Validate that all required columns are present
        missing = set(self.required_columns) - set(df.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
        
        return df
    
    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean the data."""
        # Drop duplicate rows
        df = df.drop_duplicates()
        
        # Drop rows with missing key fields; copy so the column assignment
        # below works on an independent frame
        df = df.dropna(subset=['date', 'product', 'revenue']).copy()
        
        # Parse the date column
        df['date'] = pd.to_datetime(df['date'])
        
        # Drop outliers (revenue < 0 or above the 99.9th percentile)
        q999 = df['revenue'].quantile(0.999)
        df = df[(df['revenue'] >= 0) & (df['revenue'] <= q999)]
        
        # Sort by date
        df = df.sort_values('date')
        
        logger.info(f"Cleaned data: {len(df)} records")
        return df
    
    def _full_analysis(self, df: pd.DataFrame) -> AnalysisResult:
        """Full analysis."""
        
        # Basic statistics
        total_records = len(df)
        date_range = (df['date'].min().isoformat(), df['date'].max().isoformat())
        
        # Revenue statistics
        revenue_stats = {
            'total': float(df['revenue'].sum()),
            'mean': float(df['revenue'].mean()),
            'median': float(df['revenue'].median()),
            'std': float(df['revenue'].std()),
            'min': float(df['revenue'].min()),
            'max': float(df['revenue'].max()),
        }
        
        # Top 10 products by revenue
        top_products = df.groupby('product').agg({
            'revenue': 'sum',
            'quantity': 'sum'
        }).sort_values('revenue', ascending=False).head(10)
        
        top_products_list = [
            {
                'product': idx,
                'revenue': float(row['revenue']),
                'quantity': int(row['quantity'])
            }
            for idx, row in top_products.iterrows()
        ]
        
        # Monthly trend (group by a derived key so that df itself is not
        # mutated before it is saved to the database)
        monthly = df.groupby(df['date'].dt.to_period('M'))['revenue'].sum()
        monthly_trend = [
            {
                'month': str(month),
                'revenue': float(revenue)
            }
            for month, revenue in monthly.items()
        ]
        
        # Anomaly detection (3-sigma rule)
        mean = df['revenue'].mean()
        std = df['revenue'].std()
        anomalies = df[abs(df['revenue'] - mean) > 3 * std]
        anomalies_list = [
            {
                'date': row['date'].isoformat(),
                'product': row['product'],
                'revenue': float(row['revenue']),
                'deviation': float(abs(row['revenue'] - mean) / std)
            }
            for _, row in anomalies.iterrows()
        ]
        
        return AnalysisResult(
            total_records=total_records,
            date_range=date_range,
            revenue_stats=revenue_stats,
            top_products=top_products_list,
            monthly_trend=monthly_trend,
            anomalies=anomalies_list,
            raw_data_path=""
        )
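The 3-sigma rule used in `_full_analysis` is easy to check in isolation. Below is a small standard-library sketch of the same idea, written without pandas so that it runs anywhere. The function name `three_sigma_outliers` is made up for this example, and the two early returns (fewer than two points, zero standard deviation) are guards added here, not behavior taken from the code above.

```python
import statistics
from typing import List, Tuple


def three_sigma_outliers(values: List[float]) -> List[Tuple[int, float]]:
    """Return (index, z_score) for values more than 3 sample std devs from the mean."""
    if len(values) < 2:
        # The sample standard deviation is undefined for fewer than 2 points.
        return []
    mean = statistics.fmean(values)
    # Sample standard deviation (n - 1), the same default pandas uses for .std().
    std = statistics.stdev(values)
    if std == 0:
        # All values are identical, so nothing deviates from the mean.
        return []
    return [(i, abs(v - mean) / std)
            for i, v in enumerate(values)
            if abs(v - mean) > 3 * std]
```

One property worth keeping in mind: with very small samples no point can ever exceed three standard deviations, so the rule only becomes meaningful once a dataset has a few dozen rows.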

A2A Server

# src/a2a_server.py
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uvicorn
import os
import logging

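# The Dockerfile starts the server with `python -m src.a2a_server`,
# so sibling modules are imported through the src package.
from src.agent import DataProcessor
from src.storage import PostgresStorage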

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Python Data Agent", version="1.0.0")

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # restrict this in production
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global configuration
API_KEY = os.getenv("A2A_API_KEY", "dev-key")
storage = PostgresStorage(os.getenv("DATABASE_URL"))
processor = DataProcessor(storage)

# A2A Agent Card
AGENT_CARD = {
    "name": "python-data-processor",
    "version": "1.0.0",
    "description": "Data processing specialist for analyzing and transforming CSV/Excel files",
    "url": "http://localhost:8081/a2a",
    "capabilities": {
        "streaming": False,
        "pushNotifications": False
    },
    "skills": [
        {
            "id": "csv-analysis",
            "name": "CSV data analysis",
            "description": "Analyze a CSV/Excel file and return a statistical summary with visualization suggestions",
            "inputModes": ["text", "file"],
            "outputModes": ["text", "file"]
        },
        {
            "id": "data-cleaning",
            "name": "Data cleaning",
            "description": "Remove outliers and missing values from a dataset",
            "inputModes": ["file"],
            "outputModes": ["file"]
        }
    ],
    "authentication": {
        "type": "apiKey",
        "header": "X-API-Key"
    }
}

# Request and response models
class TaskRequest(BaseModel):
    skill: str
    input: Dict[str, Any]

class TaskResponse(BaseModel):
    task_id: str
    status: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

# Authentication
def verify_api_key(x_api_key: str = Header(...)):
    if x_api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key

# Endpoints
@app.get("/.well-known/agent.json")
async def get_agent_card():
    return AGENT_CARD

@app.post("/a2a/tasks")
async def create_task(
    request: TaskRequest,
    api_key: str = Depends(verify_api_key)
):
    import uuid
    task_id = str(uuid.uuid4())
    
    logger.info(f"Creating task {task_id} for skill {request.skill}")
    
    try:
        if request.skill == "csv-analysis":
            file_path = request.input.get("file_path")
            analysis_type = request.input.get("analysis_type", "full")
            
            if not file_path:
                raise ValueError("file_path is required")
            
            result = processor.process(file_path, analysis_type)
            
            return TaskResponse(
                task_id=task_id,
                status="completed",
                result={
                    "total_records": result.total_records,
                    "date_range": result.date_range,
                    "revenue_stats": result.revenue_stats,
                    "top_products": result.top_products,
                    "monthly_trend": result.monthly_trend,
                    "anomalies": result.anomalies,
                    "raw_data_path": result.raw_data_path
                }
            )
        
        elif request.skill == "data-cleaning":
            file_path = request.input.get("file_path")
            if not file_path:
                raise ValueError("file_path is required")
            
            df = processor._read_data(file_path)
            cleaned_df = processor._clean_data(df)
            output_path = f"/tmp/cleaned_{task_id}.csv"
            cleaned_df.to_csv(output_path, index=False)
            
            return TaskResponse(
                task_id=task_id,
                status="completed",
                result={"output_path": output_path}
            )
        
        else:
            raise ValueError(f"Unknown skill: {request.skill}")
    
    except Exception as e:
        logger.error(f"Task {task_id} failed: {e}")
        return TaskResponse(
            task_id=task_id,
            status="failed",
            error=str(e)
        )

@app.get("/health")
async def health_check():
    return {"status": "healthy", "agent": "python-data-processor"}

if __name__ == "__main__":
    port = int(os.getenv("PORT", "8081"))
    uvicorn.run(app, host="0.0.0.0", port=port)
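To see what a caller has to send, it helps to look at the other side of this server. The sketch below uses only the standard library: it checks an Agent Card for a skill and builds (without sending) a request that matches the `TaskRequest` model and the `X-API-Key` header declared in the card. The helper names `has_skill` and `build_task_request` are invented for this example.

```python
import json
import urllib.request
from typing import Any, Dict


def has_skill(card: Dict[str, Any], skill_id: str) -> bool:
    """Check whether an Agent Card advertises the given skill id."""
    return any(s.get("id") == skill_id for s in card.get("skills", []))


def build_task_request(base_url: str, api_key: str, skill: str,
                       task_input: Dict[str, Any]) -> urllib.request.Request:
    """Build a POST for the /tasks endpoint under the card's url (not sent here)."""
    body = json.dumps({"skill": skill, "input": task_input}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/tasks",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-API-Key": api_key},
    )
```

A caller would typically fetch `/.well-known/agent.json` first, confirm with `has_skill` that `csv-analysis` is offered, and then pass the request to `urllib.request.urlopen(req, timeout=60)`.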

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY src/ ./src/

# Run as a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8081

CMD ["python", "-m", "src.a2a_server"]

Go Agent: API Gateway and Report Generation

Project Structure

go-gateway-agent/
├── Dockerfile
├── go.mod
├── go.sum
├── main.go
├── internal/
│   ├── gateway/
│   │   └── gateway.go
│   ├── report/
│   │   └── generator.go
│   └── a2a/
│       └── client.go
└── config/
    └── config.yaml

Gateway Implementation

// internal/gateway/gateway.go
package gateway

import (
    "context"
    "fmt"
    "os"
    "time"
    
    a2aclient "google.golang.org/adk/a2a/client"
    "google.golang.org/adk/agent"
    "google.golang.org/adk/tool"
)

type GatewayAgent struct {
    agent        *agent.Agent
    pythonClient *a2aclient.Client
    reportClient *a2aclient.Client
}

func NewGatewayAgent(ctx context.Context) (*GatewayAgent, error) {
    // Create the client for the Python Data Agent
    pythonClient, err := a2aclient.New(ctx,
        a2aclient.WithURL("http://python-agent:8081/a2a"),
        a2aclient.WithAPIKey(os.Getenv("PYTHON_AGENT_API_KEY")),
        a2aclient.WithTimeout(60*time.Second),
        a2aclient.WithRetry(3, time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create python client: %w", err)
    }
    
    // Create the client for the local Report Agent (internal call)
    reportClient, err := a2aclient.New(ctx,
        a2aclient.WithURL("http://localhost:8082/a2a"),
        a2aclient.WithTimeout(30*time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create report client: %w", err)
    }
    
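    // Create the Gateway Agent.
    // Note: model is not defined in this excerpt; it is assumed to be an
    // LLM handle that is constructed elsewhere.
    a, err := agent.New(agent.Config{
        Name:        "gateway-agent",
        Model:       model,
        Instruction: `You are the gateway of an intelligent data analysis platform. Your responsibilities: 1. Understand the user's analysis request 2. Call python-data-processor to process the data 3. Call report-generator to produce the report 4. Aggregate the results and return them to the user`,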
        Tools: []tool.Tool{
            NewProcessDataTool(pythonClient),
            NewGenerateReportTool(reportClient),
        },
    })
    if err != nil {
        return nil, err
    }
    
    return &GatewayAgent{
        agent:        a,
        pythonClient: pythonClient,
        reportClient: reportClient,
    }, nil
}

func (g *GatewayAgent) HandleRequest(ctx context.Context, userInput string) (string, error) {
    return g.agent.Run(ctx, userInput)
}

Report Generator

// internal/report/generator.go
package report

import (
    "context"
    "fmt"
    "time"
    
    "github.com/jung-kurt/gofpdf"
)

type ReportGenerator struct {
    templateDir string
    outputDir   string
}

func NewReportGenerator(templateDir, outputDir string) *ReportGenerator {
    return &ReportGenerator{
        templateDir: templateDir,
        outputDir:   outputDir,
    }
}

func (g *ReportGenerator) GeneratePDF(ctx context.Context, data ReportData) (string, error) {
    pdf := gofpdf.New("P", "mm", "A4", "")
    pdf.AddPage()
    pdf.SetFont("Arial", "B", 16)
    
    // Title
    pdf.Cell(40, 10, fmt.Sprintf("Data Analysis Report - %s", data.Title))
    pdf.Ln(20)
    
    // Overview
    pdf.SetFont("Arial", "B", 12)
    pdf.Cell(40, 10, "Data Overview")
    pdf.Ln(10)
    
    pdf.SetFont("Arial", "", 10)
    pdf.Cell(40, 10, fmt.Sprintf("Total records: %d", data.TotalRecords))
    pdf.Ln(5)
    pdf.Cell(40, 10, fmt.Sprintf("Date range: %s to %s", data.DateRange[0], data.DateRange[1]))
    pdf.Ln(5)
    pdf.Cell(40, 10, fmt.Sprintf("Total revenue: %.2f", data.RevenueStats["total"]))
    pdf.Ln(10)
    
    // Top products
    pdf.SetFont("Arial", "B", 12)
    pdf.Cell(40, 10, "Top 10 Products")
    pdf.Ln(10)
    
    pdf.SetFont("Arial", "", 10)
    for i, product := range data.TopProducts {
        pdf.Cell(40, 10, fmt.Sprintf("%d. %s - revenue: %.2f", 
            i+1, product.Product, product.Revenue))
        pdf.Ln(5)
    }
    
    // Save
    filename := fmt.Sprintf("report_%d.pdf", time.Now().Unix())
    filepath := fmt.Sprintf("%s/%s", g.outputDir, filename)
    
    if err := pdf.OutputFileAndClose(filepath); err != nil {
        return "", fmt.Errorf("failed to generate PDF: %w", err)
    }
    
    return filepath, nil
}

type ReportData struct {
    Title         string
    TotalRecords  int
    DateRange     [2]string
    RevenueStats  map[string]float64
    TopProducts   []ProductStat
    MonthlyTrend  []MonthlyData
}

type ProductStat struct {
    Product  string
    Revenue  float64
    Quantity int
}

type MonthlyData struct {
    Month   string
    Revenue float64
}

A2A Tool Wrappers

// internal/gateway/tools.go
package gateway

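import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    
    // The a2a package (a2a.Task, a2a.TaskStatus*) used below must also be
    // imported; its import path is not given in this article.
    a2aclient "google.golang.org/adk/a2a/client"
    "google.golang.org/adk/tool"
)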

// ProcessDataTool calls the Python Agent to process data
type ProcessDataTool struct {
    client *a2aclient.Client
}

func NewProcessDataTool(client *a2aclient.Client) *ProcessDataTool {
    return &ProcessDataTool{client: client}
}

func (t *ProcessDataTool) Name() string        { return "process_data" }
func (t *ProcessDataTool) Description() string { return "Call the Python Agent to process a data file" }

func (t *ProcessDataTool) Schema() tool.Schema {
    return tool.Schema{
        Type: "object",
        Properties: map[string]tool.Property{
            "file_path": {
                Type:        "string",
                Description: "Path to the data file",
            },
            "analysis_type": {
                Type:        "string",
                Description: "Analysis type: full or summary",
                Enum:        []string{"full", "summary"},
            },
        },
        Required: []string{"file_path"},
    }
}

func (t *ProcessDataTool) Call(ctx context.Context, input string) (string, error) {
    var params struct {
        FilePath     string `json:"file_path"`
        AnalysisType string `json:"analysis_type"`
    }
    if err := json.Unmarshal([]byte(input), &params); err != nil {
        return "", err
    }
    
    if params.AnalysisType == "" {
        params.AnalysisType = "full"
    }
    
    // Call the Python Agent
    task, err := t.client.SendTask(ctx, &a2a.Task{
        Input: map[string]interface{}{
            "skill":         "csv-analysis",
            "file_path":     params.FilePath,
            "analysis_type": params.AnalysisType,
        },
    })
    if err != nil {
        return "", fmt.Errorf("failed to send task: %w", err)
    }
    
    // Poll until the task reaches a terminal state
    result, err := waitForTask(ctx, t.client, task.ID)
    if err != nil {
        return "", err
    }
    
    return result, nil
}

// GenerateReportTool calls the Report Agent to generate a report
type GenerateReportTool struct {
    client *a2aclient.Client
}

func NewGenerateReportTool(client *a2aclient.Client) *GenerateReportTool {
    return &GenerateReportTool{client: client}
}

func (t *GenerateReportTool) Name() string        { return "generate_report" }
func (t *GenerateReportTool) Description() string { return "Generate a PDF report" }

func (t *GenerateReportTool) Schema() tool.Schema {
    return tool.Schema{
        Type: "object",
        Properties: map[string]tool.Property{
            "data_path": {
                Type:        "string",
                Description: "Path to the data file",
            },
            "title": {
                Type:        "string",
                Description: "Report title",
            },
        },
        Required: []string{"data_path", "title"},
    }
}

func (t *GenerateReportTool) Call(ctx context.Context, input string) (string, error) {
    var params struct {
        DataPath string `json:"data_path"`
        Title    string `json:"title"`
    }
    if err := json.Unmarshal([]byte(input), &params); err != nil {
        return "", err
    }
    
    task, err := t.client.SendTask(ctx, &a2a.Task{
        Input: map[string]interface{}{
            "skill":      "generate-pdf",
            "data_path":  params.DataPath,
            "title":      params.Title,
        },
    })
    if err != nil {
        return "", fmt.Errorf("failed to send task: %w", err)
    }
    
    result, err := waitForTask(ctx, t.client, task.ID)
    if err != nil {
        return "", err
    }
    
    return result, nil
}

func waitForTask(ctx context.Context, client *a2aclient.Client, taskID string) (string, error) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    timeout := time.After(5 * time.Minute)
    
    // cancel asks the remote agent to stop the task. It uses a fresh context
    // because ctx may already be cancelled by the time it runs.
    cancel := func() {
        cancelCtx, done := context.WithTimeout(context.Background(), 5*time.Second)
        defer done()
        client.CancelTask(cancelCtx, taskID)
    }
    
    for {
        select {
        case <-ticker.C:
            task, err := client.GetTask(ctx, taskID)
            if err != nil {
                return "", err
            }
            
            switch task.Status {
            case a2a.TaskStatusCompleted:
                result, err := json.Marshal(task.Output)
                if err != nil {
                    return "", fmt.Errorf("failed to encode task output: %w", err)
                }
                return string(result), nil
            case a2a.TaskStatusFailed:
                return "", fmt.Errorf("task failed: %v", task.Output)
            case a2a.TaskStatusCancelled:
                return "", fmt.Errorf("task cancelled")
            }
            
        case <-timeout:
            cancel()
            return "", fmt.Errorf("task timeout")
            
        case <-ctx.Done():
            cancel()
            return "", ctx.Err()
        }
    }
}
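Note that `waitForTask` polls the task by ID, while the Python server shown earlier finishes every task inside the POST handler and has no way to look a task up afterwards. If the Python side were to run long analyses in the background, it would need some task registry for a lookup endpoint to read from. The following is a minimal in-memory sketch under that assumption: `TaskStore` and the `"working"` status are hypothetical, and the contents are lost on restart.

```python
import threading
import uuid
from typing import Any, Dict, Optional


class TaskStore:
    """Thread-safe in-memory task registry (illustrative only)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._tasks: Dict[str, Dict[str, Any]] = {}

    def create(self) -> str:
        """Register a new task in the "working" state and return its id."""
        task_id = str(uuid.uuid4())
        with self._lock:
            self._tasks[task_id] = {"task_id": task_id, "status": "working",
                                    "result": None, "error": None}
        return task_id

    def finish(self, task_id: str, result: Optional[Dict[str, Any]] = None,
               error: Optional[str] = None) -> None:
        """Move a task to "completed", or to "failed" if an error is given."""
        with self._lock:
            task = self._tasks[task_id]
            task["status"] = "failed" if error else "completed"
            task["result"] = result
            task["error"] = error

    def get(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return a snapshot of the task, or None if the id is unknown."""
        with self._lock:
            task = self._tasks.get(task_id)
            return dict(task) if task else None
```

The snapshot returned by `get` has the same fields as `TaskResponse`, so a lookup handler could return it directly. In a multi-process deployment the dict would have to be replaced by shared storage such as the PostgreSQL instance already in the stack.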

Deployment and Running

Docker Compose Orchestration

# docker-compose.yml
version: '3.8'

services:
  python-agent:
    build: ./python-data-agent
    ports:
      - "8081:8081"
    environment:
      - PORT=8081
      - A2A_API_KEY=${PYTHON_AGENT_API_KEY}
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/agentdb
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
    depends_on:
      - postgres
    volumes:
      - ./data:/data
    networks:
      - agent-network

  go-gateway:
    build: ./go-gateway-agent
    ports:
      - "8080:8080"
    environment:
      - PORT=8080
      - PYTHON_AGENT_API_KEY=${PYTHON_AGENT_API_KEY}
      - PYTHON_AGENT_URL=http://python-agent:8081/a2a
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/agentdb
    depends_on:
      - python-agent
      - postgres
    networks:
      - agent-network

  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=agentdb
    volumes:
      - postgres-data:/var/lib/postgresql/data
    networks:
      - agent-network

  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    volumes:
      - minio-data:/data
    networks:
      - agent-network

volumes:
  postgres-data:
  minio-data:

networks:
  agent-network:
    driver: bridge

Running the Stack

# 1. Set the environment variables
export PYTHON_AGENT_API_KEY="your-secret-key"
export AWS_ACCESS_KEY_ID="your-aws-key"
export AWS_SECRET_ACCESS_KEY="your-aws-secret"

# 2. Start all services
docker-compose up -d

# 3. Check that the services are up
docker-compose ps
curl http://localhost:8081/health
curl http://localhost:8080/health

# 4. Test the end-to-end flow
curl -X POST http://localhost:8080/a2a/tasks \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-secret-key" \
  -d '{
    "skill": "analyze-and-report",
    "input": {
      "file_path": "/data/sales_data.csv",
      "title": "Q3 Sales Data Analysis Report"
    }
  }'
    }
  }'
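One practical detail in step 3: `depends_on` only controls start order, so a health check issued right after `docker-compose up -d` can fail simply because a container is still booting. A small retry loop avoids false alarms. This is a generic sketch; `wait_until_healthy` and `http_ok` are hypothetical helpers, and the injectable `sleep` and `clock` parameters exist only to make the loop easy to test.

```python
import time
import urllib.request
from typing import Callable


def http_ok(url: str, timeout: float = 2.0) -> bool:
    """Return True if a GET on url answers with HTTP 200."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.status == 200


def wait_until_healthy(check: Callable[[], bool],
                       timeout: float = 30.0,
                       interval: float = 0.5,
                       sleep: Callable[[float], None] = time.sleep,
                       clock: Callable[[], float] = time.monotonic) -> bool:
    """Call check() until it returns True or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        try:
            if check():
                return True
        except OSError:
            # Connection refused or reset while the service is still starting.
            # urllib's URLError is a subclass of OSError, so it lands here too.
            pass
        if clock() >= deadline:
            return False
        sleep(interval)
```

For the stack above, a call such as `wait_until_healthy(lambda: http_ok("http://localhost:8081/health"))` would block until the Python agent answers or 30 seconds pass.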

Production Considerations

Cross-Language Type Mapping

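| Python type      | Go type                | JSON representation | Notes                                                                 |
|------------------|------------------------|---------------------|-----------------------------------------------------------------------|
| int              | int64                  | number              | Python ints are unbounded; Go's int64 is not                          |
| float            | float64                | number              | Same IEEE 754 double on both sides; NaN and Inf are not valid JSON    |
| str              | string                 | string              | UTF-8 encoded                                                         |
| dict             | map[string]interface{} | object              | Keys must be strings                                                  |
| list             | []interface{}          | array               | Check that element types are consistent                               |
| datetime         | time.Time              | string (RFC3339)    | Handle time zones explicitly                                          |
| None             | nil                    | null                | Check for null before use                                             |
| pandas.DataFrame | custom struct          | object/array        | Must be serialized first                                              |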

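Error Handling Strategy

// Go side: a unified error format.
// The a2a package that defines a2a.Error must also be imported; its import
// path is not given in this article.
import (
    "context"
    "errors"
)
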
type ErrorResponse struct {
    Code    string `json:"code"`
    Message string `json:"message"`
    Details string `json:"details,omitempty"`
}

func handleA2AError(err error) *ErrorResponse {
    var a2aErr *a2a.Error
    if errors.As(err, &a2aErr) {
        return &ErrorResponse{
            Code:    a2aErr.Code,
            Message: a2aErr.Message,
            Details: a2aErr.Details,
        }
    }
    
    // Timeouts
    if errors.Is(err, context.DeadlineExceeded) {
        return &ErrorResponse{
            Code:    "TIMEOUT",
            Message: "Request timed out, please retry later",
        }
    }
    
    return &ErrorResponse{
        Code:    "INTERNAL",
        Message: "Internal error",
    }
}

# Python side: a unified error format
from typing import Optional

import pandas as pd


class AgentError(Exception):
    def __init__(self, code: str, message: str, details: Optional[str] = None):
        self.code = code
        self.message = message
        self.details = details
        super().__init__(message)

def handle_error(e: Exception) -> dict:
    if isinstance(e, AgentError):
        return {
            "code": e.code,
            "message": e.message,
            "details": e.details
        }
    elif isinstance(e, pd.errors.EmptyDataError):
        return {
            "code": "EMPTY_DATA",
            "message": "The data file is empty"
        }
    elif isinstance(e, ValueError):
        return {
            "code": "INVALID_INPUT",
            "message": str(e)
        }
    else:
        return {
            "code": "INTERNAL",
            "message": "Internal error",
            "details": str(e)
        }

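Performance Optimization

  1. Connection pooling: the Go client keeps connections alive, avoiding repeated TCP handshakes
  2. Batching: the Python Agent accepts batched analysis requests, reducing the number of cross-language calls
  3. Asynchronous pipeline: the Gateway uses goroutines to call multiple Agents in parallel
  4. Result caching: frequently requested analysis results are cached in Redis to avoid recomputation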
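For the caching item, the main design question is the cache key. Keying on the file path alone would return stale results once the file changes, so one option is to key on a hash of the file contents plus the analysis type. The sketch below illustrates that idea with a plain dict standing in for Redis; `cache_key` and `ResultCache` are hypothetical names, and expiry is left out.

```python
import hashlib
from typing import Any, Callable, Dict


def cache_key(file_bytes: bytes, analysis_type: str) -> str:
    """Derive a key from the file contents, so edited files never hit stale entries."""
    digest = hashlib.sha256(file_bytes).hexdigest()
    return f"analysis:{analysis_type}:{digest}"


class ResultCache:
    """Dict-backed stand-in for an external cache such as Redis."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}
        self.misses = 0

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        """Return the cached value for key, computing and storing it on a miss."""
        if key not in self._store:
            self.misses += 1
            self._store[key] = compute()
        return self._store[key]
```

With a real Redis client the same shape applies: look the key up, fall back to `processor.process(...)` on a miss, and write the serialized result back with a time-to-live.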

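Summary

This completes Module 8. It covered:

  • The design philosophy behind the A2A protocol
  • Exposing: a complete walkthrough of exposing a Go Agent
  • Consuming: advanced patterns for consuming external Agents
  • A hands-on cross-language collaboration example

Next up is Module 9, advanced topics: Grounding, Artifacts, Skills, and Callbacks.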
