Skip to content

Commit

Permalink
if repo base has a 'prev', ignore block diff errors
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Nov 21, 2023
1 parent 9e0e90f commit 2e4caf4
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 36 deletions.
6 changes: 4 additions & 2 deletions backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,13 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
// Producer routine
go func() {
defer close(recordQueue)
r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
numRecords++
recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
return nil
})
}); err != nil {
log.Error("failed to iterated records in repo", "err", err)
}
}()

// Consumer routines
Expand Down
21 changes: 9 additions & 12 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func setToSlice(s map[cid.Cid]bool) []cid.Cid {
return out
}

func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block) (map[cid.Cid]bool, error) {
func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipmissing bool) (map[cid.Cid]bool, error) {
ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff")
defer span.End()

Expand Down Expand Up @@ -901,7 +901,11 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n

oblk, err := bs.Get(ctx, c)
if err != nil {
return nil, fmt.Errorf("get failed in old tree: %w", err)
if skipmissing && ipld.IsNotFound(err) {
log.Warnw("missing block in old tree", "root", oldroot, "missing", c)
} else {
return nil, fmt.Errorf("get failed in old tree: %w", err)
}
}

if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
Expand Down Expand Up @@ -956,20 +960,13 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *stri
}
}

rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks)
if err != nil {
return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s,since=%v,rev=%s): %w", ds.baseCid, since, ds.lastRev, err)
}

ds.rmcids = rmcids

return carr.Header.Roots[0], ds, nil
}

func (ds *DeltaSession) CalcDiff(ctx context.Context, nroot cid.Cid) error {
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks)
func (ds *DeltaSession) CalcDiff(ctx context.Context, skipmissing bool) error {
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks, skipmissing)
if err != nil {
return fmt.Errorf("block diff failed: %w", err)
return fmt.Errorf("block diff failed (base=%s,rev=%s): %w", ds.baseCid, ds.lastRev, err)
}

ds.rmcids = rmcids
Expand Down
16 changes: 8 additions & 8 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestBasicOperation(t *testing.T) {
t.Fatal(err)
}

rr, err := repo.OpenRepo(ctx, ds, head, true)
rr, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
t.Fatal(err)
}
Expand All @@ -125,7 +125,7 @@ func TestBasicOperation(t *testing.T) {

rev = nrev

if err := ds.CalcDiff(ctx, nroot); err != nil {
if err := ds.CalcDiff(ctx, false); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -188,7 +188,7 @@ func TestRepeatedCompactions(t *testing.T) {
t.Fatal(err)
}

rr, err := repo.OpenRepo(ctx, ds, head, true)
rr, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestRepeatedCompactions(t *testing.T) {

rev = nrev

if err := ds.CalcDiff(ctx, nroot); err != nil {
if err := ds.CalcDiff(ctx, false); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -338,7 +338,7 @@ func BenchmarkRepoWritesCarstore(b *testing.B) {
b.Fatal(err)
}

rr, err := repo.OpenRepo(ctx, ds, head, true)
rr, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
b.Fatal(err)
}
Expand All @@ -356,7 +356,7 @@ func BenchmarkRepoWritesCarstore(b *testing.B) {
}

rev = nrev
if err := ds.CalcDiff(ctx, nroot); err != nil {
if err := ds.CalcDiff(ctx, false); err != nil {
b.Fatal(err)
}

Expand Down Expand Up @@ -386,7 +386,7 @@ func BenchmarkRepoWritesFlatfs(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {

rr, err := repo.OpenRepo(ctx, bs, head, true)
rr, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func BenchmarkRepoWritesSqlite(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {

rr, err := repo.OpenRepo(ctx, bs, head, true)
rr, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/gosky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func unpackRecords(blks []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) ([]any
}
}

r, err := repo.OpenRepo(ctx, bstore, carr.Header.Roots[0], false)
r, err := repo.OpenRepo(ctx, bstore, carr.Header.Roots[0])
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) {
return nil, err
}

return OpenRepo(ctx, bs, root, false)
return OpenRepo(ctx, bs, root)
}

func NewRepo(ctx context.Context, did string, bs blockstore.Blockstore) *Repo {
Expand All @@ -128,7 +128,7 @@ func NewRepo(ctx context.Context, did string, bs blockstore.Blockstore) *Repo {
}
}

func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid, fullRepo bool) (*Repo, error) {
func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid) (*Repo, error) {
cst := util.CborStore(bs)

var sc SignedCommit
Expand Down Expand Up @@ -363,7 +363,7 @@ func (r *Repo) DiffSince(ctx context.Context, oldrepo cid.Cid) ([]*mst.DiffOp, e

var oldTree cid.Cid
if oldrepo.Defined() {
otherRepo, err := OpenRepo(ctx, r.bs, oldrepo, true)
otherRepo, err := OpenRepo(ctx, r.bs, oldrepo)
if err != nil {
return nil, err
}
Expand Down
40 changes: 31 additions & 9 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collec

head := ds.BaseCid()

r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return "", cid.Undef, err
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collec
}

head := ds.BaseCid()
r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return cid.Undef, err
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec
}

head := ds.BaseCid()
r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return err
}
Expand Down Expand Up @@ -432,7 +432,7 @@ func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collectio
return cid.Undef, nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
r, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
return cid.Undef, nil, err
}
Expand Down Expand Up @@ -462,7 +462,7 @@ func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, coll
return cid.Undef, nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
r, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
return cid.Undef, nil, err
}
Expand All @@ -486,7 +486,7 @@ func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.Ac
return nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
r, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint,
return fmt.Errorf("importing external carslice: %w", err)
}

r, err := repo.OpenRepo(ctx, ds, root, true)
r, err := repo.OpenRepo(ctx, ds, root)
if err != nil {
return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err)
}
Expand All @@ -552,6 +552,28 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint,
return err
}

var badPrev bool
if ds.BaseCid().Defined() {
oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid())
if err != nil {
return fmt.Errorf("failed to check data root in old repo: %w", err)
}

// if the old commit has a 'prev', CalcDiff will error out while trying
// to walk it. This is an old repo thing that is being deprecated.
// This check is a temporary workaround until all repos get migrated
// and this becomes no longer an issue
prev, _ := oldrepo.PrevCommit(ctx)
if prev != nil {
badPrev = true
}
}

if err := ds.CalcDiff(ctx, badPrev); err != nil {
return fmt.Errorf("failed while calculating mst diff (since=%v): %w", since, err)

}

var evtops []RepoOp

for _, op := range ops {
Expand Down Expand Up @@ -650,7 +672,7 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [
}

head := ds.BaseCid()
r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return err
}
Expand Down Expand Up @@ -781,7 +803,7 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD
}

err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error {
r, err := repo.OpenRepo(ctx, bs, root, true)
r, err := repo.OpenRepo(ctx, bs, root)
if err != nil {
return fmt.Errorf("opening new repo: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion testing/sig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestVerification(t *testing.T) {
t.Fatal(err)
}

r, err := repo.OpenRepo(ctx, bs, c, true)
r, err := repo.OpenRepo(ctx, bs, c)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 2e4caf4

Please sign in to comment.