加载中...

Stream


稳定度: 2 - 稳定

流是一个被io.js内部的许多对象所实现的抽象接口。例如一个发往HTTP服务器的请求是一个留,stdout也是一个流。流可以是可读的,可写的或双向的。所有的流都是EventEmitter实例。

你可以通过require('stream')来取货Stream的基类。其中包括了Readable流,Writable流,Duplex流和Transform流的基类。

此文档分为三个章节。第一章节解释了在你的编程中使用流时需要的API。如果你不需要实现你自己的流式API,你可以在这里停止。

第二章节解释了你在构建你自己的流时需要的API,这些API是为了方便你这么做而设计的。

第三章节深入讲述了流的工作机制,包括一些内部的机制和函数,你不应该去改动它们除非你知道你在做什么。

面向流消费者的API

流可以是可读的,可写的,或双工的。

所有的流都是EventEmitters。但是它们也各自有一些独特的方法和属性,这取决于它们是可读流,可写流或双工流。

如果一个流同时是可读的和可写的,那么表示它实现了以下所有的方法和事件。所以,这些API同时也涵盖DuplexTransform流,即使它们的实现可能有些不同。

在你程序中,为了消费流而去实现流接口不是必须的。如果你确实正在你的程序中实现流接口,请参考下一章节面向流实现者的API

几乎所有io.js程序,不论多简单,都使用了流。下面是一个在io.js是使用流的例子:

var http = require('http');

var server = http.createServer(function (req, res) {
  // req is an http.IncomingMessage, which is a Readable Stream
  // res is an http.ServerResponse, which is a Writable Stream

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', function (chunk) {
    body += chunk;
  });

  // the end event tells you that you have entire body
  req.on('end', function () {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh!  bad json!
      res.statusCode = 400;
      return res.end('error: ' + er.message);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o

Class: stream.Readable

可读流接口是一个你可以从之读取数据的数据源的抽象。换句话说,数据从可读流而来。

除非你指示已经准备好接受数据,否则可读流不会开始发生数据。

可读流有两个“模式”:流动模式和暂停模式。当在流动模式时,数据由底层系统读出,并且会尽快地提供给你的程序。当在暂停模式时,你必须调用stream.read()方法来获取数据块。流默认是暂停模式。

注意:如果data事件没有被绑定监听器,并且没有导流(pipe)目标,并且流被切换到了流动模式,那么数据将会被丢失。

你可以通过下面任意一个做法切换到流动模式:

  • 添加一个data事件的监听器来监听数据。

  • 调用resume()方法来明确开启流动模式。

  • 调用pipe()方法将数据导入一个可写流。

你可以同意下面任意一种方法切换回暂停模式:

  • 如果没有导流(pipe)目标,调用pause()方法。

  • 如果有导流(pipe)目标,移除所有的data事件监听器,并且通过unpipe()方法移除所有导流目标。

注意,由于为了向后兼任的原因,移除data事件的监听器将不会自动暂停流。同样的,如果有导流目标,调用pause()方法将不会保证目标流排空并请求更多数据时保持暂停。

一些内置的可读流例子:

  • 客户端的HTTP请求
  • 服务端的HTTP响应
  • 文件系统读取流
  • zlib
  • crypto
  • tcp sockets
  • 子进程的stdout和stderr
  • process.stdin

Event: 'readable'

当一个数据块能可以从流中被读出时,会触发一个readable事件。

某些情况下,监听一个readable事件会导致一些将要被读出的数据从底层系统进入内部缓冲,如果它没有准备好。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // there is some data to read now
});

当内部缓冲被排空时,一旦有更多数据,readable事件会再次触发。

Event: 'data'

  • chunk Buffer | String 数据块

为一个没有被暂停的流添加一个data事件的监听器会使其切换到流动模式。之后数据会被尽快得传递给用户。

如果你只是想尽快得从流中取得所有数据,这是最好的方式。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});

Event: 'end'

当没有更多可读的数据时这个事件会被触发。

注意,除非数据被完全消费,end事件才会触发。这可以通过切换到流动模式,或重复调用read()方法。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});
readable.on('end', function() {
  console.log('there will be no more data.');
});

Event: 'close'

当底层资源(如源头的文件描述符)被关闭时触发。不是所有的流都会触发这个事件。

Event: 'error'

  • Error Object

