多模态 Streaming 深度解析:音视频图像的流式处理、编码优化与实时传输

多模态能力是现代 AI 系统的关键差异化特性。当 Agent 需要处理音频、图像、视频等非文本内容时,传统的请求-响应模式面临巨大挑战——大文件传输延迟高、内存占用大、用户体验差。Streaming 多模态处理通过分片传输、渐进解码、流式同步等技术,实现高效的实时多模态交互。本文将深入 ADK Go 的多模态 Streaming 架构与生产实践。

多模态 Streaming 的架构挑战

核心难题

模态数据特征传输挑战处理挑战
图像1-10MB大文件延迟分辨率适配、格式转换
音频流式生成实时性要求编码格式、采样率
视频10-100MB+带宽占用大帧提取、关键帧采样
混合多种格式同步困难时序对齐、上下文关联

流式处理模型

输入数据
┌─────────────┐    分片/采样    ┌─────────────┐    编码压缩    ┌─────────────┐
│   原始媒体   │ ─────────────► │   处理单元   │ ───────────► │   传输单元   │
│  (大文件)    │               │  (帧/块/段)  │              │  (压缩包)    │
└─────────────┘               └─────────────┘              └──────┬──────┘
                                                           ┌─────────────┐
                                                           │   LLM API   │
                                                           │  (流式处理)  │
                                                           └──────┬──────┘
                                                           ┌─────────────┐
                                                           │  增量结果    │
                                                           │  (TextDelta) │
                                                           └─────────────┘

图像流式处理

分片传输架构

// 图像分片器
type ImageChunker struct {
    maxChunkSize int    // 最大分片大小(如 256KB)
    overlap      int    // 分片重叠区域(用于上下文保持)
    format       string // 输出格式
}

// 图像分片(基于网格)
type ImageChunk struct {
    Index      int       // 分片序号
    Bounds     image.Rectangle // 在原图中的位置
    Data       []byte    // 分片数据
    TotalChunks int      // 总分片数
    Metadata   map[string]interface{} // 元数据
}

func (c *ImageChunker) Chunk(img image.Image) ([]*ImageChunk, error) {
    bounds := img.Bounds()
    width := bounds.Dx()
    height := bounds.Dy()
    
    // 计算网格大小
    cols := int(math.Ceil(float64(width) / math.Sqrt(float64(c.maxChunkSize))))
    rows := int(math.Ceil(float64(height) / math.Sqrt(float64(c.maxChunkSize))))
    
    chunkWidth := width / cols
    chunkHeight := height / rows
    
    chunks := make([]*ImageChunk, 0, cols*rows)
    index := 0
    
    for row := 0; row < rows; row++ {
        for col := 0; col < cols; col++ {
            // 计算分片边界(含重叠)
            x1 := col*chunkWidth - c.overlap
            if x1 < 0 { x1 = 0 }
            y1 := row*chunkHeight - c.overlap
            if y1 < 0 { y1 = 0 }
            
            x2 := (col+1)*chunkWidth + c.overlap
            if x2 > width { x2 = width }
            y2 := (row+1)*chunkHeight + c.overlap
            if y2 > height { y2 = height }
            
            // 裁剪分片
            chunkBounds := image.Rect(x1, y1, x2, y2)
            chunkImg := imaging.Crop(img, chunkBounds)
            
            // 编码
            var buf bytes.Buffer
            if err := imaging.Encode(&buf, chunkImg, imaging.JPEG, imaging.JPEGQuality(85)); err != nil {
                return nil, fmt.Errorf("encode chunk: %w", err)
            }
            
            chunks = append(chunks, &ImageChunk{
                Index:       index,
                Bounds:      chunkBounds,
                Data:        buf.Bytes(),
                TotalChunks: cols * rows,
                Metadata: map[string]interface{}{
                    "original_width":  width,
                    "original_height": height,
                    "grid_position":   fmt.Sprintf("%d,%d", row, col),
                },
            })
            index++
        }
    }
    
    return chunks, nil
}

