diff --git a/.gitignore b/.gitignore index a52e16a..16be990 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ go.work .env data/ +training/ diff --git a/README.md b/README.md index c5548ba..be0fd57 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,16 @@ The following Query Parameters are supported: - An absent cursor or a cursor from the future will result in live-tail operation - When reconnecting, use the `time_us` from your most recently processed event and maybe provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback +### Compression + +Jetstream supports `zstd`-based compression of messages. Jetstream uses a custom dictionary for compression that can be found in `pkg/models/zstd_dictionary` and is required to decode compressed messages from the server. + +`zstd` compressed Jetstream messages are ~56% smaller on average than the raw JSON version of the Jetstream firehose. + +The provided client library uses compression by default, using an embedded copy of the Dictionary from the `models` package. + +### Examples + A simple example that hits the public instance looks like: ```bash diff --git a/cmd/client/main.go b/cmd/client/main.go index 2848800..b8aa480 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -2,12 +2,14 @@ package main import ( "context" + "encoding/json" "fmt" "log" "log/slog" "os" "time" + apibsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" "github.com/bluesky-social/jetstream/pkg/models" @@ -27,6 +29,7 @@ func main() { config := client.DefaultClientConfig() config.WebsocketURL = serverAddr + config.Compress = true h := &handler{ seenSeqs: make(map[int64]struct{}), @@ -39,7 +42,21 @@ func main() { log.Fatalf("failed to create client: %v", err) } - cursor := time.Now().Add(1 * -time.Hour).UnixMicro() + cursor := time.Now().Add(5 * -time.Hour).UnixMicro() + + // Every 5 seconds print the events read and bytes read and average event size + go func() { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ticker.C: + eventsRead := c.EventsRead.Load() + bytesRead := c.BytesRead.Load() + avgEventSize := bytesRead / eventsRead + logger.Info("stats", "events_read", eventsRead, "bytes_read", bytesRead, "avg_event_size", avgEventSize) + } + } + }() if err := c.ConnectAndRead(ctx, &cursor); err != nil { log.Fatalf("failed to connect: %v", err) @@ -54,19 +71,19 @@ type handler struct { } func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { - fmt.Println("evt") + // fmt.Println("evt") // Unmarshal the record if there is one - // if event.Commit != nil && (event.Commit.OpType == models.CommitCreateRecord || event.Commit.OpType == models.CommitUpdateRecord) { - // switch event.Commit.Collection { - // case "app.bsky.feed.post": - // var post apibsky.FeedPost - // if err := json.Unmarshal(event.Commit.Record, &post); err != nil { - // return fmt.Errorf("failed to unmarshal post: %w", err) - // } - // // fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text) - // } - // } + if event.Commit != nil && (event.Commit.OpType == models.CommitCreateRecord || event.Commit.OpType == models.CommitUpdateRecord) { + switch event.Commit.Collection { + case "app.bsky.feed.post": + var post apibsky.FeedPost + if err := json.Unmarshal(event.Commit.Record, &post); err != nil { + return fmt.Errorf("failed to unmarshal post: %w", err) + } + // fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text) + } + } return nil } diff --git a/cmd/jetstream/main.go b/cmd/jetstream/main.go index 3e8cc0c..e9ac443 100644 --- a/cmd/jetstream/main.go +++ b/cmd/jetstream/main.go @@ -69,6 +69,12 @@ func main() { Value: "./data", EnvVars: []string{"JETSTREAM_DATA_DIR"}, }, + &cli.StringFlag{ + Name: "zstd-dictionary-path", + Usage: "path to the zstd dictionary file", + EnvVars: []string{"JETSTREAM_ZSTD_DICTIONARY_PATH"}, + Required: false, + }, &cli.DurationFlag{ Name: "event-ttl", Usage: "time to live for events", @@ -342,7 +348,7 @@ func Jetstream(cctx *cli.Context) error { c.Shutdown() - err = c.DB.Close() + err = c.UncompressedDB.Close() if err != nil { log.Error("failed to close pebble db", "error", err) } diff --git a/cmd/jetstream/server.go b/cmd/jetstream/server.go index 37c660d..774c860 100644 --- a/cmd/jetstream/server.go +++ b/cmd/jetstream/server.go @@ -14,7 +14,6 @@ import ( "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/jetstream/pkg/consumer" "github.com/bluesky-social/jetstream/pkg/models" - "github.com/goccy/go-json" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus" @@ -39,6 +38,7 @@ type Subscriber struct { id int64 cLk sync.Mutex cursor *int64 + compress bool deliveredCounter prometheus.Counter bytesCounter prometheus.Counter // wantedCollections is nil if the subscriber wants all collections @@ -68,7 +68,7 @@ func NewServer(maxSubRate float64) (*Server, error) { var maxConcurrentEmits = int64(100) var cutoverThresholdUS = int64(1_000_000) -func (s *Server) Emit(ctx context.Context, e models.Event) error { +func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []byte) error { ctx, span := tracer.Start(ctx, "Emit") defer span.End() @@ -78,14 +78,7 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { defer s.lk.RUnlock() eventsEmitted.Inc() - - b, err := json.Marshal(e) - if err != nil { - log.Error("failed to marshal event", "error", err) - return fmt.Errorf("failed to marshal event: %w", err) - } - - evtSize := float64(len(b)) + evtSize := float64(len(asJSON)) bytesEmitted.Add(evtSize) collection := "" @@ -93,7 +86,8 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { collection = e.Commit.Collection } - getEncodedEvent := func() []byte { return b } + getJSONEvent := func() []byte { return asJSON } + getCompressedEvent := func() []byte { return compBytes } sem := semaphore.NewWeighted(maxConcurrentEmits) for _, sub := range s.Subscribers { @@ -110,7 +104,14 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { if sub.cursor != nil && sub.seq < e.TimeUS-cutoverThresholdUS { return } - emitToSubscriber(ctx, log, sub, e.TimeUS, e.Did, collection, false, getEncodedEvent) + + // Pick the event valuer for the subscriber + getEventBytes := getJSONEvent + if sub.compress { + getEventBytes = getCompressedEvent + } + + emitToSubscriber(ctx, log, sub, e.TimeUS, e.Did, collection, false, getEventBytes) }(sub) } @@ -124,7 +125,7 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { return nil } -func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, timeUS int64, did, collection string, playback bool, getEncodedEvent func() []byte) error { +func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, timeUS int64, did, collection string, playback bool, getEventBytes func() []byte) error { if !sub.WantsCollection(collection) { return nil } @@ -140,7 +141,7 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti return nil } - evtBytes := getEncodedEvent() + evtBytes := getEventBytes() if playback { // Copy the event bytes so the playback iterator can reuse the buffer evtBytes = append([]byte{}, evtBytes...) @@ -191,7 +192,7 @@ func (s *Server) GetSeq() int64 { return s.seq } -func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, wantedCollectionPrefixes []string, wantedCollections []string, wantedDids []string, cursor *int64) *Subscriber { +func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, compress bool, wantedCollectionPrefixes []string, wantedCollections []string, wantedDids []string, cursor *int64) (*Subscriber, error) { s.lk.Lock() defer s.lk.Unlock() @@ -227,6 +228,7 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, wantedCollecti wantedCollections: wantedCol, wantedDids: didMap, cursor: cursor, + compress: compress, deliveredCounter: eventsDelivered.WithLabelValues(realIP), bytesCounter: bytesDelivered.WithLabelValues(realIP), rl: rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)), @@ -242,9 +244,11 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, wantedCollecti "id", sub.id, "wantedCollections", wantedCol, "wantedDids", wantedDids, + "cursor", cursor, + "compress", compress, ) - return &sub + return &sub, nil } func (s *Server) RemoveSubscriber(num int64) { @@ -306,6 +310,10 @@ func (s *Server) HandleSubscribe(c echo.Context) error { return fmt.Errorf("too many wanted DIDs") } + // Check if the user wants zstd compression + acceptEncoding := c.Request().Header.Get("Accept-Encoding") + compress := strings.Contains(acceptEncoding, "zstd") + var cursor *int64 var err error qCursor := c.Request().URL.Query().Get("cursor") @@ -342,16 +350,21 @@ func (s *Server) HandleSubscribe(c echo.Context) error { } }() - sub := s.AddSubscriber(ws, c.RealIP(), wantedCollectionPrefixes, wantedCollections, wantedDids, cursor) + sub, err := s.AddSubscriber(ws, c.RealIP(), compress, wantedCollectionPrefixes, wantedCollections, wantedDids, cursor) + if err != nil { + log.Error("failed to add subscriber", "error", err) + return err + } defer s.RemoveSubscriber(sub.id) if cursor != nil { log.Info("replaying events", "cursor", *cursor) playbackRateLimit := s.maxSubRate * 10 + go func() { for { - lastSeq, err := s.Consumer.ReplayEvents(ctx, *cursor, playbackRateLimit, func(ctx context.Context, timeUS int64, did, collection string, getEncodedEvent func() []byte) error { - return emitToSubscriber(ctx, log, sub, timeUS, did, collection, true, getEncodedEvent) + lastSeq, err := s.Consumer.ReplayEvents(ctx, sub.compress, *cursor, playbackRateLimit, func(ctx context.Context, timeUS int64, did, collection string, getEventBytes func() []byte) error { + return emitToSubscriber(ctx, log, sub, timeUS, did, collection, true, getEventBytes) }) if err != nil { log.Error("failed to replay events", "error", err) @@ -390,6 +403,16 @@ func (s *Server) HandleSubscribe(c echo.Context) error { log.Error("failed to wait for rate limiter", "error", err) return fmt.Errorf("failed to wait for rate limiter: %w", err) } + + // When compression is enabled, the buffer contains the compressed message + if compress { + if err := ws.WriteMessage(websocket.BinaryMessage, *msg); err != nil { + log.Error("failed to write message to websocket", "error", err) + return nil + } + continue + } + if err := ws.WriteMessage(websocket.TextMessage, *msg); err != nil { log.Error("failed to write message to websocket", "error", err) return nil diff --git a/docker-compose.yaml b/docker-compose.yaml index 729ba84..9c4671c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,3 +11,4 @@ services: - ./data:/data environment: - JETSTREAM_DATA_DIR=/data + - JETSTREAM_MAX_SUB_RATE=1_000_000 diff --git a/go.mod b/go.mod index 2f23492..8ec18f2 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/cockroachdb/pebble v1.1.2 github.com/goccy/go-json v0.10.2 github.com/gorilla/websocket v1.5.1 + github.com/klauspost/compress v1.17.9 github.com/labstack/echo/v4 v4.11.3 github.com/labstack/gommon v0.4.1 github.com/prometheus/client_golang v1.19.1 @@ -71,7 +72,6 @@ require ( github.com/jbenet/goprocess v0.1.4 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/go.sum b/go.sum index b145be8..851fd74 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8= diff --git a/pkg/client/client.go b/pkg/client/client.go index 0473f61..f748e8b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -10,9 +10,12 @@ import ( "github.com/bluesky-social/jetstream/pkg/models" "github.com/goccy/go-json" "github.com/gorilla/websocket" + "github.com/klauspost/compress/zstd" + "go.uber.org/atomic" ) type ClientConfig struct { + Compress bool WebsocketURL string WantedDids []string WantedCollections []string @@ -25,15 +28,19 @@ type Scheduler interface { } type Client struct { - Scheduler Scheduler - con *websocket.Conn - config *ClientConfig - logger *slog.Logger - shutdown chan chan struct{} + Scheduler Scheduler + con *websocket.Conn + config *ClientConfig + logger *slog.Logger + decoder *zstd.Decoder + BytesRead atomic.Int64 + EventsRead atomic.Int64 + shutdown chan chan struct{} } func DefaultClientConfig() *ClientConfig { return &ClientConfig{ + Compress: true, WebsocketURL: "ws://localhost:6008/subscribe", WantedDids: []string{}, WantedCollections: []string{}, @@ -49,12 +56,23 @@ func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler) ( } logger = logger.With("component", "jetstream-client") - return &Client{ + c := Client{ config: config, shutdown: make(chan chan struct{}), logger: logger, Scheduler: scheduler, - }, nil + } + + if config.Compress { + c.config.ExtraHeaders["Accept-Encoding"] = "zstd" + dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(models.ZSTDDictionary)) + if err != nil { + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + c.decoder = dec + } + + return &c, nil } func (c *Client) ConnectAndRead(ctx context.Context, cursor *int64) error { @@ -109,6 +127,9 @@ func (c *Client) ConnectAndRead(ctx context.Context, cursor *int64) error { func (c *Client) readLoop(ctx context.Context) error { c.logger.Info("starting websocket read loop") + bytesRead := bytesRead.WithLabelValues(c.config.WebsocketURL) + eventsRead := eventsRead.WithLabelValues(c.config.WebsocketURL) + for { select { case <-ctx.Done(): @@ -125,6 +146,21 @@ func (c *Client) readLoop(ctx context.Context) error { return fmt.Errorf("failed to read message from websocket: %w", err) } + bytesRead.Add(float64(len(msg))) + eventsRead.Inc() + c.BytesRead.Add(int64(len(msg))) + c.EventsRead.Inc() + + // Decompress the message if necessary + if c.decoder != nil && c.config.Compress { + m, err := c.decoder.DecodeAll(msg, nil) + if err != nil { + c.logger.Error("failed to decompress message", "error", err) + return fmt.Errorf("failed to decompress message: %w", err) + } + msg = m + } + // Unpack the message and pass it to the handler var event models.Event if err := json.Unmarshal(msg, &event); err != nil { diff --git a/pkg/client/metrics.go b/pkg/client/metrics.go new file mode 100644 index 0000000..e16bcb8 --- /dev/null +++ b/pkg/client/metrics.go @@ -0,0 +1,16 @@ +package client + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var bytesRead = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jetstream_client_bytes_read", + Help: "The total number of bytes read from the server", +}, []string{"client"}) + +var eventsRead = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jetstream_client_events_read", + Help: "The total number of events read from the server", +}, []string{"client"}) diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index 5ca1c82..78fd38c 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -17,6 +17,7 @@ import ( "github.com/bluesky-social/jetstream/pkg/monotonic" "github.com/cockroachdb/pebble" "github.com/goccy/go-json" + "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -26,8 +27,10 @@ import ( type Consumer struct { SocketURL string Progress *Progress - Emit func(context.Context, models.Event) error - DB *pebble.DB + Emit func(context.Context, *models.Event, []byte, []byte) error + UncompressedDB *pebble.DB + CompressedDB *pebble.DB + encoder *zstd.Encoder EventTTL time.Duration logger *slog.Logger clock *monotonic.Clock @@ -48,10 +51,16 @@ func NewConsumer( socketURL string, dataDir string, eventTTL time.Duration, - emit func(context.Context, models.Event) error, + emit func(context.Context, *models.Event, []byte, []byte) error, ) (*Consumer, error) { - dbPath := dataDir + "/jetstream.db" - db, err := pebble.Open(dbPath, &pebble.Options{}) + uDBPath := dataDir + "/jetstream.uncompressed.db" + uDB, err := pebble.Open(uDBPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("failed to open db: %w", err) + } + + cDBPath := dataDir + "/jetstream.compressed.db" + cDB, err := pebble.Open(cDBPath, &pebble.Options{}) if err != nil { return nil, fmt.Errorf("failed to open db: %w", err) } @@ -63,6 +72,11 @@ func NewConsumer( return nil, fmt.Errorf("failed to create clock: %w", err) } + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderDict(models.ZSTDDictionary)) + if err != nil { + return nil, fmt.Errorf("failed to create zstd encoder: %w", err) + } + c := Consumer{ SocketURL: socketURL, Progress: &Progress{ @@ -70,7 +84,9 @@ func NewConsumer( }, EventTTL: eventTTL, Emit: emit, - DB: db, + UncompressedDB: uDB, + CompressedDB: cDB, + encoder: encoder, logger: log, clock: clock, buf: make(chan *models.Event, 10_000), @@ -325,12 +341,21 @@ func (c *Consumer) RunSequencer(ctx context.Context) error { // Assign a time_us to the event e.TimeUS = c.clock.Now() c.sequenced.Inc() - if err := c.PersistEvent(ctx, e); err != nil { + + // Encode the event in JSON and compress it + asJSON, err := json.Marshal(e) + if err != nil { + log.Error("failed to marshal event", "error", err) + return + } + compBytes := c.encoder.EncodeAll(asJSON, nil) + + if err := c.PersistEvent(ctx, e, asJSON, compBytes); err != nil { log.Error("failed to persist event", "error", err) return } c.persisted.Inc() - if err := c.Emit(ctx, *e); err != nil { + if err := c.Emit(ctx, e, asJSON, compBytes); err != nil { log.Error("failed to emit event", "error", err) } c.emitted.Inc() diff --git a/pkg/consumer/persist.go b/pkg/consumer/persist.go index 5d3a0c7..1908d7d 100644 --- a/pkg/consumer/persist.go +++ b/pkg/consumer/persist.go @@ -54,7 +54,7 @@ func (c *Consumer) WriteCursor(ctx context.Context) error { } // Write the cursor JSON to pebble - err = c.DB.Set(cursorKey, data, pebble.Sync) + err = c.UncompressedDB.Set(cursorKey, data, pebble.Sync) if err != nil { return fmt.Errorf("failed to write cursor to pebble: %w", err) } @@ -68,7 +68,7 @@ func (c *Consumer) ReadCursor(ctx context.Context) error { defer span.End() // Read the cursor from pebble - data, closer, err := c.DB.Get(cursorKey) + data, closer, err := c.UncompressedDB.Get(cursorKey) if err != nil { if err == pebble.ErrNotFound { return nil @@ -87,17 +87,10 @@ func (c *Consumer) ReadCursor(ctx context.Context) error { } // PersistEvent persists an event to PebbleDB -func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event) error { +func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event, asJSON, compBytes []byte) error { ctx, span := tracer.Start(ctx, "PersistEvent") defer span.End() - // Persist the event to PebbleDB - data, err := json.Marshal(evt) - if err != nil { - log.Error("failed to marshal event", "error", err) - return fmt.Errorf("failed to marshal event: %w", err) - } - // Key structure for events in PebbleDB // {{event_time_us}}_{{repo}}_{{collection}} var key []byte @@ -107,12 +100,20 @@ func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event) error { key = []byte(fmt.Sprintf("%d_%s", evt.TimeUS, evt.Did)) } - err = c.DB.Set(key, data, pebble.NoSync) + // Write the event to the uncompressed DB + err := c.UncompressedDB.Set(key, asJSON, pebble.NoSync) if err != nil { log.Error("failed to write event to pebble", "error", err) return fmt.Errorf("failed to write event to pebble: %w", err) } + // Compress the event and write it to the compressed DB + err = c.CompressedDB.Set(key, compBytes, pebble.NoSync) + if err != nil { + log.Error("failed to write compressed event to pebble", "error", err) + return fmt.Errorf("failed to write compressed event to pebble: %w", err) + } + return nil } @@ -127,12 +128,19 @@ func (c *Consumer) TrimEvents(ctx context.Context) error { trimKey := []byte(fmt.Sprintf("%d", trimUntil)) // Delete all numeric keys older than the trim key - err := c.DB.DeleteRange([]byte("0"), trimKey, pebble.Sync) + err := c.UncompressedDB.DeleteRange([]byte("0"), trimKey, pebble.Sync) if err != nil { log.Error("failed to delete old events", "error", err) return fmt.Errorf("failed to delete old events: %w", err) } + // Delete all numeric keys older than the trim key in the compressed DB + err = c.CompressedDB.DeleteRange([]byte("0"), trimKey, pebble.Sync) + if err != nil { + log.Error("failed to delete old compressed events", "error", err) + return fmt.Errorf("failed to delete old compressed events: %w", err) + } + return nil } @@ -140,7 +148,7 @@ func (c *Consumer) TrimEvents(ctx context.Context) error { var finalKey = []byte("9700000000000000") // ReplayEvents replays events from PebbleDB -func (c *Consumer) ReplayEvents(ctx context.Context, cursor int64, playbackRateLimit float64, emit func(context.Context, int64, string, string, func() []byte) error) (int64, error) { +func (c *Consumer) ReplayEvents(ctx context.Context, compressed bool, cursor int64, playbackRateLimit float64, emit func(context.Context, int64, string, string, func() []byte) error) (int64, error) { ctx, span := tracer.Start(ctx, "ReplayEvents") defer span.End() @@ -149,10 +157,19 @@ func (c *Consumer) ReplayEvents(ctx context.Context, cursor int64, playbackRateL limiter := rate.NewLimiter(rate.Limit(playbackRateLimit), int(playbackRateLimit)) // Iterate over all events starting from the cursor - iter, err := c.DB.NewIterWithContext(ctx, &pebble.IterOptions{ + var iter *pebble.Iterator + var err error + + iterOptions := &pebble.IterOptions{ LowerBound: []byte(fmt.Sprintf("%d", cursor)), UpperBound: finalKey, - }) + } + + if compressed { + iter, err = c.CompressedDB.NewIterWithContext(ctx, iterOptions) + } else { + iter, err = c.UncompressedDB.NewIterWithContext(ctx, iterOptions) + } if err != nil { log.Error("failed to create iterator", "error", err) return 0, fmt.Errorf("failed to create iterator: %w", err) diff --git a/pkg/models/models.go b/pkg/models/models.go index 31f190d..9c7e115 100644 --- a/pkg/models/models.go +++ b/pkg/models/models.go @@ -3,9 +3,14 @@ package models import ( "github.com/goccy/go-json" + _ "embed" + comatproto "github.com/bluesky-social/indigo/api/atproto" ) +//go:embed zstd_dictionary +var ZSTDDictionary []byte + type Event struct { Did string `json:"did"` TimeUS int64 `json:"time_us"` diff --git a/pkg/models/zstd_dictionary b/pkg/models/zstd_dictionary new file mode 100644 index 0000000..106847e Binary files /dev/null and b/pkg/models/zstd_dictionary differ diff --git a/train.sh b/train.sh new file mode 100755 index 0000000..5f3ccab --- /dev/null +++ b/train.sh @@ -0,0 +1,7 @@ +i=0; websocat "ws://localhost:6008/subscribe?cursor=0" | while IFS= read -r line; do + echo "$line" > "training/$i.json" + i=$((i+1)) + if [ "$i" -ge 100000 ]; then + break + fi +done