Node.js Stream
Node.js Stream 的四种类型、pipe 机制、背压与 highWaterMark 的工作原理。
#type / concept
#status / growing
#tech / dev / backend
#resource / nodejs
#resource / javascript
[!info] related notes
- 所属 MOC: Node.js MOC
- 相关概念: Node.js Buffer, Node.js 事件循环阶段
Node.js Stream
一句话定义
Stream 是 Node.js 处理流式数据的核心抽象,让你不用一次性把全部数据加载到内存,而是分块读写。
四种流类型
| 类型 | 说明 | 典型场景 |
|---|---|---|
| Readable | 可读流,数据源 | fs.createReadStream, http.IncomingMessage |
| Writable | 可写流,数据目的地 | fs.createWriteStream, http.ServerResponse |
| Duplex | 可读可写,双工 | net.Socket, TCP 连接 |
| Transform | 读写+转换 | zlib.createGzip, crypto.createCipher |
基本用法
import fs from 'node:fs';
import { pipeline } from 'node:stream/promises';
import zlib from 'node:zlib';
// 读取文件流
const readable = fs.createReadStream('input.txt', { encoding: 'utf8' });
const writable = fs.createWriteStream('output.txt');
// 最基本的 pipe
readable.pipe(writable);
// 使用 pipeline(推荐,自动处理错误和清理)
await pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz')
);
pipe 与背压
pipe() 的核心价值是自动管理背压(backpressure)。
背压发生场景:可读流产生数据的速度 > 可写流消费数据的速度。
// pipe 自动处理:当 writable 的缓冲区满了,
// pipe 会调用 readable.pause(),等 writable drain 后再 resume
readable.pipe(writable);
// 手动处理背压
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
readable.pause(); // 暂停读取
writable.once('drain', () => {
readable.resume(); // 缓冲区排空后恢复
});
}
});
highWaterMark
highWaterMark(高水位线)控制内部缓冲区大小,单位取决于是否传入 objectMode:
// 默认 64KB(非 objectMode)
const readable = fs.createReadStream('file.txt', {
highWaterMark: 16 * 1024 // 16KB
});
// objectMode 下默认 16 个对象
const transform = new Transform({
objectMode: true,
highWaterMark: 100 // 缓冲 100 个对象
});
当内部缓冲区大小超过 highWaterMark 时,write() 返回 false,readable 触发背压。
自定义 Transform 流
import { Transform } from 'node:stream';
const upperCase = new Transform({
transform(chunk, encoding, callback) {
// 将数据转为大写
callback(null, chunk.toString().toUpperCase());
}
});
process.stdin.pipe(upperCase).pipe(process.stdout);
流的错误处理
// 错误不会通过 pipe 自动传播,必须逐个监听
readable.on('error', handleError);
writable.on('error', handleError);
// pipeline 自动传播错误并清理资源
import { pipeline } from 'node:stream/promises';
try {
await pipeline(readable, transform, writable);
} catch (err) {
console.error('Pipeline failed:', err);
}
何时使用 Stream
- 大文件处理:读写 GB 级文件,内存占用恒定
- 网络传输:HTTP 请求/响应天然是流
- 数据转换链:gzip、加密、CSV 解析等管道式处理
- 实时数据:日志、传感器数据等持续到达的数据源
常见误区
- 不要忽略背压:不处理背压会导致内存暴涨
pipe不传播错误:用pipeline代替stream.destroy()释放资源:异常退出时记得调用objectMode和字节模式不能混 pipe