Event 处理深度解析:Streaming 事件模型、实时捕获与生产级消费架构

Streaming 是构建交互式 AI 应用的核心能力。与同步调用等待完整响应不同,Streaming 通过事件流实时传递生成过程中的每个片段,为用户提供即时反馈。本文将深入 ADK Go 的 Streaming 事件模型、消费架构与生产级处理策略。

Streaming 的架构价值

为什么需要 Streaming

| 维度 | 同步调用 | Streaming |
|------|----------|-----------|
| 首字延迟 | 2-10s | <100ms |
| 用户感知 | 等待焦虑 | 即时反馈 |
| 交互性 | 低 | 高(可中断) |
| 资源利用 | 阻塞等待 | 渐进消费 |
| 超时风险 | 高(长文本易超时) | 低 |

用户体验数据:在客服场景中,即使实际总耗时相同,Streaming 也将用户的感知等待时间(perceived wait time)降低了 65%。

Streaming 的数据流模型

用户输入
┌─────────────┐    Event Stream    ┌─────────────┐
│   LLM API   │ ────────────────► │   Consumer   │
│  (生成中)    │  TextDelta        │  (事件处理器)  │
│             │  ToolCall         │              │
│             │  ToolResult       │              │
│             │  Done             │              │
└─────────────┘                   └──────┬───────┘
                    ┌────────────────────┼────────────────────┐
                    │                    │                    │
                    ▼                    ▼                    ▼
              ┌──────────┐       ┌──────────┐       ┌──────────┐
              │  UI 渲染  │       │ 日志记录  │       │ 状态更新  │
              └──────────┘       └──────────┘       └──────────┘

Event 类型系统深度解析

核心 Event 类型

// Event is the contract shared by every streaming event.
type Event interface {
	// Type reports which kind of event this is.
	Type() EventType
	// Timestamp reports the time attached to the event.
	Timestamp() time.Time
	// Sequence reports the event's position within its stream.
	Sequence() int64
	// Metadata returns free-form key/value annotations.
	Metadata() map[string]interface{}
}

// EventType enumerates the kinds of streaming events. Values are assigned by
// iota in declaration order; serializers may emit the numeric value, so new
// kinds must be appended at the end to keep existing values stable.
type EventType int

const (
	EventTypeTextDelta      EventType = iota // incremental text
	EventTypeToolCall                        // a tool invocation has started
	EventTypeToolResult                      // result of a tool invocation
	EventTypeFunctionCall                    // function call (legacy compatibility)
	EventTypeFunctionResult                  // function result (legacy compatibility)
	EventTypeThinking                        // reasoning step (chain of thought)
	EventTypeError                           // error event
	EventTypeDone                            // stream completed
	EventTypeCancelled                       // stream cancelled
	EventTypeHeartbeat                       // keep-alive heartbeat
)

// TextDeltaEvent carries one incremental chunk of generated text.
type TextDeltaEvent struct {
	BaseEvent

	Text    string // the new text in this chunk
	Index   int    // position of this chunk within the whole output
	IsFinal bool   // set on the last chunk
}

// ToolCallEvent announces that a tool invocation has started.
type ToolCallEvent struct {
	BaseEvent

	ToolName string                 // name of the tool being called
	ToolID   string                 // identifier of this call
	Args     map[string]interface{} // decoded call arguments
	RawArgs  string                 // the arguments as raw JSON text
}

// ToolResultEvent reports the outcome of a tool invocation.
type ToolResultEvent struct {
	BaseEvent

	ToolID   string        // ID of the matching ToolCallEvent
	Result   interface{}   // result data
	Error    error         // non-nil if the tool failed
	Duration time.Duration // how long the tool ran
}

// ThinkingEvent carries one step of the model's reasoning (chain of thought).
type ThinkingEvent struct {
	BaseEvent

	Content string // reasoning text for this step
	Step    int    // step number
}

// DoneEvent marks the end of a stream.
type DoneEvent struct {
	BaseEvent

	FinalText    string // the complete generated text
	TokenUsage   int    // number of tokens used
	FinishReason string // why generation finished
}

// BaseEvent holds the header fields shared by every event and implements the
// Event accessors. The accessors use pointer receivers and concrete events
// embed BaseEvent by value, so it is the pointer types (e.g. *TextDeltaEvent)
// that satisfy Event.
type BaseEvent struct {
	eventType EventType              // returned by Type
	timestamp time.Time              // returned by Timestamp
	sequence  int64                  // returned by Sequence
	metadata  map[string]interface{} // returned by Metadata
}

