From b1c1543f277835b3a22962e5c07feb48aff8cef8 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 18 Jul 2024 14:57:06 +0200 Subject: [PATCH 1/5] Update Changes feed ETag to be based only on the final seq This seems to be how CouchDB does it. --- x/sqlite/changes.go | 4 ++-- x/sqlite/changes_test.go | 27 +++++++++++++-------------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/x/sqlite/changes.go b/x/sqlite/changes.go index 55b1efffd..c9fb954d8 100644 --- a/x/sqlite/changes.go +++ b/x/sqlite/changes.go @@ -148,7 +148,7 @@ func (d *db) normalChangesQueryWithDocs(direction string) string { COUNT(*) AS id, NULL AS seq, NULL AS deleted, - COUNT(*) || '.' || COALESCE(MIN(seq),0) || '.' || COALESCE(MAX(seq),0) AS rev, + COALESCE(MAX(seq),0) AS summary, NULL AS doc, NULL AS attachment_count, NULL AS filename, @@ -214,7 +214,7 @@ func (d *db) normalChangesQueryWithoutDocs(direction string) string { COUNT(*) AS id, NULL AS seq, NULL AS deleted, - COUNT(*) || '.' || COALESCE(MIN(seq),0) || '.' || COALESCE(MAX(seq),0) AS rev, + COALESCE(MAX(seq),0) AS summary, NULL AS doc, NULL AS attachment_count, NULL AS filename, diff --git a/x/sqlite/changes_test.go b/x/sqlite/changes_test.go index 87a66df6d..43c5ae14c 100644 --- a/x/sqlite/changes_test.go +++ b/x/sqlite/changes_test.go @@ -50,7 +50,7 @@ func TestDBChanges(t *testing.T) { tests := testy.NewTable() tests.Add("no changes in db", test{ wantLastSeq: &[]string{""}[0], - wantETag: &[]string{"c7ba27130f956748671e845893fd6b80"}[0], + wantETag: &[]string{"cfcd208495d565ef66e7dff9f98764da"}[0], }) tests.Add("one change", func(t *testing.T) interface{} { d := newDB(t) @@ -65,7 +65,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"1"}[0], - wantETag: &[]string{"872ccd9c6dce18ce6ea4d5106540f089"}[0], + wantETag: &[]string{"c4ca4238a0b923820dcc509a6f75849b"}[0], } }) tests.Add("deleted event", func(t *testing.T) interface{} { @@ -89,7 +89,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"2"}[0], - wantETag: &[]string{"9562870d7e8245d03c2ac6055dff735f"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], } }) tests.Add("longpoll", func(t *testing.T) interface{} { @@ -139,7 +139,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"2"}[0], - wantETag: &[]string{"bf701dae9aff5bb22b8f000dc9bf6199"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], } }) tests.Add("malformed sequence id", test{ @@ -164,7 +164,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"2"}[0], - wantETag: &[]string{"bf701dae9aff5bb22b8f000dc9bf6199"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], } }) tests.Add("future since value returns only latest change, longpoll mode", func(t *testing.T) interface{} { @@ -251,7 +251,7 @@ func TestDBChanges(t *testing.T) { }), wantChanges: nil, wantLastSeq: &[]string{"2"}[0], - wantETag: &[]string{"c7ba27130f956748671e845893fd6b80"}[0], + wantETag: &[]string{"cfcd208495d565ef66e7dff9f98764da"}[0], } }) tests.Add("limit=0 acts the same as limit=1", func(t *testing.T) interface{} { @@ -270,7 +270,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"1"}[0], - wantETag: &[]string{"9562870d7e8245d03c2ac6055dff735f"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], wantPending: &[]int64{1}[0], } }) @@ -290,7 +290,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"1"}[0], - wantETag: &[]string{"9562870d7e8245d03c2ac6055dff735f"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], wantPending: &[]int64{1}[0], } }) @@ -310,7 +310,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"1"}[0], - wantETag: &[]string{"9562870d7e8245d03c2ac6055dff735f"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], wantPending: &[]int64{1}[0], } }) @@ -359,7 +359,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"1"}[0], - wantETag: &[]string{"9562870d7e8245d03c2ac6055dff735f"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], } }) tests.Add("include docs, normal feed", func(t *testing.T) interface{} { @@ -386,7 +386,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"2"}[0], - wantETag: &[]string{"9562870d7e8245d03c2ac6055dff735f"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], } }) tests.Add("include docs, attachment stubs, normal feed", func(t *testing.T) interface{} { @@ -410,7 +410,7 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"1"}[0], - wantETag: &[]string{"872ccd9c6dce18ce6ea4d5106540f089"}[0], + wantETag: &[]string{"c4ca4238a0b923820dcc509a6f75849b"}[0], } }) tests.Add("include docs and attachments, normal feed", func(t *testing.T) interface{} { @@ -437,13 +437,12 @@ func TestDBChanges(t *testing.T) { }, }, wantLastSeq: &[]string{"1"}[0], - wantETag: &[]string{"872ccd9c6dce18ce6ea4d5106540f089"}[0], + wantETag: &[]string{"c4ca4238a0b923820dcc509a6f75849b"}[0], } }) /* TODO: - - ETag should be based only on last sequence, I think - Options - doc_ids - conflicts From c5bd8684a5cad9eea5a4d7f4b5a8bd0d2eaae0c4 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 18 Jul 2024 15:48:09 +0200 Subject: [PATCH 2/5] Add support for filter=_doc_ids to changes feed --- x/sqlite/changes.go | 35 +++++++++++-------- x/sqlite/changes_test.go | 72 ++++++++++++++++++++++++++++++++++++++-- x/sqlite/find_test.go | 4 +-- x/sqlite/options.go | 32 ++++++++++++++++-- 4 files changed, 124 insertions(+), 19 deletions(-) diff --git a/x/sqlite/changes.go b/x/sqlite/changes.go index c9fb954d8..1fefac35c 100644 --- a/x/sqlite/changes.go +++ b/x/sqlite/changes.go @@ -85,21 +85,28 @@ func (d *db) newNormalChanges(ctx context.Context, opts optsMap, since, lastSeq if err != nil { return nil, err } + attachments, err := opts.attachments() + if err != nil { + return nil, err + } + + args := []any{since, attachments} + where, err := opts.changesWhere(&args) + if err != nil { + return nil, err + } var query string if includeDocs { - query = d.normalChangesQueryWithDocs(descendingToDirection(descending)) + query = d.normalChangesQueryWithDocs(descendingToDirection(descending), where) } else { - query = d.normalChangesQueryWithoutDocs(descendingToDirection(descending)) + query = d.normalChangesQueryWithoutDocs(descendingToDirection(descending), where) } if limit > 0 { query += " LIMIT " + strconv.FormatUint(limit+1, 10) } - attachments, err := opts.attachments() - if err != nil { - return nil, err - } - c.rows, err = d.db.QueryContext(ctx, query, since, attachments) //nolint:rowserrcheck,sqlclosecheck // Err checked in Next + + c.rows, err = d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck,sqlclosecheck // Err checked in Next if err != nil { return nil, err } @@ -130,7 +137,7 @@ func (d *db) newNormalChanges(ctx context.Context, opts optsMap, since, lastSeq return c, nil } -func (d *db) normalChangesQueryWithDocs(direction string) string { +func (d *db) normalChangesQueryWithDocs(direction, where string) string { return fmt.Sprintf(d.query(` WITH results AS ( SELECT @@ -192,12 +199,13 @@ func (d *db) normalChangesQueryWithDocs(direction string) string { FROM results LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = results.id AND bridge.rev = results.rev AND bridge.rev_id = results.rev_id LEFT JOIN {{ .Attachments }} AS att ON att.pk = bridge.pk - ORDER BY seq %s + %[2]s -- WHERE + ORDER BY seq %[1]s ) - `), direction) + `), direction, where) } -func (d *db) normalChangesQueryWithoutDocs(direction string) string { +func (d *db) normalChangesQueryWithoutDocs(direction, where string) string { return fmt.Sprintf(d.query(` WITH results AS ( SELECT @@ -247,9 +255,10 @@ func (d *db) normalChangesQueryWithoutDocs(direction string) string { results.deleted, results.rev || '-' || results.rev_id AS rev FROM results - ORDER BY seq %s + %[2]s -- WHERE + ORDER BY seq %[1]s ) - `), direction) + `), direction, where) } func (c *normalChanges) Next(change *driver.Change) error { diff --git a/x/sqlite/changes_test.go b/x/sqlite/changes_test.go index 43c5ae14c..3978e5a7a 100644 --- a/x/sqlite/changes_test.go +++ b/x/sqlite/changes_test.go @@ -440,17 +440,85 @@ func TestDBChanges(t *testing.T) { wantETag: &[]string{"c4ca4238a0b923820dcc509a6f75849b"}[0], } }) + tests.Add("filter=_doc_ids without doc_ids", test{ + options: kivik.Params(map[string]interface{}{ + "filter": "_doc_ids", + }), + wantStatus: http.StatusBadRequest, + wantErr: "filter=_doc_ids requires doc_ids parameter", + }) + tests.Add("filter=_doc_ids with invalid doc_ids", test{ + options: kivik.Params(map[string]interface{}{ + "filter": "_doc_ids", + "doc_ids": 3, + }), + wantStatus: http.StatusBadRequest, + wantErr: "invalid value for 'doc_ids': 3", + }) + tests.Add("filter=_doc_ids with invalid doc_ids field", test{ + options: kivik.Params(map[string]interface{}{ + "filter": "_doc_ids", + "doc_ids": []interface{}{"foo", 3}, + }), + wantStatus: http.StatusBadRequest, + wantErr: "invalid 'doc_ids' field: 3", + }) + tests.Add("normal feed, doc_ids", func(t *testing.T) interface{} { + d := newDB(t) + rev := d.tPut("doc1", map[string]string{"foo": "bar"}) + _ = d.tPut("doc2", map[string]string{"foo": "bar"}) + + return test{ + db: d, + options: kivik.Params(map[string]interface{}{ + "filter": "_doc_ids", + "doc_ids": []interface{}{"doc1"}, + }), + wantChanges: []driver.Change{ + { + ID: "doc1", + Seq: "1", + Changes: driver.ChangedRevs{rev}, + }, + }, + wantLastSeq: &[]string{"1"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], + } + }) + tests.Add("normal feed with docs, doc_ids", func(t *testing.T) interface{} { + d := newDB(t) + rev := d.tPut("doc1", map[string]string{"foo": "bar"}) + _ = d.tPut("doc2", map[string]string{"foo": "bar"}) + + return test{ + db: d, + options: kivik.Params(map[string]interface{}{ + "include_docs": true, + "filter": "_doc_ids", + "doc_ids": []interface{}{"doc1"}, + }), + wantChanges: []driver.Change{ + { + ID: "doc1", + Seq: "1", + Changes: driver.ChangedRevs{rev}, + Doc: []byte(`{"_id":"doc1","_rev":"1-66f46afbe3effef8424aa0e291d21560","foo":"bar"}`), + }, + }, + wantLastSeq: &[]string{"1"}[0], + wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0], + } + }) /* TODO: - Options - - doc_ids - conflicts - feed - normal - longpoll - continuous - - filter + - filter w/ design doc - att_encoding_info - style - timeout diff --git a/x/sqlite/find_test.go b/x/sqlite/find_test.go index 3b94db267..ae06f402a 100644 --- a/x/sqlite/find_test.go +++ b/x/sqlite/find_test.go @@ -203,7 +203,7 @@ func TestFind(t *testing.T) { tests.Add("sort, invalid field", test{ query: `{"selector":{},"sort":["x",3]}`, wantStatus: http.StatusBadRequest, - wantErr: "invalid sort field: 3", + wantErr: "invalid 'sort' field: 3", }) tests.Add("fields, non-array", test{ query: `{"selector":{},"fields":"x"}`, @@ -213,7 +213,7 @@ func TestFind(t *testing.T) { tests.Add("fields, invalid field", test{ query: `{"selector":{},"fields":["x",3]}`, wantStatus: http.StatusBadRequest, - wantErr: "invalid fields field: 3", + wantErr: "invalid 'fields' field: 3", }) /* diff --git a/x/sqlite/options.go b/x/sqlite/options.go index 87ae77916..00d58ffbc 100644 --- a/x/sqlite/options.go +++ b/x/sqlite/options.go @@ -206,6 +206,34 @@ func (o optsMap) changesLimit() (uint64, error) { return limit, nil } +func (o optsMap) changesWhere(args *[]any) (string, error) { + filter, _ := o["filter"].(string) + switch filter { + case "": + return "", nil + case "_doc_ids": + raw, ok := o["doc_ids"] + if !ok { + return "", &internal.Error{Status: http.StatusBadRequest, Message: "filter=_doc_ids requires doc_ids parameter"} + } + list, ok := raw.([]any) + if !ok { + return "", &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("invalid value for 'doc_ids': %v", raw)} + } + start := len(*args) + for _, v := range list { + if _, ok := v.(string); !ok { + return "", &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("invalid 'doc_ids' field: %v", v)} + } + *args = append(*args, v) + } + + return fmt.Sprintf("WHERE results.id IN (%s)", placeholders(start+1, len(*args)-start)), nil + default: + panic("unimplemented") + } +} + // limit returns the limit value as an int64, or -1 if the limit is unset. // If the limit is invalid, an error is returned with status 400. func (o optsMap) limit() (int64, error) { @@ -240,7 +268,7 @@ func (o optsMap) fields() ([]string, error) { for _, v := range f { s, ok := v.(string) if !ok { - return nil, &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("invalid fields field: %v", v)} + return nil, &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("invalid 'fields' field: %v", v)} } fields = append(fields, s) } @@ -513,7 +541,7 @@ func (o optsMap) sort() ([]string, error) { for i, v := range list { s, ok := v.(string) if !ok { - return nil, &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("invalid sort field: %v", v)} + return nil, &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("invalid 'sort' field: %v", v)} } sort[i] = s } From 1db4f9e793c9311f7298c3f131f7f77ee3f391f0 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 18 Jul 2024 17:03:06 +0200 Subject: [PATCH 3/5] Consolidate duplicate Changes feed queries --- x/sqlite/changes.go | 69 +++------------------------------------------ 1 file changed, 4 insertions(+), 65 deletions(-) diff --git a/x/sqlite/changes.go b/x/sqlite/changes.go index 1fefac35c..190944102 100644 --- a/x/sqlite/changes.go +++ b/x/sqlite/changes.go @@ -90,18 +90,13 @@ func (d *db) newNormalChanges(ctx context.Context, opts optsMap, since, lastSeq return nil, err } - args := []any{since, attachments} + args := []any{since, attachments, includeDocs} where, err := opts.changesWhere(&args) if err != nil { return nil, err } - var query string - if includeDocs { - query = d.normalChangesQueryWithDocs(descendingToDirection(descending), where) - } else { - query = d.normalChangesQueryWithoutDocs(descendingToDirection(descending), where) - } + query := d.normalChangesQueryWithDocs(descendingToDirection(descending), where) if limit > 0 { query += " LIMIT " + strconv.FormatUint(limit+1, 10) } @@ -146,7 +141,7 @@ func (d *db) normalChangesQueryWithDocs(direction, where string) string { deleted, rev, rev_id, - doc + IIF($3, doc, NULL) AS doc FROM {{ .Docs }} WHERE ($1 IS NULL OR seq > $1) ORDER BY seq @@ -197,7 +192,7 @@ func (d *db) normalChangesQueryWithDocs(direction, where string) string { att.rev_pos, IIF($2, data, NULL) AS data FROM results - LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = results.id AND bridge.rev = results.rev AND bridge.rev_id = results.rev_id + LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = results.id AND bridge.rev = results.rev AND bridge.rev_id = results.rev_id AND doc IS NOT NULL LEFT JOIN {{ .Attachments }} AS att ON att.pk = bridge.pk %[2]s -- WHERE ORDER BY seq %[1]s @@ -205,62 +200,6 @@ func (d *db) normalChangesQueryWithDocs(direction, where string) string { `), direction, where) } -func (d *db) normalChangesQueryWithoutDocs(direction, where string) string { - return fmt.Sprintf(d.query(` - WITH results AS ( - SELECT - id, - seq, - deleted, - rev, - rev_id - FROM {{ .Docs }} - WHERE ($1 IS NULL OR seq > $1) - ORDER BY seq - ) - SELECT - COUNT(*) AS id, - NULL AS seq, - NULL AS deleted, - COALESCE(MAX(seq),0) AS summary, - NULL AS doc, - NULL AS attachment_count, - NULL AS filename, - NULL AS content_type, - NULL AS length, - NULL AS digest, - NULL AS rev_pos, - NULL AS data - FROM results - - UNION ALL - - SELECT - id, - seq, - deleted, - rev, - NULL AS doc, - 0 AS attachment_count, - NULL AS filename, - NULL AS content_type, - NULL AS length, - NULL AS digest, - NULL AS rev_pos, - NULL AS data - FROM ( - SELECT - results.id, - results.seq, - results.deleted, - results.rev || '-' || results.rev_id AS rev - FROM results - %[2]s -- WHERE - ORDER BY seq %[1]s - ) - `), direction, where) -} - func (c *normalChanges) Next(change *driver.Change) error { var ( rev string From 9288ed23a46bf67fd2476f209bedbb4e7a182309 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 18 Jul 2024 17:04:48 +0200 Subject: [PATCH 4/5] Inline normalChangesQueryWithDocs for improved readability --- x/sqlite/changes.go | 75 ++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/x/sqlite/changes.go b/x/sqlite/changes.go index 190944102..25f1e3bc6 100644 --- a/x/sqlite/changes.go +++ b/x/sqlite/changes.go @@ -96,44 +96,7 @@ func (d *db) newNormalChanges(ctx context.Context, opts optsMap, since, lastSeq return nil, err } - query := d.normalChangesQueryWithDocs(descendingToDirection(descending), where) - if limit > 0 { - query += " LIMIT " + strconv.FormatUint(limit+1, 10) - } - - c.rows, err = d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck,sqlclosecheck // Err checked in Next - if err != nil { - return nil, err - } - - // The first row is used to calculate the ETag; it's done as part of the - // same query, even though it's a bit ugly, to ensure it's all in the same - // implicit transaction. - if !c.rows.Next() { - // should never happen - return nil, errors.New("no rows returned") - } - var summary string - if err := c.rows.Scan( - &c.pending, - discard{}, discard{}, - &summary, - discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, - ); err != nil { - return nil, err - } - - if feed == feedNormal { - h := md5.New() - _, _ = h.Write([]byte(summary)) - c.etag = hex.EncodeToString(h.Sum(nil)) - } - - return c, nil -} - -func (d *db) normalChangesQueryWithDocs(direction, where string) string { - return fmt.Sprintf(d.query(` + query := fmt.Sprintf(d.query(` WITH results AS ( SELECT id, @@ -197,7 +160,41 @@ func (d *db) normalChangesQueryWithDocs(direction, where string) string { %[2]s -- WHERE ORDER BY seq %[1]s ) - `), direction, where) + `), descendingToDirection(descending), where) + + if limit > 0 { + query += " LIMIT " + strconv.FormatUint(limit+1, 10) + } + + c.rows, err = d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck,sqlclosecheck // Err checked in Next + if err != nil { + return nil, err + } + + // The first row is used to calculate the ETag; it's done as part of the + // same query, even though it's a bit ugly, to ensure it's all in the same + // implicit transaction. + if !c.rows.Next() { + // should never happen + return nil, errors.New("no rows returned") + } + var summary string + if err := c.rows.Scan( + &c.pending, + discard{}, discard{}, + &summary, + discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, + ); err != nil { + return nil, err + } + + if feed == feedNormal { + h := md5.New() + _, _ = h.Write([]byte(summary)) + c.etag = hex.EncodeToString(h.Sum(nil)) + } + + return c, nil } func (c *normalChanges) Next(change *driver.Change) error { From e800ee01fa8093ce2f1754cd1d67e224528a90b5 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 18 Jul 2024 17:40:28 +0200 Subject: [PATCH 5/5] Consolidate duplicate longpoll changes queries --- x/sqlite/changes.go | 69 ++++------------------------------------ x/sqlite/changes_test.go | 10 +++--- 2 files changed, 11 insertions(+), 68 deletions(-) diff --git a/x/sqlite/changes.go b/x/sqlite/changes.go index 25f1e3bc6..a0988d790 100644 --- a/x/sqlite/changes.go +++ b/x/sqlite/changes.go @@ -322,6 +322,7 @@ func (d *db) Changes(ctx context.Context, options driver.Options) (driver.Change type longpollChanges struct { stmt *sql.Stmt since uint64 + includeDocs bool attachments bool lastSeq string ctx context.Context @@ -337,13 +338,6 @@ type longpollChange struct { var _ driver.Changes = (*longpollChanges)(nil) func (d *db) newLongpollChanges(ctx context.Context, includeDocs, attachments bool) (*longpollChanges, error) { - if includeDocs { - return d.newLongpollChangesWithDocs(ctx, attachments) - } - return d.newLongpollChangesWithoutDocs(ctx) -} - -func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) (*longpollChanges, error) { since, err := d.lastSeq(ctx) if err != nil { return nil, err @@ -368,7 +362,7 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) ( doc.seq, doc.deleted, doc.rev || '-' || doc.rev_id AS rev, - doc.doc, + doc, att.filename, att.content_type, att.length, @@ -383,13 +377,13 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) ( deleted, rev, rev_id, - doc + IIF($3, doc, NULL) AS doc FROM {{ .Docs }} WHERE seq > $1 ORDER BY seq LIMIT 1 ) AS doc - LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = doc.id AND bridge.rev = doc.rev AND bridge.rev_id = doc.rev_id + LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = doc.id AND bridge.rev = doc.rev AND bridge.rev_id = doc.rev_id AND doc IS NOT NULL LEFT JOIN {{ .Attachments }} AS att ON att.pk = bridge.pk ) `)) @@ -403,6 +397,7 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) ( stmt: stmt, since: since, attachments: attachments, + includeDocs: includeDocs, ctx: ctx, cancel: cancel, changes: changes, @@ -413,58 +408,6 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) ( return c, nil } -func (d *db) newLongpollChangesWithoutDocs(ctx context.Context) (*longpollChanges, error) { - since, err := d.lastSeq(ctx) - if err != nil { - return nil, err - } - - stmt, err := d.db.PrepareContext(ctx, d.query(` - SELECT - doc.id, - doc.seq, - doc.deleted, - doc.rev || '-' || doc.rev_id AS rev, - NULL AS doc, - NULL AS filename, - NULL AS content_type, - NULL AS length, - NULL AS digest, - NULL AS rev_pos, - NULL AS data - FROM ( - SELECT - id, - seq, - deleted, - rev, - rev_id, - doc - FROM {{ .Docs }} - WHERE seq > $1 - ORDER BY seq - LIMIT 1 - ) AS doc - `)) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(ctx) - changes := make(chan longpollChange) - c := &longpollChanges{ - stmt: stmt, - since: since, - ctx: ctx, - cancel: cancel, - changes: changes, - } - - go c.watch(changes) - - return c, nil -} - // watch runs in a loop until either the context is cancelled, or a change is // detected. func (c *longpollChanges) watch(changes chan<- longpollChange) { @@ -476,7 +419,7 @@ func (c *longpollChanges) watch(changes chan<- longpollChange) { bo.MaxElapsedTime = 0 err := backoff.Retry(func() error { - rows, err := c.stmt.QueryContext(c.ctx, c.since, c.attachments) + rows, err := c.stmt.QueryContext(c.ctx, c.since, c.attachments, c.includeDocs) if err != nil { return backoff.Permanent(err) } diff --git a/x/sqlite/changes_test.go b/x/sqlite/changes_test.go index 3978e5a7a..9a472d01f 100644 --- a/x/sqlite/changes_test.go +++ b/x/sqlite/changes_test.go @@ -1053,7 +1053,7 @@ func Test_normal_changes_query_without_docs(t *testing.T) { } } -// This test validates that the query for the longpolll changes feed does not +// This test validates that the query for the longpoll changes feed does not // duplicate unnecessary fields when returning multiple attachments. func Test_longpoll_changes_query(t *testing.T) { t.Parallel() @@ -1075,7 +1075,7 @@ func Test_longpoll_changes_query(t *testing.T) { }) // Then execute the prepared statement - rows, err := changes.stmt.Query(0, true) + rows, err := changes.stmt.Query(0, true, true) if err != nil { t.Fatal(err) } @@ -1114,7 +1114,7 @@ func Test_longpoll_changes_query(t *testing.T) { } } -// This test validates that the query for the longpolll changes feed does not +// This test validates that the query for the longpoll changes feed does not // include any attachment data when include_docs=false func Test_longpoll_changes_query_without_docs(t *testing.T) { t.Parallel() @@ -1136,7 +1136,7 @@ func Test_longpoll_changes_query_without_docs(t *testing.T) { }) // Then execute the prepared statement - rows, err := changes.stmt.Query(0, true) + rows, err := changes.stmt.Query(0, true, false) if err != nil { t.Fatal(err) } @@ -1169,7 +1169,7 @@ func Test_longpoll_changes_query_without_docs(t *testing.T) { {ID: &[]string{"doc1"}[0], Seq: &[]string{"1"}[0], Deleted: &[]bool{false}[0], Rev: &rev}, } - if d := cmp.Diff(got, want); d != "" { + if d := cmp.Diff(want, got); d != "" { t.Errorf("Unexpected rows:\n%s", d) } }