Node.js诞生之初就是为了提高IO性能,其中
文件操作系统和网络模块实现了流接口。Node.js中的Stream就是处理流式数据的抽象接口。



分段处理可以同时操作多个数据chunk内存空间管道,扩展程序变得简单可读流,能够实现数据的读取可写流,能够实现数据的写操作双工流,既可读又可写转换流,可读可写,还能实现数据修改转换将test中的内容拷贝到test1里边去
const fs = require('fs')
let rs = fs.createReadStream('./test.txt')
let ws = fs.createWriteStream('./test1.txt')
rs.pipe(ws) // pipe:管道操作
可读流就是专门生产供程序消费数据的流处于流的上游,node中最常见的数据生产方式其实就是读取磁盘文件或读取网络请求中的内容。如何自定义可读流?

const {Readable} = require('stream')
// 模拟底层数据
let source = ['111', '222', '333']
// 自定义类继承 Readable
class MyReadable extends Readable{
constructor(source) {
super()
this.source = source
}
// 重写_read的方法
_read() {
let data = this.source.shift() || null
this.push(data)
}
}
// 实例化
let myReadable = new MyReadable(source)
/* myReadable.on('readable', () => {
let data = null
// readable默认是暂停的模式,如果需要源源不断的读需要用到循环,然后调用read方法。read里的2表示读取的长度
while((data = myReadable.read(2)) != null) {
console.log(data.toString())
}
}) */
// data 默认是流动模式,能源源不断拿到所需要的值
myReadable.on('data', (chunk) => {
// console.log(chunk)
console.log(chunk.toString())
})
用于消费数据的流,处于流的下游。常见就是往磁盘中写入内容,或者对tcp或http的网络响应做出操作。

const {Writable} = require('stream')
class MyWriteable extends Writable{
constructor() {
super()
}
_write(chunk, en, done) {
process.stdout.write(chunk.toString() + '<----')
process.nextTick(done)
}
}
let myWriteable = new MyWriteable()
myWriteable.write('拉勾教育', 'utf-8', () => {
console.log('end')
})
Node.js中stream是流操作的抽象接口集合。
可读、可写、双工、转换是单一抽象的具体实现。
流操作的核心功能就是处理数据。
Node.js诞生之初就是解决密集型IO事务。
Node.js中凡是处理数据的模块都继承了流和EventEmitter
Duplex是双工流,既能生产数据又能消费数据。
let {Duplex} = require('stream')
class MyDuplex extends Duplex{
constructor(source) {
super()
this.source = source
}
_read() {
let data = this.source.shift() || null
this.push(data)
}
_write(chunk, en, next) {
process.stdout.write(chunk)
process.nextTick(next)
}
}
let source = ['a', 'b', 'c']
let myDuplex = new MyDuplex(source)
/* myDuplex.on('data', (chunk) => {
console.log(chunk.toString())
}) */
myDuplex.write('拉勾教育', () => {
console.log(1111)
})
let {Transform} = require('stream')
class MyTransform extends Transform{
constructor() {
super()
}
_transform(chunk, en, cb) {
this.push(chunk.toString().toUpperCase())
cb(null)
}
}
let t = new MyTransform()
t.write('a')
t.on('data', (chunk) => {
console.log(chunk.toString())
})
const fs = require('fs')
let rs = fs.createReadStream('test.txt', {
flags: 'r',
encoding: null,
fd: null,
mode: 438,
autoClose: true,
start: 0,
// end: 3,
highWaterMark: 4 // 缓存区缓存的字节数
})
// rs.on('data', (chunk) => { // data消费数据
// console.log(chunk.toString())
// rs.pause() // 暂停
// setTimeout(() => {
// rs.resume() // 流动
// }, 1000)
// })
// rs.on('readable', () => { // readable消费数据
// // let data = rs.read()
// // console.log(data)
// while((data = rs.read(1)) !== null) {
// console.log(data.toString())
// // 查看缓存中的字节数
// console.log('----------', rs._readableState.length)
// }
// })
rs.on('open', (fd) => {
console.log(fd, '文件打开了')
})
// 默认情况下close当前的流是一个暂停模式,所以close得在数据被消费或出错之后才能出触发
rs.on('close', () => {
console.log('文件关闭了')
})
let bufferArr = []
// 消费数据
rs.on('data', (chunk) => {
console.log(chunk);
// 因为这边接收到的是一个连续的数据,所以得先缓存起来,最后在end中拿到完整的数据
bufferArr.push(chunk)
})
// end在close之前执行,表示当前的数据都被消费完毕了
rs.on('end', () => {
// 在end中拿到完整的数据
console.log(Buffer.concat(bufferArr).toString())
console.log('当数据被清空之后')
})
rs.on('error', (err) => {
console.log('出错了')
})
const fs = require('fs')
const ws = fs.createWriteStream('test.txt', {
flags: 'w',
mode: 438,
fd: null,
encoding: "utf-8",
start: 0,
highWaterMark: 3
})
let buf = Buffer.from('abc')
// 字符串 或者 buffer ===》 fs rs
ws.write(buf, () => {
console.log('ok2')
})
ws.write('小星星', () => {
console.log('ok1')
})
ws.on('open', (fd) => {
console.log('open', fd)
})
ws.write("22")
// close 是在数据写入操作全部完成之后再执行
ws.on('close', () => {
console.log('文件关闭了')
})
// end 执行之后就意味着数据写入操作完成
ws.end('小星星')
// error
ws.on('error', (err) => {
console.log('出错了')
})
const fs = require('fs')
let ws = fs.createWriteStream('test.txt', {
highWaterMark: 3
})
let flag = ws.write('1')
console.log(flag) // true
flag = ws.write('2')
console.log(flag) // true
flag = ws.write('3')
console.log(flag) // false
// 如果 flag 为 false 并不是说明当前数据不能被执行写入
//
ws.on('drain', () => {
console.log('11') // 11
})

