Event 处理深度解析:Streaming 事件模型、实时捕获与生产级消费架构
深入剖析 ADK Go Streaming 的事件驱动架构,涵盖 Event 类型系统、背压控制、有序消费、错误恢复与高性能实时处理管道设计。
Event 处理深度解析:Streaming 事件模型、实时捕获与生产级消费架构
Streaming 是构建交互式 AI 应用的核心能力。与同步调用等待完整响应不同,Streaming 通过事件流实时传递生成过程中的每个片段,为用户提供即时反馈。本文将深入 ADK Go 的 Streaming 事件模型、消费架构与生产级处理策略。
Streaming 的架构价值
为什么需要 Streaming
| 维度 | 同步调用 | Streaming |
|---|---|---|
| 首字延迟 | 2-10s(需等待完整响应) | 通常为数百毫秒(首个 Token 生成即可显示) |
| 用户感知 | 等待焦虑 | 即时反馈 |
| 交互性 | 低 | 高(可中断) |
| 资源利用 | 阻塞等待 | 渐进消费 |
| 超时风险 | 高(长文本易超时) | 低 |
用户体验数据:在客服场景中,即使实际总耗时相同,Streaming 也能将用户的感知等待时间(perceived wait time)降低约 65%(具体数值因场景而异)。
Streaming 的数据流模型
用户输入
│
▼
┌─────────────┐ Event Stream ┌─────────────┐
│ LLM API │ ────────────────► │ Consumer │
│ (生成中) │ TextDelta │ (事件处理器) │
│ │ ToolCall │ │
│ │ ToolResult │ │
│ │ Done │ │
└─────────────┘ └──────┬───────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ UI 渲染 │ │ 日志记录 │ │ 状态更新 │
└──────────┘ └──────────┘ └──────────┘
Event 类型系统深度解析
核心 Event 类型
// Event is the common interface implemented by every streaming event.
// Type identifies the event kind, Timestamp reports the event's time,
// Sequence is the event's position in the stream (OrderedConsumer uses it to
// restore delivery order), and Metadata carries optional free-form attributes.
type Event interface {
Type() EventType
Timestamp() time.Time
Sequence() int64
Metadata() map[string]interface{}
}
// EventType enumerates the kinds of streaming events. Values are assigned by
// iota in declaration order and are cast directly into pb.EventType by
// PBSerializer, so new kinds should be appended at the end to keep existing
// numeric values stable.
type EventType int
const (
EventTypeTextDelta EventType = iota // incremental text chunk
EventTypeToolCall // tool invocation started
EventTypeToolResult // tool invocation result
EventTypeFunctionCall // function call (legacy compatibility)
EventTypeFunctionResult // function result (legacy compatibility)
EventTypeThinking // reasoning / chain-of-thought content
EventTypeError // error event
EventTypeDone // stream completed
EventTypeCancelled // stream cancelled
EventTypeHeartbeat // keep-alive heartbeat
)
// TextDeltaEvent carries one incremental chunk of generated text.
type TextDeltaEvent struct {
BaseEvent
Text string // the new text in this chunk
Index int // position of this chunk within the overall output
IsFinal bool // whether this is the last chunk
}
// ToolCallEvent announces that the model is invoking a tool.
type ToolCallEvent struct {
BaseEvent
ToolName string // name of the tool being called
ToolID string // call ID; matched by ToolResultEvent.ToolID
Args map[string]interface{} // decoded call arguments
RawArgs string // arguments as the raw JSON text (may be empty — TODO confirm)
}
// ToolResultEvent reports the outcome of a tool call.
type ToolResultEvent struct {
BaseEvent
ToolID string // ID of the ToolCallEvent this result answers
Result interface{} // result data
Error error // non-nil if the tool call failed
Duration time.Duration // how long the tool took to run
}
// ThinkingEvent carries reasoning (chain-of-thought) content.
type ThinkingEvent struct {
BaseEvent
Content string // reasoning text
Step int // reasoning step number
}
// DoneEvent marks the end of a stream.
type DoneEvent struct {
BaseEvent
FinalText string // complete generated text
TokenUsage int // number of tokens used
FinishReason string // why generation finished
}
// BaseEvent holds the fields shared by all events and implements the Event
// accessors; every concrete event type embeds it. The fields are unexported,
// so code outside this package cannot set them with a composite literal.
type BaseEvent struct {
eventType EventType
timestamp time.Time
sequence int64
metadata map[string]interface{}
}
// Type returns the kind of the event.
func (b *BaseEvent) Type() EventType {
	return b.eventType
}

// Timestamp returns the time recorded for the event.
func (b *BaseEvent) Timestamp() time.Time {
	return b.timestamp
}

// Sequence returns the event's position in its stream.
func (b *BaseEvent) Sequence() int64 {
	return b.sequence
}

// Metadata returns the event's free-form attributes (the stored map itself,
// not a copy).
func (b *BaseEvent) Metadata() map[string]interface{} {
	return b.metadata
}
Event 序列化协议
// EventSerializer converts events to and from a wire format.
type EventSerializer interface {
Serialize(event Event) ([]byte, error)
Deserialize(data []byte) (Event, error)
}
// JSONSerializer encodes events as JSON (the default format).
type JSONSerializer struct{}
// Serialize encodes event as JSON. Every event gets the common envelope
// (type name, Unix-millisecond timestamp, sequence and metadata). The
// type-specific fields go into Payload as a map. Event types without a case
// below leave Payload unset.
func (s *JSONSerializer) Serialize(event Event) ([]byte, error) {
	wrapper := &EventWrapper{
		Type:      event.Type().String(),
		Timestamp: event.Timestamp().UnixMilli(),
		Sequence:  event.Sequence(),
		Metadata:  event.Metadata(),
	}
	switch e := event.(type) {
	case *TextDeltaEvent:
		wrapper.Payload = map[string]interface{}{
			"text":     e.Text,
			"index":    e.Index,
			"is_final": e.IsFinal,
		}
	case *ToolCallEvent:
		wrapper.Payload = map[string]interface{}{
			"tool_name": e.ToolName,
			"tool_id":   e.ToolID,
			"args":      e.Args,
		}
	case *ToolResultEvent:
		payload := map[string]interface{}{
			"tool_id":     e.ToolID,
			"result":      e.Result,
			"duration_ms": e.Duration.Milliseconds(),
		}
		// Most error implementations have no exported fields and would encode
		// as {}, so send the message text instead.
		if e.Error != nil {
			payload["error"] = e.Error.Error()
		}
		wrapper.Payload = payload
	case *ThinkingEvent:
		wrapper.Payload = map[string]interface{}{
			"content": e.Content,
			"step":    e.Step,
		}
	case *DoneEvent:
		wrapper.Payload = map[string]interface{}{
			"final_text":    e.FinalText,
			"tokens":        e.TokenUsage,
			"finish_reason": e.FinishReason,
		}
	}
	return json.Marshal(wrapper)
}
// PBSerializer encodes events as Protocol Buffers (for high-throughput use).
type PBSerializer struct{}
// Serialize encodes event as a protobuf pb.Event with a nanosecond timestamp.
//
// The message variable is named msg, not pb. A local called pb would shadow
// the generated pb package for the rest of the function, and pb.Event_TextDelta
// and pb.TextDeltaPayload would then fail to compile.
//
// NOTE(review): Metadata is not carried over, and only text deltas set a
// payload here. Other event kinds are encoded with the oneof left unset.
func (s *PBSerializer) Serialize(event Event) ([]byte, error) {
	msg := &pb.Event{
		Type:      pb.EventType(event.Type()),
		Timestamp: event.Timestamp().UnixNano(),
		Sequence:  event.Sequence(),
	}
	switch e := event.(type) {
	case *TextDeltaEvent:
		msg.Payload = &pb.Event_TextDelta{
			TextDelta: &pb.TextDeltaPayload{
				Text:    e.Text,
				Index:   int32(e.Index),
				IsFinal: e.IsFinal,
			},
		}
		// Other event kinds need their own oneof payload cases in the schema.
	}
	return proto.Marshal(msg)
}
生产级 Event 消费架构
消费者接口设计
// EventConsumer receives streaming events one at a time. Consume handles a
// single event; Close presumably releases whatever the consumer holds.
type EventConsumer interface {
Consume(ctx context.Context, event Event) error
Close() error
}
// ConsumerChain passes each event to a list of consumers in order.
type ConsumerChain struct {
consumers []EventConsumer
}
// Consume hands event to every consumer in order. An error that c.isFatal
// classifies as fatal stops the chain and is returned. Any other error is
// logged, and the remaining consumers still run.
func (c *ConsumerChain) Consume(ctx context.Context, event Event) error {
	for i := range c.consumers {
		err := c.consumers[i].Consume(ctx, event)
		if err == nil {
			continue
		}
		if c.isFatal(err) {
			return err
		}
		log.Printf("Consumer error: %v", err)
	}
	return nil
}
// FilteringConsumer forwards to consumer only the events that filter accepts.
type FilteringConsumer struct {
filter EventFilter
consumer EventConsumer
}
// Consume forwards event to the wrapped consumer when the filter accepts it.
// Rejected events are dropped silently.
func (c *FilteringConsumer) Consume(ctx context.Context, event Event) error {
	if keep := c.filter(event); keep {
		return c.consumer.Consume(ctx, event)
	}
	return nil
}
// EventFilter reports whether an event should be passed on (see FilteringConsumer).
type EventFilter func(Event) bool
// TextOnlyFilter accepts only text-delta events.
func TextOnlyFilter(event Event) bool {
	switch event.Type() {
	case EventTypeTextDelta:
		return true
	default:
		return false
	}
}
// NonHeartbeatFilter accepts every event except keep-alive heartbeats.
func NonHeartbeatFilter(event Event) bool {
	switch event.Type() {
	case EventTypeHeartbeat:
		return false
	default:
		return true
	}
}
缓冲与背压控制
// BackpressureConsumer decouples producers from a slow downstream consumer
// through a bounded channel. Consume enqueues into buffer, Start drains buffer
// into consumer, and dropStrategy decides what happens when buffer is full.
type BackpressureConsumer struct {
consumer EventConsumer
buffer chan Event
bufferSize int // NOTE(review): not read by the methods shown; presumably cap(buffer)
dropStrategy DropStrategy
metrics *ConsumerMetrics
}
// DropStrategy selects how BackpressureConsumer.Consume behaves when its
// buffer is full.
type DropStrategy int
const (
DropStrategyReject DropStrategy = iota // reject the new event with an error
DropStrategyOldest // evict the oldest buffered event to make room
DropStrategyLatest // silently discard the new event
DropStrategySample // discard the new event with 50% probability, otherwise wait for room
)
// Consume enqueues event for asynchronous processing by Start. It never
// blocks while the buffer has room. When the buffer is full, c.dropStrategy
// decides the outcome:
//
//   - DropStrategyReject: the event is discarded and an error is returned.
//   - DropStrategyOldest: one buffered event is evicted and the new event is
//     enqueued without blocking. If another producer takes the freed slot
//     first, the new event is discarded and an error is returned.
//   - DropStrategyLatest: the new event is silently discarded.
//   - DropStrategySample: the new event is discarded with 50% probability.
//     Otherwise the call blocks until there is room or ctx is done.
//
// c.metrics.DropCounter is incremented once for every event actually discarded.
func (c *BackpressureConsumer) Consume(ctx context.Context, event Event) error {
	if c.tryEnqueue(event) {
		return nil
	}

	switch c.dropStrategy {
	case DropStrategyReject:
		c.metrics.DropCounter.Inc()
		return fmt.Errorf("buffer full, event rejected")

	case DropStrategyOldest:
		// Eviction and retry are both non-blocking, so a concurrent producer
		// that refills the freed slot cannot make this call hang.
		select {
		case <-c.buffer:
			c.metrics.DropCounter.Inc()
		default:
			// The reader freed a slot concurrently; nothing to evict.
		}
		if c.tryEnqueue(event) {
			return nil
		}
		c.metrics.DropCounter.Inc()
		return fmt.Errorf("buffer full, event dropped after evicting oldest")

	case DropStrategyLatest:
		c.metrics.DropCounter.Inc()
		return nil

	case DropStrategySample:
		if rand.Float32() < 0.5 {
			c.metrics.DropCounter.Inc()
			return nil
		}
		// Kept (50%): wait for room, but give up when ctx is done.
		select {
		case c.buffer <- event:
			c.metrics.BufferSize.Set(float64(len(c.buffer)))
			return nil
		case <-ctx.Done():
			c.metrics.DropCounter.Inc()
			return ctx.Err()
		}

	default:
		c.metrics.DropCounter.Inc()
		return fmt.Errorf("unknown drop strategy")
	}
}

// tryEnqueue performs a non-blocking send of event into c.buffer and, on
// success, updates the buffer-size gauge. It reports whether the event was
// enqueued.
func (c *BackpressureConsumer) tryEnqueue(event Event) bool {
	select {
	case c.buffer <- event:
		c.metrics.BufferSize.Set(float64(len(c.buffer)))
		return true
	default:
		return false
	}
}
// Start drains the buffer into c.consumer until ctx is done or the buffer
// channel is closed. Downstream errors are logged and do not stop the loop.
// After each event it records the processing latency and the remaining
// buffer depth.
func (c *BackpressureConsumer) Start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-c.buffer:
			if !ok {
				// A closed channel would otherwise yield nil events forever.
				return
			}
			start := time.Now()
			if err := c.consumer.Consume(ctx, event); err != nil {
				log.Printf("Consumer error: %v", err)
			}
			c.metrics.ProcessingLatency.Observe(time.Since(start).Seconds())
			// Update the gauge on dequeue too, not only on enqueue, so it
			// can go down.
			c.metrics.BufferSize.Set(float64(len(c.buffer)))
		}
	}
}
有序消费保障
// OrderedConsumer delivers events to consumer in Sequence() order.
type OrderedConsumer struct {
consumer EventConsumer
buffer map[int64]Event // out-of-order events keyed by sequence number
nextSeq int64 // next sequence number expected
maxBuffer int // max number of out-of-order events held in buffer
timeout time.Duration // per-event time limit for consumer.Consume
mu sync.Mutex // guards buffer and nextSeq
}
// Consume delivers events to the wrapped consumer in strictly increasing
// Sequence() order, starting at c.nextSeq.
//
// Events with a sequence below nextSeq are duplicates or arrived too late, and
// are dropped. Out-of-order events wait in c.buffer until the gap before them
// is filled. If more than c.maxBuffer events are waiting and the next
// expected one is still missing, the gap is abandoned: nextSeq jumps to the
// smallest buffered sequence and delivery resumes from there. Events are
// therefore never delivered out of order, only possibly skipped.
//
// If the wrapped consumer fails, the failing event is dropped, nextSeq moves
// past it so one bad event cannot stall the stream, and the error is
// returned. Any events still buffered are delivered on later calls.
func (c *OrderedConsumer) Consume(ctx context.Context, event Event) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	seq := event.Sequence()
	if seq < c.nextSeq {
		return nil
	}
	if c.buffer == nil {
		c.buffer = make(map[int64]Event)
	}
	c.buffer[seq] = event

	if len(c.buffer) > c.maxBuffer {
		if _, ready := c.buffer[c.nextSeq]; !ready {
			c.nextSeq = c.lowestBufferedSeq()
		}
	}
	return c.drainInOrder(ctx)
}

