Skip to content

Commit

Permalink
Embed zstd dictionary and update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Sep 22, 2024
1 parent 28c1ba2 commit 4b41676
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 56 deletions.
3 changes: 0 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,5 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica
# Copy the binary from the first stage.
COPY --from=builder /app/jetstream .

# Copy the zstd dictionary
COPY zstd-dictionary /zstd-dictionary

# Set the startup command to run the binary
CMD ["./jetstream"]
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ The following Query Parameters are supported:
- An absent cursor or a cursor from the future will result in live-tail operation
- When reconnecting, use the `time_us` from your most recently processed event, optionally with a negative buffer (i.e. subtract a few seconds), to ensure gapless playback

### Compression

Jetstream supports `zstd`-based compression of messages. It uses a custom compression dictionary, found in `pkg/models/zstd_dictionary`, which is required to decode compressed messages from the server.

`zstd`-compressed Jetstream messages are ~56% smaller on average than the raw JSON version of the Jetstream firehose.

The provided client library uses compression by default, using an embedded copy of the dictionary from the `models` package.

### Examples

A simple example that hits the public instance looks like:

```bash
Expand Down
16 changes: 1 addition & 15 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"os"
Expand All @@ -28,19 +27,6 @@ func main() {
})))
logger := slog.Default()

// Open the zstd dictionary file if it is set
f, err := os.Open("zstd-dictionary")
if err != nil {
log.Fatalf("failed to open zstd dictionary file: %v", err)
}

dictBytes, err := io.ReadAll(f)
if err != nil {
f.Close()
log.Fatalf("failed to read zstd dictionary file: %v", err)
}
f.Close()

config := client.DefaultClientConfig()
config.WebsocketURL = serverAddr
config.Compress = true
Expand All @@ -51,7 +37,7 @@ func main() {

scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)

c, err := client.NewClient(config, logger, scheduler, dictBytes)
c, err := client.NewClient(config, logger, scheduler)
if err != nil {
log.Fatalf("failed to create client: %v", err)
}
Expand Down
32 changes: 0 additions & 32 deletions cmd/jetstream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"log"
"log/slog"
"net/http"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/bluesky-social/indigo/events/schedulers/parallel"
"github.com/bluesky-social/jetstream/pkg/consumer"
"github.com/gorilla/websocket"
"github.com/klauspost/compress/zstd"
"github.com/labstack/echo/v4"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -121,35 +119,6 @@ func Jetstream(cctx *cli.Context) error {
return fmt.Errorf("failed to parse ws-url: %w", err)
}

// Open the zstd dictionary file if it is set
var dictBytes []byte
if cctx.String("zstd-dictionary-path") != "" {
f, err := os.Open(cctx.String("zstd-dictionary-path"))
if err != nil {
return fmt.Errorf("failed to open zstd dictionary file: %w", err)
}

dictBytes, err = io.ReadAll(f)
if err != nil {
f.Close()
return fmt.Errorf("failed to read zstd dictionary file: %w", err)
}
f.Close()
}

var enc *zstd.Encoder
if len(dictBytes) > 0 {
enc, err = zstd.NewWriter(nil, zstd.WithEncoderDict(dictBytes))
if err != nil {
return fmt.Errorf("failed to create zstd encoder: %w", err)
}
} else {
enc, err = zstd.NewWriter(nil)
if err != nil {
return fmt.Errorf("failed to create zstd encoder: %w", err)
}
}

s, err := NewServer(cctx.Float64("max-sub-rate"))
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
Expand All @@ -161,7 +130,6 @@ func Jetstream(cctx *cli.Context) error {
u.String(),
cctx.String("data-dir"),
cctx.Duration("event-ttl"),
enc,
s.Emit,
)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ services:
- ./data:/data
environment:
- JETSTREAM_DATA_DIR=/data
- JETSTREAM_ZSTD_DICTIONARY_PATH=/zstd-dictionary
- JETSTREAM_MAX_SUB_RATE=1_000_000
6 changes: 2 additions & 4 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type Client struct {
config *ClientConfig
logger *slog.Logger
decoder *zstd.Decoder
dictBytes []byte
BytesRead atomic.Int64
EventsRead atomic.Int64
shutdown chan chan struct{}
Expand All @@ -51,7 +50,7 @@ func DefaultClientConfig() *ClientConfig {
}
}

func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler, dictBytes []byte) (*Client, error) {
func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler) (*Client, error) {
if config == nil {
config = DefaultClientConfig()
}
Expand All @@ -66,8 +65,7 @@ func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler, d

if config.Compress {
c.config.ExtraHeaders["Accept-Encoding"] = "zstd"
c.dictBytes = dictBytes
dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(dictBytes))
dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(models.ZSTDDictionary))
if err != nil {
return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func NewConsumer(
socketURL string,
dataDir string,
eventTTL time.Duration,
encoder *zstd.Encoder,
emit func(context.Context, *models.Event, []byte, []byte) error,
) (*Consumer, error) {
uDBPath := dataDir + "/jetstream.uncompressed.db"
Expand All @@ -73,6 +72,11 @@ func NewConsumer(
return nil, fmt.Errorf("failed to create clock: %w", err)
}

encoder, err := zstd.NewWriter(nil, zstd.WithEncoderDict(models.ZSTDDictionary))
if err != nil {
return nil, fmt.Errorf("failed to create zstd encoder: %w", err)
}

c := Consumer{
SocketURL: socketURL,
Progress: &Progress{
Expand Down
5 changes: 5 additions & 0 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package models
import (
"github.com/goccy/go-json"

_ "embed"

comatproto "github.com/bluesky-social/indigo/api/atproto"
)

// ZSTDDictionary holds the contents of the zstd_dictionary file that sits
// alongside this package, embedded into the binary at build time.
//
// It is the shared dictionary passed to the zstd encoder in the consumer and
// to the zstd decoder in the client, so both sides must use the same bytes.
//
//go:embed zstd_dictionary
var ZSTDDictionary []byte

type Event struct {
Did string `json:"did"`
TimeUS int64 `json:"time_us"`
Expand Down
File renamed without changes.

0 comments on commit 4b41676

Please sign in to comment.