/**
* 需求:“小星星” 写入指定的文件
* 01 一次性写入
* 02 分批写入
* 对比:
*/
let fs = require('fs')
let ws = fs.createWriteStream('test.txt', {
highWaterMark: 3
})
// ws.write('小星星')
let source = "小星星".split('')
let num = 0
let flag = true
function executeWrite () {
flag = true
while(num !== 3 && flag) {
flag = ws.write(source[num])
num++
}
}
executeWrite()
ws.on('drain', () => {
console.log('drain 执行了,可以继续写入数据了')
executeWrite()
})
Node.js的stream已实现了背压机制
硬盘的写入速度,远远小于硬盘的读取速度。如果可读流太快,而可写流的无法迅速的消费可读流传输的数据,写入流将会把 chunk,push 到写队列中方便之后使用,这样就会造成数据在内存中的累积。这个时候将会触发 backpressur(背压) 机制。如果没有 backpressur(背压) 机制,系统将会出现如下的问题:

在代码中调用pipe时,它会向可写流发出信号,表示有数据准备传输。当我们的可写流使用 write() 写入数据时,如果写队列繁忙,或者内部缓存区已经溢出了,write() 将会返回false。
这个时候,背压机制就会启动,它会暂停任何数据传入到可写流中,并等待可写流准备好,清空内部缓存区后。将会发出 drain 事件,并恢复可读流的传输。
这就意味着 pipe 只会使用固定大小的内存,不会存在内存泄漏的问题。

