CrawlDispatcher fetchWorker logging
brianolson committed Dec 20, 2024
1 parent 8e778a7 commit 6538e72
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions indexer/crawler.go
@@ -74,7 +74,7 @@ func (c *CrawlDispatcher) Run() {
 	go c.mainLoop()
 
 	for i := 0; i < c.concurrency; i++ {
-		go c.fetchWorker()
+		go c.fetchWorker(i)
 	}
 }
 
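Worth noting in the first hunk: the loop index is passed to the goroutine as an argument rather than captured in a closure. Each goroutine receives its own copy of i, which both gives every worker a stable ID and sidesteps the loop-variable capture pitfall on Go versions before 1.22. A minimal standalone sketch of the pattern (the worker body here is illustrative, not from this file):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	const concurrency = 4
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		// Passing i as an argument copies it per goroutine, so each worker
		// keeps a distinct, stable ID even on Go < 1.22, where closures
		// shared a single loop variable.
		go func(id int) {
			defer wg.Done()
			fmt.Println("worker", id, "running")
		}(i)
	}
	wg.Wait()
}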

@@ -247,31 +247,32 @@ func (c *CrawlDispatcher) getPdsForWork() (uint, *SynchronizedChunkQueue[*crawlWork]) {
 		c.newWork.Wait()
 	}
 }
-func (c *CrawlDispatcher) fetchWorker() {
+func (c *CrawlDispatcher) fetchWorker(fetchWorkerId int) {
 	// TODO: Shutdown
+	log := c.log.With("fwi", fetchWorkerId)
 	for {
 		pds, pq := c.getPdsForWork()
 		if pq == nil {
 			return
 		}
-		c.log.Info("fetchWorker pds", "pds", pds)
+		log.Info("fetchWorker pds", "pds", pds)
 		for {
 			ok, job := pq.Pop()
 			if !ok {
 				break
 			}
-			c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
+			log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
 			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
 			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
-				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
+				log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
 			} else {
-				c.log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid)
+				log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid)
 			}
 			// TODO: do we still just do this if it errors?
 			c.complete <- job.act.Uid
 		}
-		c.log.Info("fetchWorker pds empty", "pds", pds)
+		log.Info("fetchWorker pds empty", "pds", pds)
 		pq.Reserved.Store(false) // release our reservation
 	}
 }
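The core of the change: each worker derives a child logger once, so every record it emits carries the worker index under the fwi key, rather than all workers writing anonymously through the shared c.log. Assuming c.log is a *slog.Logger from the standard log/slog package (the With method and key/value calling convention in the diff match that API; slog itself is an assumption about this repo), the pattern in isolation:

package main

import "log/slog"

func main() {
	base := slog.Default()
	for i := 0; i < 2; i++ {
		// With returns a child logger that attaches fwi=<id> to every
		// record it emits, so interleaved worker output stays attributable.
		log := base.With("fwi", i)
		log.Info("fetchWorker pds", "pds", 42)
	}
	// With the default text handler, output looks roughly like:
	//   ... level=INFO msg="fetchWorker pds" fwi=0 pds=42
	//   ... level=INFO msg="fetchWorker pds" fwi=1 pds=42
}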
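The retained TODO notes that FetchAndIndexRepo calls Limiter.Wait() to enforce per-PDS rate limits, so two workers draining the same PDS queue would simply block on the same limiter. A hedged sketch of that mechanism using golang.org/x/time/rate; the pdsLimiters type and fetchRepo helper are illustrative assumptions, not code from this repository:

package main

import (
	"context"
	"sync"

	"golang.org/x/time/rate"
)

// pdsLimiters hands out one limiter per PDS; an illustrative assumption
// about how per-PDS limits are commonly kept, not this repo's actual code.
type pdsLimiters struct {
	mu  sync.Mutex
	lim map[uint]*rate.Limiter
}

func (p *pdsLimiters) get(pds uint) *rate.Limiter {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.lim == nil {
		p.lim = make(map[uint]*rate.Limiter)
	}
	l, ok := p.lim[pds]
	if !ok {
		l = rate.NewLimiter(rate.Limit(5), 1) // e.g. 5 fetches/sec per PDS, burst 1
		p.lim[pds] = l
	}
	return l
}

func fetchRepo(ctx context.Context, p *pdsLimiters, pds uint) error {
	// Wait blocks until the per-PDS limiter permits another fetch; if two
	// workers drain the same PDS queue, both block here, which is why the
	// TODO suggests running only one fetchWorker per PDS.
	if err := p.get(pds).Wait(ctx); err != nil {
		return err
	}
	// ... perform the actual fetch and indexing here ...
	return nil
}

func main() {
	_ = fetchRepo(context.Background(), &pdsLimiters{}, 1)
}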