Skip to content

Commit

Permalink
Wire up compactor into BGS, better tracing, requeue routine
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Oct 12, 2023
1 parent 83c459f commit 00f1a49
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 73 deletions.
6 changes: 6 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
return nil, err
}

compactor := NewCompactor(nil)
compactor.Start(bgs)
bgs.compactor = compactor

return bgs, nil
}

Expand Down Expand Up @@ -366,6 +370,8 @@ func (bgs *BGS) Shutdown() []error {
errs = append(errs, err)
}

bgs.compactor.Shutdown()

return errs
}

Expand Down
178 changes: 107 additions & 71 deletions bgs/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (q *queue) Append(uid models.Uid, fast bool) {

q.q = append(q.q, queueItem{uid: uid, fast: fast})
q.members[uid] = struct{}{}
compactionQueueDepth.Inc()
}

func (q *queue) Prepend(uid models.Uid, fast bool) {
Expand All @@ -45,6 +46,7 @@ func (q *queue) Prepend(uid models.Uid, fast bool) {

q.q = append([]queueItem{{uid: uid, fast: fast}}, q.q...)
q.members[uid] = struct{}{}
compactionQueueDepth.Inc()
}

func (q *queue) Has(uid models.Uid) bool {
Expand All @@ -71,6 +73,7 @@ func (q *queue) Remove(uid models.Uid) {
}

delete(q.members, uid)
compactionQueueDepth.Dec()
}

func (q *queue) Pop() (*queueItem, bool) {
Expand All @@ -85,6 +88,7 @@ func (q *queue) Pop() (*queueItem, bool) {
q.q = q.q[1:]
delete(q.members, item.uid)

compactionQueueDepth.Dec()
return &item, true
}

Expand All @@ -96,19 +100,49 @@ type CompactorState struct {
}

type Compactor struct {
q *queue
state *CompactorState
stateLk sync.RWMutex
exit chan struct{}
q *queue
state *CompactorState
stateLk sync.RWMutex
exit chan struct{}
exited chan struct{}
requeueInterval time.Duration
requeueLimit int
requeueShardCount int
requeueFast bool
}

func NewCompactor() *Compactor {
// CompactorOptions configures the compactor's background requeue
// routine.
type CompactorOptions struct {
	// RequeueInterval is how often the requeue routine re-enqueues
	// compaction targets after the initial startup pass.
	RequeueInterval time.Duration
	// RequeueLimit bounds how many repos are enqueued per pass
	// (0 appears to mean unlimited — matches the lim > 0 guards used
	// by the enqueue paths).
	RequeueLimit int
	// RequeueShardCount is the shard count passed to the carstore when
	// selecting compaction targets (0 = default of 50).
	RequeueShardCount int
	// RequeueFast selects the fast compaction method (skip large shards).
	RequeueFast bool
}

// DefaultCompactorOptions returns the settings used when NewCompactor
// is handed a nil *CompactorOptions: requeue twice a day, no repo
// limit, a 50-shard target threshold, and fast compaction enabled.
func DefaultCompactorOptions() *CompactorOptions {
	opts := CompactorOptions{
		RequeueInterval:   12 * time.Hour,
		RequeueLimit:      0,
		RequeueShardCount: 50,
		RequeueFast:       true,
	}
	return &opts
}

func NewCompactor(opts *CompactorOptions) *Compactor {
if opts == nil {
opts = DefaultCompactorOptions()
}

return &Compactor{
q: &queue{
members: make(map[models.Uid]struct{}),
},
state: &CompactorState{},
exit: make(chan struct{}),
state: &CompactorState{},
exit: make(chan struct{}),
exited: make(chan struct{}),
requeueInterval: opts.RequeueInterval,
requeueLimit: opts.RequeueLimit,
requeueFast: opts.RequeueFast,
requeueShardCount: opts.RequeueShardCount,
}
}

Expand Down Expand Up @@ -141,11 +175,55 @@ func (c *Compactor) GetState() *CompactorState {

var errNoReposToCompact = fmt.Errorf("no repos to compact")

func (c *Compactor) Run(bgs *BGS) {
// Start launches the compactor's background goroutines: one worker
// (DoWork) that drains the queue, and one requeue routine that
// enqueues all compaction targets on startup and then again every
// requeueInterval until Shutdown closes c.exit.
func (c *Compactor) Start(bgs *BGS) {
	log.Info("starting compactor")
	go c.DoWork(bgs)
	go func() {
		log.Infow("starting compactor requeue routine",
			"interval", c.requeueInterval,
			"limit", c.requeueLimit,
			"shardCount", c.requeueShardCount,
			"fast", c.requeueFast,
		)

		// requeue runs one full enqueue pass under its own root span so
		// each pass is traced independently. Hoisted into a closure so
		// the startup pass and the ticker path share one copy of the
		// logic instead of two verbatim duplicates.
		requeue := func() {
			ctx, span := otel.Tracer("compactor").Start(context.Background(), "RequeueRoutine")
			defer span.End()
			if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil {
				log.Errorw("failed to enqueue all repos", "err", err)
			}
		}

		// Enqueue all repos on startup, then again on every tick.
		requeue()

		t := time.NewTicker(c.requeueInterval)
		defer t.Stop() // release the ticker when the routine exits
		for {
			select {
			case <-c.exit:
				return
			case <-t.C:
				requeue()
			}
		}
	}()
}

// Shutdown signals the compactor to stop and blocks until the worker
// goroutine acknowledges by closing c.exited.
func (c *Compactor) Shutdown() {
	log.Info("stopping compactor")
	close(c.exit) // broadcast stop to the worker and the requeue routine
	<-c.exited    // wait for DoWork to observe the signal and finish
	log.Info("compactor stopped")
}

func (c *Compactor) DoWork(bgs *BGS) {
for {
select {
case <-c.exit:
log.Warn("compactor exiting")
log.Info("compactor worker exiting, no more active compactions running")
close(c.exited)
return
default:
}
Expand All @@ -155,7 +233,7 @@ func (c *Compactor) Run(bgs *BGS) {
state, err := c.CompactNext(ctx, bgs)
if err != nil {
if err == errNoReposToCompact {
log.Warn("no repos to compact, waiting and retrying")
log.Info("no repos to compact, waiting and retrying")
time.Sleep(time.Second * 5)
continue
}
Expand All @@ -170,7 +248,7 @@ func (c *Compactor) Run(bgs *BGS) {
// Pause for a bit to avoid spamming failed compactions
time.Sleep(time.Millisecond * 100)
} else {
log.Warnw("compacted repo",
log.Infow("compacted repo",
"uid", state.latestUID,
"repo", state.latestDID,
"status", state.status,
Expand All @@ -182,7 +260,7 @@ func (c *Compactor) Run(bgs *BGS) {
}

func (c *Compactor) CompactNext(ctx context.Context, bgs *BGS) (*CompactorState, error) {
ctx, span := otel.Tracer("bgs").Start(ctx, "CompactNext")
ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext")
defer span.End()

item, ok := c.q.Pop()
Expand All @@ -194,26 +272,40 @@ func (c *Compactor) CompactNext(ctx context.Context, bgs *BGS) (*CompactorState,

user, err := bgs.lookupUserByUID(ctx, item.uid)
if err != nil {
span.RecordError(err)
c.SetState(item.uid, "unknown", "failed_getting_user", nil)
return nil, fmt.Errorf("failed to get user %d: %w", item.uid, err)
}

span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid)))

c.SetState(item.uid, user.Did, "compacting", nil)

start := time.Now()
st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast)
if err != nil {
span.RecordError(err)
c.SetState(item.uid, user.Did, "failed_compacting", nil)
return nil, fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err)
}
compactionDuration.Observe(time.Since(start).Seconds())

span.SetAttributes(
attribute.Int("shards.deleted", st.ShardsDeleted),
attribute.Int("shards.new", st.NewShards),
attribute.Int("dupes", st.DupeCount),
attribute.Int("shards.skipped", st.SkippedShards),
attribute.Int("refs", st.TotalRefs),
)

c.SetState(item.uid, user.Did, "done", st)

return c.GetState(), nil
}

func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {
ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo")
defer span.End()
log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
c.q.Append(user.ID, fast)
}
Expand All @@ -223,7 +315,7 @@ func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {
// shardCount is the number of shards to compact per user (0 = default of 50)
// fast is whether to use the fast compaction method (skip large shards)
func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shardCount int, fast bool) error {
ctx, span := otel.Tracer("bgs").Start(ctx, "EnqueueAllRepos")
ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueAllRepos")
defer span.End()

span.SetAttributes(
Expand All @@ -239,7 +331,7 @@ func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shar
span.SetAttributes(attribute.Int("clampedShardCount", shardCount))

log := log.With("source", "compactor_enqueue_all_repos", "lim", lim, "shardCount", shardCount, "fast", fast)
log.Warn("enqueueing all repos")
log.Info("enqueueing all repos")

repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, shardCount)
if err != nil {
Expand All @@ -258,63 +350,7 @@ func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shar
c.q.Append(r.Usr, fast)
}

log.Warn("done enqueueing all repos")
log.Info("done enqueueing all repos")

return nil
}

// runRepoCompaction fetches the current compaction targets from the
// carstore and compacts them one at a time, checking ctx for
// cancellation between repos.
//
// lim > 0 caps the number of repos processed; dry returns the target
// list without compacting anything; fast selects the fast compaction
// method. The returned stats always carry the target list, plus
// per-user results for every repo actually compacted.
func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool, fast bool) (*compactionStats, error) {
	ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction")
	defer span.End()

	log.Warn("starting repo compaction")

	runStart := time.Now()

	// NOTE(review): 50 matches the compactor's default requeue shard
	// count — confirm these are meant to stay in sync.
	repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, 50)
	if err != nil {
		return nil, fmt.Errorf("failed to get repos to compact: %w", err)
	}

	if lim > 0 && len(repos) > lim {
		repos = repos[:lim]
	}

	// Dry run: report what would be compacted without doing any work.
	if dry {
		return &compactionStats{
			Targets: repos,
		}, nil
	}

	results := make(map[models.Uid]*carstore.CompactionStats)
	for i, r := range repos {
		// Stop early — but still return partial results — if the
		// caller cancels.
		select {
		case <-ctx.Done():
			return &compactionStats{
				Targets:   repos,
				Completed: results,
			}, nil
		default:
		}

		repostart := time.Now()
		// NOTE(review): context.Background() (not ctx) is passed here,
		// presumably so an in-flight compaction is not cancelled
		// mid-shard — confirm this is deliberate.
		st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr, fast)
		if err != nil {
			// Best-effort: log the failure and move on to the next repo.
			log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
			continue
		}
		compactionDuration.Observe(time.Since(repostart).Seconds())
		results[r.Usr] = st

		// Periodic progress logging.
		if i%100 == 0 {
			log.Warnf("compacted %d repos in %s", i+1, time.Since(runStart))
		}
	}

	log.Warnf("compacted %d repos in %s", len(repos), time.Since(runStart))

	return &compactionStats{
		Targets:   repos,
		Completed: results,
	}, nil
}
5 changes: 5 additions & 0 deletions bgs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var compactionDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Buckets: prometheus.ExponentialBuckets(0.001, 3, 14),
})

// compactionQueueDepth tracks how many repos are currently waiting in
// the compaction queue: incremented by Append/Prepend, decremented by
// Remove/Pop.
var compactionQueueDepth = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "compaction_queue_depth",
	Help: "The current depth of the compaction queue",
})

var newUsersDiscovered = promauto.NewCounter(prometheus.CounterOpts{
Name: "bgs_new_users_discovered",
Help: "The total number of new users discovered directly from the firehose (not from refs)",
Expand Down
4 changes: 2 additions & 2 deletions cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

"github.com/bluesky-social/indigo/api"
"github.com/bluesky-social/indigo/bgs"
libbgs "github.com/bluesky-social/indigo/bgs"
"github.com/bluesky-social/indigo/blobs"
"github.com/bluesky-social/indigo/carstore"
"github.com/bluesky-social/indigo/did"
Expand Down Expand Up @@ -335,7 +335,7 @@ func Bigsky(cctx *cli.Context) error {
}

log.Infow("constructing bgs")
bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws"))
bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws"))
if err != nil {
return err
}
Expand Down

0 comments on commit 00f1a49

Please sign in to comment.