Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved compaction, minimize shard count while minimizing IO #328

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,14 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
return fmt.Errorf("no such user: %w", err)
}

if err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID); err != nil {
stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID)
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}

return e.JSON(200, map[string]any{
"success": "true",
"stats": stats,
})
}

Expand Down
2 changes: 1 addition & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context) error {
}

for _, r := range repos {
if err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil {
if _, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil {
log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
continue
}
Expand Down
153 changes: 102 additions & 51 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,8 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error {

type shardStat struct {
ID uint
Seq int
Dirty int
Seq int
Total int

refs []blockRef
Expand All @@ -1026,33 +1026,15 @@ func (s shardStat) dirtyFrac() float64 {
return float64(s.Dirty) / float64(s.Total)
}

// shouldCompact reports whether a single shard is a worthwhile compaction
// candidate: shards that are mostly dirty, large shards that have accumulated
// many removed blocks, and small shards worth merging with their neighbors.
func shouldCompact(s shardStat) bool {
	switch {
	case s.dirtyFrac() > 0.5:
		// shard is mostly removed blocks
		return true
	case s.Dirty > 1000:
		// a big shard with a sufficient number of removed blocks
		return true
	case s.Total < 20:
		// rather small; compact it up with other shards
		return true
	}

	return false
}

func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat {
func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat {
byId := make(map[uint]*shardStat)

for _, br := range brefs {
s, ok := byId[br.Shard]
if !ok {
s = &shardStat{
ID: br.Shard,
ID: br.Shard,
Seq: shards[br.Shard].Seq,
}
byId[br.Shard] = s
}
Expand All @@ -1071,7 +1053,7 @@ func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat {
}

sort.Slice(out, func(i, j int) bool {
return out[i].ID < out[j].ID
return out[i].Seq < out[j].Seq
})

return out
Expand All @@ -1081,6 +1063,29 @@ type compBucket struct {
shards []shardStat

cleanBlocks int
expSize int
}

// shouldCompact reports whether this bucket of shards is worth rewriting.
// Empty buckets never compact; buckets with many shards always do; otherwise
// the decision is based on the average dirty fraction across the bucket's
// shards, with a lower threshold when the bucket holds several shards.
func (cb *compBucket) shouldCompact() bool {
	n := len(cb.shards)
	if n == 0 {
		return false
	}

	if n > 5 {
		return true
	}

	var totalFrac float64
	for _, s := range cb.shards {
		totalFrac += s.dirtyFrac()
	}
	avg := totalFrac / float64(n)

	// compact moderately dirty buckets once they contain a few shards,
	// and any bucket that is sufficiently dirty overall
	return avg > 0.4 || (n > 3 && avg > 0.2)
}

func (cb *compBucket) addShardStat(ss shardStat) {
Expand Down Expand Up @@ -1144,15 +1149,23 @@ func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarge
return targets, nil
}

func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error {
// CompactionStats summarizes a single CompactUserShards run; it is returned
// to callers (e.g. the admin compaction endpoint) for observability.
type CompactionStats struct {
StartShards int `json:"startShards"` // number of shards the user had before compaction
NewShards int `json:"newShards"` // compacted shards written (one per bucket actually compacted)
SkippedShards int `json:"skippedShards"` // shards left untouched because their bucket was not worth compacting
ShardsDeleted int `json:"shardsDeleted"` // old shards deleted after their live blocks were rewritten
RefsDeleted int `json:"refsDeleted"` // stale block refs cleaned up after compaction
}

func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
defer span.End()

span.SetAttributes(attribute.Int64("user", int64(user)))

var shards []CarShard
if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil {
return err
return nil, err
}

var shardIds []uint
Expand All @@ -1166,13 +1179,13 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
}

var brefs []blockRef
if err := cs.meta.WithContext(ctx).Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil {
return err
if err := cs.meta.WithContext(ctx).Raw(`select shard, cid from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil {
return nil, err
}

var staleRefs []staleRef
if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil {
return err
return nil, err
}

stale := make(map[cid.Cid]bool)
Expand All @@ -1195,7 +1208,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
// focus on compacting everything else. it leaves *some* dirty blocks
// still around but we're doing that anyway since compaction isn't a
// perfect process
return fmt.Errorf("WIP: not currently handling this case")
return nil, fmt.Errorf("WIP: not currently handling this case")
}

keep := make(map[cid.Cid]bool)
Expand All @@ -1205,65 +1218,103 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
}
}

results := aggrRefs(brefs, stale)
results := aggrRefs(brefs, shardsById, stale)
var sum int
for _, r := range results {
sum += r.Total
}

lowBound := 20
N := 10
// we want to *aim* for N shards per user
// the last several should be left small to allow easy loading from disk
// for updates (since recent blocks are most likely needed for edits)
// the beginning of the list should be some sort of exponential fall-off
// with the area under the curve targeted by the total number of blocks we
// have
var threshs []int
tot := len(brefs)
for i := 0; i < N; i++ {
v := tot / 2
if v < lowBound {
v = lowBound
}
tot = tot / 2
threshs = append(threshs, v)
}

thresholdForPosition := func(i int) int {
// TODO: calculate some curve here so earlier shards end up with more
// blocks and recent shards end up with less
return 50
if i > len(threshs) {
return lowBound
}
return threshs[i]
}

cur := new(compBucket)
cur.expSize = thresholdForPosition(0)
var compactionQueue []*compBucket
for i, r := range results {
if shouldCompact(r) {
if cur.cleanBlocks > thresholdForPosition(i) {
compactionQueue = append(compactionQueue, cur)
cur = new(compBucket)
}
cur.addShardStat(r)

cur.addShardStat(r)
} else {
if !cur.isEmpty() {
compactionQueue = append(compactionQueue, cur)
cur = new(compBucket)
if cur.cleanBlocks > cur.expSize || i > len(results)-3 {
compactionQueue = append(compactionQueue, cur)
cur = &compBucket{
expSize: thresholdForPosition(len(compactionQueue)),
}
}
}

if !cur.isEmpty() {
compactionQueue = append(compactionQueue, cur)
}

stats := &CompactionStats{
StartShards: len(shards),
}

removedShards := make(map[uint]bool)
for _, b := range compactionQueue {
if !b.shouldCompact() {
stats.SkippedShards += len(b.shards)
continue
}

if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil {
return err
return nil, err
}

stats.NewShards++

var todelete []*CarShard
for _, s := range b.shards {
removedShards[s.ID] = true
sh, ok := shardsById[s.ID]
if !ok {
return fmt.Errorf("missing shard to delete")
return nil, fmt.Errorf("missing shard to delete")
}

todelete = append(todelete, &sh)
}

stats.ShardsDeleted += len(todelete)
if err := cs.deleteShards(ctx, todelete); err != nil {
return fmt.Errorf("deleting shards: %w", err)
return nil, fmt.Errorf("deleting shards: %w", err)
}
}

// now we need to delete the staleRefs we successfully cleaned up
// we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed

return cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards)
num, err := cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards)
if err != nil {
return nil, err
}

stats.RefsDeleted = num

return stats, nil
}

func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) (int, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs")
defer span.End()

Expand Down Expand Up @@ -1296,11 +1347,11 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, stale
}

if err := cs.meta.Delete(&staleRef{}, "id in (?)", sl).Error; err != nil {
return err
return 0, err
}
}

return nil
return len(staleToDelete), nil
}

func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
Expand Down
8 changes: 6 additions & 2 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestBasicOperation(t *testing.T) {
}
checkRepo(t, buf, recs)

if err := cs.CompactUserShards(ctx, 1); err != nil {
if _, err := cs.CompactUserShards(ctx, 1); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -217,10 +217,13 @@ func TestRepeatedCompactions(t *testing.T) {
head = nroot
}
fmt.Println("Run compaction", loop)
if err := cs.CompactUserShards(ctx, 1); err != nil {
st, err := cs.CompactUserShards(ctx, 1)
if err != nil {
t.Fatal(err)
}

fmt.Printf("%#v\n", st)

buf := new(bytes.Buffer)
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
t.Fatal(err)
Expand All @@ -236,6 +239,7 @@ func TestRepeatedCompactions(t *testing.T) {
}

func checkRepo(t *testing.T, r io.Reader, expRecs []cid.Cid) {
t.Helper()
rep, err := repo.ReadRepoFromCar(context.TODO(), r)
if err != nil {
t.Fatal(err)
Expand Down
Loading