diff --git a/indexer/crawler.go b/indexer/crawler.go index 526da9bb..d6ae642c 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -89,46 +89,15 @@ type crawlWork struct { } func (c *CrawlDispatcher) mainLoop() { - var nextDispatchedJob *crawlWork - var jobsAwaitingDispatch []*crawlWork - - // dispatchQueue represents the repoSync worker channel to which we dispatch crawl work - var dispatchQueue chan *crawlWork - for { + var crawlJob *crawlWork = nil select { case actorToCrawl := <-c.ingest: // TODO: max buffer size - crawlJob := c.enqueueJobForActor(actorToCrawl) - if crawlJob == nil { - break - } - - if nextDispatchedJob == nil { - nextDispatchedJob = crawlJob - dispatchQueue = c.repoSync - } else { - jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob) - } - case dispatchQueue <- nextDispatchedJob: - c.dequeueJob(nextDispatchedJob) - - if len(jobsAwaitingDispatch) > 0 { - nextDispatchedJob = jobsAwaitingDispatch[0] - jobsAwaitingDispatch = jobsAwaitingDispatch[1:] - } else { - nextDispatchedJob = nil - dispatchQueue = nil - } - case catchupJob := <-c.catchup: + crawlJob = c.enqueueJobForActor(actorToCrawl) + case crawlJob = <-c.catchup: // CatchupJobs are for processing events that come in while a crawl is in progress // They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress - if nextDispatchedJob == nil { - nextDispatchedJob = catchupJob - dispatchQueue = c.repoSync - } else { - jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob) - } case uid := <-c.complete: c.maplk.Lock() @@ -145,15 +114,15 @@ func (c *CrawlDispatcher) mainLoop() { job.initScrape = false job.catchup = job.next job.next = nil - if nextDispatchedJob == nil { - nextDispatchedJob = job - dispatchQueue = c.repoSync - } else { - jobsAwaitingDispatch = append(jobsAwaitingDispatch, job) - } + crawlJob = job } c.maplk.Unlock() } + if crawlJob != nil { + c.repoSync <- crawlJob + c.dequeueJob(crawlJob) + crawlJob = nil + } } }