diff --git a/bgs/admin.go b/bgs/admin.go index d1b9ef57d..1804a78cb 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -414,12 +414,14 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error { return fmt.Errorf("no such user: %w", err) } - if err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID); err != nil { + stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID) + if err != nil { return fmt.Errorf("compaction failed: %w", err) } return e.JSON(200, map[string]any{ "success": "true", + "stats": stats, }) } diff --git a/bgs/bgs.go b/bgs/bgs.go index a9d24fc60..ecf42a688 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1090,7 +1090,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context) error { } for _, r := range repos { - if err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { + if _, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) continue } diff --git a/carstore/bs.go b/carstore/bs.go index 86d3cfaaa..cdea28f0b 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -1015,8 +1015,8 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error { type shardStat struct { ID uint - Seq int Dirty int + Seq int Total int refs []blockRef @@ -1026,33 +1026,15 @@ func (s shardStat) dirtyFrac() float64 { return float64(s.Dirty) / float64(s.Total) } -func shouldCompact(s shardStat) bool { - // if shard is mostly removed blocks - if s.dirtyFrac() > 0.5 { - return true - } - - // if its a big shard with a sufficient number of removed blocks - if s.Dirty > 1000 { - return true - } - - // if its just rather small and we want to compact it up with other shards - if s.Total < 20 { - return true - } - - return false -} - -func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { +func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat { byId := make(map[uint]*shardStat) for _, br := range brefs { s, ok := byId[br.Shard] if !ok { s = &shardStat{ - ID: br.Shard, + ID: br.Shard, + Seq: shards[br.Shard].Seq, } byId[br.Shard] = s } @@ -1071,7 +1053,7 @@ func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { } sort.Slice(out, func(i, j int) bool { - return out[i].ID < out[j].ID + return out[i].Seq < out[j].Seq }) return out @@ -1081,6 +1063,29 @@ type compBucket struct { shards []shardStat cleanBlocks int + expSize int +} + +func (cb *compBucket) shouldCompact() bool { + if len(cb.shards) == 0 { + return false + } + + if len(cb.shards) > 5 { + return true + } + + var frac float64 + for _, s := range cb.shards { + frac += s.dirtyFrac() + } + frac /= float64(len(cb.shards)) + + if len(cb.shards) > 3 && frac > 0.2 { + return true + } + + return frac > 0.4 } func (cb *compBucket) addShardStat(ss shardStat) { @@ -1144,7 +1149,15 @@ func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarge return targets, nil } -func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error { +type CompactionStats struct { + StartShards int `json:"startShards"` + NewShards int `json:"newShards"` + SkippedShards int `json:"skippedShards"` + ShardsDeleted int `json:"shardsDeleted"` + RefsDeleted int `json:"refsDeleted"` +} + +func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") defer span.End() @@ -1152,7 +1165,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro var shards []CarShard if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil { - return err + return nil, err } var shardIds []uint @@ -1166,13 +1179,13 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro } var brefs []blockRef - if err := cs.meta.WithContext(ctx).Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { - return err + if err := cs.meta.WithContext(ctx).Raw(`select shard, cid from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { + return nil, err } var staleRefs []staleRef if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil { - return err + return nil, err } stale := make(map[cid.Cid]bool) @@ -1195,7 +1208,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro // focus on compacting everything else. it leaves *some* dirty blocks // still around but we're doing that anyways since compaction isnt a // perfect process - return fmt.Errorf("WIP: not currently handling this case") + return nil, fmt.Errorf("WIP: not currently handling this case") } keep := make(map[cid.Cid]bool) @@ -1205,65 +1218,103 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro } } - results := aggrRefs(brefs, stale) + results := aggrRefs(brefs, shardsById, stale) + var sum int + for _, r := range results { + sum += r.Total + } + + lowBound := 20 + N := 10 + // we want to *aim* for N shards per user + // the last several should be left small to allow easy loading from disk + // for updates (since recent blocks are most likely needed for edits) + // the beginning of the list should be some sort of exponential fall-off + // with the area under the curve targeted by the total number of blocks we + // have + var threshs []int + tot := len(brefs) + for i := 0; i < N; i++ { + v := tot / 2 + if v < lowBound { + v = lowBound + } + tot = tot / 2 + threshs = append(threshs, v) + } thresholdForPosition := func(i int) int { - // TODO: calculate some curve here so earlier shards end up with more - // blocks and recent shards end up with less - return 50 + if i > len(threshs) { + return lowBound + } + return threshs[i] } cur := new(compBucket) + cur.expSize = thresholdForPosition(0) var compactionQueue []*compBucket for i, r := range results { - if shouldCompact(r) { - if cur.cleanBlocks > thresholdForPosition(i) { - compactionQueue = append(compactionQueue, cur) - cur = new(compBucket) - } + cur.addShardStat(r) - cur.addShardStat(r) - } else { - if !cur.isEmpty() { - compactionQueue = append(compactionQueue, cur) - cur = new(compBucket) + if cur.cleanBlocks > cur.expSize || i > len(results)-3 { + compactionQueue = append(compactionQueue, cur) + cur = &compBucket{ + expSize: thresholdForPosition(len(compactionQueue)), } } } - if !cur.isEmpty() { compactionQueue = append(compactionQueue, cur) } + stats := &CompactionStats{ + StartShards: len(shards), + } + removedShards := make(map[uint]bool) for _, b := range compactionQueue { + if !b.shouldCompact() { + stats.SkippedShards += len(b.shards) + continue + } + if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { - return err + return nil, err } + stats.NewShards++ + var todelete []*CarShard for _, s := range b.shards { removedShards[s.ID] = true sh, ok := shardsById[s.ID] if !ok { - return fmt.Errorf("missing shard to delete") + return nil, fmt.Errorf("missing shard to delete") } todelete = append(todelete, &sh) } + stats.ShardsDeleted += len(todelete) if err := cs.deleteShards(ctx, todelete); err != nil { - return fmt.Errorf("deleting shards: %w", err) + return nil, fmt.Errorf("deleting shards: %w", err) } } // now we need to delete the staleRefs we successfully cleaned up // we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed - return cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards) + num, err := cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards) + if err != nil { + return nil, err + } + + stats.RefsDeleted = num + + return stats, nil } -func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { +func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) (int, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs") defer span.End() @@ -1296,11 +1347,11 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, stale } if err := cs.meta.Delete(&staleRef{}, "id in (?)", sl).Error; err != nil { - return err + return 0, err } } - return nil + return len(staleToDelete), nil } func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { diff --git a/carstore/repo_test.go b/carstore/repo_test.go index c07447af2..5ce81ece7 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -140,7 +140,7 @@ func TestBasicOperation(t *testing.T) { } checkRepo(t, buf, recs) - if err := cs.CompactUserShards(ctx, 1); err != nil { + if _, err := cs.CompactUserShards(ctx, 1); err != nil { t.Fatal(err) } @@ -217,10 +217,13 @@ func TestRepeatedCompactions(t *testing.T) { head = nroot } fmt.Println("Run compaction", loop) - if err := cs.CompactUserShards(ctx, 1); err != nil { + st, err := cs.CompactUserShards(ctx, 1) + if err != nil { t.Fatal(err) } + fmt.Printf("%#v\n", st) + buf := new(bytes.Buffer) if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) @@ -236,6 +239,7 @@ func TestRepeatedCompactions(t *testing.T) { } func checkRepo(t *testing.T, r io.Reader, expRecs []cid.Cid) { + t.Helper() rep, err := repo.ReadRepoFromCar(context.TODO(), r) if err != nil { t.Fatal(err)