自适应分辨率

// 根据网络状况和场景选择合适分辨率
type AdaptiveResolution struct {
    networkType   string // "wifi" | "4g" | "5g"
    latencyBudget time.Duration
    qualityLevel  int    // 1-5
}

func (ar *AdaptiveResolution) SelectResolution(originalWidth, originalHeight int) (int, int) {
    // 基于网络类型的基础限制
    maxDimension := map[string]int{
        "wifi": 2048,
        "5g":   1536,
        "4g":   1024,
        "3g":   512,
    }[ar.networkType]
    
    if maxDimension == 0 {
        maxDimension = 1024
    }
    
    // 根据质量等级调整
    qualityMultiplier := map[int]float64{
        1: 0.5,
        2: 0.7,
        3: 1.0,
        4: 1.2,
        5: 1.5,
    }[ar.qualityLevel]
    
    maxDimension = int(float64(maxDimension) * qualityMultiplier)
    
    // 计算目标尺寸(保持宽高比)
    scale := math.Min(
        float64(maxDimension)/float64(originalWidth),
        float64(maxDimension)/float64(originalHeight),
    )
    
    if scale >= 1.0 {
        return originalWidth, originalHeight
    }
    
    return int(float64(originalWidth) * scale), int(float64(originalHeight) * scale)
}

音频流式处理

实时音频流

// 音频流处理器
type AudioStreamProcessor struct {
    sampleRate    int
    channels      int
    frameDuration time.Duration // 每帧时长(如 20ms)
    encoder       AudioEncoder
}

// 音频帧
type AudioFrame struct {
    Sequence    int       // 帧序号
    Timestamp   time.Time // 时间戳
    Data        []byte    // PCM 或编码数据
    SampleRate  int
    Channels    int
    IsKeyFrame  bool      // 是否关键帧(用于丢包恢复)
}

func (p *AudioStreamProcessor) ProcessStream(ctx context.Context, reader io.Reader, handler FrameHandler) error {
    frameSize := int(p.sampleRate * p.channels * 2 * p.frameDuration.Seconds()) // 16-bit PCM
    
    buffer := make([]byte, frameSize)
    sequence := 0
    
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        
        n, err := io.ReadFull(reader, buffer)
        if err == io.EOF {
            return nil
        }
        if err != nil && err != io.ErrUnexpectedEOF {
            return fmt.Errorf("read audio frame: %w", err)
        }
        
        frame := &AudioFrame{
            Sequence:   sequence,
            Timestamp:  time.Now(),
            Data:       buffer[:n],
            SampleRate: p.sampleRate,
            Channels:   p.channels,
            IsKeyFrame: sequence%50 == 0, // 每 50 帧一个关键帧
        }
        
        // 编码(如 Opus)
        encoded, err := p.encoder.Encode(frame)
        if err != nil {
            return fmt.Errorf("encode frame: %w", err)
        }
        frame.Data = encoded
        
        if err := handler.Handle(frame); err != nil {
            return err
        }
        
        sequence++
    }
}

// 语音活动检测(VAD)优化
type VADProcessor struct {
    threshold    float64
    silenceFrames int
    maxSilence   int
}

func (v *VADProcessor) Process(frame *AudioFrame) (bool, *AudioFrame) {
    energy := calculateEnergy(frame.Data)
    
    if energy > v.threshold {
        v.silenceFrames = 0
        return true, frame
    }
    
    v.silenceFrames++
    if v.silenceFrames > v.maxSilence {
        return false, nil // 丢弃静音帧
    }
    
    return true, frame // 保留少量静音(用于自然停顿)
}

视频流式处理

关键帧提取

// 视频关键帧提取器
type KeyFrameExtractor struct {
    interval     time.Duration // 提取间隔
    minQuality   float64       // 最低质量阈值
    maxFrames    int           // 最大提取帧数
}

