Skip to content

Commit

Permalink
tidy up and added DELETE_RAW
Browse files Browse the repository at this point in the history
  • Loading branch information
fergiemcdowall committed Aug 18, 2023
1 parent 7b7f1d5 commit dba3129
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 80 deletions.
16 changes: 7 additions & 9 deletions src/main.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as tokenization from './tokenisationPipeline.js'
import PQueue from 'p-queue'
import { InvertedIndex } from 'fergies-inverted-index'
import { LRUCache } from 'lru-cache'
import { Reader } from './read.js'
Expand Down Expand Up @@ -48,12 +47,7 @@ export class Main {
max: 1000
})

this.w = new Writer(
ops,
this._CACHE,
new PQueue({ concurrency: 1 }),
this.INDEX
)
this.w = new Writer(ops, this._CACHE, this.INDEX)
this.r = new Reader(ops, this._CACHE, this.INDEX)

// TODO: this should be something more sensible like "countDocs"
Expand Down Expand Up @@ -106,8 +100,12 @@ export class Main {
return this.w.DELETE(...id)
}

DISTINCT (...tokens) {
return this.r.DISTINCT(...tokens)
// Public entry point for removing stored raw documents by id.
// Collects every argument into `id` and forwards them, spread back out,
// to the Writer instance held in `this.w`; returns whatever
// `this.w.DELETE_RAW` returns.
DELETE_RAW (...id) {
return this.w.DELETE_RAW(...id)
}

// Thin pass-through to the Reader instance held in `this.r`: every
// argument is gathered into `token` and forwarded unchanged, and the
// Reader's return value is handed straight back to the caller.
DISTINCT (...token) {
return this.r.DISTINCT(...token)
}

DICTIONARY (token) {
Expand Down
8 changes: 0 additions & 8 deletions src/tokenisationPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,3 @@ export const tokenizationPipeline = (tokens, field, ops) =>
.then(STOPWORDS)
.then(SCORE_TERM_FREQUENCY)
.then(([tokens, field, ops]) => tokens)

// export const SPLIT = SPLIT
// export const SKIP = SKIP
// export const LOWACASE = LOWCASE
// export const REPLACE = REPLACE
// export const NGRAMS = NGRAMS
// export const STOPWORDS = STOPWORDS
// export const SCORE_TERM_FREQUENCY = SCORE_TERM_FREQUENCY
124 changes: 61 additions & 63 deletions src/write.js
Original file line number Diff line number Diff line change
@@ -1,110 +1,108 @@
import { DocumentProcessor } from './DocumentProcessor.js'
import PQueue from 'p-queue'

export class Writer {
constructor (ops, cache, queue, ii) {
this.cache = cache
this.ii = ii
this.ops = ops
this.queue = queue
#cache
#ii
#ops
#queue

constructor (ops, cache, ii) {
this.#cache = cache
this.#ii = ii
this.#ops = ops
this.#queue = new PQueue({ concurrency: 1 })
}

incrementDocCount (increment) {
return this.ii.STORE.get(['DOCUMENT_COUNT'])
.then(count => this.ii.STORE.put(['DOCUMENT_COUNT'], +count + increment))
#incrementDocCount (increment) {
return this.#ii.STORE.get(['DOCUMENT_COUNT'])
.then(count => this.#ii.STORE.put(['DOCUMENT_COUNT'], +count + increment))
.catch(
// if not found assume value to be 0
e => this.ii.STORE.put(['DOCUMENT_COUNT'], increment)
e => this.#ii.STORE.put(['DOCUMENT_COUNT'], increment)
)
}

decrementDocCount (decrement) {
return this.ii.STORE.get(['DOCUMENT_COUNT']).then(count =>
this.ii.STORE.put(['DOCUMENT_COUNT'], +count - decrement)
// TODO: should this be a separate function?
#decrementDocCount (decrement) {
return this.#ii.STORE.get(['DOCUMENT_COUNT']).then(count =>
this.#ii.STORE.put(['DOCUMENT_COUNT'], +count - decrement)
)
}

#PUT (docs, putOptions) {
this.cache.clear()

this.#cache.clear()
const ops = {
...this.ops,
...this.#ops,
...putOptions
}

return DocumentProcessor(ops)
.processDocuments(docs)
.then(vectors => {
return this.ii
.PUT(vectors, putOptions, this.ii.LEVEL_OPTIONS)
.then(result => {
return Promise.all([
this.PUT_RAW(
docs,
result.map(r => r._id),
!ops.storeRawDocs
),
this.incrementDocCount(
result.filter(r => r.status === 'CREATED').length
)
]).then(() => result)
})
})
.then(vectors => this.#ii.PUT(vectors, putOptions))
.then(result =>
this.PUT_RAW(
docs,
result.map(r => r._id),
!ops.storeRawDocs
)
.then(() =>
this.#incrementDocCount(
result.filter(r => r.status === 'CREATED').length
)
)
.then(() => result)
)
}

#DELETE (_ids) {
return this.ii.DELETE(_ids, this.ii.LEVEL_OPTIONS).then(result => {
this.cache.clear()
const deleted = result.filter(d => d.status === 'DELETED')
return Promise.all([
Promise.all(
deleted.map(r =>
this.ii.STORE.del(['DOC_RAW', r._id], this.ii.LEVEL_OPTIONS)
return this.#ii.DELETE(_ids).then(result =>
this.DELETE_RAW(..._ids)
.then(() =>
this.#decrementDocCount(
result.filter(d => d.status === 'DELETED').length
)
),
this.decrementDocCount(deleted.length)
]).then(() => result)
})
)
.then(() => this.#cache.clear())
.then(() => result)
)
}

// Public delete: gathers the variadic ids into one array and hands that
// array to the private #DELETE, returning its result.
// NOTE(review): unlike PUT, this call is not added to #queue -- confirm
// whether deletes are meant to run outside the write queue.
DELETE (...docIds) {
return this.#DELETE(docIds)
}

// Issues one STORE.del call per id, each against the key ['DOC_RAW', id],
// and returns a Promise.all over those calls (an array with one entry per
// id once every del has settled successfully).
// This method itself only issues the del calls: it does not clear #cache
// or adjust DOCUMENT_COUNT.
DELETE_RAW (...docIds) {
return Promise.all(docIds.map(id => this.#ii.STORE.del(['DOC_RAW', id])))
}

FLUSH () {
return this.ii.STORE.clear()
return this.#ii.STORE.clear()
.then(() => {
this.cache.clear()
this.#cache.clear()
const timestamp = Date.now()
return this.ii.STORE.batch(
[
{ type: 'put', key: ['~CREATED'], value: timestamp },
{ type: 'put', key: ['~LAST_UPDATED'], value: timestamp },
{ type: 'put', key: ['DOCUMENT_COUNT'], value: 0 }
],
this.ii.LEVEL_OPTIONS
)
return this.#ii.STORE.batch([
{ type: 'put', key: ['~CREATED'], value: timestamp },
{ type: 'put', key: ['~LAST_UPDATED'], value: timestamp },
{ type: 'put', key: ['DOCUMENT_COUNT'], value: 0 }
])
})
.then(() => true)
}

IMPORT (index) {
this.cache.clear()
return Promise.resolve(this.ii.IMPORT(index))
this.#cache.clear()
return Promise.resolve(this.#ii.IMPORT(index))
}

PUT (docs, pops) {
return this.queue.add(() => this.#PUT(docs, pops))
return this.#queue.add(() => this.#PUT(docs, pops))
}

PUT_RAW (docs, ids, dontStoreValue) {
this.cache.clear()
this.#cache.clear()
return Promise.all(
docs.map((doc, i) =>
this.ii.STORE.put(
['DOC_RAW', ids[i]],
dontStoreValue ? {} : doc,
this.ii.LEVEL_OPTIONS
)
this.#ii.STORE.put(['DOC_RAW', ids[i]], dontStoreValue ? {} : doc)
)
).then(
// TODO: make this actually deal with errors
Expand All @@ -119,6 +117,6 @@ export class Writer {

// TODO: does this need to be exported?
_INCREMENT_DOC_COUNT (increment) {
return this.incrementDocCount(increment)
return this.#incrementDocCount(increment)
}
}

0 comments on commit dba3129

Please sign in to comment.