-
Notifications
You must be signed in to change notification settings - Fork 47
/
index.js
268 lines (227 loc) · 6.41 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
'use strict';
var exec = require('child_process').exec
, spawn = require('child_process').spawn
, path = require('path')
, domain = require('domain')
, d = domain.create();
/**
 * log
 *
 * Writes a tagged message to the console through util.log.
 * The "[tag] " prefix is coloured for the known tags (error, warn, info);
 * any other tag is printed without colour. A single trailing line break
 * (\n, \r or \r\n) is stripped from the final line.
 *
 * @param message the message to log
 * @param tag (optional) the tag to log with, defaults to 'info'
 */
function log(message, tag) {
  var util = require('util');
  var color = require('cli-color');
  var label = tag || 'info';
  var painters = {
    error: color.red.bold,
    warn: color.yellow,
    info: color.cyanBright
  };
  var paint = painters[label];
  var line;

  if (!paint) {
    // Unknown tag: leave the prefix uncoloured.
    paint = function(text) { return text; };
  }

  line = paint("[" + label + "] ") + message;
  util.log(line.replace(/(\n|\r|\r\n)$/, ''));
}
/**
 * getArchiveName
 *
 * Builds the archive file name for a database from the current local date:
 *
 *   <databaseName>_<year>_<month>_<day>_<epoch milliseconds>.tar.gz
 *
 * Month and day are not zero-padded (e.g. mydb_2020_1_5_1578223800000.tar.gz).
 *
 * @param databaseName The name of the database
 * @return the archive file name
 */
function getArchiveName(databaseName) {
  var now = new Date();
  var parts = [databaseName];

  parts.push(
    now.getFullYear(),
    now.getMonth() + 1, // getMonth() is zero-based
    now.getDate(),
    now.getTime()
  );

  return parts.join('_') + '.tar.gz';
}
/**
 * removeRF
 *
 * Remove a file or directory. (Recursive, forced)
 *
 * The target is handed to `rm` as a single argument through execFile, so no
 * shell is involved: paths containing spaces or shell metacharacters are
 * removed literally instead of being split or interpreted.
 *
 * @param target path to the file or directory
 * @param callback (optional) callback(error, stdout, stderr); called with
 *                 null straight away when the target does not exist
 */
function removeRF(target, callback) {
  var fs = require('fs')
    , execFile = require('child_process').execFile;

  callback = callback || function() { };

  fs.exists(target, function(exists) {
    if (!exists) {
      return callback(null);
    }

    log("Removing " + target, 'info');
    // '--' ends option parsing so a target starting with '-' is still treated as a path.
    execFile('rm', ['-rf', '--', target], callback);
  });
}
/**
 * mongoDump
 *
 * Calls mongodump on a specified database.
 *
 * stdout of the child process is logged with the default tag and stderr with
 * the 'error' tag. The callback is invoked exactly once: with null when
 * mongodump exits with code 0, otherwise with an Error (non-zero exit code,
 * or a failure to start the process, e.g. the binary is not installed).
 *
 * NOTE(review): the password is passed as a command-line argument, which is
 * typically visible in the system process list.
 *
 * @param options MongoDB connection options [host, port, username, password, db]
 * @param directory Directory to dump the database to
 * @param callback (optional) callback(err)
 */
function mongoDump(options, directory, callback) {
  var mongodump
    , mongoOptions
    , finished = false;

  callback = callback || function() { };

  // Both 'error' and 'exit' may fire for the same child; report only the first.
  function done(err) {
    if (finished) {
      return;
    }
    finished = true;
    callback(err);
  }

  mongoOptions = [
    '-h', options.host + ':' + options.port,
    '-d', options.db,
    '-o', directory
  ];

  // Credentials are only added when both parts are present.
  if (options.username && options.password) {
    mongoOptions.push('-u');
    mongoOptions.push(options.username);
    mongoOptions.push('-p');
    mongoOptions.push(options.password);
  }

  log('Starting mongodump of ' + options.db, 'info');
  mongodump = spawn('mongodump', mongoOptions);

  mongodump.stdout.on('data', function (data) {
    log(data);
  });

  mongodump.stderr.on('data', function (data) {
    log(data, 'error');
  });

  // Without this listener a spawn failure is thrown as an unhandled 'error'
  // event and the callback is never called.
  mongodump.on('error', function (err) {
    done(err);
  });

  mongodump.on('exit', function (code) {
    if (code === 0) {
      log('mongodump executed successfully', 'info');
      done(null);
    } else {
      done(new Error("Mongodump exited with code " + code));
    }
  });
}
/**
 * compressDirectory
 *
 * Compresses the directory so we can upload it to S3.
 *
 * Runs `tar -zcf <output> <input>` with `directory` as the working directory,
 * so both paths are resolved relative to it. stderr of tar is logged with the
 * 'error' tag. The callback is invoked exactly once: with null when tar exits
 * with code 0, otherwise with an Error (non-zero exit code, or a failure to
 * start the process).
 *
 * @param directory current working directory
 * @param input path to input file or directory
 * @param output path to output archive
 * @param callback (optional) callback(err)
 */
