SSE 流式输出的暂停与中断

AI 流式对话中用户暂停或取消生成时的前后端状态同步、请求中断和数据一致性保障。

#tech / ai #type / concept #status / growing

[!info] related notes

SSE 流式输出的暂停与中断

一句话定义

SSE 流式输出的暂停与中断,是当用户在 AI 对话中点击”停止生成”时,前端取消请求、后端停止模型调用、消息状态一致更新的完整工程方案。

核心机制 / 工作原理

消息状态机

把单条消息的生命周期抽象为状态机:

idle → streaming → done
                  → cancelling → cancelled
                  → error
  • idle:消息已创建,等待生成
  • streaming:正在接收 chunk
  • cancelling:用户触发停止,等待确认
  • cancelled:已停止,保留已生成的部分内容
  • done:生成完成
  • error:发生错误

前端:AbortController

每次发起 SSE 请求时创建 AbortController

const controller = new AbortController();

fetch('/api/chat', {
  method: 'POST',
  body: JSON.stringify(payload),
  signal: controller.signal,
});

// 用户点击停止时
function handleStop() {
  message.status = 'cancelling';
  controller.abort();
}

controller.abort() 会:

  • 关闭前端的 fetch 连接
  • 触发后端的 req.on('close') 事件

后端:感知连接关闭

app.post('/api/chat', async (req, res) => {
  const stream = await llm.stream(messages);

  req.on('close', () => {
    // 连接被客户端关闭,停止模型流
    stream.controller.abort();
  });

  for await (const chunk of stream) {
    if (req.destroyed) break; // 双重检查
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
});

防止旧 chunk 继续写入

流式回调中每次 append chunk 前检查:

function handleChunk(messageId: string, chunk: string) {
  // 检查当前消息是否仍处于 streaming 状态
  if (currentMessage.id !== messageId || currentMessage.status !== 'streaming') {
    return; // 丢弃过期 chunk
  }
  currentMessage.content += chunk;
}

中断后消息处理

  • cancelled 状态的消息保留已生成的部分内容,不删除
  • 用户可以点击”继续生成”从断点续接
  • 如果不支持续接,标记为 partial 并允许重新生成

最小例子 / 最小场景

用户提问后 AI 开始流式输出。输出到一半时用户点击”停止”。前端调用 controller.abort(),后端感知连接关闭后停止模型请求,消息状态变为 cancelled,已输出的内容保留在界面上。

边界与常见误解

  • EventSource 没有原生 abort 方法;用 fetch + ReadableStream 更灵活
  • 停止前端 UI 不等于停止后端模型请求;必须显式中断连接
  • nginx 等代理可能缓冲 SSE 数据;需要关闭 proxy_buffering
  • cancelled 不是 error,已生成的内容应该保留,不是清空
创建于 2026/5/29 更新于 2026/5/29