Skip to content

Commit

Permalink
Merge pull request #1024 from go-kivik/moresqlite
Browse files Browse the repository at this point in the history
x/sqlite: Changes to Changes feed
  • Loading branch information
flimzy authored Jul 18, 2024
2 parents f63c466 + e800ee0 commit 0a64d9d
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 184 deletions.
206 changes: 47 additions & 159 deletions x/sqlite/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,61 +85,26 @@ func (d *db) newNormalChanges(ctx context.Context, opts optsMap, since, lastSeq
if err != nil {
return nil, err
}

var query string
if includeDocs {
query = d.normalChangesQueryWithDocs(descendingToDirection(descending))
} else {
query = d.normalChangesQueryWithoutDocs(descendingToDirection(descending))
}
if limit > 0 {
query += " LIMIT " + strconv.FormatUint(limit+1, 10)
}
attachments, err := opts.attachments()
if err != nil {
return nil, err
}
c.rows, err = d.db.QueryContext(ctx, query, since, attachments) //nolint:rowserrcheck,sqlclosecheck // Err checked in Next
if err != nil {
return nil, err
}

// The first row is used to calculate the ETag; it's done as part of the
// same query, even though it's a bit ugly, to ensure it's all in the same
// implicit transaction.
if !c.rows.Next() {
// should never happen
return nil, errors.New("no rows returned")
}
var summary string
if err := c.rows.Scan(
&c.pending,
discard{}, discard{},
&summary,
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
args := []any{since, attachments, includeDocs}
where, err := opts.changesWhere(&args)
if err != nil {
return nil, err
}

if feed == feedNormal {
h := md5.New()
_, _ = h.Write([]byte(summary))
c.etag = hex.EncodeToString(h.Sum(nil))
}

return c, nil
}

func (d *db) normalChangesQueryWithDocs(direction string) string {
return fmt.Sprintf(d.query(`
query := fmt.Sprintf(d.query(`
WITH results AS (
SELECT
id,
seq,
deleted,
rev,
rev_id,
doc
IIF($3, doc, NULL) AS doc
FROM {{ .Docs }}
WHERE ($1 IS NULL OR seq > $1)
ORDER BY seq
Expand All @@ -148,7 +113,7 @@ func (d *db) normalChangesQueryWithDocs(direction string) string {
COUNT(*) AS id,
NULL AS seq,
NULL AS deleted,
COUNT(*) || '.' || COALESCE(MIN(seq),0) || '.' || COALESCE(MAX(seq),0) AS rev,
COALESCE(MAX(seq),0) AS summary,
NULL AS doc,
NULL AS attachment_count,
NULL AS filename,
Expand Down Expand Up @@ -190,66 +155,46 @@ func (d *db) normalChangesQueryWithDocs(direction string) string {
att.rev_pos,
IIF($2, data, NULL) AS data
FROM results
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = results.id AND bridge.rev = results.rev AND bridge.rev_id = results.rev_id
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = results.id AND bridge.rev = results.rev AND bridge.rev_id = results.rev_id AND doc IS NOT NULL
LEFT JOIN {{ .Attachments }} AS att ON att.pk = bridge.pk
ORDER BY seq %s
%[2]s -- WHERE
ORDER BY seq %[1]s
)
`), direction)
}
`), descendingToDirection(descending), where)

func (d *db) normalChangesQueryWithoutDocs(direction string) string {
return fmt.Sprintf(d.query(`
WITH results AS (
SELECT
id,
seq,
deleted,
rev,
rev_id
FROM {{ .Docs }}
WHERE ($1 IS NULL OR seq > $1)
ORDER BY seq
)
SELECT
COUNT(*) AS id,
NULL AS seq,
NULL AS deleted,
COUNT(*) || '.' || COALESCE(MIN(seq),0) || '.' || COALESCE(MAX(seq),0) AS rev,
NULL AS doc,
NULL AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM results
if limit > 0 {
query += " LIMIT " + strconv.FormatUint(limit+1, 10)
}

UNION ALL
c.rows, err = d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck,sqlclosecheck // Err checked in Next
if err != nil {
return nil, err
}

SELECT
id,
seq,
deleted,
rev,
NULL AS doc,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM (
SELECT
results.id,
results.seq,
results.deleted,
results.rev || '-' || results.rev_id AS rev
FROM results
ORDER BY seq %s
)
`), direction)
// The first row is used to calculate the ETag; it's done as part of the
// same query, even though it's a bit ugly, to ensure it's all in the same
// implicit transaction.
if !c.rows.Next() {
// should never happen
return nil, errors.New("no rows returned")
}
var summary string
if err := c.rows.Scan(
&c.pending,
discard{}, discard{},
&summary,
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
return nil, err
}

if feed == feedNormal {
h := md5.New()
_, _ = h.Write([]byte(summary))
c.etag = hex.EncodeToString(h.Sum(nil))
}

return c, nil
}

func (c *normalChanges) Next(change *driver.Change) error {
Expand Down Expand Up @@ -377,6 +322,7 @@ func (d *db) Changes(ctx context.Context, options driver.Options) (driver.Change
type longpollChanges struct {
stmt *sql.Stmt
since uint64
includeDocs bool
attachments bool
lastSeq string
ctx context.Context
Expand All @@ -392,13 +338,6 @@ type longpollChange struct {
var _ driver.Changes = (*longpollChanges)(nil)

func (d *db) newLongpollChanges(ctx context.Context, includeDocs, attachments bool) (*longpollChanges, error) {
if includeDocs {
return d.newLongpollChangesWithDocs(ctx, attachments)
}
return d.newLongpollChangesWithoutDocs(ctx)
}

func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) (*longpollChanges, error) {
since, err := d.lastSeq(ctx)
if err != nil {
return nil, err
Expand All @@ -423,7 +362,7 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) (
doc.seq,
doc.deleted,
doc.rev || '-' || doc.rev_id AS rev,
doc.doc,
doc,
att.filename,
att.content_type,
att.length,
Expand All @@ -438,13 +377,13 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) (
deleted,
rev,
rev_id,
doc
IIF($3, doc, NULL) AS doc
FROM {{ .Docs }}
WHERE seq > $1
ORDER BY seq
LIMIT 1
) AS doc
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = doc.id AND bridge.rev = doc.rev AND bridge.rev_id = doc.rev_id
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON bridge.id = doc.id AND bridge.rev = doc.rev AND bridge.rev_id = doc.rev_id AND doc IS NOT NULL
LEFT JOIN {{ .Attachments }} AS att ON att.pk = bridge.pk
)
`))
Expand All @@ -458,6 +397,7 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) (
stmt: stmt,
since: since,
attachments: attachments,
includeDocs: includeDocs,
ctx: ctx,
cancel: cancel,
changes: changes,
Expand All @@ -468,58 +408,6 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) (
return c, nil
}

func (d *db) newLongpollChangesWithoutDocs(ctx context.Context) (*longpollChanges, error) {
since, err := d.lastSeq(ctx)
if err != nil {
return nil, err
}

stmt, err := d.db.PrepareContext(ctx, d.query(`
SELECT
doc.id,
doc.seq,
doc.deleted,
doc.rev || '-' || doc.rev_id AS rev,
NULL AS doc,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM (
SELECT
id,
seq,
deleted,
rev,
rev_id,
doc
FROM {{ .Docs }}
WHERE seq > $1
ORDER BY seq
LIMIT 1
) AS doc
`))
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
changes := make(chan longpollChange)
c := &longpollChanges{
stmt: stmt,
since: since,
ctx: ctx,
cancel: cancel,
changes: changes,
}

go c.watch(changes)

return c, nil
}

// watch runs in a loop until either the context is cancelled, or a change is
// detected.
func (c *longpollChanges) watch(changes chan<- longpollChange) {
Expand All @@ -531,7 +419,7 @@ func (c *longpollChanges) watch(changes chan<- longpollChange) {
bo.MaxElapsedTime = 0

err := backoff.Retry(func() error {
rows, err := c.stmt.QueryContext(c.ctx, c.since, c.attachments)
rows, err := c.stmt.QueryContext(c.ctx, c.since, c.attachments, c.includeDocs)
if err != nil {
return backoff.Permanent(err)
}
Expand Down
Loading

0 comments on commit 0a64d9d

Please sign in to comment.