流式聊天界面深度实战:从 SSE 到 WebSocket,构建生产级实时交互系统
深入剖析 ADK Go 流式聊天界面的完整架构,涵盖 SSE/WebSocket 协议选型、消息可靠性、连接管理、前端渲染优化与生产级部署策略。
流式聊天界面深度实战:从 SSE 到 WebSocket,构建生产级实时交互系统
流式聊天界面是用户与 AI Agent 交互的核心载体。一个优秀的流式聊天系统需要在低延迟、高可靠性、良好用户体验之间取得平衡。本文将深入 SSE 与 WebSocket 的技术选型、消息可靠性保障、连接生命周期管理、前端渲染优化与生产级部署策略。
传输协议深度对比
SSE vs WebSocket
| 维度 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务器→客户端) | 双向 |
| 协议基础 | HTTP | TCP + WebSocket 握手 |
| 自动重连 | 原生支持 | 需手动实现 |
| 浏览器兼容 | 优秀(IE 除外) | 良好 |
| 穿透代理 | 优秀 | 可能需配置 |
| 心跳机制 | 需自定义(如定时发送 heartbeat 事件或注释行) | 协议内置 Ping/Pong 帧 |
| 多路复用 | 单连接单流 | 单连接多流 |
| 二进制支持 | Base64 编码 | 原生支持 |
| 适用场景 | 服务器推送 | 双向实时通信 |
选型决策树:
是否需要客户端主动推送?
│
是 ─┼──→ WebSocket
否 ─┘
│
是否需要双向低延迟?(如协同编辑)
│
是 ─┼──→ WebSocket
否 ─┘
│
是否需要简单实现 + 自动重连?
│
是 ─┼──→ SSE
否 ─┘
│
▼
默认 SSE(大多数 AI 聊天场景)
生产建议:AI 聊天场景首选 SSE,因为:
- 单向通信足够(用户输入通过 HTTP POST,输出通过 SSE)
- 自动重连减少客户端复杂度
- HTTP 基础设施成熟(负载均衡、CDN、WAF 都支持)
生产级 SSE 架构
服务端实现
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/google/adk-go/agent"
	"github.com/google/adk-go/event"
	"github.com/google/uuid"
)
// ChatServer is the chat server. It groups the agent with the session
// manager, rate limiter and metrics collector used by the HTTP handlers.
//
// NOTE(review): the upgrader field refers to a `websocket` package that is
// not among the imports shown in this file — confirm which package is meant.
type ChatServer struct {
agent *agent.Agent
sessions *SessionManager
rateLimiter *RateLimiter
metrics *ChatMetrics
upgrader *websocket.Upgrader // WebSocket support
}
// SessionManager stores chat sessions in a map keyed by a string.
//
// NOTE(review): presumably mu guards sessions and ttl is the session expiry
// period; the methods that use these fields are not shown here — confirm.
type SessionManager struct {
sessions map[string]*ChatSession
mu sync.RWMutex
ttl time.Duration
}
// ChatSession is the per-conversation state: the message history, activity
// timestamps, and the cancel function of the request currently streaming.
//
// NOTE(review): the struct carries no lock of its own; handleSSE mutates
// History, LastActivity and CancelFunc directly — confirm how concurrent
// requests for the same session are serialized.
type ChatSession struct {
ID string
UserID string
History []Message
CreatedAt time.Time
LastActivity time.Time
CancelFunc context.CancelFunc
}
// Message is one entry of a session's history and is JSON-serializable.
// Metadata is omitted from the JSON output when empty.
type Message struct {
ID string `json:"id"`
Role string `json:"role"` // "user" | "assistant" | "system"
Content string `json:"content"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// handleSSE serves one chat turn as a Server-Sent Events stream.
//
// Flow: rate-limit by client IP, verify the ResponseWriter can stream,
// resolve the session from the session_id query parameter (a new UUID is
// generated when it is absent), decode the JSON request body, then emit SSE
// events in this order: session, message_start, any number of text_delta /
// tool_call / tool_result / thinking, and finally message_end. Failures that
// occur after streaming has started are reported in-band through sendError.
//
// NOTE(review): session.History, session.LastActivity and session.CancelFunc
// are mutated here without any lock visible in this file; concurrent requests
// for the same session_id would race — confirm how sessions are synchronized.
func (s *ChatServer) handleSSE(w http.ResponseWriter, r *http.Request) {
	// Upper bound for the JSON request body, so one request cannot make the
	// decoder read an arbitrarily large payload.
	const maxBodyBytes = 1 << 20

	ctx := r.Context()

	// 1. Rate limiting, keyed by client IP.
	clientIP := getClientIP(r)
	if !s.rateLimiter.Allow(clientIP) {
		http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
		return
	}

	// Check streaming support up front, before a session is created or any
	// SSE header is set, so the failure response is a clean plain-text 500.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	// 2. Look up or create the session.
	sessionID := r.URL.Query().Get("session_id")
	if sessionID == "" {
		sessionID = uuid.New().String()
	}
	session, err := s.sessions.GetOrCreate(sessionID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// 3. Read the user input (size-limited).
	r.Body = http.MaxBytesReader(w, r.Body, maxBodyBytes)
	var req ChatRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// 4. SSE response headers. X-Accel-Buffering asks nginx not to buffer.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// emit writes one named SSE event with a JSON payload and flushes it so
	// the client sees it immediately.
	emit := func(name string, payload interface{}) {
		fmt.Fprintf(w, "event: %s\n", name)
		fmt.Fprintf(w, "data: %s\n\n", mustJSON(payload))
		flusher.Flush()
	}

	// 5. First event: tell the client which session it is talking to.
	emit("session", map[string]string{"session_id": sessionID})

	// 6. Cancellable context: bounded to two minutes and also cancelled when
	// the request context ends. The cancel func is stored on the session.
	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	session.CancelFunc = cancel

	// 7. Record the user message.
	userMsg := Message{
		ID:        uuid.New().String(),
		Role:      "user",
		Content:   req.Message,
		Timestamp: time.Now(),
	}
	session.History = append(session.History, userMsg)
	session.LastActivity = time.Now()

	// 8. Start streaming.
	startTime := time.Now()
	messageID := uuid.New().String()

	emit("message_start", map[string]interface{}{
		"message_id": messageID,
		"role":       "assistant",
	})

	// NOTE(review): the history passed here already contains the current user
	// message, which is also passed as the prompt — confirm WithHistory
	// expects the current turn to be included.
	stream, err := s.agent.RunStream(ctx, req.Message,
		agent.WithHistory(session.History))
	if err != nil {
		s.sendError(w, flusher, err)
		return
	}
	defer stream.Close()

	var fullContent strings.Builder
	tokenCount := 0

	for {
		ev, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			s.sendError(w, flusher, err)
			return
		}

		switch e := ev.(type) {
		case *event.TextDeltaEvent:
			fullContent.WriteString(e.Text)
			// NOTE(review): this adds the byte length of the delta, not a
			// model token count, although it is reported as token_count.
			tokenCount += len(e.Text)
			emit("text_delta", map[string]interface{}{
				"message_id": messageID,
				"delta":      e.Text,
				"index":      e.Index,
			})

		case *event.ToolCallEvent:
			// Lets the UI show a loading state while the tool runs.
			emit("tool_call", map[string]interface{}{
				"message_id": messageID,
				"tool_name":  e.ToolName,
				"tool_id":    e.ToolID,
			})

		case *event.ToolResultEvent:
			emit("tool_result", map[string]interface{}{
				"message_id": messageID,
				"tool_id":    e.ToolID,
				"result":     e.Result,
			})

		case *event.ThinkingEvent:
			// Optional: expose the reasoning trace to the UI.
			emit("thinking", map[string]interface{}{
				"message_id": messageID,
				"content":    e.Content,
			})
		}
	}

	// 9. Completion event with the full text and timing.
	latency := time.Since(startTime)
	emit("message_end", map[string]interface{}{
		"message_id":  messageID,
		"content":     fullContent.String(),
		"token_count": tokenCount,
		"latency_ms":  latency.Milliseconds(),
	})

	// 10. Persist the assistant reply in the session history.
	assistantMsg := Message{
		ID:        messageID,
		Role:      "assistant",
		Content:   fullContent.String(),
		Timestamp: time.Now(),
		Metadata: map[string]interface{}{
			"latency_ms":  latency.Milliseconds(),
			"token_count": tokenCount,
		},
	}
	session.History = append(session.History, assistantMsg)
	session.LastActivity = time.Now()

	// Record metrics.
	s.metrics.RecordMessage(latency, tokenCount)
}
// sendError reports err to the client in-band as an SSE "error" event whose
// data is a JSON object with a single "error" field, then flushes the writer.
func (s *ChatServer) sendError(w http.ResponseWriter, flusher http.Flusher, err error) {
	io.WriteString(w, "event: error\n")
	body := map[string]string{"error": err.Error()}
	fmt.Fprintf(w, "data: %s\n\n", mustJSON(body))
	flusher.Flush()
}
// mustJSON encodes v as a JSON string for use as an SSE data payload.
//
// It never panics and never returns an empty string: if v cannot be
// marshalled, the failure is logged and a fixed JSON error object is returned
// instead, so the client always receives a parseable data line.
func mustJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		log.Printf("mustJSON: marshal %T: %v", v, err)
		return `{"error":"internal encoding error"}`
	}
	return string(b)
}
限流与防护
// RateLimiter is a token-bucket rate limiter that keeps one bucket per key.
//
// NOTE(review): buckets are created on first use and never removed in the
// code shown here, so the map grows with the number of distinct keys.
type RateLimiter struct {
	buckets map[string]*TokenBucket
	mu      sync.RWMutex
	rate    float64 // tokens added per second
	burst   int     // bucket capacity
}

// TokenBucket is the mutable state of a single key's bucket.
type TokenBucket struct {
	tokens     float64
	lastUpdate time.Time
	mu         sync.Mutex
}

// Allow reports whether a request for key may proceed, consuming one token
// when it does.
//
// A key seen for the first time starts with a full bucket (burst tokens).
// On every call the bucket is refilled by rate tokens per elapsed second,
// capped at burst. The buckets map is created lazily, so a RateLimiter whose
// map was never initialized does not panic.
func (rl *RateLimiter) Allow(key string) bool {
	rl.mu.Lock()
	if rl.buckets == nil {
		rl.buckets = make(map[string]*TokenBucket)
	}
	bucket, exists := rl.buckets[key]
	if !exists {
		bucket = &TokenBucket{
			tokens:     float64(rl.burst),
			lastUpdate: time.Now(),
		}
		rl.buckets[key] = bucket
	}
	rl.mu.Unlock()

	// The map lock is released before taking the bucket lock, so a slow
	// bucket does not block lookups for other keys.
	bucket.mu.Lock()
	defer bucket.mu.Unlock()

	// Refill, capped at the bucket capacity.
	now := time.Now()
	elapsed := now.Sub(bucket.lastUpdate).Seconds()
	capacity := float64(rl.burst)
	bucket.tokens += elapsed * rl.rate
	if bucket.tokens > capacity {
		bucket.tokens = capacity
	}
	bucket.lastUpdate = now

	if bucket.tokens >= 1 {
		bucket.tokens--
		return true
	}
	return false
}
// ConnectionLimiter caps the number of concurrent connections per IP.
type ConnectionLimiter struct {
	connections map[string]int
	maxConn     int
	mu          sync.RWMutex
}

// Acquire reserves one connection slot for ip. It returns false, reserving
// nothing, when ip already holds maxConn slots. The connections map is
// created lazily so an uninitialized limiter does not panic.
func (cl *ConnectionLimiter) Acquire(ip string) bool {
	cl.mu.Lock()
	defer cl.mu.Unlock()

	if cl.connections[ip] >= cl.maxConn {
		return false
	}
	if cl.connections == nil {
		cl.connections = make(map[string]int)
	}
	cl.connections[ip]++
	return true
}

// Release returns one slot previously reserved for ip. Releasing an ip that
// holds no slots is a no-op. The entry is deleted once its count reaches
// zero, so the map does not keep one key per IP ever seen.
func (cl *ConnectionLimiter) Release(ip string) {
	cl.mu.Lock()
	defer cl.mu.Unlock()

	if n := cl.connections[ip]; n > 1 {
		cl.connections[ip] = n - 1
		return
	}
	delete(cl.connections, ip)
}
前端实时渲染
现代前端实现(React)
import React, { useState, useRef, useCallback, useEffect } from 'react';
/** A chat message as held in the UI state. */
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
// Set while the assistant reply is still arriving; the UI renders a cursor for it.
isStreaming?: boolean;
// Filled from the message_end event.
metadata?: {
latency_ms?: number;
token_count?: number;
};
}
/** State of the chat component. */
interface ChatState {
messages: Message[];
// Session identifier received from the server's session event; null until then.
sessionId: string | null;
isConnected: boolean;
// True while a request is in flight; disables the input and turns the button into "stop".
isLoading: boolean;
}
/**
 * Chat UI that POSTs the user's message to /chat and renders the SSE response
 * incrementally as it is read from the fetch body stream.
 *
 * Each SSE event arrives as an `event: <name>` line followed by a
 * `data: <json>` line. The event name is tracked while parsing and is what
 * selects the state update; the JSON payload itself carries no type field.
 */
export function ChatInterface() {
  const [state, setState] = useState<ChatState>({
    messages: [],
    sessionId: null,
    isConnected: false,
    isLoading: false,
  });
  const [input, setInput] = useState('');
  const messagesEndRef = useRef<HTMLDivElement>(null);
  const abortControllerRef = useRef<AbortController | null>(null);

  // Keep the newest message in view.
  const scrollToBottom = useCallback(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, []);

  useEffect(() => {
    scrollToBottom();
  }, [state.messages, scrollToBottom]);

  // Abort any in-flight request when the component unmounts.
  useEffect(() => () => abortControllerRef.current?.abort(), []);

  // Apply one SSE event to the state. `eventType` is the name from the
  // `event:` line; fields inside the payload are only a fallback.
  const handleSSEEvent = useCallback(
    (eventType: string | undefined, data: string, assistantId: string) => {
      try {
        const event = JSON.parse(data);
        const kind = eventType ?? event.type ?? event.event;

        setState(prev => {
          // The session event does not target a message, so handle it before
          // looking up the assistant placeholder.
          if (kind === 'session') {
            return { ...prev, sessionId: event.session_id };
          }

          const assistantIndex = prev.messages.findIndex(m => m.id === assistantId);
          if (assistantIndex === -1) return prev;

          const messages = [...prev.messages];
          const current = messages[assistantIndex];

          switch (kind) {
            case 'text_delta':
              messages[assistantIndex] = {
                ...current,
                content: current.content + event.delta,
              };
              return { ...prev, messages };

            case 'tool_call':
              // Surface the tool invocation inline.
              messages[assistantIndex] = {
                ...current,
                content: current.content + `\n[使用工具: ${event.tool_name}]\n`,
              };
              return { ...prev, messages };

            case 'message_end':
              messages[assistantIndex] = {
                ...current,
                content: event.content || current.content,
                isStreaming: false,
                metadata: {
                  latency_ms: event.latency_ms,
                  token_count: event.token_count,
                },
              };
              return { ...prev, messages };

            case 'error':
              messages[assistantIndex] = {
                ...current,
                content: `错误: ${event.error}`,
                isStreaming: false,
              };
              return { ...prev, messages, isLoading: false };
          }

          return prev;
        });
      } catch (e) {
        console.error('Parse SSE data error:', e);
      }
    },
    [],
  );

  // Send the current input and stream the reply.
  const sendMessage = useCallback(async () => {
    const text = input.trim();
    if (!text || state.isLoading) return;

    const userMessage: Message = {
      id: generateId(),
      role: 'user',
      content: text,
    };
    // Created before the request so the finally block can always clear the
    // streaming flag of the placeholder, whatever path was taken.
    const assistantId = generateId();

    setState(prev => ({
      ...prev,
      messages: [...prev.messages, userMessage],
      isLoading: true,
    }));
    setInput('');

    // AbortController lets the user cancel the generation.
    const controller = new AbortController();
    abortControllerRef.current = controller;

    try {
      // The server handler reads session_id from the query string, so it is
      // sent there; it is also kept in the body as before.
      const url = state.sessionId
        ? `/chat?session_id=${encodeURIComponent(state.sessionId)}`
        : '/chat';

      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          message: userMessage.content,
          session_id: state.sessionId,
        }),
        signal: controller.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body?.getReader();
      if (!reader) {
        throw new Error('Response body is not readable');
      }

      const decoder = new TextDecoder();
      let buffer = '';
      let eventType: string | undefined;

      // Placeholder assistant message that the deltas are appended to.
      setState(prev => ({
        ...prev,
        messages: [...prev.messages, {
          id: assistantId,
          role: 'assistant',
          content: '',
          isStreaming: true,
        }],
      }));

      const processLine = (rawLine: string) => {
        const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
        if (line === '') {
          // A blank line ends the current event.
          eventType = undefined;
        } else if (line.startsWith('event: ')) {
          eventType = line.slice(7);
        } else if (line.startsWith('data: ')) {
          handleSSEEvent(eventType, line.slice(6), assistantId);
        }
      };

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        // The last element may be an incomplete line; keep it for the next chunk.
        buffer = lines.pop() || '';
        lines.forEach(processLine);
      }

      // Flush whatever the decoder and the line buffer still hold.
      buffer += decoder.decode();
      if (buffer) processLine(buffer);
    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        console.log('Request cancelled');
      } else {
        console.error('Chat error:', error);
        setState(prev => ({
          ...prev,
          messages: [...prev.messages, {
            id: generateId(),
            role: 'assistant',
            content: '抱歉,发生了错误。请重试。',
          }],
        }));
      }
    } finally {
      if (abortControllerRef.current === controller) {
        abortControllerRef.current = null;
      }
      // Also stop the cursor if the stream ended without message_end
      // (cancelled, failed, or closed early).
      setState(prev => ({
        ...prev,
        isLoading: false,
        messages: prev.messages.map(m =>
          m.id === assistantId && m.isStreaming ? { ...m, isStreaming: false } : m,
        ),
      }));
    }
  }, [input, state.sessionId, state.isLoading, handleSSEEvent]);

  // Cancel the generation in progress.
  const cancelGeneration = useCallback(() => {
    abortControllerRef.current?.abort();
    setState(prev => ({ ...prev, isLoading: false }));
  }, []);

  return (
    <div className="chat-container">
      <div className="messages">
        {state.messages.map(msg => (
          <div key={msg.id} className={`message ${msg.role}`}>
            <div className="message-content">
              {msg.content}
              {msg.isStreaming && <span className="cursor">▊</span>}
            </div>
            {msg.metadata && (
              <div className="message-meta">
                {/* Explicit null checks: a value of 0 must not render as a bare "0". */}
                {msg.metadata.token_count != null && `${msg.metadata.token_count} tokens`}
                {msg.metadata.latency_ms != null && ` · ${msg.metadata.latency_ms}ms`}
              </div>
            )}
          </div>
        ))}
        <div ref={messagesEndRef} />
      </div>

      <div className="input-area">
        <textarea
          value={input}
          onChange={e => setInput(e.target.value)}
          onKeyDown={e => {
            if (e.key === 'Enter' && !e.shiftKey) {
              e.preventDefault();
              sendMessage();
            }
          }}
          placeholder="输入消息..."
          disabled={state.isLoading}
        />
        <button
          onClick={state.isLoading ? cancelGeneration : sendMessage}
          className={state.isLoading ? 'cancel' : 'send'}
        >
          {state.isLoading ? '停止' : '发送'}
        </button>
      </div>
    </div>
  );
}
/**
 * Returns a short random base-36 identifier (at most 13 characters) derived
 * from Math.random().
 */
function generateId(): string {
  const base36 = Math.random().toString(36);
  // Drop the leading "0." and keep at most 13 characters.
  return base36.slice(2, 15);
}
渲染优化
// 虚拟滚动(处理长对话)
import { useVirtualizer } from '@tanstack/react-virtual';
/**
 * Message list backed by useVirtualizer: only the virtual items the
 * virtualizer returns are rendered, each absolutely positioned at its start
 * offset inside a spacer whose height is the virtualizer's total size.
 * Rows are estimated at 80px with an overscan of 5.
 */
function VirtualMessageList({ messages }: { messages: Message[] }) {
  const scrollerRef = useRef<HTMLDivElement>(null);

  const rowVirtualizer = useVirtualizer({
    count: messages.length,
    getScrollElement: () => scrollerRef.current,
    estimateSize: () => 80,
    overscan: 5,
  });

  const totalHeight = `${rowVirtualizer.getTotalSize()}px`;

  const rows = rowVirtualizer.getVirtualItems().map(row => {
    const offset = `translateY(${row.start}px)`;
    return (
      <div
        key={row.key}
        style={{ position: 'absolute', top: 0, left: 0, width: '100%', transform: offset }}
      >
        <MessageItem message={messages[row.index]} />
      </div>
    );
  });

  return (
    <div ref={scrollerRef} className="messages-container">
      <div style={{ height: totalHeight, width: '100%', position: 'relative' }}>
        {rows}
      </div>
    </div>
  );
}
// Typewriter effect driven by requestAnimationFrame.
/**
 * Reveals `content` progressively and returns the part shown so far.
 *
 * @param content Full text to reveal. When it grows by appending (as streamed
 *   text does), the animation continues from where it was instead of
 *   restarting; any other change restarts it from the beginning.
 * @param speed Milliseconds per revealed character (default 30). A value of
 *   0 or less shows the whole text immediately.
 *   NOTE(review): the original never read `speed`, so its unit was not
 *   defined; milliseconds per character is assumed here — confirm.
 */
function useTypewriterEffect(content: string, speed: number = 30) {
  const [displayed, setDisplayed] = useState('');
  const indexRef = useRef(0);
  const rafRef = useRef<number>();
  const prevContentRef = useRef('');

  useEffect(() => {
    // Restart only when the new content is not an extension of the old one.
    if (!content.startsWith(prevContentRef.current)) {
      indexRef.current = 0;
      setDisplayed('');
    }
    prevContentRef.current = content;

    if (speed <= 0) {
      indexRef.current = content.length;
      setDisplayed(content);
      return;
    }

    let last: number | null = null;

    const animate = (now: number) => {
      if (indexRef.current >= content.length) return;
      if (last === null) last = now;

      // Advance by however many whole `speed` intervals have elapsed, so the
      // pace does not depend on the display's frame rate.
      const steps = Math.floor((now - last) / speed);
      if (steps > 0) {
        last += steps * speed;
        indexRef.current = Math.min(content.length, indexRef.current + steps);
        setDisplayed(content.slice(0, indexRef.current));
      }
      rafRef.current = requestAnimationFrame(animate);
    };

    rafRef.current = requestAnimationFrame(animate);

    return () => {
      if (rafRef.current !== undefined) {
        cancelAnimationFrame(rafRef.current);
      }
    };
  }, [content, speed]);

  return displayed;
}
连接可靠性保障
心跳与保活
// sendHeartbeat writes an SSE "heartbeat" event carrying the current Unix
// timestamp every 15 seconds and flushes it, until stop is closed or
// receives a value.
//
// NOTE(review): this writes to w from whichever goroutine runs it; if that is
// not the handler goroutine, confirm that writes to w are serialized.
func (s *ChatServer) sendHeartbeat(w http.ResponseWriter, flusher http.Flusher, stop <-chan struct{}) {
	const interval = 15 * time.Second

	beat := time.NewTicker(interval)
	defer beat.Stop()

	for {
		select {
		case <-beat.C:
			io.WriteString(w, "event: heartbeat\n")
			stamp := map[string]int64{"timestamp": time.Now().Unix()}
			fmt.Fprintf(w, "data: %s\n\n", mustJSON(stamp))
			flusher.Flush()
		case <-stop:
			return
		}
	}
}
断线重连(前端)
/**
 * EventSource wrapper that reconnects with exponential backoff and passes the
 * last received event id back to the server as a `last_event_id` query
 * parameter.
 *
 * NOTE: `onmessage` only receives events that have no `event:` field; named
 * events need their own `addEventListener` registrations.
 */
class ResumableSSE {
  private url: string;
  private eventSource: EventSource | null = null;
  private reconnectAttempts = 0;
  private maxReconnects = 5;
  private baseDelay = 1000;
  private messageBuffer: string[] = [];
  private lastEventId: string | null = null;
  // Pending reconnect timer, kept so disconnect() can cancel it.
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  // Set by disconnect(); prevents any further automatic reconnect.
  private closed = false;

  constructor(url: string) {
    this.url = url;
  }

  /** Opens (or re-opens) the connection, replacing any existing one. */
  connect() {
    this.closed = false;
    this.clearReconnectTimer();
    this.eventSource?.close();

    const source = new EventSource(this.buildUrl());
    this.eventSource = source;

    source.onopen = () => {
      console.log('SSE connected');
      this.reconnectAttempts = 0;
    };

    source.onmessage = (event) => {
      this.lastEventId = event.lastEventId;
      this.handleMessage(event.data);
    };

    source.onerror = () => {
      // Ignore errors from a source that has already been replaced.
      if (this.eventSource !== source) return;
      // Closing disables the browser's own retry; backoff is handled here.
      source.close();
      this.reconnect();
    };
  }

  /** Builds the connection URL, appending the resume id when one is known. */
  private buildUrl(): string {
    if (!this.lastEventId) return this.url;
    const separator = this.url.includes('?') ? '&' : '?';
    return `${this.url}${separator}last_event_id=${encodeURIComponent(this.lastEventId)}`;
  }

  private clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /** Schedules a reconnect; the delay doubles with each attempt. */
  private reconnect() {
    if (this.closed) return;

    if (this.reconnectAttempts >= this.maxReconnects) {
      console.error('Max reconnects reached');
      return;
    }

    const delay = this.baseDelay * Math.pow(2, this.reconnectAttempts);
    this.reconnectAttempts++;

    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      if (!this.closed) this.connect();
    }, delay);
  }

  private handleMessage(data: string) {
    // Buffer the message.
    // NOTE(review): nothing in this class drains messageBuffer, so it grows
    // for as long as messages arrive.
    this.messageBuffer.push(data);
  }

  /** Closes the connection and cancels any scheduled reconnect. */
  disconnect() {
    this.closed = true;
    this.clearReconnectTimer();
    this.eventSource?.close();
    this.eventSource = null;
  }
}
生产部署策略
Nginx 配置
# SSE-friendly proxy configuration for the chat endpoint.
location /chat {
    proxy_pass http://backend;
    proxy_http_version 1.1;

    # Disable buffering and caching so each event is forwarded as soon as the
    # backend flushes it.
    proxy_buffering off;
    proxy_cache off;

    # Long-lived connections.
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;

    # Header forwarding.
    proxy_set_header Connection "";
    proxy_set_header Cache-Control "no-cache";

    # CORS. The backend handler in this document also sets
    # Access-Control-Allow-Origin; hide the upstream copy so the response
    # carries exactly one value (browsers reject a duplicated header).
    # "always" also adds the headers to error responses.
    proxy_hide_header Access-Control-Allow-Origin;
    add_header Access-Control-Allow-Origin * always;
    add_header Access-Control-Allow-Methods "POST, OPTIONS" always;
}
# Load balancing across the three backend instances.
upstream backend {
least_conn; # least-connections algorithm
server backend1:8080;
server backend2:8080;
server backend3:8080;
}
Kubernetes 部署
# Deployment: three replicas of the chat server listening on port 8080.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chat-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chat-server
  template:
    metadata:
      labels:
        app: chat-server
    spec:
      containers:
        - name: server
          # NOTE(review): a mutable "latest" tag makes rollouts
          # non-reproducible; consider pinning a version or digest.
          image: chat-server:latest
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          env:
            # NOTE(review): GOMAXPROCS is 2 while the CPU limit is 500m;
            # confirm this is intended, as it can lead to CPU throttling.
            - name: GOMAXPROCS
              value: "2"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
---
# Service: exposes the chat-server pods on port 80 and forwards to 8080.
apiVersion: v1
kind: Service
metadata:
  name: chat-server
spec:
  selector:
    app: chat-server
  ports:
    - port: 80
      targetPort: 8080
  # Session affinity: requests from the same client IP go to the same pod.
  # NOTE(review): if traffic reaches this Service through a proxy, the
  # affinity key is the proxy's address rather than the end user's — confirm.
  sessionAffinity: ClientIP
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 300
常见问题深度解析
Q:SSE 连接数过多怎么办?
A:
- 连接复用:一个 SSE 连接处理多个对话(通过消息 ID 区分)
- 连接池:限制每用户并发连接数
- 长轮询降级:高负载时降级为长轮询
- 水平扩展:增加后端实例,使用 Redis 共享会话状态
Q:如何防止消息乱序?
A:
- 服务端序列号:每个 Event 带全局递增序列号
- 客户端缓冲:收到乱序消息时缓冲,等待缺失消息
- 超时机制:缺失消息超过 5 秒未到达,请求重传
// SequencedEvent wraps an Event with a sequence number, serialized as "seq"
// alongside the event under "event".
type SequencedEvent struct {
Sequence int64 `json:"seq"`
Event Event `json:"event"`
}
Q:移动端弱网环境如何优化?
A:
- 消息合并:网络差时合并多个 TextDelta 为一批
- 降级策略:弱网时关闭打字机效果,直接显示完整结果
- 本地缓存:缓存历史消息,减少重复加载
- 断线提示:明确提示用户连接状态
小结
模块 6 完成。深入学习了:
- Streaming 事件模型与类型系统
- Event 处理:背压控制、有序消费、错误恢复
- 多模态 Streaming:音视频图像的流式处理
- 流式界面实战:SSE/WebSocket 架构、前端渲染优化、生产部署
P1 阶段全部完成。共 28 篇文章推送完毕。
← 多模态 Streaming | Agent Runtime 架构 →
想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。