// drainInOrder delivers consecutive buffered events starting at c.nextSeq.
// The caller must hold c.mu.
func (c *OrderedConsumer) drainInOrder(ctx context.Context) error {
	for {
		next, ok := c.buffer[c.nextSeq]
		if !ok {
			return nil
		}
		err := c.consumeWithTimeout(ctx, next)
		delete(c.buffer, c.nextSeq)
		c.nextSeq++
		if err != nil {
			return err
		}
	}
}

// lowestBufferedSeq returns the smallest sequence number in c.buffer.
// The caller must hold c.mu and guarantee the buffer is non-empty.
func (c *OrderedConsumer) lowestBufferedSeq() int64 {
	var lowest int64
	first := true
	for seq := range c.buffer {
		if first || seq < lowest {
			lowest, first = seq, false
		}
	}
	return lowest
}
// consumeWithTimeout runs c.consumer.Consume for event, giving up after
// c.timeout. A non-positive timeout means no limit; with the zero value, every
// call would otherwise time out immediately.
//
// When the deadline passes or ctx is cancelled, the returned error wraps
// ctx.Err(), so callers can tell the two apart with errors.Is. The abandoned
// Consume call keeps running in its goroutine until it returns. The buffered
// channel lets that goroutine exit without a reader, but consumers should
// honor ctx so abandoned calls finish promptly.
func (c *OrderedConsumer) consumeWithTimeout(ctx context.Context, event Event) error {
	if c.timeout <= 0 {
		return c.consumer.Consume(ctx, event)
	}
	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- c.consumer.Consume(ctx, event)
	}()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return fmt.Errorf("consume timeout for event %d: %w", event.Sequence(), ctx.Err())
	}
}
实战:Web 实时聊天系统
完整后端实现
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/google/adk-go/agent"
	"github.com/google/adk-go/event"
)
// ChatServer is a real-time chat server: it runs the agent for each request
// and streams the resulting events to the client.
type ChatServer struct {
agent *agent.Agent
consumers []event.EventConsumer // shared by every request handled by this server
}
// NewChatServer returns a ChatServer for agent a. Its event pipeline starts
// with a LoggingConsumer followed by a MetricsConsumer.
func NewChatServer(a *agent.Agent) *ChatServer {
	defaults := []event.EventConsumer{
		&LoggingConsumer{}, // logging
		&MetricsConsumer{}, // metrics
	}
	srv := &ChatServer{agent: a}
	srv.consumers = defaults
	return srv
}
// maxChatRequestBytes caps the size of a /chat request body.
const maxChatRequestBytes = 1 << 20