// Type returns the event's kind.
func (b *BaseEvent) Type() EventType {
	return b.eventType
}

// Timestamp returns the time attached to the event.
func (b *BaseEvent) Timestamp() time.Time {
	return b.timestamp
}

// Sequence returns the event's position within its stream.
func (b *BaseEvent) Sequence() int64 {
	return b.sequence
}

// Metadata returns the event's annotations (the stored map itself, not a copy).
func (b *BaseEvent) Metadata() map[string]interface{} {
	return b.metadata
}

Event 序列化协议

// EventSerializer converts events to and from a wire encoding.
type EventSerializer interface {
	// Serialize encodes event.
	Serialize(event Event) ([]byte, error)
	// Deserialize decodes data back into an Event.
	Deserialize(data []byte) (Event, error)
}

// JSONSerializer encodes events as JSON (the default wire format).
type JSONSerializer struct{}

// Serialize wraps event in an EventWrapper carrying the common header (type
// name, Unix-millisecond timestamp, sequence, metadata) plus a type-specific
// Payload map, and returns its JSON encoding. Event kinds without a case
// below leave Payload at its zero value.
func (s *JSONSerializer) Serialize(event Event) ([]byte, error) {
	wrapper := &EventWrapper{
		Type:      event.Type().String(),
		Timestamp: event.Timestamp().UnixMilli(),
		Sequence:  event.Sequence(),
		Metadata:  event.Metadata(),
	}

	switch e := event.(type) {
	case *TextDeltaEvent:
		wrapper.Payload = map[string]interface{}{
			"text":     e.Text,
			"index":    e.Index,
			"is_final": e.IsFinal,
		}
	case *ToolCallEvent:
		wrapper.Payload = map[string]interface{}{
			"tool_name": e.ToolName,
			"tool_id":   e.ToolID,
			"args":      e.Args,
		}
	case *ToolResultEvent:
		payload := map[string]interface{}{
			"tool_id":     e.ToolID,
			"result":      e.Result,
			"duration_ms": e.Duration.Milliseconds(),
		}
		if e.Error != nil {
			// error values marshal as {}; send the message text instead.
			payload["error"] = e.Error.Error()
		}
		wrapper.Payload = payload
	case *ThinkingEvent:
		wrapper.Payload = map[string]interface{}{
			"content": e.Content,
			"step":    e.Step,
		}
	case *DoneEvent:
		wrapper.Payload = map[string]interface{}{
			"final_text":    e.FinalText,
			"tokens":        e.TokenUsage,
			"finish_reason": e.FinishReason,
		}
	}

	return json.Marshal(wrapper)
}

// PBSerializer encodes events as Protocol Buffers for high-throughput paths.
type PBSerializer struct{}

// Serialize converts event into a pb.Event (type, Unix-nanosecond timestamp,
// sequence, plus a type-specific payload) and returns proto.Marshal's output.
// Only TextDeltaEvent has a payload mapping so far; other kinds are encoded
// without a payload.
func (s *PBSerializer) Serialize(event Event) ([]byte, error) {
	// The message is named msg, not pb: `pb := &pb.Event{...}` would shadow
	// the pb package, after which pb.Event_TextDelta and pb.TextDeltaPayload
	// no longer resolve.
	msg := &pb.Event{
		// NOTE(review): assumes pb.EventType numbering mirrors EventType's
		// iota order — verify against the .proto definition.
		Type:      pb.EventType(event.Type()),
		Timestamp: event.Timestamp().UnixNano(),
		Sequence:  event.Sequence(),
	}

	switch e := event.(type) {
	case *TextDeltaEvent:
		msg.Payload = &pb.Event_TextDelta{
			TextDelta: &pb.TextDeltaPayload{
				Text:    e.Text,
				Index:   int32(e.Index),
				IsFinal: e.IsFinal,
			},
		}
		// TODO: map the remaining event kinds.
	}

	return proto.Marshal(msg)
}

生产级 Event 消费架构

消费者接口设计

// EventConsumer receives streaming events one at a time.
type EventConsumer interface {
	// Consume handles a single event; how an error is treated is up to the caller.
	Consume(ctx context.Context, event Event) error
	// Close releases any resources held by the consumer.
	Close() error
}

// ConsumerChain passes each event to every consumer in order (chain of
// responsibility). An error that c.isFatal classifies as fatal stops the
// chain and is returned; any other error is logged and the chain continues.
type ConsumerChain struct {
	consumers []EventConsumer
}

