From ecbdd5098dedbac101657a2b60ba15de1e1f6fcc Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Thu, 21 Nov 2024 14:54:35 +0100 Subject: [PATCH] events: document EventPersistence.Playback behavior for since parameter, fix PebblePersist accordingly --- events/pebblepersist.go | 3 ++- events/persist.go | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/events/pebblepersist.go b/events/pebblepersist.go index 2c1c787e5..0ace1c1b8 100644 --- a/events/pebblepersist.go +++ b/events/pebblepersist.go @@ -118,7 +118,8 @@ func eventFromPebbleIter(iter *pebble.Iterator) (*XRPCStreamEvent, error) { func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { var key [8]byte - binary.BigEndian.PutUint64(key[:], uint64(since)) + // Add 1 to the cursor because IterOptions.LowerBound is inclusive. + binary.BigEndian.PutUint64(key[:], uint64(since+1)) iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{LowerBound: key[:]}) if err != nil { diff --git a/events/persist.go b/events/persist.go index 670fdc9aa..9e94be954 100644 --- a/events/persist.go +++ b/events/persist.go @@ -11,6 +11,8 @@ import ( // Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels type EventPersistence interface { Persist(ctx context.Context, e *XRPCStreamEvent) error + + // Playback calls cb on persisted events with sequence number > since. Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error TakeDownRepo(ctx context.Context, usr models.Uid) error Flush(context.Context) error