// handleChat serves one chat turn as a Server-Sent Events (SSE) stream.
//
// The request body is a JSON ChatRequest of at most maxChatRequestBytes. Each
// event produced by the agent is first passed to every consumer in
// s.consumers (errors are logged, not fatal). It is then written to the
// client as one SSE frame and flushed. The handler returns after a done or
// error event, when the event channel closes, or when writing to the client
// fails.
func (s *ChatServer) handleChat(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	// The body comes from an untrusted client; bound it before decoding.
	r.Body = http.MaxBytesReader(w, r.Body, maxChatRequestBytes)
	var req ChatRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Check for streaming support before committing to SSE headers.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}
	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	h.Set("Connection", "keep-alive")
	h.Set("X-Accel-Buffering", "no") // disable Nginx response buffering

	eventChan := make(chan event.Event, 100)
	// net/http cancels ctx once this handler returns, which also stops
	// pumpEvents when we exit the loop early.
	go s.pumpEvents(ctx, req.Message, eventChan)

	for ev := range eventChan {
		for _, consumer := range s.consumers {
			if err := consumer.Consume(ctx, ev); err != nil {
				log.Printf("Consumer error: %v", err)
			}
		}

		frame, err := s.encodeSSE(ev)
		if err != nil {
			log.Printf("Serialize error: %v", err)
			continue
		}
		if _, err := w.Write(frame); err != nil {
			log.Printf("SSE write error: %v", err)
			return
		}
		flusher.Flush()

		switch ev.(type) {
		case *event.DoneEvent, *event.ErrorEvent:
			return
		}
	}
}

// pumpEvents runs the agent on message and forwards its events to out,
// closing out when it returns. io.EOF from the stream becomes a synthetic
// done event; any other failure becomes an error event.
//
// Every send also watches ctx. That way the goroutine cannot block forever on
// a full channel after handleChat has stopped reading.
func (s *ChatServer) pumpEvents(ctx context.Context, message string, out chan<- event.Event) {
	defer close(out)

	send := func(ev event.Event) bool {
		select {
		case out <- ev:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// NOTE(review): eventType is an unexported field of event.BaseEvent, so
	// the literals below only compile inside package event. A public
	// constructor is presumably needed here — confirm against the real API.
	stream, err := s.agent.RunStream(ctx, message)
	if err != nil {
		send(&event.ErrorEvent{
			BaseEvent: event.BaseEvent{eventType: event.EventTypeError},
			Error:     err,
		})
		return
	}
	defer stream.Close()

	for {
		ev, err := stream.Recv()
		if err == io.EOF {
			send(&event.DoneEvent{
				BaseEvent: event.BaseEvent{eventType: event.EventTypeDone},
			})
			return
		}
		if err != nil {
			send(&event.ErrorEvent{
				BaseEvent: event.BaseEvent{eventType: event.EventTypeError},
				Error:     err,
			})
			return
		}
		if !send(ev) {
			return
		}
	}
}

// encodeSSE renders ev as one complete SSE frame.
//
// Known event types get a named SSE event with a compact JSON payload built
// by encoding/json. Quotes or newlines in tool names, error messages or model
// text therefore cannot corrupt the JSON or inject extra SSE frames. Other
// event types (e.g. thinking, heartbeat) are sent under their type name with
// the generic serializeEvent envelope.
func (s *ChatServer) encodeSSE(ev event.Event) ([]byte, error) {
	var (
		name    string
		payload interface{}
	)
	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		name, payload = "text_delta", e.Text
	case *event.ToolCallEvent:
		// Prefer the model's raw JSON; fall back to the decoded map when it
		// is empty or not valid JSON.
		var args interface{} = e.Args
		if raw := json.RawMessage(e.RawArgs); json.Valid(raw) {
			args = raw
		}
		name, payload = "tool_call", map[string]interface{}{"tool": e.ToolName, "args": args}
	case *event.ToolResultEvent:
		p := map[string]interface{}{"tool_id": e.ToolID, "result": e.Result}
		if e.Error != nil {
			p["error"] = e.Error.Error()
		}
		name, payload = "tool_result", p
	case *event.DoneEvent:
		name, payload = "done", map[string]interface{}{"tokens": e.TokenUsage, "reason": e.FinishReason}
	case *event.ErrorEvent:
		msg := "unknown error"
		if e.Error != nil {
			msg = e.Error.Error()
		}
		name, payload = "error", map[string]string{"error": msg}
	default:
		data, err := s.serializeEvent(ev)
		if err != nil {
			return nil, err
		}
		return sseFrame(ev.Type().String(), data), nil
	}

	data, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("encoding %s event: %w", name, err)
	}
	return sseFrame(name, data), nil
}

