穩定性: 2 - 不穩定
流是一個抽象接口,在 Node 里被不同的對象實現。例如request to an HTTPserver 是流,stdout 是流。流是可讀,可寫,或者可讀寫。所有的流是 EventEmitter 的實例。
你可以通過 require('stream')
加載 Stream 基類。其中包括了 Readable
流、Writable
流、Duplex
流和 Transform
流的基類。
這個文檔分為 3 個章節。第一個章節解釋了在你的程序中使用流時候需要了解的部分。如果你不用實現流式 API,可以只看這個章節。
如果你想實現你自己的流,第二個章節解釋了這部分 API。這些 API 讓你的實現更加簡單。
第三個部分深入的解釋了流是如何工作的,包括一些內部機制和函數,這些內容不要改動,除非你明確知道你要做什么。
流可以是可讀(Readable),可寫(Writable),或者兼具兩者(Duplex,雙工)的。
所有的流都是事件分發器(EventEmitters),但是也有自己的方法和屬性,這取決于他它們是可讀(Readable),可寫(Writable),或者兼具兩者(Duplex,雙工)的。
如果流式可讀寫的,則它實現了下面的所有方法和事件。因此,這個章節 API 完全闡述了Duplex 或 Transform 流,即便他們的實現有所不同。
沒有必要為了消費流而在你的程序里實現流的接口。如果你正在你的程序里實現流接口,請同時參考下面的API for Stream Implementors。
基本所有的 Node 程序,無論多簡單,都會使用到流。這有一個使用流的例子。
javascript
var http = require('http');
var server = http.createServer(function (req, res) {
// req is an http.IncomingMessage, which is 可讀流(Readable stream)
// res is an http.ServerResponse, which is a Writable Stream
var body = '';
// we want to get the data as utf8 strings
// If you don't set an encoding, then you'll get Buffer objects
req.setEncoding('utf8');
// 可讀流(Readable stream) emit 'data' 事件 once a 監聽器(listener) is added
req.on('data', function (chunk) {
body += chunk;
});
// the end 事件 tells you that you have entire body
req.on('end', function () {
try {
var data = JSON.parse(body);
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('error: ' + er.message);
}
// write back something interesting to the user:
res.write(typeof data);
res.end();
});
});
server.listen(1337);
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
可讀流(Readable stream)接口是對你正在讀取的數據的來源的抽象。換句話說,數據來來自
可讀流(Readable stream)不會分發數據,直到你表明準備就緒。
可讀流(Readable stream) 有2種模式: 流動模式(flowing mode) 和 暫停模式(paused mode). 流動模式(flowing mode)時,盡快的從底層系統讀取數據并提供給你的程序。 暫停模式(paused mode)時, 你必須明確的調用 stream.read()
來讀取數據。 暫停模式(paused mode) 是默認模式。
注意: 如果沒有綁定數據處理函數,并且沒有 pipe()
目標,流會切換到流動模式(flowing mode),并且數據會丟失。
可以通過下面幾個方法,將流切換到流動模式(flowing mode)。
可以通過以下方法來切換到暫停模式(paused mode):
pause()
方法.'data'
事件][]處理函數, 調用 unpipe()
方法移除所有的 導流(pipe) 目標。注意, 為了向后兼容考慮, 移除 'data' 事件監聽器并不會自動暫停流。同樣的,當有導流目標時,調用 pause() 并不能保證流在那些目標排空后,請求更多數據時保持暫停狀態。
可讀流(Readable stream)例子包括:
當一個數據塊可以從流中讀出,將會觸發'readable'
事件.`
某些情況下, 如果沒有準備好,監聽一個 'readable'
事件將會導致一些數據從底層系統讀取到內部緩存。
javascript
var readble = getReadableStreamSomehow();
readable.on('readable', function() {
// there is some data to read now
});
一旦內部緩存排空,一旦有更多數據將會再次觸發 readable
事件。
chunk
{Buffer | String} 數據塊綁定一個 data
事件的監聽器(listener)到一個未明確暫停的流,會將流切換到流動模式。數據會盡額能的傳遞。
如果你像盡快的從流中獲取數據,這是最快的方法。
javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
});
如果沒有更多的可讀數據,將會觸發這個事件。
注意,除非數據已經被完全消費, the end
事件才會觸發。 可以通過切換到流動模式(flowing mode)來實現,或者通過調用重復調用 read()
獲取數據,直到結束。
javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
});
readable.on('end', function() {
console.log('there will be no more data.');
});
當底層資源(例如源頭的文件描述符)關閉時觸發。并不是所有流都會觸發這個事件。
當接收數據時發生錯誤觸發。
size
{Number} 可選參數, 需要讀入的數據量read()
方法從內部緩存中拉取數據。如果沒有可用數據,將會返回null
如果傳了 size
參數,將會返回相當字節的數據。如果size
不可用,將會返回 null
如果你沒有指定 size
參數。將會返回內部緩存的所有數據。
這個方法僅能再暫停模式(paused mode)里調用. 流動模式(flowing mode)下這個方法會被自動調用直到內存緩存排空。
javascript
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('got %d bytes of data', chunk.length);
}
});
如果這個方法返回一個數據塊, 它同時也會觸發['data'
事件][].
encoding
{String} 要使用的編碼.this
調用此函數會使得流返回指定編碼的字符串,而不是 Buffer 對象。例如,如果你調用readable.setEncoding('utf8')
,輸出數據將會是UTF-8 編碼,并且返回字符串。如果你調用 readable.setEncoding('hex')
,將會返回2進制編碼的數據。
該方法能正確處理多字節字符。如果不想這么做,僅簡單的直接拉取緩存并調buf.toString(encoding)
,可能會導致字節錯位。因此,如果你想以字符串讀取數據,請使用這個方法。
javascript
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});
this
這個方法讓可讀流(Readable stream)繼續觸發 data
事件.
這個方法會將流切換到流動模式(flowing mode). 如果你不想從流中消費數據,而想得到end
事件,可以調用 readable.resume()
來打開數據流。
javascript
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
console.log('got to the end, but did not read anything');
});
this
這個方法會使得流動模式(flowing mode)的流停止觸發 data
事件, 切換到流動模式(flowing mode). 并讓后續可用數據留在內部緩沖區中。
javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
readable.pause();
console.log('there will be no more data for 1 second');
setTimeout(function() {
console.log('now data will start flowing again');
readable.resume();
}, 1000);
});
Boolean
這個方法返回readable
是否被客戶端代碼 明確的暫停(調用 readable.pause()
)。
var readable = new stream.Readable
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
destination
{Writable Stream} 寫入數據的目標options
{Object} 導流(pipe) 選項end
{Boolean} 讀取到結束符時,結束寫入者。默認 = true
這個方法從可讀流(Readable stream)拉取所有數據, 并將數據寫入到提供的目標中。自動管理流量,這樣目標不會快速的可讀流(Readable stream)淹沒。
可以導流到多個目標。
javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
這個函數返回目標流, 因此你可以建立導流鏈:
javascript
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
例如, 模擬 Unix 的 cat
命令:
javascript
process.stdin.pipe(process.stdout);
默認情況下,當源數據流觸發 end
的時候調用end()
,所以 destination
不可再寫。傳 { end:false }
作為options
,可以保持目標流打開狀態。
這會讓 writer
保持打開狀態,可以在最后寫入"Goodbye" 。
javascript
reader.pipe(writer, { end: false });
reader.on('end', function() {
writer.end('Goodbye\n');
});
注意 process.stderr
和 process.stdout
直到進程結束才會關閉,無論是否指定
destination
{Writable Stream} 可選,指定解除導流的流這個方法會解除之前調用 pipe()
設置的鉤子( pipe()
)。
如果沒有指定 destination
,所有的 導流(pipe) 都會被移除。
如果指定了 destination
,但是沒有建立如果沒有指定 destination
,則什么事情都不會發生。
javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
console.log('stop writing to file.txt');
readable.unpipe(writable);
console.log('manually close the file stream');
writable.end();
}, 1000);
chunk
{Buffer | String} 數據塊插入到讀隊列中這個方法很有用,當一個流正被一個解析器消費,解析器可能需要將某些剛拉取出的數據“逆消費”,返回到原來的源,以便流能將它傳遞給其它消費者。
如果你在程序中必須經常調用 stream.unshift(chunk)
,那你可以考慮實現Transform來替換(參見下文API for Stream Implementors)。
javascript
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
var decoder = new StringDecoder('utf8');
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// found the header boundary
var split = str.split(/\n\n/);
header += split.shift();
var remaining = split.join('\n\n');
var buf = new Buffer(remaining, 'utf8');
if (buf.length)
stream.unshift(buf);
stream.removeListener('error', callback);
stream.removeListener('readable', onReadable);
// now the body of the message can be read from the stream.
callback(null, header, stream);
} else {
// still reading the header.
header += str;
}
}
}
}
stream
{Stream} 一個舊式的可讀流(Readable stream)v0.10 版本之前的 Node 流并未實現現在所有流的API(更多信息詳見下文“兼容性”章節)。
如果你使用的是舊的 Node 庫,它觸發 'data'
事件,并擁有僅做查詢用的pause()
方法,那么你能使用wrap()
方法來創建一個Readable 流來使用舊版本的流,作為數據源。
你應該很少需要用到這個函數,但它會留下方便和舊版本的 Node 程序和庫交互。
例如:
javascript
var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);
myReader.on('readable', function() {
myReader.read(); // etc.
});
可寫流(Writable stream )接口是你正把數據寫到一個目標的抽象。
可寫流(Writable stream )的例子包括:
chunk
{String | Buffer} 準備寫的數據encoding
{String} 編碼方式(如果chunk
是字符串)callback
{Function} 數據塊寫入后的回調這個方法向底層系統寫入數據,并在數據處理完畢后調用所給的回調。
返回值表示你是否應該繼續立即寫入。如果數據要緩存在內部,將會返回false
。否則返回 true
。
返回值僅供參考。即使返回 false
,你也可能繼續寫。但是寫會緩存在內存里,所以不要做的太過分。最好的辦法是等待drain
事件后,再寫入數據。
如果調用 writable.write(chunk)
返回 false, drain
事件會告訴你什么時候將更多的數據寫入到流中。
javascript
// Write the data to the supplied 可寫流(Writable stream ) 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write();
function write() {
var ok = true;
←上一篇: Node.js 加密
→下一篇:Node.js 網絡