Skip to content

Commit

Permalink
simplify CrawlDispatcher.mainLoop()
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Dec 18, 2024
1 parent b125248 commit 602d2ef
Showing 1 changed file with 9 additions and 40 deletions.
49 changes: 9 additions & 40 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,46 +89,15 @@ type crawlWork struct {
}

func (c *CrawlDispatcher) mainLoop() {
var nextDispatchedJob *crawlWork
var jobsAwaitingDispatch []*crawlWork

// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
var dispatchQueue chan *crawlWork

for {
var crawlJob *crawlWork = nil
select {
case actorToCrawl := <-c.ingest:
// TODO: max buffer size
crawlJob := c.enqueueJobForActor(actorToCrawl)
if crawlJob == nil {
break
}

if nextDispatchedJob == nil {
nextDispatchedJob = crawlJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
}
case dispatchQueue <- nextDispatchedJob:
c.dequeueJob(nextDispatchedJob)

if len(jobsAwaitingDispatch) > 0 {
nextDispatchedJob = jobsAwaitingDispatch[0]
jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
} else {
nextDispatchedJob = nil
dispatchQueue = nil
}
case catchupJob := <-c.catchup:
crawlJob = c.enqueueJobForActor(actorToCrawl)
case crawlJob = <-c.catchup:
// CatchupJobs are for processing events that come in while a crawl is in progress
// They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
if nextDispatchedJob == nil {
nextDispatchedJob = catchupJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
}
case uid := <-c.complete:
c.maplk.Lock()

Expand All @@ -145,15 +114,15 @@ func (c *CrawlDispatcher) mainLoop() {
job.initScrape = false
job.catchup = job.next
job.next = nil
if nextDispatchedJob == nil {
nextDispatchedJob = job
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
}
crawlJob = job
}
c.maplk.Unlock()
}
if crawlJob != nil {
c.repoSync <- crawlJob
c.dequeueJob(crawlJob)
crawlJob = nil
}
}
}

Expand Down

0 comments on commit 602d2ef

Please sign in to comment.