博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
node.js中stream流中可读流和可写流的使用
阅读量:6330 次
发布时间:2019-06-22

本文共 6143 字,大约阅读时间需要 20 分钟。

node.js中的流 stream 是处理流式数据的抽象接口。node.js 提供了很多流对象,像http中的request和response,和 process.stdout 都是流的实例。

流可以是 可读的,可写的,或是可读可写的。所有流都是 events 的实例。

 

一、流的类型

node.js中有四种基本流类型:

1、Writable 可写流 (例:fs.createWriteStream() )

2、Readable 可读流 (例:fs.createReadStream() )

3、Duplex 可读又可写流 (例:net.Socket ) 

4、Transform 读写过程中可修改或转换数据的 Duplex 流 (例:zlib.createDeflate() )

 

二、流中的数据有两种模式

1、二进制模式,都是 string字符串  和 Buffer。

2、对象模式,流内部处理的是一系统普通对象。

 

三、可读流的两种模式

1、流动模式 ( flowing ) ,数据自动从系统底层读取,并通过事件,尽可能快地提供给应用程序。

2、暂停模式 ( paused ),必须显式的调用 read() 读取数据。

可读流 都开始于暂停模式,可以通过如下方法切换到流动模式:

1、添加 'data' 事件回调。

2、调用 resume()。

3、调用 pipe()。

可读流通过如下方法切换回暂停模式:

1、如果没有管道目标,调用 pause()。

2、如果有管道目标,移除所有管道目标,调用 unpipe() 移除多个管道目标。

 

四、创建可读流,并监听事件

const fs = require('fs');//创建一个文件可读流let rs = fs.createReadStream('./1.txt', {    //文件系统标志    flags: 'r',    //数据编码,如果调置了该参数,则读取的数据会自动解析    //如果没调置,则读取的数据会是 Buffer    //也可以通过 rs.setEncoding() 进行设置    encoding: 'utf8',    //文件描述符,默认为null    fd: null,    //文件权限    mode: 0o666,    //文件读取的开始位置    start: 0,    //文件读取的结束位置(包括结束位置)    end: Infinity,    //读取缓冲区的大小,默认64K    highWaterMark: 3});//文件被打开时触发rs.on('open', function () {    console.log('文件打开');});//监听data事件,会让当前流切换到流动模式//当流中将数据传给消费者后触发//由于我们在上面配置了 highWaterMark 为 3字节,所以下面会打印多次。rs.on('data', function (data) {    console.log(data);});//流中没有数据可供消费者时触发rs.on('end', function () {    console.log('数据读取完毕');});//读取数据出错时触发rs.on('error', function () {    console.log('读取错误');});//当文件被关闭时触发rs.on('close', function () {    console.log('文件关闭');});

注意,'open' 和 'close' 事件并不是所有流都会触发。

当们监听'data'事件后,系统会尽可能快的读取出数据。但有时候,我们需要暂停一下流的读取,操作其他事情。

这时候就需要用到 pause() 和 resume() 方法。

const fs = require('fs');//创建一个文件可读流let rs = fs.createReadStream('./1.txt', {    highWaterMark: 3});rs.on('data', function (data) {    console.log(`读取了 ${data.length} 字节数据 : ${data.toString()}`);    //使流动模式的流停止触发'data'事件,切换出流动模式,数据都会保留在内部缓存中。    rs.pause();    //等待3秒后,再恢复触发'data'事件,将流切换回流动模式。    setTimeout(function () {        rs.resume();    }, 3000);});

可读流的 'readable' 事件,当流中有数据可供读取时就触发。

注意当监听 'readable' 事件后,会导致流停止流动,需调用 read() 方法读取数据。

注意 on('data'),on('readable'),pipe() 不要混合使用,会导致不明确的行为。

const fs = require('fs');let rs = fs.createReadStream('./1.txt', {    highWaterMark: 1});//当流中有数据可供读取时就触发rs.on('readable', function () {    let data;    //循环读取数据    //参数表示要读取的字节数    //如果可读的数据不足字节数,则返回缓冲区剩余数据    //如是没有指定字节数,则返回缓冲区中所有数据    while (data = rs.read()) {        console.log(`读取到 ${data.length} 字节数据`);        console.log(data.toString());    }});

 

五、创建可写流,并监听事件

const fs = require('fs');//创建一个文件可写流let ws = fs.createWriteStream('./1.txt', {    highWaterMark: 3});//往流中写入数据//参数一表示要写入的数据//参数二表示编码方式//参数三表示写入成功的回调//缓冲区满时返回false,未满时返回true。//由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。console.log(ws.write('1', 'utf8'));console.log(ws.write('2', 'utf8'));console.log(ws.write('3', 'utf8'));console.log(ws.write('4', 'utf8'));function writeData() {    let cnt = 9;    return function () {        let flag = true;        while (cnt && flag) {            flag = ws.write(`${cnt}`);            console.log('缓冲区中写入的字节数', ws.writableLength);            cnt--;        }    };}let wd = writeData();wd();//当缓冲区中的数据满的时候,应停止写入数据,//一旦缓冲区中的数据写入文件了,并清空了,则会触发 'drain' 事件,告诉生产者可以继续写数据了。ws.on('drain', function () {    console.log('可以继续写数据了');    console.log('缓冲区中写入的字节数', ws.writableLength);    wd();});//当流或底层资源关闭时触发ws.on('close', function () {    console.log('文件被关闭');});//当写入数据出错时触发ws.on('error', function () {    console.log('写入数据错误');});

