-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathmoo.js
212 lines (192 loc) · 7.46 KB
/
moo.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/*
MOO uses the web socket to talk to the core via the provided API endpoints
*/
"use strict";
function Moo(transport) {
this.transport = transport;
this.transport.moo = this;
this.reqid = 0;
this.subkey = 0;
this.requests = {};
this.mooid = Moo._counter++;
this.logger = transport.logger;
}
Moo._counter = 0;
Moo.prototype._subscribe_helper = function(svcname, reqname, cb) {
var subscription_args = {};
if (arguments.length == 4) {
cb = arguments[3];
subscription_args = arguments[2];
}
var self = this;
var subkey = self.subkey++;
subscription_args.subscription_key = subkey;
self.send_request(svcname + "/subscribe_" + reqname, subscription_args,
function (msg, body) {
if (cb)
cb(msg && msg.name == "Success" ? false : (msg ? msg.name : "NetworkError"), body);
});
return {
unsubscribe: function(ucb) {
self.send_request(svcname + "/unsubscribe_" + reqname,
{ subscription_key: subkey },
ucb);
}
};
};
Moo.prototype.send_request = function() {
var name;
var body;
var content_type;
var cb;
var i = 0;
name = arguments[i++];
if (typeof(arguments[i]) != 'function') { body = arguments[i++]; }
if (typeof(arguments[i]) != 'function') { content_type = arguments[i++]; }
cb = arguments[i++];
var origbody = body;
if (typeof(body) == 'undefined') {
// nothing needed here
} else if (!Buffer.isBuffer(body)) {
body = Buffer.from(JSON.stringify(body), 'utf8');
content_type = content_type || "application/json";
} else {
throw new Error("missing content_type");
}
let header = 'MOO/1 REQUEST ' + name + '\n' +
'Request-Id: ' + this.reqid + '\n';
if (body) {
header += 'Content-Length: ' + body.length + '\n' +
'Content-Type: ' + content_type + '\n';
}
this.logger.log('-> REQUEST', this.reqid, name, origbody ? JSON.stringify(origbody) : "");
const m = Buffer.from(header + '\n');
if (body)
this.transport.send(Buffer.concat([ m, body ], m.length + body.length));
else
this.transport.send(m);
this.requests[this.reqid] = { cb: cb };
this.reqid++;
};
Moo.prototype.parse = function(buf) {
var e = 0;
var s = 0;
var msg = {
headers: {}
};
if ((typeof ArrayBuffer != 'undefined') && (buf instanceof (ArrayBuffer))) {
// convert to Node Buffer
var view = new Uint8Array(buf);
var buf = new Buffer(buf.byteLength);
for (var i = 0; i < buf.length; ++i) buf[i] = view[i];
}
if (buf.length == 0) {
this.logger.log("MOO: empty message received");
return undefined;
}
var state;
while (e < buf.length) {
if (buf[e] == 0xa) { // looking to parse lines -- s == start of line, e == end of line
// parsing headers or first line?
if (state == 'header') {
if (s == e) { // is blank line? blank line is end of headers
// end of MOO header
if (msg.request_id === undefined) {
this.logger.log('MOO: missing Request-Id header: ', msg);
return undefined;
}
if (msg.content_length === undefined) {
if (msg.content_type) {
this.logger.log("MOO: bad message; has Content-Type but not Content-Length: ", msg);
return undefined;
}
if (e != buf.length - 1) {
this.logger.log("MOO: bad message; has no Content-Length, but data after headers: ", msg);
return undefined;
}
} else {
if (msg.content_length > 0) {
if (!msg.content_type) {
this.logger.log("MOO: bad message; has Content-Length but not Content-Type: ", msg);
return undefined;
} else if (msg.content_type == "application/json") {
var json = buf.toString('utf8', e+1, e+1+msg.content_length);
try {
msg.body = JSON.parse(json);
} catch (e) {
this.logger.log("MOO: bad json body: ", json, msg);
return undefined;
}
} else {
msg.body = buf.slice(e+1, e+1+msg.content_length);
}
}
}
return msg;
} else {
// parse MOO header line
var line = buf.toString('utf8', s, e);
var matches = line.match(/([^:]+): *(.*)/);
if (matches) {
if (matches[1] == "Content-Type")
msg.content_type = matches[2];
else if (matches[1] == "Content-Length")
msg.content_length = parseInt(matches[2]);
else if (matches[1] == "Request-Id")
msg.request_id = matches[2];
else
msg.headers[matches[1]] = matches[2];
} else {
this.logger.log("MOO: bad header line: ", line, msg);
return undefined;
}
}
} else {
// parse MOO first line
var line = buf.toString('utf8', s, e);
var matches = line.match(/^MOO\/([0-9]+) ([A-Z]+) (.*)/);
if (matches) {
msg.verb = matches[2];
if (msg.verb == "REQUEST") {
matches = matches[3].match(/([^\/]+)\/(.*)/);
if (matches) {
msg.service = matches[1];
msg.name = matches[2];
} else {
this.logger.log("MOO: bad first line: ", line, msg);
return undefined;
}
} else {
msg.name = matches[3];
}
state = 'header';
} else {
this.logger.log("MOO: bad first line: ", line, msg);
return undefined;
}
}
s = e+1;
}
e++;
}
this.logger.log("MOO: message lacks newline in header");
return undefined;
};
Moo.prototype.handle_response = function(msg, body) {
let req = this.requests[msg.request_id];
if (!req) {
this.logger.log("MOO: can not handle RESPONSE due to unknown Request-Id: ", msg);
return false;
}
if (req.cb) req.cb(msg, body);
if (msg.verb == "COMPLETE") delete(this.requests[msg.request_id]);
return true;
};
Moo.prototype.clean_up = function() {
Object.keys(this.requests).forEach(e => {
let cb = this.requests[e].cb;
if (cb) cb();
});
this.requests = {};
};
exports = module.exports = Moo;