聊聊Node.js stream 模块,看看如何构建高性能的应用
本篇文章带大家了解 Node stream 模块,介绍一下如何使用 Stream 构建高性能的 Node.js 应用,希望对大家有所帮助!
当你在键盘上输入字符,从磁盘读取文件或在网上下载文件时,一股信息流(bits)在流经不同的设备和应用。
如果你学会处理这些字节流,你将能构建高性能且有价值的应用。例如,试想一下当你在 YouTube 观看视频时,你不需要一直等待直到完整的视频下载完。一旦有一个小缓冲,视频就会开始播放,而剩下的会在你观看时继续下载。
Nodejs 包含一个内置模块 stream
可以让我们处理流数据。在这篇文章中,我们将通过几个简单的示例来讲解 stream
的用法,我们也会描述在面对复杂案例构建高性能应用时,应该如何构建管道去合并不同的流。
在我们深入理解应用构建前,理解 Node.js stream
模块提供的特性很重要。
让我们开始吧!
Node.js 流的类型
Node.js stream
提供了四种类型的流
- 可读流(Readable Streams)
- 可写流(Writable Streams)
- 双工流(Duplex Streams)
- 转换流(Transform Streams)
让我们在高层面来看看每一种流类型吧。
可读流
可读流可以从一个特定的数据源中读取数据,最常见的是从一个文件系统中读取。Node.js 应用中其他常见的可读流用法有:
process.stdin
-通过stdin
在终端应用中读取用户输入。http.IncomingMessage
- 在 HTTP 服务中读取传入的请求内容或者在 HTTP 客户端中读取服务器的 HTTP 响应。
可写流
你可以使用可写流将来自应用的数据写入到特定的地方,比如一个文件。
process.stdout
可以用来将数据写成标准输出且被 console.log
内部使用。
接下来是双工流和转换流,可以被定义为基于可读流和可写流的混合流类型。
双工流
双工流是可读流和可写流的结合,它既可以将数据写入到特定的地方也可以从数据源读取数据。最常见的双工流案例是 net.Socket
,它被用来从 socket 读写数据。
有一点很重要,双工流中的可读端和可写端的操作是相互独立的,数据不会从一端流向另一端。
转换流
转换流与双工流略有相似,但在转换流中,可读端和可写端是相关联的。
crypto.Cipher
类是一个很好的例子,它实现了加密流。通过 crypto.Cipher
流,应用可以往流的可写端写入纯文本数据并从流的可读端读取加密后的密文。之所以将这种类型的流称之为转换流就是因为其转换性质。
附注:另一个转换流是 stream.PassThrough
。stream.PassThrough
从可写端传递数据到可读端,没有任何转换。这听起来可能有点多余,但 Passthrough 流对构建自定义流以及流管道非常有帮助。(比如创建一个流的数据的多个副本)
从可读的 Node.js 流读取数据
一旦可读流连接到生产数据的源头,比如一个文件,就可以用几种方法通过该流读取数据。
首先,先创建一个名为 myfile
的简单的 text 文件,85 字节大小,包含以下字符串:
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.
现在,我们看下从可读流读取数据的两种不同方式。
1. 监听 data
事件
从可读流读取数据的最常见方式是监听流发出的 data
事件。以下代码演示了这种方式:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); })
highWaterMark
属性作为一个选项传递给 fs.createReadStream
,用于决定该流中有多少数据缓冲。然后数据被冲到读取机制(在这个案例中,是我们的 data
处理程序)。默认情况下,可读 fs
流的 highWaterMark
值是 64kb。我们刻意重写该值为 20 字节用于触发多个 data
事件。
如果你运行上述程序,它会在五个迭代内从 myfile
中读取 85 个字节。你会在 console 看到以下输出:
Read 20 bytes "Lorem ipsum dolor si" Read 20 bytes "t amet, consectetur " Read 20 bytes "adipiscing elit. Cur" Read 20 bytes "abitur nec mauris tu" Read 5 bytes "rpis."
2. 使用异步迭代器
从可读流中读取数据的另一种方法是使用异步迭代器:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); (async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } })()
如果你运行这个程序,你会得到和前面例子一样的输出。
可读 Node.js 流的状态
当一个监听器监听到可读流的 data
事件时,流的状态会切换成”流动”状态(除非该流被显式的暂停了)。你可以通过流对象的 readableFlowing
属性检查流的”流动”状态
我们可以稍微修改下前面的例子,通过 data
处理器来示范:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); let bytesRead = 0 console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes`); bytesRead += chunk.length // 在从可读流中读取 60 个字节后停止阅读 if (bytesRead === 60) { readable.pause() console.log(`after pause() call. is flowing: ${readable.readableFlowing}`); // 在等待 1 秒后继续读取 setTimeout(() => { readable.resume() console.log(`after resume() call. is flowing: ${readable.readableFlowing}`); }, 1000) } }) console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
在这个例子中,我们从一个可读流中读取 myfile
,但在读取 60 个字节后,我们临时暂停了数据流 1 秒。我们也在不同的时间打印了 readableFlowing
属性的值去理解他是如何变化的。
如果你运行上述程序,你会得到以下输出:
before attaching 'data' handler. is flowing: null after attaching 'data' handler. is flowing: true Read 20 bytes Read 20 bytes Read 20 bytes after pause() call. is flowing: false after resume() call. is flowing: true Read 20 bytes Read 5 bytes
我们可以用以下来解释输出:
当我们的程序开始时,
readableFlowing
的值是null
,因为我们没有提供任何消耗流的机制。在连接到
data
处理器后,可读流变为“流动”模式,readableFlowing
变为true
。一旦读取 60 个字节,通过调用
pause()
来暂停流,readableFlowing
也转变为false
。在等待 1 秒后,通过调用
resume()
,流再次切换为“流动”模式,readableFlowing
改为 `true'。然后剩下的文件内容在流中流动。
通过 Node.js 流处理大量数据
因为有流,应用不需要在内存中保留大型的二进制对象:小型的数据块可以接收到就进行处理。
在这部分,让我们组合不同的流来构建一个可以处理大量数据的真实应用。我们会使用一个小型的工具程序来生成一个给定文件的 SHA-256。
但首先,我们需要创建一个大型的 4GB 的假文件来测试。你可以通过一个简单的 shell 命令来完成:
- On macOS:
mkfile -n 4g 4gb_file
- On Linux:
xfs_mkfile 4096m 4gb_file
在我们创建了假文件 4gb_file
后,让我们在不使用 stream
模块的情况下来生成来文件的 SHA-256 hash。
const fs = require("fs"); const crypto = require("crypto"); fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log(readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console.error(err) }); });
如果你运行以上代码,你可能会得到以下错误:
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: 'ERR_FS_FILE_TOO_LARGE' }