Skip to content

Commit

Permalink
CrawlDispatcher experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Dec 19, 2024
1 parent 16c36b8 commit 4e13454
Showing 1 changed file with 26 additions and 52 deletions.
78 changes: 26 additions & 52 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/models"

"go.opentelemetry.io/otel"
)

type CrawlDispatcher struct {
// from Crawl()
ingest chan *models.ActorInfo
ingest chan *crawlWork

// from AddToCatchupQueue()
catchup chan *crawlWork

// from main loop to fetchWorker()
// from mainLoop to fetchWorker()
repoSync chan *crawlWork

// from fetchWorker back to mainLoop
complete chan models.Uid

// maplk guards both the todo and inProgress maps
maplk sync.Mutex
todo map[models.Uid]*crawlWork
inProgress map[models.Uid]*crawlWork
Expand All @@ -49,9 +49,9 @@ func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog
}

out := &CrawlDispatcher{
ingest: make(chan *models.ActorInfo),
repoSync: make(chan *crawlWork),
complete: make(chan models.Uid),
ingest: make(chan *crawlWork),
repoSync: make(chan *crawlWork, concurrency*2),
complete: make(chan models.Uid, concurrency*2),
catchup: make(chan *crawlWork),
repoFetcher: repoFetcher,
concurrency: concurrency,
Expand Down Expand Up @@ -97,46 +97,14 @@ type crawlWork struct {
}

func (c *CrawlDispatcher) mainLoop() {
var nextDispatchedJob *crawlWork
var jobsAwaitingDispatch []*crawlWork

// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
var dispatchQueue chan *crawlWork

for {
var crawlJob *crawlWork = nil
select {
case actorToCrawl := <-c.ingest:
// TODO: max buffer size
crawlJob := c.enqueueJobForActor(actorToCrawl)
if crawlJob == nil {
break
}

if nextDispatchedJob == nil {
nextDispatchedJob = crawlJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
}
case dispatchQueue <- nextDispatchedJob:
c.dequeueJob(nextDispatchedJob)

if len(jobsAwaitingDispatch) > 0 {
nextDispatchedJob = jobsAwaitingDispatch[0]
jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
} else {
nextDispatchedJob = nil
dispatchQueue = nil
}
case catchupJob := <-c.catchup:
case crawlJob = <-c.ingest:
case crawlJob = <-c.catchup:
// from AddToCatchupQueue()
// CatchupJobs are for processing events that come in while a crawl is in progress
// They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
if nextDispatchedJob == nil {
nextDispatchedJob = catchupJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
}
case uid := <-c.complete:
c.maplk.Lock()

Expand All @@ -153,15 +121,15 @@ func (c *CrawlDispatcher) mainLoop() {
job.initScrape = false
job.catchup = job.next
job.next = nil
if nextDispatchedJob == nil {
nextDispatchedJob = job
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
}
crawlJob = job
}
c.maplk.Unlock()
}

if crawlJob != nil {
c.repoSync <- crawlJob
c.dequeueJob(crawlJob)
}
}
}

Expand Down Expand Up @@ -229,6 +197,7 @@ func (c *CrawlDispatcher) fetchWorker() {
for {
select {
case job := <-c.repoSync:
// TODO: only run one fetchWorker per PDS, because FetchAndIndexRepo calls Limiter.Wait() to enforce per-PDS rate limits
if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
}
Expand All @@ -244,13 +213,18 @@ func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error
panic("must have pds for user in queue")
}

cw := c.enqueueJobForActor(ai)
if cw == nil {
return nil
}

userCrawlsEnqueued.Inc()

ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
defer span.End()
//ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
//defer span.End()

select {
case c.ingest <- ai:
case c.ingest <- cw:
return nil
case <-ctx.Done():
return ctx.Err()
Expand Down

0 comments on commit 4e13454

Please sign in to comment.