Skip to content

Commit

Permalink
CrawlDispatcher bring back multiple workers per PDS, consciously
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Dec 20, 2024
1 parent 6538e72 commit d91feca
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 54 deletions.
2 changes: 1 addition & 1 deletion bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
limits.PerDay.SetLimit(body.PerDay)

// Set the crawl rate limit
bgs.repoFetcher.GetOrCreateLimiter(pds.ID, float64(body.CrawlRate)).SetLimit(rate.Limit(body.CrawlRate))
bgs.repoFetcher.GetOrCreateLimiter2(pds.ID, float64(body.CrawlRate)).SetLimit(rate.Limit(body.CrawlRate))

return e.JSON(200, map[string]any{
"success": "true",
Expand Down
136 changes: 85 additions & 51 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package indexer
import (
"context"
"fmt"
"golang.org/x/time/rate"
"log/slog"
"math/rand"
"sync"
"sync/atomic"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
Expand All @@ -25,12 +26,13 @@ type CrawlDispatcher struct {
// from fetchWorker back to mainLoop
complete chan models.Uid

// maplk is around both todo and inProgress
// maplk is around: todo, inProgress, pdsQueues, pdsIds
maplk sync.Mutex
newWork sync.Cond
todo map[models.Uid]*crawlWork
inProgress map[models.Uid]*crawlWork
pdsQueues map[uint]*SynchronizedChunkQueue[*crawlWork]
pdsQueues map[uint]pdsQueue
pdsIds []uint

repoFetcher CrawlRepoFetcher

Expand All @@ -41,9 +43,16 @@ type CrawlDispatcher struct {
done chan struct{}
}

type pdsQueue struct {
queue *SynchronizedChunkQueue[*crawlWork]
pdsId uint
limiter *rate.Limiter
}

// this is what we need of RepoFetcher
type CrawlRepoFetcher interface {
FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
GetOrCreateLimiter(pdsID uint) *rate.Limiter
}

func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
Expand All @@ -60,7 +69,7 @@ func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog
concurrency: concurrency,
todo: make(map[models.Uid]*crawlWork),
inProgress: make(map[models.Uid]*crawlWork),
pdsQueues: make(map[uint]*SynchronizedChunkQueue[*crawlWork]),
pdsQueues: make(map[uint]pdsQueue),
log: log,
done: make(chan struct{}),
}
Expand Down Expand Up @@ -102,7 +111,7 @@ type crawlWork struct {
}

func (c *CrawlDispatcher) mainLoop() {
localPdsQueues := make(map[uint]*SynchronizedChunkQueue[*crawlWork])
localPdsQueues := make(map[uint]pdsQueue)
for {
var crawlJob *crawlWork = nil
select {
Expand All @@ -125,7 +134,7 @@ func (c *CrawlDispatcher) mainLoop() {
pq = c.getPdsQueue(pds)
localPdsQueues[pds] = pq
}
pq.Push(crawlJob)
pq.queue.Push(crawlJob)
c.newWork.Broadcast()

// TODO: unbounded per-PDS queues here
Expand All @@ -135,15 +144,31 @@ func (c *CrawlDispatcher) mainLoop() {
}
}

func (c *CrawlDispatcher) getPdsQueue(pds uint) *SynchronizedChunkQueue[*crawlWork] {
func (c *CrawlDispatcher) getPdsQueue(pds uint) pdsQueue {
c.maplk.Lock()
defer c.maplk.Unlock()
pq, ok := c.pdsQueues[pds]
if !ok {
pq = NewSynchronizedChunkQueue[*crawlWork]()
c.pdsQueues[pds] = pq
if ok {
return pq
}

// yield lock for slow section that hits database and takes a different lock
c.maplk.Unlock()
npq := pdsQueue{
queue: NewSynchronizedChunkQueue[*crawlWork](),
pdsId: pds,
limiter: c.repoFetcher.GetOrCreateLimiter(pds),
}
// retake lock and see if we still need to insert a pdsQueue
c.maplk.Lock()

pq, ok = c.pdsQueues[pds]
if ok {
return pq
}
return pq
c.pdsQueues[pds] = npq
c.pdsIds = append(c.pdsIds, pds)
return npq
}

func (c *CrawlDispatcher) recordComplete(uid models.Uid) *crawlWork {
Expand Down Expand Up @@ -228,41 +253,75 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
c.todo[catchup.user.Uid] = cw
return cw
}
func (c *CrawlDispatcher) getPdsForWork() (uint, *SynchronizedChunkQueue[*crawlWork]) {
func (c *CrawlDispatcher) getPdsForWork() (uint, pdsQueue) {
c.maplk.Lock()
defer c.maplk.Unlock()
// TODO: this is _maybe_ kinda long for inside the lock?
// internally takes the pdsQueue lock and the limiter lock
for {
for pds, pq := range c.pdsQueues {
if pq.Reserved.Load() {
continue
}
if !pq.Any() {
continue
}
ok := pq.Reserved.CompareAndSwap(false, true)
if ok {
minSleep := time.Minute
noQueues := true
// start at a random place in the list of PDS ids
if len(c.pdsIds) > 0 {
offset := rand.Intn(len(c.pdsIds))
for i := 0; i < len(c.pdsIds); i++ {
pds := c.pdsIds[(i+offset)%len(c.pdsIds)]
pq := c.pdsQueues[pds]
if !pq.queue.Any() {
continue
}
noQueues = false
now := time.Now()
tok := pq.limiter.TokensAt(now)
if tok >= 1.0 {
// ready now!
return pds, pq
}
if tok < 1.0 {
// not ready yet, but calculate next availability in case we need to sleep
rate := float64(pq.limiter.Limit())
need := 1.0 - tok
dt := time.Duration(float64(time.Second) * (need / rate))
if dt < minSleep {
minSleep = dt
}
continue
}
return pds, pq
}
}
c.newWork.Wait()
select {
case <-c.done:
// Shutdown
return 0, pdsQueue{}
default:
}
if noQueues {
c.newWork.Wait()
} else {
c.maplk.Unlock()
time.Sleep(minSleep)
c.maplk.Lock()
}
}
}
func (c *CrawlDispatcher) fetchWorker(fetchWorkerId int) {
// TODO: Shutdown
log := c.log.With("fwi", fetchWorkerId)
for {
// get a pds with some available work
pds, pq := c.getPdsForWork()
if pq == nil {
if pq.queue == nil {
// Shutdown
return
}
log.Info("fetchWorker pds", "pds", pds)
// continue with this pds until its queue is empty
for {
ok, job := pq.Pop()
ok, job := pq.queue.Pop()
if !ok {
break
}
log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
// TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
} else {
Expand All @@ -273,28 +332,9 @@ func (c *CrawlDispatcher) fetchWorker(fetchWorkerId int) {
c.complete <- job.act.Uid
}
log.Info("fetchWorker pds empty", "pds", pds)
pq.Reserved.Store(false) // release our reservation
}
}

//func (c *CrawlDispatcher) fetchWorkerX() {
// for {
// select {
// case job := <-c.repoSync:
// c.log.Info("fetchWorker start", "pds", job.act.PDS, "uid", job.act.Uid)
// // TODO: only run one fetchWorker per PDS because FetchAndIndexRepo will Limiter.Wait() to ensure rate limits per-PDS
// if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
// c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
// } else {
// c.log.Info("fetchWorker done", "pds", job.act.PDS, "uid", job.act.Uid)
// }
//
// // TODO: do we still just do this if it errors?
// c.complete <- job.act.Uid
// }
// }
//}

func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error {
if ai.PDS == 0 {
panic("must have pds for user in queue")
Expand All @@ -307,9 +347,6 @@ func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error

userCrawlsEnqueued.Inc()

//ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
//defer span.End()

c.log.Info("crawl", "pds", cw.act.PDS, "uid", cw.act.Uid)
select {
case c.ingest <- cw:
Expand Down Expand Up @@ -426,9 +463,6 @@ type SynchronizedChunkQueue[T any] struct {
ChunkQueue[T]

l sync.Mutex

// true if a fetchWorker() is processing this queue
Reserved atomic.Bool
}

func NewSynchronizedChunkQueue[T any]() *SynchronizedChunkQueue[T] {
Expand Down
36 changes: 34 additions & 2 deletions indexer/repofetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func (rf *RepoFetcher) GetLimiter(pdsID uint) *rate.Limiter {
return rf.Limiters[pdsID]
}

func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
// GetOrCreateLimiter2 is for when we have already fetched the pds record from the db
func (rf *RepoFetcher) GetOrCreateLimiter2(pdsID uint, pdsrate float64) *rate.Limiter {
rf.LimitMux.Lock()
defer rf.LimitMux.Unlock()

Expand All @@ -66,6 +67,37 @@ func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Lim
return lim
}

// GetOrCreateLimiter will fetch the pds from the db if needed
// See also GetOrCreateLimiter2 if the pds record is already available.
func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint) *rate.Limiter {
rf.LimitMux.Lock()
lim, ok := rf.Limiters[pdsID]
if ok {
// return limiter already built
rf.LimitMux.Unlock()
return lim
}
// release lock while we do db fetch
rf.LimitMux.Unlock()

var pds models.PDS
if err := rf.db.First(&pds, "id = ?", pdsID).Error; err != nil {
rf.log.Error("failed to find pds", "pdsID", pdsID, "err", err)
return nil
}
nlim := rate.NewLimiter(rate.Limit(pds.CrawlRateLimit), 1)

rf.LimitMux.Lock()
defer rf.LimitMux.Unlock()
lim, ok = rf.Limiters[pdsID]
if ok {
// it was added while we were getting ready
return lim
}
rf.Limiters[pdsID] = nlim
return nlim
}

func (rf *RepoFetcher) SetLimiter(pdsID uint, lim *rate.Limiter) {
rf.LimitMux.Lock()
defer rf.LimitMux.Unlock()
Expand All @@ -83,7 +115,7 @@ func (rf *RepoFetcher) fetchRepo(ctx context.Context, c *xrpc.Client, pds *model
attribute.String("rev", rev),
)

limiter := rf.GetOrCreateLimiter(pds.ID, pds.CrawlRateLimit)
limiter := rf.GetOrCreateLimiter2(pds.ID, pds.CrawlRateLimit)

// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
limiter.Wait(ctx)
Expand Down

0 comments on commit d91feca

Please sign in to comment.