// Consume runs event through the chain; see ConsumerChain.
func (c *ConsumerChain) Consume(ctx context.Context, event Event) error {
	for i := range c.consumers {
		err := c.consumers[i].Consume(ctx, event)
		switch {
		case err == nil:
			continue
		case c.isFatal(err):
			return err
		default:
			log.Printf("Consumer error: %v", err)
		}
	}
	return nil
}

// FilteringConsumer forwards only the events accepted by filter to consumer.
type FilteringConsumer struct {
	filter   EventFilter
	consumer EventConsumer
}

// Consume forwards event when the filter accepts it and returns the inner
// consumer's error; rejected events are ignored and yield nil.
func (c *FilteringConsumer) Consume(ctx context.Context, event Event) error {
	if accepted := c.filter(event); accepted {
		return c.consumer.Consume(ctx, event)
	}
	return nil
}

// EventFilter reports whether an event should be passed on.
type EventFilter func(Event) bool

// TextOnlyFilter accepts only text-delta events.
func TextOnlyFilter(event Event) bool {
	switch event.Type() {
	case EventTypeTextDelta:
		return true
	default:
		return false
	}
}

// NonHeartbeatFilter accepts every event except heartbeats.
func NonHeartbeatFilter(event Event) bool {
	isHeartbeat := event.Type() == EventTypeHeartbeat
	return !isHeartbeat
}

缓冲与背压控制

// BackpressureConsumer decouples producers from a slow consumer through a
// bounded channel: Consume enqueues, Start dequeues and delivers, and
// dropStrategy decides what happens when the channel is full.
type BackpressureConsumer struct {
	consumer     EventConsumer
	buffer       chan Event
	bufferSize   int // NOTE(review): not read by the methods shown; presumably cap(buffer)
	dropStrategy DropStrategy
	metrics      *ConsumerMetrics
}

// DropStrategy selects how BackpressureConsumer handles an event that arrives
// while its buffer is full.
type DropStrategy int

const (
	// DropStrategyReject refuses the new event.
	DropStrategyReject DropStrategy = iota
	// DropStrategyOldest discards the oldest buffered event.
	DropStrategyOldest
	// DropStrategyLatest discards the new event.
	DropStrategyLatest
	// DropStrategySample discards the new event probabilistically.
	DropStrategySample
)

// Consume enqueues event for asynchronous delivery by Start. If the buffer has
// room the event is enqueued immediately; otherwise dropStrategy decides:
//
//   - DropStrategyReject: discard event and return an error.
//   - DropStrategyOldest: evict the oldest buffered event to make room.
//   - DropStrategyLatest: silently discard event.
//   - DropStrategySample: discard event with probability 0.5, otherwise block
//     until there is room or ctx is done (returning ctx.Err()).
//
// DropCounter is incremented once per event that is actually discarded.
func (c *BackpressureConsumer) Consume(ctx context.Context, event Event) error {
	if c.tryEnqueue(event) {
		return nil
	}

	switch c.dropStrategy {
	case DropStrategyReject:
		c.metrics.DropCounter.Inc()
		return fmt.Errorf("buffer full, event rejected")

	case DropStrategyOldest:
		// Evict-then-enqueue races with Start and with other producers, so
		// both steps are non-blocking and retried a bounded number of times.
		// A plain blocking send after the eviction could hang forever if
		// another producer grabbed the freed slot.
		const maxAttempts = 3
		for attempt := 0; attempt < maxAttempts; attempt++ {
			select {
			case <-c.buffer:
				c.metrics.DropCounter.Inc()
			default:
				// Drained concurrently; just retry the enqueue.
			}
			if c.tryEnqueue(event) {
				return nil
			}
		}
		c.metrics.DropCounter.Inc()
		return fmt.Errorf("buffer full after %d evictions, event rejected", maxAttempts)

	case DropStrategyLatest:
		c.metrics.DropCounter.Inc()
		return nil

	case DropStrategySample:
		if rand.Float32() < 0.5 {
			c.metrics.DropCounter.Inc()
			return nil
		}
		// Kept by sampling: wait for room, but honour cancellation.
		select {
		case c.buffer <- event:
			c.metrics.BufferSize.Set(float64(len(c.buffer)))
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}

	default:
		return fmt.Errorf("unknown drop strategy")
	}
}

