-
Notifications
You must be signed in to change notification settings - Fork 43
/
changes.go
217 lines (194 loc) · 6.5 KB
/
changes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package kivik
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"github.com/go-kivik/kivik/v4/driver"
internal "github.com/go-kivik/kivik/v4/int/errors"
)
// Changes is an iterator over the database changes feed.
type Changes struct {
*iter
changesi driver.Changes
}
type changesIterator struct {
driver.Changes
*ChangesMetadata
}
var _ iterator = &changesIterator{}
func (c *changesIterator) Next(i interface{}) error {
change := i.(*driver.Change)
change.ID = ""
change.Seq = ""
change.Deleted = false
change.Changes = change.Changes[:0]
change.Doc = change.Doc[:0]
err := c.Changes.Next(change)
if err == io.EOF || err == driver.EOQ {
c.ChangesMetadata = &ChangesMetadata{
LastSeq: c.Changes.LastSeq(),
Pending: c.Changes.Pending(),
}
}
return err
}
func newChanges(ctx context.Context, onClose func(), changesi driver.Changes) *Changes {
return &Changes{
iter: newIterator(ctx, onClose, &changesIterator{Changes: changesi}, &driver.Change{}),
changesi: changesi,
}
}
// Close closes the iterator, preventing further enumeration, and freeing any
// resources (such as the http request body) of the underlying feed. If
// [Changes.Next] is called and there are no further results, the iterator is
// closed automatically and it will suffice to check the result of
// [Changes.Err]. Close is idempotent and does not affect the result of
// [Changes.Err].
func (c *Changes) Close() error {
return c.iter.Close()
}
// Err returns the error, if any, that was encountered during iteration. Err may
// be called after an explicit or implicit [Changes.Close].
func (c *Changes) Err() error {
return c.iter.Err()
}
// Next prepares the next iterator result value for reading. It returns true on
// success, or false if there is no next result or an error occurs while
// preparing it. [Changes.Err] should be consulted to distinguish between the
// two.
func (c *Changes) Next() bool {
return c.iter.Next()
}
// Changes returns a list of changed revs.
func (c *Changes) Changes() []string {
return c.curVal.(*driver.Change).Changes
}
// Deleted returns true if the change relates to a deleted document.
func (c *Changes) Deleted() bool {
return c.curVal.(*driver.Change).Deleted
}
// ID returns the ID of the current result.
func (c *Changes) ID() string {
return c.curVal.(*driver.Change).ID
}
// ScanDoc copies the data from the result into dest. See [ResultSet.ScanValue]
// for additional details.
func (c *Changes) ScanDoc(dest interface{}) error {
err := c.isReady()
if err != nil {
return err
}
return json.Unmarshal(c.curVal.(*driver.Change).Doc, dest)
}
// Changes returns an iterator over the real-time [changes feed]. The feed remains
// open until explicitly closed, or an error is encountered.
//
// [changes feed]: http://couchdb.readthedocs.io/en/latest/api/database/changes.html#get--db-_changes
func (db *DB) Changes(ctx context.Context, options ...Option) *Changes {
if db.err != nil {
return &Changes{iter: errIterator(db.err)}
}
endQuery, err := db.startQuery()
if err != nil {
return &Changes{iter: errIterator(err)}
}
changesi, err := db.driverDB.Changes(ctx, multiOptions(options))
if err != nil {
endQuery()
return &Changes{iter: errIterator(err)}
}
return newChanges(ctx, endQuery, changesi)
}
// Seq returns the Seq of the current result.
func (c *Changes) Seq() string {
return c.curVal.(*driver.Change).Seq
}
// ChangesMetadata contains metadata about a changes feed.
type ChangesMetadata struct {
// LastSeq is the last update sequence id present in the change set, if
// returned by the server.
LastSeq string
// Pending is the count of remaining items in the change feed.
Pending int64
}
// Metadata returns the result metadata for the changes feed. It must be called
// after [Changes.Next] returns false or [Changes.Iterator] has been completely
// and successfully iterated. Otherwise it will return an error.
func (c *Changes) Metadata() (*ChangesMetadata, error) {
if c.iter == nil || (c.state != stateEOQ && c.state != stateClosed) {
return nil, &internal.Error{Status: http.StatusBadRequest, Err: errors.New("Metadata must not be called until result set iteration is complete")}
}
return c.feed.(*changesIterator).ChangesMetadata, nil
}
// ETag returns the unquoted ETag header, if any.
func (c *Changes) ETag() string {
if c.changesi == nil {
return ""
}
return c.changesi.ETag()
}
// Change represents a single change in the changes feed, as returned by
// [Changes.Iterator].
//
// !!NOTICE!! This struct is considered experimental, and may change without
// notice.
type Change struct {
// ID is the document ID to which the change relates.
ID string `json:"id"`
// Seq is the update sequence for the changes feed.
Seq string `json:"seq"`
// Deleted is set to true for the changes feed, if the document has been
// deleted.
Deleted bool `json:"deleted"`
// Changes represents a list of document leaf revisions for the /_changes
// endpoint.
Changes []string `json:"-"`
// Doc is the raw, un-decoded JSON document. This is only populated when
// include_docs=true is set.
doc json.RawMessage
}
// ScanDoc copies the data from the result into dest. See [Row.ScanValue]
// for additional details.
func (c *Change) ScanDoc(dest interface{}) error {
return json.Unmarshal(c.doc, dest)
}
// Iterator returns a function that can be used to iterate over the changes
// feed. This function works with Go 1.23's range functions, and is an
// alternative to using [Changes.Next] directly.
//
// !!NOTICE!! This function is considered experimental, and may change without
// notice.
func (c *Changes) Iterator() func(yield func(*Change, error) bool) {
return func(yield func(*Change, error) bool) {
for c.Next() {
dChange := c.curVal.(*driver.Change)
change := &Change{
ID: dChange.ID,
Seq: dChange.Seq,
Deleted: dChange.Deleted,
Changes: dChange.Changes,
doc: dChange.Doc,
}
if !yield(change, nil) {
_ = c.Close()
return
}
}
if err := c.Err(); err != nil {
yield(nil, err)
}
}
}