let fs = require('fs')
let rs = fs.createReadStream('test.txt', {
highWaterMark: 4
})
let ws = fs.createWriteStream('test1.txt', {
highWaterMark: 1
})
let flag = true
// 文件可读流读取数据
rs.on('data', (chunk) => {
// 文件可写流写入数据
flag = ws.write(chunk, () => {
console.log('写完了')
})
if (!flag) {
// 开启暂停模式
rs.pause()
}
})
ws.on('drain', () => {
// 文件有新的空间可以接纳数据了
rs.resume() // 开启流动模式
})
上面代码就相当于
let fs = require('fs')
let rs = fs.createReadStream('test.txt', {
highWaterMark: 4
})
let ws = fs.createWriteStream('test1.txt', {
highWaterMark: 1
})
rs.pipe(ws) // 开启背压机制
const fs = require('fs')
const EventEmitter = require('events')
class MyFileReadStream extends EventEmitter{
constructor(path, options = {}) {
super()
this.path = path
this.flags = options.flags || "r"
this.mode = options.mode || 438
this.autoClose = options.autoClose || true
this.start = options.start || 0
this.end = options.start
this.highWaterMark = options.highWaterMark || 64 * 1024
this.open()
}
open() {
// 原生 open 方法来打开指定位置上的文件
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit('error', err)
}
this.fd = fd
this.emit('open', fd)
})
}
}
let rs = new MyFileReadStream('test.txt')
rs.on('open', (fd) => {
console.log('open', fd)
})
rs.on('error', (err) => {
console.log(err)
})
const fs = require('fs')
const EventEmitter = require('events')
class MyFileReadStream extends EventEmitter{
constructor(path, options = {}) {
super()
this.path = path
this.flags = options.flags || "r"
this.mode = options.mode || 438
this.autoClose = options.autoClose || true
this.start = options.start || 0
this.end = options.start
this.highWaterMark = options.highWaterMark || 64 * 1024
this.readOffset = 0
this.open()
this.on('newListener', (type) => {
if (type === 'data') {
this.read()
}
})
}
open() {
// 原生 open 方法来打开指定位置上的文件
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit('error', err)
}
this.fd = fd
this.emit('open', fd)
})
}
read() {
if (typeof this.fd !== 'number') {
return this.once('open', this.read)
}
let buf = Buffer.alloc(this.highWaterMark)
fs.read(this.fd, buf, 0, this.highWaterMark, this.readOffset, (err, readBytes) => {
if (readBytes) {
this.readOffset += readBytes
this.emit('data', buf)
this.read()
} else {
this.emit('end')
this.close()
}
})
}
close() {
fs.close(this.fd, () => {
this.emit('close')
})
}
}
let rs = new MyFileReadStream('test.txt')
/*rs.on('open', (fd) => {
console.log('open', fd)
})
rs.on('error', (err) => {
console.log(err)
}) */
rs.on('data', (chunk) => {
console.log(chunk)
})
rs.on('close', () => {
console.log('close')
})
rs.on('end', () => {
console.log('end')
})
const fs = require('fs')
const EventEmitter = require('events')
class MyFileReadStream extends EventEmitter{
constructor(path, options = {}) {
super()
this.path = path
this.flags = options.flags || "r"
this.mode = options.mode || 438
this.autoClose = options.autoClose || true
this.start = options.start || 0
this.end = options.end
this.highWaterMark = options.highWaterMark || 64 * 1024
this.readOffset = 0
this.open()
this.on('newListener', (type) => {
if (type === 'data') {
this.read()
}
})
}
open() {
// 原生 open 方法来打开指定位置上的文件
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit('error', err)
}
this.fd = fd
this.emit('open', fd)
})
}
read() {
if (typeof this.fd !== 'number') {
return this.once('open', this.read)
}
let buf = Buffer.alloc(this.highWaterMark)
let howMuchToRead
/* if (this.end) {
howMuchToRead = Math.min(this.end - this.readOffset + 1, this.highWaterMark)
} else {
howMuchToRead = this.highWaterMark
} */
howMuchToRead = this.end ? Math.min(this.end - this.readOffset + 1, this.highWaterMark) : this.highWaterMark
fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
if (readBytes) {
this.readOffset += readBytes
this.emit('data', buf.slice(0, readBytes))
this.read()
} else {
this.emit('end')
this.close()
}
})
}
close() {
fs.close(this.fd, () => {
this.emit('close')
})
}
}
let rs = new MyFileReadStream('test.txt', {
end: 7,
highWaterMark: 3
})
rs.on('data', (chunk) => {
console.log(chunk)
})