// tryEnqueue performs a non-blocking send of event into the buffer, updating
// the BufferSize gauge on success. It reports whether the event was enqueued.
func (c *BackpressureConsumer) tryEnqueue(event Event) bool {
	select {
	case c.buffer <- event:
		c.metrics.BufferSize.Set(float64(len(c.buffer)))
		return true
	default:
		return false
	}
}

// Start drains the buffer, delivering each event to the wrapped consumer and
// recording its processing latency, until ctx is done. Delivery errors are
// logged, not returned. Events still buffered when ctx ends are not delivered.
func (c *BackpressureConsumer) Start(ctx context.Context) {
	done := ctx.Done()
	for {
		var ev Event
		select {
		case <-done:
			return
		case ev = <-c.buffer:
		}

		began := time.Now()
		if err := c.consumer.Consume(ctx, ev); err != nil {
			log.Printf("Consumer error: %v", err)
		}
		c.metrics.ProcessingLatency.Observe(time.Since(began).Seconds())
	}
}

有序消费保障

// OrderedConsumer re-orders events by Sequence before passing them to
// consumer, holding events that arrive early in buffer.
type OrderedConsumer struct {
	consumer  EventConsumer
	buffer    map[int64]Event // early arrivals keyed by sequence number
	nextSeq   int64           // sequence number expected next
	maxBuffer int             // limit on the number of buffered events
	timeout   time.Duration   // per-event delivery deadline
	mu        sync.Mutex      // guards buffer and nextSeq
}

// Consume delivers events to the wrapped consumer in increasing Sequence
// order, starting at nextSeq:
//
//   - seq < nextSeq: duplicate or stale event; dropped.
//   - seq == nextSeq: delivered at once, followed by any buffered events that
//     are now contiguous.
//   - seq > nextSeq: parked in the buffer. If that pushes the buffer above
//     maxBuffer, the missing sequence numbers are given up on: nextSeq jumps
//     to the oldest parked sequence and the contiguous run from there is
//     delivered.
//
// A delivery error is returned immediately. A buffered event whose delivery
// failed stays buffered and nextSeq is not advanced past it.
func (c *OrderedConsumer) Consume(ctx context.Context, event Event) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	seq := event.Sequence()
	switch {
	case seq < c.nextSeq:
		return nil
	case seq == c.nextSeq:
		if err := c.consumeWithTimeout(ctx, event); err != nil {
			return err
		}
		c.nextSeq++
		return c.drainContiguous(ctx)
	}

	// Out-of-order event: park it until the gap before it is filled. The map
	// is created lazily so the zero-value consumer does not panic.
	if c.buffer == nil {
		c.buffer = make(map[int64]Event)
	}
	c.buffer[seq] = event
	if len(c.buffer) <= c.maxBuffer {
		return nil
	}

	// Overflow: skip the gap. Advancing nextSeq to the oldest parked event
	// (rather than delivering that one event out of band) keeps every later
	// buffered event reachable; otherwise they would wait forever behind a
	// sequence number that will never arrive.
	oldest := seq
	for s := range c.buffer {
		if s < oldest {
			oldest = s
		}
	}
	c.nextSeq = oldest
	return c.drainContiguous(ctx)
}

// drainContiguous delivers buffered events starting at nextSeq for as long as
// the sequence numbers are consecutive. The caller must hold c.mu.
func (c *OrderedConsumer) drainContiguous(ctx context.Context) error {
	for {
		next, ok := c.buffer[c.nextSeq]
		if !ok {
			return nil
		}
		if err := c.consumeWithTimeout(ctx, next); err != nil {
			return err
		}
		delete(c.buffer, c.nextSeq)
		c.nextSeq++
	}
}

// consumeWithTimeout delivers event to the wrapped consumer, giving up after
// c.timeout. A non-positive timeout disables the deadline; with
// context.WithTimeout it would otherwise expire immediately and fail every
// call.
//
// On expiry or cancellation the returned error wraps ctx.Err(), so callers can
// distinguish context.DeadlineExceeded from context.Canceled with errors.Is.
// The consumer goroutine is not forcibly stopped; it observes the cancelled
// context passed to it, and its result is discarded.
func (c *OrderedConsumer) consumeWithTimeout(ctx context.Context, event Event) error {
	if c.timeout <= 0 {
		return c.consumer.Consume(ctx, event)
	}

	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	// Buffered so the goroutine can always hand off its result and exit, even
	// after this function has stopped waiting.
	done := make(chan error, 1)
	go func() {
		done <- c.consumer.Consume(ctx, event)
	}()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return fmt.Errorf("consume event %d: %w", event.Sequence(), ctx.Err())
	}
}