// sseFrame formats a single SSE frame. data must not contain newlines;
// compact output from encoding/json never does.
func sseFrame(name string, data []byte) []byte {
	return []byte(fmt.Sprintf("event: %s\ndata: %s\n\n", name, data))
}
// serializeEvent encodes ev as one flat JSON object: the common envelope
// (type, Unix-millisecond timestamp, sequence) plus the fields of the
// concrete event type. The field names match JSONSerializer's payload keys.
// Event types without a case below get only the envelope.
func (s *ChatServer) serializeEvent(ev event.Event) ([]byte, error) {
	wrapper := map[string]interface{}{
		"type":      ev.Type().String(),
		"timestamp": ev.Timestamp().UnixMilli(),
		"sequence":  ev.Sequence(),
	}
	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		wrapper["text"] = e.Text
		wrapper["index"] = e.Index
		wrapper["is_final"] = e.IsFinal
	case *event.ToolCallEvent:
		wrapper["tool_name"] = e.ToolName
		wrapper["tool_id"] = e.ToolID
		wrapper["args"] = e.Args
	case *event.ToolResultEvent:
		wrapper["tool_id"] = e.ToolID
		wrapper["result"] = e.Result
		wrapper["duration_ms"] = e.Duration.Milliseconds()
		// Most error implementations encode as {}; send the message instead.
		if e.Error != nil {
			wrapper["error"] = e.Error.Error()
		}
	case *event.ThinkingEvent:
		wrapper["content"] = e.Content
		wrapper["step"] = e.Step
	case *event.DoneEvent:
		wrapper["tokens"] = e.TokenUsage
		wrapper["finish_reason"] = e.FinishReason
	case *event.ErrorEvent:
		if e.Error != nil {
			wrapper["error"] = e.Error.Error()
		}
	}
	return json.Marshal(wrapper)
}
// jsonEscape returns s as a quoted JSON string literal, with quotes,
// backslashes, control characters and HTML-sensitive characters escaped.
func jsonEscape(s string) string {
	var quoted string
	// Marshalling a Go string cannot fail; on the impossible error path the
	// result stays "".
	if encoded, err := json.Marshal(s); err == nil {
		quoted = string(encoded)
	}
	return quoted
}
// ChatRequest is the JSON body accepted by handleChat.
type ChatRequest struct {
Message string `json:"message"` // user input passed to the agent
SessionID string `json:"session_id,omitempty"` // NOTE(review): not read by handleChat
}
// LoggingConsumer logs a one-line summary for text-delta, tool-call and done
// events and ignores all other event types. It holds no state and never
// returns an error.
type LoggingConsumer struct{}

// Consume logs ev. It dispatches on the concrete Go type instead of ev.Type()
// plus an unchecked type assertion. An event whose Type() disagrees with its
// Go type is therefore ignored rather than panicking the request goroutine.
// The model-supplied tool name is logged with %q so embedded newlines cannot
// forge log lines.
func (c *LoggingConsumer) Consume(ctx context.Context, ev event.Event) error {
	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		log.Printf("[TextDelta] seq=%d len=%d", ev.Sequence(), len(e.Text))
	case *event.ToolCallEvent:
		log.Printf("[ToolCall] seq=%d tool=%q", ev.Sequence(), e.ToolName)
	case *event.DoneEvent:
		log.Printf("[Done] seq=%d tokens=%d", ev.Sequence(), e.TokenUsage)
	}
	return nil
}

