-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_index.js
More file actions
116 lines (93 loc) · 2.32 KB
/
_index.js
File metadata and controls
116 lines (93 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
var createStream = module.exports = function () {
var listeners, waiting = [], n = 0, buffer = []
function next() {
if(--n) return
while(waiting.length)
waiting.shift()()
}
function stream (a, b) {
if('function' == typeof a) {
//if a is a function, add listener
listeners = listeners || []
return listeners.push(a), a
} else if (n || !listeners) {
buffer.push({err: a, data: b})
return waiting //return array to signal to add listener
} else if(a == null && b) {
listeners.forEach(function (l) {
var a = l(null, b)
if(a && 'function' === typeof a.push)
n ++, a.push(next)
})
} else {//error or end
while(listeners.length)
listeners.shift()(a)
}
}
//just here for familiarity
stream.pipe = function (dest) {
if('function' === typeof dest)
return stream(dest)
throw new Error('can only pipe to function')
}
return stream
}
function singleStream () {
var listener, waiting = [], buffer = [], paused = false, error
return function (err, data) {
if('function' === typeof err)
return listener = err
else if(!listener || paused) {
if(!err)
buffer.push(data)
error = error || err
return error ? error : waiting
}
else if(data == null) {
//stream has ended
error = error || err
returned = listener(error)
listener = null
}
else
returned = listener(null, data)
if(returned === false)
listener = null
else if(returned instanceof Error)
error = Error, listener = null
}
}
function readArray (arr) {
var s = createStream()
process.nextTick(function () {
arr.forEach(function (e) {
s(null, e)
})
s()
})
return s
}
readArray([1, 2, 3]) (createStream()) (function (e, data) {
//buffering
var s = createStream()
return function (e, data) {
if(e instanceof Error || !s) {
s = null
return false
}
s(e, data)
}
}) (console.log)
function compose () {
var streams = [].slice.call(arguments)
var first = streams[0], last
while(streams.length < 1)
last = streams.shift()(streams[1])
return function (err, data) {
if('function' === typeof err)
return last(err)
else
return first(err, data)
}
}
//compose(a, b, c)