Exposing:暴露 Go Agent 给外部调用

将 Go Agent 通过 A2A 协议暴露给外部系统,是构建分布式 Agent 网络的第一步。但"暴露"不等于"开放"——生产环境中,暴露 Agent 需要考虑认证授权、访问控制、限流熔断、服务发现、版本管理等一系列问题。

本文将深入讲解如何安全、可靠、可扩展地暴露 Go Agent,从基础配置到生产级加固。


基础暴露:从最小可用到生产就绪

最小暴露配置

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "google.golang.org/adk/a2a"
    "google.golang.org/adk/agent"
)

// main builds the weather agent, exposes it through an A2A server on port
// 8080, and shuts the server down gracefully on SIGINT/SIGTERM.
//
// NOTE(review): `model` is not defined in this snippet; it must be created
// before agent.New is called.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create the agent.
    weatherAgent, err := agent.New(agent.Config{
        Name:        "weather-agent",
        Model:       model,
        Instruction: "你是一个天气查询专家,提供准确的天气信息。",
    })
    if err != nil {
        log.Fatalf("failed to create agent: %v", err)
    }

    // Create the A2A server and describe the agent through its Agent Card.
    server, err := a2a.NewServer(ctx,
        a2a.WithAgent(weatherAgent),
        a2a.WithPort(8080),
        a2a.WithAgentCard(a2a.AgentCard{
            Name:        "weather-agent",
            Version:     "1.0.0",
            Description: "天气查询专家,支持全球城市",
            URL:         "http://localhost:8080/a2a",
            Capabilities: a2a.Capabilities{
                Streaming:         true,
                PushNotifications: false,
            },
            Skills: []a2a.Skill{
                {
                    ID:          "current-weather",
                    Name:        "当前天气查询",
                    Description: "查询指定城市的当前天气状况",
                    InputModes:  []string{"text"},
                    OutputModes: []string{"text"},
                },
                {
                    ID:          "forecast",
                    Name:        "天气预报",
                    Description: "查询未来 7 天天气预报",
                    InputModes:  []string{"text"},
                    OutputModes: []string{"text", "file"},
                },
            },
        }),
    )
    if err != nil {
        log.Fatalf("failed to create server: %v", err)
    }

    // Graceful shutdown handling.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    // shutdownDone is closed once the shutdown goroutine has finished, so main
    // can wait for in-flight requests to drain before the process exits.
    shutdownDone := make(chan struct{})

    go func() {
        defer close(shutdownDone)

        <-sigCh
        log.Println("shutting down server...")

        // Give in-flight requests at most 30 seconds to complete.
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer shutdownCancel()

        if err := server.Shutdown(shutdownCtx); err != nil {
            log.Printf("shutdown error: %v", err)
        }
        cancel()
    }()

    log.Println("A2A server starting on :8080")
    err = server.Serve()
    if err != nil && err != http.ErrServerClosed {
        log.Fatalf("server error: %v", err)
    }

    // Assuming Serve follows net/http semantics (as the ErrServerClosed check
    // implies), it returns as soon as Shutdown is called, not when Shutdown
    // completes. Returning from main at that point would kill the process
    // mid-drain, so wait for the shutdown goroutine to finish.
    if err == http.ErrServerClosed {
        <-shutdownDone
    }
}

暴露的端点

A2A 服务器自动暴露以下标准端点:

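| 端点 | 方法 | 说明 |
| --- | --- | --- |
| `/.well-known/agent.json` | GET | Agent Card(能力描述) |
| `/a2a` | POST | A2A 协议主入口 |
| `/a2a/tasks` | POST | 创建任务 |
| `/a2a/tasks/{id}` | GET | 获取任务状态 |
| `/a2a/tasks/{id}/cancel` | POST | 取消任务 |
| `/health` | GET | 健康检查 |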

服务注册:让 Agent 被发现

静态注册

在小型系统中,可以手动维护 Agent 注册表:

// registry.go
// StaticRegistry is an in-memory, manually maintained registry of Agent Cards
// keyed by agent name.
type StaticRegistry struct {
    agents map[string]*a2a.AgentCard // registered cards, keyed by AgentCard.Name
    mu     sync.RWMutex              // guards agents
}

// NewStaticRegistry returns an empty registry that is ready to accept
// registrations.
func NewStaticRegistry() *StaticRegistry {
    registry := new(StaticRegistry)
    registry.agents = map[string]*a2a.AgentCard{}
    return registry
}

// Register adds card to the registry under card.Name.
//
// It returns an error when card is nil, when an agent with the same name is
// already registered, or when the card fails validateAgentCard. The registry
// stores the pointer it is given; it does not copy the card.
func (r *StaticRegistry) Register(card *a2a.AgentCard) error {
    // Reject nil up front: every check below dereferences card.
    if card == nil {
        return fmt.Errorf("invalid agent card: card is nil")
    }

    r.mu.Lock()
    defer r.mu.Unlock()

    if _, exists := r.agents[card.Name]; exists {
        return fmt.Errorf("agent %s already registered", card.Name)
    }

    // Validate the Agent Card before storing it.
    if err := validateAgentCard(card); err != nil {
        return fmt.Errorf("invalid agent card: %w", err)
    }

    r.agents[card.Name] = card
    log.Printf("agent registered: %s v%s at %s", card.Name, card.Version, card.URL)
    return nil
}

// Discover returns every registered card that offers a skill whose ID or Name
// equals skill. Each card appears at most once. The error is always nil.
func (r *StaticRegistry) Discover(skill string) ([]*a2a.AgentCard, error) {
    r.mu.RLock()
    defer r.mu.RUnlock()

    var matches []*a2a.AgentCard
cards:
    for _, candidate := range r.agents {
        for _, offered := range candidate.Skills {
            if offered.ID != skill && offered.Name != skill {
                continue
            }
            // First matching skill is enough; move on to the next card.
            matches = append(matches, candidate)
            continue cards
        }
    }

    return matches, nil
}

// Get looks up the card registered under name and returns an error when no
// such agent exists.
func (r *StaticRegistry) Get(name string) (*a2a.AgentCard, error) {
    r.mu.RLock()
    defer r.mu.RUnlock()

    if found, ok := r.agents[name]; ok {
        return found, nil
    }
    return nil, fmt.Errorf("agent %s not found", name)
}

// validateAgentCard checks the minimum a card needs to be registered: a
// non-nil card with a name, a URL, and at least one skill, where every skill
// has both an ID and a name. It returns the first problem found, or nil.
func validateAgentCard(card *a2a.AgentCard) error {
    // A validator should reject nil input rather than panic on it.
    if card == nil {
        return fmt.Errorf("agent card is nil")
    }
    if card.Name == "" {
        return fmt.Errorf("name is required")
    }
    if card.URL == "" {
        return fmt.Errorf("URL is required")
    }
    if len(card.Skills) == 0 {
        return fmt.Errorf("at least one skill is required")
    }
    for i, skill := range card.Skills {
        if skill.ID == "" {
            return fmt.Errorf("skill[%d].id is required", i)
        }
        if skill.Name == "" {
            return fmt.Errorf("skill[%d].name is required", i)
        }
    }
    return nil
}

动态服务发现(Consul)

在大型分布式系统中,使用 Consul 或 etcd 进行动态服务发现:

// ConsulRegistry registers and discovers A2A agents through Consul.
type ConsulRegistry struct {
    client *api.Client // Consul API client used for all calls
    prefix string      // KV key prefix under which Agent Cards are stored
}

// NewConsulRegistry creates a registry backed by the Consul agent at addr.
// Agent Cards are stored under the KV prefix "a2a/agents". Any error from
// creating the Consul client is returned unchanged.
func NewConsulRegistry(addr string) (*ConsulRegistry, error) {
    cfg := api.DefaultConfig()
    cfg.Address = addr

    consul, err := api.NewClient(cfg)
    if err != nil {
        return nil, err
    }

    registry := &ConsulRegistry{prefix: "a2a/agents", client: consul}
    return registry, nil
}

// Register publishes card to Consul in two steps: it registers an "a2a-agent"
// service instance (ID = card.Name) with an HTTP health check, then stores
// the JSON-encoded card in the KV store under "<prefix>/<card.Name>".
//
// ctx is attached to both Consul calls, so cancelling it aborts them. If the
// KV write fails, the service registration made in the first step is left in
// place.
//
// NOTE(review): Consul limits service Meta values to 512 characters; a card
// with several skills will likely exceed that and make registration fail.
// Consider having readers load the card from KV instead of Meta.
// NOTE(review): the health check targets card.URL + "/health". With a card
// URL such as "http://localhost:8080/a2a" that is "/a2a/health", while the
// endpoint list in this article puts the health endpoint at "/health" -
// confirm which path the server actually serves.
func (r *ConsulRegistry) Register(ctx context.Context, card *a2a.AgentCard) error {
    // Serialize the Agent Card once; it is used for both Meta and KV.
    cardData, err := json.Marshal(card)
    if err != nil {
        return fmt.Errorf("failed to marshal agent card: %w", err)
    }

    // Register the service instance.
    service := &api.AgentServiceRegistration{
        ID:      card.Name,
        Name:    "a2a-agent",
        Tags:    extractSkillTags(card.Skills),
        Port:    extractPort(card.URL),
        Address: extractHost(card.URL),
        Meta: map[string]string{
            "version":    card.Version,
            "agent_card": string(cardData),
        },
        Check: &api.AgentServiceCheck{
            HTTP:                           fmt.Sprintf("%s/health", card.URL),
            Interval:                       "10s",
            Timeout:                        "5s",
            DeregisterCriticalServiceAfter: "1m",
        },
    }

    registerOpts := api.ServiceRegisterOpts{}.WithContext(ctx)
    if err := r.client.Agent().ServiceRegisterOpts(service, registerOpts); err != nil {
        return fmt.Errorf("failed to register service: %w", err)
    }

    // Store the Agent Card in the KV store.
    key := fmt.Sprintf("%s/%s", r.prefix, card.Name)
    writeOpts := (&api.WriteOptions{}).WithContext(ctx)
    if _, err := r.client.KV().Put(&api.KVPair{Key: key, Value: cardData}, writeOpts); err != nil {
        return fmt.Errorf("failed to store agent card at %s: %w", key, err)
    }

    return nil
}

// Discover returns the Agent Cards of all passing "a2a-agent" service
// instances tagged with skill. Cards are read from each instance's
// "agent_card" Meta entry; instances without one, or with one that does not
// decode, are skipped (decode failures are logged).
//
// ctx is attached to the Consul query, so cancelling it aborts the lookup.
func (r *ConsulRegistry) Discover(ctx context.Context, skill string) ([]*a2a.AgentCard, error) {
    // Query only instances whose health checks are passing.
    queryOpts := (&api.QueryOptions{}).WithContext(ctx)
    services, _, err := r.client.Health().Service("a2a-agent", skill, true, queryOpts)
    if err != nil {
        return nil, err
    }

    var agents []*a2a.AgentCard
    for _, svc := range services {
        cardData := svc.Service.Meta["agent_card"]
        if cardData == "" {
            continue
        }

        var card a2a.AgentCard
        if err := json.Unmarshal([]byte(cardData), &card); err != nil {
            log.Printf("failed to unmarshal agent card for %s: %v", svc.Service.ID, err)
            continue
        }

        agents = append(agents, &card)
    }

    return agents, nil
}

// Watch runs a Consul watch on the "a2a-agent" service and invokes callback
// with the decoded Agent Cards every time the service list changes. It blocks
// until the watch ends and returns the watch's error, if any.
//
// Cancelling ctx stops the watch and makes Watch return.
//
// NOTE(review): unlike Discover, this watch does not filter on passing health
// checks, so callback may receive cards of unhealthy instances - confirm
// whether "passingonly" should be set.
func (r *ConsulRegistry) Watch(ctx context.Context, callback func([]*a2a.AgentCard)) error {
    // Use a Consul watch plan for live updates.
    plan, err := watch.Parse(map[string]interface{}{
        "type":    "service",
        "service": "a2a-agent",
    })
    if err != nil {
        return err
    }

    plan.Handler = func(idx uint64, raw interface{}) {
        if raw == nil {
            return
        }

        services, ok := raw.([]*api.ServiceEntry)
        if !ok {
            return
        }

        var agents []*a2a.AgentCard
        for _, svc := range services {
            cardData := svc.Service.Meta["agent_card"]
            if cardData == "" {
                continue
            }

            var card a2a.AgentCard
            if err := json.Unmarshal([]byte(cardData), &card); err != nil {
                // Log like Discover does instead of dropping the entry silently.
                log.Printf("failed to unmarshal agent card for %s: %v", svc.Service.ID, err)
                continue
            }
            agents = append(agents, &card)
        }

        callback(agents)
    }

    // Tie the plan's lifetime to ctx. The helper goroutine exits either when
    // ctx is cancelled (after stopping the plan) or when the plan ends on its
    // own, so it never outlives this call.
    finished := make(chan struct{})
    defer close(finished)
    go func() {
        select {
        case <-ctx.Done():
            plan.Stop()
        case <-finished:
        }
    }()

    return plan.RunWithClientAndLogger(r.client, nil)
}

认证配置:多层防御体系

API Key 认证(基础)

// setupAPIKeyAuth builds production-grade API key authentication. Keys are
// read from the "X-API-Key" header and checked against the set returned by
// loadValidAPIKeys, which is loaded once when this function is called.
func setupAPIKeyAuth() a2a.AuthMiddleware {
    // Load the valid keys from the environment or Secret Manager.
    knownKeys := loadValidAPIKeys()

    // validate maps an API key to its client ID, or explains why it was refused.
    validate := func(key string) (string, error) {
        clientID, known := knownKeys[key]
        switch {
        case !known:
            // 1. The key must exist.
            return "", fmt.Errorf("invalid API key")
        case isKeyExpired(key):
            // 2. The key must not be expired.
            return "", fmt.Errorf("API key expired")
        }

        // 3. Record usage (for auditing and rate limiting).
        recordKeyUsage(key)

        return clientID, nil
    }

    return a2a.APIKeyAuth{Header: "X-API-Key", Validator: validate}
}

// loadValidAPIKeys returns the accepted API keys as a map from key to client
// ID.
//
// Keys come from two sources, merged in this order (later entries win):
//  1. the A2A_API_KEYS environment variable, formatted as
//     "key1:client1,key2:client2" (development);
//  2. when running on GCE, the JSON object stored in the Secret Manager secret
//     "a2a-api-keys" (production).
//
// Failures never abort loading, but they are logged: silently returning an
// empty key set would leave the service rejecting every request with no hint
// as to why. Key material itself is never written to the log.
func loadValidAPIKeys() map[string]string {
    keys := make(map[string]string)

    // Load from the environment (development).
    if envKeys := os.Getenv("A2A_API_KEYS"); envKeys != "" {
        for _, pair := range strings.Split(envKeys, ",") {
            parts := strings.SplitN(pair, ":", 2)
            if len(parts) != 2 {
                log.Printf("A2A_API_KEYS: skipping malformed entry (expected key:clientID)")
                continue
            }
            keys[parts[0]] = parts[1]
        }
    }

    // Production: load from Secret Manager, only when running on GCE.
    if !metadata.OnGCE() {
        return keys
    }

    ctx := context.Background()
    client, err := secretmanager.NewClient(ctx)
    if err != nil {
        log.Printf("loading API keys: failed to create Secret Manager client: %v", err)
        return keys
    }
    defer client.Close()

    req := &secretmanagerpb.AccessSecretVersionRequest{
        Name: "projects/my-project/secrets/a2a-api-keys/versions/latest",
    }
    result, err := client.AccessSecretVersion(ctx, req)
    if err != nil {
        log.Printf("loading API keys: failed to access secret %s: %v", req.Name, err)
        return keys
    }

    var secretKeys map[string]string
    if err := json.Unmarshal(result.Payload.Data, &secretKeys); err != nil {
        log.Printf("loading API keys: secret %s is not a JSON object of strings: %v", req.Name, err)
        return keys
    }
    for k, v := range secretKeys {
        keys[k] = v
    }

    return keys
}

OAuth 2.0 / JWT 认证(企业级)

import (
    "github.com/golang-jwt/jwt/v5"
)

// JWTAuth holds what is needed to verify RSA-signed JWTs: the issuer's public
// key plus the issuer and audience values tokens are expected to carry.
type JWTAuth struct {
    publicKey *rsa.PublicKey
    issuer    string
    audience  string
}

// NewJWTAuth builds a JWTAuth from a PEM-encoded PKIX public key.
//
// It fails when publicKeyPEM contains no PEM block, when the block is not a
// parsable PKIX public key, or when the key is not an RSA key.
func NewJWTAuth(publicKeyPEM []byte, issuer, audience string) (*JWTAuth, error) {
    decoded, _ := pem.Decode(publicKeyPEM)
    if decoded == nil {
        return nil, fmt.Errorf("failed to decode PEM block")
    }

    parsed, err := x509.ParsePKIXPublicKey(decoded.Bytes)
    if err != nil {
        return nil, err
    }

    switch key := parsed.(type) {
    case *rsa.PublicKey:
        return &JWTAuth{publicKey: key, issuer: issuer, audience: audience}, nil
    default:
        return nil, fmt.Errorf("not an RSA public key")
    }
}

// Validate parses tokenString and verifies it against the configured public
// key, issuer and audience, accepting only RS256 signatures. On success it
// returns the token's claims; otherwise it returns an error.
func (a *JWTAuth) Validate(tokenString string) (*jwt.MapClaims, error) {
    // keyFunc supplies the verification key, refusing non-RSA signing methods.
    keyFunc := func(tok *jwt.Token) (interface{}, error) {
        if _, isRSA := tok.Method.(*jwt.SigningMethodRSA); isRSA {
            return a.publicKey, nil
        }
        return nil, fmt.Errorf("unexpected signing method: %v", tok.Header["alg"])
    }

    parsed, err := jwt.Parse(
        tokenString,
        keyFunc,
        jwt.WithIssuer(a.issuer),
        jwt.WithAudience(a.audience),
        jwt.WithValidMethods([]string{"RS256"}),
    )
    if err != nil {
        return nil, fmt.Errorf("invalid token: %w", err)
    }

    claims, isMap := parsed.Claims.(jwt.MapClaims)
    if !isMap || !parsed.Valid {
        return nil, fmt.Errorf("invalid claims")
    }
    return &claims, nil
}

// setupOAuthAuth wires JWT validation into the A2A server as bearer-token
// authentication. A token is accepted when it passes JWTAuth.Validate, has a
// string "sub" claim, and its "scope" claim grants "a2a:invoke". The returned
// validator yields the "sub" claim as the caller's identity.
//
// The process exits via log.Fatal when the embedded public key cannot be
// parsed.
func setupOAuthAuth() a2a.AuthMiddleware {
    publicKeyPEM := []byte(`-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
-----END PUBLIC KEY-----`)

    jwtAuth, err := NewJWTAuth(publicKeyPEM, "auth.example.com", "a2a-agent")
    if err != nil {
        log.Fatal(err)
    }

    return a2a.BearerAuth{
        Validator: func(token string) (string, error) {
            claims, err := jwtAuth.Validate(token)
            if err != nil {
                return "", err
            }

            // Extract the user/client ID.
            sub, ok := (*claims)["sub"].(string)
            if !ok {
                return "", fmt.Errorf("missing sub claim")
            }

            // Check permissions. The scope claim is a space-delimited list of
            // tokens (RFC 6749 section 3.3), so compare whole tokens: a
            // substring test would also accept scopes such as
            // "a2a:invoke-readonly" or "xa2a:invoke".
            scopes, _ := (*claims)["scope"].(string)
            granted := false
            for _, scope := range strings.Fields(scopes) {
                if scope == "a2a:invoke" {
                    granted = true
                    break
                }
            }
            if !granted {
                return "", fmt.Errorf("insufficient scope")
            }

            return sub, nil
        },
    }
}

mTLS(双向 TLS)认证(最高安全)

// setupMTLS builds a server-side TLS configuration that requires and verifies
// client certificates (mutual TLS).
//
// It reads the server key pair from "server.crt"/"server.key" and the client
// CA bundle from "ca.crt", all relative to the working directory. It returns
// an error when any file cannot be loaded or when "ca.crt" contains no usable
// certificate.
func setupMTLS() (*tls.Config, error) {
    // Load the server certificate.
    cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
    if err != nil {
        return nil, err
    }

    // Load the CA certificate(s) used to verify clients.
    caCert, err := os.ReadFile("ca.crt")
    if err != nil {
        return nil, err
    }

    // AppendCertsFromPEM reports whether at least one certificate was parsed.
    // With an empty pool every client handshake would fail, so surface the
    // misconfiguration at startup instead.
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        return nil, fmt.Errorf("ca.crt: no valid PEM certificates found")
    }

    return &tls.Config{
        Certificates: []tls.Certificate{cert},
        ClientCAs:    caCertPool,
        ClientAuth:   tls.RequireAndVerifyClientCert,
        MinVersion:   tls.VersionTLS12,
        // This list applies to TLS 1.2 only. The AES_128_GCM suites are
        // included because net/http's HTTP/2 setup rejects a custom list that
        // contains neither of them, which would stop an HTTPS server from
        // starting.
        CipherSuites: []uint16{
            tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
            tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
            tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
            tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
        },
    }, nil
}

// 在 A2A 服务器中使用
server, err := a2a.NewServer(ctx,
    a2a.WithAgent(myAgent),
    a2a.WithPort(8443),
    a2a.WithTLS(setupMTLS()),
)

限流与防护

分层限流策略

// 1. Global rate limiting (protects the service as a whole).
//
// setupGlobalRateLimit returns middleware that shares a single limiter across
// all requests and answers 503 Service Unavailable once it is exhausted.
func setupGlobalRateLimit() a2a.Middleware {
    // 1000 requests per second, with bursts of up to 2000.
    global := rate.NewLimiter(rate.Limit(1000), 2000)

    return func(next http.Handler) http.Handler {
        guarded := func(w http.ResponseWriter, r *http.Request) {
            if global.Allow() {
                next.ServeHTTP(w, r)
                return
            }
            http.Error(w, "service overloaded", http.StatusServiceUnavailable)
        }
        return http.HandlerFunc(guarded)
    }
}

// 2. Per-client rate limiting (keeps a single client from exhausting resources).
//
// ClientRateLimiter keeps one rate.Limiter per client ID; every limiter is
// created with the same rps and burst settings.
type ClientRateLimiter struct {
    limiters sync.Map // map[string]*rate.Limiter
    rps      rate.Limit
    burst    int
}

// NewClientRateLimiter returns a limiter that allows each client rps requests
// per second with bursts of up to burst.
func NewClientRateLimiter(rps rate.Limit, burst int) *ClientRateLimiter {
    limiter := new(ClientRateLimiter)
    limiter.rps, limiter.burst = rps, burst
    return limiter
}

// getLimiter returns the limiter for clientID, creating it on first use. When
// several goroutines race to create it, all of them get the one that was
// stored first.
func (l *ClientRateLimiter) getLimiter(clientID string) *rate.Limiter {
    // Fast path: avoid allocating a limiter when one already exists.
    if existing, ok := l.limiters.Load(clientID); ok {
        return existing.(*rate.Limiter)
    }

    // LoadOrStore hands back whichever limiter ended up in the map.
    stored, _ := l.limiters.LoadOrStore(clientID, rate.NewLimiter(l.rps, l.burst))
    return stored.(*rate.Limiter)
}

// Middleware returns HTTP middleware that applies the per-client limit.
//
// The client ID is read from the request context under the key "client_id".
// Requests without a client ID get 401 Unauthorized; requests over the limit
// get 429 Too Many Requests together with X-RateLimit-* headers.
func (l *ClientRateLimiter) Middleware() a2a.Middleware {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // Use the comma-ok form: when the context carries no client ID the
            // value is a nil interface, and an unchecked assertion would panic
            // instead of reaching the 401 branch below.
            clientID, _ := r.Context().Value("client_id").(string)
            if clientID == "" {
                http.Error(w, "unauthorized", http.StatusUnauthorized)
                return
            }

            limiter := l.getLimiter(clientID)
            if !limiter.Allow() {
                w.Header().Set("X-RateLimit-Limit", fmt.Sprintf("%v", l.rps))
                w.Header().Set("X-RateLimit-Remaining", "0")
                http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
                return
            }

            next.ServeHTTP(w, r)
        })
    }
}

// 3. Per-skill rate limiting (protects individual capabilities).
//
// SkillRateLimiter maps a skill identifier to the request rate allowed for it.
type SkillRateLimiter struct {
    limits map[string]rate.Limit
}

// Middleware returns HTTP middleware that applies the per-skill limits.
//
// The skill is taken from the "X-A2A-Skill" request header. Requests without
// that header, or naming a skill that has no configured limit, pass through
// unchanged. Requests over a skill's limit get 429 Too Many Requests.
//
// One limiter per configured skill is built when Middleware is called, so
// later changes to l.limits do not affect middleware that was already
// created. Each limiter's burst is one second's worth of requests, and at
// least 1 so that low rates can still admit a request.
func (l *SkillRateLimiter) Middleware() a2a.Middleware {
    // The map is only read after this point, so concurrent requests can share
    // it without locking.
    limiters := make(map[string]*rate.Limiter, len(l.limits))
    for skill, limit := range l.limits {
        burst := int(limit)
        if burst < 1 {
            burst = 1
        }
        limiters[skill] = rate.NewLimiter(limit, burst)
    }

    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // Read the skill from the request.
            skill := r.Header.Get("X-A2A-Skill")
            if skill == "" {
                next.ServeHTTP(w, r)
                return
            }

            // Enforce the skill's limit, if it has one.
            if limiter, limited := limiters[skill]; limited && !limiter.Allow() {
                http.Error(w, "skill rate limit exceeded", http.StatusTooManyRequests)
                return
            }

            next.ServeHTTP(w, r)
        })
    }
}

请求大小限制

// setupRequestSizeLimit returns middleware that wraps every request body in
// http.MaxBytesReader, limiting it to maxSize bytes.
func setupRequestSizeLimit(maxSize int64) a2a.Middleware {
    return func(next http.Handler) http.Handler {
        capped := func(w http.ResponseWriter, r *http.Request) {
            limitedBody := http.MaxBytesReader(w, r.Body, maxSize)
            r.Body = limitedBody
            next.ServeHTTP(w, r)
        }
        return http.HandlerFunc(capped)
    }
}

// Install the middleware on the server; they are passed in the order global
// limit, per-client limit, body-size limit.
server, err := a2a.NewServer(ctx,
    a2a.WithAgent(myAgent),
    a2a.WithMiddleware(
        setupGlobalRateLimit(),
        NewClientRateLimiter(10, 20).Middleware(),  // 10 rps per client, burst of 20
        setupRequestSizeLimit(10*1024*1024),         // request bodies capped at 10 MiB
    ),
)

版本管理与兼容性

语义化版本控制

// Version is a semantic version split into its three numeric components.
type Version struct {
    Major, Minor, Patch int
}

// String renders the version as "MAJOR.MINOR.PATCH".
func (v Version) String() string {
    // fmt.Sprint inserts no spaces here because every other operand is a string.
    return fmt.Sprint(v.Major, ".", v.Minor, ".", v.Patch)
}

// CompatibleWith reports whether v and other share the same major version,
// which is this type's definition of compatibility.
func (v Version) CompatibleWith(other Version) bool {
    return other.Major == v.Major
}

// AgentCard shows the version-related fields of an Agent Card.
type AgentCard struct {
    Name        string    `json:"name"`
    Version     string    `json:"version"`
    MinVersion  string    `json:"minVersion,omitempty"`  // lowest compatible version
    Deprecated  bool      `json:"deprecated,omitempty"`
    Deprecation string    `json:"deprecation,omitempty"` // deprecation notice
}

多版本共存

// main exposes two versions of the same agent side by side: v1 on port 8081
// under "/v1" and v2 on port 8082 under "/v2". It blocks until one of the
// servers stops and then exits the process.
func main() {
    ctx := context.Background()

    // v1 agent.
    agentV1, err := agent.New(agent.Config{
        Name:    "weather-agent",
        Version: "1.0.0",
    })
    if err != nil {
        log.Fatalf("failed to create v1 agent: %v", err)
    }

    // v2 agent (new features).
    agentV2, err := agent.New(agent.Config{
        Name:    "weather-agent",
        Version: "2.0.0",
    })
    if err != nil {
        log.Fatalf("failed to create v2 agent: %v", err)
    }

    // v1 server.
    serverV1, err := a2a.NewServer(ctx,
        a2a.WithAgent(agentV1),
        a2a.WithPort(8081),
        a2a.WithPathPrefix("/v1"),
    )
    if err != nil {
        log.Fatalf("failed to create v1 server: %v", err)
    }

    // v2 server.
    serverV2, err := a2a.NewServer(ctx,
        a2a.WithAgent(agentV2),
        a2a.WithPort(8082),
        a2a.WithPathPrefix("/v2"),
    )
    if err != nil {
        log.Fatalf("failed to create v2 server: %v", err)
    }

    // Start both versions. The channel is buffered so that neither goroutine
    // blocks on send after main has stopped listening.
    errCh := make(chan error, 2)
    go func() { errCh <- serverV1.Serve() }()
    go func() { errCh <- serverV2.Serve() }()

    // Use Nginx or an API gateway in front for version routing.

    // Block here: returning from main would terminate the process and both
    // servers with it. Exit as soon as either server stops.
    log.Fatalf("server stopped: %v", <-errCh)
}

常见问题深度解析

Q:Agent 暴露到公网安全吗?

绝对不安全,除非做好以下防护:

  1. 传输层:强制 HTTPS/TLS 1.2+
  2. 认证层:API Key / OAuth / mTLS 至少选一种
  3. 限流层:防止 DDoS 和资源耗尽
  4. 网络层:使用 VPC、防火墙规则限制访问源
  5. 审计层:记录所有调用日志,便于追溯
// Production-grade security configuration.
server, err := a2a.NewServer(ctx,
    a2a.WithAgent(myAgent),
    a2a.WithPort(443),
    a2a.WithTLS(tlsConfig),                          // TLS encryption
    a2a.WithAuth(setupOAuthAuth()),                  // OAuth bearer-token authentication
    a2a.WithMiddleware(
        setupGlobalRateLimit(),                      // global rate limit
        NewClientRateLimiter(10, 20).Middleware(),   // per-client rate limit
        setupRequestSizeLimit(10*1024*1024),         // request size limit
        setupAuditLog(),                             // audit logging
    ),
    a2a.WithIPWhitelist([]string{"10.0.0.0/8"}),     // IP allowlist
)

Q:可以限制调用频率吗?

必须限制,生产环境至少配置三层限流:

// Three limits configured through the server's built-in options.
server, err := a2a.NewServer(ctx,
    a2a.WithRateLimit(100, time.Minute),  // global: 100 requests per minute
    a2a.WithClientRateLimit(10, time.Minute),  // per client: 10 requests per minute
    a2a.WithBurstLimit(20),  // upper bound on bursts
)

限流策略选择

| 策略 | 适用场景 | 实现复杂度 |
| --- | --- | --- |
| 固定窗口 | 简单场景 | 低 |
| 滑动窗口 | 需要平滑限流 | 中 |
| 令牌桶 | 允许突发流量 | 中 |
| 漏桶 | 严格匀速 | 中 |

下一步

暴露搞定了,接下来看如何消费外部 Agent。

A2A 协议介绍 | Consuming →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。