From 9a01159560dd503db10bb4c4f6ca052fafdd54fb Mon Sep 17 00:00:00 2001 From: Jaz Date: Sun, 22 Sep 2024 23:11:53 -0700 Subject: [PATCH] Add context for valuer interfaces --- cmd/jetstream/server.go | 4 ++++ pkg/consumer/persist.go | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/jetstream/server.go b/cmd/jetstream/server.go index aac0ad3..39fa0e4 100644 --- a/cmd/jetstream/server.go +++ b/cmd/jetstream/server.go @@ -86,6 +86,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes [] collection = e.Commit.Collection } + // Wrap the valuer functions for more lightweight event filtering getJSONEvent := func() []byte { return asJSON } getCompressedEvent := func() []byte { return compBytes } @@ -125,6 +126,9 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes [] return nil } +// emitToSubscriber sends an event to a subscriber if the subscriber wants the event +// It takes a valuer function to get the event bytes so that the caller can avoid +// unnecessary allocations and/or reading from the playback DB if the subscriber doesn't want the event func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, timeUS int64, did, collection string, playback bool, getEventBytes func() []byte) error { if !sub.WantsCollection(collection) { return nil diff --git a/pkg/consumer/persist.go b/pkg/consumer/persist.go index 1908d7d..12afe4c 100644 --- a/pkg/consumer/persist.go +++ b/pkg/consumer/persist.go @@ -207,7 +207,8 @@ func (c *Consumer) ReplayEvents(ctx context.Context, compressed bool, cursor int collection = parts[2] } - // Emit the event + // Emit the event with the valuer function so the subscriber can decide if it wants to filter it out + // without having to read the entire event from the database err = emit(ctx, timeUS, parts[1], collection, iter.Value) if err != nil { log.Error("failed to emit event", "error", err)