Skip to content

Commit

Permalink
add flag to skip big shards during compaction (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping authored Oct 10, 2023
2 parents a7f7b6b + bf99898 commit a2a51ba
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 12 deletions.
14 changes: 12 additions & 2 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,17 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
return fmt.Errorf("must pass a did")
}

var fast bool
if strings.ToLower(e.QueryParam("fast")) == "true" {
fast = true
}

u, err := bgs.lookupUserByDid(ctx, did)
if err != nil {
return fmt.Errorf("no such user: %w", err)
}

stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID)
stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID, fast)
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}
Expand All @@ -431,6 +436,11 @@ func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
dry = true
}

var fast bool
if strings.ToLower(e.QueryParam("fast")) == "true" {
fast = true
}

lim := 50
if limstr := e.QueryParam("limit"); limstr != "" {
v, err := strconv.Atoi(limstr)
Expand All @@ -441,7 +451,7 @@ func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
lim = v
}

stats, err := bgs.runRepoCompaction(ctx, lim, dry)
stats, err := bgs.runRepoCompaction(ctx, lim, dry, fast)
if err != nil {
return fmt.Errorf("compaction run failed: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ type compactionStats struct {
Targets []carstore.CompactionTarget
}

func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*compactionStats, error) {
func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool, fast bool) (*compactionStats, error) {
ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction")
defer span.End()

Expand Down Expand Up @@ -1242,7 +1242,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*comp
}

repostart := time.Now()
st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr)
st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr, fast)
if err != nil {
log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
continue
Expand Down
43 changes: 42 additions & 1 deletion carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var log = logging.Logger("carstore")

const MaxSliceLength = 2 << 20

const BigShardThreshold = 2 << 20

type CarStore struct {
meta *gorm.DB
rootDir string
Expand Down Expand Up @@ -1207,6 +1209,15 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint)
return out, nil
}

func shardSize(sh *CarShard) (int64, error) {
st, err := os.Stat(sh.Path)
if err != nil {
return 0, fmt.Errorf("stat %q: %w", sh.Path, err)
}

return st.Size(), nil
}

type CompactionStats struct {
TotalRefs int `json:"totalRefs"`
StartShards int `json:"startShards"`
Expand All @@ -1217,7 +1228,7 @@ type CompactionStats struct {
DupeCount int `json:"dupeCount"`
}

func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) {
func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
defer span.End()

Expand All @@ -1228,6 +1239,36 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co
return nil, err
}

sort.Slice(shards, func(i, j int) bool {
return shards[i].Seq < shards[j].Seq
})

if skipBigShards {
// Since we generally expect shards to start bigger and get smaller,
// and because we want to avoid compacting non-adjacent shards
// together, and because we want to avoid running a stat on every
// single shard (can be expensive for repos that havent been compacted
// in a while) we only skip a prefix of shard files that are over the
// threshold. this may end up not skipping some shards that are over
// the threshold if a below-threshold shard occurs before them, but
// since this is a heuristic and imperfect optimization, that is
// acceptable.
var skip int
for i, sh := range shards {
size, err := shardSize(&sh)
if err != nil {
return nil, fmt.Errorf("could not check size of shard file: %w", err)
}

if size > BigShardThreshold {
skip = i + 1
} else {
break
}
}
shards = shards[skip:]
}

span.SetAttributes(attribute.Int("shards", len(shards)))

var shardIds []uint
Expand Down
4 changes: 2 additions & 2 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestBasicOperation(t *testing.T) {
}
checkRepo(t, cs, buf, recs)

if _, err := cs.CompactUserShards(ctx, 1); err != nil {
if _, err := cs.CompactUserShards(ctx, 1, false); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -228,7 +228,7 @@ func TestRepeatedCompactions(t *testing.T) {
head = nroot
}
fmt.Println("Run compaction", loop)
st, err := cs.CompactUserShards(ctx, 1)
st, err := cs.CompactUserShards(ctx, 1, false)
if err != nil {
t.Fatal(err)
}
Expand Down
33 changes: 28 additions & 5 deletions cmd/gosky/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,28 @@ var bgsSetNewSubsEnabledCmd = &cli.Command{

var bgsCompactRepo = &cli.Command{
Name: "compact-repo",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "fast",
},
},
Action: func(cctx *cli.Context) error {
url := cctx.String("bgs") + "/admin/repo/compact"
uu, err := url.Parse(cctx.String("bgs") + "/admin/repo/compact")
if err != nil {
return err
}

q := uu.Query()
did := cctx.Args().First()
q.Add("did", did)

url += fmt.Sprintf("?did=%s", did)
if cctx.Bool("fast") {
q.Add("fast", "true")
}

req, err := http.NewRequest("POST", url, nil)
uu.RawQuery = q.Encode()

req, err := http.NewRequest("POST", uu.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -354,20 +368,29 @@ var bgsCompactAll = &cli.Command{
&cli.IntFlag{
Name: "limit",
},
&cli.BoolFlag{
Name: "fast",
},
},
Action: func(cctx *cli.Context) error {
uu, err := url.Parse(cctx.String("bgs") + "/admin/repo/compactAll")
if err != nil {
return err
}

q := uu.Query()
if cctx.Bool("dry") {
uu.Query().Add("dry", "true")
q.Add("dry", "true")
}

if cctx.Bool("fast") {
q.Add("fast", "true")
}

if cctx.IsSet("limit") {
uu.Query().Add("limit", fmt.Sprint(cctx.Int("limit")))
q.Add("limit", fmt.Sprint(cctx.Int("limit")))
}
uu.RawQuery = q.Encode()

req, err := http.NewRequest("POST", uu.String(), nil)
if err != nil {
Expand Down

0 comments on commit a2a51ba

Please sign in to comment.