写入流的 end() 方法 和 'finish' 事件监听

const fs = require('fs');//创建一个文件可写流let ws = fs.createWriteStream('./1.txt', {    highWaterMark: 3});//往流中写入数据//参数一表示要写入的数据//参数二表示编码方式//参数三表示写入成功的回调//缓冲区满时返回false,未满时返回true。//由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。console.log(ws.write('1', 'utf8'));console.log(ws.write('2', 'utf8'));console.log(ws.write('3', 'utf8'));console.log(ws.write('4', 'utf8'));//调用end()表明已经没有数据要被写入,在关闭流之前再写一块数据。//如果传入了回调函数,则将作为 'finish' 事件的回调函数ws.end('最后一点数据', 'utf8');//调用 end() 且缓冲区数据都已传给底层系统时触发ws.on('finish', function () {    console.log('写入完成');});

写入流的 cork() 和 uncork() 方法,主要是为了解决大量小块数据写入时,内部缓冲可能失效,导致的性能下降。

const fs = require('fs');let ws = fs.createWriteStream('./1.txt', {    highWaterMark: 1});//调用 cork() 后,会强制把所有写入的数据缓冲到内存中。//不会因为写入的数据超过了 highWaterMark 的设置而写入到文件中。ws.cork();ws.write('1');console.log(ws.writableLength);ws.write('2');console.log(ws.writableLength);ws.write('3');console.log(ws.writableLength);//将调用 cork() 后的缓冲数据都输出到目标,也就是写入文件中。ws.uncork();

注意 cork() 的调用次数要与 uncork() 一致。

const fs = require('fs');let ws = fs.createWriteStream('./1.txt', {    highWaterMark: 1});//调用一次 cork() 就应该写一次 uncork(),两者要一一对应。ws.cork();ws.write('4');ws.write('5');ws.cork();ws.write('6');process.nextTick(function () {    //注意这里只调用了一次 uncork()    ws.uncork();    //只有调用同样次数的 uncork() 数据才会被输出。    ws.uncork();});

 

 六、可读流的 pipe() 方法

pipe() 方法类似下面的代码,在可读流与可写流之前架起一座桥梁。

const fs = require('fs');//创建一个可读流let rs = fs.createReadStream('./1.txt', {    highWaterMark: 3});//创建一个可写流let ws = fs.createWriteStream('./2.txt', {    highWaterMark: 3});rs.on('data', function (data) {    let flag = ws.write(data);    console.log(`往可写流中写入 ${data.length} 字节数据`);    //如果写入缓冲区已满,则暂停可读流的读取    if (!flag) {        rs.pause();        console.log('暂停可读流');    }});//监控可读流数据是否读完rs.on('end', function () {    console.log('数据已读完');    //如果可读流读完了,则调用 end() 表示可写流已写入完成    ws.end();});//如果可写流缓冲区已清空,可以再次写入,则重新打开可读流ws.on('drain', function () {    rs.resume();    console.log('重新开启可读流');});

我们用 pipe() 方法完成上面的功能。

const fs = require('fs');//创建一个可读流let rs = fs.createReadStream('./1.txt', {    highWaterMark: 3});//创建一个可写流let ws = fs.createWriteStream('./2.txt', {    highWaterMark: 3});let ws2 = fs.createWriteStream('./3.txt', {    highWaterMark: 3});//绑定可写流到可读流,自动将可读流切换到流动模式,将可读流的所有数据推送到可写流。rs.pipe(ws);//可以绑定多个可写流rs.pipe(ws2);

我们也可以用 unpipe() 手动的解绑可写流。

const fs = require('fs');//创建一个可读流let rs = fs.createReadStream('./1.txt', {    highWaterMark: 3});//创建一个可写流let ws = fs.createWriteStream('./2.txt', {    highWaterMark: 3});let ws2 = fs.createWriteStream('./3.txt', {    highWaterMark: 3});rs.pipe(ws);rs.pipe(ws2);//解绑可写流,如果参数没写,则解绑所有管道setTimeout(function () {    rs.unpipe(ws2);}, 0);

  

转载于:https://www.cnblogs.com/jkko123/p/10236725.html

你可能感兴趣的文章
安徽现首套被动房 可自主“呼吸”
查看>>
冬训成果何在?林丹无缘新赛季首冠状态成迷
查看>>
一个连区块链是什么都不知道的财经“专家”也敢谈比特币是泡沫?
查看>>
程序员面试被要求手写代码,你与顶级程序员的差别在哪?
查看>>
JavaScript是如何工作的: CSS 和 JS 动画底层原理及如何优化它们的性能
查看>>
Async/Await替代Promise的6个理由
查看>>
谁用光了磁盘?Docker System命令详解
查看>>
Android App性能优化[译]
查看>>
自然语言处理的语义建模介绍
查看>>
菜鸟学数据库——WAL模式及其原理
查看>>
微信小程序有旋转动画效果的音乐组件
查看>>
更大的块和更智能的合同:比特币现金的下一个叉子是什么?
查看>>
NODE Stream流总结(2)
查看>>
2018深圳云栖拉开帷幕,飞天技术汇五大专场邀你参加~
查看>>
聊聊sentinel的DataSource
查看>>
Maven的聚合模块和继承
查看>>
ELK的心脏,ElasticSearch学习方法论
查看>>
认识微服务
查看>>
Python3入门与实践(四): 面向对象
查看>>
OpenCV 入门
查看>>