当接受数据时有错误发生,会触发此事件。

readable.read([size])

  • size Number 可选,指定读取数据的数量
  • Return String | Buffer | null

read()方法从内部缓冲中取出数据并返回它。如果没有可用数据,那么将返回null

如果你传递了一个size参数,那么它将返回指定字节的数据。如果size参数的字节数不可用,那么将返回null

如果你不指定size参数,那么将会返回内部缓冲中的所有数据。

这个方法只能在暂定模式中被调用。在流动模式下,这个方法会被自动地重复调用,知道内部缓冲被排空。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});

如果这个方法返回一个数据块,那么它也会触发data事件。

readable.setEncoding(encoding)

  • encoding String 使用的编码
  • Return: this

调用这个函数会导致流返回指定编码的字符串而不是Buffer对象。例如,如果你调用readable.setEncoding('utf8'),那么输出的数据将被解释为UTF-8数据,并且作为字符串返回。如果你调用了readable.setEncoding('hex'),那么数据将被使用十六进制字符串的格式编码。

该方法可以正确地处理多字节字符。如果你只是简单地直接取出缓冲并且对它们调用buf.toString(encoding),将会导致错位。如果你想使用字符串读取数据,请使用这个方法。

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});

readable.resume()

  • Return: this

这个方法将会让可读流继续触发data事件。

这个方法将会使流切换至流动模式。如果你不想消费流中的数据,但你想监听它的end事件,你可以通过调用readable.resume()来打开数据流。

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function() {
  console.log('got to the end, but did not read anything');
});

readable.pause()

  • Return: this

这个方法会使一个处于流动模式的流停止触发data事件,并切换至暂停模式。所有可用的数据将仍然存在于内部缓冲中。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(function() {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});

readable.isPaused()

  • Return: Boolean

这个方法会返回流是否被客户端代码所暂停(调用readable.pause(),并且没有在之后调用readable.resume())。

var readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false

readable.pipe(destination[, options])

  • destination Writable Stream 写入数据的目标
  • options Object

  • end Boolean 当读取者结束时结束写入者。默认为true

这个方法会取出可读流中所有的数据,并且将之写入指定的目标。这个方法会自动调节流量,所以当快速读取可读流时目标不会溢出。

可以将数据安全地导流至多个目标。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);

这个函数返回目标流,所以你可以链式调用pipe()

var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

例子,模仿UNIX的cat命令:

process.stdin.pipe(process.stdout);

默认情况下,当源流触发end事件时,目标流会被调用end()方法,然后目标就不再是可写的了。将传递{ end: false }作为options参数,将保持目标流开启。

例子,保持被写入的流开启,所以“Goodbye”可以在末端被写入:

reader.pipe(writer, { end: false });
reader.on('end', function() {
  writer.end('Goodbye\n');
});

注意,不论指定任何options参数,process.stderrprocess.stdout在程序退出前永远不会被关闭。

readable.unpipe([destination])

  • destination Writable Stream 可选,指定解除导流的流

这方法会移除之前调用pipe()方法所设置的钩子。

如果没有指定目标,那么所有的导流都会被移除。

如果指定了目标,但是并没有为目标设置导流,那么什么都不会发生。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
  console.log('stop writing to file.txt');
  readable.unpipe(writable);
  console.log('manually close the file stream');
  writable.end();
}, 1000);

readable.unshift(chunk)

  • chunk Buffer | String 要插回读取队列开头的数据块。

该方法在许多场景中都很有用,比如一个流正在被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”回来源,以便流能将它传递给其它消费者。

如果你发现你必须经常在你的程序中调用stream.unshift(chunk),你应该考虑实现一个Transform流(参阅下文的面向流实现者的API)。

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = new Buffer(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

readable.wrap(stream)

  • stream Stream 一个“旧式”可读流

Node.js v0.10 以及之前版本的流没有完全包含如今的所有的流API(更多的信息请参阅下文的“兼容性”)。

如果你正在使用一个老旧的io.js库,它触发data时间并且有一个仅作查询用途的pause()方法,那么你可以调用wrap()方法来创建一个使用“旧式”流作为数据源的可读流。

你几乎不会用到这个函数,它的存在仅是为了老旧的io.js程序和库交互。

例子:

var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);

myReader.on('readable', function() {
  myReader.read(); // etc.
});

