Skip to content

Agent间通信接口 (A2A)

概述

A2A (Agent-to-Agent) Interface 提供了标准化的 Agent 间通信协议,基于 JSON-RPC 2.0 实现,支持同步和流式两种通信模式。

协议标准

  • JSON-RPC 2.0: 工业标准的 RPC 协议
  • Server-Sent Events (SSE): 流式响应的传输协议
  • RESTful HTTP: 基于 HTTP 的端点实现

核心组件

pkg/agentos/a2a/
├── types.go      # 协议类型定义
├── validator.go  # 请求验证
├── mapper.go     # 协议转换
├── a2a.go        # A2A 接口管理
└── handlers.go   # HTTP 处理器

快速开始

1. 创建 A2A 接口

go
package main

import (
    "github.com/gin-gonic/gin"
    "github.com/rexleimo/agno-go/pkg/agentos/a2a"
    "github.com/rexleimo/agno-go/pkg/agno/agent"
)

func main() {
    // 创建 Agent
    myAgent, _ := agent.New(&agent.Config{
        Name:  "my-agent",
        Model: model,
        // ... 其他配置
    })

    // 创建 A2A 接口
    a2aInterface := a2a.New("/api/v1/a2a")

    // 注册 Agent 作为 Entity
    a2aInterface.RegisterEntity("my-agent", myAgent)

    // 设置 Gin 路由
    router := gin.Default()
    a2aInterface.SetupRoutes(router)

    router.Run(":8080")
}

2. 发送同步消息

bash
curl -X POST http://localhost:8080/api/v1/a2a/sendMessage \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "sendMessage",
    "id": "req-001",
    "params": {
      "message": {
        "messageId": "msg-001",
        "role": "user",
        "agentId": "my-agent",
        "contextId": "session-123",
        "parts": [
          {
            "type": "text",
            "content": "你好,Agent!"
          }
        ]
      }
    }
  }'

响应示例:

json
{
  "jsonrpc": "2.0",
  "id": "req-001",
  "result": {
    "task": {
      "taskId": "task-001",
      "status": "completed",
      "messages": [
        {
          "messageId": "msg-002",
          "role": "assistant",
          "agentId": "my-agent",
          "contextId": "session-123",
          "parts": [
            {
              "type": "text",
              "content": "你好!有什么可以帮助你的吗?"
            }
          ]
        }
      ]
    }
  }
}

3. 发送流式消息

bash
curl -X POST http://localhost:8080/api/v1/a2a/streamMessage \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "jsonrpc": "2.0",
    "method": "streamMessage",
    "id": "req-002",
    "params": {
      "message": {
        "messageId": "msg-003",
        "role": "user",
        "agentId": "my-agent",
        "contextId": "session-123",
        "parts": [
          {
            "type": "text",
            "content": "给我讲个故事"
          }
        ]
      }
    }
  }'

SSE 响应流:

data: {"type":"content","content":"从前"}

data: {"type":"content","content":"有"}

data: {"type":"content","content":"一个"}

data: {"type":"content","content":"王国..."}

data: {"type":"done"}

协议详解

JSON-RPC 2.0 请求格式

go
type JSONRPC2Request struct {
    JSONRPC string        `json:"jsonrpc"`  // 必须为 "2.0"
    Method  string        `json:"method"`   // "sendMessage" 或 "streamMessage"
    ID      string        `json:"id"`       // 请求唯一标识
    Params  RequestParams `json:"params"`   // 请求参数
}

type RequestParams struct {
    Message Message `json:"message"`  // 消息内容
}

消息结构

go
type Message struct {
    MessageID string `json:"messageId"`  // 消息唯一标识
    Role      string `json:"role"`       // "user" 或 "assistant"
    AgentID   string `json:"agentId"`    // 目标 Agent ID
    ContextID string `json:"contextId"`  // 会话上下文 ID
    Parts     []Part `json:"parts"`      // 消息部分
}

type Part struct {
    Type    string `json:"type"`              // "text" 或 "data"
    Content string `json:"content,omitempty"` // 文本内容
    Data    string `json:"data,omitempty"`    // 结构化数据 (JSON)
}

响应格式

成功响应

