-
Notifications
You must be signed in to change notification settings - Fork 24
/
index.js
94 lines (81 loc) · 2.25 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*!
* csv-stream
* Copyright(c) 2012 HipSnip Limited
* Author Rémy Loubradou <[email protected]>
* MIT Licensed
*/
/**
* Module dependencies
*/
var Stream = require('stream'),
util = require('util'),
Parser = require('./lib/parser');
exports.createStream = function(options){
return new CSVStream(options || {});
}
/**
 * Old-style (Stream-based) readable/writable stream that hands written
 * chunks to a CSV parser and re-emits the parser's events on itself.
 *
 * Re-emitted events: 'data' (data), 'column' (key, value), 'header' (header)
 * and 'end'. Once the parser has emitted 'end' the stream is flagged as ended
 * and no longer readable; a later 'data' event from the parser throws.
 *
 * @param {Object} options - Passed unchanged to the Parser constructor.
 */
function CSVStream(options){
  var self = this;
  Stream.call(this);

  // States
  this.writable = true;
  this.readable = true;
  this._paused = false;
  this._ended = false;
  this._destroyed = false;
  this._endCallWhenPause = false; // set by end() when it is called while paused

  // Buffer
  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor,
  // which triggers a runtime deprecation warning (DEP0005).
  this._buffer = Buffer.alloc(0);
  this._encoding = undefined; // Encoding needs to be undefined for Buffer.toString method

  // CSV parser
  this._parser = new Parser(options);
  this._parser.on('data',function(data){
    if(self._ended) throw new Error('Must not emit data event after emittion of end event.')
    self.emit('data',data);
  });
  this._parser.on('column',function(key,value){
    self.emit('column',key,value);
  });
  this._parser.on('header',function(header){
    self.emit('header',header);
  });
  this._parser.on('end',function(){
    self._ended = true;
    self.readable = false;
    self.emit('end');
  });
}
util.inherits(CSVStream,Stream);
/**
 * Append a chunk to the internal buffer and, unless the stream is paused,
 * decode the whole buffer and hand it to the parser.
 *
 * @param {Buffer|String} [buffer] - Chunk to append. Strings are converted
 *   to a Buffer using the current encoding (utf8 when none is set). When
 *   omitted, only the data buffered so far is flushed.
 * @param {String} [encoding] - Encoding remembered for this and later
 *   writes; used to decode the buffer before parsing.
 * @return {Boolean} false while the stream is paused (the data stays
 *   buffered, or a listener paused the stream during parsing), true otherwise.
 * @throws {Error} When called after the stream has ended.
 */
CSVStream.prototype.write = function(buffer,encoding){
  this._encoding = encoding || this._encoding;
  if(this._ended) throw new Error('Cannot write after end has been called.');
  if(buffer){
    // Buffer.concat only accepts Buffers/Uint8Arrays, so string chunks
    // (e.g. from a source with setEncoding) must be converted first.
    var chunk = typeof buffer === 'string' ? Buffer.from(buffer, this._encoding) : buffer;
    this._buffer = Buffer.concat([this._buffer, chunk]);
  }
  if(this._paused) return false;
  // NOTE(review): each flush is decoded on its own, so a multi-byte character
  // split across two writes is not reassembled; string_decoder would be
  // needed to handle that.
  this._parser.parse(this._buffer.toString(this._encoding));
  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
  this._buffer = Buffer.alloc(0);
  return !this._paused;
};
/**
 * Finish the stream, optionally writing one last chunk.
 *
 * Does nothing when there is neither an internal buffer nor a chunk. If the
 * final write reports that the stream is paused, the end is only recorded
 * (via _endCallWhenPause). Otherwise the stream stops being writable, the
 * parser is ended and the stream is destroyed unless it already was.
 *
 * @param {Buffer} [buffer] - Last chunk, forwarded to write().
 * @param {String} [encoding] - Encoding, forwarded to write().
 */
CSVStream.prototype.end = function(buffer,encoding){
  if(!this._buffer && !buffer) return;

  var flushed = this.write(buffer,encoding);
  if(!flushed){
    // Still paused: remember that end was requested.
    this._endCallWhenPause = true;
    return;
  }

  this.writable = false;
  this._parser.end();
  if(!this._destroyed){
    this.destroy();
  }
};
/**
 * Release the internal buffer, flag the stream as destroyed and emit 'close'.
 *
 * Idempotent: calls made after the stream is already destroyed are ignored,
 * so 'close' is emitted at most once.
 */
CSVStream.prototype.destroy = function(){
  if(this._destroyed) return;
  this._buffer = null;
  this._destroyed = true;
  this.emit('close');
};
/**
 * Flag the stream as paused. While the flag is set, write() only buffers
 * incoming data and returns false.
 */
CSVStream.prototype.pause = function(){
  this._paused = true;
};
/**
 * Clear the paused flag and catch up on work deferred while paused:
 * flush buffered data through write(), or complete a pending end() that was
 * requested during the pause. Always finishes by emitting 'drain'.
 */
CSVStream.prototype.resume = function(){
  this._paused = false;

  var hasBacklog = Boolean(this._buffer) && this._buffer.length > 0;
  if(hasBacklog && !this._endCallWhenPause){
    this.write();
  }
  // A deferred end() flushes the buffer itself.
  if(this._endCallWhenPause){
    this.end();
  }

  this.emit('drain');
};