first commit
This commit is contained in:
153
node_modules/cloneable-readable/index.js
generated
vendored
Normal file
153
node_modules/cloneable-readable/index.js
generated
vendored
Normal file
@@ -0,0 +1,153 @@
|
||||
'use strict'
|
||||
|
||||
var PassThrough = require('readable-stream').PassThrough
|
||||
var inherits = require('inherits')
|
||||
var p = require('process-nextick-args')
|
||||
|
||||
function Cloneable (stream, opts) {
|
||||
if (!(this instanceof Cloneable)) {
|
||||
return new Cloneable(stream, opts)
|
||||
}
|
||||
|
||||
var objectMode = stream._readableState.objectMode
|
||||
this._original = stream
|
||||
this._clonesCount = 1
|
||||
|
||||
opts = opts || {}
|
||||
opts.objectMode = objectMode
|
||||
|
||||
PassThrough.call(this, opts)
|
||||
|
||||
forwardDestroy(stream, this)
|
||||
|
||||
this.on('newListener', onData)
|
||||
this.once('resume', onResume)
|
||||
|
||||
this._hasListener = true
|
||||
}
|
||||
|
||||
inherits(Cloneable, PassThrough)
|
||||
|
||||
function onData (event, listener) {
|
||||
if (event === 'data' || event === 'readable') {
|
||||
this._hasListener = false
|
||||
this.removeListener('newListener', onData)
|
||||
this.removeListener('resume', onResume)
|
||||
p.nextTick(clonePiped, this)
|
||||
}
|
||||
}
|
||||
|
||||
function onResume () {
|
||||
this._hasListener = false
|
||||
this.removeListener('newListener', onData)
|
||||
p.nextTick(clonePiped, this)
|
||||
}
|
||||
|
||||
Cloneable.prototype.clone = function () {
|
||||
if (!this._original) {
|
||||
throw new Error('already started')
|
||||
}
|
||||
|
||||
this._clonesCount++
|
||||
|
||||
// the events added by the clone should not count
|
||||
// for starting the flow
|
||||
this.removeListener('newListener', onData)
|
||||
var clone = new Clone(this)
|
||||
if (this._hasListener) {
|
||||
this.on('newListener', onData)
|
||||
}
|
||||
|
||||
return clone
|
||||
}
|
||||
|
||||
Cloneable.prototype._destroy = function (err, cb) {
|
||||
if (!err) {
|
||||
this.push(null)
|
||||
this.end()
|
||||
this.emit('close')
|
||||
}
|
||||
|
||||
p.nextTick(cb, err)
|
||||
}
|
||||
|
||||
function forwardDestroy (src, dest) {
|
||||
src.on('error', destroy)
|
||||
src.on('close', onClose)
|
||||
|
||||
function destroy (err) {
|
||||
src.removeListener('close', onClose)
|
||||
dest.destroy(err)
|
||||
}
|
||||
|
||||
function onClose () {
|
||||
dest.end()
|
||||
}
|
||||
}
|
||||
|
||||
function clonePiped (that) {
|
||||
if (--that._clonesCount === 0 && !that._readableState.destroyed) {
|
||||
that._original.pipe(that)
|
||||
that._original = undefined
|
||||
}
|
||||
}
|
||||
|
||||
function Clone (parent, opts) {
|
||||
if (!(this instanceof Clone)) {
|
||||
return new Clone(parent, opts)
|
||||
}
|
||||
|
||||
var objectMode = parent._readableState.objectMode
|
||||
|
||||
opts = opts || {}
|
||||
opts.objectMode = objectMode
|
||||
|
||||
this.parent = parent
|
||||
|
||||
PassThrough.call(this, opts)
|
||||
|
||||
forwardDestroy(parent, this)
|
||||
|
||||
parent.pipe(this)
|
||||
|
||||
// the events added by the clone should not count
|
||||
// for starting the flow
|
||||
// so we add the newListener handle after we are done
|
||||
this.on('newListener', onDataClone)
|
||||
this.on('resume', onResumeClone)
|
||||
}
|
||||
|
||||
function onDataClone (event, listener) {
|
||||
// We start the flow once all clones are piped or destroyed
|
||||
if (event === 'data' || event === 'readable' || event === 'close') {
|
||||
p.nextTick(clonePiped, this.parent)
|
||||
this.removeListener('newListener', onDataClone)
|
||||
}
|
||||
}
|
||||
|
||||
function onResumeClone () {
|
||||
this.removeListener('newListener', onDataClone)
|
||||
p.nextTick(clonePiped, this.parent)
|
||||
}
|
||||
|
||||
inherits(Clone, PassThrough)
|
||||
|
||||
Clone.prototype.clone = function () {
|
||||
return this.parent.clone()
|
||||
}
|
||||
|
||||
Cloneable.isCloneable = function (stream) {
|
||||
return stream instanceof Cloneable || stream instanceof Clone
|
||||
}
|
||||
|
||||
Clone.prototype._destroy = function (err, cb) {
|
||||
if (!err) {
|
||||
this.push(null)
|
||||
this.end()
|
||||
this.emit('close')
|
||||
}
|
||||
|
||||
p.nextTick(cb, err)
|
||||
}
|
||||
|
||||
module.exports = Cloneable
|
Reference in New Issue
Block a user