From 9457518337ae8452d622c7ec17d3bf8eb590053b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 11 Sep 2023 11:19:20 -0700 Subject: [PATCH] switch to tracking dirty refs in separate table --- carstore/bs.go | 78 +++++++++++++++++++++++++++++++++---------- carstore/repo_test.go | 7 ++-- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index 01d79f4d9..10219575e 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -54,6 +54,10 @@ func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) { if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { return nil, err } + if err := meta.AutoMigrate(&staleRef{}); err != nil { + return nil, err + } + return &CarStore{ meta: meta, rootDir: root, @@ -83,10 +87,15 @@ type blockRef struct { Cid models.DbCID `gorm:"index"` Shard uint `gorm:"index"` Offset int64 - Dirty bool //User uint `gorm:"index"` } +type staleRef struct { + ID uint `gorm:"primarykey"` + Cid models.DbCID `gorm:"index"` + Usr models.Uid +} + type userView struct { cs *CarStore user models.Uid @@ -678,13 +687,15 @@ func (cs *CarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[s } if len(rmcids) > 0 { - var torm []models.DbCID + var torm []staleRef for c := range rmcids { - torm = append(torm, models.DbCID{c}) + torm = append(torm, staleRef{ + Cid: models.DbCID{c}, + Usr: shard.Usr, + }) } - subq := cs.meta.Model(&blockRef{}).Joins("left join car_shards cs on cs.id = block_refs.shard").Where("cid in (?) AND usr = ?", torm, shard.Usr).Select("block_refs.id") - if err := tx.Model(&blockRef{}).Where("id in (?)", subq).UpdateColumn("dirty", true).Error; err != nil { + if err := tx.Create(torm).Error; err != nil { return err } } @@ -997,7 +1008,7 @@ func shouldCompact(s shardStat) bool { return false } -func aggrRefs(brefs []blockRef) []shardStat { +func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { byId := make(map[uint]*shardStat) for _, br := range brefs { @@ -1010,7 +1021,7 @@ func aggrRefs(brefs []blockRef) []shardStat { } s.Total++ - if br.Dirty { + if staleCids[br.Cid.CID] { s.Dirty++ } @@ -1122,16 +1133,19 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro return err } - cset := make(map[cid.Cid]bool) + var staleRefs []staleRef + if err := cs.meta.Debug().Find(&staleRefs, "usr = ?", user).Error; err != nil { + return err + } + + stale := make(map[cid.Cid]bool) var hasDirtyDupes bool - for _, br := range brefs { - if br.Dirty { - if cset[br.Cid.CID] { - hasDirtyDupes = true - break - } - cset[br.Cid.CID] = true + for _, br := range staleRefs { + if stale[br.Cid.CID] { + hasDirtyDupes = true + break } + stale[br.Cid.CID] = true } if hasDirtyDupes { @@ -1149,12 +1163,12 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro keep := make(map[cid.Cid]bool) for _, br := range brefs { - if !br.Dirty { + if !stale[br.Cid.CID] { keep[br.Cid.CID] = true } } - results := aggrRefs(brefs) + results := aggrRefs(brefs, stale) thresholdForPosition := func(i int) int { // TODO: calculate some curve here so earlier shards end up with more @@ -1184,12 +1198,14 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro compactionQueue = append(compactionQueue, cur) } + removedShards := make(map[uint]bool) for _, b := range compactionQueue { if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { return err } for _, s := range b.shards { + removedShards[s.ID] = true sh, ok := shardsById[s.ID] if !ok { return fmt.Errorf("missing shard to delete") @@ -1201,6 +1217,34 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro } } + // now we need to delete the staleRefs we successfully cleaned up + // we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed + + brByCid := make(map[cid.Cid][]blockRef) + for _, br := range brefs { + brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br) + } + + var staleToDelete []uint + for _, sr := range staleRefs { + brs := brByCid[sr.Cid.CID] + del := true + for _, br := range brs { + if !removedShards[br.Shard] { + del = false + break + } + } + + if del { + staleToDelete = append(staleToDelete, sr.ID) + } + } + + if err := cs.meta.Delete(&staleRef{}, "id in (?)", staleToDelete).Error; err != nil { + return err + } + return nil } diff --git a/carstore/repo_test.go b/carstore/repo_test.go index bcd90439c..c07447af2 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -177,8 +177,8 @@ func TestRepeatedCompactions(t *testing.T) { var recs []cid.Cid head := ncid - for loop := 0; loop < 30; loop++ { - for i := 0; i < 20; i++ { + for loop := 0; loop < 50; loop++ { + for i := 0; i < 100; i++ { ds, err := cs.NewDeltaSession(ctx, 1, &rev) if err != nil { t.Fatal(err) @@ -325,6 +325,9 @@ func BenchmarkRepoWritesCarstore(b *testing.B) { } rev = nrev + if err := ds.CalcDiff(ctx, nroot); err != nil { + b.Fatal(err) + } if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { b.Fatal(err)