From bdb85508047b50e7b795dc00f28be8781dba64a5 Mon Sep 17 00:00:00 2001
From: Brian Olson
Date: Mon, 18 Nov 2024 11:24:08 -0500
Subject: [PATCH] indexer_scheduler

---
 bgs/bgs.go           |   1 +
 indexer/crawler.go   | 204 ++++++++++++++++++++++++++++++++-----------
 indexer/indexer.go   |   2 +-
 indexer/metrics.go   |   5 ++
 indexer/repofetch.go |  20 +++++
 5 files changed, 178 insertions(+), 54 deletions(-)

diff --git a/bgs/bgs.go b/bgs/bgs.go
index 119e63f6..c2fee549 100644
--- a/bgs/bgs.go
+++ b/bgs/bgs.go
@@ -874,6 +874,7 @@ func stringLink(lnk *lexutil.LexLink) string {
 	return lnk.String()
 }
 
+// called from fedmgr.go Slurper.handleConnection() via the .cb callback
 func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
 	ctx, span := tracer.Start(ctx, "handleFedEvent")
 	defer span.End()
diff --git a/indexer/crawler.go b/indexer/crawler.go
index 526da9bb..851623c8 100644
--- a/indexer/crawler.go
+++ b/indexer/crawler.go
@@ -1,12 +1,15 @@
 package indexer
 
 import (
+	"container/heap"
 	"context"
 	"fmt"
 	"log/slog"
 	"sync"
 	"time"
 
+	"golang.org/x/time/rate"
+
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
 	"github.com/bluesky-social/indigo/models"
 
@@ -16,8 +19,6 @@ type CrawlDispatcher struct {
 	ingest chan *models.ActorInfo
 
-	repoSync chan *crawlWork
-
 	catchup chan *crawlWork
 
 	complete chan models.Uid
@@ -26,32 +27,45 @@ type CrawlDispatcher struct {
 	todo       map[models.Uid]*crawlWork
 	inProgress map[models.Uid]*crawlWork
 
-	doRepoCrawl func(context.Context, *crawlWork) error
+	repoFetcher CrawlRepoFetcher
 
 	concurrency int
 
+	repoSyncHeap []*crawlWork
+	// repoSyncPds maps pdsID to the head of that PDS's linked list of pending jobs, chained via .nextInPds
+	repoSyncPds  map[uint]*crawlWork
+	repoSyncLock sync.Mutex
+	repoSyncCond sync.Cond
+
 	log *slog.Logger
 
 	done chan struct{}
 }
 
-func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
+// CrawlRepoFetcher is the subset of RepoFetcher the dispatcher needs, declared as an interface to avoid a circular dependency
+type CrawlRepoFetcher interface {
+	FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
+	GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter
+}
+
+func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
 	if concurrency < 1 {
 		return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
 	}
 
 	out := &CrawlDispatcher{
 		ingest:      make(chan *models.ActorInfo),
-		repoSync:    make(chan *crawlWork),
 		complete:    make(chan models.Uid),
 		catchup:     make(chan *crawlWork),
-		doRepoCrawl: repoFn,
+		repoFetcher: repoFetcher,
 		concurrency: concurrency,
 		todo:        make(map[models.Uid]*crawlWork),
 		inProgress:  make(map[models.Uid]*crawlWork),
 		log:         log,
 		done:        make(chan struct{}),
+		repoSyncPds: make(map[uint]*crawlWork),
 	}
+	out.repoSyncCond.L = &out.repoSyncLock
 
 	go out.CatchupRepoGaugePoller()
 
 	return out, nil
@@ -86,49 +100,22 @@ type crawlWork struct {
 	// for events that come in while this actor is being processed
 	// next items are processed after the crawl
 	next []*catchupJob
+
+	eligibleTime    time.Time
+	nextInPds       *crawlWork
+	alreadyEnheaped bool
 }
 
 func (c *CrawlDispatcher) mainLoop() {
-	var nextDispatchedJob *crawlWork
-	var jobsAwaitingDispatch []*crawlWork
-
-	// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
-	var dispatchQueue chan *crawlWork
-
 	for {
+		var crawlJob *crawlWork
 		select {
 		case actorToCrawl := <-c.ingest:
 			// TODO: max buffer size
-			crawlJob := c.enqueueJobForActor(actorToCrawl)
-			if crawlJob == nil {
-				break
-			}
-
-			if nextDispatchedJob == nil {
-				nextDispatchedJob = crawlJob
-				dispatchQueue = c.repoSync
-			} else {
-				jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
-			}
-		case dispatchQueue <- nextDispatchedJob:
-			c.dequeueJob(nextDispatchedJob)
-
-			if len(jobsAwaitingDispatch) > 0 {
-				nextDispatchedJob = jobsAwaitingDispatch[0]
-				jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
-			} else {
-				nextDispatchedJob = nil
-				dispatchQueue = nil
-			}
-		case catchupJob := <-c.catchup:
+			crawlJob = c.enqueueJobForActor(actorToCrawl)
+		case crawlJob = <-c.catchup:
 			// CatchupJobs are for processing events that come in while a crawl is in progress
 			// They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
-			if nextDispatchedJob == nil {
-				nextDispatchedJob = catchupJob
-				dispatchQueue = c.repoSync
-			} else {
-				jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
-			}
 		case uid := <-c.complete:
 			c.maplk.Lock()
@@ -145,15 +132,21 @@ func (c *CrawlDispatcher) mainLoop() {
 				job.initScrape = false
 				job.catchup = job.next
 				job.next = nil
-				if nextDispatchedJob == nil {
-					nextDispatchedJob = job
-					dispatchQueue = c.repoSync
-				} else {
-					jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
-				}
+				crawlJob = job
 			}
 			c.maplk.Unlock()
 		}
+		if crawlJob != nil {
+			pdsID := crawlJob.act.PDS
+			limiter := c.repoFetcher.GetOrCreateLimiterLazy(pdsID)
+			now := time.Now()
+			wouldDelay := limiter.ReserveN(now, 1).DelayFrom(now)
+			crawlJob.eligibleTime = now.Add(wouldDelay)
+			// put the crawl job on the heap, sorted by eligibleTime
+			c.enheapJob(crawlJob)
+			c.dequeueJob(crawlJob)
+			crawlJob = nil
+		}
 	}
 }
 
@@ -219,14 +212,18 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
 
 func (c *CrawlDispatcher) fetchWorker() {
 	for {
-		select {
-		case job := <-c.repoSync:
-			if err := c.doRepoCrawl(context.TODO(), job); err != nil {
-				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
-			}
+		job := c.nextJob()
+		nextInPds := job.nextInPds
+		job.nextInPds = nil
+		if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
+			c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
+		}
 
-			// TODO: do we still just do this if it errors?
-			c.complete <- job.act.Uid
+		// TODO: do we still just do this if it errors?
+		c.complete <- job.act.Uid
+
+		if nextInPds != nil {
+			c.enheapJob(nextInPds)
 		}
 	}
 }
@@ -304,3 +301,104 @@ func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
 		}
 	}
 }
