CrawlDispatcher per-pds queue
brianolson committed Dec 20, 2024
1 parent 3eff0c5 commit aa744ef
Showing 1 changed file with 152 additions and 7 deletions.
159 changes: 152 additions & 7 deletions indexer/crawler.go
@@ -5,6 +5,7 @@ import (
	"fmt"
	"log/slog"
	"sync"
	"sync/atomic"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
@@ -19,15 +20,17 @@ type CrawlDispatcher struct {
	catchup chan *crawlWork

	// from mainLoop to fetchWorker()
	//repoSync chan *crawlWork

	// from fetchWorker back to mainLoop
	complete chan models.Uid

	// maplk guards todo, inProgress, and pdsQueues (newWork is a Cond on maplk)
	maplk      sync.Mutex
	newWork    sync.Cond
	todo       map[models.Uid]*crawlWork
	inProgress map[models.Uid]*crawlWork
	pdsQueues  map[uint]*SynchronizedChunkQueue[*crawlWork]

	repoFetcher CrawlRepoFetcher

@@ -49,17 +52,19 @@ func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog
	}

	out := &CrawlDispatcher{
		ingest: make(chan *crawlWork),
		//repoSync: make(chan *crawlWork, concurrency*2),
		complete:    make(chan models.Uid, concurrency*2),
		catchup:     make(chan *crawlWork),
		repoFetcher: repoFetcher,
		concurrency: concurrency,
		todo:        make(map[models.Uid]*crawlWork),
		inProgress:  make(map[models.Uid]*crawlWork),
		pdsQueues:   make(map[uint]*SynchronizedChunkQueue[*crawlWork]),
		log:         log,
		done:        make(chan struct{}),
	}
	out.newWork.L = &out.maplk
	go out.CatchupRepoGaugePoller()

	return out, nil
@@ -97,6 +102,7 @@ type crawlWork struct {
}

func (c *CrawlDispatcher) mainLoop() {
	// local cache of the shared per-PDS queues, so the hot path can skip maplk
	localPdsQueues := make(map[uint]*SynchronizedChunkQueue[*crawlWork])
	for {
		var crawlJob *crawlWork = nil
		select {
@@ -112,12 +118,34 @@ func (c *CrawlDispatcher) mainLoop() {
		}

		if crawlJob != nil {
			// send to fetchWorker()
			pds := crawlJob.act.PDS
			pq, ok := localPdsQueues[pds]
			if !ok {
				pq = c.getPdsQueue(pds)
				localPdsQueues[pds] = pq
			}
			pq.Push(crawlJob)
			c.newWork.Broadcast()

			// TODO: unbounded per-PDS queues here
			//c.repoSync <- crawlJob
			c.dequeueJob(crawlJob)
		}
	}
}

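// getPdsQueue returns the shared queue for a PDS, creating it on first use.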
func (c *CrawlDispatcher) getPdsQueue(pds uint) *SynchronizedChunkQueue[*crawlWork] {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	pq, ok := c.pdsQueues[pds]
	if !ok {
		pq = &SynchronizedChunkQueue[*crawlWork]{}
		c.pdsQueues[pds] = pq
	}
	return pq
}

func (c *CrawlDispatcher) recordComplete(uid models.Uid) *crawlWork {
	c.maplk.Lock()
	defer c.maplk.Unlock()
Expand Down Expand Up @@ -200,11 +228,35 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
	c.todo[catchup.user.Uid] = cw
	return cw
}

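// getPdsForWork blocks until it can reserve an unclaimed per-PDS queue,
// marking it Reserved so at most one fetchWorker drains a given PDS at a time.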
func (c *CrawlDispatcher) getPdsForWork() (uint, *SynchronizedChunkQueue[*crawlWork]) {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	for {
		for pds, pq := range c.pdsQueues {
			if pq.Reserved.Load() {
				continue
			}
			ok := pq.Reserved.CompareAndSwap(false, true)
			if ok {
				return pds, pq
			}
		}
		c.newWork.Wait()
	}
}
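
// fetchWorker repeatedly claims one PDS queue via getPdsForWork, drains it,
// and then releases the reservation so the PDS can be claimed again.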
func (c *CrawlDispatcher) fetchWorker() {
	// TODO: Shutdown
	for {
		pds, pq := c.getPdsForWork()
		if pq == nil {
			return
		}
		c.log.Info("fetchWorker pds", "pds", pds)
		for {
			ok, job := pq.Pop()
			if !ok {
				break
			}
			c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
@@ -216,9 +268,28 @@ func (c *CrawlDispatcher) fetchWorker() {
			// TODO: do we still just do this if it errors?
			c.complete <- job.act.Uid
		}
		// release the reservation so this PDS can be claimed again once new work arrives
		pq.Reserved.Store(false)
		c.log.Info("fetchWorker pds empty", "pds", pds)
	}
}

//func (c *CrawlDispatcher) fetchWorkerX() {
//	for {
//		select {
//		case job := <-c.repoSync:
//			c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
//			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
//			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
//				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
//			} else {
//				c.log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid)
//			}
//
//			// TODO: do we still just do this if it errors?
//			c.complete <- job.act.Uid
//		}
//	}
//}

func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error {
	if ai.PDS == 0 {
		panic("must have pds for user in queue")
Expand Down Expand Up @@ -301,3 +372,77 @@ func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
		}
	}
}

// ChunkQueue is an unbounded FIFO queue that stores chunk-sized slices internally.
// It is not synchronized; wrap it with a mutex (see SynchronizedChunkQueue) if needed.
type ChunkQueue[T any] struct {
	they      [][]T
	chunkSize int
}

const defaultChunkSize = 1000

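// Push appends x to the tail chunk, starting a new chunk when the tail is full.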
func (cq *ChunkQueue[T]) Push(x T) {
	// initialize chunkSize before any allocation so the make below
	// never runs with len > cap on the very first Push
	if cq.chunkSize == 0 {
		cq.chunkSize = defaultChunkSize
	}
	last := len(cq.they) - 1
	if last >= 0 {
		chunk := cq.they[last]
		if len(chunk) < cq.chunkSize {
			chunk = append(chunk, x)
			cq.they[last] = chunk
			return
		}
	}
	chunk := make([]T, 1, cq.chunkSize)
	chunk[0] = x
	cq.they = append(cq.they, chunk)
}

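// Pop removes and returns the oldest element; the bool is false when the queue is empty.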
func (cq *ChunkQueue[T]) Pop() (bool, T) {
	if len(cq.they) == 0 {
		var x T
		return false, x
	}
	chunk := cq.they[0]
	out := chunk[0]
	if len(chunk) == 1 {
		cq.they = cq.they[1:]
	} else {
		chunk = chunk[1:]
		cq.they[0] = chunk
	}
	return true, out
}

func (cq *ChunkQueue[T]) Any() bool {
	return len(cq.they) != 0
}

type SynchronizedChunkQueue[T any] struct {
	ChunkQueue[T]

	l sync.Mutex

	// true if a fetchWorker() is processing this queue
	Reserved atomic.Bool
}

func (cq *SynchronizedChunkQueue[T]) Push(x T) {
	cq.l.Lock()
	defer cq.l.Unlock()
	cq.ChunkQueue.Push(x)
}

func (cq *SynchronizedChunkQueue[T]) Pop() (bool, T) {
	cq.l.Lock()
	defer cq.l.Unlock()
	return cq.ChunkQueue.Pop()
}

func (cq *SynchronizedChunkQueue[T]) Any() bool {
	cq.l.Lock()
	defer cq.l.Unlock()
	return cq.ChunkQueue.Any()
}
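
Taken together, the new pieces form a reserve/drain/release cycle: mainLoop pushes each job onto its PDS's queue and broadcasts newWork, then a fetchWorker reserves an unclaimed queue with CompareAndSwap, drains it, and releases it. The standalone sketch below illustrates that cycle outside the commit; SyncQueue, the job strings, and the worker loop are illustrative stand-ins (a flat slice replaces the chunked storage), not code from this change.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// SyncQueue stands in for SynchronizedChunkQueue: a mutex-guarded FIFO
// plus the Reserved flag used to grant one worker exclusive drain rights.
type SyncQueue[T any] struct {
	mu       sync.Mutex
	items    []T
	Reserved atomic.Bool
}

func (q *SyncQueue[T]) Push(x T) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, x)
}

func (q *SyncQueue[T]) Pop() (bool, T) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		var zero T
		return false, zero
	}
	x := q.items[0]
	q.items = q.items[1:]
	return true, x
}

func main() {
	// Two PDS hosts, each with its own queue of repo-crawl jobs.
	queues := map[uint]*SyncQueue[string]{1: {}, 2: {}}
	queues[1].Push("repo-a")
	queues[1].Push("repo-b")
	queues[2].Push("repo-c")

	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for pds, q := range queues {
				// Reserve the queue so at most one worker drains a given
				// PDS at a time, mirroring getPdsForWork's CompareAndSwap.
				if !q.Reserved.CompareAndSwap(false, true) {
					continue
				}
				for {
					ok, job := q.Pop()
					if !ok {
						break
					}
					fmt.Printf("worker %d: pds %d job %s\n", worker, pds, job)
				}
				q.Reserved.Store(false) // release once drained
			}
		}(w)
	}
	wg.Wait()
}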
