diff --git a/indexer/crawler.go b/indexer/crawler.go
index 84a1a795..19224252 100644
--- a/indexer/crawler.go
+++ b/indexer/crawler.go
@@ -101,29 +101,14 @@ func (c *CrawlDispatcher) mainLoop() {
 		var crawlJob *crawlWork = nil
 		select {
 		case crawlJob = <-c.ingest:
+			c.log.Info("ml ingest", "pds", crawlJob.act.PDS, "uid", crawlJob.act.Uid)
 		case crawlJob = <-c.catchup:
+			c.log.Info("ml catchup", "pds", crawlJob.act.PDS, "uid", crawlJob.act.Uid)
 			// from AddToCatchupQueue()
 			// CatchupJobs are for processing events that come in while a crawl is in progress
 			// They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
 		case uid := <-c.complete:
-			c.maplk.Lock()
-
-			job, ok := c.inProgress[uid]
-			if !ok {
-				panic("should not be possible to not have a job in progress we receive a completion signal for")
-			}
-			delete(c.inProgress, uid)
-
-			// If there are any subsequent jobs for this UID, add it back to the todo list or buffer.
-			// We're basically pumping the `next` queue into the `catchup` queue and will do this over and over until the `next` queue is empty.
-			if len(job.next) > 0 {
-				c.todo[uid] = job
-				job.initScrape = false
-				job.catchup = job.next
-				job.next = nil
-				crawlJob = job
-			}
-			c.maplk.Unlock()
+			crawlJob = c.recordComplete(uid)
 		}
 
 		if crawlJob != nil {
@@ -133,6 +118,29 @@ func (c *CrawlDispatcher) mainLoop() {
 	}
 }
 
+func (c *CrawlDispatcher) recordComplete(uid models.Uid) *crawlWork {
+	c.maplk.Lock()
+	defer c.maplk.Unlock()
+
+	job, ok := c.inProgress[uid]
+	if !ok {
+		panic("should not be possible to not have a job in progress we receive a completion signal for")
+	}
+	delete(c.inProgress, uid)
+	c.log.Info("ml complete", "pds", job.act.PDS, "uid", job.act.Uid)
+
+	// If there are any subsequent jobs for this UID, add it back to the todo list or buffer.
+	// We're basically pumping the `next` queue into the `catchup` queue and will do this over and over until the `next` queue is empty.
+	if len(job.next) > 0 {
+		c.todo[uid] = job
+		job.initScrape = false
+		job.catchup = job.next
+		job.next = nil
+		return job
+	}
+	return nil
+}
+
 // enqueueJobForActor adds a new crawl job to the todo list if there isn't already a job in progress for this actor
 func (c *CrawlDispatcher) enqueueJobForActor(ai *models.ActorInfo) *crawlWork {
 	c.maplk.Lock()
@@ -197,9 +205,12 @@ func (c *CrawlDispatcher) fetchWorker() {
 	for {
 		select {
 		case job := <-c.repoSync:
+			c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
 			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
 			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
 				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
+			} else {
+				c.log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid)
 			}
 
 			// TODO: do we still just do this if it errors?