Callbacks 与 Plugins:生命周期钩子与扩展机制
Table of Contents
Callbacks 与 Plugins:生命周期钩子与扩展机制
Callbacks 和 Plugins 是 ADK Go 框架中最强大的扩展点。如果说 Tools 扩展了 Agent 的"能力边界",Skills 扩展了 Agent 的"知识深度",那么 Callbacks 和 Plugins 则扩展了 Agent 的"行为控制面"——它们允许开发者在 Agent 执行的每一个关键生命周期节点插入自定义逻辑,从而实现日志记录、安全审计、动态路由、结果后处理等横切关注点(Cross-Cutting Concerns)。
架构视角:为什么需要生命周期钩子
在一个生产级的 Agent 系统中,纯粹的业务逻辑(推理、Tool 调用、输出生成)往往只占整体代码的 30%-40%。其余部分被可观测性、安全性、合规性、性能优化等"基础设施"需求占据。如果将这些需求与业务逻辑紧耦合,会导致:
- 代码膨胀:每个 Agent 都需要重复实现日志、监控、限流等逻辑
- 维护困难:基础设施逻辑的变更需要修改所有 Agent
- 测试复杂:业务逻辑测试被基础设施依赖污染
- 复用低下:无法在不同 Agent 间共享通用的横切逻辑
Callbacks 和 Plugins 通过**面向切面编程(AOP)**的思想解决了这些问题。它们将基础设施逻辑从业务逻辑中剥离,以声明式的方式"编织"到 Agent 的生命周期中。
Callbacks:细粒度的生命周期钩子
Callbacks 是 ADK Go 提供的最轻量级扩展机制。它们以函数形式注册到 Agent 的特定生命周期事件上,适合实现简单、无状态的横切逻辑。
完整的生命周期事件图谱
ADK Go 定义了以下生命周期事件,覆盖了 Agent 执行的完整轨迹:
| 事件 | 触发时机 | 典型用途 |
|---|---|---|
BeforeRun | Agent 开始处理用户输入前 | 输入验证、敏感词过滤、权限检查 |
BeforeModelCall | 向 LLM 发送请求前 | Prompt 修改、上下文注入、成本预估 |
AfterModelCall | 收到 LLM 响应后 | 响应解析、内容审核、结果缓存 |
BeforeToolCall | 执行 Tool 前 | Tool 参数校验、权限校验、日志记录 |
AfterToolCall | Tool 执行完成后 | 结果处理、错误恢复、指标上报 |
OnError | 任何阶段发生错误时 | 错误分类、告警通知、降级处理 |
AfterRun | Agent 完成全部处理后 | 结果格式化、会话持久化、后续动作触发 |
OnStreamChunk | 流式输出收到每个数据块时 | 实时渲染、敏感内容拦截、打字机效果 |
生产级 Callback 配置示例
以下是一个包含完整可观测性、安全性和容错设计的 Callback 配置:
package main
import (
"context"
"fmt"
"log"
"os"
"strings"
"time"
"google.golang.org/adk/agent"
"google.golang.org/adk/callback"
"google.golang.org/adk/llm"
)
func main() {
ctx := context.Background()
model, err := llm.NewGeminiModel(ctx, llm.GeminiConfig{
APIKey: os.Getenv("GOOGLE_API_KEY"),
Model: "gemini-2.0-flash",
})
if err != nil {
log.Fatalf("模型初始化失败: %v", err)
}
agent, err := agent.New(agent.Config{
Name: "production-agent",
Model: model,
Instruction: "你是一个有帮助的助手。",
Callbacks: []callback.Callback{
// 1. 输入安全审计
{
Type: callback.BeforeRun,
Handler: func(ctx context.Context, input *callback.RunInput) error {
// 记录用户输入(注意:生产环境需脱敏处理 PII)
log.Printf("[AUDIT] user=%s session=%s input_len=%d",
input.UserID, input.SessionID, len(input.Text))
// 敏感词过滤
blockedWords := []string{"密码", "密钥", "token", "secret"}
for _, word := range blockedWords {
if strings.Contains(strings.ToLower(input.Text), word) {
// 不阻断,但标记为高风险
input.Metadata["risk_level"] = "high"
input.Metadata["sensitive_words"] = append(
input.Metadata["sensitive_words"].([]string), word)
}
}
// 输入长度限制(防止 Prompt Injection 通过超长输入消耗 Token)
if len(input.Text) > 10000 {
return fmt.Errorf("输入过长: %d 字符,最大允许 10000", len(input.Text))
}
return nil
},
},
// 2. Prompt 增强与成本预估
{
Type: callback.BeforeModelCall,
Handler: func(ctx context.Context, call *callback.ModelCallInput) error {
// 注入当前时间(解决模型知识截止时间问题)
call.Prompt += fmt.Sprintf("\n\n[系统提示] 当前时间: %s", time.Now().Format(time.RFC3339))
// 预估 Token 消耗(粗略估算)
estimatedTokens := len(call.Prompt) * 3 / 2 // 中文字符保守估算
if estimatedTokens > 8000 {
log.Printf("[WARN] 大上下文请求: %d tokens", estimatedTokens)
}
// 记录模型调用指标
metrics.IncrementCounter("model_calls", metrics.Tag{"model", call.ModelName})
return nil
},
},
// 3. 响应后处理与内容审核
{
Type: callback.AfterModelCall,
Handler: func(ctx context.Context, call *callback.ModelCallOutput) error {
// 延迟记录
metrics.RecordHistogram("model_latency_ms", float64(call.Duration.Milliseconds()))
// 输出内容安全检测
if containsPII(call.Response.Text) {
log.Printf("[ALERT] 模型输出包含 PII,已脱敏处理")
call.Response.Text = maskPII(call.Response.Text)
}
// 缓存高频响应
if call.Response.Metadata["cacheable"] == true {
cache.Set(call.CacheKey, call.Response, 5*time.Minute)
}
return nil
},
},
// 4. Tool 调用监控与熔断
{
Type: callback.BeforeToolCall,
Handler: func(ctx context.Context, call *callback.ToolCallInput) error {
// 检查 Tool 熔断状态
if circuitBreaker.IsOpen(call.ToolName) {
return fmt.Errorf("Tool %s 当前不可用(熔断器开启)", call.ToolName)
}
// 记录 Tool 调用
log.Printf("[TOOL] name=%s args=%v", call.ToolName, call.Arguments)
// 参数安全检查
if err := validateToolArgs(call.ToolName, call.Arguments); err != nil {
return fmt.Errorf("Tool 参数验证失败: %w", err)
}
return nil
},
},
{
Type: callback.AfterToolCall,
Handler: func(ctx context.Context, call *callback.ToolCallOutput) error {
duration := time.Since(call.StartTime)
metrics.RecordHistogram("tool_latency_ms", float64(duration.Milliseconds()),
metrics.Tag{"tool", call.ToolName})
if call.Error != nil {
metrics.IncrementCounter("tool_errors", metrics.Tag{"tool", call.ToolName})
// 记录错误,用于熔断器决策
circuitBreaker.RecordFailure(call.ToolName)
} else {
circuitBreaker.RecordSuccess(call.ToolName)
}
return nil
},
},
// 5. 错误处理与降级
{
Type: callback.OnError,
Handler: func(ctx context.Context, err *callback.ErrorEvent) error {
log.Printf("[ERROR] phase=%s error=%v", err.Phase, err.Error)
// 分类错误并触发不同处理策略
switch classifyError(err.Error) {
case ErrorTypeRateLimit:
// 速率限制:触发指数退避重试
err.RetryAfter = calculateBackoff(err.Attempt)
case ErrorTypeModelUnavailable:
// 模型不可用:切换到备用模型
err.FallbackModel = "gemini-1.5-flash"
case ErrorTypeToolFailure:
// Tool 失败:尝试使用缓存结果或返回友好提示
if cached, ok := cache.Get(err.ToolName); ok {
err.RecoveryData = cached
}
}
// 严重错误发送告警
if err.Severity == callback.SeverityCritical {
alert.Send(fmt.Sprintf("Agent 严重错误: %v", err.Error))
}
return nil
},
},
// 6. 流式输出处理
{
Type: callback.OnStreamChunk,
Handler: func(ctx context.Context, chunk *callback.StreamChunk) error {
// 实时敏感词检测(流式场景下尤为重要)
if containsSensitiveContent(chunk.Text) {
chunk.Text = maskSensitiveContent(chunk.Text)
}
// 记录流式指标
metrics.IncrementCounter("stream_chunks")
return nil
},
},
},
})
if err != nil {
log.Fatalf("Agent 创建失败: %v", err)
}
// 使用 Agent...
}
Plugins:全功能的扩展模块
如果说 Callbacks 是"函数级别的钩子",那么 Plugins 就是"模块级别的扩展"。Plugins 可以:
- 注册新的 Callbacks
- 添加新的 Tools
- 修改 Agent 配置
- 访问和修改内部状态
- 实现跨请求的状态共享
Plugin 接口定义
type Plugin interface {
Name() string
Initialize(ctx context.Context, config PluginConfig) error
RegisterCallbacks(registry *callback.Registry) error
RegisterTools(registry *tool.Registry) error
Shutdown(ctx context.Context) error
}
生产级 Plugin 示例:分布式追踪
以下是一个实现 OpenTelemetry 分布式追踪的 Plugin,展示了 Plugin 如何深度集成到 Agent 的各个角落:
package tracing
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/adk/agent"
"google.golang.org/adk/callback"
"google.golang.org/adk/plugin"
"google.golang.org/adk/tool"
)
// TracingPlugin 为 Agent 提供 OpenTelemetry 分布式追踪能力
type TracingPlugin struct {
tracer trace.Tracer
config TracingConfig
}
type TracingConfig struct {
ServiceName string
ExporterEndpoint string
SamplingRate float64
}
func (p *TracingPlugin) Name() string {
return "opentelemetry-tracing"
}
func (p *TracingPlugin) Initialize(ctx context.Context, config plugin.PluginConfig) error {
p.config = config.(TracingConfig)
// 初始化 OTel TracerProvider
tp, err := initTracerProvider(p.config)
if err != nil {
return fmt.Errorf("初始化 TracerProvider 失败: %w", err)
}
otel.SetTracerProvider(tp)
p.tracer = tp.Tracer(p.config.ServiceName)
return nil
}
func (p *TracingPlugin) RegisterCallbacks(registry *callback.Registry) error {
// 为每个 Agent Run 创建一个 Trace Span
registry.Register(callback.BeforeRun, func(ctx context.Context, input *callback.RunInput) error {
ctx, span := p.tracer.Start(ctx, "agent.run",
trace.WithAttributes(
attribute.String("agent.name", input.AgentName),
attribute.String("session.id", input.SessionID),
attribute.String("user.id", input.UserID),
attribute.Int("input.length", len(input.Text)),
),
)
// 将 span 存入 context,供后续回调使用
input.Context = trace.ContextWithSpan(ctx, span)
return nil
})
registry.Register(callback.AfterRun, func(ctx context.Context, output *callback.RunOutput) error {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.Int("output.length", len(output.Text)),
attribute.Int("tool.calls", output.ToolCallCount),
attribute.Int("model.calls", output.ModelCallCount),
)
if output.Error != nil {
span.SetStatus(codes.Error, output.Error.Error())
span.RecordError(output.Error)
}
span.End()
return nil
})
// 为每个模型调用创建子 Span
registry.Register(callback.BeforeModelCall, func(ctx context.Context, call *callback.ModelCallInput) error {
ctx, span := p.tracer.Start(ctx, "llm.call",
trace.WithAttributes(
attribute.String("model.name", call.ModelName),
attribute.Int("prompt.tokens", call.PromptTokens),
),
)
call.Context = ctx
return nil
})
registry.Register(callback.AfterModelCall, func(ctx context.Context, call *callback.ModelCallOutput) error {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.Int("response.tokens", call.ResponseTokens),
attribute.Float64("duration.ms", float64(call.Duration.Milliseconds())),
)
span.End()
return nil
})
// 为每个 Tool 调用创建子 Span
registry.Register(callback.BeforeToolCall, func(ctx context.Context, call *callback.ToolCallInput) error {
ctx, span := p.tracer.Start(ctx, fmt.Sprintf("tool.%s", call.ToolName),
trace.WithAttributes(
attribute.String("tool.name", call.ToolName),
),
)
call.Context = ctx
return nil
})
registry.Register(callback.AfterToolCall, func(ctx context.Context, call *callback.ToolCallOutput) error {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.Bool("tool.success", call.Error == nil),
attribute.Float64("duration.ms", float64(call.Duration.Milliseconds())),
)
if call.Error != nil {
span.RecordError(call.Error)
}
span.End()
return nil
})
return nil
}
func (p *TracingPlugin) RegisterTools(registry *tool.Registry) error {
// 此 Plugin 不注册新 Tool
return nil
}
func (p *TracingPlugin) Shutdown(ctx context.Context) error {
// 优雅关闭,确保所有 Span 被导出
if tp, ok := otel.GetTracerProvider().(*sdktrace.TracerProvider); ok {
return tp.Shutdown(ctx)
}
return nil
}
使用 Plugin
func main() {
tracingPlugin := &tracing.TracingPlugin{}
agent, err := agent.New(agent.Config{
Name: "traced-agent",
Plugins: []plugin.Plugin{
tracingPlugin,
&logging.Plugin{},
&metrics.Plugin{},
},
})
// 应用退出时确保 Plugin 优雅关闭
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, p := range agent.Plugins() {
if err := p.Shutdown(ctx); err != nil {
log.Printf("Plugin %s 关闭失败: %v", p.Name(), err)
}
}
}()
}
Callbacks vs Plugins:如何选择
| 场景 | 推荐机制 | 原因 |
|---|---|---|
| 简单的日志记录 | Callback | 轻量、声明式、无需额外依赖 |
| 输入/输出过滤 | Callback | 逻辑简单、无状态 |
| 跨请求状态共享 | Plugin | 可以维护内部状态 |
| 添加新 Tools | Plugin | 需要访问 Tool Registry |
| 复杂的可观测性集成 | Plugin | 需要初始化外部客户端(OTel、Prometheus 等) |
| 动态配置更新 | Plugin | 可以实现热加载逻辑 |
| A/B 测试框架 | Plugin | 需要维护实验状态和分流逻辑 |
高级模式:Plugin 链与优先级
当多个 Plugin 同时注册时,它们的执行顺序和交互方式需要仔细设计。ADK Go 支持 Plugin 优先级和链式执行模式:
agent, err := agent.New(agent.Config{
Plugins: []plugin.Plugin{
// 高优先级:安全相关 Plugin 最先执行
&security.Plugin{Priority: 100},
// 中优先级:可观测性 Plugin
&tracing.Plugin{Priority: 50},
&metrics.Plugin{Priority: 50},
// 低优先级:业务增强 Plugin
&enhancement.Plugin{Priority: 10},
},
})
Plugin 链的执行遵循洋葱模型(Onion Model):
请求进入
↓
[Security Plugin] BeforeRun
↓
[Tracing Plugin] BeforeRun
↓
[Metrics Plugin] BeforeRun
↓
[Enhancement Plugin] BeforeRun
↓
[核心业务逻辑]
↓
[Enhancement Plugin] AfterRun
↓
[Metrics Plugin] AfterRun
↓
[Tracing Plugin] AfterRun
↓
[Security Plugin] AfterRun
↓
响应返回
这种设计确保了安全策略总是在最外层(最先执行 Before,最后执行 After),而业务增强逻辑在最内层。
生产陷阱与调试技巧
陷阱 1:Callback 中的阻塞操作
Callbacks 在 Agent 的主执行路径上运行,如果在 Callback 中执行阻塞 I/O(如数据库查询、HTTP 请求),会直接增加 Agent 的响应延迟。
解决方案:将非关键操作异步化
{
Type: callback.AfterRun,
Handler: func(ctx context.Context, output *callback.RunOutput) error {
// 异步记录日志,不阻塞主流程
go func() {
auditLog.Write(output)
}()
return nil
},
}
陷阱 2:Callback 链中的错误传播
一个 Callback 返回错误可能导致整个 Agent 执行失败。需要明确错误处理策略:
{
Type: callback.BeforeRun,
Handler: func(ctx context.Context, input *callback.RunInput) error {
err := doSomething()
if err != nil {
// 策略 A:阻断执行
// return err
// 策略 B:记录错误但继续
log.Printf("非致命错误: %v", err)
return nil
// 策略 C:降级处理
input.Metadata["degraded"] = true
return nil
}
return nil
},
}
陷阱 3:Plugin 初始化失败导致 Agent 启动失败
生产环境中,可观测性 Plugin(如 Tracing)的初始化失败不应阻断核心业务。实施优雅降级:
func (p *TracingPlugin) Initialize(ctx context.Context, config plugin.PluginConfig) error {
err := p.doInit(config)
if err != nil {
// 记录错误,但将 Plugin 标记为 noop
log.Printf("Tracing 初始化失败,已降级: %v", err)
p.noop = true
}
return nil // 不返回错误,避免阻断 Agent 启动
}
调试技巧:Callback 执行追踪
当 Agent 行为异常时,定位是哪个 Callback 导致的问题可能很困难。建议实现 Callback 执行追踪:
func tracedCallback(cb callback.Callback) callback.Callback {
return callback.Callback{
Type: cb.Type,
Handler: func(ctx context.Context, event interface{}) error {
start := time.Now()
err := cb.Handler(ctx, event)
duration := time.Since(start)
log.Printf("[CALLBACK] type=%s duration=%v error=%v",
cb.Type, duration, err)
return err
},
}
}
小结
模块 9 完成。我们深入学习了四个进阶主题:
- Grounding:通过搜索增强生成,将 Agent 的回答锚定在真实世界之上,解决了 LLM 知识截止和幻觉问题
- Artifacts:结构化内容生成机制,使 Agent 的输出从自由文本升级为类型化、可机器解析的数据单元
- Skills:预设的专业能力模块,让 Agent 在特定领域具备系统化的思维框架和最佳实践
- Callbacks 与 Plugins:生命周期钩子与扩展机制,实现了横切关注点的优雅分离和深度定制
这些机制共同构成了生产级 Agent 系统的"基础设施层"——它们不直接参与业务推理,但决定了系统的可靠性、可维护性和可扩展性。
接下来进入最后一个模块:实战综合——端到端项目实战。
← Skills for Agents | 端到端项目实战 →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
