-
Notifications
You must be signed in to change notification settings - Fork 23
/
server.js
executable file
·99 lines (87 loc) · 2.97 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env node
// Express application that exposes the HTTP admin routes defined in this file.
const express = require('express');
const app = express();
const bodyParser = require('body-parser');
app.use(bodyParser.json()); // for parsing application/json
// Logger instance constructed with this file's path.
const logger = new (require('service-logger'))(__filename);
const MongoStream = require('./lib/mongo-stream');
const CollectionManager = require('./lib/CollectionManager');
// Assigned inside the app.listen callback once MongoStream.init(config)
// resolves; it is undefined until then. Most route handlers in this file
// dereference it on each request.
let mongoStream;
// Configuration object: this file reads config.adminPort and passes the whole
// object to MongoStream.init.
const config = require('./lib/configParser');
// Reports, for every collection manager held by the running MongoStream,
// whether it currently has a change stream ('Listening') or not
// ('Not Listening'), together with the total number of managers.
app.get('/', (req, res) => {
  const managers = Object.values(mongoStream.collectionManagers);
  const status = { total: managers.length };
  for (const { changeStream, collection } of managers) {
    status[collection] = changeStream ? 'Listening' : 'Not Listening';
  }
  res.send(status);
});
// Responds with the mappings currently held by the stream's elasticManager.
app.get('/mappings', (req, res) => {
  const { mappings } = mongoStream.elasticManager;
  res.send(mappings);
});
// Adds collection managers for the collections named in the JSON request body.
//
// Body fields read: collections, dump, ignoreResumeTokens, ignoreDumpProgress,
// watch. `collections` is passed as the first argument to
// mongoStream.addCollectionManager; the remaining fields are passed as the
// options object.
//
// Responds with whatever addCollectionManager resolves to. If it rejects, the
// error is logged and the response is HTTP 500 with `{ error: <message> }`.
app.post('/collection-manager?', (request, response) => {
  logger.info(request.body);
  const {
    collections,
    dump,
    ignoreResumeTokens,
    ignoreDumpProgress,
    watch
  } = request.body;
  const managerOptions = { dump, ignoreResumeTokens, ignoreDumpProgress, watch };

  return mongoStream.addCollectionManager(collections, managerOptions)
    .then((results) => {
      response.send(results);
    })
    .catch((err) => {
      logger.error(`Error posting collection-manager:`, err);
      // Sending an Error object directly serializes to "{}" (its properties
      // are not enumerable) and keeps the default 200 status, so the caller
      // cannot tell the request failed. Report the failure explicitly.
      const message = err instanceof Error ? err.message : err;
      response.status(500).send({ error: message });
    });
});
// Pauses or resumes the dump process according to the :toggle URL parameter.
//   PUT /dump/pause  -> CollectionManager.pauseDump()
//   PUT /dump/resume -> CollectionManager.resumeDump()
// Any other value is rejected with HTTP 400.
app.put('/dump/:toggle', (request, response) => {
  const { toggle } = request.params;
  switch (toggle) {
    case 'pause':
      CollectionManager.pauseDump();
      response.send('Dump paused. To resume, use "/dump/resume"');
      break;
    case 'resume':
      CollectionManager.resumeDump();
      response.send('Dump resumed.');
      break;
    default:
      // An unrecognised option is a client error, so do not answer 200.
      // The message echoes the caller-supplied parameter; send it as plain
      // text because res.send() would otherwise label a string body text/html.
      response
        .status(400)
        .type('text/plain')
        .send(`ERROR: unknown dump option "${toggle}"`);
      break;
  }
});
// Triggers removal of the collection managers for the comma-separated
// collection names given in the URL, e.g. DELETE /collection-manager/a,b.
//
// Responds with whatever mongoStream.removeCollectionManager resolves to.
// Responds with HTTP 400 when no collection names are supplied, and with
// HTTP 500 and `{ error: <message> }` when the removal rejects.
app.delete('/collection-manager/:collections?', (request, response) => {
  const { collections: collectionsParam } = request.params;

  // The route parameter is optional, so it is undefined when the URL stops at
  // "/collection-manager". Calling .split on it would throw a TypeError.
  if (!collectionsParam) {
    return response.status(400).send({
      error: 'No collections specified. Use "/collection-manager/<name>[,<name>...]"'
    });
  }

  const collections = collectionsParam.split(',');
  logger.info(`Deleting collections: ${collections}`);

  return mongoStream.removeCollectionManager(collections)
    .then((results) => {
      logger.info(`Remaining collections after Delete: ${results}`);
      response.send(results);
    })
    .catch((err) => {
      logger.error(`Error deleting collection-manager:`, err);
      // Sending an Error object directly serializes to "{}" with a 200
      // status; report the failure explicitly instead.
      const message = err instanceof Error ? err.message : err;
      response.status(500).send({ error: message });
    });
});
// Starts the admin HTTP server on config.adminPort and, once the listen
// callback fires without an error, initialises MongoStream with the same
// config. The resolved stream is stored in the module-level `mongoStream`
// used by the route handlers. If initialisation fails the process exits with
// a non-zero status.
app.listen(config.adminPort, (err) => {
  if (err) {
    return logger.error(`Error listening on ${config.adminPort}:`, err);
  }

  MongoStream.init(config)
    .then((stream) => {
      logger.info('connected');
      mongoStream = stream;
    })
    .catch((initErr) => {
      logger.error(`Error Creating MongoStream:`, initErr);
      // process.exit() with no argument exits with code 0 (success), which
      // hides the failure from supervisors and scripts; exit with 1 instead.
      process.exit(1);
    });

  logger.info(`server is listening on port ${config.adminPort}`);
});