Skip to content

Commit

Permalink
indexer_scheduler: fix limiter usage
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Dec 17, 2024
1 parent 994cce3 commit 13a6d4a
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
)

type CrawlDispatcher struct {
// from Crawl()
ingest chan *models.ActorInfo

// from AddToCatchupQueue()
catchup chan *crawlWork

complete chan models.Uid
Expand Down Expand Up @@ -143,7 +145,9 @@ func (c *CrawlDispatcher) mainLoop() {
pdsID := crawlJob.act.PDS
limiter := c.repoFetcher.GetOrCreateLimiterLazy(pdsID)
now := time.Now()
wouldDelay := limiter.ReserveN(now, 1).DelayFrom(now)
res := limiter.ReserveN(now, 1)
wouldDelay := res.DelayFrom(now)
res.Cancel()
crawlJob.eligibleTime = now.Add(wouldDelay)
// put crawl job on heap sorted by eligible time
c.enheapJob(crawlJob)
Expand Down Expand Up @@ -318,6 +322,7 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
defer c.repoSyncLock.Unlock()
pdsJobs, has := c.repoSyncPds[crawlJob.act.PDS]
if has {
// to honor PDS rate limits, put new job at end of per-pds list
if !pdsJobs.alreadyEnheaped {
heap.Push(c, crawlJob)
catchupPending.Set(float64(len(c.repoSyncHeap)))
Expand Down

0 comments on commit 13a6d4a

Please sign in to comment.