go
type JSONRPC2Response struct {
    JSONRPC string        `json:"jsonrpc"`  // "2.0"
    ID      string        `json:"id"`       // 对应请求 ID
    Result  *ResultObject `json:"result"`   // 结果对象
}

type ResultObject struct {
    Task Task `json:"task"`  // 任务信息
}

type Task struct {
    TaskID   string    `json:"taskId"`   // 任务 ID
    Status   string    `json:"status"`   // "completed" 或 "failed"
    Messages []Message `json:"messages"` // 响应消息列表
}

错误响应

go
type JSONRPC2Response struct {
    JSONRPC string       `json:"jsonrpc"`
    ID      string       `json:"id"`
    Error   *ErrorObject `json:"error"`
}

type ErrorObject struct {
    Code    int    `json:"code"`    // 错误码
    Message string `json:"message"` // 错误信息
}

标准错误码:

  • -32700: Parse error (JSON 解析错误)
  • -32600: Invalid Request (无效请求)
  • -32601: Method not found (方法不存在)
  • -32602: Invalid params (无效参数)
  • -32603: Internal error (内部错误)

验证机制

请求验证

A2A 接口提供完整的请求验证:

go
func ValidateRequest(req *JSONRPC2Request) error {
    // 1. 检查 JSON-RPC 版本
    if req.JSONRPC != "2.0" {
        return fmt.Errorf("invalid jsonrpc version, must be 2.0")
    }

    // 2. 检查方法
    if req.Method != "sendMessage" && req.Method != "streamMessage" {
        return fmt.Errorf("invalid method, must be sendMessage or streamMessage")
    }

    // 3. 检查请求 ID
    if req.ID == "" {
        return fmt.Errorf("request id is required")
    }

    // 4. 验证消息
    return ValidateMessage(&req.Params.Message)
}

func ValidateMessage(msg *Message) error {
    // 检查必填字段
    if msg.MessageID == "" {
        return fmt.Errorf("messageId is required")
    }
    if msg.Role == "" {
        return fmt.Errorf("role is required")
    }
    if msg.AgentID == "" {
        return fmt.Errorf("agentId is required")
    }
    if len(msg.Parts) == 0 {
        return fmt.Errorf("message must have at least one part")
    }

    // 验证每个 part
    for i, part := range msg.Parts {
        if err := ValidatePart(&part); err != nil {
            return fmt.Errorf("invalid part at index %d: %w", i, err)
        }
    }

    return nil
}

高级特性

1. Entity 管理

A2A 接口支持注册多个 Entity (Agent/Workflow):

go
// 注册多个 Agent
a2aInterface.RegisterEntity("sales-agent", salesAgent)
a2aInterface.RegisterEntity("support-agent", supportAgent)
a2aInterface.RegisterEntity("analytics-agent", analyticsAgent)

// 获取 Entity
entity, exists := a2aInterface.GetEntity("sales-agent")

// 列出所有 Entity
entities := a2aInterface.ListEntities()
// 返回: ["sales-agent", "support-agent", "analytics-agent"]

2. 自定义路径前缀

go
// 默认前缀: "/api/v1/a2a"
a2aInterface := a2a.New("/api/v1/a2a")

// 自定义前缀
a2aInterface := a2a.New("/my/custom/path")

// 端点路径:
// POST /my/custom/path/sendMessage
// POST /my/custom/path/streamMessage

3. 协议转换

A2A 接口自动处理协议转换:

go
// A2A Message → Agent RunInput
runInput, err := a2a.MapA2ARequestToRunInput(req)

// Agent RunOutput → A2A Task
task := a2a.MapRunOutputToTask(output, &req.Params.Message)

完整示例

Server 端

go
package main

import (
    "context"
    "log"

    "github.com/gin-gonic/gin"
    "github.com/rexleimo/agno-go/pkg/agentos/a2a"
    "github.com/rexleimo/agno-go/pkg/agno/agent"
    "github.com/rexleimo/agno-go/pkg/agno/models/openai"
)