Class: stream.Writable

可写流接口是一个你可以向其写入数据的目标的抽象。

一些内部的可写流例子:

  • 客户端的http请求
  • 服务端的http响应
  • 文件系统写入流
  • zlib
  • crypto
  • tcp socket
  • 子进程stdin
  • process.stdoutprocess.stderr

writable.write(chunk[, encoding][, callback])

  • chunk String | Buffer 要写入的数据
  • encoding String 编码,如果数据块是字符串
  • callback Function 当数据块写入完毕后调用的回调函数
  • Returns: Boolean 如果被全部处理则返回true

该方法向底层系统写入数据,并且当数据被全部处理后调用指定的回调函数。

返回值指示了你是否可以立刻写入数据。如果数据需要被内部缓冲,会返回false。否则返回true

返回值经供参考。即使返回false,你仍可以继续写入数据。但是,写入的数据将会被缓冲在内存里,所以最好不要这样做。应该在写入更多数据前等待drain事件。

Event: 'drain'

如果一个writable.write(chunk)调用返回了false,那么drain事件会指示出可以继续向流写入数据的时机。

// Write the data to the supplied writable stream 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

writable.cork()

强制滞留所有写入。

滞留的数据会在调用.uncork().end()方法后被写入。

writable.uncork()

写入在调用.cork()方法所有被滞留的数据。

writable.setDefaultEncoding(encoding)

  • encoding String 新的默认编码

设置一个可写流的默认编码。

writable.end([chunk][, encoding][, callback])

  • chunk String | Buffer 可选,写入的数据
  • encoding String 编码,如果数据块是字符串
  • callback Function 可选,回调函数

当没有更多可写的数据时,调用这个方法。如果指定了回调函数,那么会被添加为finish事件的监听器。

在调用了end()后调用write()会导致一个错误。

// write 'hello, ' and then end with 'world!'
var file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// writing more now is not allowed!

Event: 'finish'

当调用了end()方法,并且所有的数据都被写入了底层系统,这个事件会被触发。

var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
  writer.write('hello, #' + i + '!\n');
}
writer.end('this is the end\n');
writer.on('finish', function() {
  console.error('all writes are now complete.');
});

Event: 'pipe'

  • src Readable Stream 对这个可写流进行导流的源可读流

这个事件将会在可读流被一个可写流使用pipe()方法进行导流时触发。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', function(src) {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);

Event: 'unpipe'

  • src Readable Stream 对这个可写流停止导流的源可读流

当可读流对其调用unpipe()方法,在源可读流的目标集合中删除这个可写流,这个事件将会触发。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', function(src) {
  console.error('something has stopped piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

Event: 'error'

  • Error object

在写入数据或导流发生错误时触发。

Class: stream.Duplex

双工是同时实现了可读流与可写流的借口。它的用处请参阅下文。

内部双工流的例子:

  • tcp socket
  • zlib
  • crypto

Class: stream.Transform

转换流是一种输出由输入计算所得的栓共流。它们同时集成了可读流与可写流的借口。它们的用处请参阅下文。

内部转换流的例子:

  • zlib
  • crypto

面向流实现者的API

实现所有种类的流的模式都是一样的:

  1. 为你的子类继承合适的父类(util.inherits非常合适于做这个)。
  2. 为了保证内部机制被正确初始化,在你的构造函数中调用合适的父类构造函数。
  3. 实现一个或多个特定的方法,参阅下文。

被扩展的类和要实现的方法取决于你要编写的流类的类型:

用途 需要实现的方法
只读 Readable _read
只写 Writable _write, _writev
可读以及可写 Duplex _read, _write, _writev
操作被写入数据,然后读出结果 Transform _transform, _flush

在你的实现代码中,非常重要的一点是永远不要调用上文的面向流消费者的API。否则,你在程序中消费你的流接口时可能有潜在的副作用。

Class: stream.Readable

stream.Readable是一个被设计为需要实现底层的_read(size)方法的抽象类。

请参阅上文的面向流消费者的API来了解如何在程序中消费流。以下解释了如果在你的程序中实现可读流。

例子:一个计数流

这是一个可读流的基础例子。它从1到1,000,000递增数字,然后结束。

var Readable = require('stream').Readable;
var util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
  Readable.call(this, opt);
  this._max = 1000000;
  this._index = 1;
}

