前端 SSE 消费设计
前端消费 SSE 流式消息的设计:为什么用 fetch 而不是 EventSource、事件分发架构、Buffer 处理、中断控制。
#type / concept
#status / growing
#tech / dev / frontend
[!info] related notes
- 前置: SSE
- 后端配合: 多服务 SSE 管道
- 上层应用: AI Chat UI
前端 SSE 消费设计
核心问题
AI 对话的流式输出需要前端逐字显示(打字机效果),同时还要处理多种事件类型(文本、症状提取、安全警告、引用来源)。怎么设计一个通用的 SSE 消费层?
为什么用 fetch 而不是 EventSource
| 维度 | EventSource | fetch + ReadableStream |
|---|---|---|
| 方法 | 只支持 GET | 支持 POST |
| Header | 不能自定义 | 可以加 Authorization |
| 事件解析 | 内置 | 手动 |
| 错误处理 | 自动重连 | 手动控制 |
AI 对话需要 POST 请求(发送消息)和 JWT header,所以必须用 fetch。
事件类型设计
一个好的 SSE 事件类型设计应该语义明确、前端可直接分发:
| 事件类型 | 数据 | 前端处理 |
|---|---|---|
| text | {content: "..."} | 追加到当前消息 |
| extracted_info | {info: {...}} | 更新人体可视化面板 |
| phase_changed | {phase: "analysis_ready"} | 更新诊断面板 |
| red_flag | {has_red_flags, flags} | 显示安全警告 |
| citation | {citation: {...}} | 显示引用来源 |
| done | {session_id, full_text} | 流结束,更新状态 |
设计要点:
- 每种事件类型对应一个明确的 UI 行为
- 前端用 switch/case 分发,不需要复杂的解析逻辑
- done 事件携带最终数据,前端可以做最终状态更新
Hook 设计
function useChatSSE(callbacks: {
onText: (text: string) => void,
onExtractedInfo: (info: ExtractedInfo) => void,
onPhaseChange: (phase: string) => void,
onRedFlag: (event: RedFlagEvent) => void,
onCitation: (citation: Citation) => void,
onDone: (event: DoneEvent) => void,
onError?: (error: string) => void,
}): {
sendMessage: (sessionId: string, content: string) => void,
isStreaming: boolean,
abort: () => void,
}
每个回调对应一种事件类型,组件按需传入。
SSE 行解析
function processSSELine(line: string, currentEvent: string, callbacks): string {
if (line.startsWith('event:')) {
return line.slice(6).trim(); // 设置事件类型
}
if (line.startsWith('data:')) {
const data = JSON.parse(line.slice(5).trim());
dispatch(data, currentEvent, callbacks); // 分发
return 'message'; // 重置为默认
}
return currentEvent;
}
Buffer 处理
SSE 数据是流式到达的,一个 chunk 可能包含多个完整事件,也可能在事件中间截断:
buffer += decoder.decode(value, { stream: true }); // stream: true 保留未完成字符
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 最后一行可能不完整,留到下次处理
关键:stream: true 参数让 TextDecoder 保留跨 chunk 的多字节字符(如中文)。
中断控制
const controller = new AbortController();
fetch(url, { signal: controller.signal, ... });
// 用户发新消息时中止之前的流
abort();
// 组件卸载时中止
useEffect(() => () => abort(), []);
常见错误
内存泄漏
组件卸载时必须中止流,否则 fetch 继续在后台运行。
忘记处理 buffer 尾部
// ❌ 循环结束后丢弃剩余 buffer
while (true) { ... }
// ✅ 处理最后的 buffer
if (buffer.trim()) {
processSSELine(buffer.trim(), currentEvent, callbacks);
}
中文乱码
// ❌ 不用 stream 模式
decoder.decode(value) // 跨 chunk 的中文字符会被截断
// ✅ 用 stream 模式
decoder.decode(value, { stream: true })