Skip to content

Commit

Permalink
Merge pull request #985 from go-kivik/AllDocsAttachments
Browse files Browse the repository at this point in the history
All docs attachments
  • Loading branch information
flimzy authored May 27, 2024
2 parents ffb8060 + 21c6714 commit 0345bdf
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 67 deletions.
53 changes: 45 additions & 8 deletions x/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,14 @@ func (d *db) performQuery(
reduce.reduce_func,
IIF($4, last_seq, "") AS update_seq,
NULL,
NULL
NULL,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Design }} AS map
JOIN reduce
WHERE id = $5
Expand All @@ -154,7 +161,14 @@ func (d *db) performQuery(
value AS value,
NULL AS rev,
NULL AS doc,
NULL AS conflicts
NULL AS conflicts,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Map }} AS view
JOIN reduce
WHERE reduce.reducible AND ($3 IS NULL OR $3 == TRUE)
Expand All @@ -171,7 +185,14 @@ func (d *db) performQuery(
view.value,
IIF($1, docs.rev || '-' || docs.rev_id, "") AS rev,
IIF($1, docs.doc, NULL) AS doc,
IIF($2, GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ','), NULL) AS conflicts
IIF($2, GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ','), NULL) AS conflicts,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Map }} AS view
JOIN reduce
JOIN {{ .Docs }} AS docs ON view.id = docs.id AND view.rev = docs.rev AND view.rev_id = docs.rev_id
Expand Down Expand Up @@ -248,7 +269,14 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
reduce.reduce_func,
NULL,
NULL,
NULL
NULL,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Design }} AS map
JOIN reduce
WHERE id = $1
Expand All @@ -267,7 +295,14 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
value AS value,
NULL AS rev,
NULL AS doc,
NULL AS conflicts
NULL AS conflicts,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Map }}
JOIN reduce
WHERE reduce.reducible AND ($6 IS NULL OR $6 == TRUE)
Expand Down Expand Up @@ -296,7 +331,10 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
break
}
var upToDate bool
if err := results.Scan(&upToDate, &reducible, &reduceFuncJS, discard{}, discard{}, discard{}); err != nil {
if err := results.Scan(
&upToDate, &reducible, &reduceFuncJS, discard{}, discard{}, discard{},
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
return nil, err
}
if !reducible {
Expand Down Expand Up @@ -374,7 +412,7 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision
CASE WHEN row_number = 1 THEN rev END AS rev,
CASE WHEN row_number = 1 THEN doc END AS doc,
CASE WHEN row_number = 1 THEN deleted END AS deleted,
COALESCE(attachment_count,0) AS attachment_count,
COALESCE(attachment_count, 0) AS attachment_count,
filename,
content_type,
length,
Expand Down Expand Up @@ -413,7 +451,6 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision
rev AS rev,
rev_id AS rev_id,
IIF($1, doc, NULL) AS doc,
deleted AS deleted, -- TODO:remove this?
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM leaves
) AS rev
Expand Down
5 changes: 4 additions & 1 deletion x/sqlite/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func (r *reduceRowIter) Next() (*reduceRow, error) {
return nil, io.EOF
}
var row reduceRow
if err := r.results.Scan(&row.ID, &row.Key, &row.Value, discard{}, discard{}, discard{}); err != nil {
if err := r.results.Scan(
&row.ID, &row.Key, &row.Value, discard{}, discard{}, discard{},
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
return nil, err
}
return &row, nil
Expand Down
195 changes: 139 additions & 56 deletions x/sqlite/views.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (d *db) queryBuiltinView(
ctx context.Context,
vopts *viewOptions,
) (driver.Rows, error) {
args := []interface{}{vopts.includeDocs, vopts.conflicts, vopts.updateSeq}
args := []interface{}{vopts.includeDocs, vopts.conflicts, vopts.updateSeq, vopts.attachments}

where := append([]string{""}, vopts.buildWhere(&args)...)

Expand All @@ -88,37 +89,76 @@ func (d *db) queryBuiltinView(
NULL AS reduce_func,
IIF($3, MAX(seq), "") AS update_seq,
NULL,
NULL
NULL,
NULL AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Docs }}
UNION ALL
SELECT *
SELECT
CASE WHEN row_number = 1 THEN id END AS id,
CASE WHEN row_number = 1 THEN key END AS key,
CASE WHEN row_number = 1 THEN value END AS value,
CASE WHEN row_number = 1 THEN rev END AS rev,
CASE WHEN row_number = 1 THEN doc END AS doc,
CASE WHEN row_number = 1 THEN conflicts END AS conflicts,
COALESCE(attachment_count, 0) AS attachment_count,
filename,
content_type,
length,
digest,
rev_pos,
data
FROM (
SELECT
view.id AS id,
view.key AS key,
view.id,
view.key,
'{"value":{"rev":"' || view.rev || '-' || view.rev_id || '"}}' AS value,
view.rev || '-' || view.rev_id AS rev,
view.doc AS doc,
IIF($2, GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ','), NULL) AS conflicts
view.doc,
view.conflicts,
SUM(CASE WHEN bridge.pk IS NOT NULL THEN 1 ELSE 0 END) OVER (PARTITION BY view.id, view.rev, view.rev_id) AS attachment_count,
ROW_NUMBER() OVER (PARTITION BY view.id, view.rev, view.rev_id) AS row_number,
att.filename AS filename,
att.content_type AS content_type,
att.length AS length,
att.digest AS digest,
att.rev_pos AS rev_pos,
IIF($4, att.data, NULL) AS data
FROM (
SELECT
id AS id,
rev AS rev,
rev_id AS rev_id,
key AS key,
IIF($1, doc, NULL) AS doc,
deleted AS deleted, -- TODO:remove this?
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM leaves
view.id AS id,
view.key AS key,
view.rev AS rev,
view.rev_id AS rev_id,
view.doc AS doc,
IIF($2, GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ','), NULL) AS conflicts
FROM (
SELECT
id AS id,
rev AS rev,
rev_id AS rev_id,
key AS key,
IIF($1, doc, NULL) AS doc,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM leaves
) AS view
LEFT JOIN leaves AS conflicts ON conflicts.id = view.id AND NOT (view.rev = conflicts.rev AND view.rev_id = conflicts.rev_id)
WHERE view.rank = 1
%[2]s -- WHERE
GROUP BY view.id, view.rev, view.rev_id
%[1]s -- ORDER BY
LIMIT %[3]d OFFSET %[4]d
) AS view
LEFT JOIN leaves AS conflicts ON conflicts.id = view.id AND NOT (view.rev = conflicts.rev AND view.rev_id = conflicts.rev_id)
WHERE view.rank = 1
%[2]s
GROUP BY view.id, view.rev, view.rev_id
%[1]s
LIMIT %[3]d OFFSET %[4]d
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON view.id = bridge.id AND view.rev = bridge.rev AND view.rev_id = bridge.rev_id
LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk
%[1]s -- ORDER BY
)
`), vopts.buildOrderBy(), strings.Join(where, " AND "), vopts.limit, vopts.skip)
results, err := d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck // Err checked in Next
Expand Down Expand Up @@ -155,7 +195,10 @@ func readFirstRow(results *sql.Rows, vopts *viewOptions) (*viewMetadata, error)
return nil, errors.New("no rows returned")
}
var meta viewMetadata
if err := results.Scan(&meta.upToDate, &meta.reducible, &meta.reduceFuncJS, &meta.updateSeq, discard{}, discard{}); err != nil {
if err := results.Scan(
&meta.upToDate, &meta.reducible, &meta.reduceFuncJS, &meta.updateSeq, discard{}, discard{},
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
_ = results.Close() //nolint:sqlclosecheck // Aborting
return nil, err
}
Expand Down Expand Up @@ -183,44 +226,84 @@ type rows struct {
var _ driver.Rows = (*rows)(nil)

func (r *rows) Next(row *driver.Row) error {
if !r.rows.Next() {
if err := r.rows.Err(); err != nil {
return err
}
return io.EOF
}
var (
id *string
key, doc []byte
value *[]byte
conflicts *string
rev string
attachmentsCount int
full *fullDoc
)
if err := r.rows.Scan(&id, &key, &value, &rev, &doc, &conflicts); err != nil {
return err
}
if id != nil {
row.ID = *id
}
row.Key = key
if len(key) == 0 {
row.Key = []byte("null")
}
if value == nil {
row.Value = strings.NewReader("null")
} else {
row.Value = bytes.NewReader(*value)
}
if doc != nil {
toMerge := fullDoc{
ID: row.ID,
Rev: rev,
Doc: doc,
for {
if !r.rows.Next() {
if err := r.rows.Err(); err != nil {
return err
}
return io.EOF
}
var (
key, doc []byte
value, data *[]byte
id, conflicts, rowRev, filename, contentType *string
length *int64
revPos *int
digest *md5sum
)
if err := r.rows.Scan(
&id, &key, &value, &rowRev, &doc, &conflicts,
&attachmentsCount,
&filename, &contentType, &length, &digest, &revPos, &data,
); err != nil {
return err
}
if conflicts != nil {
toMerge.Conflicts = strings.Split(*conflicts, ",")
if rowRev != nil {
// If rowRev is populated, it means we're on the first row for the
// document. Otherwise, we're on an attachment-only row.
if id != nil {
row.ID = *id
}
row.Key = key
if len(key) == 0 {
row.Key = []byte("null")
}
if value == nil {
row.Value = strings.NewReader("null")
} else {
row.Value = bytes.NewReader(*value)
}
if doc != nil {
full = &fullDoc{
ID: row.ID,
Rev: *rowRev,
Doc: doc,
}
if conflicts != nil {
full.Conflicts = strings.Split(*conflicts, ",")
}
}
}
row.Doc = toMerge.toReader()
if filename != nil {
if full.Attachments == nil {
full.Attachments = make(map[string]*attachment)
}
var jsonData json.RawMessage
if data != nil {
var err error
jsonData, err = json.Marshal(*data)
if err != nil {
return err
}
}
full.Attachments[*filename] = &attachment{
ContentType: *contentType,
Length: *length,
Digest: *digest,
RevPos: *revPos,
Data: jsonData,
}
}
if attachmentsCount == 0 || attachmentsCount == len(full.Attachments) {
break
}
}
if full != nil {
row.Doc = full.toReader()
}
return nil
}
Expand Down
Loading

0 comments on commit 0345bdf

Please sign in to comment.