type VideoFrame struct {
    Timestamp   time.Duration // 在视频中的时间位置
    Image       image.Image   // 帧图像
    Quality     float64       // 质量评分
    IsKeyFrame  bool          // 是否关键帧
}

func (e *KeyFrameExtractor) Extract(ctx context.Context, videoPath string) ([]*VideoFrame, error) {
    // 使用 ffmpeg 提取帧
    cmd := exec.CommandContext(ctx, "ffmpeg",
        "-i", videoPath,
        "-vf", fmt.Sprintf("select='not(mod(n\\,%d))',scale=640:-1", 
            int(e.interval.Seconds()*30)), // 假设 30fps
        "-vsync", "vfr",
        "-q:v", "2",
        "-f", "image2pipe",
        "-vcodec", "mjpeg",
        "-",
    )
    
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return nil, err
    }
    
    if err := cmd.Start(); err != nil {
        return nil, err
    }
    defer cmd.Wait()
    
    frames := make([]*VideoFrame, 0)
    decoder := jpeg.NewDecoder()
    
    for i := 0; i < e.maxFrames; i++ {
        select {
        case <-ctx.Done():
            return frames, ctx.Err()
        default:
        }
        
        img, err := decoder.Decode(stdout)
        if err == io.EOF {
            break
        }
        if err != nil {
            continue
        }
        
        quality := e.assessQuality(img)
        if quality < e.minQuality {
            continue
        }
        
        frames = append(frames, &VideoFrame{
            Timestamp:  time.Duration(i) * e.interval,
            Image:      img,
            Quality:    quality,
            IsKeyFrame: i == 0 || quality > 0.9,
        })
    }
    
    return frames, nil
}

func (e *KeyFrameExtractor) assessQuality(img image.Image) float64 {
    bounds := img.Bounds()
    
    // 基于分辨率、清晰度、对比度综合评分
    resolutionScore := math.Min(float64(bounds.Dx()*bounds.Dy())/1000000.0, 1.0)
    
    // 简化版清晰度评估(拉普拉斯方差)
    sharpness := calculateSharpness(img)
    sharpnessScore := math.Min(sharpness/1000.0, 1.0)
    
    return 0.4*resolutionScore + 0.6*sharpnessScore
}

多模态流式同步

时间轴同步

// 多模态时间轴同步器
type MultiModalSync struct {
    streams    map[string]*MediaStream
    timeline   *Timeline
    syncWindow time.Duration
}

type MediaStream struct {
    Type      string        // "text" | "audio" | "video" | "image"
    Events    []TimedEvent  // 带时间戳的事件
    Position  time.Duration // 当前播放位置
}

type TimedEvent struct {
    Timestamp time.Duration
    Event     interface{}
}

func (s *MultiModalSync) Synchronize(ctx context.Context) (<-chan *SyncFrame, error) {
    output := make(chan *SyncFrame, 10)
    
    go func() {
        defer close(output)
        
        ticker := time.NewTicker(50 * time.Millisecond)
        defer ticker.Stop()
        
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                frame := s.collectFrame()
                if frame != nil {
                    output <- frame
                }
            }
        }
    }()
    
    return output, nil
}

func (s *MultiModalSync) collectFrame() *SyncFrame {
    now := s.timeline.CurrentPosition()
    window := s.syncWindow
    
    frame := &SyncFrame{
        Timestamp: now,
        Events:    make(map[string]interface{}),
    }
    
    hasData := false
    
    for streamType, stream := range s.streams {
        // 找到当前时间窗口内的事件
        for _, event := range stream.Events {
            if event.Timestamp >= now-window && event.Timestamp <= now+window {
                frame.Events[streamType] = event.Event
                hasData = true
                break
            }
        }
    }
    
    if !hasData {
        return nil
    }
    
    return frame
}

type SyncFrame struct {
    Timestamp time.Duration
    Events    map[string]interface{} // stream_type -> event
}

实战:看图问答系统

完整实现

package main