实战:Web 实时聊天系统

完整后端实现

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "sync"
    "time"
    "unicode/utf8"
    
    "github.com/google/adk-go/agent"
    "github.com/google/adk-go/event"
)

// ChatServer exposes an agent over HTTP as a streaming chat endpoint and
// feeds every streamed event to a list of consumers.
type ChatServer struct {
	agent     *agent.Agent
	consumers []event.EventConsumer
}

// NewChatServer returns a ChatServer for a whose consumers are a
// LoggingConsumer followed by a MetricsConsumer.
//
// NOTE(review): these consumer instances are created once and shared by every
// request; MetricsConsumer keeps per-stream counters, so concurrent requests
// share (and mix) them — consider creating per-request instances.
func NewChatServer(a *agent.Agent) *ChatServer {
	consumers := []event.EventConsumer{
		&LoggingConsumer{}, // logging consumer
		&MetricsConsumer{}, // metrics consumer
	}
	return &ChatServer{agent: a, consumers: consumers}
}

// handleChat serves one chat turn as a Server-Sent Events (SSE) stream.
//
// The request body is a JSON ChatRequest. A producer goroutine reads the
// agent's event stream into a channel. This handler passes each event to
// s.consumers, then writes it to the client as an SSE frame
// ("event: <name>\ndata: <json>\n\n"). Every payload is produced by
// json.Marshal, so quotes and newlines in tool names, error messages, etc. can
// neither corrupt the JSON nor break SSE framing. The response ends after a
// done or error frame, or when writing to the client fails.
func (s *ChatServer) handleChat(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	var req ChatRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Check streaming support before committing to SSE headers.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no") // disable Nginx proxy buffering

	eventChan := make(chan event.Event, 100)

	// send hands ev to the loop below unless the request context is done.
	// r.Context() is cancelled when the client disconnects or this handler
	// returns, so the producer can never block forever on a full channel
	// after the loop has stopped reading.
	send := func(ev event.Event) bool {
		select {
		case eventChan <- ev:
			return true
		case <-ctx.Done():
			return false
		}
	}

	go func() {
		defer close(eventChan)

		stream, err := s.agent.RunStream(ctx, req.Message)
		if err != nil {
			// NOTE(review): BaseEvent's fields are unexported, so these
			// literals only compile inside the event package; replace them
			// with whatever constructor that package provides.
			send(&event.ErrorEvent{
				BaseEvent: event.BaseEvent{eventType: event.EventTypeError},
				Error:     err,
			})
			return
		}
		defer stream.Close()

		for {
			ev, err := stream.Recv()
			if err == io.EOF {
				send(&event.DoneEvent{
					BaseEvent: event.BaseEvent{eventType: event.EventTypeDone},
				})
				return
			}
			if err != nil {
				send(&event.ErrorEvent{
					BaseEvent: event.BaseEvent{eventType: event.EventTypeError},
					Error:     err,
				})
				return
			}
			if !send(ev) {
				return
			}
		}
	}()

	for ev := range eventChan {
		for _, consumer := range s.consumers {
			if err := consumer.Consume(ctx, ev); err != nil {
				log.Printf("Consumer error: %v", err)
			}
		}

		name, payload, terminal, ok := ssePayload(ev)
		if !ok {
			continue // kinds not forwarded to the client
		}
		data, err := json.Marshal(payload)
		if err != nil {
			log.Printf("Serialize error: %v", err)
			continue
		}
		if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", name, data); err != nil {
			// Client is gone. Returning cancels ctx, which stops the producer.
			log.Printf("SSE write error: %v", err)
			return
		}
		flusher.Flush()
		if terminal {
			return
		}
	}
}