Counter.prototype._read = function() {
  var i = this._index++;
  if (i > this._max)
    this.push(null);
  else {
    var str = '' + i;
    var buf = new Buffer(str, 'ascii');
    this.push(buf);
  }
};

例子:简单协议 v1 (次优)

这类似于上文中提到的parseHeader函数,但是使用一个自定义流实现。另外,注意这个实现不将流入的数据转换为字符串。

更好地实现是作为一个转换流实现,请参阅下文更好地实现。

// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// NOTE: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal.  See the
// alternative example below under the Transform section.

var Readable = require('stream').Readable;
var util = require('util');

util.inherits(SimpleProtocol, Readable);

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(source, options);

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  // source is a readable stream, such as a socket or file
  this._source = source;

  var self = this;
  source.on('end', function() {
    self.push(null);
  });

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', function() {
    self.read(0);
  });

  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

    // if the source doesn't have data, we don't have data yet.
    if (chunk === null)
      return this.push('');

    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
      this.push('');
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split);
      this.unshift(b);

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);
    }
  } else {
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

// Usage:
// var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.

new stream.Readable([options])

  • options Object

  • highWaterMark Number 在停止从底层资源读取之前,在内部缓冲中存储的最大字节数。默认为16kb,对于objectMode则是16
  • encoding String 如果被指定,那么缓冲将被利用指定编码解码为字符串,默认为null
  • objectMode Boolean 是否该流应该表现如一个对象的流。意思是说stream.read(n)返回一个单独的对象而不是一个大小为nBuffer,默认为false

在实现了Readable类的类中,请确保调用了Readable构造函数,这样缓冲设置才能被正确的初始化。

readable._read(size)

  • size Number 异步读取数据的字节数

注意:实现这个函数,而不要直接调用这个函数。

这个函数不应该被直接调用。它应该被子类实现,并且仅被Readable类的内部方法调用。

所有的可读流都必须实现这个方法用来从底层资源中获取数据。

这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。

当数据可用时,调用readable.push(chunk)方法将之推入读取队列。如果方法返回false,那么你应当停止读取。当_read方法再次被调用,你应当推入更多数据。

参数size仅作查询。“read”调用返回数据的实现可以通过这个参数来知道应当抓取多少数据;其余与之无关的实现,比如TCP或TLS,则可忽略这个参数,并在可用时返回数据。例如,没有必要“等到”size个字节可用时才调用stream.push(chunk)

readable.push(chunk[, encoding])

  • chunk Buffer | null | String 被推入读取队列的数据块
  • encoding String 字符串数据块的编码。必须是一个合法的Buffer编码,如'utf8'或'ascii'
  • return Boolean 是否应该继续推入

注意:这个函数应该被Readable流的实现者调用,而不是消费者。

_read()函数在至少调用一次push(chunk)方法前,不会被再次调用。

Readable类通过在readable事件触发时,调用read()方法将数据推入 之后用于读出数据的读取队列 来工作。

push()方法需要明确地向读取队列中插入数据。如果它的参数为null,那么它将发送一个数据结束信号(EOF)。

这个API被设计为尽可能的灵活。例如,你可能正在包装一个有pause/resume机制和一个数据回调函数的低级别源。那那些情况下,你可以通过以下方式包装这些低级别源:

// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

util.inherits(SourceWrapper, Readable);

function SourceWrapper(options) {
  Readable.call(this, options);

  this._source = getLowlevelSourceObject();
  var self = this;

  // Every time there's data, we push it into the internal buffer.
  this._source.ondata = function(chunk) {
    // if push() returns false, then we need to stop reading from source
    if (!self.push(chunk))
      self._source.readStop();
  };

  // When the source ends, we push the EOF-signaling `null` chunk
  this._source.onend = function() {
    self.push(null);
  };
}

// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
SourceWrapper.prototype._read = function(size) {
  this._source.readStart();
};

Class: stream.Writable

stream.Writable是一个被设计为需要实现底层的_write(chunk, encoding, callback)方法的抽象类。

请参阅上文的面向流消费者的API来了解如何在程序中消费流。以下解释了如果在你的程序中实现可写流。

new stream.Writable([options])

  • options Object

  • highWaterMark Number write()方法开始返回false的缓冲级别。默认为16kb,对于objectMode流则是16
  • decodeStrings Boolean 是否在传递给write()方法前将字符串解码成Buffer。默认为true
  • objectMode Boolean 是否write(anyObj)为一个合法操作。如果设置为true你可以写入任意数据而不仅是Buffer或字符串数据。默认为false

在实现了Writable类的类中,请确保调用了Writable构造函数,这样缓冲设置才能被正确的初始化。

writable._write(chunk, encoding, callback)

  • chunk Buffer | String 将要被写入的数据块。除非decodeStrings配置被设置为false,否则将一直是一个buffer
  • encoding String 如果数据块是一个字符串,那么这就是编码的类型。如果是一个buffer,那么则会忽略它
  • callback Function 当你处理完给定的数据块后调用这个函数

所有的Writable流的实现都必须提供一个_write()方法来给底层资源传输数据。

这个函数不应该被直接调用。它应该被子类实现,并且仅被Writable类的内部方法调用。

回调函数使用标准的callback(error)模式来表示这个写操作成功或发生了错误。

如果构造函数选项中设置了decodeStrings标志,那么数据块将是一个字符串而不是一个Buffer,编码将会决定字符串的类型。这个是为了帮助处理编码字符串的实现。如果你没有明确地将decodeStrings选项设为false,那么你会安全地忽略encoding参数,并且数据块是Buffer形式。

这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。

writable._writev(chunks, callback)

  • chunks Array 将被写入的数据块数组。其中每一个数据都有如下格式:{ chunk: ..., encoding: ... }
  • callback Function 当你处理完给定的数据块后调用这个函数

注意:这个函数不应该被直接调用。它应该被子类实现,并且仅被Writable类的内部方法调用。

这个函数对于你的实现是完全可选的。大多数情况下它是不必的。如果实现,它会被以所有滞留在写入队列中的数据块调用。

Class: stream.Duplex

一个“双工”流既是可读的,又是可写的。如TCPsocket连接。

注意,和你实现ReadableWritable流时一样,stream.Duplex是一个被设计为需要实现底层的_read(size)_write(chunk, encoding, callback)方法的抽象类。

由于JavaScript并不具备多继承能力,这个类是继承于Readable类,并寄生于Writable类。所以为了实现这个类,用户需要同时实现低级别的_read(n)方法和低级别的_write(chunk, encoding, callback)方法。

new stream.Duplex(options)

  • options Object 同时传递给WritableReadable构造函数。并且包含以下属性:

  • allowHalfOpen Boolean 默认为true。如果设置为false,那么流的可读的一端结束时可写的一端也会自动结束,反之亦然。
  • readableObjectMode Boolean 默认为false,为流的可读的一端设置objectMode。当objectModetrue时没有效果。
  • writableObjectMode Boolean 默认为false,为流的可写的一端设置objectMode。当objectModetrue时没有效果。

在实现了Duplex类的类中,请确保调用了Duplex构造函数,这样缓冲设置才能被正确的初始化。

Class: stream.Transform

“转换”流是一个输出于输入存在对应关系的双工流,如一个zilib流或一个crypto流。

输出和输出并不需要有相同的大小,相同的数据块数或同时到达。例如,一个哈希流只有一个单独数据块的输出当输入结束时。一个zlib流的输出比其输入小得多或大得多。

除了实现_read()方法和_write()方法,转换流还必须实现_transform()方法,并且可选地实现_flush()方法(参阅下文)。

new stream.Transform([options])

  • options Object 同时传递给WritableReadable构造函数。

在实现了Transform类的类中,请确保调用了Transform构造函数,这样缓冲设置才能被正确的初始化。

transform._transform(chunk, encoding, callback)

  • chunk Buffer | String 将要被写入的数据块。除非decodeStrings配置被设置为false,否则将一直是一个buffer
  • encoding String 如果数据块是一个字符串,那么这就是编码的类型。如果是一个buffer,那么则会忽略它
  • callback Function 当你处理完给定的数据块后调用这个函数

这个函数不应该被直接调用。它应该被子类实现,并且仅被Transform类的内部方法调用。

所有Transform流的实现都必须提供一个_transform方法来接受输入和产生输出。

Transform类中,_transform可以做需要做的任何事,如处理需要写入的字节,将它们传递给可写端,异步I/O,等等。

调用transform.push(outputChunk)0次或多次来从输入的数据块产生输出,取决于你想从这个数据块中输出多少数据作为结果。

仅当目前的数据块被完全消费后,才会调用回调函数。注意,对于某些特殊的输入可能会没有输出。如果你将数据作为第二个参数传入回调函数,那么数据将被传递给push方法。换句话说,下面的两个例子是相等的:

transform.prototype._transform = function (data, encoding, callback) {
  this.push(data);
  callback();
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data);
}