import (
    "bytes"
    "context"
    "fmt"
    "image"
    "image/jpeg"
    "io"
    "log"
    "math"
    "net/http"
    "time"
    
    "github.com/google/adk-go/agent"
    "github.com/google/adk-go/event"
    "github.com/disintegration/imaging"
)

// VisionChatSystem 视觉问答系统
type VisionChatSystem struct {
    agent           *agent.Agent
    imageProcessor  *ImageProcessor
    maxImageSize    int
    quality         int
}

type ImageProcessor struct {
    maxDimension int
    quality      int
    format       imaging.Format
}

func (p *ImageProcessor) Process(imageData []byte) ([]byte, error) {
    // 解码图像
    img, format, err := image.Decode(bytes.NewReader(imageData))
    if err != nil {
        return nil, fmt.Errorf("decode image: %w", err)
    }
    
    bounds := img.Bounds()
    width := bounds.Dx()
    height := bounds.Dy()
    
    // 自适应调整大小
    if width > p.maxDimension || height > p.maxDimension {
        scale := math.Min(
            float64(p.maxDimension)/float64(width),
            float64(p.maxDimension)/float64(height),
        )
        
        newWidth := int(float64(width) * scale)
        newHeight := int(float64(height) * scale)
        
        img = imaging.Resize(img, newWidth, newHeight, imaging.Lanczos)
    }
    
    // 编码输出
    var buf bytes.Buffer
    switch format {
    case "jpeg":
        err = jpeg.Encode(&buf, img, &jpeg.Options{Quality: p.quality})
    default:
        err = imaging.Encode(&buf, img, p.format, imaging.JPEGQuality(p.quality))
    }
    
    if err != nil {
        return nil, fmt.Errorf("encode image: %w", err)
    }
    
    return buf.Bytes(), nil
}

func (s *VisionChatSystem) HandleChat(ctx context.Context, text string, imageData []byte) (<-chan event.Event, error) {
    eventChan := make(chan event.Event, 100)
    
    go func() {
        defer close(eventChan)
        
        // 处理图像
        processedImage, err := s.imageProcessor.Process(imageData)
        if err != nil {
            eventChan <- &event.ErrorEvent{
                BaseEvent: event.BaseEvent{eventType: event.EventTypeError},
                Error:     err,
            }
            return
        }
        
        // 构建多模态输入
        multimodalInput := agent.NewMultimodalInput()
        multimodalInput.AddText(text)
        multimodalInput.AddImage(processedImage, "image/jpeg")
        
        // 流式执行
        stream, err := s.agent.RunStream(ctx, multimodalInput)
        if err != nil {
            eventChan <- &event.ErrorEvent{
                BaseEvent: event.BaseEvent{eventType: event.EventTypeError},
                Error:     err,
            }
            return
        }
        defer stream.Close()
        
        // 转发事件
        for {
            ev, err := stream.Recv()
            if err == io.EOF {
                eventChan <- &event.DoneEvent{
                    BaseEvent: event.BaseEvent{eventType: event.EventTypeDone},
                }
                return
            }
            if err != nil {
                eventChan <- &event.ErrorEvent{
                    BaseEvent: event.BaseEvent{eventType: event.EventTypeError},
                    Error:     err,
                }
                return
            }
            
            eventChan <- ev
        }
    }()
    
    return eventChan, nil
}

// HTTP 处理器
func (s *VisionChatSystem) HTTPHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    
    // 解析 multipart 表单
    if err := r.ParseMultipartForm(32 << 20); err != nil { // 32MB max
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    
    text := r.FormValue("text")
    
    // 读取图像
    file, _, err := r.FormFile("image")
    if err != nil {
        http.Error(w, "image required", http.StatusBadRequest)
        return
    }
    defer file.Close()
    
    imageData, err := io.ReadAll(file)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    // 设置 SSE
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }
    
    // 处理请求
    eventChan, err := s.HandleChat(ctx, text, imageData)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    // 发送事件
    for ev := range eventChan {
        data, err := json.Marshal(ev)
        if err != nil {
            continue
        }
        
        fmt.Fprintf(w, "data: %s\n\n", data)
        flusher.Flush()
    }
}