// ssePayload maps ev to its SSE event name and a JSON-encodable payload.
// terminal reports whether the stream should end after this frame. ok is false
// for event kinds that are not forwarded to the client. It dispatches on the
// concrete type, so no unchecked type assertion can panic.
func ssePayload(ev event.Event) (name string, payload interface{}, terminal, ok bool) {
	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		return "text_delta", e.Text, false, true
	case *event.ToolCallEvent:
		// Prefer the raw argument JSON. Fall back to the decoded map when
		// RawArgs is empty or invalid, so the frame is always valid JSON.
		var args interface{} = e.Args
		if json.Valid([]byte(e.RawArgs)) {
			args = json.RawMessage(e.RawArgs)
		}
		return "tool_call", struct {
			Tool string      `json:"tool"`
			Args interface{} `json:"args"`
		}{e.ToolName, args}, false, true
	case *event.ToolResultEvent:
		return "tool_result", struct {
			ToolID string      `json:"tool_id"`
			Result interface{} `json:"result"`
		}{e.ToolID, e.Result}, false, true
	case *event.DoneEvent:
		return "done", struct {
			Tokens int    `json:"tokens"`
			Reason string `json:"reason"`
		}{e.TokenUsage, e.FinishReason}, true, true
	case *event.ErrorEvent:
		msg := "unknown error"
		if e.Error != nil {
			msg = e.Error.Error()
		}
		return "error", struct {
			Error string `json:"error"`
		}{msg}, true, true
	}
	return "", nil, false, false
}

// serializeEvent flattens ev into one JSON object. The object holds the common
// header (type name, Unix-millisecond timestamp, sequence) plus the
// type-specific fields of the kinds handled below. Other kinds carry only the
// header. Error values are emitted as their message text, because an error
// marshals as {}.
func (s *ChatServer) serializeEvent(ev event.Event) ([]byte, error) {
	wrapper := map[string]interface{}{
		"type":      ev.Type().String(),
		"timestamp": ev.Timestamp().UnixMilli(),
		"sequence":  ev.Sequence(),
	}

	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		wrapper["text"] = e.Text
		wrapper["index"] = e.Index
		wrapper["is_final"] = e.IsFinal
	case *event.ToolCallEvent:
		wrapper["tool_name"] = e.ToolName
		wrapper["tool_id"] = e.ToolID
		wrapper["args"] = e.Args
	case *event.ToolResultEvent:
		wrapper["tool_id"] = e.ToolID
		wrapper["result"] = e.Result
		wrapper["duration_ms"] = e.Duration.Milliseconds()
		if e.Error != nil {
			wrapper["error"] = e.Error.Error()
		}
	case *event.DoneEvent:
		wrapper["tokens"] = e.TokenUsage
		wrapper["finish_reason"] = e.FinishReason
	case *event.ErrorEvent:
		if e.Error != nil {
			wrapper["error"] = e.Error.Error()
		}
	}

	return json.Marshal(wrapper)
}

// jsonEscape returns s encoded as a JSON string literal, surrounding quotes
// included. json.Marshal also escapes <, > and & as \u003c, \u003e and \u0026.
// An encoding failure yields "".
func jsonEscape(s string) string {
	quoted, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(quoted)
}

// ChatRequest is the JSON request body decoded by the chat handler.
type ChatRequest struct {
	// Message is the user's input for this turn.
	Message string `json:"message"`
	// SessionID optionally identifies the conversation; omitted from encoded
	// JSON when empty.
	SessionID string `json:"session_id,omitempty"`
}

// LoggingConsumer logs a one-line summary of text-delta, tool-call and done
// events; other kinds are ignored. It never returns an error.
type LoggingConsumer struct{}

// Consume logs ev. It dispatches on the concrete event type. Checking
// ev.Type() and then doing an unchecked type assertion would panic whenever
// Type() disagrees with the Go type, e.g. for an event built with a zero
// BaseEvent.
func (c *LoggingConsumer) Consume(ctx context.Context, ev event.Event) error {
	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		log.Printf("[TextDelta] seq=%d len=%d", ev.Sequence(), len(e.Text))
	case *event.ToolCallEvent:
		log.Printf("[ToolCall] seq=%d tool=%s", ev.Sequence(), e.ToolName)
	case *event.DoneEvent:
		log.Printf("[Done] seq=%d tokens=%d", ev.Sequence(), e.TokenUsage)
	}
	return nil
}

// Close is a no-op; LoggingConsumer holds no resources.
func (c *LoggingConsumer) Close() error { return nil }

// MetricsConsumer accumulates simple stream statistics: wall-clock latency
// since the first observed event, the number of text characters, and the
// number of tool calls. It logs them when a done event arrives, then resets.
//
// It is safe for concurrent use. NOTE(review): when one instance is shared by
// concurrent streams, their counts still mix; create one MetricsConsumer per
// stream for exact per-request numbers.
type MetricsConsumer struct {
	mu         sync.Mutex
	textTokens int // counts characters (runes), despite the name
	toolCalls  int
	startTime  time.Time
}

