Node.js Stream

Node.js Stream 的四种类型、pipe 机制、背压与 highWaterMark 的工作原理。

#type / concept #status / growing #tech / dev / backend #resource / nodejs #resource / javascript

[!info] related notes

Node.js Stream

一句话定义

Stream 是 Node.js 处理流式数据的核心抽象,让你不用一次性把全部数据加载到内存,而是分块读写。

四种流类型

类型说明典型场景
Readable可读流,数据源fs.createReadStream, http.IncomingMessage
Writable可写流,数据目的地fs.createWriteStream, http.ServerResponse
Duplex可读可写,双工net.Socket, TCP 连接
Transform读写+转换zlib.createGzip, crypto.createCipher

基本用法

import fs from 'node:fs';
import { pipeline } from 'node:stream/promises';
import zlib from 'node:zlib';

// 读取文件流
const readable = fs.createReadStream('input.txt', { encoding: 'utf8' });
const writable = fs.createWriteStream('output.txt');

// 最基本的 pipe
readable.pipe(writable);

// 使用 pipeline(推荐,自动处理错误和清理)
await pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz')
);

pipe 与背压

pipe() 的核心价值是自动管理背压(backpressure)

背压发生场景:可读流产生数据的速度 > 可写流消费数据的速度。

// pipe 自动处理:当 writable 的缓冲区满了,
// pipe 会调用 readable.pause(),等 writable drain 后再 resume
readable.pipe(writable);

// 手动处理背压
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause();          // 暂停读取
    writable.once('drain', () => {
      readable.resume();       // 缓冲区排空后恢复
    });
  }
});

highWaterMark

highWaterMark(高水位线)控制内部缓冲区大小,单位取决于是否传入 objectMode

// 默认 64KB(非 objectMode)
const readable = fs.createReadStream('file.txt', {
  highWaterMark: 16 * 1024  // 16KB
});

// objectMode 下默认 16 个对象
const transform = new Transform({
  objectMode: true,
  highWaterMark: 100  // 缓冲 100 个对象
});

当内部缓冲区大小超过 highWaterMark 时,write() 返回 falsereadable 触发背压。

自定义 Transform 流

import { Transform } from 'node:stream';

const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // 将数据转为大写
    callback(null, chunk.toString().toUpperCase());
  }
});

process.stdin.pipe(upperCase).pipe(process.stdout);

流的错误处理

// 错误不会通过 pipe 自动传播,必须逐个监听
readable.on('error', handleError);
writable.on('error', handleError);

// pipeline 自动传播错误并清理资源
import { pipeline } from 'node:stream/promises';
try {
  await pipeline(readable, transform, writable);
} catch (err) {
  console.error('Pipeline failed:', err);
}

何时使用 Stream

  • 大文件处理:读写 GB 级文件,内存占用恒定
  • 网络传输:HTTP 请求/响应天然是流
  • 数据转换链:gzip、加密、CSV 解析等管道式处理
  • 实时数据:日志、传感器数据等持续到达的数据源

常见误区

  • 不要忽略背压:不处理背压会导致内存暴涨
  • pipe 不传播错误:用 pipeline 代替
  • stream.destroy() 释放资源:异常退出时记得调用
  • objectMode 和字节模式不能混 pipe
创建于 2026/5/27 更新于 2026/5/27