diff --git a/events/pebblepersist.go b/events/pebblepersist.go index 2c1c787e5..0ace1c1b8 100644 --- a/events/pebblepersist.go +++ b/events/pebblepersist.go @@ -118,7 +118,8 @@ func eventFromPebbleIter(iter *pebble.Iterator) (*XRPCStreamEvent, error) { func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { var key [8]byte - binary.BigEndian.PutUint64(key[:], uint64(since)) + // Add 1 to the cursor because IterOptions.LowerBound is inclusive. + binary.BigEndian.PutUint64(key[:], uint64(since+1)) iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{LowerBound: key[:]}) if err != nil { diff --git a/events/persist.go b/events/persist.go index 670fdc9aa..9e94be954 100644 --- a/events/persist.go +++ b/events/persist.go @@ -11,6 +11,8 @@ import ( // Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels type EventPersistence interface { Persist(ctx context.Context, e *XRPCStreamEvent) error + + // Playback calls cb on persisted events with sequence number > since. Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error TakeDownRepo(ctx context.Context, usr models.Uid) error Flush(context.Context) error