这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。

transform._flush(callback)

  • callback Function 当你排空了所有剩余数据后,这个回调函数会被调用

注意:这个函数不应该被直接调用。它应该被子类实现,并且仅被Transform类的内部方法调用。

在一些情景中,你的转换操作需要在流的末尾多发生一点点数据。例如,一个Zlib压缩流会存储一些内部状态以便它能优化压缩输出。但是在最后,它需要尽可能好得处理这些留下的东西来使数据完整。

在这种情况中,您可以实现一个_flush方法,它会在最后被调用,在所有写入数据被消费、但在触发end表示可读端到达末尾之前。和_transform一样,只需在写入操作完成时适当地调用transform.push(chunk)零或多次。

这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。

Events: 'finish' 和 'end'

finishend事件分别来自于父类WritableReadablefinish事件在end()方法被调用以及所有的输入被_transform方法处理后触发。end事件在所有的在_flush方法的回调函数被调用后的数据被输出后触发。

Example: SimpleProtocol 解释器 v2

上文中的简单协议解释器可以简单地通过高级别的Transform流更好地实现。与上文例子中的parseHeaderSimpleProtocol v1相似。

在这个例子中,没有从参数中提供输入,然后将它导流至解释器中,这更符合io.js的使用习惯。

var util = require('util');
var Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // and let them know that we are done parsing the header.
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(chunk.slice(split));
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(chunk);
  }
  done();
};

