diff --git a/indexer/crawler.go b/indexer/crawler.go
index 7e2656dd..84a1a795 100644
--- a/indexer/crawler.go
+++ b/indexer/crawler.go
@@ -9,22 +9,22 @@ import (
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
 	"github.com/bluesky-social/indigo/models"
-
-	"go.opentelemetry.io/otel"
 )
 
 type CrawlDispatcher struct {
 	// from Crawl()
-	ingest chan *models.ActorInfo
+	ingest chan *crawlWork
 
 	// from AddToCatchupQueue()
 	catchup chan *crawlWork
 
-	// from main loop to fetchWorker()
+	// from mainLoop to fetchWorker()
 	repoSync chan *crawlWork
 
+	// from fetchWorker back to mainLoop
 	complete chan models.Uid
 
+	// maplk is around both todo and inProgress
 	maplk      sync.Mutex
 	todo       map[models.Uid]*crawlWork
 	inProgress map[models.Uid]*crawlWork
@@ -49,9 +49,9 @@ func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog
 	}
 	out := &CrawlDispatcher{
-		ingest:      make(chan *models.ActorInfo),
-		repoSync:    make(chan *crawlWork),
-		complete:    make(chan models.Uid),
+		ingest:      make(chan *crawlWork),
+		repoSync:    make(chan *crawlWork, concurrency*2),
+		complete:    make(chan models.Uid, concurrency*2),
 		catchup:     make(chan *crawlWork),
 		repoFetcher: repoFetcher,
 		concurrency: concurrency,
@@ -97,46 +97,14 @@ type crawlWork struct {
 }
 
 func (c *CrawlDispatcher) mainLoop() {
-	var nextDispatchedJob *crawlWork
-	var jobsAwaitingDispatch []*crawlWork
-
-	// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
-	var dispatchQueue chan *crawlWork
-
 	for {
+		var crawlJob *crawlWork = nil
 		select {
-		case actorToCrawl := <-c.ingest:
-			// TODO: max buffer size
-			crawlJob := c.enqueueJobForActor(actorToCrawl)
-			if crawlJob == nil {
-				break
-			}
-
-			if nextDispatchedJob == nil {
-				nextDispatchedJob = crawlJob
-				dispatchQueue = c.repoSync
-			} else {
-				jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
-			}
-		case dispatchQueue <- nextDispatchedJob:
-			c.dequeueJob(nextDispatchedJob)
-
-			if len(jobsAwaitingDispatch) > 0 {
-				nextDispatchedJob = jobsAwaitingDispatch[0]
-				jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
-			} else {
-				nextDispatchedJob = nil
-				dispatchQueue = nil
-			}
-		case catchupJob := <-c.catchup:
+		case crawlJob = <-c.ingest:
+		case crawlJob = <-c.catchup:
+			// from AddToCatchupQueue()
 			// CatchupJobs are for processing events that come in while a crawl is in progress
 			// They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
-			if nextDispatchedJob == nil {
-				nextDispatchedJob = catchupJob
-				dispatchQueue = c.repoSync
-			} else {
-				jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
-			}
 		case uid := <-c.complete:
 			c.maplk.Lock()
@@ -153,15 +121,15 @@ func (c *CrawlDispatcher) mainLoop() {
 				job.initScrape = false
 				job.catchup = job.next
 				job.next = nil
-				if nextDispatchedJob == nil {
-					nextDispatchedJob = job
-					dispatchQueue = c.repoSync
-				} else {
-					jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
-				}
+				crawlJob = job
 			}
 			c.maplk.Unlock()
 		}
+
+		if crawlJob != nil {
+			c.repoSync <- crawlJob
+			c.dequeueJob(crawlJob)
+		}
 	}
 }
@@ -229,6 +197,7 @@ func (c *CrawlDispatcher) fetchWorker() {
 	for {
 		select {
 		case job := <-c.repoSync:
+			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
 			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
 				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
 			}
@@ -244,13 +213,18 @@ func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error
 		panic("must have pds for user in queue")
 	}
 
+	cw := c.enqueueJobForActor(ai)
+	if cw == nil {
+		return nil
+	}
+
 	userCrawlsEnqueued.Inc()
 
-	ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
-	defer span.End()
+	//ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
+	//defer span.End()
 
 	select {
-	case c.ingest <- ai:
+	case c.ingest <- cw:
 		return nil
 	case <-ctx.Done():
 		return ctx.Err()