func main() {
    // 创建 Agent
    a, err := agent.New(agent.Config{
        Name:        "vision-agent",
        Model:       model, // 使用支持视觉的模型,如 gemini-pro-vision
        Instruction: "你是一个视觉助手。仔细观察图像,回答用户的问题。",
        Multimodal:  true,
    })
    if err != nil {
        log.Fatal(err)
    }
    
    system := &VisionChatSystem{
        agent: a,
        imageProcessor: &ImageProcessor{
            maxDimension: 1024,
            quality:      85,
            format:       imaging.JPEG,
        },
    }
    
    http.HandleFunc("/vision-chat", system.HTTPHandler)
    log.Println("Vision server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

带宽自适应

动态码率调整

// 带宽监控器
type BandwidthMonitor struct {
    samples     []BandwidthSample
    windowSize  time.Duration
    currentBW   float64 // bytes per second
}

type BandwidthSample struct {
    Timestamp time.Time
    Bytes     int
    Duration  time.Duration
}

func (m *BandwidthMonitor) Update(bytes int, duration time.Duration) {
    sample := BandwidthSample{
        Timestamp: time.Now(),
        Bytes:     bytes,
        Duration:  duration,
    }
    
    m.samples = append(m.samples, sample)
    
    // 清理过期样本
    cutoff := time.Now().Add(-m.windowSize)
    var i int
    for i = 0; i < len(m.samples); i++ {
        if m.samples[i].Timestamp.After(cutoff) {
            break
        }
    }
    m.samples = m.samples[i:]
    
    // 计算平均带宽
    var totalBytes int
    var totalDuration time.Duration
    for _, s := range m.samples {
        totalBytes += s.Bytes
        totalDuration += s.Duration
    }
    
    if totalDuration > 0 {
        m.currentBW = float64(totalBytes) / totalDuration.Seconds()
    }
}

// 自适应编码器
type AdaptiveEncoder struct {
    monitor     *BandwidthMonitor
    targetBW    float64 // 目标带宽
    encoder     MediaEncoder
}

func (e *AdaptiveEncoder) Encode(frame *MediaFrame) ([]byte, error) {
    // 根据当前带宽调整质量
    bw := e.monitor.currentBW
    
    quality := 1.0
    if bw > 0 {
        quality = math.Min(e.targetBW/bw, 1.0)
    }
    
    // 动态调整编码参数
    params := EncodingParams{
        Quality:      int(quality * 100),
        Resolution:   e.calculateResolution(quality),
        FrameRate:    e.calculateFrameRate(quality),
    }
    
    return e.encoder.Encode(frame, params)
}

常见问题深度解析

Q:大图像如何不影响首字延迟?

A:三层策略:

  1. 预上传:用户选择图像后立即开始上传,不等文本输入完成
  2. 渐进传输:先发送缩略图(用于快速预览),再传输完整图像
  3. 并行处理:图像上传和文本处理并行进行

Q:视频处理太慢怎么办?

A

  1. 关键帧采样:只提取 5-10 个关键帧,而非全部帧
  2. 分辨率降级:处理时使用 480p,展示时使用原分辨率
  3. 预提取:上传时后台提取关键帧,用户提问时直接使用
  4. 流式提取:边上传边提取,不等完整上传

Q:多模态的 Token 消耗如何控制?

A

  • 图像:压缩到 512x512,Token 数约 1000-2000
  • 音频:转文本后处理,避免直接传输音频 Token
  • 视频:提取关键帧,每帧约 1000 Token,控制帧数 <10

下一步

多模态 Streaming 的流式处理与传输已深入掌握。接下来探索流式聊天界面实战——从 SSE 到 WebSocket,构建完整的实时交互 UI。

Event 处理 | 流式聊天界面 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。