diff --git a/bgs/bgs.go b/bgs/bgs.go index 1199a1ae8..11cf2fc3a 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -143,6 +143,10 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm return nil, err } + compactor := NewCompactor(nil) + compactor.Start(bgs) + bgs.compactor = compactor + return bgs, nil } @@ -366,6 +370,8 @@ func (bgs *BGS) Shutdown() []error { errs = append(errs, err) } + bgs.compactor.Shutdown() + return errs } diff --git a/bgs/compact.go b/bgs/compact.go index 4d0779acf..d200404bb 100644 --- a/bgs/compact.go +++ b/bgs/compact.go @@ -33,6 +33,7 @@ func (q *queue) Append(uid models.Uid, fast bool) { q.q = append(q.q, queueItem{uid: uid, fast: fast}) q.members[uid] = struct{}{} + compactionQueueDepth.Inc() } func (q *queue) Prepend(uid models.Uid, fast bool) { @@ -45,6 +46,7 @@ func (q *queue) Prepend(uid models.Uid, fast bool) { q.q = append([]queueItem{{uid: uid, fast: fast}}, q.q...) q.members[uid] = struct{}{} + compactionQueueDepth.Inc() } func (q *queue) Has(uid models.Uid) bool { @@ -71,6 +73,7 @@ func (q *queue) Remove(uid models.Uid) { } delete(q.members, uid) + compactionQueueDepth.Dec() } func (q *queue) Pop() (*queueItem, bool) { @@ -85,6 +88,7 @@ func (q *queue) Pop() (*queueItem, bool) { q.q = q.q[1:] delete(q.members, item.uid) + compactionQueueDepth.Dec() return &item, true } @@ -96,19 +100,49 @@ type CompactorState struct { } type Compactor struct { - q *queue - state *CompactorState - stateLk sync.RWMutex - exit chan struct{} + q *queue + state *CompactorState + stateLk sync.RWMutex + exit chan struct{} + exited chan struct{} + requeueInterval time.Duration + requeueLimit int + requeueShardCount int + requeueFast bool } -func NewCompactor() *Compactor { +type CompactorOptions struct { + RequeueInterval time.Duration + RequeueLimit int + RequeueShardCount int + RequeueFast bool +} + +func DefaultCompactorOptions() *CompactorOptions { + return &CompactorOptions{ + RequeueInterval: time.Hour * 12, + RequeueLimit: 0, + RequeueShardCount: 50, + RequeueFast: true, + } +} + +func NewCompactor(opts *CompactorOptions) *Compactor { + if opts == nil { + opts = DefaultCompactorOptions() + } + return &Compactor{ q: &queue{ members: make(map[models.Uid]struct{}), }, - state: &CompactorState{}, - exit: make(chan struct{}), + state: &CompactorState{}, + exit: make(chan struct{}), + exited: make(chan struct{}), + requeueInterval: opts.RequeueInterval, + requeueLimit: opts.RequeueLimit, + requeueFast: opts.RequeueFast, + requeueShardCount: opts.RequeueShardCount, } } @@ -141,11 +175,55 @@ func (c *Compactor) GetState() *CompactorState { var errNoReposToCompact = fmt.Errorf("no repos to compact") -func (c *Compactor) Run(bgs *BGS) { +func (c *Compactor) Start(bgs *BGS) { + log.Info("starting compactor") + go c.DoWork(bgs) + go func() { + log.Infow("starting compactor requeue routine", + "interval", c.requeueInterval, + "limit", c.requeueLimit, + "shardCount", c.requeueShardCount, + "fast", c.requeueFast, + ) + + // Enqueue all repos on startup + ctx := context.Background() + ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") + if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { + log.Errorw("failed to enqueue all repos", "err", err) + } + span.End() + + t := time.NewTicker(c.requeueInterval) + for { + select { + case <-c.exit: + return + case <-t.C: + ctx := context.Background() + ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") + if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { + log.Errorw("failed to enqueue all repos", "err", err) + } + span.End() + } + } + }() +} + +func (c *Compactor) Shutdown() { + log.Info("stopping compactor") + close(c.exit) + <-c.exited + log.Info("compactor stopped") +} + +func (c *Compactor) DoWork(bgs *BGS) { for { select { case <-c.exit: - log.Warn("compactor exiting") + log.Info("compactor worker exiting, no more active compactions running") + close(c.exited) return default: } @@ -155,7 +233,7 @@ func (c *Compactor) Run(bgs *BGS) { state, err := c.CompactNext(ctx, bgs) if err != nil { if err == errNoReposToCompact { - log.Warn("no repos to compact, waiting and retrying") + log.Info("no repos to compact, waiting and retrying") time.Sleep(time.Second * 5) continue } @@ -170,7 +248,7 @@ func (c *Compactor) Run(bgs *BGS) { // Pause for a bit to avoid spamming failed compactions time.Sleep(time.Millisecond * 100) } else { - log.Warnw("compacted repo", + log.Infow("compacted repo", "uid", state.latestUID, "repo", state.latestDID, "status", state.status, @@ -182,7 +260,7 @@ func (c *Compactor) Run(bgs *BGS) { } func (c *Compactor) CompactNext(ctx context.Context, bgs *BGS) (*CompactorState, error) { - ctx, span := otel.Tracer("bgs").Start(ctx, "CompactNext") + ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext") defer span.End() item, ok := c.q.Pop() @@ -194,26 +272,40 @@ func (c *Compactor) CompactNext(ctx context.Context, bgs *BGS) (*CompactorState, user, err := bgs.lookupUserByUID(ctx, item.uid) if err != nil { + span.RecordError(err) c.SetState(item.uid, "unknown", "failed_getting_user", nil) return nil, fmt.Errorf("failed to get user %d: %w", item.uid, err) } + span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid))) + c.SetState(item.uid, user.Did, "compacting", nil) start := time.Now() st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast) if err != nil { + span.RecordError(err) c.SetState(item.uid, user.Did, "failed_compacting", nil) return nil, fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err) } compactionDuration.Observe(time.Since(start).Seconds()) + span.SetAttributes( + attribute.Int("shards.deleted", st.ShardsDeleted), + attribute.Int("shards.new", st.NewShards), + attribute.Int("dupes", st.DupeCount), + attribute.Int("shards.skipped", st.SkippedShards), + attribute.Int("refs", st.TotalRefs), + ) + c.SetState(item.uid, user.Did, "done", st) return c.GetState(), nil } func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) { + ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo") + defer span.End() log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast) c.q.Append(user.ID, fast) } @@ -223,7 +315,7 @@ func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) { // shardCount is the number of shards to compact per user (0 = default of 50) // fast is whether to use the fast compaction method (skip large shards) func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shardCount int, fast bool) error { - ctx, span := otel.Tracer("bgs").Start(ctx, "EnqueueAllRepos") + ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueAllRepos") defer span.End() span.SetAttributes( @@ -239,7 +331,7 @@ func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shar span.SetAttributes(attribute.Int("clampedShardCount", shardCount)) log := log.With("source", "compactor_enqueue_all_repos", "lim", lim, "shardCount", shardCount, "fast", fast) - log.Warn("enqueueing all repos") + log.Info("enqueueing all repos") repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, shardCount) if err != nil { @@ -258,63 +350,7 @@ func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shar c.q.Append(r.Usr, fast) } - log.Warn("done enqueueing all repos") + log.Info("done enqueueing all repos") return nil } - -func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool, fast bool) (*compactionStats, error) { - ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction") - defer span.End() - - log.Warn("starting repo compaction") - - runStart := time.Now() - - repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, 50) - if err != nil { - return nil, fmt.Errorf("failed to get repos to compact: %w", err) - } - - if lim > 0 && len(repos) > lim { - repos = repos[:lim] - } - - if dry { - return &compactionStats{ - Targets: repos, - }, nil - } - - results := make(map[models.Uid]*carstore.CompactionStats) - for i, r := range repos { - select { - case <-ctx.Done(): - return &compactionStats{ - Targets: repos, - Completed: results, - }, nil - default: - } - - repostart := time.Now() - st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr, fast) - if err != nil { - log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) - continue - } - compactionDuration.Observe(time.Since(repostart).Seconds()) - results[r.Usr] = st - - if i%100 == 0 { - log.Warnf("compacted %d repos in %s", i+1, time.Since(runStart)) - } - } - - log.Warnf("compacted %d repos in %s", len(repos), time.Since(runStart)) - - return &compactionStats{ - Targets: repos, - Completed: results, - }, nil -} diff --git a/bgs/metrics.go b/bgs/metrics.go index 12b401e04..b33677e6e 100644 --- a/bgs/metrics.go +++ b/bgs/metrics.go @@ -48,6 +48,11 @@ var compactionDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Buckets: prometheus.ExponentialBuckets(0.001, 3, 14), }) +var compactionQueueDepth = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "compaction_queue_depth", + Help: "The current depth of the compaction queue", +}) + var newUsersDiscovered = promauto.NewCounter(prometheus.CounterOpts{ Name: "bgs_new_users_discovered", Help: "The total number of new users discovered directly from the firehose (not from refs)", diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index fd93c676e..e571628f8 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -11,7 +11,7 @@ import ( "time" "github.com/bluesky-social/indigo/api" - "github.com/bluesky-social/indigo/bgs" + libbgs "github.com/bluesky-social/indigo/bgs" "github.com/bluesky-social/indigo/blobs" "github.com/bluesky-social/indigo/carstore" "github.com/bluesky-social/indigo/did" @@ -335,7 +335,7 @@ func Bigsky(cctx *cli.Context) error { } log.Infow("constructing bgs") - bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws")) + bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws")) if err != nil { return err }