Callbacks 与 Plugins:生命周期钩子与扩展机制

Callbacks 和 Plugins 是 ADK Go 框架中最强大的扩展点。如果说 Tools 扩展了 Agent 的"能力边界",Skills 扩展了 Agent 的"知识深度",那么 Callbacks 和 Plugins 则扩展了 Agent 的"行为控制面"——它们允许开发者在 Agent 执行的每一个关键生命周期节点插入自定义逻辑,从而实现日志记录、安全审计、动态路由、结果后处理等横切关注点(Cross-Cutting Concerns)。

架构视角:为什么需要生命周期钩子

在一个生产级的 Agent 系统中,纯粹的业务逻辑(推理、Tool 调用、输出生成)往往只占整体代码的 30%-40%。其余部分被可观测性、安全性、合规性、性能优化等"基础设施"需求占据。如果将这些需求与业务逻辑紧耦合,会导致:

  1. 代码膨胀:每个 Agent 都需要重复实现日志、监控、限流等逻辑
  2. 维护困难:基础设施逻辑的变更需要修改所有 Agent
  3. 测试复杂:业务逻辑测试被基础设施依赖污染
  4. 复用低下:无法在不同 Agent 间共享通用的横切逻辑

Callbacks 和 Plugins 通过**面向切面编程(AOP)**的思想解决了这些问题。它们将基础设施逻辑从业务逻辑中剥离,以声明式的方式"编织"到 Agent 的生命周期中。

Callbacks:细粒度的生命周期钩子

Callbacks 是 ADK Go 提供的最轻量级扩展机制。它们以函数形式注册到 Agent 的特定生命周期事件上,适合实现简单、无状态的横切逻辑。

完整的生命周期事件图谱

ADK Go 定义了以下生命周期事件,覆盖了 Agent 执行的完整轨迹:

事件触发时机典型用途
BeforeRunAgent 开始处理用户输入前输入验证、敏感词过滤、权限检查
BeforeModelCall向 LLM 发送请求前Prompt 修改、上下文注入、成本预估
AfterModelCall收到 LLM 响应后响应解析、内容审核、结果缓存
BeforeToolCall执行 Tool 前Tool 参数校验、权限校验、日志记录
AfterToolCallTool 执行完成后结果处理、错误恢复、指标上报
OnError任何阶段发生错误时错误分类、告警通知、降级处理
AfterRunAgent 完成全部处理后结果格式化、会话持久化、后续动作触发
OnStreamChunk流式输出收到每个数据块时实时渲染、敏感内容拦截、打字机效果

生产级 Callback 配置示例

以下是一个包含完整可观测性、安全性和容错设计的 Callback 配置:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strings"
    "time"

    "google.golang.org/adk/agent"
    "google.golang.org/adk/callback"
    "google.golang.org/adk/llm"
)

