diff --git a/x/sqlite/designdocs.go b/x/sqlite/designdocs.go index 75285e217..123a7b457 100644 --- a/x/sqlite/designdocs.go +++ b/x/sqlite/designdocs.go @@ -64,7 +64,7 @@ func (d *db) updateDesignDoc(ctx context.Context, tx *sql.Tx, rev revision, data } func (d *db) createViewMap(ctx context.Context, tx *sql.Tx, ddoc, name, rev string) error { - for _, query := range viewMapSchema { + for _, query := range viewSchema { if _, err := tx.ExecContext(ctx, d.ddocQuery(ddoc, name, rev, query)); err != nil { return err } diff --git a/x/sqlite/options.go b/x/sqlite/options.go index 295012d28..d2b75da52 100644 --- a/x/sqlite/options.go +++ b/x/sqlite/options.go @@ -162,39 +162,43 @@ func toUint64(in interface{}, msg string) (uint64, error) { } } -func toBool(in interface{}) bool { +func toBool(in interface{}) (value bool, ok bool) { switch t := in.(type) { case bool: - return t + return t, true case string: - b, _ := strconv.ParseBool(t) - return b + b, err := strconv.ParseBool(t) + return b, err == nil default: - return false + return false, false } } func (o optsMap) direction() string { - if toBool(o["descending"]) { + if v, _ := toBool(o["descending"]); v { return "DESC" } return "ASC" } func (o optsMap) includeDocs() bool { - return toBool(o["include_docs"]) + v, _ := toBool(o["include_docs"]) + return v } func (o optsMap) attachments() bool { - return toBool(o["attachments"]) + v, _ := toBool(o["attachments"]) + return v } func (o optsMap) latest() bool { - return toBool(o["latest"]) + v, _ := toBool(o["latest"]) + return v } func (o optsMap) revs() bool { - return toBool(o["revs"]) + v, _ := toBool(o["revs"]) + return v } const ( @@ -226,3 +230,11 @@ func (o optsMap) update() (string, error) { } return "", &internal.Error{Status: http.StatusBadRequest, Message: "invalid value for `update`"} } + +func (o optsMap) reduce() *bool { + v, ok := toBool(o["reduce"]) + if !ok { + return nil + } + return &v +} diff --git a/x/sqlite/put_designdocs_test.go b/x/sqlite/put_designdocs_test.go index 896aea351..18aa0789d 100644 --- a/x/sqlite/put_designdocs_test.go +++ b/x/sqlite/put_designdocs_test.go @@ -109,7 +109,7 @@ func TestDBPut_designDocs(t *testing.T) { SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' - AND name LIKE 'foo_%_map_bar_%' + AND name LIKE 'foo_%_bar_map_%' `).Scan(&viewCount) if err != nil { t.Fatal(err) diff --git a/x/sqlite/query.go b/x/sqlite/query.go index e7041b4b2..5fe497740 100644 --- a/x/sqlite/query.go +++ b/x/sqlite/query.go @@ -19,11 +19,13 @@ import ( "errors" "fmt" "io" + "log" "net/http" "strings" "github.com/dop251/goja" + "github.com/go-kivik/kivik/v4" "github.com/go-kivik/kivik/v4/driver" "github.com/go-kivik/kivik/x/sqlite/v4/internal" ) @@ -58,27 +60,53 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option } query := d.ddocQuery(ddoc, view, rev.String(), ` + WITH view AS ( + SELECT EXISTS( + SELECT 1 + FROM {{ .Design }} + WHERE id = $1 + AND rev = $2 + AND rev_id = $3 + AND func_type = 'reduce' + ) AS reducable + ) + SELECT - 0 AS ord, -- Hack to ensure that header row comes first, since sub-union ordering doesn't work COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date, - NULL, + view.reducable, NULL, NULL, NULL, NULL FROM {{ .Design }} + JOIN view WHERE id = $1 AND rev = $2 AND rev_id = $3 AND func_type = 'map' AND func_name = $4 - UNION + UNION ALL + + SELECT * + FROM ( + SELECT + NULL AS id, + NULL AS key, + value AS value, + "" AS rev, + NULL AS doc, + "" AS conflicts + FROM {{ .Reduce }} + JOIN view + WHERE view.reducable AND ($6 IS NULL OR $6 == TRUE) + ) + + UNION ALL SELECT * FROM ( SELECT - 1 AS ord, id, key, value, @@ -86,10 +114,16 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option NULL AS doc, "" AS conflicts FROM {{ .Map }} + JOIN view + WHERE $6 == FALSE OR NOT view.reducable + ORDER BY key ) - ORDER BY ord, key `) - results, err = d.db.QueryContext(ctx, query, "_design/"+ddoc, rev.rev, rev.id, view) //nolint:rowserrcheck // Err checked in Next + + results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in Next + ctx, query, + "_design/"+ddoc, rev.rev, rev.id, view, kivik.EndKeySuffix, opts.reduce(), + ) switch { case errIsNoSuchTable(err): return nil, &internal.Error{Status: http.StatusNotFound, Message: "missing named view"} @@ -105,10 +139,13 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option if update != updateModeTrue { break } - var upToDate bool - if err := results.Scan(discard{}, &upToDate, discard{}, discard{}, discard{}, discard{}, discard{}); err != nil { + var upToDate, reducible bool + if err := results.Scan(&upToDate, &reducible, discard{}, discard{}, discard{}, discard{}); err != nil { return nil, err } + if reduce := opts.reduce(); reduce != nil && *reduce && !reducible { + return nil, &internal.Error{Status: http.StatusBadRequest, Message: "reduce is invalid for map-only views"} + } if upToDate { break } @@ -135,22 +172,36 @@ const batchSize = 100 // ddoc revid and last_seq. If mode is "true", it will also update the index. func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision, error) { var ( - ddocRev revision - funcBody *string - lastSeq int + ddocRev revision + mapFuncJS, reduceFuncJS *string + lastSeq int ) err := d.db.QueryRowContext(ctx, d.query(` + WITH design AS ( + SELECT + id, + rev, + rev_id, + MAX(CASE WHEN func_type = 'map' THEN func_body END) AS map_func, + MAX(CASE WHEN func_type = 'reduce' THEN func_body END) AS reduce_func, + MAX(last_seq) AS last_seq + FROM {{ .Design }} + WHERE id = $1 + AND func_type IN ('map', 'reduce') + GROUP BY id, rev, rev_id + ) SELECT docs.rev, docs.rev_id, - design.func_body, + design.map_func, + design.reduce_func, COALESCE(design.last_seq, 0) AS last_seq FROM {{ .Docs }} AS docs - LEFT JOIN {{ .Design }} AS design ON docs.id = design.id AND docs.rev = design.rev AND docs.rev_id = design.rev_id + LEFT JOIN design ON docs.id = design.id AND docs.rev = design.rev AND docs.rev_id = design.rev_id WHERE docs.id = $1 ORDER BY docs.rev DESC, docs.rev_id DESC LIMIT 1 - `), "_design/"+ddoc).Scan(&ddocRev.rev, &ddocRev.id, &funcBody, &lastSeq) + `), "_design/"+ddoc).Scan(&ddocRev.rev, &ddocRev.id, &mapFuncJS, &reduceFuncJS, &lastSeq) switch { case errors.Is(err, sql.ErrNoRows): return revision{}, &internal.Error{Status: http.StatusNotFound, Message: "missing"} @@ -162,7 +213,7 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision return ddocRev, nil } - if funcBody == nil { + if mapFuncJS == nil { return revision{}, &internal.Error{Status: http.StatusNotFound, Message: "missing named view"} } @@ -210,7 +261,7 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision } defer docs.Close() - batch := newMapIndexBatch() + batch := newMapIndexBatch(reduceFuncJS) vm := goja.New() @@ -233,11 +284,11 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision } } - if _, err := vm.RunString("const map = " + *funcBody); err != nil { + if _, err := vm.RunString("const map = " + *mapFuncJS); err != nil { return revision{}, err } - mf, ok := goja.AssertFunction(vm.Get("map")) + mapFunc, ok := goja.AssertFunction(vm.Get("map")) if !ok { return revision{}, fmt.Errorf("expected map to be a function, got %T", vm.Get("map")) } @@ -261,7 +312,7 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision if err := vm.Set("emit", emit(full.ID)); err != nil { return revision{}, err } - if _, err := mf(goja.Undefined(), vm.ToValue(full.toMap())); err != nil { + if _, err := mapFunc(goja.Undefined(), vm.ToValue(full.toMap())); err != nil { var exception *goja.Exception if errors.As(err, &exception) { d.logger.Printf("map function threw exception for %s: %s", full.ID, exception.String()) @@ -344,6 +395,8 @@ type mapIndexBatch struct { insertCount int entries map[string][]mapIndexEntry deleted []string + // reduce is the text of the JS reduce function, if any. + reduce *string } type mapIndexEntry struct { @@ -351,9 +404,10 @@ type mapIndexEntry struct { Value *string } -func newMapIndexBatch() *mapIndexBatch { +func newMapIndexBatch(reduce *string) *mapIndexBatch { return &mapIndexBatch{ entries: make(map[string][]mapIndexEntry, batchSize), + reduce: reduce, } } @@ -414,25 +468,133 @@ func (d *db) writeMapIndexBatch(ctx context.Context, seq int, rev revision, ddoc } } - if batch.insertCount == 0 { - return tx.Commit() - } - - args := make([]interface{}, 0, batch.insertCount*3) - values := make([]string, 0, batch.insertCount) - for id, entries := range batch.entries { - for _, entry := range entries { - values = append(values, fmt.Sprintf("($%d, $%d, $%d)", len(args)+1, len(args)+2, len(args)+3)) - args = append(args, id, entry.Key, entry.Value) + if batch.insertCount > 0 { + args := make([]interface{}, 0, batch.insertCount*3) + values := make([]string, 0, batch.insertCount) + for id, entries := range batch.entries { + for _, entry := range entries { + values = append(values, fmt.Sprintf("($%d, $%d, $%d)", len(args)+1, len(args)+2, len(args)+3)) + args = append(args, id, entry.Key, entry.Value) + } } - } - query := d.ddocQuery(ddoc, viewName, rev.String(), ` + query := d.ddocQuery(ddoc, viewName, rev.String(), ` INSERT INTO {{ .Map }} (id, key, value) VALUES `) + strings.Join(values, ",") - if _, err := tx.ExecContext(ctx, query, args...); err != nil { + if _, err := tx.ExecContext(ctx, query, args...); err != nil { + return err + } + } + + reduceFunc, err := batch.reduceFunc(d.logger) + if err != nil { return err } + if reduceFunc != nil { + if _, err := tx.ExecContext(ctx, d.ddocQuery(ddoc, viewName, rev.String(), ` + DELETE FROM {{ .Reduce }} + `)); err != nil { + return err + } + + rows, err := tx.QueryContext(ctx, d.ddocQuery(ddoc, viewName, rev.String(), ` + SELECT id, key, value + FROM {{ .Map }} + `)) + if err != nil { + return err + } + defer rows.Close() + + var ( + keys [][2]interface{} + values []interface{} + + id, key, value *string + ) + + for rows.Next() { + if err := rows.Scan(&id, &key, &value); err != nil { + return err + } + keys = append(keys, [2]interface{}{id, key}) + if value == nil { + values = append(values, nil) + } else { + values = append(values, *value) + } + } + if err := rows.Err(); err != nil { + return err + } + + rv := reduceFunc(keys, values, false) + var rvJSON *json.RawMessage + if rv != nil { + tmp, _ := json.Marshal(rv) + rvJSON = (*json.RawMessage)(&tmp) + } + + if _, err := tx.ExecContext(ctx, d.ddocQuery(ddoc, viewName, rev.String(), ` + INSERT INTO {{ .Reduce }} (min_key, max_key, value) + VALUES ($1, $2, $3) + `), nil, kivik.EndKeySuffix, rvJSON); err != nil { + return err + } + } return tx.Commit() } + +func (b *mapIndexBatch) reduceFunc(logger *log.Logger) (func(keys [][2]interface{}, values []interface{}, rereduce bool) interface{}, error) { + if b.reduce == nil { + return nil, nil + } + switch *b.reduce { + case "_count": + return func(_ [][2]interface{}, values []interface{}, rereduce bool) interface{} { + if !rereduce { + return len(values) + } + var total uint64 + for _, value := range values { + v, _ := toUint64(value, "") + total += v + } + return total + }, nil + case "_sum": + return func(_ [][2]interface{}, values []interface{}, _ bool) interface{} { + var total uint64 + for _, value := range values { + v, _ := toUint64(value, "") + total += v + } + return total + }, nil + default: + vm := goja.New() + + if _, err := vm.RunString("const reduce = " + *b.reduce); err != nil { + return nil, err + } + reduceFunc, ok := goja.AssertFunction(vm.Get("reduce")) + if !ok { + return nil, fmt.Errorf("expected reduce to be a function, got %T", vm.Get("map")) + } + + return func(keys [][2]interface{}, values []interface{}, rereduce bool) interface{} { + reduceValue, err := reduceFunc(goja.Undefined(), vm.ToValue(keys), vm.ToValue(values), vm.ToValue(rereduce)) + if err != nil { + var exception *goja.Exception + if errors.As(err, &exception) { + logger.Printf("reduce function threw exception: %s", exception.String()) + return nil + } + return err + } + + return reduceValue.Export() + }, nil + } +} diff --git a/x/sqlite/query_test.go b/x/sqlite/query_test.go index fd7fc94ae..3ca952051 100644 --- a/x/sqlite/query_test.go +++ b/x/sqlite/query_test.go @@ -57,6 +57,18 @@ func TestDBQuery(t *testing.T) { wantStatus: http.StatusNotFound, } }) + tests.Add("ddoc does exist but only non-view functions exist", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{"updates": map[string]string{"update1": "function() {}"}}) + + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + wantErr: "missing named view", + wantStatus: http.StatusNotFound, + } + }) tests.Add("simple view with a single document", func(t *testing.T) interface{} { d := newDB(t) _ = d.tPut("_design/foo", map[string]interface{}{ @@ -418,11 +430,235 @@ func TestDBQuery(t *testing.T) { }, } }) + tests.Add("simple reduce function", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{ + "views": map[string]interface{}{ + "bar": map[string]string{ + "map": `function(doc) { + emit(doc._id, [1]); + }`, + // Manual implementation of _count for testing purposes. + "reduce": `function(sum, values, rereduce) { + if (rereduce) { + let sum=0; + for (let i=0; i < values.length; i++) { + sum += values[i]; + } + return sum; + } + return values.length; + }`, + }, + }, + }) + _ = d.tPut("a", map[string]string{"a": "a"}) + _ = d.tPut("b", map[string]string{"b": "b"}) + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + want: []rowResult{ + { + Key: "null", + Value: "3", // TODO: Should be 2 because ddocs should be ignored + }, + }, + } + }) + tests.Add("reduce=true for map-only view returns 400", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{ + "views": map[string]interface{}{ + "bar": map[string]string{ + "map": `function(doc) { + emit(doc._id, [1]); + }`, + }, + }, + }) + _ = d.tPut("a", map[string]string{"a": "a"}) + _ = d.tPut("b", map[string]string{"b": "b"}) + + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + options: kivik.Param("reduce", true), + wantErr: "reduce is invalid for map-only views", + wantStatus: http.StatusBadRequest, + } + }) + tests.Add("simple reduce function with reduce=true", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{ + "views": map[string]interface{}{ + "bar": map[string]string{ + "map": `function(doc) { + emit(doc._id, [1]); + }`, + // Manual implementation of _count for testing purposes. + "reduce": `function(sum, values, rereduce) { + if (rereduce) { + let sum=0; + for (let i=0; i < values.length; i++) { + sum += values[i]; + } + return sum; + } + return values.length; + }`, + }, + }, + }) + _ = d.tPut("a", map[string]string{"a": "a"}) + _ = d.tPut("b", map[string]string{"b": "b"}) + + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + options: kivik.Param("reduce", true), + want: []rowResult{ + { + Key: "null", + Value: "3", // TODO: Should be 2 because ddocs should be ignored + }, + }, + } + }) + tests.Add("simple reduce function with reduce=false", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{ + "views": map[string]interface{}{ + "bar": map[string]string{ + "map": `function(doc) { + emit(doc._id, [1]); + }`, + // Manual implementation of _count for testing purposes. + "reduce": `function(sum, values, rereduce) { + if (rereduce) { + let sum=0; + for (let i=0; i < values.length; i++) { + sum += values[i]; + } + return sum; + } + return values.length; + }`, + }, + }, + }) + _ = d.tPut("a", map[string]string{"a": "a"}) + _ = d.tPut("b", map[string]string{"b": "b"}) + + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + options: kivik.Param("reduce", false), + want: []rowResult{ + {ID: "_design/foo", Key: `"_design/foo"`, Value: `[1]`}, + {ID: "a", Key: `"a"`, Value: `[1]`}, + {ID: "b", Key: `"b"`, Value: `[1]`}, + }, + } + }) + tests.Add("reduce function throws an error", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{ + "views": map[string]interface{}{ + "bar": map[string]string{ + "map": `function(doc) { + emit(doc._id, [1]); + }`, + // Manual implementation of _count for testing purposes. + "reduce": `function(sum, values, rereduce) { + throw new Error("broken"); + }`, + }, + }, + }) + _ = d.tPut("a", map[string]string{"a": "a"}) + _ = d.tPut("b", map[string]string{"b": "b"}) + + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + want: []rowResult{ + { + Key: "null", + Value: "null", + }, + }, + } + }) + tests.Add("built-in _count reduce function", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{ + "views": map[string]interface{}{ + "bar": map[string]string{ + "map": `function(doc) { + emit(doc._id, [1]); + }`, + // Manual implementation of _count for testing purposes. + "reduce": `_count`, + }, + }, + }) + _ = d.tPut("a", map[string]string{"a": "a"}) + _ = d.tPut("b", map[string]string{"b": "b"}) + + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + want: []rowResult{ + { + Key: "null", + Value: "3", // TODO: Should be 2 because ddocs should be ignored + }, + }, + } + }) + tests.Add("built-in _sum reduce function", func(t *testing.T) interface{} { + d := newDB(t) + _ = d.tPut("_design/foo", map[string]interface{}{ + "views": map[string]interface{}{ + "bar": map[string]string{ + "map": `function(doc) { + emit(doc._id, 3); + }`, + // Manual implementation of _count for testing purposes. + "reduce": `_sum`, + }, + }, + }) + _ = d.tPut("a", map[string]string{"a": "a"}) + _ = d.tPut("b", map[string]string{"b": "b"}) + + return test{ + db: d, + ddoc: "_design/foo", + view: "_view/bar", + want: []rowResult{ + { + Key: "null", + Value: "9", // TODO: Should be 2 because ddocs should be ignored + }, + }, + } + }) /* TODO: - - Are conflicts or other metadata exposed to map function? - - built-in reduce functions: _sum, _count + - built-in reduce functions: + - _approx_count_distinct (https://docs.couchdb.org/en/stable/ddocs/ddocs.html#approx_count_distinct) + - _approx_count_distinct + - start/end keys + - group behavior + - _stats (https://docs.couchdb.org/en/stable/ddocs/ddocs.html#stats) - Options: - conflicts - descending @@ -447,7 +683,8 @@ func TestDBQuery(t *testing.T) { - startkey_docid - start_key_doc_id - update_seq - - map function takes too long + - map/reduce function takes too long + - exclude design docs by default */ diff --git a/x/sqlite/schema.go b/x/sqlite/schema.go index c71391f22..574da5c88 100644 --- a/x/sqlite/schema.go +++ b/x/sqlite/schema.go @@ -94,11 +94,17 @@ var schema = []string{ )`, } -var viewMapSchema = []string{ +var viewSchema = []string{ `CREATE TABLE IF NOT EXISTS {{ .Map }} ( id TEXT NOT NULL, key TEXT COLLATE COUCHDB_UCI, value TEXT )`, `CREATE INDEX IF NOT EXISTS {{ .IndexMap }} ON {{ .Map }} (key)`, + `CREATE TABLE IF NOT EXISTS {{ .Reduce }} ( + min_key TEXT, + max_key TEXT, + value TEXT, + UNIQUE (min_key, max_key) + )`, } diff --git a/x/sqlite/templ.go b/x/sqlite/templ.go index d85b62828..87e03ab29 100644 --- a/x/sqlite/templ.go +++ b/x/sqlite/templ.go @@ -71,10 +71,11 @@ func (t *tmplFuncs) hashedName(typ string) string { if t.ddoc == "" { panic("ddoc template method called outside of a ddoc template") } - name := strings.Join([]string{t.ddoc, t.rev, typ, t.viewName}, "_") + name := strings.Join([]string{t.ddoc, t.rev, t.viewName}, "_") if t.hash == "" { t.hash = md5sumString(name)[:8] } + name += "_" + typ if len(name) > maxTableLen-len(t.hash) { name = name[:maxTableLen-len(t.hash)] } diff --git a/x/sqlite/views.go b/x/sqlite/views.go index 78ec27334..15739fd24 100644 --- a/x/sqlite/views.go +++ b/x/sqlite/views.go @@ -114,7 +114,6 @@ func (d *db) queryView(ctx context.Context, view string, options driver.Options) WHERE NOT deleted ) SELECT - 0 AS ord, rev.id AS id, rev.id AS key, '{"value":{"rev":"' || rev.rev || '-' || rev.rev_id || '"}}' AS value, @@ -166,14 +165,18 @@ func (r *rows) Next(row *driver.Row) error { return io.EOF } var ( + id *string key, doc []byte value *[]byte conflicts *string rev string ) - if err := r.rows.Scan(discard{}, &row.ID, &key, &value, &rev, &doc, &conflicts); err != nil { + if err := r.rows.Scan(&id, &key, &value, &rev, &doc, &conflicts); err != nil { return err } + if id != nil { + row.ID = *id + } row.Key = key if len(key) == 0 { row.Key = []byte("null")