Exposing:暴露 Go Agent 给外部调用
详解如何通过 A2A 协议将 Go Agent 暴露给外部系统——服务注册、端点暴露、认证配置。
Exposing:暴露 Go Agent 给外部调用
将 Go Agent 通过 A2A 协议暴露给外部系统,是构建分布式 Agent 网络的第一步。但"暴露"不等于"开放"——生产环境中,暴露 Agent 需要考虑认证授权、访问控制、限流熔断、服务发现、版本管理等一系列问题。
本文将深入讲解如何安全、可靠、可扩展地暴露 Go Agent,从基础配置到生产级加固。
基础暴露:从最小可用到生产就绪
最小暴露配置
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"google.golang.org/adk/a2a"
"google.golang.org/adk/agent"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 创建 Agent
weatherAgent, err := agent.New(agent.Config{
Name: "weather-agent",
Model: model,
Instruction: "你是一个天气查询专家,提供准确的天气信息。",
})
if err != nil {
log.Fatalf("failed to create agent: %v", err)
}
// 创建 A2A 服务器
server, err := a2a.NewServer(ctx,
a2a.WithAgent(weatherAgent),
a2a.WithPort(8080),
a2a.WithAgentCard(a2a.AgentCard{
Name: "weather-agent",
Version: "1.0.0",
Description: "天气查询专家,支持全球城市",
URL: "http://localhost:8080/a2a",
Capabilities: a2a.Capabilities{
Streaming: true,
PushNotifications: false,
},
Skills: []a2a.Skill{
{
ID: "current-weather",
Name: "当前天气查询",
Description: "查询指定城市的当前天气状况",
InputModes: []string{"text"},
OutputModes: []string{"text"},
},
{
ID: "forecast",
Name: "天气预报",
Description: "查询未来 7 天天气预报",
InputModes: []string{"text"},
OutputModes: []string{"text", "file"},
},
},
}),
)
if err != nil {
log.Fatalf("failed to create server: %v", err)
}
// 优雅关闭处理
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
log.Println("shutting down server...")
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("shutdown error: %v", err)
}
cancel()
}()
log.Println("A2A server starting on :8080")
if err := server.Serve(); err != nil && err != http.ErrServerClosed {
log.Fatalf("server error: %v", err)
}
}
暴露的端点
A2A 服务器自动暴露以下标准端点:
| 端点 | 方法 | 说明 |
|---|---|---|
/.well-known/agent.json | GET | Agent Card(能力描述) |
/a2a | POST | A2A 协议主入口 |
/a2a/tasks | POST | 创建任务 |
/a2a/tasks/{id} | GET | 获取任务状态 |
/a2a/tasks/{id}/cancel | POST | 取消任务 |
/health | GET | 健康检查 |
服务注册:让 Agent 被发现
静态注册
在小型系统中,可以手动维护 Agent 注册表:
// registry.go
type StaticRegistry struct {
agents map[string]*a2a.AgentCard
mu sync.RWMutex
}
func NewStaticRegistry() *StaticRegistry {
return &StaticRegistry{
agents: make(map[string]*a2a.AgentCard),
}
}
func (r *StaticRegistry) Register(card *a2a.AgentCard) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.agents[card.Name]; exists {
return fmt.Errorf("agent %s already registered", card.Name)
}
// 验证 Agent Card
if err := validateAgentCard(card); err != nil {
return fmt.Errorf("invalid agent card: %w", err)
}
r.agents[card.Name] = card
log.Printf("agent registered: %s v%s at %s", card.Name, card.Version, card.URL)
return nil
}
func (r *StaticRegistry) Discover(skill string) ([]*a2a.AgentCard, error) {
r.mu.RLock()
defer r.mu.RUnlock()
var result []*a2a.AgentCard
for _, card := range r.agents {
for _, s := range card.Skills {
if s.ID == skill || s.Name == skill {
result = append(result, card)
break
}
}
}
return result, nil
}
func (r *StaticRegistry) Get(name string) (*a2a.AgentCard, error) {
r.mu.RLock()
defer r.mu.RUnlock()
card, exists := r.agents[name]
if !exists {
return nil, fmt.Errorf("agent %s not found", name)
}
return card, nil
}
func validateAgentCard(card *a2a.AgentCard) error {
if card.Name == "" {
return fmt.Errorf("name is required")
}
if card.URL == "" {
return fmt.Errorf("URL is required")
}
if len(card.Skills) == 0 {
return fmt.Errorf("at least one skill is required")
}
for i, skill := range card.Skills {
if skill.ID == "" {
return fmt.Errorf("skill[%d].id is required", i)
}
if skill.Name == "" {
return fmt.Errorf("skill[%d].name is required", i)
}
}
return nil
}
动态服务发现(Consul)
在大型分布式系统中,使用 Consul 或 etcd 进行动态服务发现:
type ConsulRegistry struct {
client *api.Client
prefix string
}
func NewConsulRegistry(addr string) (*ConsulRegistry, error) {
config := api.DefaultConfig()
config.Address = addr
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
return &ConsulRegistry{
client: client,
prefix: "a2a/agents",
}, nil
}
func (r *ConsulRegistry) Register(ctx context.Context, card *a2a.AgentCard) error {
// 序列化 Agent Card
cardData, err := json.Marshal(card)
if err != nil {
return err
}
// 注册服务
service := &api.AgentServiceRegistration{
ID: card.Name,
Name: "a2a-agent",
Tags: extractSkillTags(card.Skills),
Port: extractPort(card.URL),
Address: extractHost(card.URL),
Meta: map[string]string{
"version": card.Version,
"agent_card": string(cardData),
},
Check: &api.AgentServiceCheck{
HTTP: fmt.Sprintf("%s/health", card.URL),
Interval: "10s",
Timeout: "5s",
DeregisterCriticalServiceAfter: "1m",
},
}
if err := r.client.Agent().ServiceRegister(service); err != nil {
return fmt.Errorf("failed to register service: %w", err)
}
// 存储 Agent Card 到 KV
key := fmt.Sprintf("%s/%s", r.prefix, card.Name)
_, err = r.client.KV().Put(&api.KVPair{
Key: key,
Value: cardData,
}, nil)
return err
}
func (r *ConsulRegistry) Discover(ctx context.Context, skill string) ([]*a2a.AgentCard, error) {
// 查询健康的服务
services, _, err := r.client.Health().Service("a2a-agent", skill, true, nil)
if err != nil {
return nil, err
}
var agents []*a2a.AgentCard
for _, svc := range services {
cardData := svc.Service.Meta["agent_card"]
if cardData == "" {
continue
}
var card a2a.AgentCard
if err := json.Unmarshal([]byte(cardData), &card); err != nil {
log.Printf("failed to unmarshal agent card for %s: %v", svc.Service.ID, err)
continue
}
agents = append(agents, &card)
}
return agents, nil
}
func (r *ConsulRegistry) Watch(ctx context.Context, callback func([]*a2a.AgentCard)) error {
// 使用 Consul Watch 实时更新
plan, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": "a2a-agent",
})
if err != nil {
return err
}
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return
}
services, ok := raw.([]*api.ServiceEntry)
if !ok {
return
}
var agents []*a2a.AgentCard
for _, svc := range services {
cardData := svc.Service.Meta["agent_card"]
if cardData == "" {
continue
}
var card a2a.AgentCard
if err := json.Unmarshal([]byte(cardData), &card); err != nil {
continue
}
agents = append(agents, &card)
}
callback(agents)
}
return plan.RunWithClientAndLogger(r.client, nil)
}
认证配置:多层防御体系
API Key 认证(基础)
// 生产级 API Key 认证
func setupAPIKeyAuth() a2a.AuthMiddleware {
// 从环境变量或 Secret Manager 加载有效密钥
validKeys := loadValidAPIKeys()
return a2a.APIKeyAuth{
Header: "X-API-Key",
Validator: func(key string) (string, error) {
// 1. 检查密钥是否存在
clientID, exists := validKeys[key]
if !exists {
return "", fmt.Errorf("invalid API key")
}
// 2. 检查密钥是否过期
if isKeyExpired(key) {
return "", fmt.Errorf("API key expired")
}
// 3. 记录使用(用于审计和限流)
recordKeyUsage(key)
return clientID, nil
},
}
}
func loadValidAPIKeys() map[string]string {
keys := make(map[string]string)
// 从环境变量加载(开发环境)
if envKeys := os.Getenv("A2A_API_KEYS"); envKeys != "" {
for _, pair := range strings.Split(envKeys, ",") {
parts := strings.SplitN(pair, ":", 2)
if len(parts) == 2 {
keys[parts[0]] = parts[1]
}
}
}
// 生产环境:从 Secret Manager 加载
if metadata.OnGCE() {
ctx := context.Background()
client, err := secretmanager.NewClient(ctx)
if err == nil {
defer client.Close()
req := &secretmanagerpb.AccessSecretVersionRequest{
Name: "projects/my-project/secrets/a2a-api-keys/versions/latest",
}
result, err := client.AccessSecretVersion(ctx, req)
if err == nil {
var secretKeys map[string]string
if err := json.Unmarshal(result.Payload.Data, &secretKeys); err == nil {
for k, v := range secretKeys {
keys[k] = v
}
}
}
}
}
return keys
}
OAuth 2.0 / JWT 认证(企业级)
import (
"github.com/golang-jwt/jwt/v5"
)
type JWTAuth struct {
publicKey *rsa.PublicKey
issuer string
audience string
}
func NewJWTAuth(publicKeyPEM []byte, issuer, audience string) (*JWTAuth, error) {
block, _ := pem.Decode(publicKeyPEM)
if block == nil {
return nil, fmt.Errorf("failed to decode PEM block")
}
pub, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
return nil, err
}
rsaPub, ok := pub.(*rsa.PublicKey)
if !ok {
return nil, fmt.Errorf("not an RSA public key")
}
return &JWTAuth{
publicKey: rsaPub,
issuer: issuer,
audience: audience,
}, nil
}
func (a *JWTAuth) Validate(tokenString string) (*jwt.MapClaims, error) {
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return a.publicKey, nil
},
jwt.WithIssuer(a.issuer),
jwt.WithAudience(a.audience),
jwt.WithValidMethods([]string{"RS256"}),
)
if err != nil {
return nil, fmt.Errorf("invalid token: %w", err)
}
if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
return &claims, nil
}
return nil, fmt.Errorf("invalid claims")
}
// 在 A2A 服务器中使用
func setupOAuthAuth() a2a.AuthMiddleware {
publicKeyPEM := []byte(`-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
-----END PUBLIC KEY-----`)
jwtAuth, err := NewJWTAuth(publicKeyPEM, "auth.example.com", "a2a-agent")
if err != nil {
log.Fatal(err)
}
return a2a.BearerAuth{
Validator: func(token string) (string, error) {
claims, err := jwtAuth.Validate(token)
if err != nil {
return "", err
}
// 提取用户/客户端 ID
sub, ok := (*claims)["sub"].(string)
if !ok {
return "", fmt.Errorf("missing sub claim")
}
// 检查权限
scopes, ok := (*claims)["scope"].(string)
if !ok || !strings.Contains(scopes, "a2a:invoke") {
return "", fmt.Errorf("insufficient scope")
}
return sub, nil
},
}
}
mTLS(双向 TLS)认证(最高安全)
func setupMTLS() (*tls.Config, error) {
// 加载服务器证书
cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
return nil, err
}
// 加载 CA 证书(用于验证客户端)
caCert, err := os.ReadFile("ca.crt")
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
return &tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
MinVersion: tls.VersionTLS12,
CipherSuites: []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
},
}, nil
}
// 在 A2A 服务器中使用
server, err := a2a.NewServer(ctx,
a2a.WithAgent(myAgent),
a2a.WithPort(8443),
a2a.WithTLS(setupMTLS()),
)
限流与防护
分层限流策略
// 1. 全局限流(保护整个服务)
func setupGlobalRateLimit() a2a.Middleware {
limiter := rate.NewLimiter(rate.Limit(1000), 2000) // 每秒 1000,突发 2000
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "service overloaded", http.StatusServiceUnavailable)
return
}
next.ServeHTTP(w, r)
})
}
}
// 2. 按客户端限流(防止单个客户端耗尽资源)
type ClientRateLimiter struct {
limiters sync.Map // map[string]*rate.Limiter
rps rate.Limit
burst int
}
func NewClientRateLimiter(rps rate.Limit, burst int) *ClientRateLimiter {
return &ClientRateLimiter{
rps: rps,
burst: burst,
}
}
func (l *ClientRateLimiter) getLimiter(clientID string) *rate.Limiter {
limiter, exists := l.limiters.Load(clientID)
if exists {
return limiter.(*rate.Limiter)
}
newLimiter := rate.NewLimiter(l.rps, l.burst)
actual, loaded := l.limiters.LoadOrStore(clientID, newLimiter)
if loaded {
return actual.(*rate.Limiter)
}
return newLimiter
}
func (l *ClientRateLimiter) Middleware() a2a.Middleware {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
clientID := r.Context().Value("client_id").(string)
if clientID == "" {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
limiter := l.getLimiter(clientID)
if !limiter.Allow() {
w.Header().Set("X-RateLimit-Limit", fmt.Sprintf("%v", l.rps))
w.Header().Set("X-RateLimit-Remaining", "0")
http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
}
// 3. 按 Skill 限流(保护特定能力)
type SkillRateLimiter struct {
limits map[string]rate.Limit
}
func (l *SkillRateLimiter) Middleware() a2a.Middleware {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 从请求中解析 skill
skill := r.Header.Get("X-A2A-Skill")
if skill == "" {
next.ServeHTTP(w, r)
return
}
// 检查 skill 限流
if limit, exists := l.limits[skill]; exists {
// ... 限流逻辑
}
next.ServeHTTP(w, r)
})
}
}
请求大小限制
func setupRequestSizeLimit(maxSize int64) a2a.Middleware {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, maxSize)
next.ServeHTTP(w, r)
})
}
}
server, err := a2a.NewServer(ctx,
a2a.WithAgent(myAgent),
a2a.WithMiddleware(
setupGlobalRateLimit(),
NewClientRateLimiter(10, 20).Middleware(), // 每个客户端 10 rps
setupRequestSizeLimit(10*1024*1024), // 最大 10MB
),
)
版本管理与兼容性
语义化版本控制
type Version struct {
Major int
Minor int
Patch int
}
func (v Version) String() string {
return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
}
func (v Version) CompatibleWith(other Version) bool {
// 主版本相同即兼容
return v.Major == other.Major
}
// Agent Card 中的版本信息
type AgentCard struct {
Name string `json:"name"`
Version string `json:"version"`
MinVersion string `json:"minVersion,omitempty"` // 最低兼容版本
Deprecated bool `json:"deprecated,omitempty"`
Deprecation string `json:"deprecation,omitempty"` // 弃用说明
}
多版本共存
// 同时暴露多个版本
func main() {
ctx := context.Background()
// v1 Agent
agentV1, _ := agent.New(agent.Config{
Name: "weather-agent",
Version: "1.0.0",
})
// v2 Agent(新功能)
agentV2, _ := agent.New(agent.Config{
Name: "weather-agent",
Version: "2.0.0",
})
// v1 服务器
serverV1, _ := a2a.NewServer(ctx,
a2a.WithAgent(agentV1),
a2a.WithPort(8081),
a2a.WithPathPrefix("/v1"),
)
// v2 服务器
serverV2, _ := a2a.NewServer(ctx,
a2a.WithAgent(agentV2),
a2a.WithPort(8082),
a2a.WithPathPrefix("/v2"),
)
// 启动两个版本
go serverV1.Serve()
go serverV2.Serve()
// 使用 Nginx 或 API Gateway 做版本路由
}
常见问题深度解析
Q:Agent 暴露到公网安全吗?
绝对不安全,除非做好以下防护:
- 传输层:强制 HTTPS/TLS 1.2+
- 认证层:API Key / OAuth / mTLS 至少选一种
- 限流层:防止 DDoS 和资源耗尽
- 网络层:使用 VPC、防火墙规则限制访问源
- 审计层:记录所有调用日志,便于追溯
// 生产级安全配置
server, err := a2a.NewServer(ctx,
a2a.WithAgent(myAgent),
a2a.WithPort(443),
a2a.WithTLS(tlsConfig), // TLS 加密
a2a.WithAuth(setupOAuthAuth()), // OAuth 认证
a2a.WithMiddleware(
setupGlobalRateLimit(), // 全局限流
NewClientRateLimiter(10, 20).Middleware(), // 客户端限流
setupRequestSizeLimit(10*1024*1024), // 请求大小限制
setupAuditLog(), // 审计日志
),
a2a.WithIPWhitelist([]string{"10.0.0.0/8"}), // IP 白名单
)
Q:可以限制调用频率吗?
必须限制,生产环境至少配置三层限流:
server, err := a2a.NewServer(ctx,
a2a.WithRateLimit(100, time.Minute), // 全局:每分钟 100 请求
a2a.WithClientRateLimit(10, time.Minute), // 每客户端:每分钟 10 请求
a2a.WithBurstLimit(20), // 突发请求上限
)
限流策略选择:
| 策略 | 适用场景 | 实现复杂度 |
|---|---|---|
| 固定窗口 | 简单场景 | 低 |
| 滑动窗口 | 需要平滑限流 | 中 |
| 令牌桶 | 允许突发流量 | 中 |
| 漏桶 | 严格匀速 | 中 |
下一步
暴露搞定了,接下来看如何消费外部 Agent。
← A2A 协议介绍 | Consuming →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
