Skip to content

Commit

Permalink
have compaction endpoints return more information (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping authored Oct 2, 2023
2 parents ed79f59 + a33e87b commit 491d808
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
22 changes: 18 additions & 4 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,27 @@ func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos")
defer span.End()

if err := bgs.runRepoCompaction(ctx); err != nil {
var dry bool
if strings.ToLower(e.QueryParam("dry")) == "true" {
dry = true
}

lim := 50
if limstr := e.QueryParam("limit"); limstr != "" {
v, err := strconv.Atoi(limstr)
if err != nil {
return err
}

lim = v
}

stats, err := bgs.runRepoCompaction(ctx, lim, dry)
if err != nil {
return fmt.Errorf("compaction run failed: %w", err)
}

return e.JSON(200, map[string]any{
"success": "true",
})
return e.JSON(200, stats)
}

func (bgs *BGS) handleAdminPostResyncPDS(e echo.Context) error {
Expand Down
31 changes: 26 additions & 5 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,23 +1127,44 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error {
return nil
}

func (bgs *BGS) runRepoCompaction(ctx context.Context) error {
type compactionStats struct {
Completed map[models.Uid]*carstore.CompactionStats
Targets []carstore.CompactionTarget
}

func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*compactionStats, error) {
ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction")
defer span.End()

repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx)
repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, 50)
if err != nil {
return fmt.Errorf("failed to get repos to compact: %w", err)
return nil, fmt.Errorf("failed to get repos to compact: %w", err)
}

if lim > 0 && len(repos) > lim {
repos = repos[:lim]
}

if dry {
return &compactionStats{
Targets: repos,
}, nil
}

results := make(map[models.Uid]*carstore.CompactionStats)
for _, r := range repos {
if _, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil {
st, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr)
if err != nil {
log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
continue
}
results[r.Usr] = st
}

return nil
return &compactionStats{
Targets: repos,
Completed: results,
}, nil
}

type repoHead struct {
Expand Down
10 changes: 6 additions & 4 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) e
ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs")
defer span.End()

if err := createInBatches(ctx, tx, brefs, 100); err != nil {
if err := createInBatches(ctx, tx, brefs, 1000); err != nil {
return err
}

Expand Down Expand Up @@ -1150,12 +1150,12 @@ type CompactionTarget struct {
NumShards int
}

func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarget, error) {
func (cs *CarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets")
defer span.End()

var targets []CompactionTarget
if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > 50 order by num_shards desc`).Scan(&targets).Error; err != nil {
if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, shardCount).Scan(&targets).Error; err != nil {
return nil, err
}

Expand Down Expand Up @@ -1186,6 +1186,7 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint)
}

type CompactionStats struct {
TotalRefs int `json:"totalRefs"`
StartShards int `json:"startShards"`
NewShards int `json:"newShards"`
SkippedShards int `json:"skippedShards"`
Expand Down Expand Up @@ -1326,6 +1327,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co

stats := &CompactionStats{
StartShards: len(shards),
TotalRefs: len(brefs),
}

removedShards := make(map[uint]bool)
Expand Down Expand Up @@ -1397,7 +1399,7 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, stale
}
}

chunkSize := 100
chunkSize := 500
for i := 0; i < len(staleToDelete); i += chunkSize {
sl := staleToDelete[i:]
if len(sl) > chunkSize {
Expand Down

0 comments on commit 491d808

Please sign in to comment.