Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split up batches of deletes during compaction #324

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 60 additions & 29 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ type blockRef struct {
}

type staleRef struct {
ID uint `gorm:"primarykey"`
Cid models.DbCID `gorm:"index"`
Usr models.Uid
ID uint `gorm:"primarykey"`
Cid models.DbCID
Usr models.Uid `gorm:"index"`
}

type userView struct {
Expand Down Expand Up @@ -965,36 +965,52 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShard")
defer span.End()

var ids []uint
for _, sh := range shs {
ids = append(ids, sh.ID)
}
deleteSlice := func(ctx context.Context, subs []*CarShard) error {
var ids []uint
for _, sh := range subs {
ids = append(ids, sh.ID)
}

txn := cs.meta.Begin()
txn := cs.meta.Begin()

if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
return err
}
if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
return err
}

if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
return err
}
if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
return err
}

if err := txn.Commit().Error; err != nil {
return err
}
if err := txn.Commit().Error; err != nil {
return err
}

var outErr error
for _, sh := range shs {
if err := cs.deleteShardFile(ctx, sh); err != nil {
if !os.IsNotExist(err) {
return err
var outErr error
for _, sh := range subs {
if err := cs.deleteShardFile(ctx, sh); err != nil {
if !os.IsNotExist(err) {
return err
}
outErr = err
}
outErr = err
}

return outErr
}

return outErr
chunkSize := 100
for i := 0; i < len(shs); i += chunkSize {
sl := shs[i:]
if len(sl) > chunkSize {
sl = sl[:chunkSize]
}

if err := deleteSlice(ctx, sl); err != nil {
return err
}
}

return nil
}

type shardStat struct {
Expand Down Expand Up @@ -1135,7 +1151,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
span.SetAttributes(attribute.Int64("user", int64(user)))

var shards []CarShard
if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil {
if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil {
return err
}

Expand All @@ -1150,12 +1166,12 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
}

var brefs []blockRef
if err := cs.meta.Debug().Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil {
if err := cs.meta.WithContext(ctx).Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil {
return err
}

var staleRefs []staleRef
if err := cs.meta.Debug().Find(&staleRefs, "usr = ?", user).Error; err != nil {
if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil {
return err
}

Expand Down Expand Up @@ -1244,6 +1260,13 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
// now we need to delete the staleRefs we successfully cleaned up
// we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed

return cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards)
}

func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs")
defer span.End()

brByCid := make(map[cid.Cid][]blockRef)
for _, br := range brefs {
brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br)
Expand All @@ -1265,8 +1288,16 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
}
}

if err := cs.meta.Delete(&staleRef{}, "id in (?)", staleToDelete).Error; err != nil {
return err
chunkSize := 100
for i := 0; i < len(staleToDelete); i += chunkSize {
sl := staleToDelete[i:]
if len(sl) > chunkSize {
sl = sl[:chunkSize]
}

if err := cs.meta.Delete(&staleRef{}, "id in (?)", sl).Error; err != nil {
return err
}
}

return nil
Expand Down
Loading