func main() {
    ctx := context.Background()

    model, err := llm.NewGeminiModel(ctx, llm.GeminiConfig{
        APIKey: os.Getenv("GOOGLE_API_KEY"),
        Model:  "gemini-2.0-flash",
    })
    if err != nil {
        log.Fatalf("模型初始化失败: %v", err)
    }

    agent, err := agent.New(agent.Config{
        Name:  "production-agent",
        Model: model,
        Instruction: "你是一个有帮助的助手。",
        Callbacks: []callback.Callback{
            // 1. 输入安全审计
            {
                Type: callback.BeforeRun,
                Handler: func(ctx context.Context, input *callback.RunInput) error {
                    // 记录用户输入(注意:生产环境需脱敏处理 PII)
                    log.Printf("[AUDIT] user=%s session=%s input_len=%d",
                        input.UserID, input.SessionID, len(input.Text))

                    // 敏感词过滤
                    blockedWords := []string{"密码", "密钥", "token", "secret"}
                    for _, word := range blockedWords {
                        if strings.Contains(strings.ToLower(input.Text), word) {
                            // 不阻断,但标记为高风险
                            input.Metadata["risk_level"] = "high"
                            input.Metadata["sensitive_words"] = append(
                                input.Metadata["sensitive_words"].([]string), word)
                        }
                    }

                    // 输入长度限制(防止 Prompt Injection 通过超长输入消耗 Token)
                    if len(input.Text) > 10000 {
                        return fmt.Errorf("输入过长: %d 字符,最大允许 10000", len(input.Text))
                    }

                    return nil
                },
            },

            // 2. Prompt 增强与成本预估
            {
                Type: callback.BeforeModelCall,
                Handler: func(ctx context.Context, call *callback.ModelCallInput) error {
                    // 注入当前时间(解决模型知识截止时间问题)
                    call.Prompt += fmt.Sprintf("\n\n[系统提示] 当前时间: %s", time.Now().Format(time.RFC3339))

                    // 预估 Token 消耗(粗略估算)
                    estimatedTokens := len(call.Prompt) * 3 / 2  // 中文字符保守估算
                    if estimatedTokens > 8000 {
                        log.Printf("[WARN] 大上下文请求: %d tokens", estimatedTokens)
                    }

                    // 记录模型调用指标
                    metrics.IncrementCounter("model_calls", metrics.Tag{"model", call.ModelName})

                    return nil
                },
            },

            // 3. 响应后处理与内容审核
            {
                Type: callback.AfterModelCall,
                Handler: func(ctx context.Context, call *callback.ModelCallOutput) error {
                    // 延迟记录
                    metrics.RecordHistogram("model_latency_ms", float64(call.Duration.Milliseconds()))

                    // 输出内容安全检测
                    if containsPII(call.Response.Text) {
                        log.Printf("[ALERT] 模型输出包含 PII,已脱敏处理")
                        call.Response.Text = maskPII(call.Response.Text)
                    }

                    // 缓存高频响应
                    if call.Response.Metadata["cacheable"] == true {
                        cache.Set(call.CacheKey, call.Response, 5*time.Minute)
                    }

                    return nil
                },
            },

            // 4. Tool 调用监控与熔断
            {
                Type: callback.BeforeToolCall,
                Handler: func(ctx context.Context, call *callback.ToolCallInput) error {
                    // 检查 Tool 熔断状态
                    if circuitBreaker.IsOpen(call.ToolName) {
                        return fmt.Errorf("Tool %s 当前不可用(熔断器开启)", call.ToolName)
                    }

                    // 记录 Tool 调用
                    log.Printf("[TOOL] name=%s args=%v", call.ToolName, call.Arguments)

                    // 参数安全检查
                    if err := validateToolArgs(call.ToolName, call.Arguments); err != nil {
                        return fmt.Errorf("Tool 参数验证失败: %w", err)
                    }

                    return nil
                },
            },

            {
                Type: callback.AfterToolCall,
                Handler: func(ctx context.Context, call *callback.ToolCallOutput) error {
                    duration := time.Since(call.StartTime)
                    metrics.RecordHistogram("tool_latency_ms", float64(duration.Milliseconds()),
                        metrics.Tag{"tool", call.ToolName})

                    if call.Error != nil {
                        metrics.IncrementCounter("tool_errors", metrics.Tag{"tool", call.ToolName})
                        // 记录错误,用于熔断器决策
                        circuitBreaker.RecordFailure(call.ToolName)
                    } else {
                        circuitBreaker.RecordSuccess(call.ToolName)
                    }

                    return nil
                },
            },

            // 5. 错误处理与降级
            {
                Type: callback.OnError,
                Handler: func(ctx context.Context, err *callback.ErrorEvent) error {
                    log.Printf("[ERROR] phase=%s error=%v", err.Phase, err.Error)

                    // 分类错误并触发不同处理策略
                    switch classifyError(err.Error) {
                    case ErrorTypeRateLimit:
                        // 速率限制:触发指数退避重试
                        err.RetryAfter = calculateBackoff(err.Attempt)
                    case ErrorTypeModelUnavailable:
                        // 模型不可用:切换到备用模型
                        err.FallbackModel = "gemini-1.5-flash"
                    case ErrorTypeToolFailure:
                        // Tool 失败:尝试使用缓存结果或返回友好提示
                        if cached, ok := cache.Get(err.ToolName); ok {
                            err.RecoveryData = cached
                        }
                    }

                    // 严重错误发送告警
                    if err.Severity == callback.SeverityCritical {
                        alert.Send(fmt.Sprintf("Agent 严重错误: %v", err.Error))
                    }

                    return nil
                },
            },

            // 6. 流式输出处理
            {
                Type: callback.OnStreamChunk,
                Handler: func(ctx context.Context, chunk *callback.StreamChunk) error {
                    // 实时敏感词检测(流式场景下尤为重要)
                    if containsSensitiveContent(chunk.Text) {
                        chunk.Text = maskSensitiveContent(chunk.Text)
                    }

                    // 记录流式指标
                    metrics.IncrementCounter("stream_chunks")

                    return nil
                },
            },
        },
    })
    if err != nil {
        log.Fatalf("Agent 创建失败: %v", err)
    }

    // 使用 Agent...
}

Plugins:全功能的扩展模块

如果说 Callbacks 是"函数级别的钩子",那么 Plugins 就是"模块级别的扩展"。Plugins 可以:

  • 注册新的 Callbacks
  • 添加新的 Tools
  • 修改 Agent 配置
  • 访问和修改内部状态
  • 实现跨请求的状态共享

Plugin 接口定义

type Plugin interface {
    Name() string
    Initialize(ctx context.Context, config PluginConfig) error
    RegisterCallbacks(registry *callback.Registry) error
    RegisterTools(registry *tool.Registry) error
    Shutdown(ctx context.Context) error
}

生产级 Plugin 示例:分布式追踪

以下是一个实现 OpenTelemetry 分布式追踪的 Plugin,展示了 Plugin 如何深度集成到 Agent 的各个角落:

package tracing

import (
    "context"
    "fmt"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"
    "google.golang.org/adk/agent"
    "google.golang.org/adk/callback"
    "google.golang.org/adk/plugin"
    "google.golang.org/adk/tool"
)

// TracingPlugin 为 Agent 提供 OpenTelemetry 分布式追踪能力
type TracingPlugin struct {
    tracer trace.Tracer
    config TracingConfig
}

type TracingConfig struct {
    ServiceName    string
    ExporterEndpoint string
    SamplingRate   float64
}

func (p *TracingPlugin) Name() string {
    return "opentelemetry-tracing"
}

func (p *TracingPlugin) Initialize(ctx context.Context, config plugin.PluginConfig) error {
    p.config = config.(TracingConfig)
    
    // 初始化 OTel TracerProvider
    tp, err := initTracerProvider(p.config)
    if err != nil {
        return fmt.Errorf("初始化 TracerProvider 失败: %w", err)
    }
    
    otel.SetTracerProvider(tp)
    p.tracer = tp.Tracer(p.config.ServiceName)
    
    return nil
}

