Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sqlite #4

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
WIP
  • Loading branch information
ronag committed Aug 17, 2024
commit 01c6c72d55374c971d5b9444944c2d7604748c1c
262 changes: 96 additions & 166 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,60 +1,39 @@
import assert from 'node:assert'
import { LRUCache } from 'lru-cache'
import { DecoratorHandler, parseHeaders, parseCacheControl } from '../utils.js'
import { DatabaseSync } from 'node:sqlite' // --experimental-sqlite
import * as BJSON from 'buffer-json'

class CacheHandler extends DecoratorHandler {
#handler
#store
#key
#opts
#value

constructor({ key, handler, store }) {
constructor({ key, handler, store, opts }) {
super(handler)

this.#key = key
this.#handler = handler
this.#store = store
this.#opts = opts
}

onConnect(abort) {
console.log('onConnect abort')
console.log(abort)

this.#value = null

return this.#handler.onConnect(abort)
}

onHeaders(statusCode, rawHeaders, resume, statusMessage, headers = parseHeaders(rawHeaders)) {
console.log('onHeaders')
console.log({ statusCode, rawHeaders, resume, statusMessage, headers })

if (statusCode !== 307) {
if (statusCode !== 307 || statusCode !== 200) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (statusCode !== 307 || statusCode !== 200) {
if (statusCode !== 307 && statusCode !== 200) {

return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
}

// TODO (fix): Support vary header.
const cacheControl = parseCacheControl(headers['cache-control'])

const contentLength = headers['content-length'] ? Number(headers['content-length']) : Infinity
const maxEntrySize = this.#store.maxEntrySize ?? Infinity

console.log({ cacheControl, contentLength, maxEntrySize })

console.log('onHeaders if statement match:')

console.log(
contentLength < maxEntrySize &&
cacheControl &&
cacheControl.public &&
!cacheControl.private &&
!cacheControl['no-store'] &&
!cacheControl['no-cache'] &&
!cacheControl['must-understand'] &&
!cacheControl['must-revalidate'] &&
!cacheControl['proxy-revalidate'],
)

if (
contentLength < maxEntrySize &&
cacheControl &&
@@ -73,8 +52,6 @@ class CacheHandler extends DecoratorHandler {
? 31556952 // 1 year
: Number(maxAge)

console.log({ ttl, maxAge, cacheControl, contentLength, maxEntrySize })

if (ttl > 0) {
this.#value = {
data: {
@@ -88,16 +65,10 @@ class CacheHandler extends DecoratorHandler {
(rawHeaders?.reduce((xs, x) => xs + x.length, 0) ?? 0) +
(statusMessage?.length ?? 0) +
64,
ttl: ttl * 1e3,
expires: Date.now() + ttl,
}
}

console.log({ thisvalue: this.#value })
}

console.log('onHeaders, finish:')
console.log({ statusCode, rawHeaders, resume, statusMessage, headers })

return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
}

@@ -116,34 +87,25 @@ class CacheHandler extends DecoratorHandler {
}

onComplete(rawTrailers) {
console.log('onComplete this:')
console.log({ thisvalue: this.#value })
console.log({ thisstore: this.#store }) // CacheStore{}
console.log({ thishandler: this.#handler }) // RequestHandler{}
console.log({ thishandlervalue: this.#handler.value })
console.log({ this: this })
if (this.#value) {
this.#value.data.rawTrailers = rawTrailers
this.#value.size += rawTrailers?.reduce((xs, x) => xs + x.length, 0) ?? 0

const opts = this.#handler.opts
const entries = this.#handler.entries
console.log('onComplete this:')
console.log({ opts, entries })

const reqHeaders = this.#handler.opts
const reqHeaders = this.#opts
const resHeaders = parseHeaders(this.#value.data.rawHeaders)

const vary = formatVaryData(resHeaders, reqHeaders)

console.log({ vary })
// Early return if Vary = *, uncacheable.
if (resHeaders.vary === '*') {
return this.#handler.onComplete(rawTrailers)
}

this.#value.vary = vary
this.#value.data.rawTrailers = rawTrailers
this.#value.size = this.#value.size
? this.#value.size + rawTrailers?.reduce((xs, x) => xs + x.length, 0)
: 0

console.log({ entries })
this.#value.vary = formatVaryData(resHeaders, reqHeaders)

this.#store.set(this.#key, entries.push(this.#value))
this.#store.set(this.#key, this.#value)
}

return this.#handler.onComplete(rawTrailers)
}
}
// Pair each response `Vary` header name with the value the request sent for
// it. Returns `undefined` when the response has no Vary header; names the
// request did not send are dropped (empty string is falsy in the filter).
//
// NOTE(review): callers pass the request `opts` object here, whose headers
// live under `opts.headers` (see `findEntryByHeaders`), so look there first
// and fall back to a flat headers object for backward compatibility.
function formatVaryData(resHeaders, reqHeaders) {
  return resHeaders.vary
    ?.split(',')
    .map((key) => key.trim().toLowerCase())
    .map((key) => [key, reqHeaders?.headers?.[key] ?? reqHeaders?.[key] ?? ''])
    .filter(([, val]) => val)
}

// TODO (fix): Async filesystem cache.
//
// SQLite-backed store for cached responses. Each row holds one cached
// response keyed by `METHOD:path`; multiple rows per key are allowed (one
// per Vary combination) and are narrowed by `findEntryByHeaders`.
export class CacheStore {
  #database

  #insertQuery
  #getQuery
  #purgeQuery

  // Approximate bytes inserted since the last purge; soft trigger only.
  #size = 0
  #maxSize = 128e9

  /**
   * @param {string} [location=':memory:'] - SQLite database path.
   * @param {object} [opts]
   * @param {number} [opts.maxSize] - Soft byte cap before expired rows are purged.
   */
  constructor(location = ':memory:', opts = {}) {
    // TODO (fix): Validate args...

    this.#maxSize = opts.maxSize ?? this.#maxSize
    this.#database = new DatabaseSync(location)

    this.#database.exec(`
      CREATE TABLE IF NOT EXISTS cacheInterceptor(
        key TEXT,
        data TEXT,
        vary TEXT,
        size INTEGER,
        expires INTEGER
      ) STRICT
    `)

    this.#insertQuery = this.#database.prepare(
      'INSERT INTO cacheInterceptor (key, data, vary, size, expires) VALUES (?, ?, ?, ?, ?)',
    )

    this.#getQuery = this.#database.prepare(
      'SELECT * FROM cacheInterceptor WHERE key = ? AND expires > ?',
    )

    this.#purgeQuery = this.#database.prepare('DELETE FROM cacheInterceptor WHERE expires < ?')

    this.#maybePurge()
  }

  set(key, { data, vary, size, expires }) {
    // `data` contains Buffers (rawHeaders/rawTrailers) so it must round-trip
    // through BJSON; `vary` is plain [name, value] string pairs, so JSON is
    // enough. `get` parses with the matching codecs below.
    this.#insertQuery.run(key, BJSON.stringify(data), JSON.stringify(vary), size, expires)

    this.#size += size
    this.#maybePurge()
  }

  // Returns all non-expired entries stored under `key`.
  get(key) {
    return this.#getQuery.all(key, Date.now()).map(({ data, vary, size, expires }) => ({
      data: BJSON.parse(data),
      vary: JSON.parse(vary),
      size: Number(size),
      expires: Number(expires),
    }))
  }

  close() {
    this.#database.close()
  }

  // Drop expired rows and recompute the tracked size once the soft cap is
  // exceeded (or the size counter was lost).
  #maybePurge() {
    if (this.#size == null || this.#size > this.#maxSize) {
      this.#purgeQuery.run(Date.now())
      // node:sqlite `exec()` returns undefined; a prepared statement is
      // required to read the aggregate back. SUM over an empty table is NULL.
      this.#size =
        this.#database.prepare('SELECT SUM(size) AS total FROM cacheInterceptor').get().total ?? 0
    }
  }
}

// Pick the first cached entry whose stored Vary pairs all match the current
// request's headers. An entry without Vary data (`entry.vary` nullish)
// matches any request. Returns `undefined` when nothing matches or
// `entries` is nullish.
function findEntryByHeaders(entries, reqHeaders) {
  return entries?.find(
    (entry) => entry.vary?.every(([key, val]) => reqHeaders?.headers?.[key] === val) ?? true,
  )
}

const DEFAULT_CACHE_STORE = new CacheStore({ maxSize: 128 * 1024, maxEntrySize: 1024 })
const DEFAULT_CACHE_STORE = new CacheStore()

export default (opts) => (dispatch) => (opts, handler) => {
console.log('cache dispatcher:')
console.log(dispatch)
console.log('opts:')
console.log(opts)
console.log('handler:')
console.log(handler)

if (!opts.cache || opts.upgrade) {
return dispatch(opts, handler)
}
@@ -235,85 +222,27 @@ export default (opts) => (dispatch) => (opts, handler) => {
// Dump body...
opts.body?.on('error', () => {}).resume()

opts.host = opts.host ?? new URL(opts.origin).host

if (!opts.headers) {
opts.headers = {}
}

// idea: use DEFAULT_CACHE_STORE by default if 'cache' not specified, since the cache interceptor was already specified to be used.
const store = opts.cache === true ? DEFAULT_CACHE_STORE : opts.cache

if (!store) {
throw new Error(`Cache store not provided.`)
}

let key = `${opts.method}:${opts.path}`
console.log('getting key: ' + key)
let entries = store.get(key)
const key = `${opts.method}:${opts.path}`

if (Array.isArray(entries) && entries.length === 0 && opts.method === 'HEAD') {
key = `GET:${opts.path}`
entries = store.get(key)
}
const entries = store.get(key) ?? (opts.method === 'HEAD' ? store.get(`GET:${opts.path}`) : null)

// testing
const rawHeaders = [
Buffer.from('Content-Type'),
Buffer.from('application/json'),
Buffer.from('Content-Length'),
Buffer.from('10'),
Buffer.from('Cache-Control'),
Buffer.from('public'),
]
// // cannot get the cache to work inside the test, so I hardcode the entries here
entries = [
{
statusCode: 200,
statusMessage: '',
rawHeaders,
rawTrailers: ['Hello'],
body: ['asd1'],
vary: [
['Accept', 'application/xml'],
['User-Agent', 'Mozilla/5.0'],
],
},
{
statusCode: 200,
statusMessage: '',
rawHeaders,
rawTrailers: ['Hello'],
body: ['asd2'],
vary: [
['Accept', 'application/txt'],
['User-Agent', 'Chrome'],
['origin2', 'www.google.com/images'],
],
},
// {
// statusCode: 200, statusMessage: 'last', rawHeaders, rawTrailers: ['Hello'], body: ['asd3'],
// vary: null },
{
statusCode: 200,
statusMessage: 'first',
rawHeaders,
rawTrailers: ['Hello'],
body: ['asd4'],
vary: [
['Accept', 'application/json'],
['User-Agent', 'Mozilla/5.0'],
['host2', 'www.google.com'],
['origin2', 'www.google.com/images'],
],
},
]

// *testing

// Find an entry that matches the request, if any
const entry = findEntryByHeaders(entries, opts)

console.log('Entry found:')
console.log({ entry })

// handler.value.vary = 'foobar'

if (entry) {
const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = entry
const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = entry.data
const ac = new AbortController()
const signal = ac.signal

@@ -325,11 +254,14 @@ export default (opts) => (dispatch) => (opts, handler) => {
try {
handler.onConnect(abort)
signal.throwIfAborted()

handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
signal.throwIfAborted()

if (opts.method !== 'HEAD') {
for (const chunk of body) {
const ret = handler.onData(chunk)

signal.throwIfAborted()
if (ret === false) {
// TODO (fix): back pressure...
@@ -345,8 +277,6 @@ export default (opts) => (dispatch) => (opts, handler) => {

return true
} else {
// handler.opts = opts
// handler.entries = entries
return dispatch(opts, new CacheHandler({ handler, store, key }))
return dispatch(opts, new CacheHandler({ handler, store, key, opts }))
}
}