From 2e4caf4b7628127ba874241d1040d5952cf9902a Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 21 Nov 2023 13:21:37 -0600 Subject: [PATCH] if repo base has a 'prev', ignore block diff errors --- backfill/backfill.go | 6 ++++-- carstore/bs.go | 21 +++++++++------------ carstore/repo_test.go | 16 ++++++++-------- cmd/gosky/main.go | 2 +- repo/repo.go | 6 +++--- repomgr/repomgr.go | 40 +++++++++++++++++++++++++++++++--------- testing/sig_test.go | 2 +- 7 files changed, 57 insertions(+), 36 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index 14d6741a9..a67861dcf 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -342,11 +342,13 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { // Producer routine go func() { defer close(recordQueue) - r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error { + if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error { numRecords++ recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid} return nil - }) + }); err != nil { + log.Error("failed to iterated records in repo", "err", err) + } }() // Consumer routines diff --git a/carstore/bs.go b/carstore/bs.go index a992a2776..79f245362 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -861,7 +861,7 @@ func setToSlice(s map[cid.Cid]bool) []cid.Cid { return out } -func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block) (map[cid.Cid]bool, error) { +func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipmissing bool) (map[cid.Cid]bool, error) { ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff") defer span.End() @@ -901,7 +901,11 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n oblk, err := bs.Get(ctx, c) if err != nil { - return nil, fmt.Errorf("get failed in old tree: %w", err) + if skipmissing && ipld.IsNotFound(err) { + log.Warnw("missing block in old tree", "root", oldroot, "missing", c) + } else { + return nil, fmt.Errorf("get failed in old tree: %w", err) + } } if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { @@ -956,20 +960,13 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *stri } } - rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks) - if err != nil { - return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s,since=%v,rev=%s): %w", ds.baseCid, since, ds.lastRev, err) - } - - ds.rmcids = rmcids - return carr.Header.Roots[0], ds, nil } -func (ds *DeltaSession) CalcDiff(ctx context.Context, nroot cid.Cid) error { - rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks) +func (ds *DeltaSession) CalcDiff(ctx context.Context, skipmissing bool) error { + rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks, skipmissing) if err != nil { - return fmt.Errorf("block diff failed: %w", err) + return fmt.Errorf("block diff failed (base=%s,rev=%s): %w", ds.baseCid, ds.lastRev, err) } ds.rmcids = rmcids diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 0c6e9b0ab..4566369cb 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -103,7 +103,7 @@ func TestBasicOperation(t *testing.T) { t.Fatal(err) } - rr, err := repo.OpenRepo(ctx, ds, head, true) + rr, err := repo.OpenRepo(ctx, ds, head) if err != nil { t.Fatal(err) } @@ -125,7 +125,7 @@ func TestBasicOperation(t *testing.T) { rev = nrev - if err := ds.CalcDiff(ctx, nroot); err != nil { + if err := ds.CalcDiff(ctx, false); err != nil { t.Fatal(err) } @@ -188,7 +188,7 @@ func TestRepeatedCompactions(t *testing.T) { t.Fatal(err) } - rr, err := repo.OpenRepo(ctx, ds, head, true) + rr, err := repo.OpenRepo(ctx, ds, head) if err != nil { t.Fatal(err) } @@ -217,7 +217,7 @@ func TestRepeatedCompactions(t *testing.T) { rev = nrev - if err := ds.CalcDiff(ctx, nroot); err != nil { + if err := ds.CalcDiff(ctx, false); err != nil { t.Fatal(err) } @@ -338,7 +338,7 @@ func BenchmarkRepoWritesCarstore(b *testing.B) { b.Fatal(err) } - rr, err := repo.OpenRepo(ctx, ds, head, true) + rr, err := repo.OpenRepo(ctx, ds, head) if err != nil { b.Fatal(err) } @@ -356,7 +356,7 @@ func BenchmarkRepoWritesCarstore(b *testing.B) { } rev = nrev - if err := ds.CalcDiff(ctx, nroot); err != nil { + if err := ds.CalcDiff(ctx, false); err != nil { b.Fatal(err) } @@ -386,7 +386,7 @@ func BenchmarkRepoWritesFlatfs(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - rr, err := repo.OpenRepo(ctx, bs, head, true) + rr, err := repo.OpenRepo(ctx, bs, head) if err != nil { b.Fatal(err) } @@ -424,7 +424,7 @@ func BenchmarkRepoWritesSqlite(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - rr, err := repo.OpenRepo(ctx, bs, head, true) + rr, err := repo.OpenRepo(ctx, bs, head) if err != nil { b.Fatal(err) } diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index bdc845bf3..a028d8e3f 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -363,7 +363,7 @@ func unpackRecords(blks []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) ([]any } } - r, err := repo.OpenRepo(ctx, bstore, carr.Header.Roots[0], false) + r, err := repo.OpenRepo(ctx, bstore, carr.Header.Roots[0]) if err != nil { return nil, err } diff --git a/repo/repo.go b/repo/repo.go index c982c1a47..c3df60f39 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -107,7 +107,7 @@ func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) { return nil, err } - return OpenRepo(ctx, bs, root, false) + return OpenRepo(ctx, bs, root) } func NewRepo(ctx context.Context, did string, bs blockstore.Blockstore) *Repo { @@ -128,7 +128,7 @@ func NewRepo(ctx context.Context, did string, bs blockstore.Blockstore) *Repo { } } -func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid, fullRepo bool) (*Repo, error) { +func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid) (*Repo, error) { cst := util.CborStore(bs) var sc SignedCommit @@ -363,7 +363,7 @@ func (r *Repo) DiffSince(ctx context.Context, oldrepo cid.Cid) ([]*mst.DiffOp, e var oldTree cid.Cid if oldrepo.Defined() { - otherRepo, err := OpenRepo(ctx, r.bs, oldrepo, true) + otherRepo, err := OpenRepo(ctx, r.bs, oldrepo) if err != nil { return nil, err } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 14e61f6bc..7e4594515 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -163,7 +163,7 @@ func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collec head := ds.BaseCid() - r, err := repo.OpenRepo(ctx, ds, head, true) + r, err := repo.OpenRepo(ctx, ds, head) if err != nil { return "", cid.Undef, err } @@ -227,7 +227,7 @@ func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collec } head := ds.BaseCid() - r, err := repo.OpenRepo(ctx, ds, head, true) + r, err := repo.OpenRepo(ctx, ds, head) if err != nil { return cid.Undef, err } @@ -297,7 +297,7 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec } head := ds.BaseCid() - r, err := repo.OpenRepo(ctx, ds, head, true) + r, err := repo.OpenRepo(ctx, ds, head) if err != nil { return err } @@ -432,7 +432,7 @@ func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collectio return cid.Undef, nil, err } - r, err := repo.OpenRepo(ctx, bs, head, true) + r, err := repo.OpenRepo(ctx, bs, head) if err != nil { return cid.Undef, nil, err } @@ -462,7 +462,7 @@ func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, coll return cid.Undef, nil, err } - r, err := repo.OpenRepo(ctx, bs, head, true) + r, err := repo.OpenRepo(ctx, bs, head) if err != nil { return cid.Undef, nil, err } @@ -486,7 +486,7 @@ func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.Ac return nil, err } - r, err := repo.OpenRepo(ctx, bs, head, true) + r, err := repo.OpenRepo(ctx, bs, head) if err != nil { return nil, err } @@ -543,7 +543,7 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, return fmt.Errorf("importing external carslice: %w", err) } - r, err := repo.OpenRepo(ctx, ds, root, true) + r, err := repo.OpenRepo(ctx, ds, root) if err != nil { return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err) } @@ -552,6 +552,28 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, return err } + var badPrev bool + if ds.BaseCid().Defined() { + oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid()) + if err != nil { + return fmt.Errorf("failed to check data root in old repo: %w", err) + } + + // if the old commit has a 'prev', CalcDiff will error out while trying + // to walk it. This is an old repo thing that is being deprecated. + // This check is a temporary workaround until all repos get migrated + // and this becomes no longer an issue + prev, _ := oldrepo.PrevCommit(ctx) + if prev != nil { + badPrev = true + } + } + + if err := ds.CalcDiff(ctx, badPrev); err != nil { + return fmt.Errorf("failed while calculating mst diff (since=%v): %w", since, err) + + } + var evtops []RepoOp for _, op := range ops { @@ -650,7 +672,7 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [ } head := ds.BaseCid() - r, err := repo.OpenRepo(ctx, ds, head, true) + r, err := repo.OpenRepo(ctx, ds, head) if err != nil { return err } @@ -781,7 +803,7 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD } err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error { - r, err := repo.OpenRepo(ctx, bs, root, true) + r, err := repo.OpenRepo(ctx, bs, root) if err != nil { return fmt.Errorf("opening new repo: %w", err) } diff --git a/testing/sig_test.go b/testing/sig_test.go index d37c1df20..7dc0fb603 100644 --- a/testing/sig_test.go +++ b/testing/sig_test.go @@ -33,7 +33,7 @@ func TestVerification(t *testing.T) { t.Fatal(err) } - r, err := repo.OpenRepo(ctx, bs, c, true) + r, err := repo.OpenRepo(ctx, bs, c) if err != nil { t.Fatal(err) }