// Usage:
// var parser = new SimpleProtocol();
// source.pipe(parser)
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.

Class: stream.PassThrough

这是一个Transform流的实现。将输入的流简单地传递给输出。它的主要目的是用来演示和测试,但它在某些需要构建特殊流的情况下可能有用。

简化的构造器API

可以简单的构造流而不使用继承。

这可以通过调用合适的方法作为构造函数和参数来实现:

例子:

Readable

var readable = new stream.Readable({
  read: function(n) {
    // sets this._read under the hood
  }
});

Writable

var writable = new stream.Writable({
  write: function(chunk, encoding, next) {
    // sets this._write under the hood
  }
});

// or

var writable = new stream.Writable({
  writev: function(chunks, next) {
    // sets this._writev under the hood
  }
});

Duplex

var duplex = new stream.Duplex({
  read: function(n) {
    // sets this._read under the hood
  },
  write: function(chunk, encoding, next) {
    // sets this._write under the hood
  }
});

// or

var duplex = new stream.Duplex({
  read: function(n) {
    // sets this._read under the hood
  },
  writev: function(chunks, next) {
    // sets this._writev under the hood
  }
});

Transform

var transform = new stream.Transform({
  transform: function(chunk, encoding, next) {
    // sets this._transform under the hood
  },
  flush: function(done) {
    // sets this._flush under the hood
  }
});

流:内部细节

缓冲

Writable流和Readable流都会分别在一个内部的叫_writableState.buffer_readableState.buffer的对象里缓冲数据。

潜在的被缓冲的数据量取决于被传递给构造函数的highWaterMark参数。

Readable流中,当其的实现调用stream.push(chunk)时就会发生缓冲。如果流的消费者没有调用stream.read(),那么数据就会保留在内部队列中直到它被消费。

Writable流中,当用户重复调用stream.write(chunk)时就会发生缓冲,甚至是当write()返回false时。

流,尤其是pipe()方法的初衷,是限制数据的滞留量在一个可接受的水平,这样才使得不同传输速度的来源和目标不会淹没可用的内存。

stream.read(0)

在一些情况下,你想不消费任何数据而去触发一次底层可读流机制的刷新。你可以调用stream.read(0),它总是返回null

如果内部的读缓冲量在highWaterMark之下,并且流没有正在读取,那么调用read(0)将会触发一次低级别的_read调用。

几乎永远没有必须这么做。但是,你可能会在io.jsReadable流类的内部代码的几处看到这个。

