Merge pull request #1000 from go-kivik/useCache4
More work on caching reduce results
flimzy authored Jun 18, 2024
2 parents 3c89b6f + 048e7b8 commit afa4c34
Showing 3 changed files with 247 additions and 24 deletions.
32 changes: 32 additions & 0 deletions x/sqlite/options.go
@@ -549,6 +549,35 @@ func (o optsMap) attEncodingInfo() (bool, error) {

const defaultWhereCap = 3

// buildReduceCacheWhere returns WHERE conditions for use when querying the
// reduce cache.
func (v viewOptions) buildReduceCacheWhere(args *[]any) []string {
where := make([]string, 0, defaultWhereCap)
if v.endkey != "" {
op := endKeyOp(v.descending, v.inclusiveEnd)
where = append(where, fmt.Sprintf("view.last_key %s $%d", op, len(*args)+1))
*args = append(*args, v.endkey)
}
if v.startkey != "" {
op := startKeyOp(v.descending)
where = append(where, fmt.Sprintf("view.first_key %s $%d", op, len(*args)+1))
*args = append(*args, v.startkey)
}
if v.key != "" {
idx := strconv.Itoa(len(*args) + 1)
where = append(where, "view.last_key = $"+idx, "view.first_key = $"+idx)
*args = append(*args, v.key)
}
if len(v.keys) > 0 {
for _, key := range v.keys {
idx := strconv.Itoa(len(*args) + 1)
where = append(where, "view.last_key = $"+idx, "view.first_key = $"+idx)
*args = append(*args, key)
}
}
return where
}

// buildGroupWhere returns WHERE conditions for use with grouping.
func (v viewOptions) buildGroupWhere(args *[]any) []string {
where := make([]string, 0, defaultWhereCap)
@@ -752,6 +781,9 @@ func (o optsMap) viewOptions(view string) (*viewOptions, error) {
if err != nil {
return nil, err
}
if len(keys) > 0 && (key != "" || endkey != "" || startkey != "") {
return nil, &internal.Error{Status: http.StatusBadRequest, Message: "`keys` is incompatible with `key`, `start_key` and `end_key`"}
}
sorted, err := o.sorted()
if err != nil {
return nil, err
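
For illustration, here is a minimal, self-contained sketch of the numbered-placeholder pattern that buildReduceCacheWhere above relies on. buildCacheWhere and its hard-coded operators are hypothetical stand-ins for the package's viewOptions method and its endKeyOp/startKeyOp helpers, shown only to make the argument indexing explicit.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildCacheWhere mirrors the pattern above: each condition appends a
// numbered SQL placeholder ($1, $2, ...) and pushes the matching value
// onto args, so the fragments can later be joined into a larger query.
// Operators are hard-coded here; the real code derives them from the
// descending and inclusive_end options.
func buildCacheWhere(startkey, endkey, key string, args *[]any) []string {
	where := make([]string, 0, 3)
	if endkey != "" {
		where = append(where, fmt.Sprintf("view.last_key <= $%d", len(*args)+1))
		*args = append(*args, endkey)
	}
	if startkey != "" {
		where = append(where, fmt.Sprintf("view.first_key >= $%d", len(*args)+1))
		*args = append(*args, startkey)
	}
	if key != "" {
		idx := strconv.Itoa(len(*args) + 1)
		// A single key must match both ends of a cached range.
		where = append(where, "view.last_key = $"+idx, "view.first_key = $"+idx)
		*args = append(*args, key)
	}
	return where
}

func main() {
	var args []any
	where := buildCacheWhere(`"a"`, `"b"`, "", &args)
	fmt.Println("WHERE " + strings.Join(where, " AND ")) // WHERE view.last_key <= $1 AND view.first_key >= $2
	fmt.Println(args)                                    // ["b" "a"]
}
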
43 changes: 23 additions & 20 deletions x/sqlite/query.go
@@ -117,6 +117,7 @@ func (d *db) performQuery(
}

where := append([]string{""}, vopts.buildWhere(&args)...)
reduceWhere := append([]string{""}, vopts.buildReduceCacheWhere(&args)...)

query := fmt.Sprintf(d.ddocQuery(ddoc, view, rev.String(), leavesCTE+`,
reduce AS (
@@ -137,8 +138,10 @@ func (d *db) performQuery(
last_pk,
last_key,
value
FROM {{ .Reduce }}
FROM {{ .Reduce }} AS view
JOIN reduce ON reduce.reducible AND ($3 IS NULL OR $3 == TRUE)
WHERE TRUE
%[5]s -- WHERE
)
-- Metadata header
@@ -185,8 +188,6 @@ func (d *db) performQuery(
NULL, -- rev_pos
NULL -- data
FROM cache
JOIN reduce
WHERE reduce.reducible AND ($3 IS NULL OR $3 == TRUE)
)
UNION ALL
@@ -197,24 +198,24 @@
FROM (
SELECT
view.id AS id,
view.key AS key,
view.key AS first_key,
view.value AS value,
view.pk AS first,
view.pk AS last,
NULL AS conflicts,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
view.pk AS first_pk,
view.pk AS last_pk,
NULL, -- conflicts,
0, -- attachment_count,
NULL, -- filename
NULL, -- content_type
NULL, -- length
NULL, -- digest
NULL, -- rev_pos
NULL -- data
FROM {{ .Map }} AS view
JOIN reduce ON reduce.reducible AND ($3 IS NULL OR $3 == TRUE)
LEFT JOIN cache ON view.key >= cache.first_key AND view.key <= cache.last_key
WHERE cache.first_key IS NULL
%[2]s -- WHERE
%[5]s -- ORDER BY
%[1]s -- ORDER BY
)
UNION ALL
@@ -274,8 +275,7 @@ func (d *db) performQuery(
LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk
%[1]s -- ORDER BY
)
`), vopts.buildOrderBy(), strings.Join(where, " AND "), vopts.limit, vopts.skip,
vopts.buildOrderBy("pk"))
`), vopts.buildOrderBy("pk"), strings.Join(where, " AND "), vopts.limit, vopts.skip, strings.Join(reduceWhere, " AND "))
results, err := d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck // Err checked in Next
switch {
case errIsNoSuchTable(err):
@@ -303,7 +303,7 @@ func (d *db) performQuery(
_ = results.Close() //nolint:sqlclosecheck // invalid option specified for reduce, so abort the query
return nil, &internal.Error{Status: http.StatusBadRequest, Message: "conflicts is invalid for reduce"}
}
result, err := d.reduce(ctx, meta.lastSeq, ddoc, view, rev.String(), results, meta.reduceFuncJS, vopts.reduceGroupLevel())
result, err := d.reduce(ctx, meta.lastSeq, ddoc, view, rev.String(), results, meta.reduceFuncJS, vopts.reduceGroupLevel(), len(vopts.keys) > 0)
if err != nil {
return nil, err
}
@@ -438,14 +438,14 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
}
}

result, err := d.reduce(ctx, lastSeq, ddoc, view, rev.String(), results, meta.reduceFuncJS, vopts.reduceGroupLevel())
result, err := d.reduce(ctx, lastSeq, ddoc, view, rev.String(), results, meta.reduceFuncJS, vopts.reduceGroupLevel(), len(vopts.keys) > 0)
if err != nil {
return nil, err
}
return metaReduced{Rows: result, meta: meta}, nil
}

func (d *db) reduce(ctx context.Context, seq int, ddoc, name, rev string, results *sql.Rows, reduceFuncJS string, groupLevel int) (driver.Rows, error) {
func (d *db) reduce(ctx context.Context, seq int, ddoc, name, rev string, results *sql.Rows, reduceFuncJS string, groupLevel int, individualKeys bool) (driver.Rows, error) {
stmt, err := d.db.PrepareContext(ctx, d.ddocQuery(ddoc, name, rev, `
INSERT INTO {{ .Reduce }} (seq, depth, first_key, first_pk, last_key, last_pk, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
@@ -468,6 +468,9 @@ func (d *db) reduce(ctx context.Context, seq int, ddoc, name, rev string, result
}
}
}
if individualKeys {
callback = nil
}
return reduce.Reduce(&reduceRowIter{results: results}, reduceFuncJS, d.logger, groupLevel, callback)
}

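
The new individualKeys argument to reduce, and the callback = nil branch above, follow an optional-callback pattern: the cache-writing callback is dropped whenever the caller requested individual keys (keys=[...]), presumably because per-key reductions do not describe a single cached key range. A rough, runnable sketch of that pattern, using hypothetical names (reduceCallback, runReduce) rather than the package's actual types:

package main

import "fmt"

// reduceCallback loosely mirrors the shape of the cache-writing callback
// above; the exact signature in the package differs.
type reduceCallback func(depth int, firstKey, lastKey string, value any)

// runReduce sketches the pattern in (*db).reduce: the cache-writing
// callback is discarded when individual keys were requested, so nothing
// is persisted for that reduction.
func runReduce(cb reduceCallback, individualKeys bool) {
	if individualKeys {
		cb = nil // cf. `callback = nil` in the diff above
	}
	// ... run the actual reduce over the map rows here ...
	if cb != nil {
		cb(0, `"a"`, `"c"`, 3) // persist the reduced range to the cache table
	}
}

func main() {
	write := func(depth int, firstKey, lastKey string, value any) {
		fmt.Printf("cache: depth=%d %s..%s = %v\n", depth, firstKey, lastKey, value)
	}
	runReduce(write, false) // writes a cache row
	runReduce(write, true)  // keys=[...]: skips the cache write
}
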
196 changes: 192 additions & 4 deletions x/sqlite/query_test.go
@@ -2602,18 +2602,206 @@ func TestDBQuery(t *testing.T) {
},
}
})
tests.Add("cache is not useable with endkey", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
emit(doc._id, [1]);
}`,
"reduce": `_count`,
},
},
})
_ = d.tPut("a", map[string]interface{}{})
_ = d.tPut("b", map[string]interface{}{})
_ = d.tPut("c", map[string]interface{}{})

db := d.underlying()
var table string
if err := db.QueryRow(`
SELECT name
FROM sqlite_master
WHERE type = 'table'
AND name LIKE '%_%_reduce_%'
`).Scan(&table); err != nil {
t.Fatalf("Failed to find reduced table: %s", err)
}
if _, err := db.Exec(fmt.Sprintf(`
INSERT INTO %q (seq, depth, first_key, first_pk, last_key, last_pk, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, table), 3, 0, `"a"`, 1, `"c"`, 3, "3"); err != nil {
t.Fatalf("Failed to insert reduced value: %s", err)
}

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Param("endkey", "b"),
want: []rowResult{{Key: `null`, Value: `2`}},
wantCache: []reduced{
{Seq: 3, Depth: 0, FirstKey: `"a"`, FirstPK: 1, LastKey: `"c"`, LastPK: 3, Value: "3"},
{Seq: 4, Depth: 0, FirstKey: `"a"`, FirstPK: 1, LastKey: `"b"`, LastPK: 2, Value: "2"},
},
}
})
tests.Add("cache is not useable with startkey", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
emit(doc._id, [1]);
}`,
"reduce": `_count`,
},
},
})
_ = d.tPut("a", map[string]interface{}{})
_ = d.tPut("b", map[string]interface{}{})
_ = d.tPut("c", map[string]interface{}{})

db := d.underlying()
var table string
if err := db.QueryRow(`
SELECT name
FROM sqlite_master
WHERE type = 'table'
AND name LIKE '%_%_reduce_%'
`).Scan(&table); err != nil {
t.Fatalf("Failed to find reduced table: %s", err)
}
if _, err := db.Exec(fmt.Sprintf(`
INSERT INTO %q (seq, depth, first_key, first_pk, last_key, last_pk, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, table), 3, 0, `"a"`, 1, `"c"`, 3, "3"); err != nil {
t.Fatalf("Failed to insert reduced value: %s", err)
}

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Param("startkey", "b"),
want: []rowResult{{Key: `null`, Value: `2`}},
wantCache: []reduced{
{Seq: 3, Depth: 0, FirstKey: `"a"`, FirstPK: 1, LastKey: `"c"`, LastPK: 3, Value: "3"},
{Seq: 4, Depth: 0, FirstKey: `"b"`, FirstPK: 2, LastKey: `"c"`, LastPK: 3, Value: "2"},
},
}
})
tests.Add("cache is not useable with key", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
emit(doc._id, [1]);
}`,
"reduce": `_count`,
},
},
})
_ = d.tPut("a", map[string]interface{}{})
_ = d.tPut("b", map[string]interface{}{})
_ = d.tPut("c", map[string]interface{}{})

db := d.underlying()
var table string
if err := db.QueryRow(`
SELECT name
FROM sqlite_master
WHERE type = 'table'
AND name LIKE '%_%_reduce_%'
`).Scan(&table); err != nil {
t.Fatalf("Failed to find reduced table: %s", err)
}
if _, err := db.Exec(fmt.Sprintf(`
INSERT INTO %q (seq, depth, first_key, first_pk, last_key, last_pk, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, table), 3, 0, `"a"`, 1, `"c"`, 3, "3"); err != nil {
t.Fatalf("Failed to insert reduced value: %s", err)
}

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Param("key", "b"),
want: []rowResult{{Key: `null`, Value: `1`}},
wantCache: []reduced{
{Seq: 3, Depth: 0, FirstKey: `"a"`, FirstPK: 1, LastKey: `"c"`, LastPK: 3, Value: "3"},
{Seq: 4, Depth: 0, FirstKey: `"b"`, FirstPK: 2, LastKey: `"b"`, LastPK: 2, Value: "1"},
},
}
})
tests.Add("cache is not useable with keys", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
emit(doc._id, [1]);
}`,
"reduce": `_count`,
},
},
})
_ = d.tPut("a", map[string]interface{}{})
_ = d.tPut("b", map[string]interface{}{})
_ = d.tPut("c", map[string]interface{}{})

db := d.underlying()
var table string
if err := db.QueryRow(`
SELECT name
FROM sqlite_master
WHERE type = 'table'
AND name LIKE '%_%_reduce_%'
`).Scan(&table); err != nil {
t.Fatalf("Failed to find reduced table: %s", err)
}
if _, err := db.Exec(fmt.Sprintf(`
INSERT INTO %q (seq, depth, first_key, first_pk, last_key, last_pk, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, table), 3, 0, `"a"`, 1, `"c"`, 3, "3"); err != nil {
t.Fatalf("Failed to insert reduced value: %s", err)
}

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Param("keys", []string{"b", "c"}),
want: []rowResult{{Key: `null`, Value: `2`}},
wantCache: []reduced{
{Seq: 3, Depth: 0, FirstKey: `"a"`, FirstPK: 1, LastKey: `"c"`, LastPK: 3, Value: "3"},
},
}
})
tests.Add("key + keys", test{
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Params(map[string]interface{}{"key": "a", "keys": []string{"b", "c"}}),
wantErr: "`keys` is incompatible with `key`, `start_key` and `end_key`",
wantStatus: http.StatusBadRequest,
})

/*
TODO:
- cache individual keys with keys=[...] + reduce/group
- key + start_key
- key + end_key
- key + startkey + descending
- key + endkey + descending
- reduce cache
- caches created with key, used for range
- inclusive vs non-inclusive end
- different depths
- competing cache depths
- gaps in cache results
- endkey
- startkey
- key
- keys
- update_seq
- inclusive_end
- group
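
The LEFT JOIN added in query.go (view.key >= cache.first_key AND view.key <= cache.last_key, filtered on cache.first_key IS NULL) skips map rows whose reduction is already cached, and the tests above pin down the complementary case: a cached row spanning "a".."c" is not reused for a query bounded by endkey, startkey, key, or keys, so the bounded subset is re-reduced. A tiny sketch of that coverage check, using plain Go string comparison in place of the key collation the SQL actually applies; cachedRange and covered are illustrative names only.

package main

import "fmt"

// cachedRange loosely mirrors a row of the reduce-cache table
// (first_key, last_key, value); the struct is illustrative only.
type cachedRange struct {
	firstKey, lastKey string
	value             string
}

// covered reports whether a map-row key falls inside a cached range,
// echoing the SQL join condition
//   view.key >= cache.first_key AND view.key <= cache.last_key
// Plain string comparison stands in for key collation here.
func covered(key string, c cachedRange) bool {
	return key >= c.firstKey && key <= c.lastKey
}

func main() {
	cache := cachedRange{firstKey: `"a"`, lastKey: `"c"`, value: "3"}
	for _, key := range []string{`"a"`, `"b"`, `"d"`} {
		fmt.Printf("%s covered: %v\n", key, covered(key, cache))
	}
	// Even when every requested key is covered, a query bounded by
	// endkey/startkey/key/keys re-reduces the subset, as the wantCache
	// expectations above show.
}
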