// Consume updates the counters for ev. On a done event it logs the totals and
// resets the consumer, so the next stream is measured from scratch instead of
// from the first event ever seen.
func (c *MetricsConsumer) Consume(ctx context.Context, ev event.Event) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.startTime.IsZero() {
		c.startTime = time.Now()
	}

	switch e := ev.(type) {
	case *event.TextDeltaEvent:
		// len() counts bytes; count runes so "text_chars" is correct for
		// non-ASCII (e.g. Chinese) output.
		c.textTokens += utf8.RuneCountInString(e.Text)
	case *event.ToolCallEvent:
		c.toolCalls++
	case *event.DoneEvent:
		log.Printf("[Metrics] total_latency=%v text_chars=%d tool_calls=%d",
			time.Since(c.startTime), c.textTokens, c.toolCalls)
		c.textTokens, c.toolCalls, c.startTime = 0, 0, time.Time{}
	}
	return nil
}

// Close is a no-op; MetricsConsumer holds no resources.
func (c *MetricsConsumer) Close() error { return nil }

// main builds the chat agent, registers the SSE handler at /chat on the
// default mux, and serves on :8080.
//
// NOTE(review): `model` is not declared in this snippet; it must be provided
// elsewhere for this program to compile.
func main() {
	a, err := agent.New(agent.Config{
		Name:        "chat-agent",
		Model:       model,
		Instruction: "你是一个 helpful 的助手。",
	})
	if err != nil {
		log.Fatal(err)
	}

	server := NewChatServer(a)
	http.HandleFunc("/chat", server.handleChat)

	srv := &http.Server{
		Addr: ":8080",
		// Nil Handler keeps using http.DefaultServeMux, as HandleFunc above does.
		Handler: nil,
		// Bound header reading (slowloris protection). WriteTimeout is left
		// unset on purpose: it would cut long-running SSE streams mid-response.
		ReadHeaderTimeout: 10 * time.Second,
		IdleTimeout:       120 * time.Second,
	}

	log.Println("Server starting on :8080")
	log.Fatal(srv.ListenAndServe())
}

错误恢复与容错

断线重连机制

// ResumableStream re-runs an agent stream after failures, resuming from the
// sequence number of the last event it delivered.
type ResumableStream struct {
	agent         *agent.Agent
	lastSeq       int64         // sequence of the last event passed to the handler
	input         string        // input passed to RunStream on every (re)connect
	reconnects    int           // reconnect attempts counted so far
	maxReconnects int           // attempt budget before giving up
	backoff       time.Duration // base retry delay, doubled per attempt
}

// Start runs the agent on s.input and passes each received event to handler.
// When the stream cannot be opened or breaks mid-way, it reconnects and
// resumes from s.lastSeq.
//
// It returns:
//   - nil when the stream ends normally (io.EOF);
//   - the handler's error if the handler fails;
//   - ctx.Err() once ctx is cancelled;
//   - a "max reconnects exceeded" error after s.maxReconnects consecutive
//     attempts fail without delivering an event.
//
// Retries wait s.backoff * 2^attempt. The wait is interruptible by ctx, and
// every opened stream is closed.
func (s *ResumableStream) Start(ctx context.Context, handler EventHandler) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		finished, err := s.runOnce(ctx, handler)
		if finished {
			return err
		}

		if s.reconnects >= s.maxReconnects {
			return fmt.Errorf("max reconnects exceeded: %w", err)
		}
		if err := s.pause(ctx, s.backoffDelay()); err != nil {
			return err
		}
		s.reconnects++
	}
}

// runOnce opens one stream and consumes it until it ends. The bool is true
// when Start should return the error as-is (normal EOF, handler failure, or
// context cancellation). It is false when the connection failed or dropped and
// a reconnect should be attempted.
func (s *ResumableStream) runOnce(ctx context.Context, handler EventHandler) (bool, error) {
	stream, err := s.agent.RunStream(ctx, s.input, agent.WithResumeFrom(s.lastSeq))
	if err != nil {
		return false, err
	}
	defer stream.Close()

	for {
		ev, recvErr := stream.Recv()
		if recvErr == io.EOF {
			return true, nil
		}
		if recvErr != nil {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return true, ctxErr
			}
			return false, recvErr
		}

		// The stream is making progress, so earlier failures no longer count
		// against the reconnect budget.
		s.reconnects = 0
		s.lastSeq = ev.Sequence()

		if err := handler.Handle(ctx, ev); err != nil {
			return true, err
		}
	}
}

