diff --git a/indexer/crawler.go b/indexer/crawler.go
index df855f51..e6da0519 100644
--- a/indexer/crawler.go
+++ b/indexer/crawler.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log/slog"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
@@ -19,15 +20,17 @@ type CrawlDispatcher struct {
 	catchup chan *crawlWork
 
 	// from mainLoop to fetchWorker()
-	repoSync chan *crawlWork
+	//repoSync chan *crawlWork
 
 	// from fetchWorker back to mainLoop
 	complete chan models.Uid
 
 	// maplk is around both todo and inProgress
 	maplk      sync.Mutex
+	newWork    sync.Cond
 	todo       map[models.Uid]*crawlWork
 	inProgress map[models.Uid]*crawlWork
+	pdsQueues  map[uint]*SynchronizedChunkQueue[*crawlWork]
 
 	repoFetcher CrawlRepoFetcher
 
@@ -49,17 +52,19 @@ func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog
 	}
 
 	out := &CrawlDispatcher{
-		ingest:      make(chan *crawlWork),
-		repoSync:    make(chan *crawlWork, concurrency*2),
+		ingest: make(chan *crawlWork),
+		//repoSync: make(chan *crawlWork, concurrency*2),
 		complete:    make(chan models.Uid, concurrency*2),
 		catchup:     make(chan *crawlWork),
 		repoFetcher: repoFetcher,
 		concurrency: concurrency,
 		todo:        make(map[models.Uid]*crawlWork),
 		inProgress:  make(map[models.Uid]*crawlWork),
+		pdsQueues:   make(map[uint]*SynchronizedChunkQueue[*crawlWork]),
 		log:         log,
 		done:        make(chan struct{}),
 	}
+	out.newWork.L = &out.maplk
 	go out.CatchupRepoGaugePoller()
 
 	return out, nil
@@ -97,6 +102,7 @@ type crawlWork struct {
 }
 
 func (c *CrawlDispatcher) mainLoop() {
+	localPdsQueues := make(map[uint]*SynchronizedChunkQueue[*crawlWork])
 	for {
 		var crawlJob *crawlWork = nil
 		select {
@@ -112,12 +118,34 @@ func (c *CrawlDispatcher) mainLoop() {
 		}
 		if crawlJob != nil {
-			c.repoSync <- crawlJob
+			// send to fetchWorker()
+			pds := crawlJob.act.PDS
+			pq, ok := localPdsQueues[pds]
+			if !ok {
+				pq = c.getPdsQueue(pds)
+				localPdsQueues[pds] = pq
+			}
+			pq.Push(crawlJob)
+			c.newWork.Broadcast()
+
+			// TODO: unbounded per-PDS queues here
+			//c.repoSync <- crawlJob
 			c.dequeueJob(crawlJob)
 		}
 	}
 }
 
+func (c *CrawlDispatcher) getPdsQueue(pds uint) *SynchronizedChunkQueue[*crawlWork] {
+	c.maplk.Lock()
+	defer c.maplk.Unlock()
+	pq, ok := c.pdsQueues[pds]
+	if !ok {
+		pq = &SynchronizedChunkQueue[*crawlWork]{}
+		c.pdsQueues[pds] = pq
+	}
+	return pq
+}
+
 func (c *CrawlDispatcher) recordComplete(uid models.Uid) *crawlWork {
 	c.maplk.Lock()
 	defer c.maplk.Unlock()
@@ -200,11 +228,35 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
 	c.todo[catchup.user.Uid] = cw
 	return cw
 }
-
+func (c *CrawlDispatcher) getPdsForWork() (uint, *SynchronizedChunkQueue[*crawlWork]) {
+	c.maplk.Lock()
+	defer c.maplk.Unlock()
+	for {
+		for pds, pq := range c.pdsQueues {
+			if pq.Reserved.Load() {
+				continue
+			}
+			ok := pq.Reserved.CompareAndSwap(false, true)
+			if ok {
+				return pds, pq
+			}
+		}
+		c.newWork.Wait()
+	}
+}
 func (c *CrawlDispatcher) fetchWorker() {
+	// TODO: Shutdown
 	for {
-		select {
-		case job := <-c.repoSync:
+		pds, pq := c.getPdsForWork()
+		if pq == nil {
+			return
+		}
+		c.log.Info("fetchWorker pds", "pds", pds)
+		for {
+			ok, job := pq.Pop()
+			if !ok {
+				break
+			}
 			c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
 			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
 			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
@@ -216,9 +268,28 @@
 			// TODO: do we still just do this if it errors?
 			c.complete <- job.act.Uid
 		}
+		c.log.Info("fetchWorker pds empty", "pds", pds)
 	}
 }
 
+//func (c *CrawlDispatcher) fetchWorkerX() {
+//	for {
+//		select {
+//		case job := <-c.repoSync:
+//			c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
+//			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
+//			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
+//				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
+//			} else {
+//				c.log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid)
+//			}
+//
+//			// TODO: do we still just do this if it errors?
+//			c.complete <- job.act.Uid
+//		}
+//	}
+//}
+
 func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error {
 	if ai.PDS == 0 {
 		panic("must have pds for user in queue")
 	}
@@ -301,3 +372,77 @@ func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
 		}
 	}
 }
+
+// Unbounded queue with chunk-size slices internally
+// not synchronized, wrap with mutex if needed
+type ChunkQueue[T any] struct {
+	they      [][]T
+	chunkSize int
+}
+
+const defaultChunkSize = 1000
+
+func (cq *ChunkQueue[T]) Push(x T) {
+	if cq.chunkSize == 0 {
+		cq.chunkSize = defaultChunkSize
+	}
+	last := len(cq.they) - 1
+	if last >= 0 {
+		chunk := cq.they[last]
+		if len(chunk) < cq.chunkSize {
+			chunk = append(chunk, x)
+			cq.they[last] = chunk
+			return
+		}
+	}
+	chunk := make([]T, 1, cq.chunkSize)
+	chunk[0] = x
+	cq.they = append(cq.they, chunk)
+}
+
+func (cq *ChunkQueue[T]) Pop() (bool, T) {
+	if len(cq.they) == 0 {
+		var x T
+		return false, x
+	}
+	chunk := cq.they[0]
+	out := chunk[0]
+	if len(chunk) == 1 {
+		cq.they = cq.they[1:]
+	} else {
+		chunk = chunk[1:]
+		cq.they[0] = chunk
+	}
+	return true, out
+}
+
+func (cq *ChunkQueue[T]) Any() bool {
+	return len(cq.they) != 0
+}
+
+type SynchronizedChunkQueue[T any] struct {
+	ChunkQueue[T]
+
+	l sync.Mutex
+
+	// true if a fetchWorker() is processing this queue
+	Reserved atomic.Bool
+}
+
+func (cq *SynchronizedChunkQueue[T]) Push(x T) {
+	cq.l.Lock()
+	defer cq.l.Unlock()
+	cq.ChunkQueue.Push(x)
+}
+
+func (cq *SynchronizedChunkQueue[T]) Pop() (bool, T) {
+	cq.l.Lock()
+	defer cq.l.Unlock()
+	return cq.ChunkQueue.Pop()
+}
+
+func (cq *SynchronizedChunkQueue[T]) Any() bool {
+	cq.l.Lock()
+	defer cq.l.Unlock()
+	return cq.ChunkQueue.Any()
+}
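For reference, below is a minimal test-style sketch (not part of the diff) of how the new queue types are expected to behave. It assumes the ChunkQueue and SynchronizedChunkQueue definitions above land in the same package as crawler.go (assumed here to be named indexer); the file and test names are hypothetical.

// crawler_queue_test.go (hypothetical): exercises the queue types added above.
package indexer

import "testing"

func TestSynchronizedChunkQueueSketch(t *testing.T) {
	var q SynchronizedChunkQueue[int]

	// Pop on an empty queue reports no work.
	if ok, _ := q.Pop(); ok {
		t.Fatal("expected empty queue")
	}

	// Push past one chunk boundary so Pop has to walk multiple internal chunks.
	const n = defaultChunkSize + 10
	for i := 0; i < n; i++ {
		q.Push(i)
	}
	if !q.Any() {
		t.Fatal("expected queued work")
	}

	// Claim the queue the way fetchWorker does via getPdsForWork.
	if !q.Reserved.CompareAndSwap(false, true) {
		t.Fatal("queue should not be reserved yet")
	}

	// Items come back in FIFO order across chunks.
	for i := 0; i < n; i++ {
		ok, v := q.Pop()
		if !ok || v != i {
			t.Fatalf("pop %d: got ok=%v v=%v", i, ok, v)
		}
	}

	// Release the reservation once the queue is drained.
	q.Reserved.Store(false)
}

The Reserved flag is what lets a single fetchWorker hold a given PDS queue at a time, in line with the diff's TODO about running one fetchWorker per PDS for rate limiting.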