diff --git a/backfill/backfill.go b/backfill/backfill.go index 4b93501ed..243868670 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -10,8 +10,8 @@ import ( "sync" "time" - // Blank import to register types for CBORGEN "github.com/bluesky-social/indigo/api/atproto" + // Blank import to register types for CBORGEN _ "github.com/bluesky-social/indigo/api/bsky" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" @@ -30,6 +30,7 @@ type Job interface { Rev() string SetState(ctx context.Context, state string) error SetRev(ctx context.Context, rev string) error + RetryCount() int BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) // FlushBufferedOps calls the given callback for each buffered operation @@ -90,6 +91,9 @@ var ErrJobComplete = errors.New("job is complete") // ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist var ErrJobNotFound = errors.New("job not found") +// ErrEventGap is returned when an event is received with a since that doesn't match the current rev +var ErrEventGap = errors.New("buffered event revs did not line up") + var tracer = otel.Tracer("backfiller") type BackfillOptions struct { @@ -266,6 +270,9 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { repoDid := job.Repo() log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid) + if retries := job.RetryCount(); retries > 0 { + log = log.With("retry_count", retries) + } log.Info(fmt.Sprintf("processing backfill for %s", repoDid)) url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) @@ -509,3 +516,20 @@ func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, return j.BufferOps(ctx, since, rev, ops) } + +// MaxRetries is the maximum number of times to retry a backfill job +var MaxRetries = 10 + +// maxBackoffShift is the largest exponent for which (1<<n) * 10s still fits in a time.Duration +const maxBackoffShift = 29 + +// computeExponentialBackoff returns 10s doubled once per attempt; attempt is clamped to [0, maxBackoffShift] so the shift cannot overflow into a zero or negative delay +func computeExponentialBackoff(attempt int) time.Duration { + if attempt < 0 { + attempt = 0 + } + if attempt > maxBackoffShift { + attempt = maxBackoffShift + } + return time.Duration(1<<uint(attempt)) * 10 * time.Second +} diff --git 
a/backfill/gormstore.go b/backfill/gormstore.go index 6f43fc69d..81306be12 100644 --- a/backfill/gormstore.go +++ b/backfill/gormstore.go @@ -3,6 +3,7 @@ package backfill import ( "context" "fmt" + "strings" "sync" "time" @@ -24,13 +25,18 @@ type Gormjob struct { createdAt time.Time updatedAt time.Time + + retryCount int + retryAfter *time.Time } type GormDBJob struct { gorm.Model - Repo string `gorm:"unique;index"` - State string `gorm:"index"` - Rev string + Repo string `gorm:"unique;index"` + State string `gorm:"index"` + Rev string + RetryCount int + RetryAfter *time.Time } // Gormstore is a gorm-backed implementation of the Backfill Store interface @@ -75,6 +81,9 @@ func (s *Gormstore) LoadJobs(ctx context.Context) error { dbj: dbj, db: s.db, + + retryCount: dbj.RetryCount, + retryAfter: dbj.RetryAfter, } s.jobs[dbj.Repo] = j } @@ -169,6 +178,9 @@ func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) dbj: &dbj, db: s.db, + + retryCount: dbj.RetryCount, + retryAfter: dbj.RetryAfter, } s.lk.Lock() defer s.lk.Unlock() @@ -228,7 +240,13 @@ func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { defer s.lk.RUnlock() for _, j := range s.jobs { - if j.State() == StateEnqueued { + // Snapshot both fields under j.lk (the lock RetryCount holds when reading retryCount) so this read cannot race with SetState + j.lk.Lock() + state, retryAfter := j.state, j.retryAfter + j.lk.Unlock() + + shouldRetry := strings.HasPrefix(state, "failed") && retryAfter != nil && time.Now().After(*retryAfter) + if state == StateEnqueued || shouldRetry { return j, nil } } @@ -268,13 +286,23 @@ func (j *Gormjob) SetState(ctx context.Context, state string) error { j.state = state j.updatedAt = time.Now() + if strings.HasPrefix(state, "failed") { + if j.retryCount < MaxRetries { + next := time.Now().Add(computeExponentialBackoff(j.retryCount)) + j.retryAfter = &next + j.retryCount++ + } else { + j.retryAfter = nil + } + } + // Persist the job to the database j.dbj.State = state + j.dbj.RetryCount = j.retryCount + j.dbj.RetryAfter = j.retryAfter return j.db.Save(j.dbj).Error } -var ErrEventGap = fmt.Errorf("buffered event revs did not line up") - func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn 
func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { // TODO: this will block any events for this repo while this flush is ongoing, is that okay? j.lk.Lock() @@ -320,6 +348,13 @@ func (j *Gormjob) ClearBufferedOps(ctx context.Context) error { return nil } +// RetryCount returns the number of retries scheduled for this job so far +func (j *Gormjob) RetryCount() int { + j.lk.Lock() + defer j.lk.Unlock() + return j.retryCount +} + func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error { j, err := s.GetJob(ctx, repo) if err != nil { diff --git a/backfill/memstore.go b/backfill/memstore.go index 0614cee96..62cfa3c5d 100644 --- a/backfill/memstore.go +++ b/backfill/memstore.go @@ -203,3 +203,8 @@ func (j *Memjob) ClearBufferedOps(ctx context.Context) error { j.updatedAt = time.Now() return nil } + +// RetryCount always returns 0; Memjob does not track retries +func (j *Memjob) RetryCount() int { + return 0 +} diff --git a/bgs/bgs.go b/bgs/bgs.go index c9eae0c7e..0da2ff501 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -851,7 +851,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event } // skip the fast path for rebases or if the user is already in the slow path - if bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) { + if bgs.Index.Crawler.RepoInSlowPath(ctx, u.ID) { rebasesCounter.WithLabelValues(host.Host).Add(1) ai, err := bgs.Index.LookupUser(ctx, u.ID) if err != nil {