Commit

remove direct dependence of indexer on repomgr
whyrusleeping committed Nov 20, 2023
1 parent 9e0e90f commit f1dbaa8
Showing 8 changed files with 192 additions and 159 deletions.
4 changes: 2 additions & 2 deletions bgs/admin.go
@@ -172,7 +172,7 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error {
 			MaxEventsPerSecond: p.CrawlRateLimit,
 		}
 
-		limiter = bgs.Index.GetLimiter(p.ID)
+		limiter = bgs.repoFetcher.GetLimiter(p.ID)
 		if limiter != nil {
 			crawlRate.TokenCount = limiter.Tokens()
 		}
@@ -410,7 +410,7 @@ func (bgs *BGS) handleAdminChangePDSCrawlLimit(e echo.Context) error {
 	}
 
 	// Update the crawl limit in the limiter
-	limiter := bgs.Index.GetOrCreateLimiter(pds.ID, limit)
+	limiter := bgs.repoFetcher.GetOrCreateLimiter(pds.ID, limit)
 	limiter.SetLimit(rate.Limit(limit))
 
 	return e.JSON(200, map[string]any{
18 changes: 10 additions & 8 deletions bgs/bgs.go
@@ -55,11 +55,12 @@ var log = logging.Logger("bgs")
 const serverListenerBootTimeout = 5 * time.Second
 
 type BGS struct {
-	Index   *indexer.Indexer
-	db      *gorm.DB
-	slurper *Slurper
-	events  *events.EventManager
-	didr    did.Resolver
+	Index       *indexer.Indexer
+	db          *gorm.DB
+	slurper     *Slurper
+	events      *events.EventManager
+	didr        did.Resolver
+	repoFetcher *indexer.RepoFetcher
 
 	blobs blobs.BlobStore
 	hr    api.HandleResolver
@@ -106,15 +107,16 @@ type SocketConsumer struct {
 	EventsSent promclient.Counter
 }
 
-func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, hr api.HandleResolver, ssl bool) (*BGS, error) {
+func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, rf *indexer.RepoFetcher, hr api.HandleResolver, ssl bool) (*BGS, error) {
 	db.AutoMigrate(User{})
 	db.AutoMigrate(AuthToken{})
 	db.AutoMigrate(models.PDS{})
 	db.AutoMigrate(models.DomainBan{})
 
 	bgs := &BGS{
-		Index: ix,
-		db:    db,
+		Index:       ix,
+		db:          db,
+		repoFetcher: rf,
 
 		hr:      hr,
 		repoman: repoman,
6 changes: 4 additions & 2 deletions cmd/bigsky/main.go
@@ -283,7 +283,9 @@ func Bigsky(cctx *cli.Context) error {
 
 	notifman := &notifs.NullNotifs{}
 
-	ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, repoman, true, cctx.Bool("spidering"), cctx.Bool("aggregation"))
+	rf := indexer.NewRepoFetcher(db, repoman)
+
+	ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, cctx.Bool("spidering"), cctx.Bool("aggregation"))
 	if err != nil {
 		return err
 	}
@@ -335,7 +337,7 @@ func Bigsky(cctx *cli.Context) error {
 	}
 
 	log.Infow("constructing bgs")
-	bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws"))
+	bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, rf, hr, !cctx.Bool("crawl-insecure-ws"))
 	if err != nil {
 		return err
 	}
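Note: the wiring above builds one RepoFetcher and hands it to both the Indexer (which drives crawls through it) and the BGS (whose admin handlers adjust its per-PDS limiters). A toy sketch of that shared-dependency shape — every name below is an illustrative stand-in, not one of the real constructors:

```go
package main

import "fmt"

// Illustrative stand-ins for *gorm.DB and *repomgr.RepoManager.
type db struct{}
type repoManager struct{}

// Stand-in for indexer.RepoFetcher: single owner of the fetch path
// and the per-PDS rate-limiter state.
type repoFetcher struct {
	db *db
	rm *repoManager
}

func newRepoFetcher(d *db, rm *repoManager) *repoFetcher {
	return &repoFetcher{db: d, rm: rm}
}

// Both consumers hold the same fetcher instance.
type indexer struct{ rf *repoFetcher }
type bgs struct{ rf *repoFetcher }

func main() {
	d, rm := &db{}, &repoManager{}
	rf := newRepoFetcher(d, rm) // built once, as in Bigsky above
	ix := indexer{rf: rf}
	b := bgs{rf: rf}
	// Shared state: an admin crawl-limit change made via the BGS is
	// seen immediately by the crawler the indexer drives.
	fmt.Println(ix.rf == b.rf) // true
}
```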
145 changes: 2 additions & 143 deletions indexer/indexer.go
@@ -1,12 +1,10 @@
 package indexer
 
 import (
-	"bytes"
 	"context"
 	"database/sql"
 	"errors"
 	"fmt"
-	"sync"
 	"time"
 
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
@@ -19,13 +17,10 @@ import (
 	"github.com/bluesky-social/indigo/repomgr"
 	"github.com/bluesky-social/indigo/util"
 	"github.com/bluesky-social/indigo/xrpc"
-	"golang.org/x/time/rate"
 
 	"github.com/ipfs/go-cid"
-	ipld "github.com/ipfs/go-ipld-format"
 	logging "github.com/ipfs/go-log"
 	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
 	"gorm.io/gorm"
 	"gorm.io/gorm/clause"
 )
@@ -42,14 +37,8 @@ type Indexer struct {
 	events   *events.EventManager
 	didr     did.Resolver
 
-	// TODO: i feel like the repomgr doesnt belong here
-	repomgr *repomgr.RepoManager
-
 	Crawler *CrawlDispatcher
 
-	Limiters map[uint]*rate.Limiter
-	LimitMux sync.RWMutex
-
 	doAggregations bool
 	doSpider       bool
 
@@ -58,7 +47,7 @@ type Indexer struct {
 	ApplyPDSClientSettings func(*xrpc.Client)
 }
 
-func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, repoman *repomgr.RepoManager, crawl, aggregate, spider bool) (*Indexer, error) {
+func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) {
 	db.AutoMigrate(&models.FeedPost{})
 	db.AutoMigrate(&models.ActorInfo{})
 	db.AutoMigrate(&models.FollowRecord{})
@@ -69,9 +58,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 		db:             db,
 		notifman:       notifman,
 		events:         evtman,
-		repomgr:        repoman,
 		didr:           didr,
-		Limiters:       make(map[uint]*rate.Limiter),
 		doAggregations: aggregate,
 		doSpider:       spider,
 		SendRemoteFollow: func(context.Context, string, uint) error {
@@ -81,7 +68,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 	}
 
 	if crawl {
-		c, err := NewCrawlDispatcher(ix.FetchAndIndexRepo, 10)
+		c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, 10)
 		if err != nil {
 			return nil, err
 		}
@@ -93,33 +80,6 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
 	return ix, nil
 }
 
-func (ix *Indexer) GetLimiter(pdsID uint) *rate.Limiter {
-	ix.LimitMux.RLock()
-	defer ix.LimitMux.RUnlock()
-
-	return ix.Limiters[pdsID]
-}
-
-func (ix *Indexer) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
-	ix.LimitMux.RLock()
-	defer ix.LimitMux.RUnlock()
-
-	lim, ok := ix.Limiters[pdsID]
-	if !ok {
-		lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
-		ix.Limiters[pdsID] = lim
-	}
-
-	return lim
-}
-
-func (ix *Indexer) SetLimiter(pdsID uint, lim *rate.Limiter) {
-	ix.LimitMux.Lock()
-	defer ix.LimitMux.Unlock()
-
-	ix.Limiters[pdsID] = lim
-}
-
 func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error {
 	ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent")
 	defer span.End()
@@ -414,107 +374,6 @@ func isNotFound(err error) bool {
 	return false
 }
 
-func (ix *Indexer) fetchRepo(ctx context.Context, c *xrpc.Client, pds *models.PDS, did string, rev string) ([]byte, error) {
-	ctx, span := otel.Tracer("indexer").Start(ctx, "fetchRepo")
-	defer span.End()
-
-	span.SetAttributes(
-		attribute.String("pds", pds.Host),
-		attribute.String("did", did),
-		attribute.String("rev", rev),
-	)
-
-	limiter := ix.GetOrCreateLimiter(pds.ID, pds.CrawlRateLimit)
-
-	// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
-	limiter.Wait(ctx)
-
-	log.Debugw("SyncGetRepo", "did", did, "since", rev)
-	// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
-	repo, err := comatproto.SyncGetRepo(ctx, c, did, rev)
-	if err != nil {
-		reposFetched.WithLabelValues("fail").Inc()
-		return nil, fmt.Errorf("failed to fetch repo (did=%s,rev=%s,host=%s): %w", did, rev, pds.Host, err)
-	}
-	reposFetched.WithLabelValues("success").Inc()
-
-	return repo, nil
-}
-
-// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?
-func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
-	ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo")
-	defer span.End()
-
-	span.SetAttributes(attribute.Int("catchup", len(job.catchup)))
-
-	ai := job.act
-
-	var pds models.PDS
-	if err := ix.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
-		return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
-	}
-
-	rev, err := ix.repomgr.GetRepoRev(ctx, ai.Uid)
-	if err != nil && !isNotFound(err) {
-		return fmt.Errorf("failed to get repo root: %w", err)
-	}
-
-	// attempt to process buffered events
-	if !job.initScrape && len(job.catchup) > 0 {
-		first := job.catchup[0]
-		var resync bool
-		if first.evt.Since == nil || rev == *first.evt.Since {
-			for i, j := range job.catchup {
-				catchupEventsProcessed.Inc()
-				if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil {
-					log.Errorw("buffered event catchup failed", "error", err, "did", ai.Did, "i", i, "jobCount", len(job.catchup), "seq", j.evt.Seq)
-					resync = true // fall back to a repo sync
-					break
-				}
-			}
-
-			if !resync {
-				return nil
-			}
-		}
-	}
-
-	if rev == "" {
-		span.SetAttributes(attribute.Bool("full", true))
-	}
-
-	c := models.ClientForPds(&pds)
-	ix.ApplyPDSClientSettings(c)
-
-	repo, err := ix.fetchRepo(ctx, c, &pds, ai.Did, rev)
-	if err != nil {
-		return err
-	}
-
-	if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil {
-		span.RecordError(err)
-
-		if ipld.IsNotFound(err) {
-			log.Errorw("partial repo fetch was missing data", "did", ai.Did, "pds", pds.Host, "rev", rev)
-			repo, err := ix.fetchRepo(ctx, c, &pds, ai.Did, "")
-			if err != nil {
-				return err
-			}
-
-			if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), nil); err != nil {
-				span.RecordError(err)
-				return fmt.Errorf("failed to import backup repo (%s): %w", ai.Did, err)
-			}
-
-			return nil
-		}
-		return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err)
-	}
-
-	return nil
-}
-
 func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) {
 	puri, err := util.ParseAtUri(uri)
 	if err != nil {
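The crawl dispatcher only ever needed a callback of the right shape, which is why FetchAndIndexRepo could move off Indexer wholesale: the method value passed to NewCrawlDispatcher simply changes receiver. A self-contained toy of that method-value injection (all identifiers here are illustrative, not from the codebase):

```go
package main

import (
	"context"
	"fmt"
)

// Toy stand-in for the indexer's unexported crawlWork job type.
type job struct{ did string }

// Toy dispatcher: it depends only on the callback's shape, not on any
// concrete fetcher or repo manager.
type dispatcher struct {
	fetch func(context.Context, *job) error
}

func newDispatcher(fetch func(context.Context, *job) error, concurrency int) *dispatcher {
	_ = concurrency // the real NewCrawlDispatcher also takes a worker count
	return &dispatcher{fetch: fetch}
}

// Toy fetcher standing in for the new indexer.RepoFetcher.
type fetcher struct{}

func (f *fetcher) fetchAndIndexRepo(ctx context.Context, j *job) error {
	fmt.Println("fetching repo for", j.did)
	return nil
}

func main() {
	f := &fetcher{}
	// Analogous to NewCrawlDispatcher(fetcher.FetchAndIndexRepo, 10) above:
	// a bound method value satisfies the callback type.
	d := newDispatcher(f.fetchAndIndexRepo, 10)
	_ = d.fetch(context.Background(), &job{did: "did:plc:example"})
}
```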
4 changes: 3 additions & 1 deletion indexer/posts_test.go
@@ -61,7 +61,9 @@ func testIndexer(t *testing.T) *testIx {
 
 	didr := testPLC(t)
 
-	ix, err := NewIndexer(maindb, notifman, evtman, didr, repoman, false, true, true)
+	rf := NewRepoFetcher(maindb, repoman)
+
+	ix, err := NewIndexer(maindb, notifman, evtman, didr, rf, false, true, true)
 	if err != nil {
 		t.Fatal(err)
 	}
(3 of the 8 changed files are not shown above.)
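Among the files not shown is whichever new file in package indexer defines RepoFetcher. A minimal sketch of what it plausibly contains, reconstructed from the code deleted from indexer/indexer.go and from the call sites in this diff (NewRepoFetcher(db, repoman), GetLimiter, GetOrCreateLimiter, FetchAndIndexRepo); the field names and exact method set are assumptions:

```go
package indexer

import (
	"sync"

	"github.com/bluesky-social/indigo/repomgr"
	"golang.org/x/time/rate"
	"gorm.io/gorm"
)

// RepoFetcher takes over the per-PDS rate limiters and the
// fetch-and-import path this commit removes from Indexer, so Indexer
// no longer holds a *repomgr.RepoManager directly.
type RepoFetcher struct {
	db      *gorm.DB
	repoman *repomgr.RepoManager

	Limiters map[uint]*rate.Limiter
	LimitMux sync.RWMutex
}

func NewRepoFetcher(db *gorm.DB, rm *repomgr.RepoManager) *RepoFetcher {
	return &RepoFetcher{
		db:       db,
		repoman:  rm,
		Limiters: make(map[uint]*rate.Limiter),
	}
}

// GetLimiter mirrors the method deleted from Indexer above.
func (rf *RepoFetcher) GetLimiter(pdsID uint) *rate.Limiter {
	rf.LimitMux.RLock()
	defer rf.LimitMux.RUnlock()

	return rf.Limiters[pdsID]
}

// GetOrCreateLimiter mirrors the deleted Indexer method, except that it
// takes the write lock here: the original body mutated the map while
// holding only the read lock.
func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
	rf.LimitMux.Lock()
	defer rf.LimitMux.Unlock()

	lim, ok := rf.Limiters[pdsID]
	if !ok {
		lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
		rf.Limiters[pdsID] = lim
	}

	return lim
}
```

fetchRepo and FetchAndIndexRepo would carry over essentially as deleted above, with ix.repomgr becoming rf.repoman and the limiter lookups going through rf; their bodies are omitted here.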