// Close implements event.EventConsumer; there is nothing to release.
func (c *LoggingConsumer) Close() error { return nil }
// MetricsConsumer accumulates stream statistics and logs them when a done
// event arrives. It reports the latency since the first event it saw, the
// number of text bytes received and the number of tool calls. The counters
// are reset after each done event, so the next stream starts fresh.
//
// NewChatServer registers a single instance for all requests. The mutex makes
// that race-free, but concurrently running streams still share one set of
// counters. Use a per-request instance for exact per-stream numbers.
type MetricsConsumer struct {
	mu         sync.Mutex
	textTokens int // bytes of streamed text (len), not model tokens
	toolCalls  int
	startTime  time.Time
}

// Consume updates the counters for ev. On a done event it logs the summary
// and resets the counters. It never returns an error.
func (c *MetricsConsumer) Consume(ctx context.Context, ev event.Event) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.startTime.IsZero() {
		c.startTime = time.Now()
	}
	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		c.textTokens += len(e.Text)
	case *event.ToolCallEvent:
		c.toolCalls++
	case *event.DoneEvent:
		latency := time.Since(c.startTime)
		log.Printf("[Metrics] total_latency=%v text_chars=%d tool_calls=%d",
			latency, c.textTokens, c.toolCalls)
		// Reset field by field; assigning a whole struct would copy the mutex.
		c.textTokens, c.toolCalls, c.startTime = 0, 0, time.Time{}
	}
	return nil
}

