diff --git a/bgs/admin.go b/bgs/admin.go
index e6a18525..7d3ff94c 100644
--- a/bgs/admin.go
+++ b/bgs/admin.go
@@ -414,7 +414,7 @@ func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
 	limits.PerDay.SetLimit(body.PerDay)

 	// Set the crawl rate limit
-	bgs.repoFetcher.GetOrCreateLimiter(pds.ID, float64(body.CrawlRate)).SetLimit(rate.Limit(body.CrawlRate))
+	bgs.repoFetcher.GetOrCreateLimiter2(pds.ID, float64(body.CrawlRate)).SetLimit(rate.Limit(body.CrawlRate))

 	return e.JSON(200, map[string]any{
 		"success": "true",
diff --git a/indexer/crawler.go b/indexer/crawler.go
index 6ee9541e..b8df4ceb 100644
--- a/indexer/crawler.go
+++ b/indexer/crawler.go
@@ -3,9 +3,10 @@ package indexer
 import (
 	"context"
 	"fmt"
+	"golang.org/x/time/rate"
 	"log/slog"
+	"math/rand"
 	"sync"
-	"sync/atomic"
 	"time"

 	comatproto "github.com/bluesky-social/indigo/api/atproto"
@@ -25,12 +26,13 @@ type CrawlDispatcher struct {
 	// from fetchWorker back to mainLoop
 	complete chan models.Uid

-	// maplk is around both todo and inProgress
+	// maplk is around: todo, inProgress, pdsQueues, pdsIds
 	maplk      sync.Mutex
 	newWork    sync.Cond
 	todo       map[models.Uid]*crawlWork
 	inProgress map[models.Uid]*crawlWork
-	pdsQueues  map[uint]*SynchronizedChunkQueue[*crawlWork]
+	pdsQueues  map[uint]pdsQueue
+	pdsIds     []uint

 	repoFetcher CrawlRepoFetcher

@@ -41,9 +43,16 @@ type CrawlDispatcher struct {
 	done chan struct{}
 }

+type pdsQueue struct {
+	queue   *SynchronizedChunkQueue[*crawlWork]
+	pdsId   uint
+	limiter *rate.Limiter
+}
+
 // this is what we need of RepoFetcher
 type CrawlRepoFetcher interface {
 	FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
+	GetOrCreateLimiter(pdsID uint) *rate.Limiter
 }

 func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
@@ -60,7 +69,7 @@ func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog
 		concurrency: concurrency,
 		todo:        make(map[models.Uid]*crawlWork),
 		inProgress:  make(map[models.Uid]*crawlWork),
-		pdsQueues:   make(map[uint]*SynchronizedChunkQueue[*crawlWork]),
+		pdsQueues:   make(map[uint]pdsQueue),
 		log:         log,
 		done:        make(chan struct{}),
 	}
@@ -102,7 +111,7 @@ type crawlWork struct {
 }

 func (c *CrawlDispatcher) mainLoop() {
-	localPdsQueues := make(map[uint]*SynchronizedChunkQueue[*crawlWork])
+	localPdsQueues := make(map[uint]pdsQueue)
 	for {
 		var crawlJob *crawlWork = nil
 		select {
@@ -125,7 +134,7 @@ func (c *CrawlDispatcher) mainLoop() {
 			pq = c.getPdsQueue(pds)
 			localPdsQueues[pds] = pq
 		}
-		pq.Push(crawlJob)
+		pq.queue.Push(crawlJob)
 		c.newWork.Broadcast()

 		// TODO: unbounded per-PDS queues here
@@ -135,15 +144,31 @@ func (c *CrawlDispatcher) mainLoop() {
 	}
 }

-func (c *CrawlDispatcher) getPdsQueue(pds uint) *SynchronizedChunkQueue[*crawlWork] {
+func (c *CrawlDispatcher) getPdsQueue(pds uint) pdsQueue {
 	c.maplk.Lock()
 	defer c.maplk.Unlock()
 	pq, ok := c.pdsQueues[pds]
-	if !ok {
-		pq = NewSynchronizedChunkQueue[*crawlWork]()
-		c.pdsQueues[pds] = pq
+	if ok {
+		return pq
+	}
+
+	// yield lock for slow section that hits database and takes a different lock
+	c.maplk.Unlock()
+	npq := pdsQueue{
+		queue:   NewSynchronizedChunkQueue[*crawlWork](),
+		pdsId:   pds,
+		limiter: c.repoFetcher.GetOrCreateLimiter(pds),
+	}
+	// retake lock and see if we still need to insert a pdsQueue
+	c.maplk.Lock()
+
+	pq, ok = c.pdsQueues[pds]
+	if ok {
+		return pq
 	}
-	return pq
+	c.pdsQueues[pds] = npq
+	c.pdsIds = append(c.pdsIds, pds)
+	return npq
 }

 func (c *CrawlDispatcher) recordComplete(uid models.Uid) *crawlWork {
@@ -228,41 +253,75 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
 	c.todo[catchup.user.Uid] = cw
 	return cw
 }
-func (c *CrawlDispatcher) getPdsForWork() (uint, *SynchronizedChunkQueue[*crawlWork]) {
+func (c *CrawlDispatcher) getPdsForWork() (uint, pdsQueue) {
 	c.maplk.Lock()
 	defer c.maplk.Unlock()
+	// TODO: this is _maybe_ kinda long for inside the lock?
+	// internally takes the pdsQueue lock and the limiter lock
 	for {
-		for pds, pq := range c.pdsQueues {
-			if pq.Reserved.Load() {
-				continue
-			}
-			if !pq.Any() {
-				continue
-			}
-			ok := pq.Reserved.CompareAndSwap(false, true)
-			if ok {
+		minSleep := time.Minute
+		noQueues := true
+		// start at a random place in the list of PDS ids
+		if len(c.pdsIds) > 0 {
+			offset := rand.Intn(len(c.pdsIds))
+			for i := 0; i < len(c.pdsIds); i++ {
+				pds := c.pdsIds[(i+offset)%len(c.pdsIds)]
+				pq := c.pdsQueues[pds]
+				if !pq.queue.Any() {
+					continue
+				}
+				noQueues = false
+				now := time.Now()
+				tok := pq.limiter.TokensAt(now)
+				if tok >= 1.0 {
+					// ready now!
+					return pds, pq
+				}
+				if tok < 1.0 {
+					// not ready yet, but calculate next availability in case we need to sleep
+					rate := float64(pq.limiter.Limit())
+					need := 1.0 - tok
+					dt := time.Duration(float64(time.Second) * (need / rate))
+					if dt < minSleep {
+						minSleep = dt
+					}
+					continue
+				}
 				return pds, pq
 			}
 		}
-		c.newWork.Wait()
+		select {
+		case <-c.done:
+			// Shutdown
+			return 0, pdsQueue{}
+		default:
+		}
+		if noQueues {
+			c.newWork.Wait()
+		} else {
+			c.maplk.Unlock()
+			time.Sleep(minSleep)
+			c.maplk.Lock()
+		}
 	}
 }
 func (c *CrawlDispatcher) fetchWorker(fetchWorkerId int) {
-	// TODO: Shutdown
 	log := c.log.With("fwi", fetchWorkerId)
 	for {
+		// get a pds with some available work
 		pds, pq := c.getPdsForWork()
-		if pq == nil {
+		if pq.queue == nil {
+			// Shutdown
 			return
 		}
 		log.Info("fetchWorker pds", "pds", pds)
+		// continue with this pds until its queue is empty
 		for {
-			ok, job := pq.Pop()
+			ok, job := pq.queue.Pop()
 			if !ok {
 				break
 			}
 			log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
-			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
 			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
 				log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
 			} else {
@@ -273,28 +332,9 @@ func (c *CrawlDispatcher) fetchWorker(fetchWorkerId int) {
 			c.complete <- job.act.Uid
 		}
 		log.Info("fetchWorker pds empty", "pds", pds)
-		pq.Reserved.Store(false) // release our reservation
 	}
 }

-//func (c *CrawlDispatcher) fetchWorkerX() {
-//	for {
-//		select {
-//		case job := <-c.repoSync:
-//			c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
-//			// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
-//			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
-//				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
-//			} else {
-//				c.log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid)
-//			}
-//
-//			// TODO: do we still just do this if it errors?
-//			c.complete <- job.act.Uid
-//		}
-//	}
-//}
-
 func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error {
 	if ai.PDS == 0 {
 		panic("must have pds for user in queue")
@@ -307,9 +347,6 @@ func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error

 	userCrawlsEnqueued.Inc()

-	//ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
-	//defer span.End()
-
 	c.log.Info("crawl", "pds", cw.act.PDS, "uid", cw.act.Uid)
 	select {
 	case c.ingest <- cw:
@@ -426,9 +463,6 @@ type SynchronizedChunkQueue[T any] struct {
 	ChunkQueue[T]

 	l sync.Mutex
-
-	// true if a fetchWorker() is processing this queue
-	Reserved atomic.Bool
 }

 func NewSynchronizedChunkQueue[T any]() *SynchronizedChunkQueue[T] {
diff --git a/indexer/repofetch.go b/indexer/repofetch.go
index 1e93612d..df8e5724 100644
--- a/indexer/repofetch.go
+++ b/indexer/repofetch.go
@@ -53,7 +53,8 @@ func (rf *RepoFetcher) GetLimiter(pdsID uint) *rate.Limiter {
 	return rf.Limiters[pdsID]
 }

-func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
+// GetOrCreateLimiter2 is for when we have already fetched the pds record from the db
+func (rf *RepoFetcher) GetOrCreateLimiter2(pdsID uint, pdsrate float64) *rate.Limiter {
 	rf.LimitMux.Lock()
 	defer rf.LimitMux.Unlock()

@@ -66,6 +67,37 @@ func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Lim
 	return lim
 }

+// GetOrCreateLimiter will fetch the pds from the db if needed
+// See also GetOrCreateLimiter2 if the pds record is already available.
+func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint) *rate.Limiter {
+	rf.LimitMux.Lock()
+	lim, ok := rf.Limiters[pdsID]
+	if ok {
+		// return limiter already built
+		rf.LimitMux.Unlock()
+		return lim
+	}
+	// release lock while we do db fetch
+	rf.LimitMux.Unlock()
+
+	var pds models.PDS
+	if err := rf.db.First(&pds, "id = ?", pdsID).Error; err != nil {
+		rf.log.Error("failed to find pds", "pdsID", pdsID, "err", err)
+		return nil
+	}
+	nlim := rate.NewLimiter(rate.Limit(pds.CrawlRateLimit), 1)
+
+	rf.LimitMux.Lock()
+	defer rf.LimitMux.Unlock()
+	lim, ok = rf.Limiters[pdsID]
+	if ok {
+		// it was added while we were getting ready
+		return lim
+	}
+	rf.Limiters[pdsID] = nlim
+	return nlim
+}
+
 func (rf *RepoFetcher) SetLimiter(pdsID uint, lim *rate.Limiter) {
 	rf.LimitMux.Lock()
 	defer rf.LimitMux.Unlock()
@@ -83,7 +115,7 @@ func (rf *RepoFetcher) fetchRepo(ctx context.Context, c *xrpc.Client, pds *model
 		attribute.String("rev", rev),
 	)

-	limiter := rf.GetOrCreateLimiter(pds.ID, pds.CrawlRateLimit)
+	limiter := rf.GetOrCreateLimiter2(pds.ID, pds.CrawlRateLimit)

 	// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
 	limiter.Wait(ctx)
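
Note (not part of the diff): a minimal standalone sketch of the minSleep calculation getPdsForWork now performs with golang.org/x/time/rate. When a PDS queue has work but fewer than one token is available, the missing fraction of a token divided by the refill rate gives the time until the next crawl slot. The helper name and the example limit values are illustrative assumptions, not code from the repository.

	package main

	import (
		"fmt"
		"time"

		"golang.org/x/time/rate"
	)

	// timeUntilNextToken mirrors the sleep math above: with need = 1 - tokens
	// missing and a refill rate of r tokens/second, a full token should be
	// available after roughly need/r seconds.
	func timeUntilNextToken(lim *rate.Limiter, now time.Time) time.Duration {
		tok := lim.TokensAt(now)
		if tok >= 1.0 {
			return 0 // a crawl slot is available immediately
		}
		r := float64(lim.Limit())
		need := 1.0 - tok
		return time.Duration(float64(time.Second) * (need / r))
	}

	func main() {
		// 2 crawls per second with a burst of 1, the same burst GetOrCreateLimiter uses.
		lim := rate.NewLimiter(rate.Limit(2), 1)
		lim.Allow()                                      // spend the single burst token
		fmt.Println(timeUntilNextToken(lim, time.Now())) // roughly 500ms
	}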
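
Both getPdsQueue and the new GetOrCreateLimiter follow the same shape: check under the lock, release the lock for the slow construction step (a database read, or a call that takes a different lock), then retake the lock and re-check before inserting, keeping whichever value won the race. A generic sketch of that pattern, with hypothetical names (the real code stores pdsQueue and *rate.Limiter values directly), might look like:

	package main

	import (
		"fmt"
		"sync"
	)

	// lazyMap illustrates the check / unlock / build / relock / re-check pattern.
	type lazyMap[K comparable, V any] struct {
		mu sync.Mutex
		m  map[K]V
	}

	func (l *lazyMap[K, V]) getOrCreate(k K, build func(K) V) V {
		l.mu.Lock()
		if v, ok := l.m[k]; ok {
			l.mu.Unlock()
			return v
		}
		// Release the lock for the slow section; build may hit the database or
		// take other locks, so holding mu here could stall every other caller.
		l.mu.Unlock()
		nv := build(k)

		l.mu.Lock()
		defer l.mu.Unlock()
		if v, ok := l.m[k]; ok {
			// Another goroutine inserted a value while the lock was released;
			// keep it and discard ours, as getPdsQueue and GetOrCreateLimiter do.
			return v
		}
		l.m[k] = nv
		return nv
	}

	func main() {
		lm := &lazyMap[uint, string]{m: make(map[uint]string)}
		fmt.Println(lm.getOrCreate(1, func(id uint) string { return fmt.Sprintf("queue-%d", id) }))
	}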