流式聊天界面深度实战:从 SSE 到 WebSocket,构建生产级实时交互系统

流式聊天界面是用户与 AI Agent 交互的核心载体。一个优秀的流式聊天系统需要在低延迟、高可靠性、良好用户体验之间取得平衡。本文将深入 SSE 与 WebSocket 的技术选型、消息可靠性保障、连接生命周期管理、前端渲染优化与生产级部署策略。

传输协议深度对比

SSE vs WebSocket

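| 维度 | SSE | WebSocket |
| --- | --- | --- |
| 通信方向 | 单向(服务器→客户端) | 双向 |
| 协议基础 | HTTP | TCP + WebSocket 握手 |
| 自动重连 | 原生支持(EventSource) | 需手动实现 |
| 浏览器兼容 | 优秀(IE 除外) | 良好 |
| 穿透代理 | 优秀 | 可能需配置 |
| 心跳机制 | 无协议级心跳,通常由服务端定期发送注释行或心跳事件 | 协议级 Ping/Pong 帧,应用层通常仍需自定义 |
| 多路复用 | 单连接单流 | 单连接多流 |
| 二进制支持 | Base64 编码 | 原生支持 |
| 适用场景 | 服务器推送 | 双向实时通信 |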

选型决策树

1. 是否需要客户端在同一连接上主动推送?(仅提交用户输入时,可用普通 HTTP POST 代替)
    ├─ 是 ──→ WebSocket
    └─ 否 ──→ 继续第 2 步
2. 是否需要双向低延迟?(如协同编辑)
    ├─ 是 ──→ WebSocket
    └─ 否 ──→ 继续第 3 步
3. 是否需要简单实现 + 自动重连?
    ├─ 是 ──→ SSE
    └─ 否 ──→ 默认 SSE(大多数 AI 聊天场景)

生产建议:AI 聊天场景首选 SSE,因为:

  1. 单向通信足够(用户输入通过 HTTP POST,输出通过 SSE)
  2. 自动重连减少客户端复杂度
  3. HTTP 基础设施成熟(负载均衡、CDN、WAF 都支持)

生产级 SSE 架构

服务端实现

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "math"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/google/adk-go/agent"
    "github.com/google/adk-go/event"
    "github.com/google/uuid"
)

// ChatServer is the production-grade chat server. It bundles the agent that
// produces streamed replies with the session store, the rate limiter and the
// metrics sink used by the HTTP handlers.
//
// NOTE(review): the upgrader field refers to a websocket package that is not
// in the visible import block — confirm the import exists elsewhere.
type ChatServer struct {
    agent        *agent.Agent
    sessions     *SessionManager
    rateLimiter  *RateLimiter
    metrics      *ChatMetrics
    upgrader     *websocket.Upgrader // WebSocket support
}

// SessionManager stores chat sessions keyed by session ID.
//
// NOTE(review): mu presumably guards sessions and ttl presumably bounds how
// long an idle session is kept; neither use is visible here — confirm against
// the methods defined elsewhere.
type SessionManager struct {
    sessions map[string]*ChatSession
    mu       sync.RWMutex
    ttl      time.Duration
}

// ChatSession is the per-conversation state: the message history, activity
// timestamps, and the cancel function of the generation currently in flight.
//
// NOTE(review): the struct carries no lock, so concurrent requests for the
// same session would mutate History, LastActivity and CancelFunc without
// synchronization — confirm callers serialize access.
type ChatSession struct {
    ID           string
    UserID       string
    History      []Message
    CreatedAt    time.Time
    LastActivity time.Time
    CancelFunc   context.CancelFunc
}

// Message is a single chat turn as stored in a session's history and
// serialized to JSON. Metadata is omitted from the JSON output when empty.
type Message struct {
    ID        string    `json:"id"`
    Role      string    `json:"role"` // "user" | "assistant" | "system"
    Content   string    `json:"content"`
    Timestamp time.Time `json:"timestamp"`
    Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

// maxChatRequestBytes bounds the JSON request body that handleSSE will read.
const maxChatRequestBytes = 1 << 20 // 1 MiB

// writeSSEEvent writes one named SSE frame whose data line is the JSON
// encoding of payload, then flushes it to the client. A non-nil error means
// the write failed, which in practice means the client has gone away.
func writeSSEEvent(w http.ResponseWriter, flusher http.Flusher, name string, payload interface{}) error {
    if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", name, mustJSON(payload)); err != nil {
        return err
    }
    flusher.Flush()
    return nil
}

// handleSSE serves one chat turn as a Server-Sent Events stream.
//
// The request body is a JSON ChatRequest; the session ID is taken from the
// "session_id" query parameter (a new one is generated when it is absent).
// The response is a sequence of named events: session, message_start, then
// any number of text_delta / tool_call / tool_result / thinking events, and
// finally message_end (or error). On success the user message and the full
// assistant reply are appended to the session history.
//
// All request validation happens before the first byte of the stream is
// written, so failures are still reported with a proper HTTP status.
//
// NOTE(review): session fields are mutated without locking; concurrent
// requests for the same session ID would race — confirm callers serialize.
// NOTE(review): Access-Control-Allow-Origin is "*"; tighten it if the
// endpoint is ever used with credentials.
func (s *ChatServer) handleSSE(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    // 1. Rate limiting, keyed by client IP.
    if !s.rateLimiter.Allow(getClientIP(r)) {
        http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
        return
    }

    // 2. Streaming needs a Flusher. Check before touching headers or session
    // state so the error response is a clean plain-text 500.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    // 3. Read the user input. The body is size-capped, and it is decoded
    // before the session lookup so malformed requests do not create sessions.
    var req ChatRequest
    r.Body = http.MaxBytesReader(w, r.Body, maxChatRequestBytes)
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    if strings.TrimSpace(req.Message) == "" {
        http.Error(w, "message must not be empty", http.StatusBadRequest)
        return
    }

    // 4. Get or create the session.
    sessionID := r.URL.Query().Get("session_id")
    if sessionID == "" {
        sessionID = uuid.New().String()
    }

    session, err := s.sessions.GetOrCreate(sessionID)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    // 5. SSE response headers. X-Accel-Buffering disables nginx buffering.
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("X-Accel-Buffering", "no")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    // 6. The first event tells the client which session it is talking to.
    if err := writeSSEEvent(w, flusher, "session", map[string]string{"session_id": sessionID}); err != nil {
        return
    }

    // 7. Bound the whole generation and expose its cancel function.
    ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
    defer cancel()
    session.CancelFunc = cancel

    // 8. Record the user message.
    userMsg := Message{
        ID:        uuid.New().String(),
        Role:      "user",
        Content:   req.Message,
        Timestamp: time.Now(),
    }
    session.History = append(session.History, userMsg)
    session.LastActivity = time.Now()

    // 9. Start streaming.
    startTime := time.Now()
    messageID := uuid.New().String()

    if err := writeSSEEvent(w, flusher, "message_start", map[string]interface{}{
        "message_id": messageID,
        "role":       "assistant",
    }); err != nil {
        return
    }

    // NOTE(review): the history passed here already ends with the current
    // user message, which is also passed as the prompt — confirm the agent
    // API does not end up seeing it twice.
    stream, err := s.agent.RunStream(ctx, req.Message,
        agent.WithHistory(session.History))
    if err != nil {
        s.sendError(w, flusher, err)
        return
    }
    defer stream.Close()

    var fullContent strings.Builder
    // tokenCount accumulates the byte length of the streamed text; it is a
    // size proxy, not a model token count.
    tokenCount := 0

    for {
        ev, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            s.sendError(w, flusher, err)
            return
        }

        var (
            name    string
            payload map[string]interface{}
        )
        switch e := ev.(type) {
        case *event.TextDeltaEvent:
            fullContent.WriteString(e.Text)
            tokenCount += len(e.Text)
            name = "text_delta"
            payload = map[string]interface{}{
                "message_id": messageID,
                "delta":      e.Text,
                "index":      e.Index,
            }

        case *event.ToolCallEvent:
            // Lets the UI show a loading state for the tool.
            name = "tool_call"
            payload = map[string]interface{}{
                "message_id": messageID,
                "tool_name":  e.ToolName,
                "tool_id":    e.ToolID,
            }

        case *event.ToolResultEvent:
            name = "tool_result"
            payload = map[string]interface{}{
                "message_id": messageID,
                "tool_id":    e.ToolID,
                "result":     e.Result,
            }

        case *event.ThinkingEvent:
            // Optional: surfaces the reasoning process.
            name = "thinking"
            payload = map[string]interface{}{
                "message_id": messageID,
                "content":    e.Content,
            }

        default:
            // Event types this handler does not forward are skipped.
            continue
        }

        // A failed write means the client is gone; returning runs the
        // deferred cancel and Close so the agent stops generating.
        if err := writeSSEEvent(w, flusher, name, payload); err != nil {
            return
        }
    }

    // 10. Completion event.
    latency := time.Since(startTime)

    // Best effort: the reply is persisted below even if the client has
    // already disconnected, so the write error is deliberately ignored.
    _ = writeSSEEvent(w, flusher, "message_end", map[string]interface{}{
        "message_id":  messageID,
        "content":     fullContent.String(),
        "token_count": tokenCount,
        "latency_ms":  latency.Milliseconds(),
    })

    // 11. Save the assistant reply to the history.
    assistantMsg := Message{
        ID:        messageID,
        Role:      "assistant",
        Content:   fullContent.String(),
        Timestamp: time.Now(),
        Metadata: map[string]interface{}{
            "latency_ms":  latency.Milliseconds(),
            "token_count": tokenCount,
        },
    }
    session.History = append(session.History, assistantMsg)
    session.LastActivity = time.Now()

    // Record metrics.
    s.metrics.RecordMessage(latency, tokenCount)
}

// sendError reports err to the client as an SSE "error" event whose data is
// a JSON object of the form {"error": "<message>"}, then flushes the stream.
func (s *ChatServer) sendError(w http.ResponseWriter, flusher http.Flusher, err error) {
    io.WriteString(w, "event: error\n")

    body := mustJSON(map[string]string{"error": err.Error()})
    fmt.Fprintf(w, "data: %s\n\n", body)

    flusher.Flush()
}

// mustJSON returns the JSON encoding of v as a string.
//
// It never fails from the caller's point of view: if v cannot be encoded,
// the error is logged and a fixed JSON error object is returned instead, so
// an SSE "data:" line built from the result is always valid JSON rather than
// an empty payload.
func mustJSON(v interface{}) string {
    b, err := json.Marshal(v)
    if err != nil {
        log.Printf("mustJSON: encoding %T: %v", v, err)
        return `{"error":"json encoding failed"}`
    }
    return string(b)
}

限流与防护

// RateLimiter is a token-bucket rate limiter that keeps one bucket per key
// (for example, per client IP). The zero value of the buckets map is usable:
// it is allocated on first use.
//
// NOTE(review): buckets are never removed, so the map grows with the number
// of distinct keys seen — add eviction if keys are unbounded.
type RateLimiter struct {
    buckets map[string]*TokenBucket
    mu      sync.RWMutex
    rate    float64 // tokens added per second
    burst   int     // bucket capacity
}

// TokenBucket is the state of a single key: the tokens currently available
// and the time they were last replenished. mu guards both fields.
type TokenBucket struct {
    tokens     float64
    lastUpdate time.Time
    mu         sync.Mutex
}

// Allow reports whether a request for key may proceed, consuming one token
// when it does. A key seen for the first time starts with a full bucket of
// burst tokens; afterwards tokens are replenished at rate per second, capped
// at burst.
func (rl *RateLimiter) Allow(key string) bool {
    rl.mu.Lock()
    if rl.buckets == nil {
        // Lazily allocate so a RateLimiter built without a constructor does
        // not panic on the first map write.
        rl.buckets = make(map[string]*TokenBucket)
    }
    bucket, exists := rl.buckets[key]
    if !exists {
        bucket = &TokenBucket{
            tokens:     float64(rl.burst),
            lastUpdate: time.Now(),
        }
        rl.buckets[key] = bucket
    }
    rl.mu.Unlock()

    bucket.mu.Lock()
    defer bucket.mu.Unlock()

    // Replenish tokens for the time elapsed since the last call.
    now := time.Now()
    elapsed := now.Sub(bucket.lastUpdate).Seconds()
    bucket.tokens = math.Min(bucket.tokens+elapsed*rl.rate, float64(rl.burst))
    bucket.lastUpdate = now

    if bucket.tokens >= 1 {
        bucket.tokens--
        return true
    }

    return false
}

// ConnectionLimiter caps the number of concurrent connections per IP.
// The connections map is allocated on first use, and an IP's entry is
// removed once its count drops back to zero so the map does not grow with
// every address ever seen.
type ConnectionLimiter struct {
    connections map[string]int
    maxConn     int
    mu          sync.RWMutex
}

// Acquire reserves one connection slot for ip. It returns false, reserving
// nothing, when ip already holds maxConn slots.
func (cl *ConnectionLimiter) Acquire(ip string) bool {
    cl.mu.Lock()
    defer cl.mu.Unlock()

    if cl.connections[ip] >= cl.maxConn {
        return false
    }

    if cl.connections == nil {
        // Reading a nil map is fine, writing to it panics.
        cl.connections = make(map[string]int)
    }
    cl.connections[ip]++
    return true
}

// Release frees one slot previously reserved for ip. Releasing an IP that
// holds no slots is a no-op.
func (cl *ConnectionLimiter) Release(ip string) {
    cl.mu.Lock()
    defer cl.mu.Unlock()

    switch n := cl.connections[ip]; {
    case n > 1:
        cl.connections[ip] = n - 1
    case n == 1:
        // Drop the key instead of keeping a zero entry around forever.
        delete(cl.connections, ip)
    }
}

前端实时渲染

现代前端实现(React)

import React, { useState, useRef, useCallback, useEffect } from 'react';

/**
 * A chat message as held in client state.
 * `isStreaming` marks an assistant message whose content is still arriving;
 * `metadata` is filled from the server's `message_end` event.
 */
interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  isStreaming?: boolean;
  metadata?: {
    latency_ms?: number;
    token_count?: number;
  };
}

/**
 * State of the chat component: the rendered messages, the server-assigned
 * session id (null until the first `session` event arrives), and the
 * connection / in-flight request flags.
 */
interface ChatState {
  messages: Message[];
  sessionId: string | null;
  isConnected: boolean;
  isLoading: boolean;
}

/**
 * Streaming chat UI.
 *
 * Sends the user's message with `fetch` (POST) and reads the response body as
 * a Server-Sent Events stream, appending `text_delta` chunks to a placeholder
 * assistant message until `message_end` or `error` arrives. The request can
 * be aborted with the stop button and is aborted automatically on unmount.
 */
export function ChatInterface() {
  const [state, setState] = useState<ChatState>({
    messages: [],
    sessionId: null,
    isConnected: false,
    isLoading: false,
  });
  const [input, setInput] = useState('');
  const messagesEndRef = useRef<HTMLDivElement>(null);
  const abortControllerRef = useRef<AbortController | null>(null);

  // Keep the newest message in view.
  const scrollToBottom = useCallback(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, []);

  useEffect(() => {
    scrollToBottom();
  }, [state.messages, scrollToBottom]);

  // Abort any in-flight request when the component unmounts.
  useEffect(() => {
    return () => abortControllerRef.current?.abort();
  }, []);

  /**
   * Applies one parsed SSE event to the state.
   *
   * @param eventType   name from the frame's `event:` line
   * @param data        raw JSON text from the frame's `data:` line(s)
   * @param assistantId id of the placeholder assistant message to update
   */
  const handleSSEEvent = useCallback(
    (eventType: string, data: string, assistantId: string) => {
      let payload: any;
      try {
        payload = JSON.parse(data);
      } catch (e) {
        console.error('Parse SSE data error:', e);
        return;
      }

      // The event name travels on the `event:` line; payload.type / .event is
      // kept as a fallback for servers that embed it in the JSON instead.
      const kind: string = eventType || payload.type || payload.event;

      setState(prev => {
        if (kind === 'session') {
          return { ...prev, sessionId: payload.session_id };
        }

        const assistantIndex = prev.messages.findIndex(m => m.id === assistantId);
        if (assistantIndex === -1) return prev;

        const messages = [...prev.messages];
        const current = messages[assistantIndex];

        switch (kind) {
          case 'text_delta':
            messages[assistantIndex] = {
              ...current,
              content: current.content + payload.delta,
            };
            return { ...prev, messages };

          case 'tool_call':
            // Surface the tool invocation inline.
            messages[assistantIndex] = {
              ...current,
              content: current.content + `\n[使用工具: ${payload.tool_name}]\n`,
            };
            return { ...prev, messages };

          case 'message_end':
            messages[assistantIndex] = {
              ...current,
              content: payload.content || current.content,
              isStreaming: false,
              metadata: {
                latency_ms: payload.latency_ms,
                token_count: payload.token_count,
              },
            };
            return { ...prev, messages };

          case 'error':
            messages[assistantIndex] = {
              ...current,
              content: `错误: ${payload.error}`,
              isStreaming: false,
            };
            return { ...prev, messages, isLoading: false };
        }

        // Other event types (message_start, heartbeat, ...) need no update.
        return prev;
      });
    },
    [],
  );

  // Send the current input and stream the reply.
  const sendMessage = useCallback(async () => {
    if (!input.trim() || state.isLoading) return;

    const userMessage: Message = {
      id: generateId(),
      role: 'user',
      content: input.trim(),
    };
    const assistantId = generateId();

    setState(prev => ({
      ...prev,
      messages: [...prev.messages, userMessage],
      isLoading: true,
    }));
    setInput('');

    // AbortController lets the user (or unmount) cancel the request.
    const controller = new AbortController();
    abortControllerRef.current = controller;

    // The Go handler in this article reads session_id from the URL query, so
    // send it there; it stays in the body too for servers that read it there.
    const url = state.sessionId
      ? `/chat?session_id=${encodeURIComponent(state.sessionId)}`
      : '/chat';

    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          message: userMessage.content,
          session_id: state.sessionId,
        }),
        signal: controller.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      if (!response.body) {
        throw new Error('Response has no body');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      // Placeholder assistant message that the deltas are appended to.
      setState(prev => ({
        ...prev,
        messages: [...prev.messages, {
          id: assistantId,
          role: 'assistant',
          content: '',
          isStreaming: true,
        }],
      }));

      // SSE framing: fields accumulate until a blank line ends the event.
      let eventType = '';
      let dataLines: string[] = [];
      const dispatch = () => {
        if (dataLines.length > 0) {
          handleSSEEvent(eventType, dataLines.join('\n'), assistantId);
        }
        eventType = '';
        dataLines = [];
      };

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const rawLine of lines) {
          const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
          if (line === '') {
            dispatch();
          } else if (line.startsWith('event:')) {
            eventType = line.slice(6).trim();
          } else if (line.startsWith('data:')) {
            // A single leading space after the colon is part of the framing.
            dataLines.push(line.slice(5).replace(/^ /, ''));
          }
          // Lines starting with ':' are comments and are ignored.
        }
      }
      // Flush an event whose terminating blank line never arrived.
      dispatch();
    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        console.log('Request cancelled');
      } else {
        console.error('Chat error:', error);
        setState(prev => ({
          ...prev,
          messages: [...prev.messages, {
            id: generateId(),
            role: 'assistant',
            content: '抱歉,发生了错误。请重试。',
          }],
        }));
      }
    } finally {
      // Whatever ended the stream, the placeholder must stop showing the
      // streaming cursor.
      setState(prev => ({
        ...prev,
        isLoading: false,
        messages: prev.messages.map(m =>
          m.id === assistantId && m.isStreaming ? { ...m, isStreaming: false } : m,
        ),
      }));
    }
  }, [input, state.sessionId, state.isLoading, handleSSEEvent]);

  // Stop the current generation.
  const cancelGeneration = useCallback(() => {
    abortControllerRef.current?.abort();
    setState(prev => ({ ...prev, isLoading: false }));
  }, []);

  return (
    <div className="chat-container">
      <div className="messages">
        {state.messages.map(msg => (
          <div key={msg.id} className={`message ${msg.role}`}>
            <div className="message-content">
              {msg.content}
              {msg.isStreaming && <span className="cursor"></span>}
            </div>
            {msg.metadata && (
              <div className="message-meta">
                {/* Compare against null so a value of 0 is not rendered as a bare "0". */}
                {msg.metadata.token_count != null && `${msg.metadata.token_count} tokens`}
                {msg.metadata.latency_ms != null && ` · ${msg.metadata.latency_ms}ms`}
              </div>
            )}
          </div>
        ))}
        <div ref={messagesEndRef} />
      </div>

      <div className="input-area">
        <textarea
          value={input}
          onChange={e => setInput(e.target.value)}
          onKeyDown={e => {
            // Enter that confirms an IME composition must not send the message.
            if (e.key === 'Enter' && !e.shiftKey && !e.nativeEvent.isComposing) {
              e.preventDefault();
              sendMessage();
            }
          }}
          placeholder="输入消息..."
          disabled={state.isLoading}
        />
        <button
          onClick={state.isLoading ? cancelGeneration : sendMessage}
          className={state.isLoading ? 'cancel' : 'send'}
        >
          {state.isLoading ? '停止' : '发送'}
        </button>
      </div>
    </div>
  );
}

/**
 * Returns a non-empty identifier for a client-side message.
 *
 * Uses `crypto.randomUUID()` where the runtime provides it (browsers only
 * expose it in secure contexts) and otherwise falls back to a timestamp plus
 * a fixed-width random suffix. A bare `Math.random().toString(36)` slice can
 * be short or even empty, which risks duplicate React keys.
 */
function generateId(): string {
  if (typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function') {
    return crypto.randomUUID();
  }
  const randomPart = Math.random().toString(36).slice(2, 12).padEnd(10, '0');
  return `${Date.now().toString(36)}-${randomPart}`;
}

渲染优化

// 虚拟滚动(处理长对话)
import { useVirtualizer } from '@tanstack/react-virtual';

/**
 * Virtualized message list for long conversations: only the rows near the
 * viewport are mounted.
 *
 * Chat messages vary in height, so each row is measured after render via
 * `virtualizer.measureElement`; `estimateSize` is only the initial guess.
 * Without measurement every row would be positioned as if it were 80px tall
 * and taller messages would overlap the rows below them.
 *
 * NOTE(review): the `messages-container` element must have a bounded height
 * and `overflow: auto` for virtualization to work — confirm in the CSS.
 */
function VirtualMessageList({ messages }: { messages: Message[] }) {
  const parentRef = useRef<HTMLDivElement>(null);

  const virtualizer = useVirtualizer({
    count: messages.length,
    getScrollElement: () => parentRef.current,
    estimateSize: () => 80,
    overscan: 5,
  });

  return (
    <div ref={parentRef} className="messages-container">
      <div
        style={{
          height: `${virtualizer.getTotalSize()}px`,
          width: '100%',
          position: 'relative',
        }}
      >
        {virtualizer.getVirtualItems().map(virtualItem => (
          <div
            key={virtualItem.key}
            // data-index tells measureElement which row it is measuring.
            data-index={virtualItem.index}
            ref={virtualizer.measureElement}
            style={{
              position: 'absolute',
              top: 0,
              left: 0,
              width: '100%',
              transform: `translateY(${virtualItem.start}px)`,
            }}
          >
            <MessageItem message={messages[virtualItem.index]} />
          </div>
        ))}
      </div>
    </div>
  );
}

/**
 * Typewriter effect driven by requestAnimationFrame.
 *
 * Reveals `content` progressively and returns the part revealed so far.
 * When `content` grows by appending (the streaming case) the animation
 * continues from the current position instead of restarting from the first
 * character; it restarts only when the already-revealed text is no longer a
 * prefix of the new content.
 *
 * @param content full text to reveal
 * @param speed   milliseconds per character; values <= 0 reveal everything
 *                on the next frame
 * @returns the currently revealed prefix of `content`
 */
function useTypewriterEffect(content: string, speed: number = 30) {
  const [displayed, setDisplayed] = useState('');
  const indexRef = useRef(0);
  const previousContentRef = useRef('');
  const rafRef = useRef<number | undefined>(undefined);

  useEffect(() => {
    const revealed = previousContentRef.current.slice(0, indexRef.current);
    if (!content.startsWith(revealed)) {
      // The text was replaced rather than extended: start over.
      indexRef.current = 0;
      setDisplayed('');
    }
    previousContentRef.current = content;

    let lastStepAt: number | null = null;

    const animate = (now: number) => {
      if (indexRef.current >= content.length) return;

      if (lastStepAt === null) lastStepAt = now;
      // Advance by elapsed time, not by frame count, so the pace does not
      // depend on the display's refresh rate.
      const step = speed > 0
        ? Math.floor((now - lastStepAt) / speed)
        : content.length;

      if (step > 0) {
        indexRef.current = Math.min(content.length, indexRef.current + step);
        lastStepAt += step * Math.max(speed, 0);
        setDisplayed(content.slice(0, indexRef.current));
      }

      rafRef.current = requestAnimationFrame(animate);
    };

    rafRef.current = requestAnimationFrame(animate);

    return () => {
      if (rafRef.current !== undefined) {
        cancelAnimationFrame(rafRef.current);
      }
    };
  }, [content, speed]);

  return displayed;
}

连接可靠性保障

心跳与保活

// sendHeartbeat emits an SSE "heartbeat" event carrying the current Unix
// timestamp every 15 seconds. It returns when stop is closed or receives a
// value, or as soon as a write fails, since a failed write means the
// connection is no longer usable.
//
// NOTE(review): if this runs in its own goroutine while the handler is also
// writing to w, the two writers must be serialized by the caller — confirm
// at the call site.
func (s *ChatServer) sendHeartbeat(w http.ResponseWriter, flusher http.Flusher, stop <-chan struct{}) {
    ticker := time.NewTicker(15 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-stop:
            return
        case <-ticker.C:
            payload := mustJSON(map[string]int64{
                "timestamp": time.Now().Unix(),
            })
            if _, err := fmt.Fprintf(w, "event: heartbeat\ndata: %s\n\n", payload); err != nil {
                return
            }
            flusher.Flush()
        }
    }
}

断线重连(前端)

/**
 * EventSource wrapper with manual exponential-backoff reconnection.
 *
 * On error the underlying EventSource is closed and a new one is opened
 * after `baseDelay * 2^attempt` ms, up to `maxReconnects` attempts. The last
 * event id seen is sent back as the `last_event_id` query parameter so the
 * server can resume the stream.
 *
 * `onmessage` only receives events without an `event:` name. Pass the names
 * of the server's named events (e.g. `['text_delta', 'message_end']`) as
 * `eventTypes` to receive those as well.
 */
class ResumableSSE {
  private url: string;
  private eventTypes: string[];
  private eventSource: EventSource | null = null;
  private reconnectAttempts = 0;
  private maxReconnects = 5;
  private baseDelay = 1000;
  // NOTE(review): grows without bound; drain or cap it in real use.
  private messageBuffer: string[] = [];
  private lastEventId: string | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private closed = false;

  /**
   * @param url        stream endpoint; may already contain a query string
   * @param eventTypes named SSE event types to subscribe to (optional)
   */
  constructor(url: string, eventTypes: string[] = []) {
    this.url = url;
    this.eventTypes = eventTypes;
  }

  /** Opens the stream, replacing any existing connection. */
  connect() {
    this.closed = false;
    this.clearReconnectTimer();
    this.eventSource?.close();

    const source = new EventSource(this.buildUrl());
    this.eventSource = source;

    source.onopen = () => {
      console.log('SSE connected');
      this.reconnectAttempts = 0;
    };

    const onEvent = (event: MessageEvent) => {
      // Keep the previous id when this event carries none.
      if (event.lastEventId) {
        this.lastEventId = event.lastEventId;
      }
      this.handleMessage(event.data);
    };

    source.onmessage = onEvent;
    for (const type of this.eventTypes) {
      source.addEventListener(type, onEvent as EventListener);
    }

    source.onerror = () => {
      source.close();
      this.reconnect();
    };
  }

  /** Builds the connection URL, appending the resume id when there is one. */
  private buildUrl(): string {
    if (!this.lastEventId) return this.url;
    const separator = this.url.includes('?') ? '&' : '?';
    return `${this.url}${separator}last_event_id=${encodeURIComponent(this.lastEventId)}`;
  }

  private reconnect() {
    // A reconnect scheduled before disconnect() must not revive the stream.
    if (this.closed) return;

    if (this.reconnectAttempts >= this.maxReconnects) {
      console.error('Max reconnects reached');
      return;
    }

    const delay = this.baseDelay * Math.pow(2, this.reconnectAttempts);
    this.reconnectAttempts++;

    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);

    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      if (!this.closed) this.connect();
    }, delay);
  }

  private clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  private handleMessage(data: string) {
    // Handle the message.
    this.messageBuffer.push(data);
  }

  /** Closes the stream and cancels any pending reconnect. */
  disconnect() {
    this.closed = true;
    this.clearReconnectTimer();
    this.eventSource?.close();
    this.eventSource = null;
  }
}

生产部署策略

Nginx 配置

# SSE-friendly reverse-proxy settings for the chat endpoint
location /chat {
    proxy_pass http://backend;
    proxy_http_version 1.1;

    # Disable buffering so events reach the client as soon as they are sent
    proxy_buffering off;
    proxy_cache off;

    # Long-lived connections
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;

    # Header passing
    proxy_set_header Connection "";
    proxy_set_header Cache-Control "no-cache";
    # Forward the real client address; otherwise the backend only sees this
    # proxy's IP. NOTE(review): confirm the backend's client-IP lookup trusts
    # these headers.
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

    # CORS
    # The Go handler already sets Access-Control-Allow-Origin; hide the
    # upstream copy so the response does not carry the header twice, which
    # browsers reject.
    proxy_hide_header Access-Control-Allow-Origin;
    # "always" keeps the headers on error responses (4xx/5xx) as well.
    add_header Access-Control-Allow-Origin * always;
    add_header Access-Control-Allow-Methods "POST, OPTIONS" always;
    # A JSON POST triggers a preflight that asks for Content-Type.
    add_header Access-Control-Allow-Headers "Content-Type" always;
}

# Load balancing
upstream backend {
    least_conn;  # least-connections: suits long-lived SSE streams
    server backend1:8080;
    server backend2:8080;
    server backend3:8080;

    # Reuse upstream connections. This is what makes "proxy_http_version 1.1"
    # plus an empty Connection header in the location block take effect.
    # It must come after the load-balancing method.
    keepalive 32;
}

Kubernetes 部署

# Deployment: three replicas of the chat server listening on port 8080.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chat-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chat-server
  template:
    metadata:
      labels:
        app: chat-server
    spec:
      containers:
      - name: server
        # NOTE(review): a mutable "latest" tag makes rollouts and rollbacks
        # non-reproducible — consider pinning a version or digest.
        image: chat-server:latest
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        env:
        # NOTE(review): GOMAXPROCS is 2 while the CPU limit is 500m (half a
        # core) — confirm this is intended.
        - name: GOMAXPROCS
          value: "2"
        # Liveness: GET /health every 30s, first check after 10s.
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 30
        # Readiness: GET /ready every 10s, first check after 5s.
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 10
---
# Service: exposes the chat-server pods on port 80, forwarding to 8080.
apiVersion: v1
kind: Service
metadata:
  name: chat-server
spec:
  selector:
    app: chat-server
  ports:
  - port: 80
    targetPort: 8080
  # NOTE(review): affinity is keyed on the source IP the Service sees; behind
  # a reverse proxy or ingress all traffic may share one IP — verify.
  sessionAffinity: ClientIP  # session affinity
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 300

常见问题深度解析

Q:SSE 连接数过多怎么办?

A:

  1. 连接复用:一个 SSE 连接处理多个对话(通过消息 ID 区分)
  2. 连接池:限制每用户并发连接数
  3. 长轮询降级:高负载时降级为长轮询
  4. 水平扩展:增加后端实例,使用 Redis 共享会话状态

Q:如何防止消息乱序?

A:

  1. 服务端序列号:每个 Event 带全局递增序列号
  2. 客户端缓冲:收到乱序消息时缓冲,等待缺失消息
  3. 超时机制:缺失消息超过 5 秒未到达,请求重传
// SequencedEvent wraps an event with a sequence number, serialized as "seq",
// so the client can detect gaps and reordering.
//
// NOTE(review): the Event type is not defined in the visible code — confirm
// which type is meant.
type SequencedEvent struct {
    Sequence int64 `json:"seq"`
    Event    Event `json:"event"`
}

Q:移动端弱网环境如何优化?

A:

  1. 消息合并:网络差时合并多个 TextDelta 为一批
  2. 降级策略:弱网时关闭打字机效果,直接显示完整结果
  3. 本地缓存:缓存历史消息,减少重复加载
  4. 断线提示:明确提示用户连接状态

小结

模块 6 完成。深入学习了:

  • Streaming 事件模型与类型系统
  • Event 处理:背压控制、有序消费、错误恢复
  • 多模态 Streaming:音视频图像的流式处理
  • 流式界面实战:SSE/WebSocket 架构、前端渲染优化、生产部署

P1 阶段全部完成。共 28 篇文章推送完毕。

多模态 Streaming | Agent Runtime 架构 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。