Skip to content

Commit

Permalink
Pass rev to operation handlers in backfill library (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 authored Dec 18, 2023
2 parents ad730a7 + 17bb48c commit c055813
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 20 deletions.
32 changes: 17 additions & 15 deletions backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Job interface {
// Once done it clears the buffer and marks the job as "complete"
// Allowing the Job interface to abstract away the details of how buffered
// operations are stored and/or locked
FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error
FlushBufferedOps(ctx context.Context, cb func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error

ClearBufferedOps(ctx context.Context) error
}
Expand All @@ -56,9 +56,9 @@ type Store interface {
// Backfiller is a struct which handles backfilling a repo
type Backfiller struct {
Name string
HandleCreateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error
HandleUpdateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error
HandleDeleteRecord func(ctx context.Context, repo string, path string) error
HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error
HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error
HandleDeleteRecord func(ctx context.Context, repo string, rev string, path string) error
Store Store

// Number of backfills to process in parallel
Expand Down Expand Up @@ -120,9 +120,9 @@ func DefaultBackfillOptions() *BackfillOptions {
func NewBackfiller(
name string,
store Store,
handleCreate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error,
handleUpdate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error,
handleDelete func(ctx context.Context, repo string, path string) error,
handleCreate func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error,
handleUpdate func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error,
handleDelete func(ctx context.Context, repo string, rev string, path string) error,
opts *BackfillOptions,
) *Backfiller {
if opts == nil {
Expand Down Expand Up @@ -210,20 +210,20 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {

// Flush buffered operations, clear the buffer, and mark the job as "complete"
// Clearning and marking are handled by the job interface
err := job.FlushBufferedOps(ctx, func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error {
err := job.FlushBufferedOps(ctx, func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error {
switch repomgr.EventKind(kind) {
case repomgr.EvtKindCreateRecord:
err := b.HandleCreateRecord(ctx, repo, path, rec, cid)
err := b.HandleCreateRecord(ctx, repo, rev, path, rec, cid)
if err != nil {
log.Error("failed to handle create record", "error", err)
}
case repomgr.EvtKindUpdateRecord:
err := b.HandleUpdateRecord(ctx, repo, path, rec, cid)
err := b.HandleUpdateRecord(ctx, repo, rev, path, rec, cid)
if err != nil {
log.Error("failed to handle update record", "error", err)
}
case repomgr.EvtKindDeleteRecord:
err := b.HandleDeleteRecord(ctx, repo, path)
err := b.HandleDeleteRecord(ctx, repo, rev, path)
if err != nil {
log.Error("failed to handle delete record", "error", err)
}
Expand Down Expand Up @@ -376,6 +376,8 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
}
}()

rev := r.SignedCommit().Rev

// Consumer routines
for i := 0; i < numRoutines; i++ {
wg.Add(1)
Expand All @@ -399,7 +401,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
continue
}

err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, recM, &item.nodeCid)
err = b.HandleCreateRecord(ctx, repoDid, rev, item.recordPath, recM, &item.nodeCid)
if err != nil {
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
continue
Expand Down Expand Up @@ -509,15 +511,15 @@ func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscrib
for _, op := range ops {
switch op.kind {
case "create":
if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil {
if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
return fmt.Errorf("create record failed: %w", err)
}
case "update":
if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil {
if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
return fmt.Errorf("update record failed: %w", err)
}
case "delete":
if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil {
if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.path); err != nil {
return fmt.Errorf("delete record failed: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions backfill/gormstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (j *Gormjob) SetState(ctx context.Context, state string) error {
return j.db.Save(j.dbj).Error
}

func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error {
func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error {
// TODO: this will block any events for this repo while this flush is ongoing, is that okay?
j.lk.Lock()
defer j.lk.Unlock()
Expand All @@ -325,7 +325,7 @@ func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path strin
}

for _, op := range opset.ops {
if err := fn(op.kind, op.path, op.rec, op.cid); err != nil {
if err := fn(op.kind, opset.rev, op.path, op.rec, op.cid); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion backfill/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (j *Memjob) SetRev(ctx context.Context, rev string) error {
return nil
}

func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error {
func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error {
panic("TODO: copy what we end up doing from the gormstore")
/*
j.lk.Lock()
Expand Down
4 changes: 2 additions & 2 deletions search/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (s *Server) discoverRepos() {
log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored)
}

func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error {
func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error {
// Since this gets called in a backfill job, we need to check if the path is a post or profile
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
return nil
Expand Down Expand Up @@ -211,7 +211,7 @@ func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path s
return nil
}

func (s *Server) handleDelete(ctx context.Context, rawDID, path string) error {
func (s *Server) handleDelete(ctx context.Context, rawDID, rev, path string) error {
// Since this gets called in a backfill job, we need to check if the path is a post or profile
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
return nil
Expand Down

0 comments on commit c055813

Please sign in to comment.