diff --git a/indexer/crawler.go b/indexer/crawler.go index ab0cceae..6ee9541e 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -74,7 +74,7 @@ func (c *CrawlDispatcher) Run() { go c.mainLoop() for i := 0; i < c.concurrency; i++ { - go c.fetchWorker() + go c.fetchWorker(i) } } @@ -247,31 +247,32 @@ func (c *CrawlDispatcher) getPdsForWork() (uint, *SynchronizedChunkQueue[*crawlW c.newWork.Wait() } } -func (c *CrawlDispatcher) fetchWorker() { +func (c *CrawlDispatcher) fetchWorker(fetchWorkerId int) { // TODO: Shutdown + log := c.log.With("fwi", fetchWorkerId) for { pds, pq := c.getPdsForWork() if pq == nil { return } - c.log.Info("fetchWorker pds", "pds", pds) + log.Info("fetchWorker pds", "pds", pds) for { ok, job := pq.Pop() if !ok { break } - c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid) + log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid) // TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil { - c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err) + log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err) } else { - c.log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid) + log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid) } // TODO: do we still just do this if it errors? c.complete <- job.act.Uid } - c.log.Info("fetchWorker pds empty", "pds", pds) + log.Info("fetchWorker pds empty", "pds", pds) pq.Reserved.Store(false) // release our reservation } }