跨语言协作实战:Python + Go Agent 协作
通过实际案例展示 Python ADK Agent 和 Go ADK Agent 如何通过 A2A 协议协作——任务分工、信息传递、结果汇总。
跨语言协作实战:Python + Go Agent 协作
在真实的 Agent 系统中,不同组件往往由不同团队使用不同技术栈开发。Python 在数据科学和机器学习领域有深厚积累,Go 在高并发和网络服务方面表现卓越。通过 A2A 协议,这两种语言的 Agent 可以无缝协作,发挥各自优势。
本文将通过一个完整的实战案例——智能数据分析平台,展示 Python Agent(数据处理)和 Go Agent(API 网关)如何通过 A2A 协议协作。
架构设计:分工明确的 Agent 网络
系统架构
用户请求
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Go Agent(API 网关) │
│ 职责:请求路由、认证、限流 │
│ 技术:Go + ADK + A2A Client │
└────────┬────────────────────────────┬───────────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Python Agent │ │ Go Agent │
│(数据处理专家) │ │(报告生成器) │
│ 职责:数据清洗、 │ │ 职责:格式化输出、 │
│ 统计分析 │ │ 文件导出 │
│ 技术:Python + │ │ 技术:Go + ADK │
│ Pandas + │ │ │
│ NumPy │ │ │
└────────┬──────────┘ └────────┬──────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ 数据存储 │ │ 文件存储 │
│(PostgreSQL) │ │(S3 / MinIO) │
└──────────────────┘ └──────────────────┘
协作流程
1. 用户:"分析 sales_data.csv,生成季度报告"
│
▼
2. Go Gateway Agent
- 验证用户权限
- 解析请求意图
- 路由到 Python Data Agent
│
▼
3. Python Data Agent
- 从 S3 下载 sales_data.csv
- 使用 Pandas 进行数据清洗
- 计算季度统计指标
- 将结果存储到 PostgreSQL
- 返回处理结果摘要
│
▼
4. Go Gateway Agent
- 接收数据处理结果
- 调用 Go Report Agent
│
▼
5. Go Report Agent
- 从 PostgreSQL 读取统计数据
- 生成 PDF 报告
- 上传到 S3
- 返回报告下载链接
│
▼
6. Go Gateway Agent
- 汇总所有结果
- 返回给用户
Python Agent:数据处理专家
项目结构
python-data-agent/
├── Dockerfile
├── requirements.txt
├── src/
│ ├── __init__.py
│ ├── agent.py # Agent 核心逻辑
│ ├── data_processor.py # 数据处理模块
│ ├── storage.py # 存储接口
│ └── a2a_server.py # A2A 服务端
├── config/
│ └── config.yaml
└── tests/
└── test_processor.py
核心实现
# src/agent.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
@dataclass
class AnalysisResult:
total_records: int
date_range: tuple
revenue_stats: Dict[str, float]
top_products: List[Dict[str, Any]]
monthly_trend: List[Dict[str, Any]]
anomalies: List[Dict[str, Any]]
raw_data_path: str
class DataProcessor:
"""数据处理核心类"""
def __init__(self, storage_client):
self.storage = storage_client
self.required_columns = ['date', 'product', 'quantity', 'price', 'revenue']
def process(self, file_path: str, analysis_type: str = 'full') -> AnalysisResult:
"""处理数据文件并返回分析结果"""
# 1. 读取数据
logger.info(f"Reading data from {file_path}")
df = self._read_data(file_path)
# 2. 数据清洗
logger.info("Cleaning data")
df = self._clean_data(df)
# 3. 根据分析类型执行不同处理
if analysis_type == 'full':
result = self._full_analysis(df)
elif analysis_type == 'summary':
result = self._summary_analysis(df)
else:
raise ValueError(f"Unknown analysis type: {analysis_type}")
# 4. 存储原始数据到数据库
db_path = self.storage.save_dataframe(df, f"processed_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
result.raw_data_path = db_path
return result
def _read_data(self, file_path: str) -> pd.DataFrame:
"""读取 CSV/Excel 文件"""
if file_path.endswith('.csv'):
df = pd.read_csv(file_path)
elif file_path.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file_path)
else:
raise ValueError(f"Unsupported file format: {file_path}")
# 验证必需列
missing = set(self.required_columns) - set(df.columns)
if missing:
raise ValueError(f"Missing required columns: {missing}")
return df
def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""数据清洗"""
# 去除重复行
df = df.drop_duplicates()
# 处理缺失值
df = df.dropna(subset=['date', 'product', 'revenue'])
# 转换日期格式
df['date'] = pd.to_datetime(df['date'])
# 去除异常值(revenue < 0 或 > 99.9 分位数)
q99 = df['revenue'].quantile(0.999)
df = df[(df['revenue'] >= 0) & (df['revenue'] <= q99)]
# 排序
df = df.sort_values('date')
logger.info(f"Cleaned data: {len(df)} records")
return df
def _full_analysis(self, df: pd.DataFrame) -> AnalysisResult:
"""完整分析"""
# 基础统计
total_records = len(df)
date_range = (df['date'].min().isoformat(), df['date'].max().isoformat())
# 收入统计
revenue_stats = {
'total': float(df['revenue'].sum()),
'mean': float(df['revenue'].mean()),
'median': float(df['revenue'].median()),
'std': float(df['revenue'].std()),
'min': float(df['revenue'].min()),
'max': float(df['revenue'].max()),
}
# 热销产品 TOP 10
top_products = df.groupby('product').agg({
'revenue': 'sum',
'quantity': 'sum'
}).sort_values('revenue', ascending=False).head(10)
top_products_list = [
{
'product': idx,
'revenue': float(row['revenue']),
'quantity': int(row['quantity'])
}
for idx, row in top_products.iterrows()
]
# 月度趋势
df['month'] = df['date'].dt.to_period('M')
monthly = df.groupby('month')['revenue'].sum().reset_index()
monthly_trend = [
{
'month': str(row['month']),
'revenue': float(row['revenue'])
}
for _, row in monthly.iterrows()
]
# 异常检测(使用 3-sigma 规则)
mean = df['revenue'].mean()
std = df['revenue'].std()
anomalies = df[abs(df['revenue'] - mean) > 3 * std]
anomalies_list = [
{
'date': row['date'].isoformat(),
'product': row['product'],
'revenue': float(row['revenue']),
'deviation': float(abs(row['revenue'] - mean) / std)
}
for _, row in anomalies.iterrows()
]
return AnalysisResult(
total_records=total_records,
date_range=date_range,
revenue_stats=revenue_stats,
top_products=top_products_list,
monthly_trend=monthly_trend,
anomalies=anomalies_list,
raw_data_path=""
)
A2A 服务端
# src/a2a_server.py
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uvicorn
import os
import logging
from agent import DataProcessor
from storage import PostgresStorage
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Python Data Agent", version="1.0.0")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应限制
allow_methods=["*"],
allow_headers=["*"],
)
# 全局配置
API_KEY = os.getenv("A2A_API_KEY", "dev-key")
storage = PostgresStorage(os.getenv("DATABASE_URL"))
processor = DataProcessor(storage)
# A2A Agent Card
AGENT_CARD = {
"name": "python-data-processor",
"version": "1.0.0",
"description": "数据处理专家,支持 CSV/Excel 的分析和转换",
"url": "http://localhost:8081/a2a",
"capabilities": {
"streaming": False,
"pushNotifications": False
},
"skills": [
{
"id": "csv-analysis",
"name": "CSV 数据分析",
"description": "分析 CSV/Excel 文件,返回统计摘要和可视化建议",
"inputModes": ["text", "file"],
"outputModes": ["text", "file"]
},
{
"id": "data-cleaning",
"name": "数据清洗",
"description": "清洗数据集中的异常值和缺失值",
"inputModes": ["file"],
"outputModes": ["file"]
}
],
"authentication": {
"type": "apiKey",
"header": "X-API-Key"
}
}
# 模型定义
class TaskRequest(BaseModel):
skill: str
input: Dict[str, Any]
class TaskResponse(BaseModel):
task_id: str
status: str
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
# 认证
def verify_api_key(x_api_key: str = Header(...)):
if x_api_key != API_KEY:
raise HTTPException(status_code=401, detail="Invalid API key")
return x_api_key
# 端点
@app.get("/.well-known/agent.json")
async def get_agent_card():
return AGENT_CARD
@app.post("/a2a/tasks")
async def create_task(
request: TaskRequest,
api_key: str = Depends(verify_api_key)
):
import uuid
task_id = str(uuid.uuid4())
logger.info(f"Creating task {task_id} for skill {request.skill}")
try:
if request.skill == "csv-analysis":
file_path = request.input.get("file_path")
analysis_type = request.input.get("analysis_type", "full")
if not file_path:
raise ValueError("file_path is required")
result = processor.process(file_path, analysis_type)
return TaskResponse(
task_id=task_id,
status="completed",
result={
"total_records": result.total_records,
"date_range": result.date_range,
"revenue_stats": result.revenue_stats,
"top_products": result.top_products,
"monthly_trend": result.monthly_trend,
"anomalies": result.anomalies,
"raw_data_path": result.raw_data_path
}
)
elif request.skill == "data-cleaning":
file_path = request.input.get("file_path")
if not file_path:
raise ValueError("file_path is required")
df = processor._read_data(file_path)
cleaned_df = processor._clean_data(df)
output_path = f"/tmp/cleaned_{task_id}.csv"
cleaned_df.to_csv(output_path, index=False)
return TaskResponse(
task_id=task_id,
status="completed",
result={"output_path": output_path}
)
else:
raise ValueError(f"Unknown skill: {request.skill}")
except Exception as e:
logger.error(f"Task {task_id} failed: {e}")
return TaskResponse(
task_id=task_id,
status="failed",
error=str(e)
)
@app.get("/health")
async def health_check():
return {"status": "healthy", "agent": "python-data-processor"}
if __name__ == "__main__":
port = int(os.getenv("PORT", "8081"))
uvicorn.run(app, host="0.0.0.0", port=port)
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY src/ ./src/
# 非 root 用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
EXPOSE 8081
CMD ["python", "-m", "src.a2a_server"]
Go Agent:API 网关与报告生成
项目结构
go-gateway-agent/
├── Dockerfile
├── go.mod
├── go.sum
├── main.go
├── internal/
│ ├── gateway/
│ │ └── gateway.go
│ ├── report/
│ │ └── generator.go
│ └── a2a/
│ └── client.go
└── config/
└── config.yaml
Gateway 实现
// internal/gateway/gateway.go
package gateway
import (
"context"
"fmt"
"os"
"time"
"google.golang.org/adk/a2a/client"
"google.golang.org/adk/agent"
"google.golang.org/adk/tool"
)
type GatewayAgent struct {
agent *agent.Agent
pythonClient *a2aclient.Client
reportClient *a2aclient.Client
}
func NewGatewayAgent(ctx context.Context) (*GatewayAgent, error) {
// 创建 Python Data Agent 客户端
pythonClient, err := a2aclient.New(ctx,
a2aclient.WithURL("http://python-agent:8081/a2a"),
a2aclient.WithAPIKey(os.Getenv("PYTHON_AGENT_API_KEY")),
a2aclient.WithTimeout(60*time.Second),
a2aclient.WithRetry(3, time.Second),
)
if err != nil {
return nil, fmt.Errorf("failed to create python client: %w", err)
}
// 创建本地 Report Agent 客户端(内部调用)
reportClient, err := a2aclient.New(ctx,
a2aclient.WithURL("http://localhost:8082/a2a"),
a2aclient.WithTimeout(30*time.Second),
)
if err != nil {
return nil, fmt.Errorf("failed to create report client: %w", err)
}
// 创建 Gateway Agent
a, err := agent.New(agent.Config{
Name: "gateway-agent",
Model: model,
Instruction: `你是一个智能数据分析平台的网关。你的职责:1. 理解用户的分析需求 2. 调用 python-data-processor 进行数据处理 3. 调用 report-generator 生成报告 4. 汇总结果返回给用户`,
Tools: []tool.Tool{
NewProcessDataTool(pythonClient),
NewGenerateReportTool(reportClient),
},
})
if err != nil {
return nil, err
}
return &GatewayAgent{
agent: a,
pythonClient: pythonClient,
reportClient: reportClient,
}, nil
}
func (g *GatewayAgent) HandleRequest(ctx context.Context, userInput string) (string, error) {
return g.agent.Run(ctx, userInput)
}
Report Generator
// internal/report/generator.go
package report
import (
"context"
"fmt"
"os"
"time"
"github.com/jung-kurt/gofpdf"
)
type ReportGenerator struct {
templateDir string
outputDir string
}
func NewReportGenerator(templateDir, outputDir string) *ReportGenerator {
return &ReportGenerator{
templateDir: templateDir,
outputDir: outputDir,
}
}
func (g *ReportGenerator) GeneratePDF(ctx context.Context, data ReportData) (string, error) {
pdf := gofpdf.New("P", "mm", "A4", "")
pdf.AddPage()
pdf.SetFont("Arial", "B", 16)
// 标题
pdf.Cell(40, 10, fmt.Sprintf("数据分析报告 - %s", data.Title))
pdf.Ln(20)
// 概览
pdf.SetFont("Arial", "B", 12)
pdf.Cell(40, 10, "数据概览")
pdf.Ln(10)
pdf.SetFont("Arial", "", 10)
pdf.Cell(40, 10, fmt.Sprintf("总记录数: %d", data.TotalRecords))
pdf.Ln(5)
pdf.Cell(40, 10, fmt.Sprintf("日期范围: %s 至 %s", data.DateRange[0], data.DateRange[1]))
pdf.Ln(5)
pdf.Cell(40, 10, fmt.Sprintf("总收入: %.2f", data.RevenueStats["total"]))
pdf.Ln(10)
// 热销产品
pdf.SetFont("Arial", "B", 12)
pdf.Cell(40, 10, "热销产品 TOP 10")
pdf.Ln(10)
pdf.SetFont("Arial", "", 10)
for i, product := range data.TopProducts {
pdf.Cell(40, 10, fmt.Sprintf("%d. %s - 收入: %.2f",
i+1, product.Product, product.Revenue))
pdf.Ln(5)
}
// 保存
filename := fmt.Sprintf("report_%d.pdf", time.Now().Unix())
filepath := fmt.Sprintf("%s/%s", g.outputDir, filename)
if err := pdf.OutputFileAndClose(filepath); err != nil {
return "", fmt.Errorf("failed to generate PDF: %w", err)
}
return filepath, nil
}
type ReportData struct {
Title string
TotalRecords int
DateRange [2]string
RevenueStats map[string]float64
TopProducts []ProductStat
MonthlyTrend []MonthlyData
}
type ProductStat struct {
Product string
Revenue float64
Quantity int
}
type MonthlyData struct {
Month string
Revenue float64
}
A2A 工具封装
// internal/gateway/tools.go
package gateway
import (
"context"
"encoding/json"
"fmt"
"time"
"google.golang.org/adk/a2a/client"
"google.golang.org/adk/tool"
)
// ProcessDataTool 调用 Python Agent 处理数据
type ProcessDataTool struct {
client *a2aclient.Client
}
func NewProcessDataTool(client *a2aclient.Client) *ProcessDataTool {
return &ProcessDataTool{client: client}
}
func (t *ProcessDataTool) Name() string { return "process_data" }
func (t *ProcessDataTool) Description() string { return "调用 Python Agent 处理数据文件" }
func (t *ProcessDataTool) Schema() tool.Schema {
return tool.Schema{
Type: "object",
Properties: map[string]tool.Property{
"file_path": {
Type: "string",
Description: "数据文件路径",
},
"analysis_type": {
Type: "string",
Description: "分析类型: full 或 summary",
Enum: []string{"full", "summary"},
},
},
Required: []string{"file_path"},
}
}
func (t *ProcessDataTool) Call(ctx context.Context, input string) (string, error) {
var params struct {
FilePath string `json:"file_path"`
AnalysisType string `json:"analysis_type"`
}
if err := json.Unmarshal([]byte(input), ¶ms); err != nil {
return "", err
}
if params.AnalysisType == "" {
params.AnalysisType = "full"
}
// 调用 Python Agent
task, err := t.client.SendTask(ctx, &a2a.Task{
Input: map[string]interface{}{
"skill": "csv-analysis",
"file_path": params.FilePath,
"analysis_type": params.AnalysisType,
},
})
if err != nil {
return "", fmt.Errorf("failed to send task: %w", err)
}
// 等待结果(简化版,实际应轮询)
result, err := waitForTask(ctx, t.client, task.ID)
if err != nil {
return "", err
}
return result, nil
}
// GenerateReportTool 调用 Report Agent 生成报告
type GenerateReportTool struct {
client *a2aclient.Client
}
func NewGenerateReportTool(client *a2aclient.Client) *GenerateReportTool {
return &GenerateReportTool{client: client}
}
func (t *GenerateReportTool) Name() string { return "generate_report" }
func (t *GenerateReportTool) Description() string { return "生成 PDF 报告" }
func (t *GenerateReportTool) Schema() tool.Schema {
return tool.Schema{
Type: "object",
Properties: map[string]tool.Property{
"data_path": {
Type: "string",
Description: "数据文件路径",
},
"title": {
Type: "string",
Description: "报告标题",
},
},
Required: []string{"data_path", "title"},
}
}
func (t *GenerateReportTool) Call(ctx context.Context, input string) (string, error) {
var params struct {
DataPath string `json:"data_path"`
Title string `json:"title"`
}
if err := json.Unmarshal([]byte(input), ¶ms); err != nil {
return "", err
}
task, err := t.client.SendTask(ctx, &a2a.Task{
Input: map[string]interface{}{
"skill": "generate-pdf",
"data_path": params.DataPath,
"title": params.Title,
},
})
if err != nil {
return "", fmt.Errorf("failed to send task: %w", err)
}
result, err := waitForTask(ctx, t.client, task.ID)
if err != nil {
return "", err
}
return result, nil
}
func waitForTask(ctx context.Context, client *a2aclient.Client, taskID string) (string, error) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
timeout := time.After(5 * time.Minute)
for {
select {
case <-ticker.C:
task, err := client.GetTask(ctx, taskID)
if err != nil {
return "", err
}
switch task.Status {
case a2a.TaskStatusCompleted:
result, _ := json.Marshal(task.Output)
return string(result), nil
case a2a.TaskStatusFailed:
return "", fmt.Errorf("task failed: %v", task.Output)
case a2a.TaskStatusCancelled:
return "", fmt.Errorf("task cancelled")
}
case <-timeout:
client.CancelTask(ctx, taskID)
return "", fmt.Errorf("task timeout")
case <-ctx.Done():
client.CancelTask(ctx, taskID)
return "", ctx.Err()
}
}
}
部署与运行
Docker Compose 编排
# docker-compose.yml
version: '3.8'
services:
python-agent:
build: ./python-data-agent
ports:
- "8081:8081"
environment:
- PORT=8081
- A2A_API_KEY=${PYTHON_AGENT_API_KEY}
- DATABASE_URL=postgresql://postgres:password@postgres:5432/agentdb
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
depends_on:
- postgres
volumes:
- ./data:/data
networks:
- agent-network
go-gateway:
build: ./go-gateway-agent
ports:
- "8080:8080"
environment:
- PORT=8080
- PYTHON_AGENT_API_KEY=${PYTHON_AGENT_API_KEY}
- PYTHON_AGENT_URL=http://python-agent:8081/a2a
- DATABASE_URL=postgresql://postgres:password@postgres:5432/agentdb
depends_on:
- python-agent
- postgres
networks:
- agent-network
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=password
- POSTGRES_DB=agentdb
volumes:
- postgres-data:/var/lib/postgresql/data
networks:
- agent-network
minio:
image: minio/minio
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "9001:9001"
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
volumes:
- minio-data:/data
networks:
- agent-network
volumes:
postgres-data:
minio-data:
networks:
agent-network:
driver: bridge
运行流程
# 1. 设置环境变量
export PYTHON_AGENT_API_KEY="your-secret-key"
export AWS_ACCESS_KEY_ID="your-aws-key"
export AWS_SECRET_ACCESS_KEY="your-aws-secret"
# 2. 启动所有服务
docker-compose up -d
# 3. 验证服务状态
docker-compose ps
curl http://localhost:8081/health
curl http://localhost:8080/health
# 4. 测试完整流程
curl -X POST http://localhost:8080/a2a/tasks \
-H "Content-Type: application/json" \
-H "X-API-Key: your-secret-key" \
-d '{
"skill": "analyze-and-report",
"input": {
"file_path": "/data/sales_data.csv",
"title": "Q3 销售数据分析报告"
}
}'
生产环境考量
跨语言类型映射
| Python 类型 | Go 类型 | JSON 表示 | 注意事项 |
|---|---|---|---|
int | int64 | number | Python int 无上限,Go 有上限 |
float | float64 | number | 浮点精度差异 |
str | string | string | UTF-8 编码 |
dict | map[string]interface{} | object | 键必须是字符串 |
list | []interface{} | array | 类型一致性检查 |
datetime | time.Time | string (RFC3339) | 时区处理 |
None | nil | null | 空值判断 |
pandas.DataFrame | 自定义结构 | object/array | 需序列化 |
错误处理策略
// Go 端:统一错误格式
type ErrorResponse struct {
Code string `json:"code"`
Message string `json:"message"`
Details string `json:"details,omitempty"`
}
func handleA2AError(err error) *ErrorResponse {
var a2aErr *a2a.Error
if errors.As(err, &a2aErr) {
return &ErrorResponse{
Code: a2aErr.Code,
Message: a2aErr.Message,
Details: a2aErr.Details,
}
}
// 网络错误
if errors.Is(err, context.DeadlineExceeded) {
return &ErrorResponse{
Code: "TIMEOUT",
Message: "请求超时,请稍后重试",
}
}
return &ErrorResponse{
Code: "INTERNAL",
Message: "内部错误",
}
}
# Python 端:统一错误格式
class AgentError(Exception):
def __init__(self, code: str, message: str, details: str = None):
self.code = code
self.message = message
self.details = details
super().__init__(message)
def handle_error(e: Exception) -> dict:
if isinstance(e, AgentError):
return {
"code": e.code,
"message": e.message,
"details": e.details
}
elif isinstance(e, pd.errors.EmptyDataError):
return {
"code": "EMPTY_DATA",
"message": "数据文件为空"
}
elif isinstance(e, ValueError):
return {
"code": "INVALID_INPUT",
"message": str(e)
}
else:
return {
"code": "INTERNAL",
"message": "内部错误",
"details": str(e)
}
性能优化
- 连接池复用:Go Client 保持长连接,避免频繁 TCP 握手
- 批量处理:Python Agent 支持批量数据分析,减少跨语言调用次数
- 异步流水线:Gateway 使用 goroutine 并行调用多个 Agent
- 结果缓存:常用分析结果缓存到 Redis,避免重复计算
小结
模块 8 完成。学习了:
- A2A 协议的深层设计理念
- Exposing 暴露 Go Agent 的完整实践
- Consuming 消费外部 Agent 的高级模式
- 跨语言协作的实战案例
接下来进入模块 9:进阶主题——Grounding、Artifacts、Skills、Callbacks。
← Consuming | Grounding →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
