Agent间通信接口 (A2A)
概述
A2A (Agent-to-Agent) Interface 提供了标准化的 Agent 间通信协议,基于 JSON-RPC 2.0 实现,支持同步和流式两种通信模式。
协议标准
- JSON-RPC 2.0: 工业标准的 RPC 协议
- Server-Sent Events (SSE): 流式响应的传输协议
- RESTful HTTP: 基于 HTTP 的端点实现
核心组件
pkg/agentos/a2a/
├── types.go # 协议类型定义
├── validator.go # 请求验证
├── mapper.go # 协议转换
├── a2a.go # A2A 接口管理
└── handlers.go # HTTP 处理器
快速开始
1. 创建 A2A 接口
go
package main
import (
"github.com/gin-gonic/gin"
"github.com/rexleimo/agno-go/pkg/agentos/a2a"
"github.com/rexleimo/agno-go/pkg/agno/agent"
)
func main() {
// 创建 Agent
myAgent, _ := agent.New(&agent.Config{
Name: "my-agent",
Model: model,
// ... 其他配置
})
// 创建 A2A 接口
a2aInterface := a2a.New("/api/v1/a2a")
// 注册 Agent 作为 Entity
a2aInterface.RegisterEntity("my-agent", myAgent)
// 设置 Gin 路由
router := gin.Default()
a2aInterface.SetupRoutes(router)
router.Run(":8080")
}
2. 发送同步消息
bash
curl -X POST http://localhost:8080/api/v1/a2a/sendMessage \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "sendMessage",
"id": "req-001",
"params": {
"message": {
"messageId": "msg-001",
"role": "user",
"agentId": "my-agent",
"contextId": "session-123",
"parts": [
{
"type": "text",
"content": "你好,Agent!"
}
]
}
}
}'
响应示例:
json
{
"jsonrpc": "2.0",
"id": "req-001",
"result": {
"task": {
"taskId": "task-001",
"status": "completed",
"messages": [
{
"messageId": "msg-002",
"role": "assistant",
"agentId": "my-agent",
"contextId": "session-123",
"parts": [
{
"type": "text",
"content": "你好!有什么可以帮助你的吗?"
}
]
}
]
}
}
}
3. 发送流式消息
bash
curl -X POST http://localhost:8080/api/v1/a2a/streamMessage \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{
"jsonrpc": "2.0",
"method": "streamMessage",
"id": "req-002",
"params": {
"message": {
"messageId": "msg-003",
"role": "user",
"agentId": "my-agent",
"contextId": "session-123",
"parts": [
{
"type": "text",
"content": "给我讲个故事"
}
]
}
}
}'
SSE 响应流:
data: {"type":"content","content":"从前"}
data: {"type":"content","content":"有"}
data: {"type":"content","content":"一个"}
data: {"type":"content","content":"王国..."}
data: {"type":"done"}
协议详解
JSON-RPC 2.0 请求格式
go
type JSONRPC2Request struct {
JSONRPC string `json:"jsonrpc"` // 必须为 "2.0"
Method string `json:"method"` // "sendMessage" 或 "streamMessage"
ID string `json:"id"` // 请求唯一标识
Params RequestParams `json:"params"` // 请求参数
}
type RequestParams struct {
Message Message `json:"message"` // 消息内容
}
消息结构
go
type Message struct {
MessageID string `json:"messageId"` // 消息唯一标识
Role string `json:"role"` // "user" 或 "assistant"
AgentID string `json:"agentId"` // 目标 Agent ID
ContextID string `json:"contextId"` // 会话上下文 ID
Parts []Part `json:"parts"` // 消息部分
}
type Part struct {
Type string `json:"type"` // "text" 或 "data"
Content string `json:"content,omitempty"` // 文本内容
Data string `json:"data,omitempty"` // 结构化数据 (JSON)
}
响应格式
成功响应
go
type JSONRPC2Response struct {
JSONRPC string `json:"jsonrpc"` // "2.0"
ID string `json:"id"` // 对应请求 ID
Result *ResultObject `json:"result"` // 结果对象
}
type ResultObject struct {
Task Task `json:"task"` // 任务信息
}
type Task struct {
TaskID string `json:"taskId"` // 任务 ID
Status string `json:"status"` // "completed" 或 "failed"
Messages []Message `json:"messages"` // 响应消息列表
}
错误响应
go
type JSONRPC2Response struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Error *ErrorObject `json:"error"`
}
type ErrorObject struct {
Code int `json:"code"` // 错误码
Message string `json:"message"` // 错误信息
}
标准错误码:
-32700
: Parse error (JSON 解析错误)-32600
: Invalid Request (无效请求)-32601
: Method not found (方法不存在)-32602
: Invalid params (无效参数)-32603
: Internal error (内部错误)
验证机制
请求验证
A2A 接口提供完整的请求验证:
go
func ValidateRequest(req *JSONRPC2Request) error {
// 1. 检查 JSON-RPC 版本
if req.JSONRPC != "2.0" {
return fmt.Errorf("invalid jsonrpc version, must be 2.0")
}
// 2. 检查方法
if req.Method != "sendMessage" && req.Method != "streamMessage" {
return fmt.Errorf("invalid method, must be sendMessage or streamMessage")
}
// 3. 检查请求 ID
if req.ID == "" {
return fmt.Errorf("request id is required")
}
// 4. 验证消息
return ValidateMessage(&req.Params.Message)
}
func ValidateMessage(msg *Message) error {
// 检查必填字段
if msg.MessageID == "" {
return fmt.Errorf("messageId is required")
}
if msg.Role == "" {
return fmt.Errorf("role is required")
}
if msg.AgentID == "" {
return fmt.Errorf("agentId is required")
}
if len(msg.Parts) == 0 {
return fmt.Errorf("message must have at least one part")
}
// 验证每个 part
for i, part := range msg.Parts {
if err := ValidatePart(&part); err != nil {
return fmt.Errorf("invalid part at index %d: %w", i, err)
}
}
return nil
}
高级特性
1. Entity 管理
A2A 接口支持注册多个 Entity (Agent/Workflow):
go
// 注册多个 Agent
a2aInterface.RegisterEntity("sales-agent", salesAgent)
a2aInterface.RegisterEntity("support-agent", supportAgent)
a2aInterface.RegisterEntity("analytics-agent", analyticsAgent)
// 获取 Entity
entity, exists := a2aInterface.GetEntity("sales-agent")
// 列出所有 Entity
entities := a2aInterface.ListEntities()
// 返回: ["sales-agent", "support-agent", "analytics-agent"]
2. 自定义路径前缀
go
// 默认前缀: "/api/v1/a2a"
a2aInterface := a2a.New("/api/v1/a2a")
// 自定义前缀
a2aInterface := a2a.New("/my/custom/path")
// 端点路径:
// POST /my/custom/path/sendMessage
// POST /my/custom/path/streamMessage
3. 协议转换
A2A 接口自动处理协议转换:
go
// A2A Message → Agent RunInput
runInput, err := a2a.MapA2ARequestToRunInput(req)
// Agent RunOutput → A2A Task
task := a2a.MapRunOutputToTask(output, &req.Params.Message)
完整示例
Server 端
go
package main
import (
"context"
"log"
"github.com/gin-gonic/gin"
"github.com/rexleimo/agno-go/pkg/agentos/a2a"
"github.com/rexleimo/agno-go/pkg/agno/agent"
"github.com/rexleimo/agno-go/pkg/agno/models/openai"
)
func main() {
// 1. 创建模型
model, err := openai.New("gpt-4", openai.Config{
APIKey: "your-api-key",
})
if err != nil {
log.Fatal(err)
}
// 2. 创建 Agent
myAgent, err := agent.New(&agent.Config{
Name: "customer-service",
Model: model,
Instructions: "你是一个乐于助人的客服 Agent。",
})
if err != nil {
log.Fatal(err)
}
// 3. 创建 A2A 接口
a2aInterface := a2a.New("/api/v1/a2a")
a2aInterface.RegisterEntity("customer-service", myAgent)
// 4. 设置路由
router := gin.Default()
a2aInterface.SetupRoutes(router)
// 5. 启动服务
log.Println("A2A 服务监听 :8080")
router.Run(":8080")
}
Client 端 (Go)
go
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/rexleimo/agno-go/pkg/agentos/a2a"
)
func main() {
// 构造请求
request := &a2a.JSONRPC2Request{
JSONRPC: "2.0",
Method: "sendMessage",
ID: "req-001",
Params: a2a.RequestParams{
Message: a2a.Message{
MessageID: "msg-001",
Role: "user",
AgentID: "customer-service",
ContextID: "session-123",
Parts: []a2a.Part{
{
Type: "text",
Content: "如何退货?",
},
},
},
},
}
// 序列化
requestBody, _ := json.Marshal(request)
// 发送请求
resp, err := http.Post(
"http://localhost:8080/api/v1/a2a/sendMessage",
"application/json",
bytes.NewBuffer(requestBody),
)
if err != nil {
panic(err)
}
defer resp.Body.Close()
// 读取响应
body, _ := io.ReadAll(resp.Body)
var response a2a.JSONRPC2Response
json.Unmarshal(body, &response)
// 处理结果
if response.Error != nil {
fmt.Printf("错误: %s\n", response.Error.Message)
return
}
task := response.Result.Task
fmt.Printf("任务状态: %s\n", task.Status)
for _, msg := range task.Messages {
for _, part := range msg.Parts {
fmt.Printf("Agent 响应: %s\n", part.Content)
}
}
}
最佳实践
1. 错误处理
go
// 使用标准错误码
if err := validateInput(); err != nil {
return &a2a.ErrorObject{
Code: -32602, // Invalid params
Message: err.Error(),
}
}
2. ContextID 管理
go
// 为每个会话使用唯一的 contextId
contextID := fmt.Sprintf("session-%s-%d", userID, time.Now().Unix())
// 同一会话的所有消息使用相同的 contextId
message1.ContextID = contextID
message2.ContextID = contextID
3. 并发处理
go
// A2A 接口是并发安全的
// 可以同时处理多个请求
for i := 0; i < 10; i++ {
go func(id int) {
// 并发发送请求
sendMessageToAgent(fmt.Sprintf("req-%d", id))
}(i)
}
4. 超时控制
go
// 设置请求超时
client := &http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Post(url, contentType, body)
性能指标
- 平均响应时间: ~200ms (取决于模型)
- 并发处理能力: 1000+ requests/s
- 内存占用: ~50KB per active connection
故障排查
常见问题
1. "Invalid JSON-RPC version"
原因: jsonrpc
字段不是 "2.0"
解决:
json
{
"jsonrpc": "2.0", // 必须是字符串 "2.0"
"method": "sendMessage",
...
}
2. "Agent not found"
原因: agentId
未注册
解决:
go
// 检查已注册的 entities
entities := a2aInterface.ListEntities()
fmt.Println(entities)
// 确保 Agent 已注册
a2aInterface.RegisterEntity("your-agent-id", agent)
3. "Invalid message format"
原因: 消息缺少必填字段
解决:
json
{
"messageId": "msg-001", // ✅ 必填
"role": "user", // ✅ 必填
"agentId": "my-agent", // ✅ 必填
"contextId": "session-123", // ⚠️ 可选但推荐
"parts": [ // ✅ 必填,至少一个
{
"type": "text",
"content": "你好"
}
]
}
相关文档
版本历史
- v1.1.0 (2025-01-XX): 初始 A2A 接口实现
- JSON-RPC 2.0 协议支持
- 同步和流式模式
- 完整的验证和错误处理
更新时间: 2025-01-XX