forked from mikeal/morestreams
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.js
116 lines (105 loc) · 2.79 KB
/
main.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
var stream = require('stream')
, fs = require('fs')
, util = require('util')
;
var BufferedStream = function (limit) {
if (typeof limit === 'undefined') {
limit = Infinity;
}
this.limit = limit;
this.size = 0;
this.chunks = [];
this.writable = true;
this.readable = true;
}
util.inherits(BufferedStream, stream.Stream);
BufferedStream.prototype.pipe = function (dest, options) {
var self = this
if (self.resume) self.resume();
stream.Stream.prototype.pipe.call(self, dest, options)
//just incase you are piping to two streams, do not emit data twice.
//note: you can pipe twice, but you need to pipe both streams in the same tick.
//(this is normal for streams)
if(this.piped)
return dest
process.nextTick(function () {
self.chunks.forEach(function (c) {self.emit('data', c)})
self.size = 0;
delete self.chunks;
if(self.ended){
self.emit('end')
}
})
this.piped = true
return dest
}
BufferedStream.prototype.write = function (chunk) {
if (!this.chunks) {
this.emit('data', chunk);
return;
}
this.chunks.push(chunk);
this.size += chunk.length;
if (this.limit < this.size) {
this.pause();
}
}
BufferedStream.prototype.end = function () {
if(!this.chunks)
this.emit('end');
else
this.ended = true
}
if (!stream.Stream.prototype.pause) {
BufferedStream.prototype.pause = function() {
this.emit('pause');
};
}
if (!stream.Stream.prototype.resume) {
BufferedStream.prototype.resume = function() {
this.emit('resume');
};
}
exports.BufferedStream = BufferedStream;
var UpgradableStream = function () {
var self = this;
self.upgradable = false;
self.on('pipe', function (source) {
self.source = source;
self.upgradable = (source instanceof fs.ReadStream);
})
};
util.inherits(UpgradableStream, stream.Stream);
UpgradableStream.prototype.pipe = function () {
var dest = this.dest = arguments[0]
, destfd
;
if (dest.socket) destfd = dest.socket.fd;
else if (dest.fd) destfd = dest.fd;
if (this.upgradable && destfd) {
console.log('sendfile')
console.log(this.source.bufferSize)
console.error(this.source)
// this.source.end();
var p = this.source.path;
var m = this.source.mode;
dest.socket.flush();
fs.stat(p, function (e, stat) {
fs.open(p, m, 0, function(err, fd) {
console.dir(fd)
fs.sendfile(fd, destfd, 0, stat.size, function () {
dest.end();
});
})
})
delete this.source
return dest
} else {
console.log('pipeing')
return this.source.pipe.apply(this.source, arguments);
}
}
UpgradableStream.prototype.write = function () {}
UpgradableStream.prototype.end = function () {}
UpgradableStream.prototype.resume = function () {}
exports.UpgradableStream = UpgradableStream