diff --git a/replicate.go b/replicate.go index 8cf895b90..a22097d9e 100644 --- a/replicate.go +++ b/replicate.go @@ -348,30 +348,37 @@ func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, result if !ok { return nil } - for _, rev := range rd.Missing { - atomic.AddInt32(&r.missingChecks, 1) - d, err := readDoc(ctx, r.source, rd.ID, rev) - r.callback(ReplicationEvent{ - Type: eventDocument, - Read: true, - DocID: rd.ID, - Error: err, - }) - if err != nil { - return fmt.Errorf("read doc %s: %w", rd.ID, err) - } - atomic.AddInt32(&r.reads, 1) - atomic.AddInt32(&r.missingFound, 1) - select { - case <-ctx.Done(): - return ctx.Err() - case results <- d: - } + if err := r.readDoc(ctx, rd.ID, rd.Missing, results); err != nil { + return err } } } } +func (r *replicator) readDoc(ctx context.Context, id string, revs []string, results chan<- *document) error { + for _, rev := range revs { + atomic.AddInt32(&r.missingChecks, 1) + d, err := readDoc(ctx, r.source, id, rev) + r.callback(ReplicationEvent{ + Type: eventDocument, + Read: true, + DocID: id, + Error: err, + }) + if err != nil { + return fmt.Errorf("read doc %s: %w", id, err) + } + atomic.AddInt32(&r.reads, 1) + atomic.AddInt32(&r.missingFound, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case results <- d: + } + } + return nil +} + func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) { doc := new(document) row := db.Get(ctx, docID, Params(map[string]interface{}{