diff --git a/bgs/admin.go b/bgs/admin.go
index 15f754dd6..b4cdeb443 100644
--- a/bgs/admin.go
+++ b/bgs/admin.go
@@ -172,7 +172,7 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error {
 			MaxEventsPerSecond: p.CrawlRateLimit,
 		}
 
-		limiter = bgs.Index.GetLimiter(p.ID)
+		limiter = bgs.repoFetcher.GetLimiter(p.ID)
 		if limiter != nil {
 			crawlRate.TokenCount = limiter.Tokens()
 		}
@@ -410,7 +410,7 @@ func (bgs *BGS) handleAdminChangePDSCrawlLimit(e echo.Context) error {
 	}
 
 	// Update the crawl limit in the limiter
-	limiter := bgs.Index.GetOrCreateLimiter(pds.ID, limit)
+	limiter := bgs.repoFetcher.GetOrCreateLimiter(pds.ID, limit)
 	limiter.SetLimit(rate.Limit(limit))
 
 	return e.JSON(200, map[string]any{
diff --git a/bgs/bgs.go b/bgs/bgs.go
index 1b27e5425..611a993e5 100644
--- a/bgs/bgs.go
+++ b/bgs/bgs.go
@@ -55,11 +55,12 @@ var log = logging.Logger("bgs")
 const serverListenerBootTimeout = 5 * time.Second
 
 type BGS struct {
-	Index   *indexer.Indexer
-	db      *gorm.DB
-	slurper *Slurper
-	events  *events.EventManager
-	didr    did.Resolver
+	Index       *indexer.Indexer
+	db          *gorm.DB
+	slurper     *Slurper
+	events      *events.EventManager
+	didr        did.Resolver
+	repoFetcher *indexer.RepoFetcher
 
 	blobs blobs.BlobStore
 	hr    api.HandleResolver
@@ -106,15 +107,16 @@ type SocketConsumer struct {
 	EventsSent promclient.Counter
 }
 
-func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, hr api.HandleResolver, ssl bool) (*BGS, error) {
+func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, rf *indexer.RepoFetcher, hr api.HandleResolver, ssl bool) (*BGS, error) {
 	db.AutoMigrate(User{})
 	db.AutoMigrate(AuthToken{})
 	db.AutoMigrate(models.PDS{})
 	db.AutoMigrate(models.DomainBan{})
 
 	bgs := &BGS{
-		Index: ix,
-		db:    db,
+		Index:       ix,
+		db:          db,
+		repoFetcher: rf,
 
 		hr:      hr,
 		repoman: repoman,
diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go
index e547e6b98..69e3dd38c 100644
--- a/cmd/bigsky/main.go
+++ b/cmd/bigsky/main.go
@@ -283,7 +283,9 @@ func Bigsky(cctx *cli.Context) error {
 
 	notifman := &notifs.NullNotifs{}
 
-	ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, repoman, true, cctx.Bool("spidering"), cctx.Bool("aggregation"))
+	rf := indexer.NewRepoFetcher(db, repoman)
+
+	ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, cctx.Bool("spidering"), cctx.Bool("aggregation"))
 	if err != nil {
 		return err
 	}
@@ -335,7 +337,7 @@ func Bigsky(cctx *cli.Context) error {
 	}
 
 	log.Infow("constructing bgs")
-	bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws"))
+	bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, rf, hr, !cctx.Bool("crawl-insecure-ws"))
 	if err != nil {
 		return err
 	}
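The hunks above are the heart of the change for embedders: the `RepoFetcher` is constructed first and shared by both the `Indexer` and the `BGS`, and `NewBGS` grows a new `rf` parameter just before the handle resolver. A minimal wiring sketch of the new call order follows; the dependency parameters mirror the ones visible in `cmd/bigsky/main.go`, but the import paths and variable names are assumptions about the surrounding tree, not part of this patch:

```go
package main

import (
	"github.com/bluesky-social/indigo/api"
	libbgs "github.com/bluesky-social/indigo/bgs"
	"github.com/bluesky-social/indigo/blobs"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/indexer"
	"github.com/bluesky-social/indigo/notifs"
	"github.com/bluesky-social/indigo/repomgr"
	did "github.com/whyrusleeping/go-did"
	"gorm.io/gorm"
)

// wireBGS sketches the post-refactor construction order: create the
// RepoFetcher first, then thread it through both NewIndexer and NewBGS.
// Parameter names here are illustrative.
func wireBGS(db *gorm.DB, repoman *repomgr.RepoManager, notifman notifs.NotificationManager,
	evtman *events.EventManager, didr did.Resolver, blobstore blobs.BlobStore,
	hr api.HandleResolver) (*libbgs.BGS, error) {

	rf := indexer.NewRepoFetcher(db, repoman)

	// Trailing bools are crawl, aggregate, spider per NewIndexer's signature.
	ix, err := indexer.NewIndexer(db, notifman, evtman, didr, rf, true, false, false)
	if err != nil {
		return nil, err
	}

	// Note the new rf argument, inserted before the handle resolver.
	return libbgs.NewBGS(db, ix, repoman, evtman, didr, blobstore, rf, hr, true)
}
```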
diff --git a/indexer/indexer.go b/indexer/indexer.go
index c1bbbdc07..5207bcef0 100644
--- a/indexer/indexer.go
+++ b/indexer/indexer.go
@@ -1,12 +1,10 @@
 package indexer
 
 import (
-	"bytes"
 	"context"
 	"database/sql"
 	"errors"
 	"fmt"
-	"sync"
 	"time"
 
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
@@ -19,13 +17,10 @@ import (
 	"github.com/bluesky-social/indigo/repomgr"
 	"github.com/bluesky-social/indigo/util"
 	"github.com/bluesky-social/indigo/xrpc"
-	"golang.org/x/time/rate"
 
 	"github.com/ipfs/go-cid"
-	ipld "github.com/ipfs/go-ipld-format"
 	logging "github.com/ipfs/go-log"
 	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
 	"gorm.io/gorm"
 	"gorm.io/gorm/clause"
 )
@@ -42,14 +37,8 @@ type Indexer struct {
 	events *events.EventManager
 	didr   did.Resolver
 
-	// TODO: i feel like the repomgr doesnt belong here
-	repomgr *repomgr.RepoManager
-
 	Crawler *CrawlDispatcher
 
-	Limiters map[uint]*rate.Limiter
-	LimitMux sync.RWMutex
-
 	doAggregations bool
 	doSpider       bool
 
@@ -58,7 +47,7 @@ type Indexer struct {
 	ApplyPDSClientSettings func(*xrpc.Client)
 }
 
-func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, repoman *repomgr.RepoManager, crawl, aggregate, spider bool) (*Indexer, error) {
+func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) {
 	db.AutoMigrate(&models.FeedPost{})
 	db.AutoMigrate(&models.ActorInfo{})
 	db.AutoMigrate(&models.FollowRecord{})
@@ -69,9 +58,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 		db:             db,
 		notifman:       notifman,
 		events:         evtman,
-		repomgr:        repoman,
 		didr:           didr,
-		Limiters:       make(map[uint]*rate.Limiter),
 		doAggregations: aggregate,
 		doSpider:       spider,
 		SendRemoteFollow: func(context.Context, string, uint) error {
@@ -81,7 +68,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 	}
 
 	if crawl {
-		c, err := NewCrawlDispatcher(ix.FetchAndIndexRepo, 10)
+		c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, 10)
 		if err != nil {
 			return nil, err
 		}
@@ -93,33 +80,6 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 	return ix, nil
 }
 
-func (ix *Indexer) GetLimiter(pdsID uint) *rate.Limiter {
-	ix.LimitMux.RLock()
-	defer ix.LimitMux.RUnlock()
-
-	return ix.Limiters[pdsID]
-}
-
-func (ix *Indexer) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
-	ix.LimitMux.RLock()
-	defer ix.LimitMux.RUnlock()
-
-	lim, ok := ix.Limiters[pdsID]
-	if !ok {
-		lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
-		ix.Limiters[pdsID] = lim
-	}
-
-	return lim
-}
-
-func (ix *Indexer) SetLimiter(pdsID uint, lim *rate.Limiter) {
-	ix.LimitMux.Lock()
-	defer ix.LimitMux.Unlock()
-
-	ix.Limiters[pdsID] = lim
-}
-
 func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error {
 	ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent")
 	defer span.End()
@@ -414,107 +374,6 @@ func isNotFound(err error) bool {
 	return false
 }
 
-func (ix *Indexer) fetchRepo(ctx context.Context, c *xrpc.Client, pds *models.PDS, did string, rev string) ([]byte, error) {
-	ctx, span := otel.Tracer("indexer").Start(ctx, "fetchRepo")
-	defer span.End()
-
-	span.SetAttributes(
-		attribute.String("pds", pds.Host),
-		attribute.String("did", did),
-		attribute.String("rev", rev),
-	)
-
-	limiter := ix.GetOrCreateLimiter(pds.ID, pds.CrawlRateLimit)
-
-	// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
-	limiter.Wait(ctx)
-
-	log.Debugw("SyncGetRepo", "did", did, "since", rev)
-	// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
-	repo, err := comatproto.SyncGetRepo(ctx, c, did, rev)
-	if err != nil {
-		reposFetched.WithLabelValues("fail").Inc()
-		return nil, fmt.Errorf("failed to fetch repo (did=%s,rev=%s,host=%s): %w", did, rev, pds.Host, err)
-	}
-	reposFetched.WithLabelValues("success").Inc()
-
-	return repo, nil
-}
-
-// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?
-func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
-	ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo")
-	defer span.End()
-
-	span.SetAttributes(attribute.Int("catchup", len(job.catchup)))
-
-	ai := job.act
-
-	var pds models.PDS
-	if err := ix.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
-		return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
-	}
-
-	rev, err := ix.repomgr.GetRepoRev(ctx, ai.Uid)
-	if err != nil && !isNotFound(err) {
-		return fmt.Errorf("failed to get repo root: %w", err)
-	}
-
-	// attempt to process buffered events
-	if !job.initScrape && len(job.catchup) > 0 {
-		first := job.catchup[0]
-		var resync bool
-		if first.evt.Since == nil || rev == *first.evt.Since {
-			for i, j := range job.catchup {
-				catchupEventsProcessed.Inc()
-				if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil {
-					log.Errorw("buffered event catchup failed", "error", err, "did", ai.Did, "i", i, "jobCount", len(job.catchup), "seq", j.evt.Seq)
-					resync = true // fall back to a repo sync
-					break
-				}
-			}
-
-			if !resync {
-				return nil
-			}
-		}
-	}
-
-	if rev == "" {
-		span.SetAttributes(attribute.Bool("full", true))
-	}
-
-	c := models.ClientForPds(&pds)
-	ix.ApplyPDSClientSettings(c)
-
-	repo, err := ix.fetchRepo(ctx, c, &pds, ai.Did, rev)
-	if err != nil {
-		return err
-	}
-
-	if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil {
-		span.RecordError(err)
-
-		if ipld.IsNotFound(err) {
-			log.Errorw("partial repo fetch was missing data", "did", ai.Did, "pds", pds.Host, "rev", rev)
-			repo, err := ix.fetchRepo(ctx, c, &pds, ai.Did, "")
-			if err != nil {
-				return err
-			}
-
-			if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), nil); err != nil {
-				span.RecordError(err)
-				return fmt.Errorf("failed to import backup repo (%s): %w", ai.Did, err)
-			}
-
-			return nil
-		}
-		return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err)
-	}
-
-	return nil
-}
-
 func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) {
 	puri, err := util.ParseAtUri(uri)
 	if err != nil {
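With the limiter map and fetch logic gone, the one functional change left in this file is that the crawl dispatcher's work function now comes from the fetcher rather than the indexer. Since `crawlWork` is unexported, a substitute callback can only be written inside `package indexer`, which is handy for instrumentation. A sketch under that assumption; `newInstrumentedDispatcher` is hypothetical and not part of this patch:

```go
// Hypothetical helper inside package indexer: wraps
// RepoFetcher.FetchAndIndexRepo with timing, then hands the wrapper to the
// dispatcher exactly as NewIndexer does (10 is the same worker count).
func newInstrumentedDispatcher(rf *RepoFetcher) (*CrawlDispatcher, error) {
	fetch := func(ctx context.Context, job *crawlWork) error {
		start := time.Now()
		err := rf.FetchAndIndexRepo(ctx, job)
		log.Debugw("crawl job done", "took", time.Since(start), "err", err)
		return err
	}
	return NewCrawlDispatcher(fetch, 10)
}
```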
diff --git a/indexer/posts_test.go b/indexer/posts_test.go
index cf79ff9c7..19c14b407 100644
--- a/indexer/posts_test.go
+++ b/indexer/posts_test.go
@@ -61,7 +61,9 @@ func testIndexer(t *testing.T) *testIx {
 
 	didr := testPLC(t)
 
-	ix, err := NewIndexer(maindb, notifman, evtman, didr, repoman, false, true, true)
+	rf := NewRepoFetcher(maindb, repoman)
+
+	ix, err := NewIndexer(maindb, notifman, evtman, didr, rf, false, true, true)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/indexer/repofetch.go b/indexer/repofetch.go
new file mode 100644
index 000000000..b378949b8
--- /dev/null
+++ b/indexer/repofetch.go
@@ -0,0 +1,165 @@
+package indexer
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/bluesky-social/indigo/api/atproto"
+	"github.com/bluesky-social/indigo/models"
+	"github.com/bluesky-social/indigo/repomgr"
+	"github.com/bluesky-social/indigo/xrpc"
+	ipld "github.com/ipfs/go-ipld-format"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"golang.org/x/time/rate"
+	"gorm.io/gorm"
+)
+
+func NewRepoFetcher(db *gorm.DB, rm *repomgr.RepoManager) *RepoFetcher {
+	return &RepoFetcher{
+		repoman:                rm,
+		db:                     db,
+		Limiters:               make(map[uint]*rate.Limiter),
+		ApplyPDSClientSettings: func(*xrpc.Client) {},
+	}
+}
+
+type RepoFetcher struct {
+	repoman *repomgr.RepoManager
+	db      *gorm.DB
+
+	Limiters map[uint]*rate.Limiter
+	LimitMux sync.RWMutex
+
+	ApplyPDSClientSettings func(*xrpc.Client)
+}
+
+func (rf *RepoFetcher) GetLimiter(pdsID uint) *rate.Limiter {
+	rf.LimitMux.RLock()
+	defer rf.LimitMux.RUnlock()
+
+	return rf.Limiters[pdsID]
+}
+
+func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
+	rf.LimitMux.RLock()
+	defer rf.LimitMux.RUnlock()
+
+	lim, ok := rf.Limiters[pdsID]
+	if !ok {
+		lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
+		rf.Limiters[pdsID] = lim
+	}
+
+	return lim
+}
+
+func (rf *RepoFetcher) SetLimiter(pdsID uint, lim *rate.Limiter) {
+	rf.LimitMux.Lock()
+	defer rf.LimitMux.Unlock()
+
+	rf.Limiters[pdsID] = lim
+}
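One caveat worth flagging in review: `GetOrCreateLimiter` inserts into the map while holding only the read lock, so two first-time callers racing on the same miss can both write `rf.Limiters[pdsID]`, and Go's race detector will (rightly) complain about a concurrent map write. A race-free variant would upgrade to the write lock and re-check; a sketch follows, where `getOrCreateLimiterSafe` is a suggested rewrite, not code from this patch:

```go
// getOrCreateLimiterSafe avoids the map write under RLock: fast path reads
// under the read lock, slow path takes the write lock and re-checks before
// inserting, so concurrent callers always converge on one limiter.
func (rf *RepoFetcher) getOrCreateLimiterSafe(pdsID uint, pdsrate float64) *rate.Limiter {
	rf.LimitMux.RLock()
	lim, ok := rf.Limiters[pdsID]
	rf.LimitMux.RUnlock()
	if ok {
		return lim
	}

	rf.LimitMux.Lock()
	defer rf.LimitMux.Unlock()
	// Re-check: another goroutine may have created it while we upgraded.
	if lim, ok := rf.Limiters[pdsID]; ok {
		return lim
	}
	lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
	rf.Limiters[pdsID] = lim
	return lim
}
```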
+
+func (rf *RepoFetcher) fetchRepo(ctx context.Context, c *xrpc.Client, pds *models.PDS, did string, rev string) ([]byte, error) {
+	ctx, span := otel.Tracer("indexer").Start(ctx, "fetchRepo")
+	defer span.End()
+
+	span.SetAttributes(
+		attribute.String("pds", pds.Host),
+		attribute.String("did", did),
+		attribute.String("rev", rev),
+	)
+
+	limiter := rf.GetOrCreateLimiter(pds.ID, pds.CrawlRateLimit)
+
+	// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
+	limiter.Wait(ctx)
+
+	log.Debugw("SyncGetRepo", "did", did, "since", rev)
+	// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
+	repo, err := atproto.SyncGetRepo(ctx, c, did, rev)
+	if err != nil {
+		reposFetched.WithLabelValues("fail").Inc()
+		return nil, fmt.Errorf("failed to fetch repo (did=%s,rev=%s,host=%s): %w", did, rev, pds.Host, err)
+	}
+	reposFetched.WithLabelValues("success").Inc()
+
+	return repo, nil
+}
+
+// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?
+func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
+	ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo")
+	defer span.End()
+
+	span.SetAttributes(attribute.Int("catchup", len(job.catchup)))
+
+	ai := job.act
+
+	var pds models.PDS
+	if err := rf.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
+		return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
+	}
+
+	rev, err := rf.repoman.GetRepoRev(ctx, ai.Uid)
+	if err != nil && !isNotFound(err) {
+		return fmt.Errorf("failed to get repo root: %w", err)
+	}
+
+	// attempt to process buffered events
+	if !job.initScrape && len(job.catchup) > 0 {
+		first := job.catchup[0]
+		var resync bool
+		if first.evt.Since == nil || rev == *first.evt.Since {
+			for i, j := range job.catchup {
+				catchupEventsProcessed.Inc()
+				if err := rf.repoman.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil {
+					log.Errorw("buffered event catchup failed", "error", err, "did", ai.Did, "i", i, "jobCount", len(job.catchup), "seq", j.evt.Seq)
+					resync = true // fall back to a repo sync
+					break
+				}
+			}
+
+			if !resync {
+				return nil
+			}
+		}
+	}
+
+	if rev == "" {
+		span.SetAttributes(attribute.Bool("full", true))
+	}
+
+	c := models.ClientForPds(&pds)
+	rf.ApplyPDSClientSettings(c)
+
+	repo, err := rf.fetchRepo(ctx, c, &pds, ai.Did, rev)
+	if err != nil {
+		return err
+	}
+
+	if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil {
+		span.RecordError(err)
+
+		if ipld.IsNotFound(err) {
+			log.Errorw("partial repo fetch was missing data", "did", ai.Did, "pds", pds.Host, "rev", rev)
+			repo, err := rf.fetchRepo(ctx, c, &pds, ai.Did, "")
+			if err != nil {
+				return err
+			}
+
+			if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), nil); err != nil {
+				span.RecordError(err)
+				return fmt.Errorf("failed to import backup repo (%s): %w", ai.Did, err)
+			}
+
+			return nil
+		}
+		return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err)
+	}
+
+	return nil
+}
diff --git a/pds/server.go b/pds/server.go
index 07abe03e5..7afdf5376 100644
--- a/pds/server.go
+++ b/pds/server.go
@@ -80,7 +80,9 @@ func NewServer(db *gorm.DB, cs *carstore.CarStore, serkey *did.PrivKey, handleSu
 	repoman := repomgr.NewRepoManager(cs, kmgr)
 	notifman := notifs.NewNotificationManager(db, repoman.GetRecord)
 
-	ix, err := indexer.NewIndexer(db, notifman, evtman, didr, repoman, false, true, true)
+	rf := indexer.NewRepoFetcher(db, repoman)
+
+	ix, err := indexer.NewIndexer(db, notifman, evtman, didr, rf, false, true, true)
 	if err != nil {
 		return nil, err
 	}
diff --git a/testing/utils.go b/testing/utils.go
index ee515979e..3731d76f9 100644
--- a/testing/utils.go
+++ b/testing/utils.go
@@ -437,8 +437,9 @@ func SetupBGS(ctx context.Context, didr plc.PLCClient) (*TestBGS, error) {
 	diskpersist, err := events.NewDiskPersistence(filepath.Join(dir, "dp-primary"), filepath.Join(dir, "dp-archive"), maindb, opts)
 
 	evtman := events.NewEventManager(diskpersist)
+	rf := indexer.NewRepoFetcher(maindb, repoman)
 
-	ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, repoman, true, true, true)
+	ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, rf, true, true, true)
 	if err != nil {
 		return nil, err
 	}
@@ -451,7 +452,7 @@ func SetupBGS(ctx context.Context, didr plc.PLCClient) (*TestBGS, error) {
 
 	tr := &api.TestHandleResolver{}
 
-	b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, tr, false)
+	b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, rf, tr, false)
 	if err != nil {
 		return nil, err
 	}
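A related observation on `fetchRepo`: `rate.Limiter.Wait` returns an error when the context is canceled or its deadline cannot cover the wait, and the code above discards that error, so a canceled crawl still proceeds straight to `SyncGetRepo`. A stricter sketch, where `waitForCrawlSlot` is a hypothetical helper rather than part of the patch:

```go
// waitForCrawlSlot blocks until the per-PDS limiter grants a token, and
// propagates cancellation instead of silently falling through to the fetch.
func waitForCrawlSlot(ctx context.Context, lim *rate.Limiter, host string) error {
	if err := lim.Wait(ctx); err != nil {
		return fmt.Errorf("aborted waiting for crawl slot on %s: %w", host, err)
	}
	return nil
}
```

Finally, since the limiter map now lives on an independently constructible type, its behavior is easy to pin down in a test. One subtlety worth a regression test: `GetOrCreateLimiter` only honors the rate argument on first creation, so a later caller passing a different rate gets the original limiter back unchanged (which is why the admin handler follows it with an explicit `SetLimit`). A hypothetical test sketch, not part of this patch:

```go
package indexer_test

import (
	"testing"

	"github.com/bluesky-social/indigo/indexer"
	"golang.org/x/time/rate"
)

func TestLimiterFirstRateWins(t *testing.T) {
	// The db and repo manager are unused by the limiter map, so nil is fine here.
	rf := indexer.NewRepoFetcher(nil, nil)

	a := rf.GetOrCreateLimiter(1, 5)
	b := rf.GetOrCreateLimiter(1, 99)

	if a != b {
		t.Fatal("expected the same limiter instance for a given PDS ID")
	}
	if a.Limit() != rate.Limit(5) {
		t.Fatalf("rate from first creation should win, got %v", a.Limit())
	}
}
```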