func (p *TracingPlugin) RegisterCallbacks(registry *callback.Registry) error {
    // 为每个 Agent Run 创建一个 Trace Span
    registry.Register(callback.BeforeRun, func(ctx context.Context, input *callback.RunInput) error {
        ctx, span := p.tracer.Start(ctx, "agent.run",
            trace.WithAttributes(
                attribute.String("agent.name", input.AgentName),
                attribute.String("session.id", input.SessionID),
                attribute.String("user.id", input.UserID),
                attribute.Int("input.length", len(input.Text)),
            ),
        )
        // 将 span 存入 context,供后续回调使用
        input.Context = trace.ContextWithSpan(ctx, span)
        return nil
    })

    registry.Register(callback.AfterRun, func(ctx context.Context, output *callback.RunOutput) error {
        span := trace.SpanFromContext(ctx)
        
        span.SetAttributes(
            attribute.Int("output.length", len(output.Text)),
            attribute.Int("tool.calls", output.ToolCallCount),
            attribute.Int("model.calls", output.ModelCallCount),
        )
        
        if output.Error != nil {
            span.SetStatus(codes.Error, output.Error.Error())
            span.RecordError(output.Error)
        }
        
        span.End()
        return nil
    })

    // 为每个模型调用创建子 Span
    registry.Register(callback.BeforeModelCall, func(ctx context.Context, call *callback.ModelCallInput) error {
        ctx, span := p.tracer.Start(ctx, "llm.call",
            trace.WithAttributes(
                attribute.String("model.name", call.ModelName),
                attribute.Int("prompt.tokens", call.PromptTokens),
            ),
        )
        call.Context = ctx
        return nil
    })

    registry.Register(callback.AfterModelCall, func(ctx context.Context, call *callback.ModelCallOutput) error {
        span := trace.SpanFromContext(ctx)
        span.SetAttributes(
            attribute.Int("response.tokens", call.ResponseTokens),
            attribute.Float64("duration.ms", float64(call.Duration.Milliseconds())),
        )
        span.End()
        return nil
    })

    // 为每个 Tool 调用创建子 Span
    registry.Register(callback.BeforeToolCall, func(ctx context.Context, call *callback.ToolCallInput) error {
        ctx, span := p.tracer.Start(ctx, fmt.Sprintf("tool.%s", call.ToolName),
            trace.WithAttributes(
                attribute.String("tool.name", call.ToolName),
            ),
        )
        call.Context = ctx
        return nil
    })

    registry.Register(callback.AfterToolCall, func(ctx context.Context, call *callback.ToolCallOutput) error {
        span := trace.SpanFromContext(ctx)
        span.SetAttributes(
            attribute.Bool("tool.success", call.Error == nil),
            attribute.Float64("duration.ms", float64(call.Duration.Milliseconds())),
        )
        if call.Error != nil {
            span.RecordError(call.Error)
        }
        span.End()
        return nil
    })

    return nil
}

func (p *TracingPlugin) RegisterTools(registry *tool.Registry) error {
    // 此 Plugin 不注册新 Tool
    return nil
}

func (p *TracingPlugin) Shutdown(ctx context.Context) error {
    // 优雅关闭,确保所有 Span 被导出
    if tp, ok := otel.GetTracerProvider().(*sdktrace.TracerProvider); ok {
        return tp.Shutdown(ctx)
    }
    return nil
}

使用 Plugin

func main() {
    tracingPlugin := &tracing.TracingPlugin{}
    
    agent, err := agent.New(agent.Config{
        Name: "traced-agent",
        Plugins: []plugin.Plugin{
            tracingPlugin,
            &logging.Plugin{},
            &metrics.Plugin{},
        },
    })
    
    // 应用退出时确保 Plugin 优雅关闭
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        for _, p := range agent.Plugins() {
            if err := p.Shutdown(ctx); err != nil {
                log.Printf("Plugin %s 关闭失败: %v", p.Name(), err)
            }
        }
    }()
}

Callbacks vs Plugins:如何选择

场景推荐机制原因
简单的日志记录Callback轻量、声明式、无需额外依赖
输入/输出过滤Callback逻辑简单、无状态
跨请求状态共享Plugin可以维护内部状态
添加新 ToolsPlugin需要访问 Tool Registry
复杂的可观测性集成Plugin需要初始化外部客户端(OTel、Prometheus 等)
动态配置更新Plugin可以实现热加载逻辑
A/B 测试框架Plugin需要维护实验状态和分流逻辑

高级模式:Plugin 链与优先级

当多个 Plugin 同时注册时,它们的执行顺序和交互方式需要仔细设计。ADK Go 支持 Plugin 优先级和链式执行模式:

