Skip to content

Commit

Permalink
add some routes
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Sep 8, 2023
1 parent 27ca98f commit 17c0eed
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 19 deletions.
35 changes: 35 additions & 0 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bgs

import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -399,3 +400,37 @@ func (bgs *BGS) handleAdminChangePDSCrawlLimit(e echo.Context) error {
"success": "true",
})
}

func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
ctx := e.Request().Context()

did := e.QueryParam("did")
if did == "" {
return fmt.Errorf("must pass a did")
}

u, err := bgs.lookupUserByDid(ctx, did)
if err != nil {
return fmt.Errorf("no such user: %w", err)
}

if err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID); err != nil {
return fmt.Errorf("compaction failed: %w", err)
}

return e.JSON(200, map[string]any{
"success": "true",
})
}

func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
ctx := e.Request().Context()

if err := bgs.runRepoCompaction(ctx); err != nil {
return fmt.Errorf("compaction run failed: %w", err)
}

return e.JSON(200, map[string]any{
"success": "true",
})
}
26 changes: 26 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {
// Repo-related Admin API
admin.POST("/repo/takeDown", bgs.handleAdminTakeDownRepo)
admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown)
admin.POST("/repo/compact", bgs.handleAdminCompactRepo)
admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos)

// PDS-related Admin API
admin.GET("/pds/list", bgs.handleListPDSs)
Expand Down Expand Up @@ -1029,3 +1031,27 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error {

return nil
}

func (bgs *BGS) runRepoCompaction(ctx context.Context) error {
ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction")
defer span.End()

repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx)
if err != nil {
return fmt.Errorf("failed to get repos to compact: %w", err)
}

for _, r := range repos {
if err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil {
log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
continue
}
}

return nil
}

func (bgs *BGS) runRepoCompactor(ctx context.Context) {
for range time.Tick(time.Hour) {
}
}
54 changes: 37 additions & 17 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
carutil "github.com/ipld/go-car/util"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ type CarShard struct {
type blockRef struct {
ID uint `gorm:"primarykey"`
Cid models.DbCID `gorm:"index"`
Shard uint
Shard uint `gorm:"index"`
Offset int64
Dirty bool
//User uint `gorm:"index"`
Expand Down Expand Up @@ -1078,12 +1079,46 @@ func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.U
return fi, fi.Name(), nil
}

type CompactionTarget struct {
Usr models.Uid
NumShards int
}

func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarget, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets")
defer span.End()

var targets []CompactionTarget
if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > 50 order by num_shards desc`).Scan(&targets).Error; err != nil {
return nil, err
}

return targets, nil
}

func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
defer span.End()

span.SetAttributes(attribute.Int64("user", int64(user)))

var shards []CarShard
if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil {
return err
}

var shardIds []uint
for _, s := range shards {
shardIds = append(shardIds, s.ID)
}

shardsById := make(map[uint]CarShard)
for _, s := range shards {
shardsById[s.ID] = s
}

var brefs []blockRef
if err := cs.meta.Raw(`select br.* from block_refs br left join car_shards cs on br.shard = cs.id where cs.usr = ?`, user).Scan(&brefs).Error; err != nil {
if err := cs.meta.Debug().Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil {
return err
}

Expand Down Expand Up @@ -1121,21 +1156,6 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro

results := aggrRefs(brefs)

var shardIds []uint
for _, r := range results {
shardIds = append(shardIds, r.ID)
}

var shards []CarShard
if err := cs.meta.Find(&shards, "id in (?)", shardIds).Error; err != nil {
return err
}

shardsById := make(map[uint]CarShard)
for _, s := range shards {
shardsById[s.ID] = s
}

thresholdForPosition := func(i int) int {
// TODO: calculate some curve here so earlier shards end up with more
// blocks and recent shards end up with less
Expand Down
4 changes: 2 additions & 2 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func TestRepeatedCompactions(t *testing.T) {
var recs []cid.Cid
head := ncid

for loop := 0; loop < 100; loop++ {
for i := 0; i < 25; i++ {
for loop := 0; loop < 30; loop++ {
for i := 0; i < 20; i++ {
ds, err := cs.NewDeltaSession(ctx, 1, &rev)
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 17c0eed

Please sign in to comment.