SSE 流式输出的暂停与中断
AI 流式对话中用户暂停或取消生成时的前后端状态同步、请求中断和数据一致性保障。
#tech / ai
#type / concept
#status / growing
[!info] related notes
- 前置概念: SSE
- 并列概念: AI 对话与流式界面
- 关系笔记: AI 对话前端性能优化
SSE 流式输出的暂停与中断
一句话定义
SSE 流式输出的暂停与中断,是当用户在 AI 对话中点击”停止生成”时,前端取消请求、后端停止模型调用、消息状态一致更新的完整工程方案。
核心机制 / 工作原理
消息状态机
把单条消息的生命周期抽象为状态机:
idle → streaming → done
→ cancelling → cancelled
→ error
idle:消息已创建,等待生成streaming:正在接收 chunkcancelling:用户触发停止,等待确认cancelled:已停止,保留已生成的部分内容done:生成完成error:发生错误
前端:AbortController
每次发起 SSE 请求时创建 AbortController:
const controller = new AbortController();
fetch('/api/chat', {
method: 'POST',
body: JSON.stringify(payload),
signal: controller.signal,
});
// 用户点击停止时
function handleStop() {
message.status = 'cancelling';
controller.abort();
}
controller.abort() 会:
- 关闭前端的 fetch 连接
- 触发后端的
req.on('close')事件
后端:感知连接关闭
app.post('/api/chat', async (req, res) => {
const stream = await llm.stream(messages);
req.on('close', () => {
// 连接被客户端关闭,停止模型流
stream.controller.abort();
});
for await (const chunk of stream) {
if (req.destroyed) break; // 双重检查
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
});
防止旧 chunk 继续写入
流式回调中每次 append chunk 前检查:
function handleChunk(messageId: string, chunk: string) {
// 检查当前消息是否仍处于 streaming 状态
if (currentMessage.id !== messageId || currentMessage.status !== 'streaming') {
return; // 丢弃过期 chunk
}
currentMessage.content += chunk;
}
中断后消息处理
cancelled状态的消息保留已生成的部分内容,不删除- 用户可以点击”继续生成”从断点续接
- 如果不支持续接,标记为
partial并允许重新生成
最小例子 / 最小场景
用户提问后 AI 开始流式输出。输出到一半时用户点击”停止”。前端调用 controller.abort(),后端感知连接关闭后停止模型请求,消息状态变为 cancelled,已输出的内容保留在界面上。
边界与常见误解
EventSource没有原生 abort 方法;用fetch+ReadableStream更灵活- 停止前端 UI 不等于停止后端模型请求;必须显式中断连接
- nginx 等代理可能缓冲 SSE 数据;需要关闭
proxy_buffering cancelled不是error,已生成的内容应该保留,不是清空