stream.push('')

推入一个0字节的字符串或Buffer(不处于对象模式)有一个有趣的副作用。因为这是一个stream.push()的调用,它将会结束读取进程。但是,它不添加任何数据到可读缓冲中,所以没有任何用户可消费的数据。

在极少的情况下,你当下没有数据可以提供,但你的消费者同过调用stream.read(0)来得知合适再次检查。在这样的情况下,你可以调用stream.push('')

至今为止,这个功能的唯一使用之处是在tls.CryptoStream类中,它将在io.js的1.0版本中被废弃。如果你发现你不得不使用stream.push(''),请考虑使用另外的方式。因为这几乎表示发生了某些可怕的错误。

与旧版本的Node.js的兼容性

Node.js的0.10版本之前,可读流接口非常简单,并且功能和功用都不强。

  • data事件会立刻触发,而不是等待你调用read()方法。如果你需要进行一些I/O操作来决定是否处理数据,那么你只能将数据存储在某些缓冲区中以防数据流失。
  • pause()仅供查询,并不保证生效。这意味着你还是要准备接收data事件在流已经处于暂停模式中时。

io.js v1.0 和Node.js v0.10中,下文所述的Readable类添加进来。为了向后兼容性,当一个data事件的监听器被添加时或resume()方法被调用时,可读流切换至流动模式。其作用是,即便您不使用新的read()方法和readable事件,您也不必担心丢失数据块。

大多数程序都会保持功能正常,但是,以下有一些边界情况:

  • 没有添加任何data事件
  • 从未调用resume()方法
  • 流没有被导流至任何可写的目标

例如,考虑以下代码:

// WARNING!  BROKEN!
net.createServer(function(socket) {

  // we add an 'end' method, but never consume the data
  socket.on('end', function() {
    // It will never get here.
    socket.end('I got your message (but didnt read it)\n');
  });

}).listen(1337);

Node.js v0.10前,到来的信息数据会被简单地丢弃。但是在io.js v1.0 和Node.js v0.10后,socket会被永远暂停。

解决方案是调用resume()方法来开启数据流:

// Workaround
net.createServer(function(socket) {

  socket.on('end', function() {
    socket.end('I got your message (but didnt read it)\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

}).listen(1337);

除了新的Readable流切换至流动模式之外,在v0.10之前的流可以被使用wrap()方法包裹。

对象模式

通常情况下,流仅操作字符串和Buffer

处于对象模式中的流除了Buffer和字符串外,还能读出普通的JavaScirpt值。

处于对象模式中的可读流在调用stream.read(size)后只会返回单个项目,不论size参数是什么。

处于对象模式中的可写流总是忽略stream.write(data, encoding)中的encoding参数。

对于处于对象模式中的流,特殊值null仍然保留它的特殊意义。也就是说,对于对象模式的可读流,stream.read()返回一个null仍意味着没有更多的数据了,并且stream.push(null)会发送一个文件末端信号(EOF)。

核心io.js中没有流是对象模式的。这个模式仅仅供用户的流库使用。

你应当在子类的构造函数的options参数对象中设置对象模式。在流的过程中设置对象模式时不安全的。

对于双工流,可以分别得通过readableObjectModewritableObjectMode设置可读端和可写端。这些配置可以被用来通过转换流实现解释器和序列化器。

var util = require('util');
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform);

// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONParseStream() {
  if (!(this instanceof JSONParseStream))
    return new JSONParseStream();

  Transform.call(this, { readableObjectMode : true });

  this._buffer = '';
  this._decoder = new StringDecoder('utf8');
}

JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
  this._buffer += this._decoder.write(chunk);
  // split on newlines
  var lines = this._buffer.split(/\r?\n/);
  // keep the last partial line buffered
  this._buffer = lines.pop();
  for (var l = 0; l < lines.length; l++) {
    var line = lines[l];
    try {
      var obj = JSON.parse(line);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // push the parsed object out to the readable consumer
    this.push(obj);
  }
  cb();
};

JSONParseStream.prototype._flush = function(cb) {
  // Just handle any leftover
  var rem = this._buffer.trim();
  if (rem) {
    try {
      var obj = JSON.parse(rem);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // push the parsed object out to the readable consumer
    this.push(obj);
  }
  cb();
};

还没有评论.