Skip to content

Commit

Permalink
split up batches of deletes during compaction (#324)
Browse files Browse the repository at this point in the history
turns out you cant pass infinitely sized arrays to your sql driver.
  • Loading branch information
whyrusleeping authored Sep 21, 2023
2 parents a6fcf91 + b058816 commit e600648
Showing 1 changed file with 60 additions and 29 deletions.
89 changes: 60 additions & 29 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ type blockRef struct {
}

type staleRef struct {
ID uint `gorm:"primarykey"`
Cid models.DbCID `gorm:"index"`
Usr models.Uid
ID uint `gorm:"primarykey"`
Cid models.DbCID
Usr models.Uid `gorm:"index"`
}

type userView struct {
Expand Down Expand Up @@ -965,36 +965,52 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShard")
defer span.End()

var ids []uint
for _, sh := range shs {
ids = append(ids, sh.ID)
}
deleteSlice := func(ctx context.Context, subs []*CarShard) error {
var ids []uint
for _, sh := range subs {
ids = append(ids, sh.ID)
}

txn := cs.meta.Begin()
txn := cs.meta.Begin()

if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
return err
}
if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
return err
}

if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
return err
}
if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
return err
}

if err := txn.Commit().Error; err != nil {
return err
}
if err := txn.Commit().Error; err != nil {
return err
}

var outErr error
for _, sh := range shs {
if err := cs.deleteShardFile(ctx, sh); err != nil {
if !os.IsNotExist(err) {
return err
var outErr error
for _, sh := range subs {
if err := cs.deleteShardFile(ctx, sh); err != nil {
if !os.IsNotExist(err) {
return err
}
outErr = err
}
outErr = err
}

return outErr
}

return outErr
chunkSize := 100
for i := 0; i < len(shs); i += chunkSize {
sl := shs[i:]
if len(sl) > chunkSize {
sl = sl[:chunkSize]
}

if err := deleteSlice(ctx, sl); err != nil {
return err
}
}

return nil
}

type shardStat struct {
Expand Down Expand Up @@ -1135,7 +1151,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
span.SetAttributes(attribute.Int64("user", int64(user)))

var shards []CarShard
if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil {
if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil {
return err
}

Expand All @@ -1150,12 +1166,12 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
}

var brefs []blockRef
if err := cs.meta.Debug().Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil {
if err := cs.meta.WithContext(ctx).Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil {
return err
}

var staleRefs []staleRef
if err := cs.meta.Debug().Find(&staleRefs, "usr = ?", user).Error; err != nil {
if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil {
return err
}

Expand Down Expand Up @@ -1244,6 +1260,13 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
// now we need to delete the staleRefs we successfully cleaned up
// we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed

return cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards)
}

func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs")
defer span.End()

brByCid := make(map[cid.Cid][]blockRef)
for _, br := range brefs {
brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br)
Expand All @@ -1265,8 +1288,16 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
}
}

if err := cs.meta.Delete(&staleRef{}, "id in (?)", staleToDelete).Error; err != nil {
return err
chunkSize := 100
for i := 0; i < len(staleToDelete); i += chunkSize {
sl := staleToDelete[i:]
if len(sl) > chunkSize {
sl = sl[:chunkSize]
}

if err := cs.meta.Delete(&staleRef{}, "id in (?)", sl).Error; err != nil {
return err
}
}

return nil
Expand Down

0 comments on commit e600648

Please sign in to comment.