diff --git a/cmd/jetstream/server.go b/cmd/jetstream/server.go index 39fa0e4..8912377 100644 --- a/cmd/jetstream/server.go +++ b/cmd/jetstream/server.go @@ -90,6 +90,8 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes [] getJSONEvent := func() []byte { return asJSON } getCompressedEvent := func() []byte { return compBytes } + // Concurrently emit to all subscribers + // We can't move on until all subscribers have received the event or been dropped for being too slow sem := semaphore.NewWeighted(maxConcurrentEmits) for _, sub := range s.Subscribers { if err := sem.Acquire(ctx, 1); err != nil { @@ -106,7 +108,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes [] return } - // Pick the event valuer for the subscriber + // Pick the event valuer for the subscriber based on their compression preference getEventBytes := getJSONEvent if sub.compress { getEventBytes = getCompressedEvent diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index 37ad452..2d7b217 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -343,19 +343,24 @@ func (c *Consumer) RunSequencer(ctx context.Context) error { e.TimeUS = c.clock.Now() c.sequenced.Inc() - // Encode the event in JSON and compress it + // Serialize the event as JSON asJSON, err := json.Marshal(e) if err != nil { log.Error("failed to marshal event", "error", err) return } + + // Compress the serialized JSON using zstd compBytes := c.encoder.EncodeAll(asJSON, nil) + // Persist the event to the uncompressed and compressed DBs if err := c.PersistEvent(ctx, e, asJSON, compBytes); err != nil { log.Error("failed to persist event", "error", err) return } c.persisted.Inc() + + // Emit the event to subscribers if err := c.Emit(ctx, e, asJSON, compBytes); err != nil { log.Error("failed to emit event", "error", err) } diff --git a/pkg/consumer/persist.go b/pkg/consumer/persist.go index 12afe4c..4d2bbb6 100644 --- a/pkg/consumer/persist.go +++ b/pkg/consumer/persist.go @@ -100,14 +100,14 @@ func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event, asJSON, key = []byte(fmt.Sprintf("%d_%s", evt.TimeUS, evt.Did)) } - // Write the event to the uncompressed DB + // Write the uncompressed event to the uncompressed DB err := c.UncompressedDB.Set(key, asJSON, pebble.NoSync) if err != nil { log.Error("failed to write event to pebble", "error", err) return fmt.Errorf("failed to write event to pebble: %w", err) } - // Compress the event and write it to the compressed DB + // Write the compressed event to the compressed DB err = c.CompressedDB.Set(key, compBytes, pebble.NoSync) if err != nil { log.Error("failed to write compressed event to pebble", "error", err)