// backoffDelay returns s.backoff * 2^s.reconnects. It saturates at the
// largest Duration instead of overflowing for large attempt counts.
func (s *ResumableStream) backoffDelay() time.Duration {
	const maxDelay = time.Duration(1<<63 - 1)
	d := s.backoff
	for i := 0; i < s.reconnects && d > 0; i++ {
		if d > maxDelay/2 {
			return maxDelay
		}
		d *= 2
	}
	return d
}

// pause waits for d, returning early with ctx.Err() if ctx is done first.
func (s *ResumableStream) pause(ctx context.Context, d time.Duration) error {
	if d <= 0 {
		return ctx.Err()
	}
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

性能优化

批量处理

// BatchingConsumer accumulates events and hands them to consumer in batches:
// when batchSize events are buffered, or every flushInterval while Start runs.
type BatchingConsumer struct {
	consumer      EventConsumer
	batchSize     int
	flushInterval time.Duration
	buffer        []Event
	mu            sync.Mutex // guards buffer
}

// Consume appends event to the buffer. Once the buffer holds batchSize events,
// the whole batch is detached under the lock and flushed outside it. The
// flush error, if any, is returned.
func (c *BatchingConsumer) Consume(ctx context.Context, event Event) error {
	var ready []Event

	c.mu.Lock()
	c.buffer = append(c.buffer, event)
	if len(c.buffer) >= c.batchSize {
		ready, c.buffer = c.buffer, nil
	}
	c.mu.Unlock()

	if ready == nil {
		return nil
	}
	return c.flush(ctx, ready)
}

// Start flushes whatever is buffered every flushInterval until ctx is done.
// Flush errors are logged rather than silently discarded. Events still
// buffered when ctx ends are not flushed.
//
// A non-positive flushInterval would make time.NewTicker panic. In that case
// time-based flushing is disabled, and only size-triggered flushes from
// Consume occur.
func (c *BatchingConsumer) Start(ctx context.Context) {
	if c.flushInterval <= 0 {
		<-ctx.Done()
		return
	}

	ticker := time.NewTicker(c.flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		c.mu.Lock()
		batch := c.buffer
		if len(batch) > 0 {
			c.buffer = nil
		}
		c.mu.Unlock()

		if len(batch) == 0 {
			continue
		}
		if err := c.flush(ctx, batch); err != nil {
			log.Printf("batch flush error: %v", err)
		}
	}
}

// flush delivers batch to the wrapped consumer in order and returns the first
// error encountered.
//
// The batch has already been removed from the buffer. A failure on one event
// therefore does not abandon the events after it; they would otherwise be
// silently lost. Delivery stops early only when ctx is done.
func (c *BatchingConsumer) flush(ctx context.Context, batch []Event) error {
	var firstErr error
	for _, event := range batch {
		if err := ctx.Err(); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			break
		}
		if err := c.consumer.Consume(ctx, event); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

常见问题深度解析

Q:Event 丢失怎么办?

A:三层保障:

  1. 客户端确认:重要 Event 要求客户端发送 ACK(SSE 是单向通道,ACK 需通过独立的 HTTP 请求或 WebSocket 回传)
  2. 服务端缓冲:服务端维护发送缓冲区,未确认事件重发
  3. 持久化:关键 Event 写入消息队列(如 Kafka),确保至少一次投递

Q:如何处理高并发下的 Streaming?

A:

  1. 连接池:复用与 LLM API 的连接
  2. 限流:基于令牌桶限制并发 Stream 数量
  3. 降级:高负载时关闭非关键消费者(如日志)
  4. 水平扩展:使用负载均衡分发请求到多个实例

Q:Streaming 的 Token 消耗是否更高?

A:Token 消耗相同,但 Streaming 有额外开销:

  • 网络开销:同一 HTTP 响应内不会为每个 Event 重复发送 HTTP 头,但每个 Event 都有 SSE 帧格式(event:/data: 行)开销,且每次 Flush 都会产生一个独立的分块(chunk)或 HTTP/2 数据帧
  • 序列化开销:每个 Event 独立序列化
  • 处理开销:消费者链的处理延迟

优化:合并小 TextDelta(<10 字符)为一批发送。

下一步

Event 处理的实时捕获与消费架构已深入掌握。接下来探索多模态 Streaming——音频、图像、视频的事件处理与流式传输。

Streaming 原理 | 多模态 Streaming →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。