Skip to content

Commit

Permalink
Extract doc/rev reading loop to its own function
Browse files Browse the repository at this point in the history
  • Loading branch information
flimzy committed Oct 12, 2023
1 parent 30632da commit 7ca58d4
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,30 +348,37 @@ func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, result
if !ok {
return nil
}
for _, rev := range rd.Missing {
atomic.AddInt32(&r.missingChecks, 1)
d, err := readDoc(ctx, r.source, rd.ID, rev)
r.callback(ReplicationEvent{
Type: eventDocument,
Read: true,
DocID: rd.ID,
Error: err,
})
if err != nil {
return fmt.Errorf("read doc %s: %w", rd.ID, err)
}
atomic.AddInt32(&r.reads, 1)
atomic.AddInt32(&r.missingFound, 1)
select {
case <-ctx.Done():
return ctx.Err()
case results <- d:
}
if err := r.readDoc(ctx, rd.ID, rd.Missing, results); err != nil {
return err
}
}
}
}

func (r *replicator) readDoc(ctx context.Context, id string, revs []string, results chan<- *document) error {
for _, rev := range revs {
atomic.AddInt32(&r.missingChecks, 1)
d, err := readDoc(ctx, r.source, id, rev)
r.callback(ReplicationEvent{
Type: eventDocument,
Read: true,
DocID: id,
Error: err,
})
if err != nil {
return fmt.Errorf("read doc %s: %w", id, err)
}
atomic.AddInt32(&r.reads, 1)
atomic.AddInt32(&r.missingFound, 1)
select {
case <-ctx.Done():
return ctx.Err()
case results <- d:
}
}
return nil
}

func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) {
doc := new(document)
row := db.Get(ctx, docID, Params(map[string]interface{}{
Expand Down

0 comments on commit 7ca58d4

Please sign in to comment.