Retry logic, still need to go update search to use the new patterns
ericvolp12 committed Nov 28, 2023
1 parent bcff979 commit f86e8dd
Showing 4 changed files with 56 additions and 8 deletions.
16 changes: 15 additions & 1 deletion backfill/backfill.go
@@ -10,8 +10,8 @@ import (
"sync"
"time"

// Blank import to register types for CBORGEN
"github.com/bluesky-social/indigo/api/atproto"
// Blank import to register types for CBORGEN
_ "github.com/bluesky-social/indigo/api/bsky"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/repo"
@@ -30,6 +30,7 @@ type Job interface {
Rev() string
SetState(ctx context.Context, state string) error
SetRev(ctx context.Context, rev string) error
RetryCount() int

BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error)
// FlushBufferedOps calls the given callback for each buffered operation
@@ -90,6 +91,9 @@ var ErrJobComplete = errors.New("job is complete")
// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist
var ErrJobNotFound = errors.New("job not found")

// ErrEventGap is returned when an event is received with a since that doesn't match the current rev
var ErrEventGap = fmt.Errorf("buffered event revs did not line up")

var tracer = otel.Tracer("backfiller")

type BackfillOptions struct {
@@ -266,6 +270,9 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
repoDid := job.Repo()

log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid)
if job.RetryCount() > 0 {
log = log.With("retry_count", job.RetryCount())
}
log.Info(fmt.Sprintf("processing backfill for %s", repoDid))

url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)
@@ -509,3 +516,10 @@ func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string,

return j.BufferOps(ctx, since, rev, ops)
}

// MaxRetries is the maximum number of times to retry a backfill job
var MaxRetries = 10

func computeExponentialBackoff(attempt int) time.Duration {
return time.Duration(1<<uint(attempt)) * 10 * time.Second
}
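
For reference, the wait times this backoff produces, as a standalone sketch: it assumes retryCount starts at 0 and reuses the same formula as computeExponentialBackoff above; the package name and print loop are only for illustration.

package main

import (
	"fmt"
	"time"
)

// Mirrors computeExponentialBackoff above: 10 seconds, doubled on every attempt.
func backoff(attempt int) time.Duration {
	return time.Duration(1<<uint(attempt)) * 10 * time.Second
}

func main() {
	// With MaxRetries = 10, attempts 0 through 9 get scheduled before the job
	// is left in its failed state: 10s, 20s, 40s, ... up to 1h25m20s.
	for attempt := 0; attempt < 10; attempt++ {
		fmt.Printf("retry %2d waits %s\n", attempt+1, backoff(attempt))
	}
}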
40 changes: 34 additions & 6 deletions backfill/gormstore.go
@@ -3,6 +3,7 @@ package backfill
import (
"context"
"fmt"
"strings"
"sync"
"time"

@@ -24,13 +25,18 @@ type Gormjob struct {

createdAt time.Time
updatedAt time.Time

retryCount int
retryAfter *time.Time
}

type GormDBJob struct {
gorm.Model
Repo string `gorm:"unique;index"`
State string `gorm:"index"`
Rev string
Repo string `gorm:"unique;index"`
State string `gorm:"index"`
Rev string
RetryCount int
RetryAfter *time.Time
}

// Gormstore is a gorm-backed implementation of the Backfill Store interface
@@ -75,6 +81,9 @@ func (s *Gormstore) LoadJobs(ctx context.Context) error {

dbj: dbj,
db: s.db,

retryCount: dbj.RetryCount,
retryAfter: dbj.RetryAfter,
}
s.jobs[dbj.Repo] = j
}
@@ -169,6 +178,9 @@ func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error)

dbj: &dbj,
db: s.db,

retryCount: dbj.RetryCount,
retryAfter: dbj.RetryAfter,
}
s.lk.Lock()
defer s.lk.Unlock()
@@ -228,7 +240,9 @@ func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) {
defer s.lk.RUnlock()

for _, j := range s.jobs {
if j.State() == StateEnqueued {
shouldRetry := strings.HasPrefix(j.State(), "failed") && j.retryAfter != nil && time.Now().After(*j.retryAfter)

if j.State() == StateEnqueued || shouldRetry {
return j, nil
}
}
@@ -268,13 +282,21 @@ func (j *Gormjob) SetState(ctx context.Context, state string) error {
j.state = state
j.updatedAt = time.Now()

if strings.HasPrefix(state, "failed") {
if j.retryCount < MaxRetries {
next := time.Now().Add(computeExponentialBackoff(j.retryCount))
j.retryAfter = &next
j.retryCount++
} else {
j.retryAfter = nil
}
}

// Persist the job to the database
j.dbj.State = state
return j.db.Save(j.dbj).Error
}

var ErrEventGap = fmt.Errorf("buffered event revs did not line up")

func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error {
// TODO: this will block any events for this repo while this flush is ongoing, is that okay?
j.lk.Lock()
@@ -320,6 +342,12 @@ func (j *Gormjob) ClearBufferedOps(ctx context.Context) error {
return nil
}

func (j *Gormjob) RetryCount() int {
j.lk.Lock()
defer j.lk.Unlock()
return j.retryCount
}

func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error {
j, err := s.GetJob(ctx, repo)
if err != nil {
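For context, a rough sketch of how a caller might drive the new retry plumbing. This is a hypothetical worker loop, not code from this commit: the processOnce callback, the 5-second poll interval, and the exact state strings are assumptions, while GetNextEnqueuedJob, SetState, and RetryCount come from the interfaces changed above.

package worker

import (
	"context"
	"fmt"
	"time"

	"github.com/bluesky-social/indigo/backfill"
)

// runWorker polls the store and processes one job at a time. Jobs whose state
// is set to a "failed..." string are handed out again by GetNextEnqueuedJob
// once their retryAfter time has passed, up to MaxRetries attempts.
func runWorker(ctx context.Context, store *backfill.Gormstore, processOnce func(context.Context, backfill.Job) error) {
	for ctx.Err() == nil {
		job, err := store.GetNextEnqueuedJob(ctx)
		if err != nil || job == nil {
			// Nothing enqueued, or every failed job is still waiting out its backoff.
			time.Sleep(5 * time.Second)
			continue
		}
		if err := processOnce(ctx, job); err != nil {
			// The "failed" prefix is what SetState keys off to schedule a retry.
			_ = job.SetState(ctx, fmt.Sprintf("failed (attempt %d): %s", job.RetryCount(), err))
			continue
		}
		_ = job.SetState(ctx, "complete")
	}
}
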
6 changes: 6 additions & 0 deletions backfill/memstore.go
@@ -203,3 +203,9 @@ func (j *Memjob) ClearBufferedOps(ctx context.Context) error {
j.updatedAt = time.Now()
return nil
}

func (j *Memjob) RetryCount() int {
j.lk.Lock()
defer j.lk.Unlock()
return 0
}
2 changes: 1 addition & 1 deletion bgs/bgs.go
@@ -851,7 +851,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
}

// skip the fast path for rebases or if the user is already in the slow path
if bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) {
if bgs.Index.Crawler.RepoInSlowPath(ctx, u.ID) {
rebasesCounter.WithLabelValues(host.Host).Add(1)
ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {