From b675e0f78cbdaa6951966c94672289408a6694d3 Mon Sep 17 00:00:00 2001
From: Jaz Volpert
Date: Thu, 31 Aug 2023 17:52:58 +0000
Subject: [PATCH 1/6] Add backfilling to search backed by Gorm

---
 backfill/backfill.go      | 397 ++++++++++++++++++++++++++++++++++++++
 backfill/backfill_test.go | 124 ++++++++++++
 backfill/gormstore.go     | 215 +++++++++++++++++++++
 backfill/memstore.go      | 159 +++++++++++++++
 backfill/metrics.go       |  31 +++
 backfill/util.go          |  33 ++++
 search/firehose.go        | 280 +++++++++++++++++++++++++++
 search/server.go          | 211 ++------------------
 8 files changed, 1257 insertions(+), 193 deletions(-)
 create mode 100644 backfill/backfill.go
 create mode 100644 backfill/backfill_test.go
 create mode 100644 backfill/gormstore.go
 create mode 100644 backfill/memstore.go
 create mode 100644 backfill/metrics.go
 create mode 100644 backfill/util.go
 create mode 100644 search/firehose.go

diff --git a/backfill/backfill.go b/backfill/backfill.go
new file mode 100644
index 000000000..e01bee474
--- /dev/null
+++ b/backfill/backfill.go
@@ -0,0 +1,397 @@
package backfill

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"sync"
	"time"

	// Blank import to register types for CBORGEN
	_ "github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/repo"
	"github.com/bluesky-social/indigo/repomgr"
	"github.com/ipfs/go-cid"
	typegen "github.com/whyrusleeping/cbor-gen"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
)

// Job is an interface for a backfill job
type Job interface {
	Repo() string
	State() string
	SetState(ctx context.Context, state string) error

	// FlushBufferedOps calls the given callback for each buffered operation
	// Once done it clears the buffer and marks the job as "complete"
	// Allowing the Job interface to abstract away the details of how buffered
	// operations are stored and/or locked
	FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error

	ClearBufferedOps(ctx context.Context) error
}

// Store is an interface for a backfill store which holds Jobs
type Store interface {
	// BufferOp buffers an operation for a job and returns true if the operation was buffered
	// If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete)
	BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error)
	GetJob(ctx context.Context, repo string) (Job, error)
	GetNextEnqueuedJob(ctx context.Context) (Job, error)
}

// Backfiller is a struct which handles backfilling a repo
type Backfiller struct {
	Name string
	// Handlers invoked for each buffered/backfilled record op.
	HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
	HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
	HandleDeleteRecord func(ctx context.Context, repo string, path string) error
	Store              Store

	// Number of backfills to process in parallel
	ParallelBackfills int
	// Number of records to process in parallel for each backfill
	ParallelRecordCreates int
	// Prefix match for records to backfill i.e. app.bsky.feed.app/
	// If empty, all records will be backfilled
	NSIDFilter string
	// URL of the com.atproto.sync.getCheckout XRPC endpoint used to fetch repo CARs.
	CheckoutPath string

	Logger *zap.SugaredLogger
	// Rate limiter applied before each checkout fetch (SyncRequestsPerSecond).
	syncLimiter *rate.Limiter

	// Optional extra header attached to checkout requests (set only if both are non-empty).
	magicHeaderKey string
	magicHeaderVal string

	// stop carries a "stopped" ack channel: Stop() sends a channel, the
	// processor loop closes it once it has observed the stop request.
	stop chan chan struct{}
}

var (
	// StateEnqueued is the state of a backfill job when it is first created
	StateEnqueued = "enqueued"
	// StateInProgress is the state of a backfill job when it is being processed
	StateInProgress = "in_progress"
	// StateComplete is the state of a backfill job when it has been processed
	StateComplete = "complete"
)

// ErrJobComplete is returned when trying to buffer an op for a job that is complete
var ErrJobComplete = errors.New("job is complete")

// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist
var ErrJobNotFound = errors.New("job not found")

var tracer = otel.Tracer("backfiller")

// BackfillOptions configures a Backfiller; see field docs on Backfiller.
type BackfillOptions struct {
	ParallelBackfills     int
	ParallelRecordCreates int
	NSIDFilter            string
	SyncRequestsPerSecond int
	CheckoutPath          string
}

// DefaultBackfillOptions returns the default configuration used when
// NewBackfiller is called with nil opts.
func DefaultBackfillOptions() *BackfillOptions {
	return &BackfillOptions{
		ParallelBackfills:     10,
		ParallelRecordCreates: 100,
		NSIDFilter:            "",
		SyncRequestsPerSecond: 2,
		CheckoutPath:          "https://bsky.social/xrpc/com.atproto.sync.getCheckout",
	}
}

// NewBackfiller creates a new Backfiller
func NewBackfiller(
	name string,
	store Store,
	handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
	handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
	handleDelete func(ctx context.Context, repo string, path string) error,
	logger *zap.SugaredLogger,
	opts *BackfillOptions,
) *Backfiller {
	if opts == nil {
		opts = DefaultBackfillOptions()
	}
	return &Backfiller{
		Name:                  name,
		Store:                 store,
		HandleCreateRecord:    handleCreate,
		HandleUpdateRecord:    handleUpdate,
		HandleDeleteRecord:    handleDelete,
		ParallelBackfills:     opts.ParallelBackfills,
		ParallelRecordCreates: opts.ParallelRecordCreates,
		NSIDFilter:            opts.NSIDFilter,
		Logger:                logger,
		syncLimiter:           rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1),
		CheckoutPath:          opts.CheckoutPath,
		stop:                  make(chan chan struct{}),
	}
}

// Start starts the backfill processor routine.
// It blocks until Stop is called, polling the Store for enqueued jobs and
// running up to ParallelBackfills of them concurrently.
func (b *Backfiller) Start() {
	ctx := context.Background()

	log := b.Logger.With("source", "backfiller_main")
	log.Info("starting backfill processor")

	// Counting semaphore bounding concurrent BackfillRepo calls.
	sem := make(chan struct{}, b.ParallelBackfills)

	for {
		select {
		case stopped := <-b.stop:
			log.Info("stopping backfill processor")
			close(stopped)
			return
		default:
		}

		// Get the next job
		job, err := b.Store.GetNextEnqueuedJob(ctx)
		if err != nil {
			log.Errorf("failed to get next backfill: %+v", err)
			time.Sleep(1 * time.Second)
			continue
		} else if job == nil {
			// Nothing enqueued; back off before polling again.
			time.Sleep(1 * time.Second)
			continue
		}

		// Mark the backfill as "in progress"
		err = job.SetState(ctx, StateInProgress)
		if err != nil {
			log.Errorf("failed to set backfill state: %+v", err)
			continue
		}

		sem <- struct{}{}
		go func(j Job) {
			b.BackfillRepo(ctx, j)
			backfillJobsProcessed.WithLabelValues(b.Name).Inc()
			<-sem
		}(job)
	}
}

// Stop stops the backfill processor and waits for the main loop to acknowledge.
// NOTE(review): in-flight BackfillRepo goroutines are not waited on — confirm
// callers tolerate work finishing after Stop returns.
func (b *Backfiller) Stop() {
	b.Logger.Info("stopping backfill processor")
	stopped := make(chan struct{})
	b.stop <- stopped
	<-stopped
	b.Logger.Info("backfill processor stopped")
}

// FlushBuffer processes buffered operations for a job
func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {
	ctx, span := tracer.Start(ctx, "FlushBuffer")
	defer span.End()
	log := b.Logger.With("source", "backfiller_buffer_flush", "repo", job.Repo())

	processed := 0

	repo := job.Repo()

	// Flush buffered operations, clear the buffer, and mark the job as "complete"
	// Clearing and marking are handled by the job interface
	err := 
job.FlushBufferedOps(ctx, func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { + switch repomgr.EventKind(kind) { + case repomgr.EvtKindCreateRecord: + err := b.HandleCreateRecord(ctx, repo, path, rec, cid) + if err != nil { + log.Errorf("failed to handle create record: %+v", err) + } + case repomgr.EvtKindUpdateRecord: + err := b.HandleUpdateRecord(ctx, repo, path, rec, cid) + if err != nil { + log.Errorf("failed to handle update record: %+v", err) + } + case repomgr.EvtKindDeleteRecord: + err := b.HandleDeleteRecord(ctx, repo, path) + if err != nil { + log.Errorf("failed to handle delete record: %+v", err) + } + } + backfillOpsBuffered.WithLabelValues(b.Name).Dec() + processed++ + return nil + }) + if err != nil { + log.Errorf("failed to flush buffered ops: %+v", err) + } + + return processed +} + +type recordQueueItem struct { + recordPath string + nodeCid cid.Cid +} + +type recordResult struct { + recordPath string + err error +} + +// BackfillRepo backfills a repo +func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { + ctx, span := tracer.Start(ctx, "BackfillRepo") + defer span.End() + + start := time.Now() + + repoDid := job.Repo() + + log := b.Logger.With("source", "backfiller_backfill_repo", "repo", repoDid) + log.Infof("processing backfill for %s", repoDid) + + var url = fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) + + // GET and CAR decode the body + client := &http.Client{ + Transport: otelhttp.NewTransport(http.DefaultTransport), + Timeout: 120 * time.Second, + } + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + log.Errorf("Error creating request: %v", err) + return + } + + req.Header.Set("Accept", "application/vnd.ipld.car") + req.Header.Set("User-Agent", fmt.Sprintf("atproto-backfill-%s/0.0.1", b.Name)) + if b.magicHeaderKey != "" && b.magicHeaderVal != "" { + req.Header.Set(b.magicHeaderKey, b.magicHeaderVal) + } + + b.syncLimiter.Wait(ctx) + + resp, err := client.Do(req) + if 
err != nil { + log.Errorf("Error sending request: %v", err) + return + } + + if resp.StatusCode != http.StatusOK { + log.Errorf("Error response: %v", resp.StatusCode) + reason := "unknown error" + if resp.StatusCode == http.StatusBadRequest { + reason = "repo not found" + } + state := fmt.Sprintf("failed (%s)", reason) + + // Mark the job as "failed" + err := job.SetState(ctx, state) + if err != nil { + log.Errorf("failed to set job state: %+v", err) + } + + // Clear buffered ops + err = job.ClearBufferedOps(ctx) + if err != nil { + log.Errorf("failed to clear buffered ops: %+v", err) + } + return + } + + instrumentedReader := instrumentedReader{ + source: resp.Body, + counter: backfillBytesProcessed.WithLabelValues(b.Name), + } + + defer instrumentedReader.Close() + + r, err := repo.ReadRepoFromCar(ctx, instrumentedReader) + if err != nil { + log.Errorf("Error reading repo: %v", err) + + state := "failed (couldn't read repo CAR from response body)" + + // Mark the job as "failed" + err := job.SetState(ctx, state) + if err != nil { + log.Errorf("failed to set job state: %+v", err) + } + + // Clear buffered ops + err = job.ClearBufferedOps(ctx) + if err != nil { + log.Errorf("failed to clear buffered ops: %+v", err) + } + return + } + + numRecords := 0 + numRoutines := b.ParallelRecordCreates + recordQueue := make(chan recordQueueItem, numRoutines) + recordResults := make(chan recordResult, numRoutines) + + wg := sync.WaitGroup{} + + // Producer routine + go func() { + defer close(recordQueue) + r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error { + numRecords++ + recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid} + return nil + }) + }() + + // Consumer routines + for i := 0; i < numRoutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for item := range recordQueue { + recordCid, rec, err := r.GetRecord(ctx, item.recordPath) + if err != nil { + recordResults <- recordResult{recordPath: item.recordPath, err: 
fmt.Errorf("failed to get record: %w", err)} + continue + } + + // Verify that the record cid matches the cid in the event + if recordCid != item.nodeCid { + recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("mismatch in record and op cid: %s != %s", recordCid, item.nodeCid)} + continue + } + + err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &rec, &recordCid) + if err != nil { + recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} + continue + } + + backfillRecordsProcessed.WithLabelValues(b.Name).Inc() + recordResults <- recordResult{recordPath: item.recordPath, err: err} + } + }() + } + + resultWG := sync.WaitGroup{} + resultWG.Add(1) + // Handle results + go func() { + defer resultWG.Done() + for result := range recordResults { + if result.err != nil { + log.Errorf("Error processing record %s: %v", result.recordPath, result.err) + } + } + }() + + wg.Wait() + close(recordResults) + resultWG.Wait() + + // Process buffered operations, marking the job as "complete" when done + numProcessed := b.FlushBuffer(ctx, job) + + log.Infow("backfill complete", "buffered_records_processed", numProcessed, "records_backfilled", numRecords, "duration", time.Since(start)) +} diff --git a/backfill/backfill_test.go b/backfill/backfill_test.go new file mode 100644 index 000000000..b675d2d47 --- /dev/null +++ b/backfill/backfill_test.go @@ -0,0 +1,124 @@ +package backfill_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/bluesky-social/indigo/backfill" + "github.com/ipfs/go-cid" + typegen "github.com/whyrusleeping/cbor-gen" + "go.uber.org/zap" +) + +var logger *zap.SugaredLogger + +type testState struct { + creates int + updates int + deletes int + lk sync.Mutex +} + +func TestBackfill(t *testing.T) { + ctx := context.Background() + + testRepos := []string{ + "did:plc:q6gjnaw2blty4crticxkmujt", + "did:plc:f5f4diimystr7ima7nqvamhe", + 
		"did:plc:t7y4sud4dhptvzz7ibnv5cbt",
	}

	mem := backfill.NewMemstore()

	rawLog, err := zap.NewDevelopment()
	if err != nil {
		t.Fatal(err)
	}

	logger = rawLog.Sugar()

	ts := &testState{}

	opts := backfill.DefaultBackfillOptions()
	opts.NSIDFilter = "app.bsky.feed.follow/"

	bf := backfill.NewBackfiller(
		"backfill-test",
		mem,
		ts.handleCreate,
		ts.handleUpdate,
		ts.handleDelete,
		logger,
		opts,
	)

	logger.Info("starting backfiller")

	go bf.Start()

	for _, repo := range testRepos {
		mem.EnqueueJob(repo)
	}

	// Wait until job 0 is in progress, then buffer ops against it so the
	// flush-after-backfill path is exercised.
	for {
		s, err := mem.GetJob(ctx, testRepos[0])
		if err != nil {
			t.Fatal(err)
		}
		if s.State() == backfill.StateInProgress {
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef)

			mem.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef)

			mem.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef)

			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	// Poll until all buffered ops have been flushed through the handlers.
	for {
		ts.lk.Lock()
		if ts.deletes >= 5 && ts.creates >= 1 && ts.updates >= 1 {
			ts.lk.Unlock()
			break
		}
		ts.lk.Unlock()
		time.Sleep(100 * time.Millisecond)
	}

	bf.Stop()

	logger.Infof("shutting down")
}

// handleCreate counts create ops delivered by the backfiller.
func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
	logger.Infof("got create: %s %s", repo, path)
	ts.lk.Lock()
	ts.creates++
	ts.lk.Unlock()
	return nil
}

// handleUpdate counts update ops delivered by the backfiller.
func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
	logger.Infof("got update: %s %s", repo, path)
	ts.lk.Lock()
	ts.updates++
	ts.lk.Unlock()
	return nil
}

// handleDelete counts delete ops delivered by the backfiller.
func (ts *testState) handleDelete(ctx context.Context, repo string, path string) error {
	logger.Infof("got delete: %s %s", repo, path)
	ts.lk.Lock()
	ts.deletes++
	ts.lk.Unlock()
	return nil
}
diff --git a/backfill/gormstore.go b/backfill/gormstore.go
new file mode 100644
index 000000000..c874d7a7a
--- /dev/null
+++ b/backfill/gormstore.go
@@ -0,0 +1,215 @@
package backfill

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/ipfs/go-cid"
	typegen "github.com/whyrusleeping/cbor-gen"
	"gorm.io/gorm"
)

// Gormjob is the in-memory working copy of a backfill job; buffered ops live
// only in memory, while repo/state are mirrored to GormDBJob.
type Gormjob struct {
	repo        string
	state       string
	lk          sync.Mutex
	bufferedOps map[string][]*bufferedOp

	// dbj is the persisted row backing this job; db is the handle used to save it.
	dbj *GormDBJob
	db  *gorm.DB

	createdAt time.Time
	updatedAt time.Time
}

// GormDBJob is the persisted (database) representation of a backfill job.
type GormDBJob struct {
	gorm.Model
	Repo  string `gorm:"unique;index"`
	State string `gorm:"index"`
}

// Gormstore is a gorm-backed implementation of the Backfill Store interface
type Gormstore struct {
	lk   sync.RWMutex
	jobs map[string]*Gormjob
	db   *gorm.DB
}

func NewGormstore(db *gorm.DB) *Gormstore {
	return &Gormstore{
		jobs: make(map[string]*Gormjob),
		db:   db,
	}
}

// LoadJobs hydrates the in-memory job map from all rows in the database.
// Intended to be called once at startup, before the store is used.
func (s *Gormstore) LoadJobs(ctx context.Context) error {
	// Load all jobs from the database
	var dbjobs []*GormDBJob
	if err := s.db.Find(&dbjobs).Error; err != nil {
		return err
	}

	s.lk.Lock()
	defer s.lk.Unlock()

	// Convert them to in-memory jobs
	for i := range dbjobs {
		dbj := dbjobs[i]
		j := &Gormjob{
			repo:        dbj.Repo,
			state:       dbj.State,
			bufferedOps: map[string][]*bufferedOp{},
			createdAt:   dbj.CreatedAt,
			updatedAt:   dbj.UpdatedAt,

			dbj: dbj,
			db:  s.db,
		}
		s.jobs[dbj.Repo] = j
	}

	return nil
}

// EnqueueJob persists a new job in StateEnqueued and registers it in memory.
// Enqueueing an already-known repo is treated as a no-op at the DB layer.
func (s *Gormstore) EnqueueJob(repo string) error {
	// Persist the job to the database
	dbj := &GormDBJob{
		Repo:  repo,
		State: StateEnqueued,
	}
	// NOTE(review): the duplicate-key check completing below uses a plain ==
	// comparison; errors.Is would also match wrapped errors — confirm gorm
	// returns the ErrDuplicatedKey sentinel unwrapped for this dialect.
	if err := s.db.Create(dbj).Error; err != nil {
		if err == 
gorm.ErrDuplicatedKey { + return nil + } + return err + } + + s.lk.Lock() + defer s.lk.Unlock() + + // Convert it to an in-memory job + if _, ok := s.jobs[repo]; ok { + // The DB create should have errored if the job already existed, but just in case + return fmt.Errorf("job already exists for repo %s", repo) + } + + j := &Gormjob{ + repo: repo, + createdAt: time.Now(), + updatedAt: time.Now(), + state: StateEnqueued, + bufferedOps: map[string][]*bufferedOp{}, + + dbj: dbj, + db: s.db, + } + s.jobs[repo] = j + + return nil +} + +func (s *Gormstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { + s.lk.RLock() + + // If the job doesn't exist, we can't buffer an op for it + j, ok := s.jobs[repo] + s.lk.RUnlock() + if !ok { + return false, ErrJobNotFound + } + + j.lk.Lock() + defer j.lk.Unlock() + + switch j.state { + case StateComplete: + return false, ErrJobComplete + case StateInProgress: + // keep going and buffer the op + default: + return false, nil + } + + j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{ + kind: kind, + rec: rec, + cid: cid, + }) + j.updatedAt = time.Now() + return true, nil +} + +func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) { + s.lk.RLock() + defer s.lk.RUnlock() + + j, ok := s.jobs[repo] + if !ok || j == nil { + return nil, nil + } + return j, nil +} + +func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { + s.lk.RLock() + defer s.lk.RUnlock() + + for _, j := range s.jobs { + if j.State() == StateEnqueued { + return j, nil + } + } + return nil, nil +} + +func (j *Gormjob) Repo() string { + return j.repo +} + +func (j *Gormjob) State() string { + j.lk.Lock() + defer j.lk.Unlock() + + return j.state +} + +func (j *Gormjob) SetState(ctx context.Context, state string) error { + j.lk.Lock() + defer j.lk.Unlock() + + j.state = state + j.updatedAt = time.Now() + + // Persist the job to the database + j.dbj.State = state + 
return j.db.Save(j.dbj).Error +} + +func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { + j.lk.Lock() + defer j.lk.Unlock() + + for path, ops := range j.bufferedOps { + for _, op := range ops { + if err := fn(op.kind, path, op.rec, op.cid); err != nil { + return err + } + } + } + + j.bufferedOps = map[string][]*bufferedOp{} + j.state = StateComplete + + return nil +} + +func (j *Gormjob) ClearBufferedOps(ctx context.Context) error { + j.lk.Lock() + defer j.lk.Unlock() + + j.bufferedOps = map[string][]*bufferedOp{} + j.updatedAt = time.Now() + return nil +} diff --git a/backfill/memstore.go b/backfill/memstore.go new file mode 100644 index 000000000..85e552efa --- /dev/null +++ b/backfill/memstore.go @@ -0,0 +1,159 @@ +package backfill + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ipfs/go-cid" + typegen "github.com/whyrusleeping/cbor-gen" +) + +type bufferedOp struct { + kind string + rec *typegen.CBORMarshaler + cid *cid.Cid +} + +type Memjob struct { + repo string + state string + lk sync.Mutex + bufferedOps map[string][]*bufferedOp + + createdAt time.Time + updatedAt time.Time +} + +// Memstore is a simple in-memory implementation of the Backfill Store interface +type Memstore struct { + lk sync.RWMutex + jobs map[string]*Memjob +} + +func NewMemstore() *Memstore { + return &Memstore{ + jobs: make(map[string]*Memjob), + } +} + +func (s *Memstore) EnqueueJob(repo string) error { + s.lk.Lock() + defer s.lk.Unlock() + + if _, ok := s.jobs[repo]; ok { + return fmt.Errorf("job already exists for repo %s", repo) + } + + j := &Memjob{ + repo: repo, + createdAt: time.Now(), + updatedAt: time.Now(), + state: StateEnqueued, + bufferedOps: map[string][]*bufferedOp{}, + } + s.jobs[repo] = j + return nil +} + +func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { + s.lk.Lock() + + // If the 
job doesn't exist, we can't buffer an op for it + j, ok := s.jobs[repo] + s.lk.Unlock() + if !ok { + return false, ErrJobNotFound + } + + j.lk.Lock() + defer j.lk.Unlock() + + switch j.state { + case StateComplete: + return false, ErrJobComplete + case StateInProgress: + // keep going and buffer the op + default: + return false, nil + } + + j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{ + kind: kind, + rec: rec, + cid: cid, + }) + j.updatedAt = time.Now() + return true, nil +} + +func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error) { + s.lk.RLock() + defer s.lk.RUnlock() + + j, ok := s.jobs[repo] + if !ok || j == nil { + return nil, nil + } + return j, nil +} + +func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { + s.lk.RLock() + defer s.lk.RUnlock() + + for _, j := range s.jobs { + if j.State() == StateEnqueued { + return j, nil + } + } + return nil, nil +} + +func (j *Memjob) Repo() string { + return j.repo +} + +func (j *Memjob) State() string { + j.lk.Lock() + defer j.lk.Unlock() + + return j.state +} + +func (j *Memjob) SetState(ctx context.Context, state string) error { + j.lk.Lock() + defer j.lk.Unlock() + + j.state = state + j.updatedAt = time.Now() + return nil +} + +func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { + j.lk.Lock() + defer j.lk.Unlock() + + for path, ops := range j.bufferedOps { + for _, op := range ops { + if err := fn(op.kind, path, op.rec, op.cid); err != nil { + return err + } + } + } + + j.bufferedOps = map[string][]*bufferedOp{} + j.state = StateComplete + + return nil +} + +func (j *Memjob) ClearBufferedOps(ctx context.Context) error { + j.lk.Lock() + defer j.lk.Unlock() + + j.bufferedOps = map[string][]*bufferedOp{} + j.updatedAt = time.Now() + return nil +} diff --git a/backfill/metrics.go b/backfill/metrics.go new file mode 100644 index 000000000..effa21ee2 --- /dev/null +++ 
b/backfill/metrics.go @@ -0,0 +1,31 @@ +package backfill + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var backfillJobsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "backfill_jobs_enqueued_total", + Help: "The total number of backfill jobs enqueued", +}, []string{"backfiller_name"}) + +var backfillJobsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "backfill_jobs_processed_total", + Help: "The total number of backfill jobs processed", +}, []string{"backfiller_name"}) + +var backfillRecordsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "backfill_records_processed_total", + Help: "The total number of backfill records processed", +}, []string{"backfiller_name"}) + +var backfillOpsBuffered = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "backfill_ops_buffered", + Help: "The number of backfill operations buffered", +}, []string{"backfiller_name"}) + +var backfillBytesProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "backfill_bytes_processed_total", + Help: "The total number of backfill bytes processed", +}, []string{"backfiller_name"}) diff --git a/backfill/util.go b/backfill/util.go new file mode 100644 index 000000000..f8f159290 --- /dev/null +++ b/backfill/util.go @@ -0,0 +1,33 @@ +package backfill + +import ( + "io" + + "github.com/prometheus/client_golang/prometheus" +) + +type instrumentedReader struct { + source io.ReadCloser + counter prometheus.Counter +} + +func (r instrumentedReader) Read(b []byte) (int, error) { + n, err := r.source.Read(b) + r.counter.Add(float64(n)) + return n, err +} + +func (r instrumentedReader) Close() error { + var buf [32]byte + var n int + var err error + for err == nil { + n, err = r.source.Read(buf[:]) + r.counter.Add(float64(n)) + } + closeerr := r.source.Close() + if err != nil && err != io.EOF { + return err + } + return closeerr +} diff --git a/search/firehose.go 
b/search/firehose.go new file mode 100644 index 000000000..b59451c14 --- /dev/null +++ b/search/firehose.go @@ -0,0 +1,280 @@ +package search + +import ( + "bytes" + "context" + "fmt" + "net/http" + "strings" + + comatproto "github.com/bluesky-social/indigo/api/atproto" + bsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/backfill" + "github.com/bluesky-social/indigo/events" + "github.com/bluesky-social/indigo/events/schedulers/autoscaling" + lexutil "github.com/bluesky-social/indigo/lex/util" + "github.com/bluesky-social/indigo/repo" + "github.com/bluesky-social/indigo/repomgr" + "github.com/gorilla/websocket" + "github.com/ipfs/go-cid" + typegen "github.com/whyrusleeping/cbor-gen" +) + +func (s *Server) getLastCursor() (int64, error) { + var lastSeq LastSeq + if err := s.db.Find(&lastSeq).Error; err != nil { + return 0, err + } + + if lastSeq.ID == 0 { + return 0, s.db.Create(&lastSeq).Error + } + + return lastSeq.Seq, nil +} + +func (s *Server) updateLastCursor(curs int64) error { + return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error +} + +func (s *Server) RunIndexer(ctx context.Context) error { + cur, err := s.getLastCursor() + if err != nil { + return fmt.Errorf("get last cursor: %w", err) + } + + err = s.bfs.LoadJobs(ctx) + if err != nil { + return fmt.Errorf("loading backfill jobs: %w", err) + } + s.bf.Start() + + d := websocket.DefaultDialer + con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{}) + if err != nil { + return fmt.Errorf("events dial failed: %w", err) + } + + rsc := &events.RepoStreamCallbacks{ + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { + defer func() { + if evt.Seq%50 == 0 { + if err := s.updateLastCursor(evt.Seq); err != nil { + log.Error("Failed to update cursor: ", err) + } + } + }() + if evt.TooBig && evt.Prev != nil { + log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq) + 
return nil + } + + if evt.TooBig { + if err := s.processTooBigCommit(ctx, evt); err != nil { + log.Errorf("failed to process tooBig event: %s", err) + return nil + } + + return nil + } + + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) + if err != nil { + log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) + return nil + } + + for _, op := range evt.Ops { + ek := repomgr.EventKind(op.Action) + switch ek { + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: + rc, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) + log.Error(e) + return nil + } + + if lexutil.LexLink(rc) != *op.Cid { + log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) + return nil + } + + if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { + log.Errorf("failed to handle op: %s", err) + return nil + } + + case repomgr.EvtKindDeleteRecord: + if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { + log.Errorf("failed to handle delete: %s", err) + return nil + } + } + } + + return nil + + }, + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { + if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil { + log.Errorf("failed to update user handle: %s", err) + } + return nil + }, + } + + return events.HandleRepoStream( + ctx, con, autoscaling.NewScheduler( + autoscaling.DefaultAutoscaleSettings(), + s.bgshost, + rsc.EventHandler, + ), + ) +} + +func (s *Server) handleCreateOrUpdate(ctx context.Context, did string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { + // Since this gets called in a backfill job, we need to check if the path is a post or profile + if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { + return nil + } + + u, err := 
s.getOrCreateUser(ctx, did) + if err != nil { + return fmt.Errorf("checking user: %w", err) + } + rec := *recP + + switch rec := rec.(type) { + case *bsky.FeedPost: + if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil { + return fmt.Errorf("indexing post: %w", err) + } + case *bsky.ActorProfile: + if err := s.indexProfile(ctx, u, rec); err != nil { + return fmt.Errorf("indexing profile: %w", err) + } + default: + } + return nil +} + +func (s *Server) handleDelete(ctx context.Context, did string, path string) error { + // Since this gets called in a backfill job, we need to check if the path is a post or profile + if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { + return nil + } + + u, err := s.getOrCreateUser(ctx, did) + if err != nil { + return err + } + + switch { + // TODO: handle profile deletes, its an edge case, but worth doing still + case strings.Contains(path, "app.bsky.feed.post"): + if err := s.deletePost(ctx, u, path); err != nil { + return err + } + } + + return nil +} + +func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec typegen.CBORMarshaler) error { + var err error + if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { + return nil + } + + if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { + log.Infof("handling create(%d): %s - %s", seq, did, path) + + // Try to buffer the op, if it fails, we need to create a backfill job + _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) + if err == backfill.ErrJobNotFound { + log.Infof("no job found for repo %s, creating one", did) + + if err := s.bfs.EnqueueJob(did); err != nil { + return fmt.Errorf("enqueueing job: %w", err) + } + + // Try to buffer the op again so it gets picked up by the backfill job + _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) + if err != nil { + 
return fmt.Errorf("buffering op: %w", err) + } + } else if err == backfill.ErrJobComplete { + // Backfill is done for this repo so we can just index it now + err = s.handleCreateOrUpdate(ctx, did, path, &rec, rcid) + } + } else if op == repomgr.EvtKindDeleteRecord { + log.Infof("handling delete(%d): %s - %s", seq, did, path) + + // Try to buffer the op, if it fails, we need to create a backfill job + _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) + if err == backfill.ErrJobNotFound { + log.Infof("no job found for repo %s, creating one", did) + + if err := s.bfs.EnqueueJob(did); err != nil { + return fmt.Errorf("enqueueing job: %w", err) + } + + // Try to buffer the op again so it gets picked up by the backfill job + _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) + if err != nil { + return fmt.Errorf("buffering op: %w", err) + } + } else if err == backfill.ErrJobComplete { + // Backfill is done for this repo so we can delete imemdiately + err = s.handleDelete(ctx, did, path) + } + } + + if err != nil { + return fmt.Errorf("failed to handle op: %w", err) + } + + return nil +} + +func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { + repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String()) + if err != nil { + return err + } + + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) + if err != nil { + return err + } + + u, err := s.getOrCreateUser(ctx, evt.Repo) + if err != nil { + return err + } + + return r.ForEach(ctx, "", func(k string, v cid.Cid) error { + if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { + rcid, rec, err := r.GetRecord(ctx, k) + if err != nil { + log.Errorf("failed to get record from repo checkout: %s", err) + return nil + } + + switch rec := rec.(type) { + case *bsky.FeedPost: + if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { + return fmt.Errorf("indexing 
post: %w", err) + } + case *bsky.ActorProfile: + if err := s.indexProfile(ctx, u, rec); err != nil { + return fmt.Errorf("indexing profile: %w", err) + } + default: + } + + } + return nil + }) +} diff --git a/search/server.go b/search/server.go index a21614f3c..0cfb2f26e 100644 --- a/search/server.go +++ b/search/server.go @@ -1,29 +1,20 @@ package search import ( - "bytes" "context" "encoding/base32" "encoding/json" "fmt" - "net/http" "strconv" "strings" api "github.com/bluesky-social/indigo/api" - comatproto "github.com/bluesky-social/indigo/api/atproto" bsky "github.com/bluesky-social/indigo/api/bsky" - "github.com/bluesky-social/indigo/events" - "github.com/bluesky-social/indigo/events/schedulers/autoscaling" - lexutil "github.com/bluesky-social/indigo/lex/util" - "github.com/bluesky-social/indigo/repo" - "github.com/bluesky-social/indigo/repomgr" + "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/util/version" "github.com/bluesky-social/indigo/xrpc" - "github.com/gorilla/websocket" lru "github.com/hashicorp/golang-lru" - "github.com/ipfs/go-cid" flatfs "github.com/ipfs/go-ds-flatfs" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log" @@ -44,6 +35,9 @@ type Server struct { plc *api.PLCServer echo *echo.Echo + bfs *backfill.Gormstore + bf *backfill.Backfiller + userCache *lru.Cache } @@ -72,6 +66,7 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) db.AutoMigrate(&PostRef{}) db.AutoMigrate(&User{}) db.AutoMigrate(&LastSeq{}) + db.AutoMigrate(&backfill.GormDBJob{}) // TODO: robust client xc := &xrpc.Client{ @@ -102,193 +97,23 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) plc: plc, userCache: ucache, } - return s, nil -} - -func (s *Server) getLastCursor() (int64, error) { - var lastSeq LastSeq - if err := s.db.Find(&lastSeq).Error; err != nil { - return 0, err - } - - if lastSeq.ID == 0 { - return 0, s.db.Create(&lastSeq).Error - } - - 
return lastSeq.Seq, nil -} - -func (s *Server) updateLastCursor(curs int64) error { - return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error -} - -func (s *Server) RunIndexer(ctx context.Context) error { - cur, err := s.getLastCursor() - if err != nil { - return fmt.Errorf("get last cursor: %w", err) - } - - d := websocket.DefaultDialer - con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{}) - if err != nil { - return fmt.Errorf("events dial failed: %w", err) - } - - rsc := &events.RepoStreamCallbacks{ - RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { - if evt.TooBig && evt.Prev != nil { - log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq) - return nil - } - - if evt.TooBig { - if err := s.processTooBigCommit(ctx, evt); err != nil { - log.Errorf("failed to process tooBig event: %s", err) - return nil - } - - return nil - } - - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) - if err != nil { - log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) - return nil - } - - for _, op := range evt.Ops { - ek := repomgr.EventKind(op.Action) - switch ek { - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: - rc, rec, err := r.GetRecord(ctx, op.Path) - if err != nil { - e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) - log.Error(e) - return nil - } - - if lexutil.LexLink(rc) != *op.Cid { - log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) - return nil - } - - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { - log.Errorf("failed to handle op: %s", err) - return nil - } - - case repomgr.EvtKindDeleteRecord: - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { - log.Errorf("failed to handle delete: %s", err) - return nil - } - } - } - - return nil - - }, - 
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { - if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil { - log.Errorf("failed to update user handle: %s", err) - } - return nil - }, - } - return events.HandleRepoStream( - ctx, con, autoscaling.NewScheduler( - autoscaling.DefaultAutoscaleSettings(), - s.bgshost, - rsc.EventHandler, - ), + bfstore := backfill.NewGormstore(db) + opts := backfill.DefaultBackfillOptions() + bf := backfill.NewBackfiller( + "search", + bfstore, + s.handleCreateOrUpdate, + s.handleCreateOrUpdate, + s.handleDelete, + log.Desugar().Sugar(), + opts, ) -} - -func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error { - if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { - - log.Infof("handling event(%d): %s - %s", seq, did, path) - u, err := s.getOrCreateUser(ctx, did) - if err != nil { - return fmt.Errorf("checking user: %w", err) - } - switch rec := rec.(type) { - case *bsky.FeedPost: - if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil { - return fmt.Errorf("indexing post: %w", err) - } - case *bsky.ActorProfile: - if err := s.indexProfile(ctx, u, rec); err != nil { - return fmt.Errorf("indexing profile: %w", err) - } - default: - } - - } else if op == repomgr.EvtKindDeleteRecord { - u, err := s.getOrCreateUser(ctx, did) - if err != nil { - return err - } - - switch { - // TODO: handle profile deletes, its an edge case, but worth doing still - case strings.Contains(path, "app.bsky.feed.post"): - if err := s.deletePost(ctx, u, path); err != nil { - return err - } - } - - } - - if seq%50 == 0 { - if err := s.updateLastCursor(seq); err != nil { - log.Error("Failed to update cursor: ", err) - } - } - - return nil -} - -func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { - repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", 
evt.Commit.String()) - if err != nil { - return err - } - - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) - if err != nil { - return err - } - - u, err := s.getOrCreateUser(ctx, evt.Repo) - if err != nil { - return err - } - return r.ForEach(ctx, "", func(k string, v cid.Cid) error { - if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { - rcid, rec, err := r.GetRecord(ctx, k) - if err != nil { - log.Errorf("failed to get record from repo checkout: %s", err) - return nil - } - - switch rec := rec.(type) { - case *bsky.FeedPost: - if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { - return fmt.Errorf("indexing post: %w", err) - } - case *bsky.ActorProfile: - if err := s.indexProfile(ctx, u, rec); err != nil { - return fmt.Errorf("indexing profile: %w", err) - } - default: - } + s.bfs = bfstore + s.bf = bf - } - return nil - }) + return s, nil } func (s *Server) SearchPosts(ctx context.Context, srch string, offset, size int) ([]PostSearchResult, error) { From 106cc8013bc832c5d3a811b125ba1a3095e14da3 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Thu, 31 Aug 2023 22:02:33 +0000 Subject: [PATCH 2/6] Refactor backfill to use slog package --- backfill/backfill.go | 54 ++++++++++++++++++++------------------- backfill/backfill_test.go | 24 +++++------------ search/server.go | 1 - 3 files changed, 35 insertions(+), 44 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index e01bee474..96aea0a50 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "net/http" "sync" "time" @@ -16,7 +17,6 @@ import ( typegen "github.com/whyrusleeping/cbor-gen" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" - "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -61,7 +61,6 @@ type Backfiller struct { NSIDFilter string CheckoutPath string - Logger *zap.SugaredLogger syncLimiter *rate.Limiter 
magicHeaderKey string @@ -112,7 +111,6 @@ func NewBackfiller( handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, handleDelete func(ctx context.Context, repo string, path string) error, - logger *zap.SugaredLogger, opts *BackfillOptions, ) *Backfiller { if opts == nil { @@ -127,7 +125,6 @@ func NewBackfiller( ParallelBackfills: opts.ParallelBackfills, ParallelRecordCreates: opts.ParallelRecordCreates, NSIDFilter: opts.NSIDFilter, - Logger: logger, syncLimiter: rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1), CheckoutPath: opts.CheckoutPath, stop: make(chan chan struct{}), @@ -138,7 +135,7 @@ func NewBackfiller( func (b *Backfiller) Start() { ctx := context.Background() - log := b.Logger.With("source", "backfiller_main") + log := slog.With(slog.String("source", "backfiller"), slog.String("name", b.Name)) log.Info("starting backfill processor") sem := make(chan struct{}, b.ParallelBackfills) @@ -155,7 +152,7 @@ func (b *Backfiller) Start() { // Get the next job job, err := b.Store.GetNextEnqueuedJob(ctx) if err != nil { - log.Errorf("failed to get next backfill: %+v", err) + log.Error(fmt.Sprintf("failed to get next backfill: %+v", err)) time.Sleep(1 * time.Second) continue } else if job == nil { @@ -166,7 +163,7 @@ func (b *Backfiller) Start() { // Mark the backfill as "in progress" err = job.SetState(ctx, StateInProgress) if err != nil { - log.Errorf("failed to set backfill state: %+v", err) + log.Error(fmt.Sprintf("failed to set backfill state: %+v", err)) continue } @@ -181,18 +178,19 @@ func (b *Backfiller) Start() { // Stop stops the backfill processor func (b *Backfiller) Stop() { - b.Logger.Info("stopping backfill processor") + log := slog.With(slog.String("source", "backfiller"), slog.String("name", b.Name)) + log.Info("stopping backfill processor") stopped := make(chan 
struct{}) b.stop <- stopped <-stopped - b.Logger.Info("backfill processor stopped") + log.Info("backfill processor stopped") } // FlushBuffer processes buffered operations for a job func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { ctx, span := tracer.Start(ctx, "FlushBuffer") defer span.End() - log := b.Logger.With("source", "backfiller_buffer_flush", "repo", job.Repo()) + log := slog.With(slog.String("source", "backfiller_buffer_flush"), slog.String("repo", job.Repo())) processed := 0 @@ -205,17 +203,17 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { case repomgr.EvtKindCreateRecord: err := b.HandleCreateRecord(ctx, repo, path, rec, cid) if err != nil { - log.Errorf("failed to handle create record: %+v", err) + log.Error(fmt.Sprintf("failed to handle create record: %+v", err)) } case repomgr.EvtKindUpdateRecord: err := b.HandleUpdateRecord(ctx, repo, path, rec, cid) if err != nil { - log.Errorf("failed to handle update record: %+v", err) + log.Error(fmt.Sprintf("failed to handle update record: %+v", err)) } case repomgr.EvtKindDeleteRecord: err := b.HandleDeleteRecord(ctx, repo, path) if err != nil { - log.Errorf("failed to handle delete record: %+v", err) + log.Error(fmt.Sprintf("failed to handle delete record: %+v", err)) } } backfillOpsBuffered.WithLabelValues(b.Name).Dec() @@ -223,7 +221,7 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { return nil }) if err != nil { - log.Errorf("failed to flush buffered ops: %+v", err) + log.Error(fmt.Sprintf("failed to flush buffered ops: %+v", err)) } return processed @@ -248,8 +246,8 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { repoDid := job.Repo() - log := b.Logger.With("source", "backfiller_backfill_repo", "repo", repoDid) - log.Infof("processing backfill for %s", repoDid) + log := slog.With(slog.String("source", "backfiller_backfill_repo"), slog.String("repo", repoDid)) + log.Info(fmt.Sprintf("processing backfill for %s", 
repoDid)) var url = fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) @@ -260,7 +258,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { } req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - log.Errorf("Error creating request: %v", err) + log.Error(fmt.Sprintf("Error creating request: %v", err)) return } @@ -274,12 +272,12 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { resp, err := client.Do(req) if err != nil { - log.Errorf("Error sending request: %v", err) + log.Error(fmt.Sprintf("Error sending request: %v", err)) return } if resp.StatusCode != http.StatusOK { - log.Errorf("Error response: %v", resp.StatusCode) + log.Info(fmt.Sprintf("Error response: %v", resp.StatusCode)) reason := "unknown error" if resp.StatusCode == http.StatusBadRequest { reason = "repo not found" @@ -289,13 +287,13 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { // Mark the job as "failed" err := job.SetState(ctx, state) if err != nil { - log.Errorf("failed to set job state: %+v", err) + log.Error(fmt.Sprintf("failed to set job state: %+v", err)) } // Clear buffered ops err = job.ClearBufferedOps(ctx) if err != nil { - log.Errorf("failed to clear buffered ops: %+v", err) + log.Error(fmt.Sprintf("failed to clear buffered ops: %+v", err)) } return } @@ -309,20 +307,20 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { r, err := repo.ReadRepoFromCar(ctx, instrumentedReader) if err != nil { - log.Errorf("Error reading repo: %v", err) + log.Error(fmt.Sprintf("Error reading repo: %v", err)) state := "failed (couldn't read repo CAR from response body)" // Mark the job as "failed" err := job.SetState(ctx, state) if err != nil { - log.Errorf("failed to set job state: %+v", err) + log.Error(fmt.Sprintf("failed to set job state: %+v", err)) } // Clear buffered ops err = job.ClearBufferedOps(ctx) if err != nil { - log.Errorf("failed to clear buffered ops: %+v", err) + log.Error(fmt.Sprintf("failed 
to clear buffered ops: %+v", err)) } return } @@ -381,7 +379,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { defer resultWG.Done() for result := range recordResults { if result.err != nil { - log.Errorf("Error processing record %s: %v", result.recordPath, result.err) + log.Error(fmt.Sprintf("Error processing record %s: %v", result.recordPath, result.err)) } } }() @@ -393,5 +391,9 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { // Process buffered operations, marking the job as "complete" when done numProcessed := b.FlushBuffer(ctx, job) - log.Infow("backfill complete", "buffered_records_processed", numProcessed, "records_backfilled", numRecords, "duration", time.Since(start)) + log.Info("backfill complete", + slog.Int("buffered_records_processed", numProcessed), + slog.Int("records_backfilled", numRecords), + slog.Duration("duration", time.Since(start)), + ) } diff --git a/backfill/backfill_test.go b/backfill/backfill_test.go index b675d2d47..62e6b21f6 100644 --- a/backfill/backfill_test.go +++ b/backfill/backfill_test.go @@ -2,6 +2,8 @@ package backfill_test import ( "context" + "fmt" + "log/slog" "sync" "testing" "time" @@ -9,11 +11,8 @@ import ( "github.com/bluesky-social/indigo/backfill" "github.com/ipfs/go-cid" typegen "github.com/whyrusleeping/cbor-gen" - "go.uber.org/zap" ) -var logger *zap.SugaredLogger - type testState struct { creates int updates int @@ -31,14 +30,6 @@ func TestBackfill(t *testing.T) { } mem := backfill.NewMemstore() - - rawLog, err := zap.NewDevelopment() - if err != nil { - t.Fatal(err) - } - - logger = rawLog.Sugar() - ts := &testState{} opts := backfill.DefaultBackfillOptions() @@ -50,11 +41,10 @@ func TestBackfill(t *testing.T) { ts.handleCreate, ts.handleUpdate, ts.handleDelete, - logger, opts, ) - logger.Info("starting backfiller") + slog.Info("starting backfiller") go bf.Start() @@ -96,11 +86,11 @@ func TestBackfill(t *testing.T) { bf.Stop() - logger.Infof("shutting down") + 
slog.Info("shutting down") } func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { - logger.Infof("got create: %s %s", repo, path) + slog.Info(fmt.Sprintf("got create: %s %s", repo, path)) ts.lk.Lock() ts.creates++ ts.lk.Unlock() @@ -108,7 +98,7 @@ func (ts *testState) handleCreate(ctx context.Context, repo string, path string, } func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { - logger.Infof("got update: %s %s", repo, path) + slog.Info(fmt.Sprintf("got update: %s %s", repo, path)) ts.lk.Lock() ts.updates++ ts.lk.Unlock() @@ -116,7 +106,7 @@ func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, } func (ts *testState) handleDelete(ctx context.Context, repo string, path string) error { - logger.Infof("got delete: %s %s", repo, path) + slog.Info(fmt.Sprintf("got delete: %s %s", repo, path)) ts.lk.Lock() ts.deletes++ ts.lk.Unlock() diff --git a/search/server.go b/search/server.go index 0cfb2f26e..063dec4bd 100644 --- a/search/server.go +++ b/search/server.go @@ -106,7 +106,6 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) s.handleCreateOrUpdate, s.handleCreateOrUpdate, s.handleDelete, - log.Desugar().Sugar(), opts, ) From 3123df384deb9cbfed182e94c55ab9cf3b4a0149 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Thu, 31 Aug 2023 22:40:01 +0000 Subject: [PATCH 3/6] Cleaner logging usage --- backfill/backfill.go | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index 96aea0a50..bd7690d3e 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -135,7 +135,7 @@ func NewBackfiller( func (b *Backfiller) Start() { ctx := context.Background() - log := slog.With(slog.String("source", "backfiller"), slog.String("name", b.Name)) + log := 
slog.With("source", "backfiller", "name", b.Name) log.Info("starting backfill processor") sem := make(chan struct{}, b.ParallelBackfills) @@ -152,7 +152,7 @@ func (b *Backfiller) Start() { // Get the next job job, err := b.Store.GetNextEnqueuedJob(ctx) if err != nil { - log.Error(fmt.Sprintf("failed to get next backfill: %+v", err)) + log.Error("failed to get next enqueued job", "error", err) time.Sleep(1 * time.Second) continue } else if job == nil { @@ -163,7 +163,7 @@ func (b *Backfiller) Start() { // Mark the backfill as "in progress" err = job.SetState(ctx, StateInProgress) if err != nil { - log.Error(fmt.Sprintf("failed to set backfill state: %+v", err)) + log.Error("failed to set job state", "error", err) continue } @@ -178,7 +178,7 @@ func (b *Backfiller) Start() { // Stop stops the backfill processor func (b *Backfiller) Stop() { - log := slog.With(slog.String("source", "backfiller"), slog.String("name", b.Name)) + log := slog.With("source", "backfiller", "name", b.Name) log.Info("stopping backfill processor") stopped := make(chan struct{}) b.stop <- stopped @@ -190,7 +190,7 @@ func (b *Backfiller) Stop() { func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { ctx, span := tracer.Start(ctx, "FlushBuffer") defer span.End() - log := slog.With(slog.String("source", "backfiller_buffer_flush"), slog.String("repo", job.Repo())) + log := slog.With("source", "backfiller_buffer_flush", "repo", job.Repo()) processed := 0 @@ -203,17 +203,17 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { case repomgr.EvtKindCreateRecord: err := b.HandleCreateRecord(ctx, repo, path, rec, cid) if err != nil { - log.Error(fmt.Sprintf("failed to handle create record: %+v", err)) + log.Error("failed to handle create record", "error", err) } case repomgr.EvtKindUpdateRecord: err := b.HandleUpdateRecord(ctx, repo, path, rec, cid) if err != nil { - log.Error(fmt.Sprintf("failed to handle update record: %+v", err)) + log.Error("failed to handle update 
record", "error", err) } case repomgr.EvtKindDeleteRecord: err := b.HandleDeleteRecord(ctx, repo, path) if err != nil { - log.Error(fmt.Sprintf("failed to handle delete record: %+v", err)) + log.Error("failed to handle delete record", "error", err) } } backfillOpsBuffered.WithLabelValues(b.Name).Dec() @@ -221,7 +221,7 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { return nil }) if err != nil { - log.Error(fmt.Sprintf("failed to flush buffered ops: %+v", err)) + log.Error("failed to flush buffered ops", "error", err) } return processed @@ -246,7 +246,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { repoDid := job.Repo() - log := slog.With(slog.String("source", "backfiller_backfill_repo"), slog.String("repo", repoDid)) + log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid) log.Info(fmt.Sprintf("processing backfill for %s", repoDid)) var url = fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) @@ -258,7 +258,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { } req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - log.Error(fmt.Sprintf("Error creating request: %v", err)) + log.Error("failed to create request", "error", err) return } @@ -272,12 +272,12 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { resp, err := client.Do(req) if err != nil { - log.Error(fmt.Sprintf("Error sending request: %v", err)) + log.Error("failed to send request", "error", err) return } if resp.StatusCode != http.StatusOK { - log.Info(fmt.Sprintf("Error response: %v", resp.StatusCode)) + log.Info("failed to get repo", "status", resp.StatusCode) reason := "unknown error" if resp.StatusCode == http.StatusBadRequest { reason = "repo not found" @@ -287,13 +287,13 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { // Mark the job as "failed" err := job.SetState(ctx, state) if err != nil { - log.Error(fmt.Sprintf("failed to set job state: %+v", err)) + 
log.Error("failed to set job state", "error", err) } // Clear buffered ops err = job.ClearBufferedOps(ctx) if err != nil { - log.Error(fmt.Sprintf("failed to clear buffered ops: %+v", err)) + log.Error("failed to clear buffered ops", "error", err) } return } @@ -307,20 +307,20 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { r, err := repo.ReadRepoFromCar(ctx, instrumentedReader) if err != nil { - log.Error(fmt.Sprintf("Error reading repo: %v", err)) + log.Error("failed to read repo from car", "error", err) state := "failed (couldn't read repo CAR from response body)" // Mark the job as "failed" err := job.SetState(ctx, state) if err != nil { - log.Error(fmt.Sprintf("failed to set job state: %+v", err)) + log.Error("failed to set job state", "error", err) } // Clear buffered ops err = job.ClearBufferedOps(ctx) if err != nil { - log.Error(fmt.Sprintf("failed to clear buffered ops: %+v", err)) + log.Error("failed to clear buffered ops", "error", err) } return } @@ -379,7 +379,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { defer resultWG.Done() for result := range recordResults { if result.err != nil { - log.Error(fmt.Sprintf("Error processing record %s: %v", result.recordPath, result.err)) + log.Error("Error processing record", "record", result.recordPath, "error", result.err) } } }() @@ -392,8 +392,8 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { numProcessed := b.FlushBuffer(ctx, job) log.Info("backfill complete", - slog.Int("buffered_records_processed", numProcessed), - slog.Int("records_backfilled", numRecords), - slog.Duration("duration", time.Since(start)), + "buffered_records_processed", numProcessed, + "records_backfilled", numRecords, + "duration", time.Since(start), ) } From 771f03657d22bf6f368ad007185854661043d628 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Thu, 31 Aug 2023 22:41:47 +0000 Subject: [PATCH 4/6] Use new logging in test too --- backfill/backfill_test.go | 7 +++---- 1 file 
changed, 3 insertions(+), 4 deletions(-) diff --git a/backfill/backfill_test.go b/backfill/backfill_test.go index 62e6b21f6..4c7f7f4b7 100644 --- a/backfill/backfill_test.go +++ b/backfill/backfill_test.go @@ -2,7 +2,6 @@ package backfill_test import ( "context" - "fmt" "log/slog" "sync" "testing" @@ -90,7 +89,7 @@ func TestBackfill(t *testing.T) { } func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { - slog.Info(fmt.Sprintf("got create: %s %s", repo, path)) + slog.Info("got create", "repo", repo, "path", path) ts.lk.Lock() ts.creates++ ts.lk.Unlock() @@ -98,7 +97,7 @@ func (ts *testState) handleCreate(ctx context.Context, repo string, path string, } func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { - slog.Info(fmt.Sprintf("got update: %s %s", repo, path)) + slog.Info("got update", "repo", repo, "path", path) ts.lk.Lock() ts.updates++ ts.lk.Unlock() @@ -106,7 +105,7 @@ func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, } func (ts *testState) handleDelete(ctx context.Context, repo string, path string) error { - slog.Info(fmt.Sprintf("got delete: %s %s", repo, path)) + slog.Info("got delete", "repo", repo, "path", path) ts.lk.Lock() ts.deletes++ ts.lk.Unlock() From 69372516ddcf7003b3279bbb47c0e6681b13e2c6 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Tue, 5 Sep 2023 20:19:04 +0000 Subject: [PATCH 5/6] Clean up repo traversal --- backfill/backfill.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index bd7690d3e..24404a6d6 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -11,6 +11,7 @@ import ( // Blank import to register types for CBORGEN _ "github.com/bluesky-social/indigo/api/bsky" + lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" 
"github.com/bluesky-social/indigo/repomgr" "github.com/ipfs/go-cid" @@ -249,7 +250,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid) log.Info(fmt.Sprintf("processing backfill for %s", repoDid)) - var url = fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) + url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) // GET and CAR decode the body client := &http.Client{ @@ -348,19 +349,24 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { go func() { defer wg.Done() for item := range recordQueue { - recordCid, rec, err := r.GetRecord(ctx, item.recordPath) + blk, err := r.Blockstore().Get(ctx, item.nodeCid) if err != nil { - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get record: %w", err)} + recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)} + continue + } + rec, err := lexutil.CborDecodeValue(blk.RawData()) + if err != nil { + recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to decode record: %w", err)} continue } - // Verify that the record cid matches the cid in the event - if recordCid != item.nodeCid { - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("mismatch in record and op cid: %s != %s", recordCid, item.nodeCid)} + recM, ok := rec.(typegen.CBORMarshaler) + if !ok { + recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to cast record to CBORMarshaler")} continue } - err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &rec, &recordCid) + err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &recM, &item.nodeCid) if err != nil { recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} continue From 16fe560ed60fc60856641b675e49e0ab52fd94aa Mon Sep 17 00:00:00 2001 From: Jaz Volpert 
Date: Mon, 11 Sep 2023 20:31:05 +0000 Subject: [PATCH 6/6] Start the backfill in a goroutine --- search/firehose.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/search/firehose.go b/search/firehose.go index b59451c14..e1b5a5cb9 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -47,7 +47,7 @@ func (s *Server) RunIndexer(ctx context.Context) error { if err != nil { return fmt.Errorf("loading backfill jobs: %w", err) } - s.bf.Start() + go s.bf.Start() d := websocket.DefaultDialer con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{})