+
+// enheapJob adds crawlJob to the priority queue, ordered by eligibleTime
+func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
+	if crawlJob.alreadyEnheaped {
+		c.log.Error("CrawlDispatcher trying to enheap alreadyEnheaped", "pds", crawlJob.act.PDS, "uid", crawlJob.act.Uid)
+	}
+	c.repoSyncLock.Lock()
+	defer c.repoSyncLock.Unlock()
+	pdsJobs, has := c.repoSyncPds[crawlJob.act.PDS]
+	if has {
+		if !pdsJobs.alreadyEnheaped {
+			heap.Push(c, pdsJobs)
+			catchupPending.Set(float64(len(c.repoSyncHeap)))
+			c.repoSyncCond.Signal()
+			pdsJobs.alreadyEnheaped = true
+		}
+		if pdsJobs == crawlJob {
+			return
+		}
+		for pdsJobs.nextInPds != nil {
+			pdsJobs = pdsJobs.nextInPds
+			if pdsJobs == crawlJob {
+				// crawlJob is already chained for this PDS and will be enheaped when its predecessor completes
+				return
+			}
+		}
+		pdsJobs.nextInPds = crawlJob
+		return
+	} else {
+		c.repoSyncPds[crawlJob.act.PDS] = crawlJob
+	}
+	if !crawlJob.alreadyEnheaped {
+		heap.Push(c, crawlJob)
+		catchupPending.Set(float64(len(c.repoSyncHeap)))
+		c.repoSyncCond.Signal()
+		crawlJob.alreadyEnheaped = true
+	}
+}
+
+// nextJob returns the next crawlJob that is ready by eligibleTime, blocking until one is available.
+// After executing the job's work, the caller should .enheapJob(crawlJob.nextInPds) if it is not nil.
+//
+// There's a tiny race where .nextJob() could return the only work for a PDS,
+// an outside event could .enheapJob() a new job for that PDS,
+// which would be enheaped as immediately eligible because the rate limiter hasn't yet been charged for the work returned from .nextJob() above,
+// so the worker picking up that next job would have to Sleep() on the rate limiter instead of executing immediately.
+// We will call this 'not too bad', 'good enough for now'. -- bolson 2024-11
+func (c *CrawlDispatcher) nextJob() *crawlWork {
+	c.repoSyncLock.Lock()
+	defer c.repoSyncLock.Unlock()
+	for len(c.repoSyncHeap) == 0 {
+		c.repoSyncCond.Wait()
+	}
+	x := heap.Pop(c)
+	catchupPending.Set(float64(len(c.repoSyncHeap)))
+	crawlJob := x.(*crawlWork)
+	if crawlJob.nextInPds != nil {
+		prev := c.repoSyncPds[crawlJob.act.PDS]
+		if prev != crawlJob {
+			c.log.Error("CrawlDispatcher internal: pds next is not next in eligible heap", "pds", crawlJob.act.PDS)
+		}
+	}
+	delete(c.repoSyncPds, crawlJob.act.PDS)
+	crawlJob.alreadyEnheaped = false
+	return crawlJob
+}
+
+// part of container/heap.Interface and sort.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Len() int {
+	return len(c.repoSyncHeap)
+}
+
+// part of container/heap.Interface and sort.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Less(i, j int) bool {
+	return c.repoSyncHeap[i].eligibleTime.Before(c.repoSyncHeap[j].eligibleTime)
+}
+
+// part of container/heap.Interface and sort.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Swap(i, j int) {
+	t := c.repoSyncHeap[i]
+	c.repoSyncHeap[i] = c.repoSyncHeap[j]
+	c.repoSyncHeap[j] = t
+}
+
+// part of container/heap.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Push(x any) {
+	c.repoSyncHeap = append(c.repoSyncHeap, x.(*crawlWork))
+}
+
+// part of container/heap.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Pop() any {
+	heaplen := len(c.repoSyncHeap)
+	out := c.repoSyncHeap[heaplen-1]
+	c.repoSyncHeap = c.repoSyncHeap[:heaplen-1]
+	return out
+}
diff --git a/indexer/indexer.go b/indexer/indexer.go
index e6a324e9..6920c7fb 100644
--- a/indexer/indexer.go
+++ b/indexer/indexer.go
@@ -69,7 +69,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 	}
 
 	if crawl {
-		c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency, ix.log)
+		c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency, ix.log)
 		if err != nil {
 			return nil, err
 		}
diff --git a/indexer/metrics.go b/indexer/metrics.go
index 447460e8..29885ddb 100644
--- a/indexer/metrics.go
+++ b/indexer/metrics.go
@@ -44,3 +44,8 @@ var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{
 	Name: "indexer_catchup_repos",
 	Help: "Number of repos waiting on catchup",
 })
+
+var catchupPending = promauto.NewGauge(prometheus.GaugeOpts{
+	Name: "indexer_catchup_pending",
"indexer_catchup_pending", + Help: "Number of catchup pending in heap before cd worker", +}) diff --git a/indexer/repofetch.go b/indexer/repofetch.go index 8ce68bb5..1d257299 100644 --- a/indexer/repofetch.go +++ b/indexer/repofetch.go @@ -66,6 +66,26 @@ func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Lim return lim } +// GetOrCreateLimiterLazy is GetOrCreateLimiter with a lazy fetch of PDS from database if needed +func (rf *RepoFetcher) GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter { + rf.LimitMux.RLock() + defer rf.LimitMux.RUnlock() + + lim, ok := rf.Limiters[pdsID] + if !ok { + // TODO: single source from DefaultCrawlLimit from Slurper or its config source + pdsrate := float64(5) + var pds models.PDS + if err := rf.db.First(&pds, "id = ?", pdsID).Error; err == nil { + pdsrate = pds.CrawlRateLimit + } + lim = rate.NewLimiter(rate.Limit(pdsrate), 1) + rf.Limiters[pdsID] = lim + } + + return lim +} + func (rf *RepoFetcher) SetLimiter(pdsID uint, lim *rate.Limiter) { rf.LimitMux.Lock() defer rf.LimitMux.Unlock()