最新公告
  • 欢迎您光临起源地模板网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入钻石VIP
  • 浅谈Nodejs中的流

    正文概述 掘金(头号前端)   2021-02-09   349

    概述

    流(Stream)在NodeJs中是个十分基础的概念,很多基础模块都是基于流实现的,扮演着十分重要的角色。流是一个十分难以理解的概念,同时相关的文档又比较缺少,对于NodeJs初学者来说,理解流往往需要花很多时间理解,才能真正掌握这个概念,所幸的是,对于大部分NodeJs使用者来说,仅仅是用来开发Web应用,对流的不充分认识并不影响使用。但是,理解流能够对NodeJs中的其他模块有更好的理解,同时在某些情况下,使用流来处理数据会有更好的效果。

    考虑使用NodeJs处理以下这个场景:

    没了解NodeJs的流之前,大家可能都会想到以下方案:

    以上方法其实有比较多的问题:

    • 效率很低,多了读写本地文件的io;

    • 容易造成用内存溢出;

    流其实能够很好解决像上述这类型的问题,如果大家能够用流的思维去思考,就能很好地解决这个问题。

    什么是流?

    流的理解

    以下是官方定义: 流(stream)是 Node.js 中处理流式数据的抽象接口

    官方这个定义其实有点难理解,这里的流其实可以分两个角度理解。

    • 对于一般开发者来说,流其实可以认为是一种数据集合,可以认为是跟数组一样,只需要关心获取(消费)以及写入(生产)数据就可以了。

    • 对于流开发者来说(使用stream模块创建一个新实例),关注的是如何实现这个抽象接口,通常需要关注两点,资源目标是什么以及如何操作数据。一旦确定资源目标之后,就需要根据流的状态以及资源目标的状态做相应的操作。

    流的类型

    流的类型有4种:

    • 可读流(Readable)

    • 可写流(Writable)

    • 双工流(Duplex)

    • 转换流 (Transform)

    其中可读流以及可写流是基础,理解这两个流之后,就比较容易裂解另外两个流了。

    可读流

    可读流是对提供数据的来源的一种抽象。

    可读流的实现

    所谓的实现可读流就是实现 _read 方法, _read 方法定义了从哪里用怎样的方式把数据push到缓冲池中,这里要注意的是,push的数据并不是直接给到消费者消费的,而是首先经过一个缓冲池,当缓冲池满了的时候(超出highWaterMark),会产生“背压”,调用push的时候返回false。

    class MyReadable extends Readable {
        /**
         * @param size 指定读取的大小
         */
        _read(size?:number) {
            const data = getDataFromSomeWhere();
            // console.log('source:'+data)
           const res= this.push(data || null); 
           // res will be false if over highWaterMark
           //  push a null data means  finished
        }
    }
    
    可读流模式

    可读流有读取两种模式:流动模式(flowing)或暂停模式(paused)。

    暂停模式

    浅谈Nodejs中的流

    暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用_read方法,把数据从数据源push到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable " 事件,告诉消费者有数据已经准备好了,可以继续消费。

    一般来说, 'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次'readable'事件;

    消费者 " readable " 事件的句柄中,通过 stream.read(size) 主动获取获取缓冲池中的数据。

    demo:

    import { Readable } from 'stream';
    class Source {
        private data :string[] =Array(10).fill('byte');
        public getData() {
            return this.data.pop()
        }
    }
    class MyReadable extends Readable {
        source=new Source()
        _read() {
            const data=this.source.getData()
            // console.log('source:'+data)
            this.push(data||null);
        }
    }
    const myReadable = new MyReadable();
    myReadable.setEncoding('utf-8')
    myReadable.on('readable',()=>{
        console.log('readable')
        let data=''
        while(data=myReadable.read()){
            console.log(data)
        }
    })
    myReadable.on('end',()=>{
        console.log('end')
    })
    
    流动模式

    浅谈Nodejs中的流

    所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:

    • 添加 " data " 事件句柄;

    • 调用 “ resume ”方法;

    • 使用 " pipe " 方法把数据发送到可写流

    流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调_read方法,把数据源的数据放到缓冲池中;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null) );

    可读流可以通过以下方式切换回暂停模式:

    • 如果没有管道目标,则调用 stream.pause()

    • 如果有管道目标,则移除所有管道目标。调用 stream.unpipe() 可以移除多个管道目标。

    demo:

    import { Readable } from 'stream';
    class Source {
        private data: string[] = Array(10).fill('byte');
        public getData() {
            return this.data.pop();
        }
    }
    class MyReadable extends Readable {
        source = new Source();
        /**
         * 
         * @param size 指定读取的大小
         */
        _read(size?:number) {
            const data = this.source.getData();
            // console.log('source:'+data)
            this.push(data || null);
        }
    }
    const myReadable = new MyReadable();
    myReadable.setEncoding('utf-8');
    myReadable
        .on('data', chunk => {
            console.log(chunk);
        })
        .on('end', () => {
            console.log('end');
        });
    
    
    两种模式的区别

    这里的模式其实对消费者来说的,采用哪一种模式消费从流种读取的数据。两者的区别是:

    • 在流动模式中,数据自动从底层系统读取,并通过 EventEmitter 接口的事件尽可能快地被提供给应用程序。

    • 在暂停模式中,必须显式调用 stream.read() 读取数据块。

    pipe

    可读流对象有个 pipe 方法,pipe方法中接受一个可写流为参数,返回的也是这个目标可写流。使用流pipe方法,可读流会自动切换至流动模式。

    可写流

    可写流是对数据要被写入的目的地的一种抽象。 所谓实现可写流,就是实现可写流的_write方法,就是当生产者把数据输入时,怎么把这些数据写到目的地中。

    import {Writable} from 'stream'
    class MyWritable extends Writable{
        /**
         * @param str 每次写入的数据
         * @param encoding 编码
         * @param cb 回调,每次写入数据成功需要回调这个参数,如果不回调,不会自动清空缓冲队列。
         */
        _write(str:string,encoding:string,cb:(err?:any)=>void){
            console.log(str)
            console.log(encoding)
            cb()
        }
    }
    

    相对可读流来说,可写流要简单一些。 浅谈Nodejs中的流 当生产者调用write(chunk)时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用_write,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用write(chunk)后会返回false,这时候生产者应该停止继续写入。

    那么什么时候可以继续写入呢?当缓冲中的数据都被成功_write之后,清空了缓冲队列后会触发 'drain' 事件,这时候生产者可以继续写入数据。

    当生产者需要结束写入数据时,需要调用 stream.end 方法通知可写流结束。

    demo

    import {Writable} from 'stream'
    class MyWritable extends Writable{
        /**
         * 
         * @param str 每次写入的数据
         * @param encoding 编码
         * @param cb 回调,每次写入数据成功需要回调这个参数,如果不回调,不会自动清空缓冲队列。
         */
        _write(str:Buffer,encoding:string,cb:(err?:any)=>void){
            console.log(str.toString('utf8'))
            console.log(encoding)
            cb()
        }
    }
    const myWritable = new MyWritable();
    
    console.log(myWritable.write('hello world','utf8')) 
    console.log( myWritable.write('hello world1','utf8'))
    

    双工流

    在理解了可读流与可写流后,双工流就好理解了,双工流事实上是同时实现流可写流与可读流。 浅谈Nodejs中的流 这里需要注意的是,双工流中,独立维护两个缓冲区,同时可读可写流的数据源不一定是同一个目标。

    demo

    import { Duplex } from 'stream';
    class MyDuplex extends Duplex {
        readEnd = false;
        _read() {
            if (!this.readEnd) {
                this.push('read');
                this.readEnd = true;
            }
            this.push(null);
        }
        _write(chunk, encoding, callback) {
            console.log(chunk.toString('utf8'));
            callback();
        }
    }
    const duplex = new MyDuplex();
    duplex
        .on('data', chunk => {
            console.log(chunk.toString());
        })
        .on('end', () => {
            console.log('read end');
        });
    duplex.write('write');
    
    

    转换流

    与双工流一样,转换流(Transform)同时实现可写流(_write)与可读流(_read),只不过,这里的数据流输入与输出有关联的。 浅谈Nodejs中的流 这里要注意的是,这里的输入输出虽然是有关联,但是并不一定是一一对应关系,或者不一定是同步的。 比如,在压缩的时候,有可能对多个写入压缩成一个chunk,这时候指提供一个压缩后的chunk给消费者消费。 另外,也有可能是一次写入会被输出多个chunk,比如转换的时候,把输入的1kb数据,放大至1M多次push到缓冲中,如果缓冲池超出了浮标值,就会造成“背压”。

    有机智的同学可能会发现,这个转换流既是可写流同时又是可读流,是不是可以结合上面提到的可读流的pipe方法,是不是可以进行无限的管道接驳? 确实是这样的,转换流以及双工流都可以进行管道的接驳。

    readabl
        .pipe(tranform1)
        .pipe(tranform2)
        .pipe(tranformN)
    

    一些常用事件与方法

    流内部维护了一些列状态,不同的流在不同的阶段会发生状态改变, 由于流都是继承于 EventEmiter,当状态发生改变时,会触发相关的事件。

    可读流可写流
    事件- data- drain实例方法- read()- write()需要实现的方法_read_write

    NodeJs内置流

    NodeJs中很多,模块是都是使用流实现的。

    readablewritableduplextransform
    - Http client response- Http client requestTcp socketcrypto

    有哪些应用?

    • 流式渲染chunk传输

    demo

    import { createServer } from 'http';
    import { Readable } from "stream";
    const source=[
        `
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <meta name="viewport" content="width=device-width, initial-scale=1.0">
            <meta http-equiv="X-UA-Compatible" content="ie=edge">
            <title>Test</title>
            <script src="https://unpkg.com/react-dom@16.13.1/umd/react-dom.production.min.js"></script>
        </head>
        `,
        `
        <body>
          <div>chunk1</div>
                `,
        `  <div>chunk2</div>
        </body>`,
        `</html>`
    ]
    function getRS(){
        let hasRead=false;
        const dataSource=[...source]
        return new Readable({
            read(){
                if(hasRead){
                    return
                }
                hasRead=true
                dataSource.forEach((d,index)=>{
                    setTimeout(() => {
                        if(index===dataSource.length-1){
                            this.push(null)
                        }else{
                            this.push(d)
                        }
                    }, index*1000);
                })
            }
        })
         
    }
    
    
    const server = createServer((req, res) => {
        let reqData;
        const readable= getRS();
        req.on('data', chunk => {
            reqData += chunk;
        });
        req.on('end', () => {
            console.log('req end')
           
            res.statusCode=200
            readable.pipe(res);
            readable.on('end',()=>{
                console.log('end')
            })
        });
      
    });
    server.listen(3000);
    
    
    • 大文件读取

    • 文件上传下载

    • 数据的加解密

    • 数据压解

    流有哪些不足?

    • 文档不够友好

    由于Stream模块比较难理解,很多程度上是因为NodeJs这块不够友好,官网上仅仅是Api文档,重要知识点 比较零碎,不成系统,对于一些细节的实现,往往需要看源码才能弄清楚,这个对新手来说不太友好。

    • 对流的组合不够强大

    目前可读流只有pipe方法,不够强大,某些场景下需要手动做比较多的额外工作,比如,想要从两个流中消费到以一个地方。熟悉RX的同学可能知道,Observable对象之间的关系可能有很多种,Rx有相关的方法去组合不同的Observable,目前社区上也有相关的 工具 ,可以把Stream转为Rx对象,方法处理数据。

    • 使用不当可能存在性能问题

    由于流内部会维护一个缓存区,虽然缓冲区默认大小为16Kb,但是某些情况下会自动扩容,如果流没有及时消费或者没有被终止,会有OOM的风险。 另外,流的读写数据是分多次的,如果往数据源中读写是有额外开销的话,这部分开销会随着读写次数的线性增加。

    总结

    本文尝试从宏观一点介绍NodeJs中的流,让大家对流有个系统点的认识,还有很多细节没有讲解,有兴趣的话,建议可以看看NodeJs中关于流的源码。


    起源地下载网 » 浅谈Nodejs中的流

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    模板不会安装或需要功能定制以及二次开发?
    请QQ联系我们

    发表评论

    还没有评论,快来抢沙发吧!

    如需帝国cms功能定制以及二次开发请联系我们

    联系作者

    请选择支付方式

    ×
    迅虎支付宝
    迅虎微信
    支付宝当面付
    余额支付
    ×
    微信扫码支付 0 元