From e11a2d7ca43bc93bf081fa9eb9c67f82b953743d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 7 Sep 2023 15:56:57 -0700 Subject: [PATCH 1/6] implement first pass at compaction of carstore shards --- carstore/bs.go | 454 +++++++++++++++++++++++++++++++++--------- carstore/repo_test.go | 140 ++++++++++++- 2 files changed, 495 insertions(+), 99 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index ebbb4d621..a1af40a59 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -9,6 +9,7 @@ import ( "io" "os" "path/filepath" + "sort" "strings" "sync" "time" @@ -73,7 +74,6 @@ type CarShard struct { Seq int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"` Path string Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"` - Rebase bool Rev string } @@ -262,11 +262,11 @@ func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard { return nil } -func (cs *CarStore) putLastShardCache(user models.Uid, ls *CarShard) { +func (cs *CarStore) putLastShardCache(ls *CarShard) { cs.lscLk.Lock() defer cs.lscLk.Unlock() - cs.lastShardCache[user] = ls + cs.lastShardCache[ls.Usr] = ls } func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { @@ -288,7 +288,7 @@ func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShar //} } - cs.putLastShardCache(user, &lastShard) + cs.putLastShardCache(&lastShard) return &lastShard, nil } @@ -375,15 +375,8 @@ func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev s } for _, sh := range shards { - // for rebase shards, only include the modified root, not the whole tree - if sh.Rebase && incremental { - if err := cs.writeBlockFromShard(ctx, &sh, w, sh.Root.CID); err != nil { - return err - } - } else { - if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { - return err - } + if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { + return err } } @@ -438,6 +431,33 @@ func (cs *CarStore) writeBlockFromShard(ctx context.Context, sh *CarShard, w io. 
} } +func (cs *CarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error { + fi, err := os.Open(sh.Path) + if err != nil { + return err + } + defer fi.Close() + + rr, err := car.NewCarReader(fi) + if err != nil { + return err + } + + for { + blk, err := rr.Next() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + if err := cb(blk); err != nil { + return err + } + } +} + var _ blockstore.Blockstore = (*DeltaSession)(nil) func (ds *DeltaSession) BaseCid() cid.Cid { @@ -539,13 +559,20 @@ func (cs *CarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq } func (cs *CarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { - return os.Remove(fnameForShard(sh.Usr, sh.Seq)) + return os.Remove(sh.Path) } // CloseWithRoot writes all new blocks in a car file to the writer with the // given cid as the 'root' func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error) { - return ds.closeWithRoot(ctx, root, rev, false) + ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot") + defer span.End() + + if ds.readonly { + return nil, fmt.Errorf("cannot write to readonly deltaSession") + } + + return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) } func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { @@ -566,13 +593,7 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { return hnw, nil } -func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rev string, rebase bool) ([]byte, error) { - ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot") - defer span.End() - - if ds.readonly { - return nil, fmt.Errorf("cannot write to readonly deltaSession") - } +func (cs *CarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { buf := new(bytes.Buffer) hnw, err := WriteCarHeader(buf, root) @@ -586,8 +607,8 @@ func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rev str offset := hnw //brefs := make([]*blockRef, 0, len(ds.blks)) - brefs := make([]map[string]interface{}, 0, len(ds.blks)) - for k, blk := range ds.blks { + brefs := make([]map[string]interface{}, 0, len(blks)) + for k, blk := range blks { nw, err := LdWrite(buf, k.Bytes(), blk.RawData()) if err != nil { return nil, fmt.Errorf("failed to write block: %w", err) @@ -610,7 +631,7 @@ func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rev str offset += nw } - path, err := ds.cs.writeNewShardFile(ctx, ds.user, ds.seq, buf.Bytes()) + path, err := cs.writeNewShardFile(ctx, user, seq, buf.Bytes()) if err != nil { return nil, fmt.Errorf("failed to write shard file: %w", err) } @@ -618,31 +639,34 @@ func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rev str shard := CarShard{ Root: models.DbCID{root}, DataStart: hnw, - Seq: ds.seq, + Seq: seq, Path: path, - Usr: ds.user, + Usr: user, Rev: rev, } - if err := ds.putShard(ctx, &shard, brefs); err != nil { + if err := cs.putShard(ctx, &shard, brefs, rmcids, false); err != nil { return nil, err } return buf.Bytes(), nil } -func (ds *DeltaSession) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any) error { +func (cs *CarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error { ctx, span := otel.Tracer("carstore").Start(ctx, "putShard") 
defer span.End() // TODO: there should be a way to create the shard and block_refs that // reference it in the same query, would save a lot of time - tx := ds.cs.meta.WithContext(ctx).Begin() + tx := cs.meta.WithContext(ctx).Begin() if err := tx.WithContext(ctx).Create(shard).Error; err != nil { return fmt.Errorf("failed to create shard in DB tx: %w", err) } - ds.cs.putLastShardCache(ds.user, shard) + + if !nocache { + cs.putLastShardCache(shard) + } for _, ref := range brefs { ref["shard"] = shard.ID @@ -652,13 +676,13 @@ func (ds *DeltaSession) putShard(ctx context.Context, shard *CarShard, brefs []m return fmt.Errorf("failed to create block refs: %w", err) } - if len(ds.rmcids) > 0 { + if len(rmcids) > 0 { var torm []models.DbCID - for c := range ds.rmcids { + for c := range rmcids { torm = append(torm, models.DbCID{c}) } - subq := ds.cs.meta.Model(&blockRef{}).Joins("left join car_shards cs on cs.id = block_refs.shard").Where("cid in (?) AND usr = ?", torm, ds.user).Select("block_refs.id") + subq := cs.meta.Model(&blockRef{}).Joins("left join car_shards cs on cs.id = block_refs.shard").Where("cid in (?) AND usr = ?", torm, shard.Usr).Select("block_refs.id") if err := tx.Model(&blockRef{}).Where("id in (?)", subq).UpdateColumn("dirty", true).Error; err != nil { return err } @@ -715,39 +739,6 @@ func createInBatches(ctx context.Context, tx *gorm.DB, data []map[string]any, ba return nil } -func (ds *DeltaSession) CloseAsRebase(ctx context.Context, root cid.Cid, rev string) error { - _, err := ds.closeWithRoot(ctx, root, rev, true) - if err != nil { - return err - } - - // TODO: this *could* get large, might be worth doing it incrementally - var oldslices []CarShard - if err := ds.cs.meta.Find(&oldslices, "usr = ? AND seq < ?", ds.user, ds.seq).Error; err != nil { - return err - } - - // If anything here fails, cleanup is straightforward. 
Simply look for any - // shard in the database with a higher seq shard marked as 'rebase' - for _, sl := range oldslices { - if err := os.Remove(sl.Path); err != nil { - if !os.IsNotExist(err) { - return err - } - } - - if err := ds.cs.meta.Delete(&sl).Error; err != nil { - return err - } - - if err := ds.cs.meta.Where("shard = ?", sl.ID).Delete(&blockRef{}).Error; err != nil { - return err - } - } - - return nil -} - func LdWrite(w io.Writer, d ...[]byte) (int64, error) { var sum uint64 for _, s := range d { @@ -781,7 +772,7 @@ func setToSlice(s map[cid.Cid]bool) []cid.Cid { return out } -func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids []cid.Cid) (map[cid.Cid]bool, error) { +func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block) (map[cid.Cid]bool, error) { ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff") defer span.End() @@ -791,7 +782,7 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n // walk the entire 'new' portion of the tree, marking all referenced cids as 'keep' keepset := make(map[cid.Cid]bool) - for _, c := range newcids { + for c := range newcids { keepset[c] = true oblk, err := bs.Get(ctx, c) if err != nil { @@ -872,7 +863,7 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *stri } } - rmcids, err := BlockDiff(ctx, ds, ds.baseCid, cids) + rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks) if err != nil { return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s): %w", ds.baseCid, err) } @@ -882,6 +873,16 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *stri return carr.Header.Roots[0], ds, nil } +func (ds *DeltaSession) CalcDiff(ctx context.Context, nroot cid.Cid) error { + rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks) + if err != nil { + return fmt.Errorf("block diff failed: %w", err) + } + + ds.rmcids = rmcids + return nil +} + func (cs *CarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { lastShard, err := cs.getLastShard(ctx, user) if err != nil { @@ -930,49 +931,320 @@ func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error return out, nil } -func (cs *CarStore) checkFork(ctx context.Context, user models.Uid, prev cid.Cid) (bool, error) { - ctx, span := otel.Tracer("carstore").Start(ctx, "checkFork") - defer span.End() +func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error { + var shards []CarShard + if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { + return err + } - lastShard, err := cs.getLastShard(ctx, user) + for _, sh := range shards { + if err := cs.deleteShard(ctx, &sh); err != nil { + if !os.IsNotExist(err) { + return err + } + } + } + + if err := cs.meta.Delete(&CarShard{}, "usr = ?", user).Error; err != nil { + return err + } + + return nil +} + +func (cs *CarStore) deleteShard(ctx context.Context, sh *CarShard) error { + if err := cs.meta.Delete(&CarShard{}, "id = ?", sh.ID).Error; err != nil { + return err + } + + if err := cs.meta.Delete(&blockRef{}, "shard = ?", sh.ID).Error; err != nil { + return err + } + + return cs.deleteShardFile(ctx, sh) +} + +type shardStat struct { + ID uint + Seq int + Dirty int + Total int + + refs []blockRef +} + +func (s shardStat) dirtyFrac() float64 { + return float64(s.Dirty) / float64(s.Total) +} + +func shouldCompact(s shardStat) bool { + // if shard is mostly removed blocks + if s.dirtyFrac() > 0.5 { + return true 
+ } + + // if it's a big shard with a sufficient number of removed blocks + if s.Dirty > 1000 { + return true + } + + // if it's just rather small and we want to compact it up with other shards + if s.Total < 20 { + return true + } + + return false +} + +func aggrRefs(brefs []blockRef) []shardStat { + byId := make(map[uint]*shardStat) + + for _, br := range brefs { + s, ok := byId[br.Shard] + if !ok { + s = &shardStat{ + ID: br.Shard, + } + byId[br.Shard] = s + } + + s.Total++ + if br.Dirty { + s.Dirty++ + } + + s.refs = append(s.refs, br) + } + + var out []shardStat + for _, s := range byId { + out = append(out, *s) + } + + sort.Slice(out, func(i, j int) bool { + return out[i].ID < out[j].ID + }) + + return out +} + +type compBucket struct { + shards []shardStat + + cleanBlocks int +} + +func (cb *compBucket) addShardStat(ss shardStat) { + cb.cleanBlocks += (ss.Total - ss.Dirty) + cb.shards = append(cb.shards, ss) +} + +func (cb *compBucket) isFull() bool { + return cb.cleanBlocks > 50 +} + +func (cb *compBucket) isEmpty() bool { + return len(cb.shards) == 0 +} + +func (cs *CarStore) copyShardBlocksFiltered(ctx context.Context, sh *CarShard, w io.Writer, keep map[cid.Cid]bool) error { + fi, err := os.Open(sh.Path) if err != nil { - return false, err + return err } + defer fi.Close() - var maybeShard CarShard - if err := cs.meta.WithContext(ctx).Model(CarShard{}).Find(&maybeShard, "usr = ? AND root = ?", user, &models.DbCID{prev}).Error; err != nil { - return false, err + rr, err := car.NewCarReader(fi) + if err != nil { + return err } - if maybeShard.ID != 0 && maybeShard.ID == lastShard.ID { - // somehow we are checking if a valid 'append' is a fork, seems buggy, throw an error - return false, fmt.Errorf("invariant broken: checked for forkiness of a valid append (%d - %d)", lastShard.ID, maybeShard.ID) + for { + blk, err := rr.Next() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + if keep[blk.Cid()] { + if _, err := LdWrite(w, blk.Cid().Bytes(), blk.RawData()); err != nil { + return err + } + } } +} - if maybeShard.ID == 0 { - return false, nil +func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { + // TODO: some overwrite protections + fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq)) + if err != nil { + return nil, "", err } - return true, nil + return fi, fi.Name(), nil } -func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error { +func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error { + /* + var results []shardStat + if err := cs.meta.Raw(` + WITH user_shards AS + (select cs.id, cs.seq, br.dirty from block_refs br left join car_shards cs on br.shard = cs.id where cs.usr = ?)
+ SELECT + id, + seq, + count(*) as total, + sum(case when dirty then 1 else 0 end) as dirty + FROM user_shards group by id, seq;`, user).Scan(&results).Error; err != nil { + return err + } + */ + + var brefs []blockRef + if err := cs.meta.Raw(`select br.* from block_refs br left join car_shards cs on br.shard = cs.id where cs.usr = ?`, user).Scan(&brefs).Error; err != nil { + return err + } + + cset := make(map[cid.Cid]bool) + var hasDirtyDupes bool + for _, br := range brefs { + if br.Dirty { + if cset[br.Cid.CID] { + hasDirtyDupes = true + break + } + cset[br.Cid.CID] = true + } + } + + if hasDirtyDupes { + // if we have no duplicates, then the keep set is simply all the 'clean' blockRefs + // in the case we have duplicate dirty references we have to compute + // the keep set by walking the entire repo to check if anything is + // still referencing the dirty block in question + return fmt.Errorf("WIP: not currently handling this case") + } + + keep := make(map[cid.Cid]bool) + for _, br := range brefs { + if !br.Dirty { + keep[br.Cid.CID] = true + } + } + + results := aggrRefs(brefs) + + var shardIds []uint + for _, r := range results { + shardIds = append(shardIds, r.ID) + } + + fmt.Println(shardIds) + var shards []CarShard - if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { + if err := cs.meta.Find(&shards, "id in (?)", shardIds).Error; err != nil { return err } - for _, sh := range shards { - if err := cs.deleteShardFile(ctx, &sh); err != nil { - if !os.IsNotExist(err) { - return err + shardsById := make(map[uint]CarShard) + for _, s := range shards { + shardsById[s.ID] = s + } + + var compactionQueue []*compBucket + + cur := new(compBucket) + + for _, r := range results { + fmt.Println("res: ", shouldCompact(r), r.Dirty, r.Total) + if shouldCompact(r) { + if cur.isFull() { + compactionQueue = append(compactionQueue, cur) + cur = new(compBucket) + } + + cur.addShardStat(r) + } else { + if !cur.isEmpty() { + compactionQueue = append(compactionQueue, cur) + cur = new(compBucket) } } } - if err := cs.meta.Delete(&CarShard{}, "usr = ?", user).Error; err != nil { + if !cur.isEmpty() { + compactionQueue = append(compactionQueue, cur) + } + + for _, b := range compactionQueue { + if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { + return err + } + + for _, s := range b.shards { + sh, ok := shardsById[s.ID] + if !ok { + return fmt.Errorf("missing shard to delete") + } + + if err := cs.deleteShard(ctx, &sh); err != nil { + return fmt.Errorf("deleting shard: %w", err) + } + } + } + + return nil +} + +func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { + last := b.shards[len(b.shards)-1] + lastsh := shardsById[last.ID] + fi, path, err := cs.openNewCompactedShardFile(ctx, user, last.Seq) + if err != nil { return err } + defer fi.Close() + root := lastsh.Root.CID + + hnw, err := WriteCarHeader(fi, root) + if err != nil { + return err + } + + offset := hnw + var nbrefs []map[string]any + for _, s := range b.shards { + sh := shardsById[s.ID] + if err := cs.iterateShardBlocks(ctx, &sh, func(blk blockformat.Block) error { + if keep[blk.Cid()] { + nw, err := LdWrite(fi, blk.Cid().Bytes(), blk.RawData()) + if err != nil { + return fmt.Errorf("failed to write block: %w", err) + } + + nbrefs = append(nbrefs, map[string]interface{}{ + "cid": models.DbCID{blk.Cid()}, + "offset": offset, + }) + + offset += nw + } + return nil + }); err != nil { + return err + } + } + + 
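// the compacted shard reuses the root, rev, and seq of the newest shard in + // the bucket, so the user's latest commit stays addressable after compaction +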
shard := CarShard{ + Root: models.DbCID{root}, + DataStart: hnw, + Seq: lastsh.Seq, + Path: path, + Usr: user, + Rev: lastsh.Rev, + } + + if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil { + return err + } return nil } diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 6b5629a12..1170f1fec 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path/filepath" "testing" @@ -92,6 +93,7 @@ func TestBasicOperation(t *testing.T) { t.Fatal(err) } + var recs []cid.Cid head := ncid for i := 0; i < 10; i++ { ds, err := cs.NewDeltaSession(ctx, 1, &rev) @@ -104,12 +106,15 @@ func TestBasicOperation(t *testing.T) { t.Fatal(err) } - if _, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), - }); err != nil { + }) + if err != nil { t.Fatal(err) } + recs = append(recs, rc) + kmgr := &util.FakeKeyManager{} nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) if err != nil { @@ -118,6 +123,10 @@ func TestBasicOperation(t *testing.T) { rev = nrev + if err := ds.CalcDiff(ctx, nroot); err != nil { + t.Fatal(err) + } + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { t.Fatal(err) } @@ -129,20 +138,135 @@ func TestBasicOperation(t *testing.T) { if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) } + checkRepo(t, buf, recs) - fmt.Println(buf.Len()) + if err := cs.CompactUserShards(ctx, 1); err != nil { + t.Fatal(err) + } + buf = new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, buf, recs) } -func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, string, error) { - nr := repo.NewRepo(ctx, "did:foo", bs) +func TestRepeatedCompactions(t *testing.T) { + ctx := context.TODO() + + cs, cleanup, err := testCarStore() + if err != nil { + t.Fatal(err) + } + defer cleanup() + + ds, err := cs.NewDeltaSession(ctx, 1, nil) + if err != nil { + t.Fatal(err) + } + + ncid, rev, err := setupRepo(ctx, ds) + if err != nil { + t.Fatal(err) + } + + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { + t.Fatal(err) + } + + var recs []cid.Cid + head := ncid + + for loop := 0; loop < 100; loop++ { + for i := 0; i < 25; i++ { + ds, err := cs.NewDeltaSession(ctx, 1, &rev) + if err != nil { + t.Fatal(err) + } + + rr, err := repo.OpenRepo(ctx, ds, head, true) + if err != nil { + t.Fatal(err) + } + + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), + }) + if err != nil { + t.Fatal(err) + } + + recs = append(recs, rc) + + kmgr := &util.FakeKeyManager{} + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) + if err != nil { + t.Fatal(err) + } + + rev = nrev + + if err := ds.CalcDiff(ctx, nroot); err != nil { + t.Fatal(err) + } + + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { + t.Fatal(err) + } + + head = nroot + } + fmt.Println("Run compaction", loop) + if err := cs.CompactUserShards(ctx, 1); err != nil { + t.Fatal(err) + } + + buf := new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, buf, recs) + } + + buf := new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, buf, recs) +} + +func checkRepo(t *testing.T, r io.Reader, 
expRecs []cid.Cid) { + rep, err := repo.ReadRepoFromCar(context.TODO(), r) + if err != nil { + t.Fatal(err) + } + + set := make(map[cid.Cid]bool) + for _, c := range expRecs { + set[c] = true + } + + if err := rep.ForEach(context.TODO(), "", func(k string, v cid.Cid) error { + if !set[v] { + return fmt.Errorf("have record we didn't expect") + } + + delete(set, v) + return nil - if _, _, err := nr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ - Text: fmt.Sprintf("hey look its a tweet %s", time.Now()), }); err != nil { - return cid.Undef, "", err + t.Fatal(err) } + if len(set) > 0 { + t.Fatalf("expected to find more cids in repo: %v", set) + } + +} + +func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, string, error) { + nr := repo.NewRepo(ctx, "did:foo", bs) + kmgr := &util.FakeKeyManager{} ncid, rev, err := nr.Commit(ctx, kmgr.SignForUser) if err != nil { From 611acaa0f55885a888293fe2969478a322c8be38 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 8 Sep 2023 11:22:36 -0700 Subject: [PATCH 2/6] small cleanup in compaction routine --- carstore/bs.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index a1af40a59..ab89542fc 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -1039,10 +1039,6 @@ func (cb *compBucket) addShardStat(ss shardStat) { cb.shards = append(cb.shards, ss) } -func (cb *compBucket) isFull() bool { - return cb.cleanBlocks > 50 -} - func (cb *compBucket) isEmpty() bool { return len(cb.shards) == 0 } @@ -1120,6 +1116,11 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro // in the case we have duplicate dirty references we have to compute // the keep set by walking the entire repo to check if anything is // still referencing the dirty block in question + + // we could also just add the duplicates to the keep set for now and + // focus on compacting everything else.
it leaves *some* dirty blocks + // still around but we're doing that anyway since compaction isn't a + // perfect process return fmt.Errorf("WIP: not currently handling this case") } @@ -1137,8 +1138,6 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro shardIds = append(shardIds, r.ID) } - fmt.Println(shardIds) - var shards []CarShard if err := cs.meta.Find(&shards, "id in (?)", shardIds).Error; err != nil { return err } @@ -1149,14 +1148,17 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro shardsById[s.ID] = s } - var compactionQueue []*compBucket + thresholdForPosition := func(i int) int { + // TODO: calculate some curve here so earlier shards end up with more + // blocks and recent shards end up with less + return 50 + } cur := new(compBucket) - - for _, r := range results { - fmt.Println("res: ", shouldCompact(r), r.Dirty, r.Total) + var compactionQueue []*compBucket + for i, r := range results { if shouldCompact(r) { - if cur.isFull() { + if cur.cleanBlocks > thresholdForPosition(i) { compactionQueue = append(compactionQueue, cur) cur = new(compBucket) } From 27ca98ff25f9e8e52e9821609c05dd6fc091cadc Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 8 Sep 2023 11:30:12 -0700 Subject: [PATCH 3/6] add tracing --- carstore/bs.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index ab89542fc..94f49b303 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -1079,20 +1079,8 @@ func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.U } func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error { - /* - var results []shardStat - if err := cs.meta.Raw(` - WITH user_shards AS - (select cs.id, cs.seq, br.dirty from block_refs br left join car_shards cs on br.shard = cs.id where cs.usr = ?)
- SELECT - id, - seq, - count(*) as total, - sum(case when dirty then 1 else 0 end) as dirty - FROM user_shards group by id, seq;`, user).Scan(&results).Error; err != nil { - return err - } - */ + ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") + defer span.End() var brefs []blockRef if err := cs.meta.Raw(`select br.* from block_refs br left join car_shards cs on br.shard = cs.id where cs.usr = ?`, user).Scan(&brefs).Error; err != nil { From 17c0eed54296011417c9059eb7db2f591f3f7969 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 8 Sep 2023 12:39:41 -0700 Subject: [PATCH 4/6] add some routes --- bgs/admin.go | 35 ++++++++++++++++++++++++++++ bgs/bgs.go | 26 +++++++++++++++++++++ carstore/bs.go | 54 +++++++++++++++++++++++++++++-------------- carstore/repo_test.go | 4 ++-- 4 files changed, 100 insertions(+), 19 deletions(-) diff --git a/bgs/admin.go b/bgs/admin.go index 645f728f2..d1b9ef57d 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -2,6 +2,7 @@ package bgs import ( "errors" + "fmt" "net/http" "strconv" "strings" @@ -399,3 +400,37 @@ func (bgs *BGS) handleAdminChangePDSCrawlLimit(e echo.Context) error { "success": "true", }) } + +func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error { + ctx := e.Request().Context() + + did := e.QueryParam("did") + if did == "" { + return fmt.Errorf("must pass a did") + } + + u, err := bgs.lookupUserByDid(ctx, did) + if err != nil { + return fmt.Errorf("no such user: %w", err) + } + + if err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID); err != nil { + return fmt.Errorf("compaction failed: %w", err) + } + + return e.JSON(200, map[string]any{ + "success": "true", + }) +} + +func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error { + ctx := e.Request().Context() + + if err := bgs.runRepoCompaction(ctx); err != nil { + return fmt.Errorf("compaction run failed: %w", err) + } + + return e.JSON(200, map[string]any{ + "success": "true", + }) +} diff --git a/bgs/bgs.go b/bgs/bgs.go index 4f0edea3f..878cf5396 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -305,6 +305,8 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { // Repo-related Admin API admin.POST("/repo/takeDown", bgs.handleAdminTakeDownRepo) admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown) + admin.POST("/repo/compact", bgs.handleAdminCompactRepo) + admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos) // PDS-related Admin API admin.GET("/pds/list", bgs.handleListPDSs) @@ -1029,3 +1031,27 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error { return nil } + +func (bgs *BGS) runRepoCompaction(ctx context.Context) error { + ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction") + defer span.End() + + repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx) + if err != nil { + return fmt.Errorf("failed to get repos to compact: %w", err) + } + + for _, r := range repos { + if err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { + log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) + continue + } + } + + return nil +} + +func (bgs *BGS) runRepoCompactor(ctx context.Context) { + for range time.Tick(time.Hour) { + } +} diff --git a/carstore/bs.go b/carstore/bs.go index 94f49b303..01d79f4d9 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -27,6 +27,7 @@ import ( carutil "github.com/ipld/go-car/util" cbg "github.com/whyrusleeping/cbor-gen" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "gorm.io/gorm" ) @@ -80,7 +81,7 @@ type 
CarShard struct { type blockRef struct { ID uint `gorm:"primarykey"` Cid models.DbCID `gorm:"index"` - Shard uint + Shard uint `gorm:"index"` Offset int64 Dirty bool //User uint `gorm:"index"` @@ -1078,12 +1079,46 @@ func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.U return fi, fi.Name(), nil } +type CompactionTarget struct { + Usr models.Uid + NumShards int +} + +func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarget, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets") + defer span.End() + + var targets []CompactionTarget + if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > 50 order by num_shards desc`).Scan(&targets).Error; err != nil { + return nil, err + } + + return targets, nil +} + func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error { ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") defer span.End() + span.SetAttributes(attribute.Int64("user", int64(user))) + + var shards []CarShard + if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { + return err + } + + var shardIds []uint + for _, s := range shards { + shardIds = append(shardIds, s.ID) + } + + shardsById := make(map[uint]CarShard) + for _, s := range shards { + shardsById[s.ID] = s + } + var brefs []blockRef - if err := cs.meta.Raw(`select br.* from block_refs br left join car_shards cs on br.shard = cs.id where cs.usr = ?`, user).Scan(&brefs).Error; err != nil { + if err := cs.meta.Debug().Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { return err } @@ -1121,21 +1156,6 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro results := aggrRefs(brefs) - var shardIds []uint - for _, r := range results { - shardIds = append(shardIds, r.ID) - } - - var shards []CarShard - if err := cs.meta.Find(&shards, "id in (?)", shardIds).Error; err != nil { - return err - } - - shardsById := make(map[uint]CarShard) - for _, s := range shards { - shardsById[s.ID] = s - } - thresholdForPosition := func(i int) int { // TODO: calculate some curve here so earlier shards end up with more // blocks and recent shards end up with less diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 1170f1fec..bcd90439c 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -177,8 +177,8 @@ func TestRepeatedCompactions(t *testing.T) { var recs []cid.Cid head := ncid - for loop := 0; loop < 100; loop++ { - for i := 0; i < 25; i++ { + for loop := 0; loop < 30; loop++ { + for i := 0; i < 20; i++ { ds, err := cs.NewDeltaSession(ctx, 1, &rev) if err != nil { t.Fatal(err) From 9457518337ae8452d622c7ec17d3bf8eb590053b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 11 Sep 2023 11:19:20 -0700 Subject: [PATCH 5/6] switch to tracking dirty refs in separate table --- carstore/bs.go | 78 +++++++++++++++++++++++++++++++++---------- carstore/repo_test.go | 7 ++-- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index 01d79f4d9..10219575e 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -54,6 +54,10 @@ func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) { if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { return nil, err } + if err := meta.AutoMigrate(&staleRef{}); err != nil { + return nil, err + } + return &CarStore{ meta: meta, rootDir: root, @@ -83,10 +87,15 @@ type blockRef struct { Cid 
models.DbCID `gorm:"index"` Shard uint `gorm:"index"` Offset int64 - Dirty bool //User uint `gorm:"index"` } +type staleRef struct { + ID uint `gorm:"primarykey"` + Cid models.DbCID `gorm:"index"` + Usr models.Uid +} + type userView struct { cs *CarStore user models.Uid @@ -678,13 +687,15 @@ func (cs *CarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[s } if len(rmcids) > 0 { - var torm []models.DbCID + var torm []staleRef for c := range rmcids { - torm = append(torm, models.DbCID{c}) + torm = append(torm, staleRef{ + Cid: models.DbCID{c}, + Usr: shard.Usr, + }) } - subq := cs.meta.Model(&blockRef{}).Joins("left join car_shards cs on cs.id = block_refs.shard").Where("cid in (?) AND usr = ?", torm, shard.Usr).Select("block_refs.id") - if err := tx.Model(&blockRef{}).Where("id in (?)", subq).UpdateColumn("dirty", true).Error; err != nil { + if err := tx.Create(torm).Error; err != nil { return err } } @@ -997,7 +1008,7 @@ func shouldCompact(s shardStat) bool { return false } -func aggrRefs(brefs []blockRef) []shardStat { +func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { byId := make(map[uint]*shardStat) for _, br := range brefs { @@ -1010,7 +1021,7 @@ func aggrRefs(brefs []blockRef) []shardStat { } s.Total++ - if br.Dirty { + if staleCids[br.Cid.CID] { s.Dirty++ } @@ -1122,16 +1133,19 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro return err } - cset := make(map[cid.Cid]bool) + var staleRefs []staleRef + if err := cs.meta.Debug().Find(&staleRefs, "usr = ?", user).Error; err != nil { + return err + } + + stale := make(map[cid.Cid]bool) var hasDirtyDupes bool - for _, br := range brefs { - if br.Dirty { - if cset[br.Cid.CID] { - hasDirtyDupes = true - break - } - cset[br.Cid.CID] = true + for _, br := range staleRefs { + if stale[br.Cid.CID] { + hasDirtyDupes = true + break } + stale[br.Cid.CID] = true } if hasDirtyDupes { @@ -1149,12 +1163,12 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro keep := make(map[cid.Cid]bool) for _, br := range brefs { - if !br.Dirty { + if !stale[br.Cid.CID] { keep[br.Cid.CID] = true } } - results := aggrRefs(brefs) + results := aggrRefs(brefs, stale) thresholdForPosition := func(i int) int { // TODO: calculate some curve here so earlier shards end up with more @@ -1184,12 +1198,14 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro compactionQueue = append(compactionQueue, cur) } + removedShards := make(map[uint]bool) for _, b := range compactionQueue { if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { return err } for _, s := range b.shards { + removedShards[s.ID] = true sh, ok := shardsById[s.ID] if !ok { return fmt.Errorf("missing shard to delete") @@ -1201,6 +1217,34 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro } } + // now we need to delete the staleRefs we successfully cleaned up + // we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed + + brByCid := make(map[cid.Cid][]blockRef) + for _, br := range brefs { + brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br) + } + + var staleToDelete []uint + for _, sr := range staleRefs { + brs := brByCid[sr.Cid.CID] + del := true + for _, br := range brs { + if !removedShards[br.Shard] { + del = false + break + } + } + + if del { + staleToDelete = append(staleToDelete, sr.ID) + } + } + + if err := cs.meta.Delete(&staleRef{}, "id in (?)", staleToDelete).Error; err 
!= nil { + return err + } + return nil } diff --git a/carstore/repo_test.go b/carstore/repo_test.go index bcd90439c..c07447af2 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -177,8 +177,8 @@ func TestRepeatedCompactions(t *testing.T) { var recs []cid.Cid head := ncid - for loop := 0; loop < 30; loop++ { - for i := 0; i < 20; i++ { + for loop := 0; loop < 50; loop++ { + for i := 0; i < 100; i++ { ds, err := cs.NewDeltaSession(ctx, 1, &rev) if err != nil { t.Fatal(err) @@ -325,6 +325,9 @@ func BenchmarkRepoWritesCarstore(b *testing.B) { } rev = nrev + if err := ds.CalcDiff(ctx, nroot); err != nil { + b.Fatal(err) + } if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { b.Fatal(err) From ffcc832f522e13277ff122202fccb59305c042f9 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 11 Sep 2023 12:21:31 -0700 Subject: [PATCH 6/6] rm dead code --- bgs/bgs.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index 878cf5396..5a2f622f0 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1050,8 +1050,3 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context) error { return nil } - -func (bgs *BGS) runRepoCompactor(ctx context.Context) { - for range time.Tick(time.Hour) { - } -}
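For reference, a minimal sketch of what the runRepoCompactor stub deleted above could grow into, built only on the runRepoCompaction helper added in PATCH 4; the hourly cadence, ticker shutdown, and error handling are assumptions, not something this series commits to:

	// Hypothetical driver (not part of this PR): run a compaction pass on a
	// fixed cadence until the context is cancelled. Assumes the PATCH 4
	// runRepoCompaction helper and the package-level log used elsewhere in bgs.
	func (bgs *BGS) runRepoCompactor(ctx context.Context) {
		tick := time.NewTicker(time.Hour)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				if err := bgs.runRepoCompaction(ctx); err != nil {
					log.Errorf("compaction run failed: %s", err)
				}
			}
		}
	}

Started as a goroutine during BGS startup (go bgs.runRepoCompactor(ctx)), a loop like this would keep per-user shard counts bounded without relying on the admin compactAll route.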