// Close implements event.EventConsumer; there is nothing to release.
func (c *MetricsConsumer) Close() error { return nil }
// main builds the chat agent and serves the SSE endpoint /chat on :8080.
//
// NOTE(review): `model` is not defined in this snippet; the surrounding
// program must supply a configured model instance.
func main() {
	a, err := agent.New(agent.Config{
		Name:        "chat-agent",
		Model:       model,
		Instruction: "你是一个 helpful 的助手。",
	})
	if err != nil {
		log.Fatal(err)
	}
	server := NewChatServer(a)

	mux := http.NewServeMux()
	mux.HandleFunc("/chat", server.handleChat)

	srv := &http.Server{
		Addr:    ":8080",
		Handler: mux,
		// Bound how long a client may take to send request headers
		// (slowloris protection). WriteTimeout is deliberately unset: an SSE
		// response stays open for the whole generation and would be cut off.
		ReadHeaderTimeout: 10 * time.Second,
		IdleTimeout:       120 * time.Second,
	}

	log.Println("Server starting on :8080")
	log.Fatal(srv.ListenAndServe())
}
错误恢复与容错
断线重连机制
// ResumableStream re-opens an agent stream after failures, passing the last
// handled sequence number to agent.WithResumeFrom.
type ResumableStream struct {
agent *agent.Agent
lastSeq int64 // sequence of the last event passed to the handler
input string // input re-sent on every (re)connect
reconnects int // reconnect attempts made so far without success
maxReconnects int
backoff time.Duration // base retry delay, doubled per attempt
}
// Start runs the agent on s.input and passes every event to handler. When
// the stream cannot be opened, or breaks part-way, it reconnects and resumes
// after s.lastSeq.
//
// It returns:
//   - nil once the stream ends with io.EOF;
//   - the handler's error if the handler fails;
//   - ctx.Err() if ctx is cancelled, including while waiting to retry;
//   - a "max reconnects exceeded" error wrapping the last failure after
//     s.maxReconnects consecutive attempts that delivered no events.
//
// Attempt n waits s.backoff * 2^n before retrying (exponent capped at 16).
// Every opened stream is closed before Start reconnects or returns.
func (s *ResumableStream) Start(ctx context.Context, handler EventHandler) error {
	for {
		var failure error
		stream, err := s.agent.RunStream(ctx, s.input,
			agent.WithResumeFrom(s.lastSeq))
		if err != nil {
			failure = err
		} else {
			progressed := false
			for {
				ev, recvErr := stream.Recv()
				if recvErr == io.EOF {
					stream.Close()
					return nil
				}
				if recvErr != nil {
					failure = recvErr
					break
				}
				progressed = true
				s.lastSeq = ev.Sequence()
				if err := handler.Handle(ctx, ev); err != nil {
					stream.Close()
					return err
				}
			}
			stream.Close()
			// Only a stream that delivered events counts as a successful
			// reconnect. One that opens and breaks immediately must still
			// count toward maxReconnects, or Start could loop forever.
			if progressed {
				s.reconnects = 0
			}
		}

		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		if s.reconnects >= s.maxReconnects {
			return fmt.Errorf("max reconnects exceeded: %w", failure)
		}

		shift := s.reconnects
		if shift > 16 {
			shift = 16 // keep backoff * 2^shift from overflowing
		}
		timer := time.NewTimer(s.backoff * time.Duration(1<<shift))
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		s.reconnects++
	}
}
性能优化
批量处理
// BatchingConsumer collects events and forwards them to consumer in batches.
// A batch is flushed when batchSize events have accumulated (in Consume) or
// on every flushInterval tick (in Start).
type BatchingConsumer struct {
consumer EventConsumer
batchSize int
flushInterval time.Duration
buffer []Event // pending events; guarded by mu
mu sync.Mutex
}
// Consume appends event to the pending batch. Once batchSize events are
// pending, the batch is detached under the lock and flushed synchronously
// outside it, on the caller's goroutine. The flush error, if any, is
// returned. Otherwise Consume returns nil immediately.
func (c *BatchingConsumer) Consume(ctx context.Context, event Event) error {
	var full []Event

	c.mu.Lock()
	c.buffer = append(c.buffer, event)
	if len(c.buffer) >= c.batchSize {
		full, c.buffer = c.buffer, nil
	}
	c.mu.Unlock()

	if full == nil {
		return nil
	}
	return c.flush(ctx, full)
}
// Start flushes whatever has accumulated on every c.flushInterval tick until
// ctx is done. Flush errors are logged instead of being silently discarded.
// Events still pending when ctx is cancelled are not flushed.
//
// A non-positive flushInterval disables periodic flushing: Start just waits
// for ctx, because time.NewTicker would panic. Size-triggered flushes in
// Consume still happen.
func (c *BatchingConsumer) Start(ctx context.Context) {
	if c.flushInterval <= 0 {
		<-ctx.Done()
		return
	}
	ticker := time.NewTicker(c.flushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.mu.Lock()
			batch := c.buffer
			c.buffer = nil
			c.mu.Unlock()
			if len(batch) == 0 {
				continue
			}
			if err := c.flush(ctx, batch); err != nil {
				log.Printf("Batch flush error: %v", err)
			}
		}
	}
}
// flush forwards batch to the wrapped consumer one event at a time, in order.
// It stops at the first error and returns it; the rest of the batch is not
// delivered.
func (c *BatchingConsumer) flush(ctx context.Context, batch []Event) error {
	for i := range batch {
		if err := c.consumer.Consume(ctx, batch[i]); err != nil {
			return err
		}
	}
	return nil
}
常见问题深度解析
Q:Event 丢失怎么办?
A:三层保障:
- 客户端确认:重要 Event 要求客户端发送 ACK
- 服务端缓冲:服务端维护发送缓冲区,未确认事件重发
- 持久化:关键 Event 写入消息队列(如 Kafka),确保至少一次投递
Q:如何处理高并发下的 Streaming?
A:
- 连接池:复用与 LLM API 的连接
- 限流:基于令牌桶限制并发 Stream 数量
- 降级:高负载时关闭非关键消费者(如日志)
- 水平扩展:使用负载均衡分发请求到多个实例
Q:Streaming 的 Token 消耗是否更高?
A:Token 消耗相同,但 Streaming 有额外开销:
- 网络开销:每个 Event 都带有 SSE 帧格式(`event:`/`data:` 行)的开销,且每次 Flush 还会产生一个 HTTP/1.1 分块(chunked)头或 HTTP/2 DATA 帧头
- 序列化开销:每个 Event 独立序列化
- 处理开销:消费者链的处理延迟
优化:合并小 TextDelta(<10 字符)为一批发送。
下一步
至此,我们已经深入了解了 Event 的实时捕获与消费架构。接下来将探索多模态 Streaming——音频、图像、视频的事件处理与流式传输。
← Streaming 原理 | 多模态 Streaming →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
