indexer_scheduler
brianolson committed Dec 17, 2024
1 parent 377c10a commit bdb8550
Showing 5 changed files with 178 additions and 54 deletions.
1 change: 1 addition & 0 deletions bgs/bgs.go
@@ -874,6 +874,7 @@ func stringLink(lnk *lexutil.LexLink) string {
 	return lnk.String()
 }
 
+// called from fedmgr.go Slurper.handleConnection() through .cb
 func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
 	ctx, span := tracer.Start(ctx, "handleFedEvent")
 	defer span.End()
204 changes: 151 additions & 53 deletions indexer/crawler.go
@@ -1,12 +1,15 @@
 package indexer
 
 import (
+	"container/heap"
 	"context"
 	"fmt"
 	"log/slog"
 	"sync"
 	"time"
 
+	"golang.org/x/time/rate"
+
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
 	"github.com/bluesky-social/indigo/models"

@@ -16,8 +19,6 @@ import (
 type CrawlDispatcher struct {
 	ingest chan *models.ActorInfo
 
-	repoSync chan *crawlWork
-
 	catchup chan *crawlWork
 
 	complete chan models.Uid
@@ -26,32 +27,45 @@ type CrawlDispatcher struct {
 	todo       map[models.Uid]*crawlWork
 	inProgress map[models.Uid]*crawlWork
 
-	doRepoCrawl func(context.Context, *crawlWork) error
+	repoFetcher CrawlRepoFetcher
 
 	concurrency int
 
+	repoSyncHeap []*crawlWork
+	// map [pdsID] *crawlWork pending jobs for that PDS, head of linked list on .nextInPds
+	repoSyncPds  map[uint]*crawlWork
+	repoSyncLock sync.Mutex
+	repoSyncCond sync.Cond
+
 	log *slog.Logger
 
 	done chan struct{}
 }
 
-func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
+// this is what we need of RepoFetcher, made interface so it can be passed in without dependency
+type CrawlRepoFetcher interface {
+	FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
+	GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter
+}
+
+func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
 	if concurrency < 1 {
 		return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
 	}
 
 	out := &CrawlDispatcher{
 		ingest:      make(chan *models.ActorInfo),
-		repoSync:    make(chan *crawlWork),
 		complete:    make(chan models.Uid),
 		catchup:     make(chan *crawlWork),
-		doRepoCrawl: repoFn,
+		repoFetcher: repoFetcher,
 		concurrency: concurrency,
 		todo:        make(map[models.Uid]*crawlWork),
 		inProgress:  make(map[models.Uid]*crawlWork),
 		log:         log,
 		done:        make(chan struct{}),
+		repoSyncPds: make(map[uint]*crawlWork),
 	}
+	out.repoSyncCond.L = &out.repoSyncLock
 	go out.CatchupRepoGaugePoller()
 
 	return out, nil
@@ -86,49 +100,22 @@ type crawlWork struct {
 	// for events that come in while this actor is being processed
 	// next items are processed after the crawl
 	next []*catchupJob
+
+	eligibleTime    time.Time
+	nextInPds       *crawlWork
+	alreadyEnheaped bool
 }
 
 func (c *CrawlDispatcher) mainLoop() {
-	var nextDispatchedJob *crawlWork
-	var jobsAwaitingDispatch []*crawlWork
-
-	// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
-	var dispatchQueue chan *crawlWork
-
 	for {
+		var crawlJob *crawlWork = nil
 		select {
 		case actorToCrawl := <-c.ingest:
 			// TODO: max buffer size
-			crawlJob := c.enqueueJobForActor(actorToCrawl)
-			if crawlJob == nil {
-				break
-			}
-
-			if nextDispatchedJob == nil {
-				nextDispatchedJob = crawlJob
-				dispatchQueue = c.repoSync
-			} else {
-				jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
-			}
-		case dispatchQueue <- nextDispatchedJob:
-			c.dequeueJob(nextDispatchedJob)
-
-			if len(jobsAwaitingDispatch) > 0 {
-				nextDispatchedJob = jobsAwaitingDispatch[0]
-				jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
-			} else {
-				nextDispatchedJob = nil
-				dispatchQueue = nil
-			}
-		case catchupJob := <-c.catchup:
+			crawlJob = c.enqueueJobForActor(actorToCrawl)
+		case crawlJob = <-c.catchup:
 			// CatchupJobs are for processing events that come in while a crawl is in progress
 			// They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
-			if nextDispatchedJob == nil {
-				nextDispatchedJob = catchupJob
-				dispatchQueue = c.repoSync
-			} else {
-				jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
-			}
 		case uid := <-c.complete:
 			c.maplk.Lock()
 
@@ -145,15 +132,21 @@ func (c *CrawlDispatcher) mainLoop() {
 				job.initScrape = false
 				job.catchup = job.next
 				job.next = nil
-				if nextDispatchedJob == nil {
-					nextDispatchedJob = job
-					dispatchQueue = c.repoSync
-				} else {
-					jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
-				}
+				crawlJob = job
 			}
 			c.maplk.Unlock()
 		}
+		if crawlJob != nil {
+			pdsID := crawlJob.act.PDS
+			limiter := c.repoFetcher.GetOrCreateLimiterLazy(pdsID)
+			now := time.Now()
+			wouldDelay := limiter.ReserveN(now, 1).DelayFrom(now)
+			crawlJob.eligibleTime = now.Add(wouldDelay)
+			// put crawl job on heap sorted by eligible time
+			c.enheapJob(crawlJob)
+			c.dequeueJob(crawlJob)
+			crawlJob = nil
+		}
 	}
 }

@@ -219,14 +212,18 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
 
 func (c *CrawlDispatcher) fetchWorker() {
 	for {
-		select {
-		case job := <-c.repoSync:
-			if err := c.doRepoCrawl(context.TODO(), job); err != nil {
-				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
-			}
+		job := c.nextJob()
+		nextInPds := job.nextInPds
+		job.nextInPds = nil
+		if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
+			c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
+		}
 
-			// TODO: do we still just do this if it errors?
-			c.complete <- job.act.Uid
+		// TODO: do we still just do this if it errors?
+		c.complete <- job.act.Uid
+
+		if nextInPds != nil {
+			c.enheapJob(nextInPds)
 		}
 	}
 }
@@ -304,3 +301,104 @@ func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
 		}
 	}
 }
+
+// priority-queue for crawlJob based on eligibleTime
+func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
+	if crawlJob.alreadyEnheaped {
+		c.log.Error("CrawlDispatcher trying to enheap alreadyEnheaped", "pds", crawlJob.act.PDS, "uid", crawlJob.act.Uid)
+	}
+	c.repoSyncLock.Lock()
+	defer c.repoSyncLock.Unlock()
+	pdsJobs, has := c.repoSyncPds[crawlJob.act.PDS]
+	if has {
+		if !pdsJobs.alreadyEnheaped {
+			heap.Push(c, crawlJob)
+			catchupPending.Set(float64(len(c.repoSyncHeap)))
+			c.repoSyncCond.Signal()
+			pdsJobs.alreadyEnheaped = true
+		}
+		if pdsJobs == crawlJob {
+			return
+		}
+		for pdsJobs.nextInPds != nil {
+			pdsJobs = pdsJobs.nextInPds
+			if pdsJobs == crawlJob {
+				// we re-enheap something later? weird but okay?
+				return
+			}
+		}
+		pdsJobs.nextInPds = crawlJob
+		return
+	} else {
+		c.repoSyncPds[crawlJob.act.PDS] = crawlJob
+	}
+	if !crawlJob.alreadyEnheaped {
+		heap.Push(c, crawlJob)
+		catchupPending.Set(float64(len(c.repoSyncHeap)))
+		c.repoSyncCond.Signal()
+		crawlJob.alreadyEnheaped = true
+	}
+}
+
+// nextJob returns next available crawlJob based on eligibleTime; block until some available.
+// The caller of .nextJob() should .enheapJob(crawlJob.nextInPds) if any after executing crawlJob's work
+//
+// There's a tiny race where .nextJob() could return the only work for a PDS,
+// outside event could .enheapJob() a next one for that PDS,
+// get enheaped as available immediately because the rate limiter hasn't ticked the work done from .nextJob() above,
+// and then the worker trying to execute the next enheaped work for the PDS would execute immediately but Sleep() to wait for the rate limiter.
+// We will call this 'not too bad', 'good enough for now'. -- bolson 2024-11
+func (c *CrawlDispatcher) nextJob() *crawlWork {
+	c.repoSyncLock.Lock()
+	defer c.repoSyncLock.Unlock()
+	for len(c.repoSyncHeap) == 0 {
+		c.repoSyncCond.Wait()
+	}
+	x := heap.Pop(c)
+	catchupPending.Set(float64(len(c.repoSyncHeap)))
+	crawlJob := x.(*crawlWork)
+	if crawlJob.nextInPds != nil {
+		prev := c.repoSyncPds[crawlJob.act.PDS]
+		if prev != crawlJob {
+			c.log.Error("CrawlDispatcher internal: pds next is not next in eligible heap", "pds", crawlJob.act.PDS)
+		}
+	}
+	delete(c.repoSyncPds, crawlJob.act.PDS)
+	crawlJob.alreadyEnheaped = false
+	return crawlJob
+}
+
+// part of container/heap.Interface and sort.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Len() int {
+	return len(c.repoSyncHeap)
+}
+
+// part of container/heap.Interface and sort.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Less(i, j int) bool {
+	return c.repoSyncHeap[i].eligibleTime.Before(c.repoSyncHeap[j].eligibleTime)
+}
+
+// part of container/heap.Interface and sort.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Swap(i, j int) {
+	t := c.repoSyncHeap[i]
+	c.repoSyncHeap[i] = c.repoSyncHeap[j]
+	c.repoSyncHeap[j] = t
+}
+
+// part of container/heap.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Push(x any) {
+	c.repoSyncHeap = append(c.repoSyncHeap, x.(*crawlWork))
+}
+
+// part of container/heap.Interface
+// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
+func (c *CrawlDispatcher) Pop() any {
+	heaplen := len(c.repoSyncHeap)
+	out := c.repoSyncHeap[heaplen-1]
+	c.repoSyncHeap = c.repoSyncHeap[:heaplen-1]
+	return out
+}
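Taken together, the crawler.go changes replace the repoSync channel handoff with a small scheduler: at most one heap entry per PDS, ordered by eligibleTime, with further work for the same PDS chained off nextInPds so a single slow PDS cannot occupy multiple heap slots, and workers re-enheaping the chained job when they finish. Below is a minimal, self-contained sketch of that discipline; the names (`job`, `scheduler`) are hypothetical and this is not the committed code:

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)

// job stands in for crawlWork: at most one heap entry per PDS;
// later jobs for the same PDS chain off nextInPds instead.
type job struct {
	pds       uint
	eligible  time.Time
	nextInPds *job
}

// scheduler stands in for the dispatcher's repoSync* fields.
type scheduler struct {
	mu   sync.Mutex
	cond sync.Cond
	jobs []*job        // min-heap ordered by eligible time
	pds  map[uint]*job // head of each PDS's pending chain
}

func newScheduler() *scheduler {
	s := &scheduler{pds: make(map[uint]*job)}
	s.cond.L = &s.mu
	return s
}

// heap.Interface over s.jobs; s.mu must be held for all heap operations.
func (s *scheduler) Len() int           { return len(s.jobs) }
func (s *scheduler) Less(i, j int) bool { return s.jobs[i].eligible.Before(s.jobs[j].eligible) }
func (s *scheduler) Swap(i, j int)      { s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i] }
func (s *scheduler) Push(x any)         { s.jobs = append(s.jobs, x.(*job)) }
func (s *scheduler) Pop() any {
	n := len(s.jobs)
	out := s.jobs[n-1]
	s.jobs = s.jobs[:n-1]
	return out
}

// enqueue heaps j unless its PDS already has a pending job, in which
// case j is appended to that job's nextInPds chain.
func (s *scheduler) enqueue(j *job) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if head, ok := s.pds[j.pds]; ok {
		for head.nextInPds != nil {
			head = head.nextInPds
		}
		head.nextInPds = j
		return
	}
	s.pds[j.pds] = j
	heap.Push(s, j)
	s.cond.Signal()
}

// next blocks until a job is available; the caller must re-enqueue
// j.nextInPds (if any) once j's work is done.
func (s *scheduler) next() *job {
	s.mu.Lock()
	defer s.mu.Unlock()
	for len(s.jobs) == 0 {
		s.cond.Wait()
	}
	j := heap.Pop(s).(*job)
	delete(s.pds, j.pds)
	return j
}

func main() {
	s := newScheduler()
	now := time.Now()
	s.enqueue(&job{pds: 1, eligible: now.Add(20 * time.Millisecond)})
	s.enqueue(&job{pds: 2, eligible: now})
	s.enqueue(&job{pds: 1, eligible: now.Add(40 * time.Millisecond)}) // chains behind the first PDS-1 job
	for i := 0; i < 3; i++ {
		j := s.next()
		time.Sleep(time.Until(j.eligible)) // the worker, not the heap, absorbs the delay
		fmt.Println("ran job for PDS", j.pds)
		if next := j.nextInPds; next != nil {
			j.nextInPds = nil
			s.enqueue(next)
		}
	}
}
```

In the commit itself the waiting happens inside FetchAndIndexRepo's own rate limiter; the sketch's Sleep stands in for that, which is why mainLoop only stamps eligibleTime and never sleeps.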
2 changes: 1 addition & 1 deletion indexer/indexer.go
@@ -69,7 +69,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 	}
 
 	if crawl {
-		c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency, ix.log)
+		c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency, ix.log)
 		if err != nil {
 			return nil, err
 		}
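The constructor now receives the whole fetcher rather than a method value, which compiles because *RepoFetcher satisfies the new CrawlRepoFetcher interface. A conventional compile-time assertion would make that explicit; the line below is illustrative and not part of this commit:

```go
// Compile-time check that *RepoFetcher implements CrawlRepoFetcher
// (illustrative; not part of this commit).
var _ CrawlRepoFetcher = (*RepoFetcher)(nil)
```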
5 changes: 5 additions & 0 deletions indexer/metrics.go
@@ -44,3 +44,8 @@ var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{
 	Name: "indexer_catchup_repos",
 	Help: "Number of repos waiting on catchup",
 })
+
+var catchupPending = promauto.NewGauge(prometheus.GaugeOpts{
+	Name: "indexer_catchup_pending",
+	Help: "Number of crawl jobs pending in the heap ahead of the crawl dispatcher workers",
+})
20 changes: 20 additions & 0 deletions indexer/repofetch.go
@@ -66,6 +66,26 @@ func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
 	return lim
 }
 
+// GetOrCreateLimiterLazy is GetOrCreateLimiter with a lazy fetch of PDS from database if needed
+func (rf *RepoFetcher) GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter {
+	rf.LimitMux.RLock()
+	defer rf.LimitMux.RUnlock()
+
+	lim, ok := rf.Limiters[pdsID]
+	if !ok {
+		// TODO: single source from DefaultCrawlLimit from Slurper or its config source
+		pdsrate := float64(5)
+		var pds models.PDS
+		if err := rf.db.First(&pds, "id = ?", pdsID).Error; err == nil {
+			pdsrate = pds.CrawlRateLimit
+		}
+		lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
+		rf.Limiters[pdsID] = lim
+	}
+
+	return lim
+}
+
 func (rf *RepoFetcher) SetLimiter(pdsID uint, lim *rate.Limiter) {
 	rf.LimitMux.Lock()
 	defer rf.LimitMux.Unlock()
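The dispatcher pairs these limiters with the reservation API from golang.org/x/time/rate: ReserveN consumes a token immediately, and DelayFrom reports how long until that token is usable, so mainLoop can timestamp a job instead of blocking. A standalone sketch of that pattern (illustrative rate and loop; not the committed code):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 5 events/sec with burst 1, the same shape as the per-PDS crawl limiters.
	lim := rate.NewLimiter(rate.Limit(5), 1)

	for i := 0; i < 3; i++ {
		now := time.Now()
		// ReserveN books a token without blocking; DelayFrom says how long
		// the caller would have to wait before using it.
		wouldDelay := lim.ReserveN(now, 1).DelayFrom(now)
		fmt.Printf("job %d eligible in %v\n", i, wouldDelay.Round(time.Millisecond))
	}
}
```

Successive reservations push the eligible time out by 200ms each, which is exactly the spacing the heap in crawler.go sorts on.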
