diff --git a/indexer/crawler.go b/indexer/crawler.go index 851623c8..129eb590 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -81,6 +81,7 @@ func (c *CrawlDispatcher) Run() { func (c *CrawlDispatcher) Shutdown() { close(c.done) + c.repoSyncCond.Broadcast() } type catchupJob struct { @@ -135,6 +136,8 @@ func (c *CrawlDispatcher) mainLoop() { crawlJob = job } c.maplk.Unlock() + case <-c.done: + return } if crawlJob != nil { pdsID := crawlJob.act.PDS @@ -213,6 +216,9 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork { func (c *CrawlDispatcher) fetchWorker() { for { job := c.nextJob() + if job == nil { + return + } nextInPds := job.nextInPds job.nextInPds = nil if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil { @@ -303,6 +309,7 @@ func (c *CrawlDispatcher) CatchupRepoGaugePoller() { } // priority-queue for crawlJob based on eligibleTime +// Put jobs on heap which will come off through nextJob() func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) { if crawlJob.alreadyEnheaped { c.log.Error("CrawlDispatcher trying to enheap alreadyEnheaped", "pds", crawlJob.alreadyEnheaped, "uid", crawlJob.act.Uid) @@ -343,6 +350,8 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) { // nextJob returns next available crawlJob based on eligibleTime; block until some available. // The caller of .nextJob() should .enheapJob(crawlJob.nextInPds) if any after executing crawlJob's work // +// nextJob() blocks until it has work to return, or returns nil if we are done due to Shutdown() +// // There's a tiny race where .nextJob() could return the only work for a PDS, // outside event could .enheapJob() a next one for that PDS, // get enheaped as available immediately because the rate limiter hasn't ticked the work done from .nextJob() above, @@ -353,6 +362,11 @@ func (c *CrawlDispatcher) nextJob() *crawlWork { defer c.repoSyncLock.Unlock() for len(c.repoSyncHeap) == 0 { c.repoSyncCond.Wait() + select { + case <-c.done: + return nil + default: + } } x := heap.Pop(c) catchupPending.Set(float64(len(c.repoSyncHeap))) diff --git a/indexer/repofetch.go b/indexer/repofetch.go index 1d257299..0e72f7cb 100644 --- a/indexer/repofetch.go +++ b/indexer/repofetch.go @@ -66,7 +66,8 @@ func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Lim return lim } -// GetOrCreateLimiterLazy is GetOrCreateLimiter with a lazy fetch of PDS from database if needed +// GetOrCreateLimiterLazy is GetOrCreateLimiter with a lazy fetch of PDS from database if needed. +// Used by indexer func (rf *RepoFetcher) GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter { rf.LimitMux.RLock() defer rf.LimitMux.RUnlock()