diff --git a/indexer/crawler.go b/indexer/crawler.go
index 129eb590..b4047d2b 100644
--- a/indexer/crawler.go
+++ b/indexer/crawler.go
@@ -17,8 +17,10 @@ import (
 )
 
 type CrawlDispatcher struct {
+	// from Crawl()
 	ingest chan *models.ActorInfo
 
+	// from AddToCatchupQueue()
 	catchup chan *crawlWork
 
 	complete chan models.Uid
@@ -143,7 +145,9 @@ func (c *CrawlDispatcher) mainLoop() {
 			pdsID := crawlJob.act.PDS
 			limiter := c.repoFetcher.GetOrCreateLimiterLazy(pdsID)
 			now := time.Now()
-			wouldDelay := limiter.ReserveN(now, 1).DelayFrom(now)
+			res := limiter.ReserveN(now, 1)
+			wouldDelay := res.DelayFrom(now)
+			res.Cancel()
 			crawlJob.eligibleTime = now.Add(wouldDelay)
 			// put crawl job on heap sorted by eligible time
 			c.enheapJob(crawlJob)
@@ -318,6 +322,7 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
 	defer c.repoSyncLock.Unlock()
 	pdsJobs, has := c.repoSyncPds[crawlJob.act.PDS]
 	if has {
+		// to honor PDS rate limits, put new job at end of per-pds list
 		if !pdsJobs.alreadyEnheaped {
			heap.Push(c, crawlJob)
			catchupPending.Set(float64(len(c.repoSyncHeap)))
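
Side note on the ReserveN change: DelayFrom on a live reservation lets mainLoop predict when the job becomes eligible, and the added Cancel() hands the reserved token back so the lookahead itself does not count against the PDS rate limit. A minimal sketch of that peek-then-cancel pattern, assuming the per-PDS limiter is a golang.org/x/time/rate.Limiter (which the ReserveN/DelayFrom/Cancel calls suggest); the rate, burst, and main wrapper below are invented for illustration:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Hypothetical per-PDS limiter: 2 events/second, burst of 1.
	limiter := rate.NewLimiter(2, 1)

	now := time.Now()

	// Reserve a token only to learn how long we would have to wait...
	res := limiter.ReserveN(now, 1)
	wouldDelay := res.DelayFrom(now)

	// ...then cancel the reservation so the token is returned and this
	// scheduling peek does not consume quota; presumably the real fetch
	// spends a token later, when the job is actually dequeued and run.
	res.Cancel()

	eligibleTime := now.Add(wouldDelay)
	fmt.Println("job becomes eligible at", eligibleTime)
}
```

Without the Cancel(), each pass through this scheduling code would silently burn a token just to compute an eligible time, pushing the eventual real fetch further out, which appears to be what the added res.Cancel() in the diff avoids.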