From 7badf0e5493c4c9ef9876d6deb6db3947aa29a9c Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Sat, 16 Dec 2023 11:53:09 +0200 Subject: [PATCH] Prevent deadlocks with replicas (#524) --- replica.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/replica.go b/replica.go index 3d06a9eb..b8b3a9e5 100644 --- a/replica.go +++ b/replica.go @@ -176,7 +176,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) { // the generation on the database has changed. if r.Pos().Generation != generation { // Create snapshot if no snapshots exist for generation. - snapshotN, err := r.snapshotN(generation) + snapshotN, err := r.snapshotN(ctx, generation) if err != nil { return err } else if snapshotN == 0 { @@ -237,6 +237,12 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { var g errgroup.Group g.Go(func() error { _, err := r.Client.WriteWALSegment(ctx, pos, pr) + + // Always close pipe reader to signal writers. + if e := pr.CloseWithError(err); err == nil { + return e + } + return err }) @@ -331,8 +337,8 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { } // snapshotN returns the number of snapshots for a generation. -func (r *Replica) snapshotN(generation string) (int, error) { - itr, err := r.Client.Snapshots(context.Background(), generation) +func (r *Replica) snapshotN(ctx context.Context, generation string) (int, error) { + itr, err := r.Client.Snapshots(ctx, generation) if err != nil { return 0, err }