Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into bolson/log-expirement
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 30, 2024
2 parents dfa94a9 + 9bb22ba commit 716442a
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 9 deletions.
21 changes: 20 additions & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
userLookupDuration.Observe(time.Since(s).Seconds())
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
repoCommitsResultCounter.WithLabelValues(host.Host, "nou").Inc()
return fmt.Errorf("looking up event user: %w", err)
}

Expand All @@ -904,6 +905,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
subj, err := bgs.createExternalUser(ctx, evt.Repo)
newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "uerr").Inc()
return fmt.Errorf("fed event create external user: %w", err)
}

Expand All @@ -918,20 +920,24 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
bgs.log.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc()
return nil
}

if ustatus == events.AccountStatusSuspended {
bgs.log.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc()
return nil
}

if ustatus == events.AccountStatusDeactivated {
bgs.log.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc()
return nil
}

if evt.Rebase {
repoCommitsResultCounter.WithLabelValues(host.Host, "rebase").Inc()
return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
}

Expand All @@ -942,10 +948,12 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event

subj, err := bgs.createExternalUser(ctx, evt.Repo)
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "uerr2").Inc()
return err
}

if subj.PDS != host.ID {
repoCommitsResultCounter.WithLabelValues(host.Host, "noauth").Inc()
return fmt.Errorf("event from non-authoritative pds")
}
}
Expand All @@ -954,16 +962,19 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
span.SetAttributes(attribute.Bool("tombstoned", true))
// we've checked the authority of the users PDS, so reinstate the account
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "tomb").Inc()
return fmt.Errorf("failed to un-tombstone a user: %w", err)
}
u.SetTombstoned(false)

ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "nou2").Inc()
return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
}

// Now a simple re-crawl should suffice to bring the user back online
repoCommitsResultCounter.WithLabelValues(host.Host, "catchupt").Inc()
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

Expand All @@ -972,6 +983,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
rebasesCounter.WithLabelValues(host.Host).Add(1)
ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "nou3").Inc()
return fmt.Errorf("failed to look up user (slow path): %w", err)
}

Expand All @@ -983,26 +995,33 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
// processor coming off of the pds stream, we should investigate
// whether or not we even need this 'slow path' logic, as it makes
// accounting for which events have been processed much harder
repoCommitsResultCounter.WithLabelValues(host.Host, "catchup").Inc()
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
bgs.log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())

if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
ai, lerr := bgs.Index.LookupUser(ctx, u.ID)
if lerr != nil {
bgs.log.Warn("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc()
return fmt.Errorf("failed to look up user %s (%d) (err case: %s): %w", u.Did, u.ID, err, lerr)
}

span.SetAttributes(attribute.Bool("catchup_queue", true))

bgs.log.Info("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc()
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

bgs.log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
repoCommitsResultCounter.WithLabelValues(host.Host, "err").Inc()
return fmt.Errorf("handle user event failed: %w", err)
}

repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
return nil
case env.RepoHandle != nil:
bgs.log.Info("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
Expand Down
4 changes: 4 additions & 0 deletions bgs/fedmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s

cursor := host.Cursor

connectedInbound.Inc()
defer connectedInbound.Dec()
// TODO:? maybe keep a gauge of 'in retry backoff' sources?

var backoff int
for {
select {
Expand Down
10 changes: 10 additions & 0 deletions bgs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ var repoCommitsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "The total number of events received",
}, []string{"pds"})

// repoCommitsResultCounter counts the outcome of each commit event handled in
// handleFedEvent. It is labeled by the source PDS host ("pds") and a short
// outcome code ("status"), e.g. "ok", "err", "catchup", "tdu", "noauth".
var repoCommitsResultCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "repo_commits_result_counter",
Help: "The results of commit events received",
}, []string{"pds", "status"})

var rebasesCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "event_rebases",
Help: "The total number of rebase events received",
Expand All @@ -42,6 +47,11 @@ var externalUserCreationAttempts = promauto.NewCounter(prometheus.CounterOpts{
Help: "The total number of external users created",
})

// connectedInbound is a gauge of inbound firehose subscriptions. It is
// incremented when subscribeWithRedialer starts and decremented (via defer)
// when that function returns.
var connectedInbound = promauto.NewGauge(prometheus.GaugeOpts{
Name: "bgs_connected_inbound",
Help: "Number of inbound firehoses we are consuming",
})

var compactionDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "compaction_duration",
Help: "A histogram of compaction latencies",
Expand Down
3 changes: 2 additions & 1 deletion cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,11 @@ func runBigsky(cctx *cli.Context) error {

rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency"))

ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, cctx.Bool("spidering"), false)
ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering"))
if err != nil {
return err
}
defer ix.Shutdown()

rlskip := cctx.String("bsky-social-rate-limit-skip")
ix.ApplyPDSClientSettings = func(c *xrpc.Client) {
Expand Down
37 changes: 34 additions & 3 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"sync"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/models"
Expand All @@ -30,14 +31,16 @@ type CrawlDispatcher struct {
concurrency int

log *slog.Logger

done chan struct{}
}

func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
if concurrency < 1 {
return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
}

return &CrawlDispatcher{
out := &CrawlDispatcher{
ingest: make(chan *models.ActorInfo),
repoSync: make(chan *crawlWork),
complete: make(chan models.Uid),
Expand All @@ -47,7 +50,11 @@ func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurre
todo: make(map[models.Uid]*crawlWork),
inProgress: make(map[models.Uid]*crawlWork),
log: log,
}, nil
done: make(chan struct{}),
}
go out.CatchupRepoGaugePoller()

return out, nil
}

func (c *CrawlDispatcher) Run() {
Expand All @@ -58,6 +65,10 @@ func (c *CrawlDispatcher) Run() {
}
}

// Shutdown signals background goroutines that watch c.done to stop by closing
// that channel. It must be called at most once: closing an already-closed
// channel panics.
func (c *CrawlDispatcher) Shutdown() {
close(c.done)
}

type catchupJob struct {
evt *comatproto.SyncSubscribeRepos_Commit
host *models.PDS
Expand Down Expand Up @@ -177,24 +188,26 @@ func (c *CrawlDispatcher) dequeueJob(job *crawlWork) {
}

func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
catchupEventsEnqueued.Inc()
c.maplk.Lock()
defer c.maplk.Unlock()

// If the actor crawl is enqueued, we can append to the catchup queue which gets emptied during the crawl
job, ok := c.todo[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("todo").Inc()
job.catchup = append(job.catchup, catchup)
return nil
}

// If the actor crawl is in progress, we can append to the next queue which gets emptied after the crawl
job, ok = c.inProgress[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("prog").Inc()
job.next = append(job.next, catchup)
return nil
}

catchupEventsEnqueued.WithLabelValues("new").Inc()
// Otherwise, we need to create a new crawl job for this actor and enqueue it
cw := &crawlWork{
act: catchup.user,
Expand Down Expand Up @@ -273,3 +286,21 @@ func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bo

return false
}

// countReposInSlowPath returns how many repos are currently tracked by the
// dispatcher, i.e. the combined size of the todo and inProgress maps, read
// while holding maplk.
func (c *CrawlDispatcher) countReposInSlowPath() int {
	c.maplk.Lock()
	pending := len(c.todo)
	running := len(c.inProgress)
	c.maplk.Unlock()

	return running + pending
}

// CatchupRepoGaugePoller publishes the number of repos in the slow path
// (queued plus in progress) to catchupReposGauge every 30 seconds. It blocks
// until c.done is closed (see Shutdown) and then returns, so it is intended to
// be run in its own goroutine.
func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-c.done:
			// A closed channel is always ready to receive. Without returning
			// here the loop would spin at full CPU after Shutdown and the
			// goroutine (and its ticker) would never be released.
			return
		case <-ticker.C:
			catchupReposGauge.Set(float64(c.countReposInSlowPath()))
		}
	}
}
6 changes: 6 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
return ix, nil
}

// Shutdown stops background work owned by the indexer. Currently that means
// shutting down the crawl dispatcher, when one is configured.
func (ix *Indexer) Shutdown() {
	crawler := ix.Crawler
	if crawler == nil {
		// No crawler was set up, so there is nothing to stop.
		return
	}
	crawler.Shutdown()
}

func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error {
ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent")
defer span.End()
Expand Down
14 changes: 12 additions & 2 deletions indexer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,22 @@ var reposFetched = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Number of repos fetched",
}, []string{"status"})

var catchupEventsEnqueued = promauto.NewCounter(prometheus.CounterOpts{
var catchupEventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "indexer_catchup_events_enqueued",
Help: "Number of catchup events enqueued",
})
}, []string{"how"})

// catchupEventsProcessed counts catchup events that have been processed.
var catchupEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "indexer_catchup_events_processed",
Help: "Number of catchup events processed",
})

// catchupEventsFailed counts catchup work that failed, labeled by a short
// failure reason ("err"). FetchAndIndexRepo uses "nopds" when the PDS record
// cannot be loaded and "noroot" when the repo revision lookup fails.
var catchupEventsFailed = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "indexer_catchup_events_failed",
	// The help text previously duplicated the "processed" counter's wording.
	Help: "Number of catchup events failed",
}, []string{"err"})

// catchupReposGauge reports how many repos are queued or in progress in the
// crawl dispatcher. It is refreshed periodically by
// CrawlDispatcher.CatchupRepoGaugePoller rather than on every change.
var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "indexer_catchup_repos",
Help: "Number of repos waiting on catchup",
})
1 change: 1 addition & 0 deletions indexer/posts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (ix *testIx) Cleanup() {
if ix.dir != "" {
_ = os.RemoveAll(ix.dir)
}
ix.ix.Shutdown()
}

// TODO: dedupe this out into some testing utility package
Expand Down
2 changes: 2 additions & 0 deletions indexer/repofetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) er

var pds models.PDS
if err := rf.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
catchupEventsFailed.WithLabelValues("nopds").Inc()
return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
}

rev, err := rf.repoman.GetRepoRev(ctx, ai.Uid)
if err != nil && !isNotFound(err) {
catchupEventsFailed.WithLabelValues("noroot").Inc()
return fmt.Errorf("failed to get repo root: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions plc/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func (r *MemcachedDidResolver) GetDocument(ctx context.Context, didstr string) (
doc, ok := r.tryCache(didstr)
if ok {
span.SetAttributes(attribute.Bool("cache", true))
cacheHitsTotal.Inc()
memcacheHitsTotal.Inc()
return doc, nil
}
cacheMissesTotal.Inc()
memcacheMissesTotal.Inc()
span.SetAttributes(attribute.Bool("cache", false))

doc, err := r.res.GetDocument(ctx, didstr)
Expand Down
10 changes: 10 additions & 0 deletions plc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ var cacheMissesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "plc_cache_misses_total",
Help: "Total number of cache misses",
})

// memcacheHitsTotal counts DID document lookups served from the cache in
// MemcachedDidResolver.GetDocument. It is kept separate from cacheHitsTotal.
var memcacheHitsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "plc_memcache_hits_total",
Help: "Total number of cache hits",
})

// memcacheMissesTotal counts DID document lookups in
// MemcachedDidResolver.GetDocument that were not found in the cache and fell
// through to the wrapped resolver. It is kept separate from cacheMissesTotal.
var memcacheMissesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "plc_memcache_misses_total",
Help: "Total number of cache misses",
})

0 comments on commit 716442a

Please sign in to comment.