Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

if repo base has a 'prev', ignore block diff errors #446

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,13 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
// Producer routine
go func() {
defer close(recordQueue)
r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
numRecords++
recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
return nil
})
}); err != nil {
log.Error("failed to iterated records in repo", "err", err)
}
}()

// Consumer routines
Expand Down
19 changes: 8 additions & 11 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func setToSlice(s map[cid.Cid]bool) []cid.Cid {
return out
}

func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block) (map[cid.Cid]bool, error) {
func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipcids map[cid.Cid]bool) (map[cid.Cid]bool, error) {
ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff")
defer span.End()

Expand Down Expand Up @@ -899,6 +899,10 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n
c := queue[0]
queue = queue[1:]

if skipcids != nil && skipcids[c] {
continue
}

oblk, err := bs.Get(ctx, c)
if err != nil {
return nil, fmt.Errorf("get failed in old tree: %w", err)
Expand Down Expand Up @@ -956,20 +960,13 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *stri
}
}

rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks)
if err != nil {
return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s,since=%v,rev=%s): %w", ds.baseCid, since, ds.lastRev, err)
}

ds.rmcids = rmcids

return carr.Header.Roots[0], ds, nil
}

func (ds *DeltaSession) CalcDiff(ctx context.Context, nroot cid.Cid) error {
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks)
func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool) error {
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks, skipcids)
if err != nil {
return fmt.Errorf("block diff failed: %w", err)
return fmt.Errorf("block diff failed (base=%s,rev=%s): %w", ds.baseCid, ds.lastRev, err)
}

ds.rmcids = rmcids
Expand Down
16 changes: 8 additions & 8 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestBasicOperation(t *testing.T) {
t.Fatal(err)
}

rr, err := repo.OpenRepo(ctx, ds, head, true)
rr, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
t.Fatal(err)
}
Expand All @@ -125,7 +125,7 @@ func TestBasicOperation(t *testing.T) {

rev = nrev

if err := ds.CalcDiff(ctx, nroot); err != nil {
if err := ds.CalcDiff(ctx, nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -188,7 +188,7 @@ func TestRepeatedCompactions(t *testing.T) {
t.Fatal(err)
}

rr, err := repo.OpenRepo(ctx, ds, head, true)
rr, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestRepeatedCompactions(t *testing.T) {

rev = nrev

if err := ds.CalcDiff(ctx, nroot); err != nil {
if err := ds.CalcDiff(ctx, nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -338,7 +338,7 @@ func BenchmarkRepoWritesCarstore(b *testing.B) {
b.Fatal(err)
}

rr, err := repo.OpenRepo(ctx, ds, head, true)
rr, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
b.Fatal(err)
}
Expand All @@ -356,7 +356,7 @@ func BenchmarkRepoWritesCarstore(b *testing.B) {
}

rev = nrev
if err := ds.CalcDiff(ctx, nroot); err != nil {
if err := ds.CalcDiff(ctx, nil); err != nil {
b.Fatal(err)
}

Expand Down Expand Up @@ -386,7 +386,7 @@ func BenchmarkRepoWritesFlatfs(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {

rr, err := repo.OpenRepo(ctx, bs, head, true)
rr, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func BenchmarkRepoWritesSqlite(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {

rr, err := repo.OpenRepo(ctx, bs, head, true)
rr, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/gosky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func unpackRecords(blks []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) ([]any
}
}

r, err := repo.OpenRepo(ctx, bstore, carr.Header.Roots[0], false)
r, err := repo.OpenRepo(ctx, bstore, carr.Header.Roots[0])
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) {
return nil, err
}

return OpenRepo(ctx, bs, root, false)
return OpenRepo(ctx, bs, root)
}

func NewRepo(ctx context.Context, did string, bs blockstore.Blockstore) *Repo {
Expand All @@ -128,7 +128,7 @@ func NewRepo(ctx context.Context, did string, bs blockstore.Blockstore) *Repo {
}
}

func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid, fullRepo bool) (*Repo, error) {
func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid) (*Repo, error) {
cst := util.CborStore(bs)

var sc SignedCommit
Expand Down Expand Up @@ -363,7 +363,7 @@ func (r *Repo) DiffSince(ctx context.Context, oldrepo cid.Cid) ([]*mst.DiffOp, e

var oldTree cid.Cid
if oldrepo.Defined() {
otherRepo, err := OpenRepo(ctx, r.bs, oldrepo, true)
otherRepo, err := OpenRepo(ctx, r.bs, oldrepo)
if err != nil {
return nil, err
}
Expand Down
42 changes: 33 additions & 9 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collec

head := ds.BaseCid()

r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return "", cid.Undef, err
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collec
}

head := ds.BaseCid()
r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return cid.Undef, err
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec
}

head := ds.BaseCid()
r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return err
}
Expand Down Expand Up @@ -432,7 +432,7 @@ func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collectio
return cid.Undef, nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
r, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
return cid.Undef, nil, err
}
Expand Down Expand Up @@ -462,7 +462,7 @@ func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, coll
return cid.Undef, nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
r, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
return cid.Undef, nil, err
}
Expand All @@ -486,7 +486,7 @@ func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.Ac
return nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
r, err := repo.OpenRepo(ctx, bs, head)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint,
return fmt.Errorf("importing external carslice: %w", err)
}

r, err := repo.OpenRepo(ctx, ds, root, true)
r, err := repo.OpenRepo(ctx, ds, root)
if err != nil {
return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err)
}
Expand All @@ -552,6 +552,30 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint,
return err
}

var skipcids map[cid.Cid]bool
if ds.BaseCid().Defined() {
oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid())
if err != nil {
return fmt.Errorf("failed to check data root in old repo: %w", err)
}

// if the old commit has a 'prev', CalcDiff will error out while trying
// to walk it. This is an old repo thing that is being deprecated.
// This check is a temporary workaround until all repos get migrated
// and this becomes no longer an issue
prev, _ := oldrepo.PrevCommit(ctx)
if prev != nil {
skipcids = map[cid.Cid]bool{
*prev: true,
}
}
}

if err := ds.CalcDiff(ctx, skipcids); err != nil {
return fmt.Errorf("failed while calculating mst diff (since=%v): %w", since, err)

}

var evtops []RepoOp

for _, op := range ops {
Expand Down Expand Up @@ -650,7 +674,7 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [
}

head := ds.BaseCid()
r, err := repo.OpenRepo(ctx, ds, head, true)
r, err := repo.OpenRepo(ctx, ds, head)
if err != nil {
return err
}
Expand Down Expand Up @@ -781,7 +805,7 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD
}

err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error {
r, err := repo.OpenRepo(ctx, bs, root, true)
r, err := repo.OpenRepo(ctx, bs, root)
if err != nil {
return fmt.Errorf("opening new repo: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion testing/sig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestVerification(t *testing.T) {
t.Fatal(err)
}

r, err := repo.OpenRepo(ctx, bs, c, true)
r, err := repo.OpenRepo(ctx, bs, c)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading