From c08362e39f94e914a6a76c13047cea8fc331c1ea Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 21 Sep 2023 14:00:08 -0700 Subject: [PATCH 1/4] improved compaction, minimize shard count while minimizing IO --- carstore/bs.go | 140 ++++++++++++++++++++++++++++++++---------- carstore/repo_test.go | 8 ++- 2 files changed, 114 insertions(+), 34 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index 86d3cfaaa..b534eb74f 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -367,6 +367,11 @@ func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev s return err } + var allshards []CarShard + if err := cs.meta.Order("seq desc").Where("usr = ?", user).Find(&allshards).Error; err != nil { + return err + } + if !incremental && earlySeq > 0 { // have to do it the ugly way return fmt.Errorf("nyi") @@ -1015,8 +1020,8 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error { type shardStat struct { ID uint - Seq int Dirty int + Seq int Total int refs []blockRef @@ -1045,14 +1050,15 @@ func shouldCompact(s shardStat) bool { return false } -func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { +func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat { byId := make(map[uint]*shardStat) for _, br := range brefs { s, ok := byId[br.Shard] if !ok { s = &shardStat{ - ID: br.Shard, + ID: br.Shard, + Seq: shards[br.Shard].Seq, } byId[br.Shard] = s } @@ -1071,7 +1077,7 @@ func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { } sort.Slice(out, func(i, j int) bool { - return out[i].ID < out[j].ID + return out[i].Seq < out[j].Seq }) return out @@ -1081,6 +1087,29 @@ type compBucket struct { shards []shardStat cleanBlocks int + expSize int +} + +func (cb *compBucket) shouldCompact() bool { + if len(cb.shards) == 0 { + return false + } + + if len(cb.shards) > 5 { + return true + } + + var frac float64 + for _, s := range cb.shards { + frac += s.dirtyFrac() + } + frac /= float64(len(cb.shards)) + + if len(cb.shards) > 3 && frac > 0.2 { + + } + + return frac > 0.4 } func (cb *compBucket) addShardStat(ss shardStat) { @@ -1144,7 +1173,15 @@ func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarge return targets, nil } -func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error { +type CompactionStats struct { + StartShards int + NewShards int + SkippedShards int + ShardsDeleted int + RefsDeleted int +} + +func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") defer span.End() @@ -1152,7 +1189,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro var shards []CarShard if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil { - return err + return nil, err } var shardIds []uint @@ -1166,13 +1203,13 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro } var brefs []blockRef - if err := cs.meta.WithContext(ctx).Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { - return err + if err := cs.meta.WithContext(ctx).Raw(`select shard, cid from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { + return nil, err } var staleRefs []staleRef if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil { - return err + return nil, err } stale := make(map[cid.Cid]bool) @@ -1195,7 +1232,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro // focus on compacting everything else. it leaves *some* dirty blocks // still around but we're doing that anyways since compaction isnt a // perfect process - return fmt.Errorf("WIP: not currently handling this case") + return nil, fmt.Errorf("WIP: not currently handling this case") } keep := make(map[cid.Cid]bool) @@ -1205,65 +1242,104 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro } } - results := aggrRefs(brefs, stale) + results := aggrRefs(brefs, shardsById, stale) + var sum int + for _, r := range results { + sum += r.Total + } + + lowBound := 20 + N := 10 + // we want to *aim* for N shards per user + // the last several should be left small to allow easy loading from disk + // for updates (since recent blocks are most likely needed for edits) + // the beginning of the list should be some sort of exponential fall-off + // with the area under the curve targeted by the total number of blocks we + // have + var threshs []int + tot := len(brefs) + for i := 0; i < N; i++ { + v := tot / 2 + if v < lowBound { + v = lowBound + } + tot = tot / 2 + threshs = append(threshs, v) + } thresholdForPosition := func(i int) int { - // TODO: calculate some curve here so earlier shards end up with more - // blocks and recent shards end up with less - return 50 + if i >= len(threshs) { + return 5 + } + + return threshs[i] } cur := new(compBucket) + cur.expSize = thresholdForPosition(0) var compactionQueue []*compBucket for i, r := range results { - if shouldCompact(r) { - if cur.cleanBlocks > thresholdForPosition(i) { - compactionQueue = append(compactionQueue, cur) - cur = new(compBucket) - } + cur.addShardStat(r) - cur.addShardStat(r) - } else { - if !cur.isEmpty() { - compactionQueue = append(compactionQueue, cur) - cur = new(compBucket) + if cur.cleanBlocks > cur.expSize || i > len(results)-3 { + compactionQueue = append(compactionQueue, cur) + cur = &compBucket{ + expSize: thresholdForPosition(len(compactionQueue)), } } } - if !cur.isEmpty() { compactionQueue = append(compactionQueue, cur) } + stats := &CompactionStats{ + StartShards: len(shards), + } + removedShards := make(map[uint]bool) for _, b := range compactionQueue { + if !b.shouldCompact() { + stats.SkippedShards += len(b.shards) + continue + } + if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { - return err + return nil, err } + stats.NewShards++ + var todelete []*CarShard for _, s := range b.shards { removedShards[s.ID] = true sh, ok := shardsById[s.ID] if !ok { - return fmt.Errorf("missing shard to delete") + return nil, fmt.Errorf("missing shard to delete") } todelete = append(todelete, &sh) } + stats.ShardsDeleted += len(todelete) if err := cs.deleteShards(ctx, todelete); err != nil { - return fmt.Errorf("deleting shards: %w", err) + return nil, fmt.Errorf("deleting shards: %w", err) } } // now we need to delete the staleRefs we successfully cleaned up // we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed - return cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards) + num, err := cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards) + if err != nil { + return nil, err + } + + stats.RefsDeleted = num + + return stats, nil } -func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { +func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) (int, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs") defer span.End() @@ -1296,11 +1372,11 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, stale } if err := cs.meta.Delete(&staleRef{}, "id in (?)", sl).Error; err != nil { - return err + return 0, err } } - return nil + return len(staleToDelete), nil } func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { diff --git a/carstore/repo_test.go b/carstore/repo_test.go index c07447af2..5ce81ece7 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -140,7 +140,7 @@ func TestBasicOperation(t *testing.T) { } checkRepo(t, buf, recs) - if err := cs.CompactUserShards(ctx, 1); err != nil { + if _, err := cs.CompactUserShards(ctx, 1); err != nil { t.Fatal(err) } @@ -217,10 +217,13 @@ func TestRepeatedCompactions(t *testing.T) { head = nroot } fmt.Println("Run compaction", loop) - if err := cs.CompactUserShards(ctx, 1); err != nil { + st, err := cs.CompactUserShards(ctx, 1) + if err != nil { t.Fatal(err) } + fmt.Printf("%#v\n", st) + buf := new(bytes.Buffer) if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) @@ -236,6 +239,7 @@ func TestRepeatedCompactions(t *testing.T) { } func checkRepo(t *testing.T, r io.Reader, expRecs []cid.Cid) { + t.Helper() rep, err := repo.ReadRepoFromCar(context.TODO(), r) if err != nil { t.Fatal(err) From 6c774322c9646ed7476cd7559a603ea1d78cee5d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 21 Sep 2023 15:04:45 -0700 Subject: [PATCH 2/4] some cleanup --- carstore/bs.go | 37 +++++++++---------------------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index b534eb74f..d72671ec6 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -367,11 +367,6 @@ func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev s return err } - var allshards []CarShard - if err := cs.meta.Order("seq desc").Where("usr = ?", user).Find(&allshards).Error; err != nil { - return err - } - if !incremental && earlySeq > 0 { // have to do it the ugly way return fmt.Errorf("nyi") @@ -1031,25 +1026,6 @@ func (s shardStat) dirtyFrac() float64 { return float64(s.Dirty) / float64(s.Total) } -func shouldCompact(s shardStat) bool { - // if shard is mostly removed blocks - if s.dirtyFrac() > 0.5 { - return true - } - - // if its a big shard with a sufficient number of removed blocks - if s.Dirty > 1000 { - return true - } - - // if its just rather small and we want to compact it up with other shards - if s.Total < 20 { - return true - } - - return false -} - func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat { byId := make(map[uint]*shardStat) @@ -1106,7 +1082,7 @@ func (cb *compBucket) shouldCompact() bool { frac /= float64(len(cb.shards)) if len(cb.shards) > 3 && frac > 0.2 { - + return true } return frac > 0.4 @@ -1268,10 +1244,9 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co } thresholdForPosition := func(i int) int { - if i >= len(threshs) { - return 5 + if i > len(threshs) { + return lowBound } - return threshs[i] } @@ -1300,9 +1275,15 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co for _, b := range compactionQueue { if !b.shouldCompact() { stats.SkippedShards += len(b.shards) + for _, s := range b.shards { + fmt.Println("o: ", s.Total, s.dirtyFrac()) + + } continue } + fmt.Println("n: ", b.cleanBlocks) + if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { return nil, err } From 31f985e00ce597b0eff9a9bebddaa8db185c16fc Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 21 Sep 2023 15:15:35 -0700 Subject: [PATCH 3/4] fix build --- bgs/admin.go | 4 +++- bgs/bgs.go | 2 +- carstore/bs.go | 10 +++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/bgs/admin.go b/bgs/admin.go index d1b9ef57d..1804a78cb 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -414,12 +414,14 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error { return fmt.Errorf("no such user: %w", err) } - if err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID); err != nil { + stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID) + if err != nil { return fmt.Errorf("compaction failed: %w", err) } return e.JSON(200, map[string]any{ "success": "true", + "stats": stats, }) } diff --git a/bgs/bgs.go b/bgs/bgs.go index a9d24fc60..ecf42a688 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1090,7 +1090,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context) error { } for _, r := range repos { - if err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { + if _, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) continue } diff --git a/carstore/bs.go b/carstore/bs.go index d72671ec6..ba800c1bc 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -1150,11 +1150,11 @@ func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarge } type CompactionStats struct { - StartShards int - NewShards int - SkippedShards int - ShardsDeleted int - RefsDeleted int + StartShards int `json:"startShards"` + NewShards int `json:"newShards"` + SkippedShards int `json:"skippedShards"` + ShardsDeleted int `json:"shardsDeleted"` + RefsDeleted int `json:"refsDeleted"` } func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) { From ce46f329a13e28c6339bbbefac748c1adfdf5095 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 21 Sep 2023 16:03:55 -0700 Subject: [PATCH 4/4] remove debug prints --- carstore/bs.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index ba800c1bc..cdea28f0b 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -1275,15 +1275,9 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co for _, b := range compactionQueue { if !b.shouldCompact() { stats.SkippedShards += len(b.shards) - for _, s := range b.shards { - fmt.Println("o: ", s.Total, s.dirtyFrac()) - - } continue } - fmt.Println("n: ", b.cleanBlocks) - if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { return nil, err }