From 4b4167612d10a89d9d1e600721ca09865234f36f Mon Sep 17 00:00:00 2001 From: Jaz Date: Sat, 21 Sep 2024 23:14:27 -0700 Subject: [PATCH] Embed zstd dictionary and update readme --- Dockerfile | 3 -- README.md | 10 ++++++ cmd/client/main.go | 16 +-------- cmd/jetstream/main.go | 32 ------------------ docker-compose.yaml | 1 - pkg/client/client.go | 6 ++-- pkg/consumer/consumer.go | 6 +++- pkg/models/models.go | 5 +++ zstd-dictionary => pkg/models/zstd_dictionary | Bin 9 files changed, 23 insertions(+), 56 deletions(-) rename zstd-dictionary => pkg/models/zstd_dictionary (100%) diff --git a/Dockerfile b/Dockerfile index 3f5c1d3..cce3a71 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,8 +32,5 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica # Copy the binary from the first stage. COPY --from=builder /app/jetstream . -# Copy the zstd dictionary -COPY zstd-dictionary /zstd-dictionary - # Set the startup command to run the binary CMD ["./jetstream"] diff --git a/README.md b/README.md index c5548ba..be0fd57 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,16 @@ The following Query Parameters are supported: - An absent cursor or a cursor from the future will result in live-tail operation - When reconnecting, use the `time_us` from your most recently processed event and maybe provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback +### Compression + +Jetstream supports `zstd`-based compression of messages. Jetstream uses a custom dictionary for compression that can be found in `pkg/models/zstd_dictionary` and is required to decode compressed messages from the server. + +`zstd` compressed Jetstream messages are ~56% smaller on average than the raw JSON version of the Jetstream firehose. + +The provided client library uses compression by default, using an embedded copy of the Dictionary from the `models` package. + +### Examples + A simple example that hits the public instance looks like: ```bash diff --git a/cmd/client/main.go b/cmd/client/main.go index 2c45e7a..b8aa480 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log" "log/slog" "os" @@ -28,19 +27,6 @@ func main() { }))) logger := slog.Default() - // Open the zstd dictionary file if it is set - f, err := os.Open("zstd-dictionary") - if err != nil { - log.Fatalf("failed to open zstd dictionary file: %v", err) - } - - dictBytes, err := io.ReadAll(f) - if err != nil { - f.Close() - log.Fatalf("failed to read zstd dictionary file: %v", err) - } - f.Close() - config := client.DefaultClientConfig() config.WebsocketURL = serverAddr config.Compress = true @@ -51,7 +37,7 @@ func main() { scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) - c, err := client.NewClient(config, logger, scheduler, dictBytes) + c, err := client.NewClient(config, logger, scheduler) if err != nil { log.Fatalf("failed to create client: %v", err) } diff --git a/cmd/jetstream/main.go b/cmd/jetstream/main.go index b8e643c..e9ac443 100644 --- a/cmd/jetstream/main.go +++ b/cmd/jetstream/main.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "log" "log/slog" "net/http" @@ -19,7 +18,6 @@ import ( "github.com/bluesky-social/indigo/events/schedulers/parallel" "github.com/bluesky-social/jetstream/pkg/consumer" "github.com/gorilla/websocket" - "github.com/klauspost/compress/zstd" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel" @@ -121,35 +119,6 @@ func Jetstream(cctx *cli.Context) error { return fmt.Errorf("failed to parse ws-url: %w", err) } - // Open the zstd dictionary file if it is set - var dictBytes []byte - if cctx.String("zstd-dictionary-path") != "" { - f, err := os.Open(cctx.String("zstd-dictionary-path")) - if err != nil { - return fmt.Errorf("failed to open zstd dictionary file: %w", err) - } - - dictBytes, err = io.ReadAll(f) - if err != nil { - f.Close() - return fmt.Errorf("failed to read zstd dictionary file: %w", err) - } - f.Close() - } - - var enc *zstd.Encoder - if len(dictBytes) > 0 { - enc, err = zstd.NewWriter(nil, zstd.WithEncoderDict(dictBytes)) - if err != nil { - return fmt.Errorf("failed to create zstd encoder: %w", err) - } - } else { - enc, err = zstd.NewWriter(nil) - if err != nil { - return fmt.Errorf("failed to create zstd encoder: %w", err) - } - } - s, err := NewServer(cctx.Float64("max-sub-rate")) if err != nil { return fmt.Errorf("failed to create server: %w", err) @@ -161,7 +130,6 @@ func Jetstream(cctx *cli.Context) error { u.String(), cctx.String("data-dir"), cctx.Duration("event-ttl"), - enc, s.Emit, ) if err != nil { diff --git a/docker-compose.yaml b/docker-compose.yaml index 18d8b67..9c4671c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,5 +11,4 @@ services: - ./data:/data environment: - JETSTREAM_DATA_DIR=/data - - JETSTREAM_ZSTD_DICTIONARY_PATH=/zstd-dictionary - JETSTREAM_MAX_SUB_RATE=1_000_000 diff --git a/pkg/client/client.go b/pkg/client/client.go index e873701..f748e8b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -33,7 +33,6 @@ type Client struct { config *ClientConfig logger *slog.Logger decoder *zstd.Decoder - dictBytes []byte BytesRead atomic.Int64 EventsRead atomic.Int64 shutdown chan chan struct{} @@ -51,7 +50,7 @@ func DefaultClientConfig() *ClientConfig { } } -func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler, dictBytes []byte) (*Client, error) { +func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler) (*Client, error) { if config == nil { config = DefaultClientConfig() } @@ -66,8 +65,7 @@ func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler, d if config.Compress { c.config.ExtraHeaders["Accept-Encoding"] = "zstd" - c.dictBytes = dictBytes - dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(dictBytes)) + dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(models.ZSTDDictionary)) if err != nil { return nil, fmt.Errorf("failed to create zstd decoder: %w", err) } diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index 2f0e2cc..78fd38c 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -51,7 +51,6 @@ func NewConsumer( socketURL string, dataDir string, eventTTL time.Duration, - encoder *zstd.Encoder, emit func(context.Context, *models.Event, []byte, []byte) error, ) (*Consumer, error) { uDBPath := dataDir + "/jetstream.uncompressed.db" @@ -73,6 +72,11 @@ func NewConsumer( return nil, fmt.Errorf("failed to create clock: %w", err) } + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderDict(models.ZSTDDictionary)) + if err != nil { + return nil, fmt.Errorf("failed to create zstd encoder: %w", err) + } + c := Consumer{ SocketURL: socketURL, Progress: &Progress{ diff --git a/pkg/models/models.go b/pkg/models/models.go index 31f190d..9c7e115 100644 --- a/pkg/models/models.go +++ b/pkg/models/models.go @@ -3,9 +3,14 @@ package models import ( "github.com/goccy/go-json" + _ "embed" + comatproto "github.com/bluesky-social/indigo/api/atproto" ) +//go:embed zstd_dictionary +var ZSTDDictionary []byte + type Event struct { Did string `json:"did"` TimeUS int64 `json:"time_us"` diff --git a/zstd-dictionary b/pkg/models/zstd_dictionary similarity index 100% rename from zstd-dictionary rename to pkg/models/zstd_dictionary