-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
158 lines (144 loc) · 4.52 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
var FlumeViewVector = require('./core')
var hash = require('string-hash')
var bipf = require('bipf')
var AtomicFile = require('atomic-file')
var hash = require('./hash')
var path = require('path')
var pull = require('pull-stream')
var Through = require('push-stream/throughs/through')
// Bit flags describing how a hashed key path is indexed. They are combined
// with `|` and tested with `&`, so a single path can carry several at once.
//   IS_DEFINED - the indexer also adds an entry hashed from the key path plus 'null'.
//   STRING     - string values at this path are indexed (the indexer skips
//                values whose encoded length is 100 or more).
//   OBJECT     - set by the query code on the parent prefixes of a queried path;
//                the indexer descends into object values under any flagged path.
const IS_DEFINED = 1, STRING = 2, OBJECT = 4
// Folds every element of the path `p` into one running hash, starting from 0
// and feeding each element through hash.update in order.
// Returns 0 for an empty path.
function hash_array (p) {
  var acc = 0, idx = 0
  while (idx < p.length) {
    acc = hash.update(acc, p[idx])
    idx += 1
  }
  return acc
}
// Turns a continuation that eventually yields a push-stream into a stream
// that can be returned immediately.
//  - If `cont` calls back synchronously, the stream it provides is returned as is.
//  - Otherwise a new Through is returned, and the stream that arrives later is
//    piped into it and resumed.
// NOTE(review): the error path constructs `Empty`, which is not among the
// requires visible in this file; if it really is undefined, an error here
// surfaces as a ReferenceError - confirm and add the missing require.
function pushCont (cont) {
  var target
  cont(function onStream (err, source) {
    if (err) source = new Empty(err)
    if (target) source.pipe(target).resume()
    else target = source
  })
  if (!target) target = new Through()
  return target
}
// Reports whether `o` has no enumerable properties (own or inherited).
// null and undefined count as empty, because for-in visits nothing for them.
function isEmpty (o) {
  var sawKey = false
  for (var key in o) {
    sawKey = true
    break
  }
  return !sawKey
}
//TODO: because we use a hash table, two different indexes can sometimes collide.
//To avoid storing the keys alongside each of the many indexes, we can just filter the output instead.
//The same filter could also be used for brute-force scans of the raw log.
// Placeholder: builds the (buffer, start) predicate for `query`.
// The returned function currently has an empty body, so it ignores both
// arguments and always returns undefined.
function createFilter (query) {
  function filter (buffer, start) {}
  return filter
}
// Builds an indexer function (buf, seq, add) for the given `indexed` table,
// which maps a hashed key path to a bitmask of IS_DEFINED / STRING / OBJECT.
//
// The indexer walks the bipf-encoded record in `buf` from offset 0, keeping a
// running hash of the key path. For every key whose path hash has a non-zero
// entry in `indexed` it:
//   - calls add() with the path hash extended by 'null' when IS_DEFINED is set,
//   - calls add() with the path hash extended by the value when the value is a
//     string, STRING is set and the encoded length is below 100,
//   - descends into the value when it is an object.
// Keys without an entry are skipped, so nothing beneath them is visited.
// `seq` is not used. When `indexed` is empty the indexer does nothing.
function createIndexer (indexed) {
  return function indexRecord (buf, seq, add) {
    if (isEmpty(indexed)) return

    // Hashing is incremental over the encoded bytes, so the walk itself
    // only produces values when it actually calls add().
    function walk (pointer, parentHash) {
      bipf.iterate(buf, pointer, function (_, valuePtr, keyPtr) {
        var valueType = bipf.getEncodedType(buf, valuePtr)
        var keyHash = hash.update_bipf(parentHash, buf, keyPtr)
        var flags = indexed[keyHash]
        if (!flags) return

        if (flags & IS_DEFINED) add(hash.update(keyHash, 'null'))

        if (valueType === bipf.types.string) {
          if (flags & STRING) {
            if (bipf.getEncodedLength(buf, valuePtr) < 100) {
              add(hash.update_bipf(keyHash, buf, valuePtr))
            }
          }
          return
        }

        if (valueType === bipf.types.object) walk(valuePtr, keyHash)
      })
    }

    walk(0, 0)
  }
}
module.exports = function () {
return function (log, name) {
var indexed, optimistic_indexed = {}
var af = AtomicFile(path.join(path.dirname(log.filename), name, 'indexed.json'))
var vectors = FlumeViewVector(1, hash, createIndexer(indexed), true)(log, name)
af.get(function (err, _indexed) {
indexed = _indexed || {}
})
var updating = false
function isEmpty (o) {
for(var k in o) return false
return true
}
function query (opts) {
//check if any indexes need to be added, and arn't already being added.
//when update is finished, perform query
var q = opts.query, toIndex = {}
;(function recurse (q) {
//[AND|OR|DIFF, terms...] add terms to index. AND|OR|DIFF doesn't matter
if(!Array.isArray(q)) throw new Error('invalid query:'+JSON.stringify(q))
if(q[0] !== 'EQ') {
for(var i = 1; i < q.length; i++)
recurse(q[i])
}
else {
var [_EQ, path, value] = q
var p = hash_array(path)
if(!(STRING & (indexed[p] | optimistic_indexed[p]))) {
toIndex[p] = toIndex[p] | STRING
var _path = []
for(var i = 0; i < path.length-1; i++) {
_path.push(path[i])
var p2 = hash_array(_path)
toIndex[p2] = toIndex[p2] | OBJECT
}
}
}
})(q)
if(isEmpty(toIndex)) {
return vectors.query(opts) //we don't need to index anything
}
else {
//copy new terms into optimistic_index (indexed terms, and terms being current added)
for(var k in toIndex)
optimistic_indexed[k] = toIndex[k]
}
return pushCont(function (cb) {
vectors.update(createIndexer(toIndex), function () {
for(var k in toIndex) {
indexed[k] = indexed[k] | toIndex[k]
// delete optimistic_indexed[k]
}
af.set(indexed, function () {})
cb(null, vectors.query(opts))
})
})
}
return {
methods: {
query: 'sync'
},
createSink: function (cb) {
return function (read) {
af.get(function () {
pull(read, vectors.createSink(cb))
})
}
},
since: vectors.since,
query: function (opts) {
if(indexed) {
this.query = query
return query(opts)
}
else {
return pushCont(function (cb) {
af.get(function () {
cb(null, query(opts))
})
})
}
}
}
}
}