聊聊Node.js中如何實現Stream流(可讀、可寫、雙工和轉換流)_第1頁
聊聊Node.js中如何實現Stream流(可讀、可寫、雙工和轉換流)_第2頁
聊聊Node.js中如何實現Stream流(可讀、可寫、雙工和轉換流)_第3頁
聊聊Node.js中如何實現Stream流(可讀、可寫、雙工和轉換流)_第4頁
聊聊Node.js中如何實現Stream流(可讀、可寫、雙工和轉換流)_第5頁
已閱讀5頁,還剩11頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第聊聊Node.js中如何實現Stream流(可讀、可寫、雙工和轉換流)ReadableStream

可讀流有兩個模式,暫停模式與流動模式。當我們創(chuàng)建一個流時,如果我們監(jiān)聽了readable事件,它就會來到暫停模式,在暫停模式下,它會不斷的讀取數據到緩沖區(qū),當讀取到的數據超過預設的大小時,它由屬性highWaterMark指定(默認為64kB),便會觸發(fā)readable事件,readable事件的觸發(fā)有兩種情況:

緩存區(qū)中的數據達到highWaterMark預設的大小

數據源的數據已經被讀取完畢

constfs=require(fs

constrs=fs.createReadStream(a.txt,{

highWaterMark:1//緩存區(qū)最多存儲1字節(jié)

rs.on(readable,()={

letdata;

while(data=rs.read()){

console.log(data.toString());

})

上面的程序設置highWaterMark為1,即每次讀取到一個字節(jié)便會觸發(fā)readable命令,每次當觸發(fā)readable命令時,我們調用可讀流的read([size])方法從緩沖區(qū)中讀取數據(讀取到的數據為Buffer),然后打印到控制臺。

當我們?yōu)榭勺x流綁定data事件時,可讀流便會切換到流動狀態(tài),當位于流動狀態(tài)時,可讀流會自動的從文件中讀取內容到緩沖區(qū),當緩沖區(qū)中的內容大于設定的highWaterMark的大小時,便會觸發(fā)data事件,將緩沖區(qū)中的數據傳遞給data事件綁定的函數。以上過程會自動不斷進行。當文件中的所有內容都被讀取完成時,那么就會觸發(fā)end事件。

constfs=require(fs

constrs=fs.createReadStream(a.txt,{

highWaterMark:2

rs.on(data,data={

console.log(data.toString());

rs.on(end,()={

console.log(文件讀取完畢!

});

暫停模式像是手動步槍,而流動模式則像是自動步槍。暫停模式與流動模式也可以相互切換,通過pause()可以從流動狀態(tài)切換到暫停狀態(tài),通過resume()則可以從暫停模式切換到流動模式。

可讀流的一個經典實例就是http中的請求對象req,下面的程序展示了通過監(jiān)聽req的data事件來讀取HTTP請求體中的內容

consthttp=require(http

constapp=http.createServer();

app.on(request,(req,res)={

letdatas=[];

req.on(data,data={

datas.push(data);

req.on(end,()={

req.body=Buffer.concat(datas);

//當讀取完body中的內容之后,將內容返回給客戶端

res.end(req.body);

app.listen(3000,()={

console.log(服務啟動在3000端口......

})

WritableStream

可寫流與可讀流相似,當我們向可寫流寫入數據時(通過可寫流的write()方法寫數據),會直接將數據寫入到文件中,如果寫入的數據比較慢的話,那就就會將數據寫入到緩沖區(qū),當緩沖區(qū)中的內容達到highWaterMark設定的大小時,write方法就會返回一個false,表明不能接受更多的數據了。

當緩沖區(qū)中的數據全部被消費完了(寫入了文件中或者被別的流消費了),那么就會觸發(fā)drAIn事件。

constfs=require(fs

constws=fs.createWriteStream(b.txt,{

highWaterMark:16*1024

functionwriteMillionTimes(writer,data,encoding,callback){

leti=10000;

write();

functionwrite(){

//表示是否可以向可寫流中寫入數據

letok=true;

while(i--0ok){

//當writer.write()方法返回false表示不可寫入數據

ok=writer.write(data,encoding,i===0callback:null);

if(i0){

//說明ok為false,即不能向緩沖區(qū)中寫入內容了

console.log(drain,i);

//監(jiān)聽drain事件,當隊列消費完畢時繼續(xù)調用write()方法寫入

writer.once(drain,write);

writeMillionTimes(ws,simple,utf-8,()={

console.log(end

})

輸出為

drain7268

drain4536

drain1804

end

說明有三次緩沖區(qū)中的內容達到了16KB,可以驗算上面的數字之間的差值,在乘以6(simple的字節(jié)數),大小大約為16*1024左右,如

(7268-4536)*6=16392\approx16384=16*1024

我們還可以調用可寫流的end()方法,表示將緩存中的內容清空寫入文件,并關閉文件,此時會觸發(fā)close事件

constfs=require(fs

constws=fs.createWriteStream(b.txt

ws.write(Hello

ws.write(World

ws.end(!

ws.on(close,()={

console.log(close//close

})

當調用end()方法之后就不能調用write()方法了,否則會報錯

constfs=require(fs

constws=fs.createWriteStream(b.txt

ws.write(Hello

ws.write(World

ws.end(!

ws.write(writeagain//Error[ERR_STREAM_WRITE_AFTER_END]:writeafterend

當調用end()方法之后,并且數據緩沖區(qū)中的數據已經寫入之后會觸發(fā)可寫流的finish事件

constfs=require(fs

constws=fs.createWriteStream(b.txt

ws.write(Hello

ws.write(World

ws.end(!

ws.on(close,()={

console.log(close

ws.on(finish,()={

console.log(finish

});

打印結果是

finish

close

說明finish事件會在close事件之前被觸發(fā)。

可寫流的經典例子就是http模塊的響應對象res,下面的程序演示了當請求到來時,我們讀取一個html頁面返回給客戶端

consthttp=require(http

constfs=require(fs

constapp=http.createServer();

app.on(request,(req,res)={

constrs=fs.createReadStream(index.html

rs.on(data,data={

res.write(data);

rs.on(end,()={

res.end()

app.listen(3000,()={

console.log(服務啟動在3000端口......

})

DuplexStream與TransformStream

Duplex,即雙工的意思,它既可以接收數據,也可以輸出數據,它的輸入和輸出之間可以沒有任何的關系,就像是一個部件內部有兩個獨立的系統(tǒng)。Duplex繼承了可讀流(Readable),并且擁有可寫流(Writable)的所有方法。

TransformStream繼承了DuplexStream,它同樣具有可讀流與可寫流的能力,并且它的輸出與輸入之間是有關系的,中間做了一次轉換。常見的轉換流有zlib,crypto。

出于文章結構的考慮,在這里不詳細講解這兩個流,在后文中會實現這兩個流,以加深對這兩個流的理解。

我們可以混合使用可讀流與可寫流來進行文件的復制

constfs=require(fs

functioncopy(source,target){

constrs=fs.createReadStream(source);

constws=fs.createWriteStream(target);

rs.on(data,data={

ws.write(data);

rs.on(end,()={

ws.end();

copy(a.txt,b.txt

但是上面的寫法卻不被建議使用,因為沒有考慮到可讀流與可寫流速度之間的差異,如果可讀流輸出數據的速度大于可寫流寫入數據的速度,這個時候就會有數據一直堆壓在緩存區(qū),導致占用過高的內存,專業(yè)術語叫做積壓。

我們需要改善上面的程序,具體做法就是當write()方法返回false時,我們切換可讀流的模式為暫停模式,當可寫流觸發(fā)了drain事件時,我們便將可讀流的狀態(tài)切換為流動模式

constfs=require(fs

functioncopy(source,target){

constrs=fs.createReadStream(source);

constws=fs.createWriteStream(target);

rs.on(data,data={

if(!ws.write(data)){

rs.pause();

rs.on(end,()={

ws.end();

ws.on(drain,()={

rs.resume();

}

那是不是每次我們使用流都需要寫這么多的代碼,當然不是。官方為可讀流提供了一個pipe(ws)方法,pipe方法接收一個可寫流,它的作用就是將可讀流中數據寫入到可寫流中去,并且它內部有做速度差異的處理。所以上面的寫法可以改為下面的版本

constfs=require(fs

functioncopy(source,target){

constrs=fs.createReadStream(source);

constws=fs.createWriteStream(target);

rs.pipe(ws);

}

當我們調用pipe方法時,會觸發(fā)可寫流的pipe事件。pipe的實現參考如下

Rtotype.pipe=function(ws){

this.on(data,data={

if(!ws.write(data)){

this.pause();

ws.on(drain,()={

this.resume();

//觸發(fā)pipe事件

ws.emit(pipe,this);

//返回可寫流,以支持鏈式調用

returnws;

}

這里給出官網畫的一個有關pipe的流程圖

+===================+

x--Pipingfunctions+--src.pipe(dest)|

xaresetupduring|===================|

xthe.pipemethod.|Eventcallbacks|

+===============+x|-------------------|

|YourData|xTheyexistoutside|.on(close,cb)|

+=======+=======+xthedataflow,but|.on(data,cb)|

|ximportantlyattach|.on(drain,cb)|

|xevents,andtheir|.on(unpipe,cb)|

+---------v---------+xrespectivecallbacks.|.on(error,cb)|

|ReadableStream+----+|.on(finish,cb)|

+-^-------^-------^-+||.on(end,cb)|

^|^|+-------------------+

||||

|^||

^^^|+-------------------++=================+

^|^+----WritableStream+---------.write(chunk)|

|||+-------------------++=======+=========+

||||

|^|+------------------v---------+

^|+-if(!chunk)|Isthischunktoobig|

^||emit.end();|Isthequeuebusy|

||+-else+-------+----------------+---+

|^|emit.write();||

|^^+--v---++---v---+

||^-----------------------------------No||Yes|

^|+------++---v---+

^||

|^emit.pause();+=================+|

|^---------------^-----------------------+returnfalse;-----+---+

|+=================+|

^whenqueueisempty+============+|

^------------^-----------------------Buffering||

||============||

+emit.drain();|^Buffer^||

+emit.resume();+------------+|

|^Buffer^||

+------------+addchunktoqueue|

|---^---------------------

+============+

在本節(jié)中我們來實現具體的流,通過實現流可以進一步加深對Stream內部工作細節(jié)的理解。

實現可讀流

上面我們都是通過fs.createReadableStream()方法來得到一個可讀流的,在這里我們自己實現一個可讀流。實現可讀流只需要繼承Readable類,然后實現_read()方法即可

const{Readable}=require(stream

classIeteratorReadableStreamextendsReadable{

constructor(iterator){

super();

this.iterator=iterator;

_read(){

letdata=this.iterator.next();

//console.log(data);

if(data.done){

this.push(null);

}else{

//必須push字符串或者Buffer

this.push(data.value+

module.exports=IeteratorReadableStream;

上述我們實現了一個可讀流,可讀流接收一個迭代器作為參數,這個迭代器作為這個可讀流的數據源??勺x流會自動的調用_read獲取數據,在_read方法中我們從迭代器中獲取數據,并且調用了push方法,該方法的作用就是將數據放入到緩存區(qū)中,只能向其中push字符串或者Buffer,當我們向其中pushnull時就表示數據已經被全部讀取完畢。

所以可讀流的執(zhí)行邏輯為,每次調用_read方法從數據源讀取數據,并將數據存入緩存區(qū),然后觸發(fā)data事件,將緩存區(qū)中的數據作為參數傳遞給data事件綁定的回調函數,循環(huán)上述過程直到向緩存區(qū)pushnull時,就表示數據源中的數據已經被讀取完畢,此時會觸發(fā)end事件。

我們創(chuàng)建一個迭代器作為數據源傳入

constIeteratorReadableStream=require(./IteratorReadableStream

function*getData(){

for(leti=0;ii++){

yieldi;

letrs=newIeteratorReadableStream(getData());

rs.on(data,data={

console.log(data.toString());

rs.on(end,()={

console.log(迭代結束

});

輸出為

迭代結束

實現可寫流

實現可寫流的過程同實現可讀流的過程類似,首先需要繼承Writable類,接著實現_write方法即可

constfs=require(fs

const{Writable}=require(stream

classFileWritableStreamextendsWritable{

constructor(filepath){

super();

this.filepath=filepath;

_write(chunk,encoding,callback){

fs.appendFile(this.filepath,chunk,{

encoding

},callback)

}

上面我們實現了一個可寫流,這個可寫流接收一個文件路徑作為參數,它的作用就是向這個文件中追加數據,每次當我們調用可寫流的write()方法時,它會向緩沖區(qū)寫入數據,當達到閾值時,便會調用_write()方法將數據新增到文件中。

process.stdin.pipe(newFileWritableStream(c.txt

上面這行代碼的作用就是將從標準輸入的字符輸出到c.txt中。

實現雙工流

DuplexStream既可以作為可讀流,也可以作為可寫流,并且它的輸入與輸出之間可以沒有關系。DuplexStream繼承了Readable,并且擁有Writable的所有,我們只要分別實現_read()和_write()方法即可

const{Duplex}=require(stream

classCustomDuplexStreamextendsDuplex{

constructor(){

super();

this.currentCharCode=65;

_read(){

if(this.currentCharCode=90){

this.push(String.fromCharCode(this.currentCharCode++))

}else{

this.push(null);

_write(chunk,encoding,callback){

console.log(chunk.toString());

callback();

}

上面雙工流的可讀流部分就是將大寫的26個字母添加進了緩存區(qū),而可寫流部分就是直接將數據輸出到控制臺。可見雙工流可讀流與可寫流之間并沒有任何的關系

constdp=newCustomDuplexStream();

dp.write(1

dp.write(2

dp.end();

dp.pipe(process.stdout);

輸出為

ABCDEFGHIJKLMNOPQRSTUVWXYZ

實現轉換流

TranformStream是Duplex的特例,它也是一個雙工流,不過它的輸入和輸出之間有關聯,它的內部通過_transform()方法將可寫流接收到的數據經過轉換后傳入到可讀流中,所以我們要實現轉換流,只需要實現_transform()方法即可

cons

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論