function compressDirectory(directory, input, output, callback) {
  var tar
    , tarOptions
    , finished = false;

  callback = callback || function() { };

  // Both 'error' and 'exit' may fire for the same child; report only the first.
  function done(err) {
    if (finished) {
      return;
    }
    finished = true;
    callback(err);
  }

  tarOptions = [
    '-zcf',
    output,
    input
  ];

  log('Starting compression of ' + input + ' into ' + output, 'info');
  tar = spawn('tar', tarOptions, { cwd: directory });

  tar.stderr.on('data', function (data) {
    log(data, 'error');
  });

  // Without this listener a spawn failure is thrown as an unhandled 'error'
  // event and the callback is never called.
  tar.on('error', function (err) {
    done(err);
  });

  tar.on('exit', function (code) {
    if (code === 0) {
      log('successfully compress directory', 'info');
      done(null);
    } else {
      done(new Error("Tar exited with code " + code));
    }
  });
}
/**
 * sendToS3
 *
 * Uploads a file to S3 using knox.
 *
 * The file at <directory>/<target> is stored under <destination>/<target>,
 * where destination is options.destination (default '/'). When
 * options.encrypt is truthy the upload requests AES256 server-side
 * encryption. The caller's options object is not modified.
 *
 * @param options s3 options [key, secret, bucket] plus optional destination and encrypt
 * @param directory directory containing the file to upload
 * @param target name of the file to upload
 * @param callback (optional) callback(err); err is set when the upload fails
 *                 or S3 answers with a status other than 200
 */
function sendToS3(options, directory, target, callback) {
  var knox = require('knox')
    , sourceFile = path.join(directory, target)
    , s3client
    , destination = options.destination || '/'
    , headers = {}
    , clientOptions = {}
    , key;

  callback = callback || function() { };

  // `destination` is not an explicitly named knox option, so it is left out of
  // what the client receives. The options are copied rather than edited in
  // place so the caller can reuse the same config object for another upload.
  for (key in options) {
    if (key !== 'destination') {
      clientOptions[key] = options[key];
    }
  }

  s3client = knox.createClient(clientOptions);

  if (options.encrypt) {
    headers = {"x-amz-server-side-encryption": "AES256"};
  }

  log('Attemping to upload ' + target + ' to the ' + options.bucket + ' s3 bucket');
  s3client.putFile(sourceFile, path.join(destination, target), headers, function(err, res) {
    if (err) {
      return callback(err);
    }

    res.setEncoding('utf8');

    // Log the response body; it is tagged as an error for non-200 responses.
    res.on('data', function(chunk) {
      if (res.statusCode !== 200) {
        log(chunk, 'error');
      } else {
        log(chunk);
      }
    });

    res.on('end', function() {
      if (res.statusCode !== 200) {
        return callback(new Error('Expected a 200 response from S3, got ' + res.statusCode));
      }

      log('Successfully uploaded to s3');
      return callback();
    });
  });
}
/**
 * sync
 *
 * Performs a mongodump on a specified database, gzips the data,
 * and uploads it to s3.
 *
 * Work happens in <os temp dir>/mongodb_s3_backup. The dump directory and the
 * archive are removed before the run and again after it, whether or not the
 * run succeeded. Failures reported by a step are logged and passed to the
 * callback. Errors thrown asynchronously during the upload are caught by a
 * domain created for this call: the temporary files are cleaned up and the
 * error is then rethrown, so the callback is NOT invoked in that case.
 *
 * @param mongodbConfig mongodb config [host, port, username, password, db]
 * @param s3Config s3 config [key, secret, bucket]
 * @param callback (optional) callback(err)
 */
function sync(mongodbConfig, s3Config, callback) {
  var os = require('os')
    , async = require('async')
    // os.tmpDir() no longer exists in current Node.js releases; os.tmpdir() is the supported name.
    , tmpDir = path.join(os.tmpdir(), 'mongodb_s3_backup')
    , backupDir = path.join(tmpDir, mongodbConfig.db)
    , archiveName = getArchiveName(mongodbConfig.db)
    // One domain per call, so repeated or concurrent calls do not pile up
    // 'error' listeners on a shared domain or trigger each other's handlers.
    , uploadDomain = domain.create()
    , tmpDirCleanupFns;

  callback = callback || function() { };

  tmpDirCleanupFns = [
    async.apply(removeRF, backupDir),
    async.apply(removeRF, path.join(tmpDir, archiveName))
  ];

  // this cleans up folders in case of EPIPE error from AWS connection
  uploadDomain.on('error', function(err) {
    uploadDomain.exit();
    async.series(tmpDirCleanupFns, function() {
      throw err;
    });
  });

  async.series(tmpDirCleanupFns.concat([
    async.apply(mongoDump, mongodbConfig, tmpDir),
    async.apply(compressDirectory, tmpDir, mongodbConfig.db, archiveName),
    uploadDomain.bind(async.apply(sendToS3, s3Config, tmpDir, archiveName)) // this function sometimes throws EPIPE errors
  ]), function(err) {
    if (err) {
      log(err, 'error');
    } else {
      log('Successfully backed up ' + mongodbConfig.db);
    }

    // cleanup folders
    async.series(tmpDirCleanupFns, function() {
      return callback(err);
    });
  });
}
module.exports = { sync: sync, log: log };