From b73541a95e9be558bb5aae41f2e73611f6f24455 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 21:24:08 +0200 Subject: [PATCH 01/12] Document replication steps with links to the protocol docs --- replicate.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/replicate.go b/replicate.go index 0ab5f0846..822106567 100644 --- a/replicate.go +++ b/replicate.go @@ -226,6 +226,9 @@ type change struct { Changes []string } +// readChanges reads the changes feed. +// +// https://docs.couchdb.org/en/stable/replication/protocol.html#listen-to-changes-feed func readChanges(ctx context.Context, db *DB, results chan<- *change, options Option, cb eventCallback) error { changes := db.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs")) cb(ReplicationEvent{ @@ -270,6 +273,9 @@ type revDiff struct { const rdBatchSize = 10 +// readDiffs reads the diffs for the reported changes. +// +// https://docs.couchdb.org/en/stable/replication/protocol.html#calculate-revision-difference func readDiffs(ctx context.Context, db *DB, ch <-chan *change, results chan<- *revDiff, cb eventCallback) error { for { revMap := map[string][]string{} @@ -338,6 +344,10 @@ func readDiffs(ctx context.Context, db *DB, ch <-chan *change, results chan<- *r } } +// readDocs reads the document revisions that have changed between source and +// target. +// +// https://docs.couchdb.org/en/stable/replication/protocol.html#fetch-changed-documents func readDocs(ctx context.Context, db *DB, diffs <-chan *revDiff, results chan<- *document, result *resultWrapper, cb eventCallback) error { for { var rd *revDiff @@ -432,6 +442,9 @@ func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) return doc, nil } +// storeDocs updates the changed documents. +// +// https://docs.couchdb.org/en/stable/replication/protocol.html#upload-batch-of-changed-documents func storeDocs(ctx context.Context, db *DB, docs <-chan *document, result *resultWrapper, cb eventCallback) error { for doc := range docs { _, err := db.Put(ctx, doc.ID, doc, Param("new_edits", false)) From d8a8b274be7aa0d3038ec051cd3c93fd56cc8e90 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 21:27:26 +0200 Subject: [PATCH 02/12] Make replication steps methods of the new replicator type --- replicate.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/replicate.go b/replicate.go index 822106567..2de574f76 100644 --- a/replicate.go +++ b/replicate.go @@ -168,8 +168,10 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep allOptions(options).Apply(repOpts) cb := repOpts.callback() + r := &replicator{} + if repOpts.copySecurity { - if err := copySecurity(ctx, target, source, cb); err != nil { + if err := r.copySecurity(ctx, target, source, cb); err != nil { return result.ReplicationResult, err } } @@ -177,29 +179,32 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep changes := make(chan *change) group.Go(func() error { defer close(changes) - return readChanges(ctx, source, changes, allOptions(options), cb) + return r.readChanges(ctx, source, changes, allOptions(options), cb) }) diffs := make(chan *revDiff) group.Go(func() error { defer close(diffs) - return readDiffs(ctx, target, changes, diffs, cb) + return r.readDiffs(ctx, target, changes, diffs, cb) }) docs := make(chan *document) group.Go(func() error { defer close(docs) - return readDocs(ctx, source, diffs, docs, result, cb) + return r.readDocs(ctx, source, diffs, docs, result, cb) }) group.Go(func() error { - return storeDocs(ctx, target, docs, result, cb) + return r.storeDocs(ctx, target, docs, result, cb) }) return result.ReplicationResult, group.Wait() } -func copySecurity(ctx context.Context, target, source *DB, cb eventCallback) error { +// replicator manages a single replication. +type replicator struct{} + +func (*replicator) copySecurity(ctx context.Context, target, source *DB, cb eventCallback) error { sec, err := source.Security(ctx) cb(ReplicationEvent{ Type: eventSecurity, @@ -229,7 +234,7 @@ type change struct { // readChanges reads the changes feed. // // https://docs.couchdb.org/en/stable/replication/protocol.html#listen-to-changes-feed -func readChanges(ctx context.Context, db *DB, results chan<- *change, options Option, cb eventCallback) error { +func (*replicator) readChanges(ctx context.Context, db *DB, results chan<- *change, options Option, cb eventCallback) error { changes := db.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs")) cb(ReplicationEvent{ Type: eventChanges, @@ -276,7 +281,7 @@ const rdBatchSize = 10 // readDiffs reads the diffs for the reported changes. // // https://docs.couchdb.org/en/stable/replication/protocol.html#calculate-revision-difference -func readDiffs(ctx context.Context, db *DB, ch <-chan *change, results chan<- *revDiff, cb eventCallback) error { +func (*replicator) readDiffs(ctx context.Context, db *DB, ch <-chan *change, results chan<- *revDiff, cb eventCallback) error { for { revMap := map[string][]string{} var change *change @@ -348,7 +353,7 @@ func readDiffs(ctx context.Context, db *DB, ch <-chan *change, results chan<- *r // target. // // https://docs.couchdb.org/en/stable/replication/protocol.html#fetch-changed-documents -func readDocs(ctx context.Context, db *DB, diffs <-chan *revDiff, results chan<- *document, result *resultWrapper, cb eventCallback) error { +func (*replicator) readDocs(ctx context.Context, db *DB, diffs <-chan *revDiff, results chan<- *document, result *resultWrapper, cb eventCallback) error { for { var rd *revDiff var ok bool @@ -445,7 +450,7 @@ func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) // storeDocs updates the changed documents. // // https://docs.couchdb.org/en/stable/replication/protocol.html#upload-batch-of-changed-documents -func storeDocs(ctx context.Context, db *DB, docs <-chan *document, result *resultWrapper, cb eventCallback) error { +func (*replicator) storeDocs(ctx context.Context, db *DB, docs <-chan *document, result *resultWrapper, cb eventCallback) error { for doc := range docs { _, err := db.Put(ctx, doc.ID, doc, Param("new_edits", false)) cb(ReplicationEvent{ From 034ff31568d62742aa3c3609b0effae0a3d046ef Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 21:34:53 +0200 Subject: [PATCH 03/12] Move source/target into replicator struct --- replicate.go | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/replicate.go b/replicate.go index 2de574f76..4b973062e 100644 --- a/replicate.go +++ b/replicate.go @@ -168,10 +168,13 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep allOptions(options).Apply(repOpts) cb := repOpts.callback() - r := &replicator{} + r := &replicator{ + target: target, + source: source, + } if repOpts.copySecurity { - if err := r.copySecurity(ctx, target, source, cb); err != nil { + if err := r.copySecurity(ctx, cb); err != nil { return result.ReplicationResult, err } } @@ -179,33 +182,35 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep changes := make(chan *change) group.Go(func() error { defer close(changes) - return r.readChanges(ctx, source, changes, allOptions(options), cb) + return r.readChanges(ctx, changes, allOptions(options), cb) }) diffs := make(chan *revDiff) group.Go(func() error { defer close(diffs) - return r.readDiffs(ctx, target, changes, diffs, cb) + return r.readDiffs(ctx, changes, diffs, cb) }) docs := make(chan *document) group.Go(func() error { defer close(docs) - return r.readDocs(ctx, source, diffs, docs, result, cb) + return r.readDocs(ctx, diffs, docs, result, cb) }) group.Go(func() error { - return r.storeDocs(ctx, target, docs, result, cb) + return r.storeDocs(ctx, docs, result, cb) }) return result.ReplicationResult, group.Wait() } // replicator manages a single replication. -type replicator struct{} +type replicator struct { + target, source *DB +} -func (*replicator) copySecurity(ctx context.Context, target, source *DB, cb eventCallback) error { - sec, err := source.Security(ctx) +func (r *replicator) copySecurity(ctx context.Context, cb eventCallback) error { + sec, err := r.source.Security(ctx) cb(ReplicationEvent{ Type: eventSecurity, Read: true, @@ -214,7 +219,7 @@ func (*replicator) copySecurity(ctx context.Context, target, source *DB, cb even if err != nil { return fmt.Errorf("read security: %w", err) } - err = target.SetSecurity(ctx, sec) + err = r.target.SetSecurity(ctx, sec) cb(ReplicationEvent{ Type: eventSecurity, Read: false, @@ -234,8 +239,8 @@ type change struct { // readChanges reads the changes feed. // // https://docs.couchdb.org/en/stable/replication/protocol.html#listen-to-changes-feed -func (*replicator) readChanges(ctx context.Context, db *DB, results chan<- *change, options Option, cb eventCallback) error { - changes := db.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs")) +func (r *replicator) readChanges(ctx context.Context, results chan<- *change, options Option, cb eventCallback) error { + changes := r.source.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs")) cb(ReplicationEvent{ Type: eventChanges, Read: true, @@ -281,7 +286,7 @@ const rdBatchSize = 10 // readDiffs reads the diffs for the reported changes. // // https://docs.couchdb.org/en/stable/replication/protocol.html#calculate-revision-difference -func (*replicator) readDiffs(ctx context.Context, db *DB, ch <-chan *change, results chan<- *revDiff, cb eventCallback) error { +func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results chan<- *revDiff, cb eventCallback) error { for { revMap := map[string][]string{} var change *change @@ -305,7 +310,7 @@ func (*replicator) readDiffs(ctx context.Context, db *DB, ch <-chan *change, res if len(revMap) == 0 { return nil } - diffs := db.RevsDiff(ctx, revMap) + diffs := r.target.RevsDiff(ctx, revMap) err := diffs.Err() cb(ReplicationEvent{ Type: eventRevsDiff, @@ -353,7 +358,7 @@ func (*replicator) readDiffs(ctx context.Context, db *DB, ch <-chan *change, res // target. // // https://docs.couchdb.org/en/stable/replication/protocol.html#fetch-changed-documents -func (*replicator) readDocs(ctx context.Context, db *DB, diffs <-chan *revDiff, results chan<- *document, result *resultWrapper, cb eventCallback) error { +func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, results chan<- *document, result *resultWrapper, cb eventCallback) error { for { var rd *revDiff var ok bool @@ -366,7 +371,7 @@ func (*replicator) readDocs(ctx context.Context, db *DB, diffs <-chan *revDiff, } for _, rev := range rd.Missing { result.missingChecked() - d, err := readDoc(ctx, db, rd.ID, rev) + d, err := readDoc(ctx, r.source, rd.ID, rev) cb(ReplicationEvent{ Type: eventDocument, Read: true, @@ -450,9 +455,9 @@ func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) // storeDocs updates the changed documents. // // https://docs.couchdb.org/en/stable/replication/protocol.html#upload-batch-of-changed-documents -func (*replicator) storeDocs(ctx context.Context, db *DB, docs <-chan *document, result *resultWrapper, cb eventCallback) error { +func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document, result *resultWrapper, cb eventCallback) error { for doc := range docs { - _, err := db.Put(ctx, doc.ID, doc, Param("new_edits", false)) + _, err := r.target.Put(ctx, doc.ID, doc, Param("new_edits", false)) cb(ReplicationEvent{ Type: "document", Read: false, From 26db85fe2b3c6cd82be455415b5bec22a11e1955 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 21:46:17 +0200 Subject: [PATCH 04/12] add a constructor for the replicator type --- replicate.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/replicate.go b/replicate.go index 4b973062e..0a274f63a 100644 --- a/replicate.go +++ b/replicate.go @@ -156,6 +156,8 @@ func ReplicateCopySecurity() Option { // filter (string) - The name of a filter function. // doc_ids (array of string) - Array of document IDs to be synchronized. func Replicate(ctx context.Context, target, source *DB, options ...Option) (*ReplicationResult, error) { + r := newReplicator(target, source) + result := &resultWrapper{ ReplicationResult: &ReplicationResult{ StartTime: time.Now(), @@ -168,11 +170,6 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep allOptions(options).Apply(repOpts) cb := repOpts.callback() - r := &replicator{ - target: target, - source: source, - } - if repOpts.copySecurity { if err := r.copySecurity(ctx, cb); err != nil { return result.ReplicationResult, err @@ -209,6 +206,13 @@ type replicator struct { target, source *DB } +func newReplicator(target, source *DB) *replicator { + return &replicator{ + target: target, + source: source, + } +} + func (r *replicator) copySecurity(ctx context.Context, cb eventCallback) error { sec, err := r.source.Security(ctx) cb(ReplicationEvent{ From 3d6e63a694a6489b8ea53082753928eed1fae401 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 21:57:09 +0200 Subject: [PATCH 05/12] Rework replication stats calculation --- replicate.go | 87 +++++++++++++++++++--------------------------------- 1 file changed, 32 insertions(+), 55 deletions(-) diff --git a/replicate.go b/replicate.go index 0a274f63a..04bbe44b6 100644 --- a/replicate.go +++ b/replicate.go @@ -18,7 +18,7 @@ import ( "context" "fmt" "io" - "sync" + "sync/atomic" "time" "golang.org/x/sync/errgroup" @@ -35,41 +35,6 @@ type ReplicationResult struct { StartTime time.Time `json:"start_time"` } -type resultWrapper struct { - *ReplicationResult - mu sync.Mutex -} - -func (r *resultWrapper) read() { - r.mu.Lock() - r.DocsRead++ - r.mu.Unlock() -} - -func (r *resultWrapper) missingChecked() { - r.mu.Lock() - r.MissingChecked++ - r.mu.Unlock() -} - -func (r *resultWrapper) missingFound() { - r.mu.Lock() - r.MissingFound++ - r.mu.Unlock() -} - -func (r *resultWrapper) writeError() { - r.mu.Lock() - r.DocWriteFailures++ - r.mu.Unlock() -} - -func (r *resultWrapper) write() { - r.mu.Lock() - r.DocsWritten++ - r.mu.Unlock() -} - const ( eventSecurity = "security" eventChanges = "changes" @@ -157,22 +122,18 @@ func ReplicateCopySecurity() Option { // doc_ids (array of string) - Array of document IDs to be synchronized. func Replicate(ctx context.Context, target, source *DB, options ...Option) (*ReplicationResult, error) { r := newReplicator(target, source) + err := r.replicate(ctx, options...) + return r.result(), err +} - result := &resultWrapper{ - ReplicationResult: &ReplicationResult{ - StartTime: time.Now(), - }, - } - defer func() { - result.EndTime = time.Now() - }() +func (r *replicator) replicate(ctx context.Context, options ...Option) error { repOpts := &replicateOptions{} allOptions(options).Apply(repOpts) cb := repOpts.callback() if repOpts.copySecurity { if err := r.copySecurity(ctx, cb); err != nil { - return result.ReplicationResult, err + return err } } group, ctx := errgroup.WithContext(ctx) @@ -191,25 +152,41 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep docs := make(chan *document) group.Go(func() error { defer close(docs) - return r.readDocs(ctx, diffs, docs, result, cb) + return r.readDocs(ctx, diffs, docs, cb) }) group.Go(func() error { - return r.storeDocs(ctx, docs, result, cb) + return r.storeDocs(ctx, docs, cb) }) - return result.ReplicationResult, group.Wait() + return group.Wait() } // replicator manages a single replication. type replicator struct { target, source *DB + start time.Time + // replication stats counters + writeFailures, reads, writes, missingChecks, missingFound int32 } func newReplicator(target, source *DB) *replicator { return &replicator{ target: target, source: source, + start: time.Now(), + } +} + +func (r *replicator) result() *ReplicationResult { + return &ReplicationResult{ + StartTime: r.start, + EndTime: time.Now(), + DocWriteFailures: int(r.writeFailures), + DocsRead: int(r.reads), + DocsWritten: int(r.writes), + MissingChecked: int(r.missingChecks), + MissingFound: int(r.missingFound), } } @@ -362,7 +339,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c // target. // // https://docs.couchdb.org/en/stable/replication/protocol.html#fetch-changed-documents -func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, results chan<- *document, result *resultWrapper, cb eventCallback) error { +func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, results chan<- *document, cb eventCallback) error { for { var rd *revDiff var ok bool @@ -374,7 +351,7 @@ func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, result return nil } for _, rev := range rd.Missing { - result.missingChecked() + atomic.AddInt32(&r.missingChecks, 1) d, err := readDoc(ctx, r.source, rd.ID, rev) cb(ReplicationEvent{ Type: eventDocument, @@ -385,8 +362,8 @@ func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, result if err != nil { return fmt.Errorf("read doc %s: %w", rd.ID, err) } - result.read() - result.missingFound() + atomic.AddInt32(&r.reads, 1) + atomic.AddInt32(&r.missingFound, 1) select { case <-ctx.Done(): return ctx.Err() @@ -459,7 +436,7 @@ func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) // storeDocs updates the changed documents. // // https://docs.couchdb.org/en/stable/replication/protocol.html#upload-batch-of-changed-documents -func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document, result *resultWrapper, cb eventCallback) error { +func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document, cb eventCallback) error { for doc := range docs { _, err := r.target.Put(ctx, doc.ID, doc, Param("new_edits", false)) cb(ReplicationEvent{ @@ -469,10 +446,10 @@ func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document, resul Error: err, }) if err != nil { - result.writeError() + atomic.AddInt32(&r.writeFailures, 1) return fmt.Errorf("store doc %s: %w", doc.ID, err) } - result.write() + atomic.AddInt32(&r.writes, 1) } return nil } From 191fede25aa0addbe0c1aeaaa037a6f6caa99297 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 22:12:22 +0200 Subject: [PATCH 06/12] Make callback a field of replicator struct --- replicate.go | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/replicate.go b/replicate.go index 04bbe44b6..104602fb6 100644 --- a/replicate.go +++ b/replicate.go @@ -129,10 +129,10 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep func (r *replicator) replicate(ctx context.Context, options ...Option) error { repOpts := &replicateOptions{} allOptions(options).Apply(repOpts) - cb := repOpts.callback() + r.cb = repOpts.callback() if repOpts.copySecurity { - if err := r.copySecurity(ctx, cb); err != nil { + if err := r.copySecurity(ctx); err != nil { return err } } @@ -140,23 +140,23 @@ func (r *replicator) replicate(ctx context.Context, options ...Option) error { changes := make(chan *change) group.Go(func() error { defer close(changes) - return r.readChanges(ctx, changes, allOptions(options), cb) + return r.readChanges(ctx, changes, allOptions(options)) }) diffs := make(chan *revDiff) group.Go(func() error { defer close(diffs) - return r.readDiffs(ctx, changes, diffs, cb) + return r.readDiffs(ctx, changes, diffs) }) docs := make(chan *document) group.Go(func() error { defer close(docs) - return r.readDocs(ctx, diffs, docs, cb) + return r.readDocs(ctx, diffs, docs) }) group.Go(func() error { - return r.storeDocs(ctx, docs, cb) + return r.storeDocs(ctx, docs) }) return group.Wait() @@ -165,6 +165,7 @@ func (r *replicator) replicate(ctx context.Context, options ...Option) error { // replicator manages a single replication. type replicator struct { target, source *DB + cb eventCallback start time.Time // replication stats counters writeFailures, reads, writes, missingChecks, missingFound int32 @@ -190,9 +191,9 @@ func (r *replicator) result() *ReplicationResult { } } -func (r *replicator) copySecurity(ctx context.Context, cb eventCallback) error { +func (r *replicator) copySecurity(ctx context.Context) error { sec, err := r.source.Security(ctx) - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventSecurity, Read: true, Error: err, @@ -201,7 +202,7 @@ func (r *replicator) copySecurity(ctx context.Context, cb eventCallback) error { return fmt.Errorf("read security: %w", err) } err = r.target.SetSecurity(ctx, sec) - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventSecurity, Read: false, Error: err, @@ -220,9 +221,9 @@ type change struct { // readChanges reads the changes feed. // // https://docs.couchdb.org/en/stable/replication/protocol.html#listen-to-changes-feed -func (r *replicator) readChanges(ctx context.Context, results chan<- *change, options Option, cb eventCallback) error { +func (r *replicator) readChanges(ctx context.Context, results chan<- *change, options Option) error { changes := r.source.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs")) - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventChanges, Read: true, }) @@ -233,7 +234,7 @@ func (r *replicator) readChanges(ctx context.Context, results chan<- *change, op ID: changes.ID(), Changes: changes.Changes(), } - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventChange, DocID: ch.ID, Read: true, @@ -246,7 +247,7 @@ func (r *replicator) readChanges(ctx context.Context, results chan<- *change, op } } if err := changes.Err(); err != nil { - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventChanges, Read: true, Error: err, @@ -267,7 +268,7 @@ const rdBatchSize = 10 // readDiffs reads the diffs for the reported changes. // // https://docs.couchdb.org/en/stable/replication/protocol.html#calculate-revision-difference -func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results chan<- *revDiff, cb eventCallback) error { +func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results chan<- *revDiff) error { for { revMap := map[string][]string{} var change *change @@ -293,7 +294,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c } diffs := r.target.RevsDiff(ctx, revMap) err := diffs.Err() - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventRevsDiff, Read: true, Error: err, @@ -305,7 +306,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c for diffs.Next() { var val revDiff if err := diffs.ScanValue(&val); err != nil { - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventRevsDiff, Read: true, Error: err, @@ -313,7 +314,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c return err } val.ID, _ = diffs.ID() - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventRevsDiff, Read: true, DocID: val.ID, @@ -325,7 +326,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c } } if err := diffs.Err(); err != nil { - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventRevsDiff, Read: true, Error: err, @@ -339,7 +340,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c // target. // // https://docs.couchdb.org/en/stable/replication/protocol.html#fetch-changed-documents -func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, results chan<- *document, cb eventCallback) error { +func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, results chan<- *document) error { for { var rd *revDiff var ok bool @@ -353,7 +354,7 @@ func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, result for _, rev := range rd.Missing { atomic.AddInt32(&r.missingChecks, 1) d, err := readDoc(ctx, r.source, rd.ID, rev) - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: eventDocument, Read: true, DocID: rd.ID, @@ -436,10 +437,10 @@ func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) // storeDocs updates the changed documents. // // https://docs.couchdb.org/en/stable/replication/protocol.html#upload-batch-of-changed-documents -func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document, cb eventCallback) error { +func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document) error { for doc := range docs { _, err := r.target.Put(ctx, doc.ID, doc, Param("new_edits", false)) - cb(ReplicationEvent{ + r.cb(ReplicationEvent{ Type: "document", Read: false, DocID: doc.ID, From 32d9badae3f06a2460408c20602d3bdc9eddbb38 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 22:17:02 +0200 Subject: [PATCH 07/12] Proxy callback through replicator.callback method --- replicate.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/replicate.go b/replicate.go index 104602fb6..806bb900a 100644 --- a/replicate.go +++ b/replicate.go @@ -179,6 +179,13 @@ func newReplicator(target, source *DB) *replicator { } } +func (r *replicator) callback(e ReplicationEvent) { + if r.cb == nil { + return + } + r.cb(e) +} + func (r *replicator) result() *ReplicationResult { return &ReplicationResult{ StartTime: r.start, @@ -193,7 +200,7 @@ func (r *replicator) result() *ReplicationResult { func (r *replicator) copySecurity(ctx context.Context) error { sec, err := r.source.Security(ctx) - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventSecurity, Read: true, Error: err, @@ -202,7 +209,7 @@ func (r *replicator) copySecurity(ctx context.Context) error { return fmt.Errorf("read security: %w", err) } err = r.target.SetSecurity(ctx, sec) - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventSecurity, Read: false, Error: err, @@ -223,7 +230,7 @@ type change struct { // https://docs.couchdb.org/en/stable/replication/protocol.html#listen-to-changes-feed func (r *replicator) readChanges(ctx context.Context, results chan<- *change, options Option) error { changes := r.source.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs")) - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventChanges, Read: true, }) @@ -234,7 +241,7 @@ func (r *replicator) readChanges(ctx context.Context, results chan<- *change, op ID: changes.ID(), Changes: changes.Changes(), } - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventChange, DocID: ch.ID, Read: true, @@ -247,7 +254,7 @@ func (r *replicator) readChanges(ctx context.Context, results chan<- *change, op } } if err := changes.Err(); err != nil { - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventChanges, Read: true, Error: err, @@ -294,7 +301,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c } diffs := r.target.RevsDiff(ctx, revMap) err := diffs.Err() - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventRevsDiff, Read: true, Error: err, @@ -306,7 +313,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c for diffs.Next() { var val revDiff if err := diffs.ScanValue(&val); err != nil { - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventRevsDiff, Read: true, Error: err, @@ -314,7 +321,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c return err } val.ID, _ = diffs.ID() - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventRevsDiff, Read: true, DocID: val.ID, @@ -326,7 +333,7 @@ func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results c } } if err := diffs.Err(); err != nil { - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventRevsDiff, Read: true, Error: err, @@ -354,7 +361,7 @@ func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, result for _, rev := range rd.Missing { atomic.AddInt32(&r.missingChecks, 1) d, err := readDoc(ctx, r.source, rd.ID, rev) - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: eventDocument, Read: true, DocID: rd.ID, @@ -440,7 +447,7 @@ func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document) error { for doc := range docs { _, err := r.target.Put(ctx, doc.ID, doc, Param("new_edits", false)) - r.cb(ReplicationEvent{ + r.callback(ReplicationEvent{ Type: "document", Read: false, DocID: doc.ID, From 09bde2f3a561c459cbedc76b29b918d5b15cf8f3 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 22:17:54 +0200 Subject: [PATCH 08/12] apply callback option directly to replicator struct --- replicate.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/replicate.go b/replicate.go index 806bb900a..1ce5c2907 100644 --- a/replicate.go +++ b/replicate.go @@ -68,8 +68,8 @@ type ReplicationEvent struct { type eventCallback func(ReplicationEvent) func (c eventCallback) Apply(target interface{}) { - if ro, ok := target.(*replicateOptions); ok { - ro.cb = c + if r, ok := target.(*replicator); ok { + r.cb = c } } @@ -80,7 +80,6 @@ func ReplicateCallback(callback func(ReplicationEvent)) Option { } type replicateOptions struct { - cb eventCallback // CopySecurity indicates that the secuurity object should be read from // source, and copied to the target, before the replication. Use with // caution! The security object is not versioned, and will be @@ -88,13 +87,6 @@ type replicateOptions struct { copySecurity bool } -func (o replicateOptions) callback() eventCallback { - if o.cb != nil { - return o.cb - } - return func(ReplicationEvent) {} -} - type replicateCopySecurityOption struct{} func (r replicateCopySecurityOption) Apply(target interface{}) { @@ -127,9 +119,10 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep } func (r *replicator) replicate(ctx context.Context, options ...Option) error { + opts := allOptions(options) repOpts := &replicateOptions{} - allOptions(options).Apply(repOpts) - r.cb = repOpts.callback() + opts.Apply(repOpts) + opts.Apply(r) if repOpts.copySecurity { if err := r.copySecurity(ctx); err != nil { From eb61c637c0be46f87ea42318c00b011c4c5562da Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 22:20:30 +0200 Subject: [PATCH 09/12] merge security option into replicator struct --- replicate.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/replicate.go b/replicate.go index 1ce5c2907..d98e7223d 100644 --- a/replicate.go +++ b/replicate.go @@ -79,19 +79,11 @@ func ReplicateCallback(callback func(ReplicationEvent)) Option { return eventCallback(callback) } -type replicateOptions struct { - // CopySecurity indicates that the secuurity object should be read from - // source, and copied to the target, before the replication. Use with - // caution! The security object is not versioned, and will be - // unconditionally overwritten! - copySecurity bool -} - type replicateCopySecurityOption struct{} func (r replicateCopySecurityOption) Apply(target interface{}) { - if ro, ok := target.(*replicateOptions); ok { - ro.copySecurity = true + if r, ok := target.(*replicator); ok { + r.withSecurity = true } } @@ -120,11 +112,9 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep func (r *replicator) replicate(ctx context.Context, options ...Option) error { opts := allOptions(options) - repOpts := &replicateOptions{} - opts.Apply(repOpts) opts.Apply(r) - if repOpts.copySecurity { + if r.withSecurity { if err := r.copySecurity(ctx); err != nil { return err } @@ -159,7 +149,12 @@ func (r *replicator) replicate(ctx context.Context, options ...Option) error { type replicator struct { target, source *DB cb eventCallback - start time.Time + // withSecurity indicates that the secuurity object should be read from + // source, and copied to the target, before the replication. Use with + // caution! The security object is not versioned, and will be + // unconditionally overwritten! + withSecurity bool + start time.Time // replication stats counters writeFailures, reads, writes, missingChecks, missingFound int32 } From 30d08853d9a82cf954434e95bbb05fc4563197d5 Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 22:22:10 +0200 Subject: [PATCH 10/12] Do options parsing earlier --- replicate.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/replicate.go b/replicate.go index d98e7223d..d3e33c89d 100644 --- a/replicate.go +++ b/replicate.go @@ -105,15 +105,15 @@ func ReplicateCopySecurity() Option { // filter (string) - The name of a filter function. // doc_ids (array of string) - Array of document IDs to be synchronized. func Replicate(ctx context.Context, target, source *DB, options ...Option) (*ReplicationResult, error) { + opts := allOptions(options) + r := newReplicator(target, source) - err := r.replicate(ctx, options...) + opts.Apply(r) + err := r.replicate(ctx, opts) return r.result(), err } -func (r *replicator) replicate(ctx context.Context, options ...Option) error { - opts := allOptions(options) - opts.Apply(r) - +func (r *replicator) replicate(ctx context.Context, options Option) error { if r.withSecurity { if err := r.copySecurity(ctx); err != nil { return err @@ -123,7 +123,7 @@ func (r *replicator) replicate(ctx context.Context, options ...Option) error { changes := make(chan *change) group.Go(func() error { defer close(changes) - return r.readChanges(ctx, changes, allOptions(options)) + return r.readChanges(ctx, changes, options) }) diffs := make(chan *revDiff) From 30632daf489b5ad8526826eb6dde2094ee45d96f Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 22:22:55 +0200 Subject: [PATCH 11/12] Move withSecurity check into the copySecurity method --- replicate.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/replicate.go b/replicate.go index d3e33c89d..8cf895b90 100644 --- a/replicate.go +++ b/replicate.go @@ -114,11 +114,10 @@ func Replicate(ctx context.Context, target, source *DB, options ...Option) (*Rep } func (r *replicator) replicate(ctx context.Context, options Option) error { - if r.withSecurity { - if err := r.copySecurity(ctx); err != nil { - return err - } + if err := r.copySecurity(ctx); err != nil { + return err } + group, ctx := errgroup.WithContext(ctx) changes := make(chan *change) group.Go(func() error { @@ -187,6 +186,9 @@ func (r *replicator) result() *ReplicationResult { } func (r *replicator) copySecurity(ctx context.Context) error { + if !r.withSecurity { + return nil + } sec, err := r.source.Security(ctx) r.callback(ReplicationEvent{ Type: eventSecurity, From 7ca58d4cff3d8d1456a04008fb7c29264397518f Mon Sep 17 00:00:00 2001 From: Jonathan Hall Date: Thu, 12 Oct 2023 22:26:42 +0200 Subject: [PATCH 12/12] Extract doc/rev reading loop to its own function --- replicate.go | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/replicate.go b/replicate.go index 8cf895b90..a22097d9e 100644 --- a/replicate.go +++ b/replicate.go @@ -348,30 +348,37 @@ func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, result if !ok { return nil } - for _, rev := range rd.Missing { - atomic.AddInt32(&r.missingChecks, 1) - d, err := readDoc(ctx, r.source, rd.ID, rev) - r.callback(ReplicationEvent{ - Type: eventDocument, - Read: true, - DocID: rd.ID, - Error: err, - }) - if err != nil { - return fmt.Errorf("read doc %s: %w", rd.ID, err) - } - atomic.AddInt32(&r.reads, 1) - atomic.AddInt32(&r.missingFound, 1) - select { - case <-ctx.Done(): - return ctx.Err() - case results <- d: - } + if err := r.readDoc(ctx, rd.ID, rd.Missing, results); err != nil { + return err } } } } +func (r *replicator) readDoc(ctx context.Context, id string, revs []string, results chan<- *document) error { + for _, rev := range revs { + atomic.AddInt32(&r.missingChecks, 1) + d, err := readDoc(ctx, r.source, id, rev) + r.callback(ReplicationEvent{ + Type: eventDocument, + Read: true, + DocID: id, + Error: err, + }) + if err != nil { + return fmt.Errorf("read doc %s: %w", id, err) + } + atomic.AddInt32(&r.reads, 1) + atomic.AddInt32(&r.missingFound, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case results <- d: + } + } + return nil +} + func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) { doc := new(document) row := db.Get(ctx, docID, Params(map[string]interface{}{