func main() {
    // 1. 创建模型
    model, err := openai.New("gpt-4", openai.Config{
        APIKey: "your-api-key",
    })
    if err != nil {
        log.Fatal(err)
    }

    // 2. 创建 Agent
    myAgent, err := agent.New(&agent.Config{
        Name:         "customer-service",
        Model:        model,
        Instructions: "你是一个乐于助人的客服 Agent。",
    })
    if err != nil {
        log.Fatal(err)
    }

    // 3. 创建 A2A 接口
    a2aInterface := a2a.New("/api/v1/a2a")
    a2aInterface.RegisterEntity("customer-service", myAgent)

    // 4. 设置路由
    router := gin.Default()
    a2aInterface.SetupRoutes(router)

    // 5. 启动服务
    log.Println("A2A 服务监听 :8080")
    router.Run(":8080")
}

Client 端 (Go)

go
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"

    "github.com/rexleimo/agno-go/pkg/agentos/a2a"
)

func main() {
    // 构造请求
    request := &a2a.JSONRPC2Request{
        JSONRPC: "2.0",
        Method:  "sendMessage",
        ID:      "req-001",
        Params: a2a.RequestParams{
            Message: a2a.Message{
                MessageID: "msg-001",
                Role:      "user",
                AgentID:   "customer-service",
                ContextID: "session-123",
                Parts: []a2a.Part{
                    {
                        Type:    "text",
                        Content: "如何退货?",
                    },
                },
            },
        },
    }

    // 序列化
    requestBody, _ := json.Marshal(request)

    // 发送请求
    resp, err := http.Post(
        "http://localhost:8080/api/v1/a2a/sendMessage",
        "application/json",
        bytes.NewBuffer(requestBody),
    )
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // 读取响应
    body, _ := io.ReadAll(resp.Body)

    var response a2a.JSONRPC2Response
    json.Unmarshal(body, &response)

    // 处理结果
    if response.Error != nil {
        fmt.Printf("错误: %s\n", response.Error.Message)
        return
    }

    task := response.Result.Task
    fmt.Printf("任务状态: %s\n", task.Status)
    for _, msg := range task.Messages {
        for _, part := range msg.Parts {
            fmt.Printf("Agent 响应: %s\n", part.Content)
        }
    }
}

最佳实践

1. 错误处理

go
// 使用标准错误码
if err := validateInput(); err != nil {
    return &a2a.ErrorObject{
        Code:    -32602, // Invalid params
        Message: err.Error(),
    }
}

2. ContextID 管理

go
// 为每个会话使用唯一的 contextId
contextID := fmt.Sprintf("session-%s-%d", userID, time.Now().Unix())

// 同一会话的所有消息使用相同的 contextId
message1.ContextID = contextID
message2.ContextID = contextID

3. 并发处理

go
// A2A 接口是并发安全的
// 可以同时处理多个请求

for i := 0; i < 10; i++ {
    go func(id int) {
        // 并发发送请求
        sendMessageToAgent(fmt.Sprintf("req-%d", id))
    }(i)
}

4. 超时控制

go
// 设置请求超时
client := &http.Client{
    Timeout: 30 * time.Second,
}

resp, err := client.Post(url, contentType, body)

性能指标

  • 平均响应时间: ~200ms (取决于模型)
  • 并发处理能力: 1000+ requests/s
  • 内存占用: ~50KB per active connection

故障排查

常见问题

1. "Invalid JSON-RPC version"

原因: jsonrpc 字段不是 "2.0"

解决:

json
{
  "jsonrpc": "2.0",  // 必须是字符串 "2.0"
  "method": "sendMessage",
  ...
}

2. "Agent not found"

原因: agentId 未注册

解决:

go
// 检查已注册的 entities
entities := a2aInterface.ListEntities()
fmt.Println(entities)

// 确保 Agent 已注册
a2aInterface.RegisterEntity("your-agent-id", agent)

3. "Invalid message format"

原因: 消息缺少必填字段

解决:

json
{
  "messageId": "msg-001",     // ✅ 必填
  "role": "user",             // ✅ 必填
  "agentId": "my-agent",      // ✅ 必填
  "contextId": "session-123", // ⚠️ 可选但推荐
  "parts": [                  // ✅ 必填,至少一个
    {
      "type": "text",
      "content": "你好"
    }
  ]
}

相关文档

版本历史

  • v1.1.0 (2025-01-XX): 初始 A2A 接口实现
    • JSON-RPC 2.0 协议支持
    • 同步和流式模式
    • 完整的验证和错误处理

更新时间: 2025-01-XX

Released under the MIT License.