agent, err := agent.New(agent.Config{
    Plugins: []plugin.Plugin{
        // 高优先级:安全相关 Plugin 最先执行
        &security.Plugin{Priority: 100},
        
        // 中优先级:可观测性 Plugin
        &tracing.Plugin{Priority: 50},
        &metrics.Plugin{Priority: 50},
        
        // 低优先级:业务增强 Plugin
        &enhancement.Plugin{Priority: 10},
    },
})

Plugin 链的执行遵循洋葱模型(Onion Model):

请求进入
[Security Plugin] BeforeRun
[Tracing Plugin] BeforeRun
[Metrics Plugin] BeforeRun
[Enhancement Plugin] BeforeRun
[核心业务逻辑]
[Enhancement Plugin] AfterRun
[Metrics Plugin] AfterRun
[Tracing Plugin] AfterRun
[Security Plugin] AfterRun
响应返回

这种设计确保了安全策略总是在最外层(最先执行 Before,最后执行 After),而业务增强逻辑在最内层。

生产陷阱与调试技巧

陷阱 1:Callback 中的阻塞操作

Callbacks 在 Agent 的主执行路径上运行,如果在 Callback 中执行阻塞 I/O(如数据库查询、HTTP 请求),会直接增加 Agent 的响应延迟。

解决方案:将非关键操作异步化

{
    Type: callback.AfterRun,
    Handler: func(ctx context.Context, output *callback.RunOutput) error {
        // 异步记录日志,不阻塞主流程
        go func() {
            auditLog.Write(output)
        }()
        return nil
    },
}

陷阱 2:Callback 链中的错误传播

一个 Callback 返回错误可能导致整个 Agent 执行失败。需要明确错误处理策略:

{
    Type: callback.BeforeRun,
    Handler: func(ctx context.Context, input *callback.RunInput) error {
        err := doSomething()
        if err != nil {
            // 策略 A:阻断执行
            // return err
            
            // 策略 B:记录错误但继续
            log.Printf("非致命错误: %v", err)
            return nil
            
            // 策略 C:降级处理
            input.Metadata["degraded"] = true
            return nil
        }
        return nil
    },
}

陷阱 3:Plugin 初始化失败导致 Agent 启动失败

生产环境中,可观测性 Plugin(如 Tracing)的初始化失败不应阻断核心业务。实施优雅降级:

func (p *TracingPlugin) Initialize(ctx context.Context, config plugin.PluginConfig) error {
    err := p.doInit(config)
    if err != nil {
        // 记录错误,但将 Plugin 标记为 noop
        log.Printf("Tracing 初始化失败,已降级: %v", err)
        p.noop = true
    }
    return nil  // 不返回错误,避免阻断 Agent 启动
}

调试技巧:Callback 执行追踪

当 Agent 行为异常时,定位是哪个 Callback 导致的问题可能很困难。建议实现 Callback 执行追踪:

func tracedCallback(cb callback.Callback) callback.Callback {
    return callback.Callback{
        Type: cb.Type,
        Handler: func(ctx context.Context, event interface{}) error {
            start := time.Now()
            err := cb.Handler(ctx, event)
            duration := time.Since(start)
            
            log.Printf("[CALLBACK] type=%s duration=%v error=%v",
                cb.Type, duration, err)
            
            return err
        },
    }
}

小结

模块 9 完成。我们深入学习了四个进阶主题:

  • Grounding:通过搜索增强生成,将 Agent 的回答锚定在真实世界之上,解决了 LLM 知识截止和幻觉问题
  • Artifacts:结构化内容生成机制,使 Agent 的输出从自由文本升级为类型化、可机器解析的数据单元
  • Skills:预设的专业能力模块,让 Agent 在特定领域具备系统化的思维框架和最佳实践
  • Callbacks 与 Plugins:生命周期钩子与扩展机制,实现了横切关注点的优雅分离和深度定制

这些机制共同构成了生产级 Agent 系统的"基础设施层"——它们不直接参与业务推理,但决定了系统的可靠性、可维护性和可扩展性。

接下来进入最后一个模块:实战综合——端到端项目实战。

Skills for Agents | 端到端项目实战 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。