前端 SSE 消费设计

前端消费 SSE 流式消息的设计:为什么用 fetch 而不是 EventSource、事件分发架构、Buffer 处理、中断控制。

#type / concept #status / growing #tech / dev / frontend

[!info] related notes

前端 SSE 消费设计

核心问题

AI 对话的流式输出需要前端逐字显示(打字机效果),同时还要处理多种事件类型(文本、症状提取、安全警告、引用来源)。怎么设计一个通用的 SSE 消费层?

为什么用 fetch 而不是 EventSource

维度EventSourcefetch + ReadableStream
方法只支持 GET支持 POST
Header不能自定义可以加 Authorization
事件解析内置手动
错误处理自动重连手动控制

AI 对话需要 POST 请求(发送消息)和 JWT header,所以必须用 fetch。

事件类型设计

一个好的 SSE 事件类型设计应该语义明确、前端可直接分发

事件类型数据前端处理
text{content: "..."}追加到当前消息
extracted_info{info: {...}}更新人体可视化面板
phase_changed{phase: "analysis_ready"}更新诊断面板
red_flag{has_red_flags, flags}显示安全警告
citation{citation: {...}}显示引用来源
done{session_id, full_text}流结束,更新状态

设计要点

  • 每种事件类型对应一个明确的 UI 行为
  • 前端用 switch/case 分发,不需要复杂的解析逻辑
  • done 事件携带最终数据,前端可以做最终状态更新

Hook 设计

function useChatSSE(callbacks: {
  onText: (text: string) => void,
  onExtractedInfo: (info: ExtractedInfo) => void,
  onPhaseChange: (phase: string) => void,
  onRedFlag: (event: RedFlagEvent) => void,
  onCitation: (citation: Citation) => void,
  onDone: (event: DoneEvent) => void,
  onError?: (error: string) => void,
}): {
  sendMessage: (sessionId: string, content: string) => void,
  isStreaming: boolean,
  abort: () => void,
}

每个回调对应一种事件类型,组件按需传入。

SSE 行解析

function processSSELine(line: string, currentEvent: string, callbacks): string {
  if (line.startsWith('event:')) {
    return line.slice(6).trim();  // 设置事件类型
  }
  if (line.startsWith('data:')) {
    const data = JSON.parse(line.slice(5).trim());
    dispatch(data, currentEvent, callbacks);  // 分发
    return 'message';  // 重置为默认
  }
  return currentEvent;
}

Buffer 处理

SSE 数据是流式到达的,一个 chunk 可能包含多个完整事件,也可能在事件中间截断:

buffer += decoder.decode(value, { stream: true });  // stream: true 保留未完成字符
const lines = buffer.split('\n');
buffer = lines.pop() || '';  // 最后一行可能不完整,留到下次处理

关键stream: true 参数让 TextDecoder 保留跨 chunk 的多字节字符(如中文)。

中断控制

const controller = new AbortController();

fetch(url, { signal: controller.signal, ... });

// 用户发新消息时中止之前的流
abort();

// 组件卸载时中止
useEffect(() => () => abort(), []);

常见错误

内存泄漏

组件卸载时必须中止流,否则 fetch 继续在后台运行。

忘记处理 buffer 尾部

// ❌ 循环结束后丢弃剩余 buffer
while (true) { ... }

// ✅ 处理最后的 buffer
if (buffer.trim()) {
  processSSELine(buffer.trim(), currentEvent, callbacks);
}

中文乱码

// ❌ 不用 stream 模式
decoder.decode(value)  // 跨 chunk 的中文字符会被截断

// ✅ 用 stream 模式
decoder.decode(value, { stream: true })
创建于 2026/6/25 更新于 2026/6/25