diff --git a/backfill/backfill.go b/backfill/backfill.go
new file mode 100644
index 000000000..24404a6d6
--- /dev/null
+++ b/backfill/backfill.go
@@ -0,0 +1,405 @@
+package backfill
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"net/http"
+	"sync"
+	"time"
+
+	// Blank import to register types for CBORGEN
+	_ "github.com/bluesky-social/indigo/api/bsky"
+	lexutil "github.com/bluesky-social/indigo/lex/util"
+	"github.com/bluesky-social/indigo/repo"
+	"github.com/bluesky-social/indigo/repomgr"
+	"github.com/ipfs/go-cid"
+	typegen "github.com/whyrusleeping/cbor-gen"
+	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+	"go.opentelemetry.io/otel"
+	"golang.org/x/time/rate"
+)
+
+// Job is an interface for a backfill job
+type Job interface {
+	Repo() string
+	State() string
+	SetState(ctx context.Context, state string) error
+
+	// FlushBufferedOps calls the given callback for each buffered operation
+	// Once done it clears the buffer and marks the job as "complete"
+	// Allowing the Job interface to abstract away the details of how buffered
+	// operations are stored and/or locked
+	FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error
+
+	ClearBufferedOps(ctx context.Context) error
+}
+
+// Store is an interface for a backfill store which holds Jobs
+type Store interface {
+	// BufferOp buffers an operation for a job and returns true if the operation was buffered
+	// If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete)
+	BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error)
+	GetJob(ctx context.Context, repo string) (Job, error)
+	GetNextEnqueuedJob(ctx context.Context) (Job, error)
+}
+
+// Backfiller is a struct which handles backfilling a repo
+type Backfiller struct {
+	Name               string
+	HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
+	HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
+	HandleDeleteRecord func(ctx context.Context, repo string, path string) error
+	Store              Store
+
+	// Number of backfills to process in parallel
+	ParallelBackfills int
+	// Number of records to process in parallel for each backfill
+	ParallelRecordCreates int
+	// Prefix match for records to backfill i.e. app.bsky.feed.app/
+	// If empty, all records will be backfilled
+	NSIDFilter   string
+	CheckoutPath string
+
+	syncLimiter *rate.Limiter
+
+	magicHeaderKey string
+	magicHeaderVal string
+
+	stop chan chan struct{}
+}
+
+var (
+	// StateEnqueued is the state of a backfill job when it is first created
+	StateEnqueued = "enqueued"
+	// StateInProgress is the state of a backfill job when it is being processed
+	StateInProgress = "in_progress"
+	// StateComplete is the state of a backfill job when it has been processed
+	StateComplete = "complete"
+)
+
+// ErrJobComplete is returned when trying to buffer an op for a job that is complete
+var ErrJobComplete = errors.New("job is complete")
+
+// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist
+var ErrJobNotFound = errors.New("job not found")
+
+var tracer = otel.Tracer("backfiller")
+
+// BackfillOptions holds the tunable settings that NewBackfiller copies onto a Backfiller
+type BackfillOptions struct {
+	ParallelBackfills     int
+	ParallelRecordCreates int
+	NSIDFilter            string
+	SyncRequestsPerSecond int
+	CheckoutPath          string
+}
+
+// DefaultBackfillOptions returns the options used when NewBackfiller is given nil
+func DefaultBackfillOptions() *BackfillOptions {
+	return &BackfillOptions{
+		ParallelBackfills:     10,
+		ParallelRecordCreates: 100,
+		NSIDFilter:            "",
+		SyncRequestsPerSecond: 2,
+		CheckoutPath:          "https://bsky.social/xrpc/com.atproto.sync.getCheckout",
+	}
+}
+
+// NewBackfiller creates a new Backfiller
+// A nil opts selects DefaultBackfillOptions; non-positive parallelism values
+// are replaced by their defaults because they would leave the processor
+// without any capacity to run jobs or records
+func NewBackfiller(
+	name string,
+	store Store,
+	handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
+	handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
+	handleDelete func(ctx context.Context, repo string, path string) error,
+	opts *BackfillOptions,
+) *Backfiller {
+	if opts == nil {
+		opts = DefaultBackfillOptions()
+	}
+
+	// Work on copies so the caller's options struct is never mutated
+	defaults := DefaultBackfillOptions()
+	parallelBackfills := opts.ParallelBackfills
+	if parallelBackfills <= 0 {
+		parallelBackfills = defaults.ParallelBackfills
+	}
+	parallelRecordCreates := opts.ParallelRecordCreates
+	if parallelRecordCreates <= 0 {
+		parallelRecordCreates = defaults.ParallelRecordCreates
+	}
+
+	return &Backfiller{
+		Name:                  name,
+		Store:                 store,
+		HandleCreateRecord:    handleCreate,
+		HandleUpdateRecord:    handleUpdate,
+		HandleDeleteRecord:    handleDelete,
+		ParallelBackfills:     parallelBackfills,
+		ParallelRecordCreates: parallelRecordCreates,
+		NSIDFilter:            opts.NSIDFilter,
+		syncLimiter:           rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1),
+		CheckoutPath:          opts.CheckoutPath,
+		stop:                  make(chan chan struct{}),
+	}
+}
+
+// Start starts the backfill processor routine
+// It blocks, polling the Store for enqueued jobs, until Stop is called
+func (b *Backfiller) Start() {
+	ctx := context.Background()
+
+	log := slog.With("source", "backfiller", "name", b.Name)
+	log.Info("starting backfill processor")
+
+	// A zero-capacity semaphore would block this loop forever on the first
+	// job, so always allow at least one backfill at a time
+	sem := make(chan struct{}, max(b.ParallelBackfills, 1))
+
+	for {
+		select {
+		case stopped := <-b.stop:
+			log.Info("stopping backfill processor")
+			close(stopped)
+			return
+		default:
+		}
+
+		// Get the next job
+		job, err := b.Store.GetNextEnqueuedJob(ctx)
+		if err != nil {
+			log.Error("failed to get next enqueued job", "error", err)
+			time.Sleep(1 * time.Second)
+			continue
+		}
+		if job == nil {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		// Mark the backfill as "in progress"
+		err = job.SetState(ctx, StateInProgress)
+		if err != nil {
+			log.Error("failed to set job state", "error", err)
+			// Back off like the other failure paths instead of spinning on a failing store
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		// Wait for a free slot, then run the backfill in the background
+		sem <- struct{}{}
+		go func(j Job) {
+			b.BackfillRepo(ctx, j)
+			backfillJobsProcessed.WithLabelValues(b.Name).Inc()
+			<-sem
+		}(job)
+	}
+}
+
+// Stop stops the backfill processor
+// It blocks until the Start loop has acknowledged the request; backfills that
+// are already running are not waited for
+func (b *Backfiller) Stop() {
+	log := slog.With("source", "backfiller", "name", b.Name)
+	log.Info("stopping backfill processor")
+	stopped := make(chan struct{})
+	b.stop <- stopped
+	<-stopped
+	log.Info("backfill processor stopped")
+}
+
+// FlushBuffer processes buffered
operations for a job
+// It returns the number of buffered operations that were dispatched to the
+// handlers; handler failures are logged and do not stop the flush
+func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {
+	ctx, span := tracer.Start(ctx, "FlushBuffer")
+	defer span.End()
+	log := slog.With("source", "backfiller_buffer_flush", "repo", job.Repo())
+
+	processed := 0
+
+	repo := job.Repo()
+
+	// Flush buffered operations, clear the buffer, and mark the job as "complete"
+	// Clearing and marking are handled by the job interface
+	err := job.FlushBufferedOps(ctx, func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
+		switch repomgr.EventKind(kind) {
+		case repomgr.EvtKindCreateRecord:
+			err := b.HandleCreateRecord(ctx, repo, path, rec, cid)
+			if err != nil {
+				log.Error("failed to handle create record", "error", err)
+			}
+		case repomgr.EvtKindUpdateRecord:
+			err := b.HandleUpdateRecord(ctx, repo, path, rec, cid)
+			if err != nil {
+				log.Error("failed to handle update record", "error", err)
+			}
+		case repomgr.EvtKindDeleteRecord:
+			err := b.HandleDeleteRecord(ctx, repo, path)
+			if err != nil {
+				log.Error("failed to handle delete record", "error", err)
+			}
+		}
+		backfillOpsBuffered.WithLabelValues(b.Name).Dec()
+		processed++
+		return nil
+	})
+	if err != nil {
+		log.Error("failed to flush buffered ops", "error", err)
+	}
+
+	return processed
+}
+
+// recordQueueItem is one record handed from the repo walker to the workers
+type recordQueueItem struct {
+	recordPath string
+	nodeCid    cid.Cid
+}
+
+// recordResult reports the outcome of processing one record
+type recordResult struct {
+	recordPath string
+	err        error
+}
+
+// BackfillRepo backfills a repo
+// It downloads the repo checkout as a CAR, feeds every record matching
+// NSIDFilter to HandleCreateRecord using a pool of workers, and then flushes
+// the operations that were buffered for the job while the backfill ran
+func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
+	ctx, span := tracer.Start(ctx, "BackfillRepo")
+	defer span.End()
+
+	start := time.Now()
+
+	repoDid := job.Repo()
+
+	log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid)
+	log.Info(fmt.Sprintf("processing backfill for %s", repoDid))
+
+	// failJob marks the job with a failure state and drops its buffered ops
+	failJob := func(state string) {
+		if err := job.SetState(ctx, state); err != nil {
+			log.Error("failed to set job state", "error", err)
+		}
+		if err := job.ClearBufferedOps(ctx); err != nil {
+			log.Error("failed to clear buffered ops", "error", err)
+		}
+	}
+
+	url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)
+
+	// GET and CAR decode the body
+	client := &http.Client{
+		Transport: otelhttp.NewTransport(http.DefaultTransport),
+		Timeout:   120 * time.Second,
+	}
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	if err != nil {
+		log.Error("failed to create request", "error", err)
+		return
+	}
+
+	req.Header.Set("Accept", "application/vnd.ipld.car")
+	req.Header.Set("User-Agent", fmt.Sprintf("atproto-backfill-%s/0.0.1", b.Name))
+	if b.magicHeaderKey != "" && b.magicHeaderVal != "" {
+		req.Header.Set(b.magicHeaderKey, b.magicHeaderVal)
+	}
+
+	if err := b.syncLimiter.Wait(ctx); err != nil {
+		log.Error("failed to wait for sync rate limiter", "error", err)
+		// A done context will fail the request anyway; any other limiter
+		// error is treated as best-effort and the request still goes out
+		if ctx.Err() != nil {
+			return
+		}
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		log.Error("failed to send request", "error", err)
+		return
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		// The body is never read on this path, so release it here
+		resp.Body.Close()
+
+		log.Info("failed to get repo", "status", resp.StatusCode)
+		reason := "unknown error"
+		if resp.StatusCode == http.StatusBadRequest {
+			reason = "repo not found"
+		}
+
+		failJob(fmt.Sprintf("failed (%s)", reason))
+		return
+	}
+
+	body := instrumentedReader{
+		source:  resp.Body,
+		counter: backfillBytesProcessed.WithLabelValues(b.Name),
+	}
+
+	defer body.Close()
+
+	r, err := repo.ReadRepoFromCar(ctx, body)
+	if err != nil {
+		log.Error("failed to read repo from car", "error", err)
+
+		failJob("failed (couldn't read repo CAR from response body)")
+		return
+	}
+
+	numRecords := 0
+	// With no workers nothing would drain the queue, leaving the producer
+	// blocked forever while the job is reported complete
+	numRoutines := max(b.ParallelRecordCreates, 1)
+	recordQueue := make(chan recordQueueItem, numRoutines)
+	recordResults := make(chan recordResult, numRoutines)
+
+	wg := sync.WaitGroup{}
+
+	// Producer routine
+	go func() {
+		defer close(recordQueue)
+		err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
+			numRecords++
+			recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
+			return nil
+		})
+		if err != nil {
+			log.Error("failed to iterate repo records", "error", err)
+		}
+	}()
+
+	// Consumer routines
+	for i := 0; i < numRoutines; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for item := range recordQueue {
+				blk, err := r.Blockstore().Get(ctx, item.nodeCid)
+				if err != nil {
+					recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)}
+					continue
+				}
+				rec, err := lexutil.CborDecodeValue(blk.RawData())
+				if err != nil {
+					recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to decode record: %w", err)}
+					continue
+				}
+
+				recM, ok := rec.(typegen.CBORMarshaler)
+				if !ok {
+					recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to cast record to CBORMarshaler")}
+					continue
+				}
+
+				err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &recM, &item.nodeCid)
+				if err != nil {
+					recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
+					continue
+				}
+
+				backfillRecordsProcessed.WithLabelValues(b.Name).Inc()
+				recordResults <- recordResult{recordPath: item.recordPath, err: err}
+			}
+		}()
+	}
+
+	resultWG := sync.WaitGroup{}
+	resultWG.Add(1)
+	// Handle results
+	go func() {
+		defer resultWG.Done()
+		for result := range recordResults {
+			if result.err != nil {
+				log.Error("Error processing record", "record", result.recordPath, "error", result.err)
+			}
+		}
+	}()
+
+	// The workers exit once the producer closes the queue; only then is it
+	// safe to close the results channel and read numRecords
+	wg.Wait()
+	close(recordResults)
+	resultWG.Wait()
+
+	// Process buffered operations, marking the job as "complete" when done
+	numProcessed := b.FlushBuffer(ctx, job)
+
+	log.Info("backfill complete",
+		"buffered_records_processed", numProcessed,
+		"records_backfilled", numRecords,
+		"duration", time.Since(start),
+	)
+}
diff --git a/backfill/backfill_test.go
b/backfill/backfill_test.go
new file mode 100644
index 000000000..4c7f7f4b7
--- /dev/null
+++ b/backfill/backfill_test.go
@@ -0,0 +1,113 @@
+package backfill_test
+
+import (
+	"context"
+	"log/slog"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/bluesky-social/indigo/backfill"
+	"github.com/ipfs/go-cid"
+	typegen "github.com/whyrusleeping/cbor-gen"
+)
+
+// testState counts the handler invocations seen during a test run
+type testState struct {
+	creates int
+	updates int
+	deletes int
+	lk      sync.Mutex
+}
+
+// TestBackfill enqueues a few repos, buffers operations against the first one
+// while its backfill is in progress, and waits for the buffered operations to
+// reach the handlers
+func TestBackfill(t *testing.T) {
+	ctx := context.Background()
+
+	// Bound the polling loops so a stuck backfill fails the test instead of hanging it
+	deadline := time.Now().Add(2 * time.Minute)
+
+	testRepos := []string{
+		"did:plc:q6gjnaw2blty4crticxkmujt",
+		"did:plc:f5f4diimystr7ima7nqvamhe",
+		"did:plc:t7y4sud4dhptvzz7ibnv5cbt",
+	}
+
+	mem := backfill.NewMemstore()
+	ts := &testState{}
+
+	opts := backfill.DefaultBackfillOptions()
+	opts.NSIDFilter = "app.bsky.feed.follow/"
+
+	bf := backfill.NewBackfiller(
+		"backfill-test",
+		mem,
+		ts.handleCreate,
+		ts.handleUpdate,
+		ts.handleDelete,
+		opts,
+	)
+
+	slog.Info("starting backfiller")
+
+	go bf.Start()
+
+	for _, repo := range testRepos {
+		if err := mem.EnqueueJob(repo); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Operations buffered against job 0 once it is in progress
+	bufferedOps := []struct{ kind, path string }{
+		{"delete", "app.bsky.feed.follow/1"},
+		{"delete", "app.bsky.feed.follow/2"},
+		{"delete", "app.bsky.feed.follow/3"},
+		{"delete", "app.bsky.feed.follow/4"},
+		{"delete", "app.bsky.feed.follow/5"},
+		{"create", "app.bsky.feed.follow/1"},
+		{"update", "app.bsky.feed.follow/1"},
+	}
+
+	// Wait until job 0 is in progress
+	for {
+		s, err := mem.GetJob(ctx, testRepos[0])
+		if err != nil {
+			t.Fatal(err)
+		}
+		if s == nil {
+			t.Fatalf("no job found for %s", testRepos[0])
+		}
+		if s.State() == backfill.StateInProgress {
+			for _, op := range bufferedOps {
+				if _, err := mem.BufferOp(ctx, testRepos[0], op.kind, op.path, nil, &cid.Undef); err != nil {
+					t.Fatalf("buffering %s %s: %v", op.kind, op.path, err)
+				}
+			}
+
+			break
+		}
+		if time.Now().After(deadline) {
+			t.Fatalf("timed out waiting for job to start, state is %q", s.State())
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// Wait until every buffered op has been flushed to the handlers
+	for {
+		ts.lk.Lock()
+		done := ts.deletes >= 5 && ts.creates >= 1 && ts.updates >= 1
+		ts.lk.Unlock()
+		if done {
+			break
+		}
+		if time.Now().After(deadline) {
+			t.Fatal("timed out waiting for buffered ops to be flushed")
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	bf.Stop()
+
+	slog.Info("shutting down")
+}
+
+func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
+	slog.Info("got create", "repo", repo, "path", path)
+	ts.lk.Lock()
+	ts.creates++
+	ts.lk.Unlock()
+	return nil
+}
+
+func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
+	slog.Info("got update", "repo", repo, "path", path)
+	ts.lk.Lock()
+	ts.updates++
+	ts.lk.Unlock()
+	return nil
+}
+
+func (ts *testState) handleDelete(ctx context.Context, repo string, path string) error {
+	slog.Info("got delete", "repo", repo, "path", path)
+	ts.lk.Lock()
+	ts.deletes++
+	ts.lk.Unlock()
+	return nil
+}
diff --git a/backfill/gormstore.go b/backfill/gormstore.go
new file mode 100644
index 000000000..c874d7a7a
--- /dev/null
+++ b/backfill/gormstore.go
@@ -0,0 +1,215 @@
+package backfill
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-cid"
+	typegen "github.com/whyrusleeping/cbor-gen"
+	"gorm.io/gorm"
+)
+
+// Gormjob is the in-memory view of a backfill job whose state is persisted
+// through its GormDBJob row
+type Gormjob struct {
+	repo        string
+	state       string
+	lk          sync.Mutex
+	bufferedOps map[string][]*bufferedOp
+
+	dbj *GormDBJob
+	db  *gorm.DB
+
+	createdAt time.Time
+	updatedAt time.Time
+}
+
+// GormDBJob is the database row backing a Gormjob
+type GormDBJob struct {
+	gorm.Model
+	Repo  string `gorm:"unique;index"`
+	State string `gorm:"index"`
+}
+
+// Gormstore is a gorm-backed implementation of the Backfill Store interface
+type Gormstore struct {
+	lk   sync.RWMutex
+	jobs map[string]*Gormjob
+	db   *gorm.DB
+}
+
+// NewGormstore creates an empty Gormstore on top of db; call LoadJobs to
+// populate it with the jobs already stored in the database
+func NewGormstore(db *gorm.DB) *Gormstore {
+	return &Gormstore{
+		jobs: make(map[string]*Gormjob),
+		db:   db,
+	}
+}
+
+// LoadJobs reads every persisted job from the database into memory
+func (s *Gormstore) LoadJobs(ctx context.Context) error {
+	// Load all jobs from the database
+	var dbjobs []*GormDBJob
+	if err := s.db.WithContext(ctx).Find(&dbjobs).Error; err != nil {
+		return err
+	}
+
+	s.lk.Lock()
+	defer s.lk.Unlock()
+
+	// Convert them to in-memory
jobs
+	for _, dbj := range dbjobs {
+		j := &Gormjob{
+			repo:        dbj.Repo,
+			state:       dbj.State,
+			bufferedOps: map[string][]*bufferedOp{},
+			createdAt:   dbj.CreatedAt,
+			updatedAt:   dbj.UpdatedAt,
+
+			dbj: dbj,
+			db:  s.db,
+		}
+		s.jobs[dbj.Repo] = j
+	}
+
+	return nil
+}
+
+// EnqueueJob persists a new job for repo in the "enqueued" state and starts
+// tracking it in memory
+// A duplicate-key error from the database is treated as success
+func (s *Gormstore) EnqueueJob(repo string) error {
+	// Persist the job to the database
+	dbj := &GormDBJob{
+		Repo:  repo,
+		State: StateEnqueued,
+	}
+	if err := s.db.Create(dbj).Error; err != nil {
+		if err == gorm.ErrDuplicatedKey {
+			return nil
+		}
+		return err
+	}
+
+	s.lk.Lock()
+	defer s.lk.Unlock()
+
+	// Convert it to an in-memory job
+	if _, ok := s.jobs[repo]; ok {
+		// The DB create should have errored if the job already existed, but just in case
+		return fmt.Errorf("job already exists for repo %s", repo)
+	}
+
+	j := &Gormjob{
+		repo:        repo,
+		createdAt:   time.Now(),
+		updatedAt:   time.Now(),
+		state:       StateEnqueued,
+		bufferedOps: map[string][]*bufferedOp{},
+
+		dbj: dbj,
+		db:  s.db,
+	}
+	s.jobs[repo] = j
+
+	return nil
+}
+
+// BufferOp buffers an operation for the job of the given repo
+// It returns ErrJobNotFound when no job exists, ErrJobComplete when the job
+// has finished, and (false, nil) when the job is in any state other than
+// "in_progress"
+func (s *Gormstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) {
+	s.lk.RLock()
+
+	// If the job doesn't exist, we can't buffer an op for it
+	j, ok := s.jobs[repo]
+	s.lk.RUnlock()
+	if !ok {
+		return false, ErrJobNotFound
+	}
+
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	switch j.state {
+	case StateComplete:
+		return false, ErrJobComplete
+	case StateInProgress:
+		// keep going and buffer the op
+	default:
+		return false, nil
+	}
+
+	j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{
+		kind: kind,
+		rec:  rec,
+		cid:  cid,
+	})
+	j.updatedAt = time.Now()
+	return true, nil
+}
+
+// GetJob returns the job for repo, or (nil, nil) when there is none
+func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) {
+	s.lk.RLock()
+	defer s.lk.RUnlock()
+
+	j, ok := s.jobs[repo]
+	if !ok || j == nil {
+		return nil, nil
+	}
+	return j, nil
+}
+
+// GetNextEnqueuedJob returns some job in the "enqueued" state, or (nil, nil)
+// when there is none
+func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) {
+	s.lk.RLock()
+	defer s.lk.RUnlock()
+
+	for _, j := range s.jobs {
+		if j.State() == StateEnqueued {
+			return j, nil
+		}
+	}
+	return nil, nil
+}
+
+// Repo returns the repo this job backfills
+func (j *Gormjob) Repo() string {
+	return j.repo
+}
+
+// State returns the current in-memory state of the job
+func (j *Gormjob) State() string {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	return j.state
+}
+
+// SetState updates the job state in memory and persists it to the database
+func (j *Gormjob) SetState(ctx context.Context, state string) error {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	j.state = state
+	j.updatedAt = time.Now()
+
+	// Persist the job to the database
+	j.dbj.State = state
+	return j.db.Save(j.dbj).Error
+}
+
+// FlushBufferedOps calls fn for every buffered operation, then clears the
+// buffer and marks the job "complete" both in memory and in the database
+// If fn returns an error the flush stops and the buffer and state are left untouched
+func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	for path, ops := range j.bufferedOps {
+		for _, op := range ops {
+			if err := fn(op.kind, path, op.rec, op.cid); err != nil {
+				return err
+			}
+		}
+	}
+
+	j.bufferedOps = map[string][]*bufferedOp{}
+	j.state = StateComplete
+	j.updatedAt = time.Now()
+
+	// Persist completion as SetState does; otherwise a job reloaded by
+	// LoadJobs stays "in_progress" forever and is never picked up or finished
+	j.dbj.State = StateComplete
+	return j.db.Save(j.dbj).Error
+}
+
+// ClearBufferedOps drops every buffered operation without running it
+func (j *Gormjob) ClearBufferedOps(ctx context.Context) error {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	j.bufferedOps = map[string][]*bufferedOp{}
+	j.updatedAt = time.Now()
+	return nil
+}
diff --git a/backfill/memstore.go b/backfill/memstore.go
new file mode 100644
index 000000000..85e552efa
--- /dev/null
+++ b/backfill/memstore.go
@@ -0,0 +1,159 @@
+package backfill
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-cid"
+	typegen "github.com/whyrusleeping/cbor-gen"
+)
+
+// bufferedOp is an operation held back for a repo until its job is flushed
+type bufferedOp struct {
+	kind string
+	rec  *typegen.CBORMarshaler
+	cid  *cid.Cid
+}
+
+// Memjob is a backfill job that lives only in memory
+type Memjob struct {
+	repo        string
+	state       string
+	lk          sync.Mutex
+	bufferedOps map[string][]*bufferedOp
+
+	createdAt time.Time
+	updatedAt time.Time
+}
+
+// Memstore is a simple in-memory implementation of the Backfill Store interface
+type Memstore struct {
+	lk   sync.RWMutex
+	jobs map[string]*Memjob
+}
+
+// NewMemstore creates an empty Memstore
+func NewMemstore() *Memstore {
+	return &Memstore{
+		jobs: make(map[string]*Memjob),
+	}
+}
+
+func (s *Memstore) EnqueueJob(repo string)
error {
+	s.lk.Lock()
+	defer s.lk.Unlock()
+
+	if _, ok := s.jobs[repo]; ok {
+		return fmt.Errorf("job already exists for repo %s", repo)
+	}
+
+	j := &Memjob{
+		repo:        repo,
+		createdAt:   time.Now(),
+		updatedAt:   time.Now(),
+		state:       StateEnqueued,
+		bufferedOps: map[string][]*bufferedOp{},
+	}
+	s.jobs[repo] = j
+	return nil
+}
+
+// BufferOp buffers an operation for the job of the given repo
+// It returns ErrJobNotFound when no job exists, ErrJobComplete when the job
+// has finished, and (false, nil) when the job is in any state other than
+// "in_progress"
+func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) {
+	// The job map is only read here, so a read lock is enough
+	s.lk.RLock()
+
+	// If the job doesn't exist, we can't buffer an op for it
+	j, ok := s.jobs[repo]
+	s.lk.RUnlock()
+	if !ok {
+		return false, ErrJobNotFound
+	}
+
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	switch j.state {
+	case StateComplete:
+		return false, ErrJobComplete
+	case StateInProgress:
+		// keep going and buffer the op
+	default:
+		return false, nil
+	}
+
+	j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{
+		kind: kind,
+		rec:  rec,
+		cid:  cid,
+	})
+	j.updatedAt = time.Now()
+	return true, nil
+}
+
+// GetJob returns the job for repo, or (nil, nil) when there is none
+func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error) {
+	s.lk.RLock()
+	defer s.lk.RUnlock()
+
+	j, ok := s.jobs[repo]
+	if !ok || j == nil {
+		return nil, nil
+	}
+	return j, nil
+}
+
+// GetNextEnqueuedJob returns some job in the "enqueued" state, or (nil, nil)
+// when there is none
+func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) {
+	s.lk.RLock()
+	defer s.lk.RUnlock()
+
+	for _, j := range s.jobs {
+		if j.State() == StateEnqueued {
+			return j, nil
+		}
+	}
+	return nil, nil
+}
+
+// Repo returns the repo this job backfills
+func (j *Memjob) Repo() string {
+	return j.repo
+}
+
+// State returns the current state of the job
+func (j *Memjob) State() string {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	return j.state
+}
+
+// SetState updates the state of the job
+func (j *Memjob) SetState(ctx context.Context, state string) error {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	j.state = state
+	j.updatedAt = time.Now()
+	return nil
+}
+
+// FlushBufferedOps calls fn for every buffered operation, then clears the
+// buffer and marks the job "complete"
+// If fn returns an error the flush stops and the buffer and state are left untouched
+func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	for path, ops := range j.bufferedOps {
+		for _, op := range ops {
+			if err := fn(op.kind, path, op.rec, op.cid); err != nil {
+				return err
+			}
+		}
+	}
+
+	j.bufferedOps = map[string][]*bufferedOp{}
+	j.state = StateComplete
+	// Record the transition time like every other state change does
+	j.updatedAt = time.Now()
+
+	return nil
+}
+
+// ClearBufferedOps drops every buffered operation without running it
+func (j *Memjob) ClearBufferedOps(ctx context.Context) error {
+	j.lk.Lock()
+	defer j.lk.Unlock()
+
+	j.bufferedOps = map[string][]*bufferedOp{}
+	j.updatedAt = time.Now()
+	return nil
+}
diff --git a/backfill/metrics.go b/backfill/metrics.go
new file mode 100644
index 000000000..effa21ee2
--- /dev/null
+++ b/backfill/metrics.go
@@ -0,0 +1,31 @@
+package backfill
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var backfillJobsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "backfill_jobs_enqueued_total",
+	Help: "The total number of backfill jobs enqueued",
+}, []string{"backfiller_name"})
+
+var backfillJobsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "backfill_jobs_processed_total",
+	Help: "The total number of backfill jobs processed",
+}, []string{"backfiller_name"})
+
+var backfillRecordsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "backfill_records_processed_total",
+	Help: "The total number of backfill records processed",
+}, []string{"backfiller_name"})
+
+var backfillOpsBuffered = promauto.NewGaugeVec(prometheus.GaugeOpts{
+	Name: "backfill_ops_buffered",
+	Help: "The number of backfill operations buffered",
+}, []string{"backfiller_name"})
+
+var backfillBytesProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "backfill_bytes_processed_total",
+	Help: "The total number of backfill bytes processed",
+}, []string{"backfiller_name"})
diff --git a/backfill/util.go b/backfill/util.go
new file mode 100644
index 000000000..f8f159290
--- /dev/null
+++ b/backfill/util.go
@@ -0,0 +1,33 @@
+package backfill
+
+import (
+	"io"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type instrumentedReader struct {
+	source  io.ReadCloser
+	counter prometheus.Counter
+}
+
+// Read reads from the underlying source and adds the number of bytes read to the counter
+func (r instrumentedReader) Read(b []byte) (int, error) {
+	n, err := r.source.Read(b)
+	r.counter.Add(float64(n))
+	return n, err
+}
+
+// Close drains whatever is left in the source, counting those bytes as well,
+// and then closes it
+// A drain error takes precedence over the error returned by closing the source
+func (r instrumentedReader) Close() error {
+	// Draining goes through Read so the remaining bytes are still counted
+	_, err := io.Copy(io.Discard, r)
+	closeerr := r.source.Close()
+	if err != nil {
+		return err
+	}
+	return closeerr
+}
diff --git a/search/firehose.go b/search/firehose.go
new file mode 100644
index 000000000..53d48564c
--- /dev/null
+++ b/search/firehose.go
@@ -0,0 +1,280 @@
+package search
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net/http"
+	"strings"
+
+	comatproto "github.com/bluesky-social/indigo/api/atproto"
+	bsky "github.com/bluesky-social/indigo/api/bsky"
+	"github.com/bluesky-social/indigo/backfill"
+	"github.com/bluesky-social/indigo/events"
+	"github.com/bluesky-social/indigo/events/schedulers/autoscaling"
+	lexutil "github.com/bluesky-social/indigo/lex/util"
+	"github.com/bluesky-social/indigo/repo"
+	"github.com/bluesky-social/indigo/repomgr"
+	"github.com/gorilla/websocket"
+	"github.com/ipfs/go-cid"
+	typegen "github.com/whyrusleeping/cbor-gen"
+)
+
+// getLastCursor returns the stored firehose sequence number, creating the row
+// (and returning 0) when none exists yet
+func (s *Server) getLastCursor() (int64, error) {
+	var lastSeq LastSeq
+	if err := s.db.Find(&lastSeq).Error; err != nil {
+		return 0, err
+	}
+
+	if lastSeq.ID == 0 {
+		return 0, s.db.Create(&lastSeq).Error
+	}
+
+	return lastSeq.Seq, nil
+}
+
+// updateLastCursor stores curs as the last processed firehose sequence number
+func (s *Server) updateLastCursor(curs int64) error {
+	return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error
+}
+
+// RunIndexer loads the persisted backfill jobs, starts the backfiller, and
+// then consumes the repo event stream from the last stored cursor, routing
+// each record operation through handleOp
+func (s *Server) RunIndexer(ctx context.Context) error {
+	cur, err := s.getLastCursor()
+	if err != nil {
+		return fmt.Errorf("get last cursor: %w", err)
+	}
+
+	err = s.bfs.LoadJobs(ctx)
+	if err != nil {
+		return fmt.Errorf("loading backfill jobs: %w", err)
+	}
+	go s.bf.Start()
+
+	d := websocket.DefaultDialer
+	con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{})
+	if err != nil {
+		return fmt.Errorf("events dial failed: %w", err)
+	}
+
+	rsc := &events.RepoStreamCallbacks{
+		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
+			// Persist the cursor every 50th sequence number, whatever the outcome of the event
+			defer func() {
+				if evt.Seq%50 == 0 {
+					if err := s.updateLastCursor(evt.Seq); err != nil {
+						log.Error("Failed to update cursor: ", err)
+					}
+				}
+			}()
+			if evt.TooBig && evt.Prev != nil {
+				log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq)
+				return nil
+			}
+
+			if evt.TooBig {
+				if err := s.processTooBigCommit(ctx, evt); err != nil {
+					log.Errorf("failed to process tooBig event: %s", err)
+					return nil
+				}
+
+				return nil
+			}
+
+			r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
+			if err != nil {
+				// %w is only meaningful to fmt.Errorf; a log format needs %s
+				log.Errorf("reading repo from car (seq: %d, len: %d): %s", evt.Seq, len(evt.Blocks), err)
+				return nil
+			}
+
+			for _, op := range evt.Ops {
+				ek := repomgr.EventKind(op.Action)
+				switch ek {
+				case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
+					rc, rec, err := r.GetRecord(ctx, op.Path)
+					if err != nil {
+						e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
+						log.Error(e)
+						return nil
+					}
+
+					if lexutil.LexLink(rc) != *op.Cid {
+						log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
+						return nil
+					}
+
+					if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil {
+						log.Errorf("failed to handle op: %s", err)
+						return nil
+					}
+
+				case repomgr.EvtKindDeleteRecord:
+					if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil {
+						log.Errorf("failed to handle delete: %s", err)
+						return nil
+					}
+				}
+			}
+
+			return nil
+		},
+		RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
+			if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil {
+				log.Errorf("failed to update user handle: %s", err)
+			}
+			return nil
+		},
+	}
+
+	return events.HandleRepoStream(
+		ctx, con,
autoscaling.NewScheduler(
+			autoscaling.DefaultAutoscaleSettings(),
+			s.bgshost,
+			rsc.EventHandler,
+		),
+	)
+}
+
+// handleCreateOrUpdate indexes a created or updated post or profile record
+// Records under any other path, and records of any other type, are ignored
+func (s *Server) handleCreateOrUpdate(ctx context.Context, did string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error {
+	// Since this gets called in a backfill job, we need to check if the path is a post or profile
+	if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
+		return nil
+	}
+
+	// Nothing can be indexed without a record, and dereferencing nil would panic
+	if recP == nil {
+		return nil
+	}
+
+	u, err := s.getOrCreateUser(ctx, did)
+	if err != nil {
+		return fmt.Errorf("checking user: %w", err)
+	}
+	rec := *recP
+
+	switch rec := rec.(type) {
+	case *bsky.FeedPost:
+		if rcid == nil {
+			return fmt.Errorf("indexing post: missing record cid for %s", path)
+		}
+		if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil {
+			return fmt.Errorf("indexing post: %w", err)
+		}
+	case *bsky.ActorProfile:
+		if err := s.indexProfile(ctx, u, rec); err != nil {
+			return fmt.Errorf("indexing profile: %w", err)
+		}
+	default:
+	}
+	return nil
+}
+
+// handleDelete removes a deleted post from the index
+// Records under any other path are ignored
+func (s *Server) handleDelete(ctx context.Context, did string, path string) error {
+	// Since this gets called in a backfill job, we need to check if the path is a post or profile
+	if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
+		return nil
+	}
+
+	u, err := s.getOrCreateUser(ctx, did)
+	if err != nil {
+		return err
+	}
+
+	switch {
+	// TODO: handle profile deletes, its an edge case, but worth doing still
+	case strings.Contains(path, "app.bsky.feed.post"):
+		if err := s.deletePost(ctx, u, path); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// handleOp routes a firehose operation for a post or profile record
+// The op is first offered to the backfill store. If the repo has no job yet,
+// one is enqueued and the op is offered again; if the repo's backfill is
+// already complete, the op is applied to the index immediately. Operations on
+// other paths and unknown operation kinds are ignored
+func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec typegen.CBORMarshaler) error {
+	if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
+		return nil
+	}
+
+	// apply indexes the op directly; it is used once the repo's backfill is complete
+	var apply func() error
+	switch op {
+	case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
+		log.Infof("handling create(%d): %s - %s", seq, did, path)
+		apply = func() error { return s.handleCreateOrUpdate(ctx, did, path, &rec, rcid) }
+	case repomgr.EvtKindDeleteRecord:
+		log.Infof("handling delete(%d): %s - %s", seq, did, path)
+		apply = func() error { return s.handleDelete(ctx, did, path) }
+	default:
+		return nil
+	}
+
+	// Try to buffer the op, if it fails, we need to create a backfill job
+	_, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid)
+	switch {
+	case err == backfill.ErrJobNotFound:
+		log.Infof("no job found for repo %s, creating one", did)
+
+		if err := s.bfs.EnqueueJob(did); err != nil {
+			return fmt.Errorf("enqueueing job: %w", err)
+		}
+
+		// Try to buffer the op again so it gets picked up by the backfill job
+		if _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid); err != nil {
+			return fmt.Errorf("buffering op: %w", err)
+		}
+	case err == backfill.ErrJobComplete:
+		// Backfill is done for this repo so we can apply the op immediately
+		// The result is returned directly so a handler failure is not lost
+		if err := apply(); err != nil {
+			return fmt.Errorf("failed to handle op: %w", err)
+		}
+	case err != nil:
+		return fmt.Errorf("buffering op: %w", err)
+	}
+
+	return nil
+}
+
+func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
+	repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "")
+	if err != nil {
+		return err
+	}
+
+	r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata))
+	if err != nil {
+		return err
+	}
+
+	u, err := s.getOrCreateUser(ctx, evt.Repo)
+	if err != nil {
+ return err + } + + return r.ForEach(ctx, "", func(k string, v cid.Cid) error { + if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { + rcid, rec, err := r.GetRecord(ctx, k) + if err != nil { + log.Errorf("failed to get record from repo checkout: %s", err) + return nil + } + + switch rec := rec.(type) { + case *bsky.FeedPost: + if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { + return fmt.Errorf("indexing post: %w", err) + } + case *bsky.ActorProfile: + if err := s.indexProfile(ctx, u, rec); err != nil { + return fmt.Errorf("indexing profile: %w", err) + } + default: + } + + } + return nil + }) +} diff --git a/search/server.go b/search/server.go index 3e7b7d138..063dec4bd 100644 --- a/search/server.go +++ b/search/server.go @@ -1,29 +1,20 @@ package search import ( - "bytes" "context" "encoding/base32" "encoding/json" "fmt" - "net/http" "strconv" "strings" api "github.com/bluesky-social/indigo/api" - comatproto "github.com/bluesky-social/indigo/api/atproto" bsky "github.com/bluesky-social/indigo/api/bsky" - "github.com/bluesky-social/indigo/events" - "github.com/bluesky-social/indigo/events/schedulers/autoscaling" - lexutil "github.com/bluesky-social/indigo/lex/util" - "github.com/bluesky-social/indigo/repo" - "github.com/bluesky-social/indigo/repomgr" + "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/util/version" "github.com/bluesky-social/indigo/xrpc" - "github.com/gorilla/websocket" lru "github.com/hashicorp/golang-lru" - "github.com/ipfs/go-cid" flatfs "github.com/ipfs/go-ds-flatfs" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log" @@ -44,6 +35,9 @@ type Server struct { plc *api.PLCServer echo *echo.Echo + bfs *backfill.Gormstore + bf *backfill.Backfiller + userCache *lru.Cache } @@ -72,6 +66,7 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) db.AutoMigrate(&PostRef{}) db.AutoMigrate(&User{}) 
db.AutoMigrate(&LastSeq{}) + db.AutoMigrate(&backfill.GormDBJob{}) // TODO: robust client xc := &xrpc.Client{ @@ -102,193 +97,22 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) plc: plc, userCache: ucache, } - return s, nil -} - -func (s *Server) getLastCursor() (int64, error) { - var lastSeq LastSeq - if err := s.db.Find(&lastSeq).Error; err != nil { - return 0, err - } - - if lastSeq.ID == 0 { - return 0, s.db.Create(&lastSeq).Error - } - - return lastSeq.Seq, nil -} - -func (s *Server) updateLastCursor(curs int64) error { - return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error -} - -func (s *Server) RunIndexer(ctx context.Context) error { - cur, err := s.getLastCursor() - if err != nil { - return fmt.Errorf("get last cursor: %w", err) - } - - d := websocket.DefaultDialer - con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{}) - if err != nil { - return fmt.Errorf("events dial failed: %w", err) - } - - rsc := &events.RepoStreamCallbacks{ - RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { - if evt.TooBig && evt.Prev != nil { - log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq) - return nil - } - - if evt.TooBig { - if err := s.processTooBigCommit(ctx, evt); err != nil { - log.Errorf("failed to process tooBig event: %s", err) - return nil - } - - return nil - } - - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) - if err != nil { - log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) - return nil - } - - for _, op := range evt.Ops { - ek := repomgr.EventKind(op.Action) - switch ek { - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: - rc, rec, err := r.GetRecord(ctx, op.Path) - if err != nil { - e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) - log.Error(e) - return nil - } - - if 
lexutil.LexLink(rc) != *op.Cid { - log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) - return nil - } - - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { - log.Errorf("failed to handle op: %s", err) - return nil - } - - case repomgr.EvtKindDeleteRecord: - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { - log.Errorf("failed to handle delete: %s", err) - return nil - } - } - } - - return nil - - }, - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { - if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil { - log.Errorf("failed to update user handle: %s", err) - } - return nil - }, - } - return events.HandleRepoStream( - ctx, con, autoscaling.NewScheduler( - autoscaling.DefaultAutoscaleSettings(), - s.bgshost, - rsc.EventHandler, - ), + bfstore := backfill.NewGormstore(db) + opts := backfill.DefaultBackfillOptions() + bf := backfill.NewBackfiller( + "search", + bfstore, + s.handleCreateOrUpdate, + s.handleCreateOrUpdate, + s.handleDelete, + opts, ) -} - -func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error { - if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { - - log.Infof("handling event(%d): %s - %s", seq, did, path) - u, err := s.getOrCreateUser(ctx, did) - if err != nil { - return fmt.Errorf("checking user: %w", err) - } - switch rec := rec.(type) { - case *bsky.FeedPost: - if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil { - return fmt.Errorf("indexing post: %w", err) - } - case *bsky.ActorProfile: - if err := s.indexProfile(ctx, u, rec); err != nil { - return fmt.Errorf("indexing profile: %w", err) - } - default: - } - - } else if op == repomgr.EvtKindDeleteRecord { - u, err := s.getOrCreateUser(ctx, did) - if err != nil { - return err - } - - switch { - // TODO: handle profile deletes, its an edge case, but worth doing still - 
case strings.Contains(path, "app.bsky.feed.post"): - if err := s.deletePost(ctx, u, path); err != nil { - return err - } - } - - } - - if seq%50 == 0 { - if err := s.updateLastCursor(seq); err != nil { - log.Error("Failed to update cursor: ", err) - } - } - - return nil -} - -func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { - repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "") - if err != nil { - return err - } - - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) - if err != nil { - return err - } - - u, err := s.getOrCreateUser(ctx, evt.Repo) - if err != nil { - return err - } - return r.ForEach(ctx, "", func(k string, v cid.Cid) error { - if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { - rcid, rec, err := r.GetRecord(ctx, k) - if err != nil { - log.Errorf("failed to get record from repo checkout: %s", err) - return nil - } - - switch rec := rec.(type) { - case *bsky.FeedPost: - if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { - return fmt.Errorf("indexing post: %w", err) - } - case *bsky.ActorProfile: - if err := s.indexProfile(ctx, u, rec); err != nil { - return fmt.Errorf("indexing profile: %w", err) - } - default: - } + s.bfs = bfstore + s.bf = bf - } - return nil - }) + return s, nil } func (s *Server) SearchPosts(ctx context.Context, srch string, offset, size int) ([]PostSearchResult, error) {