Skip to content

Commit

Permalink
indexer_scheduler shutdown and comment
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Dec 17, 2024
1 parent bdb8550 commit 106172e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
14 changes: 14 additions & 0 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (c *CrawlDispatcher) Run() {

// Shutdown signals the dispatcher's goroutines to exit: closing c.done makes
// mainLoop's select case return, and the Broadcast wakes any fetchWorker
// blocked in nextJob's cond.Wait so it can observe the closed channel and
// return nil.
func (c *CrawlDispatcher) Shutdown() {
	close(c.done)
	// Hold the lock associated with repoSyncCond while broadcasting.
	// Without it there is a missed-wakeup race: a worker that has checked
	// the (empty) heap under repoSyncLock but not yet called Wait would
	// miss this Broadcast and sleep forever, since no further broadcasts
	// arrive after shutdown. Taking the lock forces Shutdown to wait until
	// that worker is actually parked in Wait (or has left the critical
	// section), so the wakeup cannot be lost.
	//
	// NOTE(review): a goroutine that enters nextJob only after Shutdown
	// returns can still block, because nextJob checks c.done after Wait
	// rather than before the first Wait — confirm nextJob also tests
	// c.done ahead of waiting.
	c.repoSyncLock.Lock()
	c.repoSyncCond.Broadcast()
	c.repoSyncLock.Unlock()
}

type catchupJob struct {
Expand Down Expand Up @@ -135,6 +136,8 @@ func (c *CrawlDispatcher) mainLoop() {
crawlJob = job
}
c.maplk.Unlock()
case <-c.done:
return
}
if crawlJob != nil {
pdsID := crawlJob.act.PDS
Expand Down Expand Up @@ -213,6 +216,9 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
func (c *CrawlDispatcher) fetchWorker() {
for {
job := c.nextJob()
if job == nil {
return
}
nextInPds := job.nextInPds
job.nextInPds = nil
if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
Expand Down Expand Up @@ -303,6 +309,7 @@ func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
}

// priority-queue for crawlJob based on eligibleTime
// Put jobs on heap which will come off through nextJob()
func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
if crawlJob.alreadyEnheaped {
c.log.Error("CrawlDispatcher trying to enheap alreadyEnheaped", "pds", crawlJob.alreadyEnheaped, "uid", crawlJob.act.Uid)
Expand Down Expand Up @@ -343,6 +350,8 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
// nextJob returns next available crawlJob based on eligibleTime; block until some available.
// The caller of .nextJob() should .enheapJob(crawlJob.nextInPds) if any after executing crawlJob's work
//
// nextJob() blocks until it has work to return, or returns nil if we are done due to Shutdown()
//
// There's a tiny race where .nextJob() could return the only work for a PDS,
// outside event could .enheapJob() a next one for that PDS,
// get enheaped as available immediately because the rate limiter hasn't ticked the work done from .nextJob() above,
Expand All @@ -353,6 +362,11 @@ func (c *CrawlDispatcher) nextJob() *crawlWork {
defer c.repoSyncLock.Unlock()
for len(c.repoSyncHeap) == 0 {
c.repoSyncCond.Wait()
select {
case <-c.done:
return nil
default:
}
}
x := heap.Pop(c)
catchupPending.Set(float64(len(c.repoSyncHeap)))
Expand Down
3 changes: 2 additions & 1 deletion indexer/repofetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Lim
return lim
}

// GetOrCreateLimiterLazy is GetOrCreateLimiter with a lazy fetch of PDS from database if needed
// GetOrCreateLimiterLazy is GetOrCreateLimiter with a lazy fetch of PDS from database if needed.
// Used by indexer
func (rf *RepoFetcher) GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter {
rf.LimitMux.RLock()
defer rf.LimitMux.RUnlock()
Expand Down

0 comments on commit 106172e

Please sign in to comment.