From 1cf0d421f4f564511846385028287fb966f89e56 Mon Sep 17 00:00:00 2001 From: Jaz Date: Sat, 21 Sep 2024 17:37:27 -0700 Subject: [PATCH 1/5] Experiment with ZSTD compression for Jetstream --- cmd/client/main.go | 41 +++++++++++++++++++++++---------- cmd/jetstream/server.go | 43 ++++++++++++++++++++++++++++++++--- docker-compose.yaml | 1 + go.mod | 2 +- go.sum | 4 ++-- pkg/client/client.go | 50 +++++++++++++++++++++++++++++++++++------ pkg/client/metrics.go | 16 +++++++++++++ 7 files changed, 132 insertions(+), 25 deletions(-) create mode 100644 pkg/client/metrics.go diff --git a/cmd/client/main.go b/cmd/client/main.go index 2848800..26df233 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -2,12 +2,14 @@ package main import ( "context" + "encoding/json" "fmt" "log" "log/slog" "os" "time" + apibsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" "github.com/bluesky-social/jetstream/pkg/models" @@ -27,6 +29,7 @@ func main() { config := client.DefaultClientConfig() config.WebsocketURL = serverAddr + config.Compress = false h := &handler{ seenSeqs: make(map[int64]struct{}), @@ -39,7 +42,21 @@ func main() { log.Fatalf("failed to create client: %v", err) } - cursor := time.Now().Add(1 * -time.Hour).UnixMicro() + cursor := time.Now().Add(5 * -time.Hour).UnixMicro() + + // Every 5 seconds print the events read and bytes read and average event size + go func() { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ticker.C: + eventsRead := c.EventsRead.Load() + bytesRead := c.BytesRead.Load() + avgEventSize := bytesRead / eventsRead + logger.Info("stats", "events_read", eventsRead, "bytes_read", bytesRead, "avg_event_size", avgEventSize) + } + } + }() if err := c.ConnectAndRead(ctx, &cursor); err != nil { log.Fatalf("failed to connect: %v", err) @@ -54,19 +71,19 @@ type handler struct { } func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { - fmt.Println("evt") + // fmt.Println("evt") // Unmarshal the record if there is one - // if event.Commit != nil && (event.Commit.OpType == models.CommitCreateRecord || event.Commit.OpType == models.CommitUpdateRecord) { - // switch event.Commit.Collection { - // case "app.bsky.feed.post": - // var post apibsky.FeedPost - // if err := json.Unmarshal(event.Commit.Record, &post); err != nil { - // return fmt.Errorf("failed to unmarshal post: %w", err) - // } - // // fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text) - // } - // } + if event.Commit != nil && (event.Commit.OpType == models.CommitCreateRecord || event.Commit.OpType == models.CommitUpdateRecord) { + switch event.Commit.Collection { + case "app.bsky.feed.post": + var post apibsky.FeedPost + if err := json.Unmarshal(event.Commit.Record, &post); err != nil { + return fmt.Errorf("failed to unmarshal post: %w", err) + } + // fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text) + } + } return nil } diff --git a/cmd/jetstream/server.go b/cmd/jetstream/server.go index 37c660d..4240fd0 100644 --- a/cmd/jetstream/server.go +++ b/cmd/jetstream/server.go @@ -16,6 +16,7 @@ import ( "github.com/bluesky-social/jetstream/pkg/models" "github.com/goccy/go-json" "github.com/gorilla/websocket" + "github.com/klauspost/compress/zstd" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/semaphore" @@ -39,12 +40,14 @@ type Subscriber struct { id int64 cLk sync.Mutex cursor *int64 + compress bool deliveredCounter prometheus.Counter bytesCounter prometheus.Counter // wantedCollections is nil if the subscriber wants all collections wantedCollections *WantedCollections wantedDids map[string]struct{} rl *rate.Limiter + encoder *zstd.Encoder } type Server struct { @@ -141,6 +144,10 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti } evtBytes := getEncodedEvent() + if sub.compress { + evtBytes = sub.encoder.EncodeAll(evtBytes, nil) + } + if playback { // Copy the event bytes so the playback iterator can reuse the buffer evtBytes = append([]byte{}, evtBytes...) @@ -191,7 +198,7 @@ func (s *Server) GetSeq() int64 { return s.seq } -func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, wantedCollectionPrefixes []string, wantedCollections []string, wantedDids []string, cursor *int64) *Subscriber { +func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, compress bool, wantedCollectionPrefixes []string, wantedCollections []string, wantedDids []string, cursor *int64) (*Subscriber, error) { s.lk.Lock() defer s.lk.Unlock() @@ -227,11 +234,21 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, wantedCollecti wantedCollections: wantedCol, wantedDids: didMap, cursor: cursor, + compress: compress, deliveredCounter: eventsDelivered.WithLabelValues(realIP), bytesCounter: bytesDelivered.WithLabelValues(realIP), rl: rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)), } + if compress { + encoder, err := zstd.NewWriter(nil) + if err != nil { + slog.Error("failed to create zstd encoder", "error", err) + return nil, fmt.Errorf("failed to create zstd encoder: %w", err) + } + sub.encoder = encoder + } + s.Subscribers[s.nextSub] = &sub s.nextSub++ @@ -242,9 +259,11 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, wantedCollecti "id", sub.id, "wantedCollections", wantedCol, "wantedDids", wantedDids, + "cursor", cursor, + "compress", compress, ) - return &sub + return &sub, nil } func (s *Server) RemoveSubscriber(num int64) { @@ -306,6 +325,10 @@ func (s *Server) HandleSubscribe(c echo.Context) error { return fmt.Errorf("too many wanted DIDs") } + // Check if the user wants zstd compression + acceptEncoding := c.Request().Header.Get("Accept-Encoding") + compress := strings.Contains(acceptEncoding, "zstd") + var cursor *int64 var err error qCursor := c.Request().URL.Query().Get("cursor") @@ -342,7 +365,11 @@ func (s *Server) HandleSubscribe(c echo.Context) error { } }() - sub := s.AddSubscriber(ws, c.RealIP(), wantedCollectionPrefixes, wantedCollections, wantedDids, cursor) + sub, err := s.AddSubscriber(ws, c.RealIP(), compress, wantedCollectionPrefixes, wantedCollections, wantedDids, cursor) + if err != nil { + log.Error("failed to add subscriber", "error", err) + return err + } defer s.RemoveSubscriber(sub.id) if cursor != nil { @@ -390,6 +417,16 @@ func (s *Server) HandleSubscribe(c echo.Context) error { log.Error("failed to wait for rate limiter", "error", err) return fmt.Errorf("failed to wait for rate limiter: %w", err) } + + // When compression is enabled, the buffer contains the compressed message + if compress { + if err := ws.WriteMessage(websocket.BinaryMessage, *msg); err != nil { + log.Error("failed to write message to websocket", "error", err) + return nil + } + continue + } + if err := ws.WriteMessage(websocket.TextMessage, *msg); err != nil { log.Error("failed to write message to websocket", "error", err) return nil diff --git a/docker-compose.yaml b/docker-compose.yaml index 729ba84..9c4671c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,3 +11,4 @@ services: - ./data:/data environment: - JETSTREAM_DATA_DIR=/data + - JETSTREAM_MAX_SUB_RATE=1_000_000 diff --git a/go.mod b/go.mod index 2f23492..8ec18f2 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/cockroachdb/pebble v1.1.2 github.com/goccy/go-json v0.10.2 github.com/gorilla/websocket v1.5.1 + github.com/klauspost/compress v1.17.9 github.com/labstack/echo/v4 v4.11.3 github.com/labstack/gommon v0.4.1 github.com/prometheus/client_golang v1.19.1 @@ -71,7 +72,6 @@ require ( github.com/jbenet/goprocess v0.1.4 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/go.sum b/go.sum index b145be8..851fd74 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8= diff --git a/pkg/client/client.go b/pkg/client/client.go index 0473f61..f7a6114 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -10,9 +10,12 @@ import ( "github.com/bluesky-social/jetstream/pkg/models" "github.com/goccy/go-json" "github.com/gorilla/websocket" + "github.com/klauspost/compress/zstd" + "go.uber.org/atomic" ) type ClientConfig struct { + Compress bool WebsocketURL string WantedDids []string WantedCollections []string @@ -25,15 +28,19 @@ type Scheduler interface { } type Client struct { - Scheduler Scheduler - con *websocket.Conn - config *ClientConfig - logger *slog.Logger - shutdown chan chan struct{} + Scheduler Scheduler + con *websocket.Conn + config *ClientConfig + logger *slog.Logger + decoder *zstd.Decoder + BytesRead atomic.Int64 + EventsRead atomic.Int64 + shutdown chan chan struct{} } func DefaultClientConfig() *ClientConfig { return &ClientConfig{ + Compress: true, WebsocketURL: "ws://localhost:6008/subscribe", WantedDids: []string{}, WantedCollections: []string{}, @@ -49,12 +56,23 @@ func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler) ( } logger = logger.With("component", "jetstream-client") - return &Client{ + c := Client{ config: config, shutdown: make(chan chan struct{}), logger: logger, Scheduler: scheduler, - }, nil + } + + if config.Compress { + dec, err := zstd.NewReader(nil) + if err != nil { + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + c.config.ExtraHeaders["Accept-Encoding"] = "zstd" + c.decoder = dec + } + + return &c, nil } func (c *Client) ConnectAndRead(ctx context.Context, cursor *int64) error { @@ -109,6 +127,9 @@ func (c *Client) ConnectAndRead(ctx context.Context, cursor *int64) error { func (c *Client) readLoop(ctx context.Context) error { c.logger.Info("starting websocket read loop") + bytesRead := bytesRead.WithLabelValues(c.config.WebsocketURL) + eventsRead := eventsRead.WithLabelValues(c.config.WebsocketURL) + for { select { case <-ctx.Done(): @@ -125,6 +146,21 @@ func (c *Client) readLoop(ctx context.Context) error { return fmt.Errorf("failed to read message from websocket: %w", err) } + bytesRead.Add(float64(len(msg))) + eventsRead.Inc() + c.BytesRead.Add(int64(len(msg))) + c.EventsRead.Inc() + + // Decompress the message if necessary + if c.decoder != nil && c.config.Compress { + m, err := c.decoder.DecodeAll(msg, nil) + if err != nil { + c.logger.Error("failed to decompress message", "error", err) + return fmt.Errorf("failed to decompress message: %w", err) + } + msg = m + } + // Unpack the message and pass it to the handler var event models.Event if err := json.Unmarshal(msg, &event); err != nil { diff --git a/pkg/client/metrics.go b/pkg/client/metrics.go new file mode 100644 index 0000000..e16bcb8 --- /dev/null +++ b/pkg/client/metrics.go @@ -0,0 +1,16 @@ +package client + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var bytesRead = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jetstream_client_bytes_read", + Help: "The total number of bytes read from the server", +}, []string{"client"}) + +var eventsRead = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jetstream_client_events_read", + Help: "The total number of events read from the server", +}, []string{"client"}) From a41ccb113fed7b04658d9523eb1cd600220ce9b7 Mon Sep 17 00:00:00 2001 From: Jaz Date: Sat, 21 Sep 2024 18:05:08 -0700 Subject: [PATCH 2/5] ZSTD training script and resulting dictionary --- dictionary | Bin 0 -> 112640 bytes train.sh | 7 +++++++ 2 files changed, 7 insertions(+) create mode 100644 dictionary create mode 100755 train.sh diff --git a/dictionary b/dictionary new file mode 100644 index 0000000000000000000000000000000000000000..106847e7007b4ac409fd58912fed85d69aedb352 GIT binary patch literal 112640 zcmb@vTdZwMdLCpuo!B}=kcWUGc$wZNJe|GHT2(cw*4i)a(`_4B@h#mpkwPOb7H$Vn~pQWP(7A;}FM&NC;y^fQ7*O0VIUPBhNl40tpE}@&;dxwa?mX z%{AuS^Yl8q)n{MUoHfU+%U}QH`@UcNgX{mh{F#6LZ~QBN;eY$+H~-oHFZ}wozxQ*0 z{6GK0pZZ%r_c#CUZ~n!<_y7KL|Eu^b<2V1S|K@-Hp?~II`eFKKe+a++_QDaCU;G#T=AZlR z|MG9#e5L>M|JRS);XTR!^5Z}JKmPFBA2q-Fm%j4DKlDSN{-wje_!qwWXYd33<%hn4 z|Nj{Nf0fEu_dikUs<*y+_3^93Ca>Ov|GueR{HB{$BJ_k8Wm8&=wi)6&u6g(BJO9SV zuWXg*`PG|mzfw!Df4ai^2oWD|fBR-H+a7Vjz0rr*=(upU>#zQ;kG?+i`hZ7#bNERmJL}4!(^sl177QL+=?))#^rzqd+y4kZ zcCznvH{sK4_4pO9zVq>`c$-lr)1}qc%qF*N?PaPZzKbAbIro&Yv8&fH6N0ERs$w@K zB`u_h2j)ul>O0^0=-WpNW9K?!e5qGQi$TjtPIR2jtg(W~S{?fq8{^xr{^fVW?Y=v1u_!wEH0%oO8TR`6?lrmj-dmvP&pnW4EJPG<=|>Z<2Z^)CW-Z8?75~Sn>!(vZqRD=%BECcwcVPI z6Jw5HxS_T9JlVhb*4-yhAG`Kl|H=J)v9w7NQelmb zRyxg<$5|l*sZvhpoK-EpW;+qKm%%n7LLG9>)1WH#!-! zY*@B2UcS+zHxaqICTBOg>9b}eV??W*8N~$YYFoJyn>-zjAdO*7pHD-VXMNk3C6}xZ zCw*t5&uts!#1|E;J6Xqg>gJ)NnL4%6hs0F2aAbVG(PP17RIrhhUX_ILX&E}#xuHuM zA1@*@)F`s>qRw13jM?0}e0QVsxnnwC455uJd{}BTil`ZPRvH$KlWTf?^M3J8ZuDV^ zN*+0~ykMm{pOmzA>xWsa_(v3rkn~NyL?#d zQcu#ssK&h?8`bA|9mj?=sY`~aTGSBdmW*rG>_3g37O|2hnWIrJb3s?qPU*0YC->c~ z>!Hmg&)}7q5Fs@J}Eql>$ z>D^cor`$xRboVM1H{#}w-Cks2Xl{6Gh=0Mp8)=>`8wb#j>TLpOGgD9iDfV_7(H(jLzgw?E}F*^SLrgPS>pU$jhe`OBu>DUJ&6Exoy+_}@hqW|&zvk?l_U&|Q+XvNl(#cg|1P8ISy?VHu{m z@Y^tF7W-&`Gp-YR!|2sDeR`ujxb}Hl_EQyHtmqBBhCMoK0Sv2^OeVRgA}iOpSB*B& zxDdF`bt}5EVM!~8T#Zdq=S?}3w6yyc088wKNHf{0bSYTV_(UgGpWGMqA`05WW<1{) zy3eu^=7f;C&}21HVj@$t5o!8rn$~_uvOMOzY4eivFE<2?ycJ%vAgg?eu`-`3FFb1=78{^ETc5kX2aUN3dsbz=7>fBBK-_IAoC>|W|)VJ zv&7e>+PC$jl|}4{7|Xo2apg!_6`hX4Ry|&6L2sj*J3`6(#XIR^jKM808R=iJ(qRmR zZy&~Z`AXwV^D7=j=TiwbBSTcIwuu_jy4~w?>FTM^ZKU(#P#Gb#NA^A|Z`$%4A&a-dPp!FnL7tHm>_3 zX4#rHEgavFx;S1z9`S3~27Y(J3c`(F)@3H`#VZ(Mz_3*B1K%))SNCIlXa(QB`^>z< zON_1vr(p#j@+Y}`oIm;AuYUHuf5fgo`_+H)*{}W4XTSCbfAZab_}TaV@U!pz`u%^O ztwCC-HtQPG^0Ic{`_!b%^-zNJ&{NtK;5RwC+hl`($?wWG38V8|h`e=454k zGka%RvbP45-rk7l?xK5iPA91(3HRa;8j}rk3g^>qV|-{0-lZ3MD2++<9~iL^^6BSv%2hc@eB>y4AN0U7}sY$7Jxfx{D@B#4+)w>ZUEAwft zE912+s=1MBGAXI}uJw;1c0=y2FH5DYBUJ?n%lg56d*r@7G9M#Zhu+)|b7t!v4RB`n zBBMaBZs^n7{=sq&Gc9=Hr=g}xZsLwLQz=WupIME>t9xPMWaHxF$z5h?!hw zYFnvHOzmRkGVjU=(b{gDg;UZ<*%+63TU3)^m77Xq_v5TPI9W(#y`Rw2cQ0*BVPeu< zGW(DHq{mZ_F|_NpG0sf=<9DB#cf?t@S9k3CoYhDM)4I&WEMah+#X^;12Hw`fXeugQ zRteQ)VV%lokJQOp6fz_*cH(=3OEtBg;zV{aV`W#W%=-D1)hHUibaPRVVLNA09ZBI! z7pY3dq)|muaF&mmsp<+z?kc4UPV=dc_8GoJi@Z-Yh*|N*l}a-iCv zWmB(ZjO3o<5po{1SA@)(q_ON=l8TkW4<-5Ph}BWf;qcL2S6=9ZcXg#(zw~p`OM zj%+SR(L`xmATKOfXNM}PQ&o|CDM(BP!}%InRYiT_W;cwx=7bAfmW`{8sma_l&Cu6A?ZuZ3C1XwdK=^A<^--hxr%O|I^ahYkc_~7NtC=yl4X9% z5MSoAs}t7}*_E!r@{uBPJz~kBMpbF-qfkH?kcxj??pwN|fya;}97PbD~QV>M29iTDE?4(->J&Kb`B&+)H_6y;`Gs7;wA>ALsRmz|+^kafVyeT4L+2poHZ;N|&zJ2y@$T2A`!oyoM3T z5YXqgGK(CeB8nu7UD)-iw62IW|gfSY`ZezUs^sqP4%@q+( zlVC@u$B3%X9V==sVufr|8^MtDU03r_pf)zQ(Ar%Pf;Qj^yAr;=Ms!-lF6Yo2F% z&8H>nds5F0J+aa*!PZ!l4uyqhE_@=pDd|v*7T(*ea#1w&GN%pe4T|t(n~rR)Xxe#A zS+lx0OY_~Nr^a^4DWzr8E>_}&&JOmzl!28#Dx~_vB4a(x$x^PUT=ms}Xb9GjXaBznn~#~4Q6kMZ&?595Z64Lf^bq=KR1K_qj)S=443$`yHO zgoTslQpYwfvT4>8(q(RPs;!H7OF{1VpLuEu&5AB(@VI(v#yF;B-3}*LdYWfAyKsY^ zt#sm-Ts0_34!IURS4xh{h@| zxEvD{H+PJi;Peia#ORXz1cJP|^AX(F_kINaxS!8G3ez6lWMLTBr`s@R7W-&`GnW}$ zL3+jc(=5e9oHz9V!uTp7ZA7KEGB%B!)+oP^^N#QjG8z0V#}Tm^U_m}&eKycEZ%;1f zTBF8Ka4SDsOp`Xtq&wG-ZH+9pi3*`-?yXD^{pqw zXFd94>?}%@5iP1ZHmX3;s~aegM=dVh4990KQP;24*iR-lsDP&HiL!9Bcx1liqDclA zM`l@u6E`RGiIvVKRwZyN-i3`%;Wwr$JGub8Rb~>d%`wW{vE|CG zB|*%8@6kWQG<7vYX{%WkUGpW%Tro5@GikD|^gYBXVpoKotre(N#br9XOp$@nVKJht zC%)1lpw; zl81VBt>78zLpvMq_L@aEghUr102r`^L?Q_O!8hKBF_f^|7%$&={0(EkdYoP9xQ!KH zT~-&?l$bFht9nF$W7^pdHfp%x425IQW}o_W7|TM10_V<3>#-ASwZJ_}+qxrjE~U6NWjn*hd4Lxl-tx8`LlOi45>Gvit*Z zQ0B7=_>&E6_#tG&3DY@Z3`@*5#+fhm_}yo|5f8rY&F$ITs%*LxC!Q-RDXXxPle|qb zlC88R5uj=`8%jgFSyI>7G%H&nPh64<@Ks16`jusQF1jp*NzR)59d;@Jfw#wgfP)T0 zmdr3bI|)u@RnZOJ3JaVy(84WH5=-G}BVCL1u2<8do1(87pvQAjBUl=CqnMoZ!uUJ9EB|G@>T0P#r7=ywY3}d`}r4hK3D|TQ1 zJO%tsfhuZO51>LqsfcoKDlV72pPQt$^AIgH?v~W37MXExfvooZi4;vD5mBmKQIpYJ z8kSd)^t|CG+@V7E4b4cX7(8=)r4uA|q_2@joV#JrUBddFvvwS$U_*f#RjCUj^FA*d z0N9z_!vAL}n#HqxZhTrGGrlM_R=O+eB-`AW<9PM%7O89@F1eM46{*}YCtKAEpov(% z{oS{};}7obZwJgT^6m2DgYG(cz|9hZp#d@h)%z@Ky&dz))k{&mSp_53oN1QXO7|FV^FGmiWBU zI^#R}tydo(S`60v;p(Grz54XsS3&CR5F=_DRtsdb(&fiMLzlk8qi(c;L)~HxQ2IK+ z=g7j3OWB3dfs5`A5+4V0L;zJ+cvg$A8~%&AY+H~J@bw@jsu~X# zW~;O%UYf{5`Sy42pC6{C^Y}vV<_teU{tggse3|X5;F)7L*x~B%(c#msgE5hBKKeP2 z$B~Dh#Vq(^rt9ll{hZdbJyxxim+E!Ydv{YvWqi#L5=cR3GjNrez6=S=Q`f~c66bGj z7@%$2b)iw5+NUCHKe+-aMYs6=81dncd&GsOul|XL*Md8vJM8cJ9Z%w;pW1eM8zA=B za50t`+oIFk_Qx7DgAF@9><%n-ioIh;Y+(FRI=mLEj>+?F|Mu@!nAStsHSdmIVC&$G z@=$Bc2!_W2-Od2EY*{;igmHG?{yz3$h0TbMO0kdd{u2N8ari?xdQnW0G1UbIJ&}bH=gLhU9 zzKH8-9x7~xnG;?vOQcal%)zio^QhYe0u-jA9Wv}7d#I#uG7dy?=lx!!iEIMu(6@J& zqyk53`)rjjs~uqs6*XT{?Fgx%TR?s{5YeeiR4#N)+bNMKCCrKzg@G?36>d*6qQE;F zAT0F)ihzrNs%9Ag+$Y?nBu8SnNO>8}^+H=(;*N7k?@nBzB_Lq{jkSB(Bjg=9o~CzM zj&OHEeOoZYT><2w)w4jo~g^)=4VyQri63*1hsoApL&mO&13l^k`(yRfjG49zI zp%kBQ2({z;pA&JzZZ1Nr42v?kp<6n)U#y)75I1nxynjxtN;(`a(Zdm`B%(hO8#n4M zr82K*Pur#70$6_!>IrGeUJtE_Pt*fIQX3m&-Mgw8KzgF9jrut0PN5WV&P)03+3;}h z2!o$yKKKbRLK?RuY~aU-aH*HUWZ?J1h-4ltNS~8yJG|4 z33p7egW*kq{S^3wUFo0M8`Vy_uj+NyQz>ksi-s5vOOeW|Y&%Jk(YGcdDN81mIxUhi z+1-UhG1RV6BXX&$ofobrSJx`qT}mR5V4x9u*jO-^+tzx{)*EIqq#T&{_q#CUEWX$- zWC)?fz!Tv8@ZK|i`xgm2nLv>CG?V#|kY166p5_%e>xLt9GwT9i7~aLtXFt}3#*<}O zgSJ@bWCCb$=EByamY7LB6IbB6>QzNFcw6gK~9i2N}7SuK3W=SdENDcpFSg*ATzt0%;5 zIBB+bAt&(7x4#X%zX#K=#VG<2@PI2BJ~S9yft-sohZ{Ul2H`;-{?*{r!cX;wZw|qI zy3YlD?mPm zx-{Gtc-dWzs+V*jromUi2OowKZj@~Cx8Se@4-OtatPi7<3ZuS{yOR0B!y>wnXYfsCU4twy&P6sU?-#wF z#L8RROtNoUx-8jPTWbw-EtV}T-&5`{zoz7dirdTfvqdfWKGpbx=gGmo756F6Gn@Hw z0^yYR4$qXlfgNGd**tlQ>`t{R5|vrgExzP&1dyv7>(mT&X^k~QhUAJ%ERxQ%6%i0H z4_ovJ_cxA{ij-3_qVTxR!hn%Y$AOYl=C(b`UMJ-p^I zUptU^O@>`u)ppF65WEhHGW_!sV!5G-Tw|AFRM1a6! zMD)Q00BQT%uMmFYY(1k@n=()Hu?KwD*?CQ-rh)NPa0QbBq5&DMnT7ItLh-J$9i4g` zWhb&LoW}xja*7KPD_lb#9kdJ|YiN`Ww->S%4IMmAHw zj?2gdSjax|bBdncDX|9-sBc$wHinp{}Nmg`DUQmI&xnLHMAF{1=8imZ)nnRa|CL_+py zF&qUr@NU*MD&nfe~q)}-- zmq{-t)c%l3wMCweh$=ylP`HFAyC#5Qpq^Qcw#~)}*j8-Bod~dn5ms^EVt!c*4TU)c z?A58jS$lR{-VlDn=|%W~0c40?$1f`Gg)zJ(Us7?82An9lJsayQo~zz2m14<)aAl5{ zzU@TC$GYv|WIGV5G^}a`sFZccvXmx}gb5Rj2)PwNiN7fE^l8iu72xEN6+P!=C8NZ0Xfro#0 zw*kB!yxS+#u>a$~`&<9?xBj#L=a2q_fBN^p5&O^o&p-SppMCG&`|Nw5hyB)P-}~=B z``+(-_Py_a_PyWu>{s9Em<$pW?J9T#&sHZ+#u)cU(gALmg2V%(P;>aj)i9+LKqdfl zS&MrnrRxAFo~uyq*jpXnQjyXE@WrLV&I7+`W>aeSGBL~v!NzF$7`_Srqm^2mz;vSstbLfsFEv$xk4>yf!ig;0}PUj?=qq*t&g-zp-=OYS_ z3!ES^#MS7B6N#0Sn`F+id1g&&^Oqkh*#aZ#Gm1{D`TFPi}s$YE&WwAj+?$9n|s$&KKbCD!I)ML>BO0QD%# z{>Xmyfp?wln{F#$}4;uMp0 z3XQKk?eoZs$^&5~J6ib}3?m1)Sr4idmI#M(|JJQjzouDC0U68c1&VAyZjOt*hyNll zewOyr1#}1K)O4CmP=FEHN#MVHRNL= zRGDf6%eC0kMZbFLA@j5+0Hf#(DqE81CJ||t_SI@&3>Huokp$>nJW&@z01_yI-11e| z5jVTpfCA`^CNGd7j2wt2yp7;5Lt|&>!UsS24!k#ZQAQO+tbs0La@j@QI`^@3q_`#5Z44dpBMtp+mC}fMWP8N!%=I!sH;^Dvg>}-Smp`rn`lq6sI zI{0PEBx=&AIhOeWZ1dqg=@AqdF@Xn9E0 zMX+^rW$g?wV1W>B&f>L4I$I$}j_F7CAK5E^;(YruiRY%yl6=Z5PU$>QJ2n+N6#(*@ zW+3mqlPr}>t!Bg2PVp#Teqx+gSlAlva5jkhi;N(s8{ttAcU22Exn9(ju3#@ zcQ-K8m#pP5FDvE4V^8XIF?(B!9XmZQs7~lF32O|0h+Z*vHoOCJaQ0^4Icw!AVrC(O zP@wh`l9LneOgBR4xAma>3M~-wED6LfccUW!8q%q1aW< zYD$o$qqqbVO51e?LQ>60wYI1k+*)#XqDokHwY=yAsz= zE!Trb2re>&n2`r!pnEEa2~2I%q!7wHo&l&!Kr=*)3cPhdGI*!i8T_hk!aCeDEI5L2jVSn!t z&_7uDDvZI(`!QbZ5#HRqAs;MX#cG%kBGpk}b|Yg`;!M>=0#-jAbLbxCO-CkFzXcIV zI{D6}?Fmb>pnkKaP-{#o+g2QjU5^`nIw|HMW)z`-b`lOUTG*sQ$lVp zgXqj`lw!V&m-F;~e{5)GhFn_xAzIGU_n)ydH0W}Y^Bfk_0&6*DWQ|=~At-7=UGGPt z$9jk|*2X!jC;`wmTfqoz8pbkpBF=Wq*4bK+$ydnaHA~wt12*kDN-z8|A(u#W!5@G- zx`zTxR|jB44{hd!Lmsw-l0^)Zd#jpmN*SPRs;MWQG|+uE-l_o_11EeHm@;8;7ZHN& z90Vr$x|F6!ixYQTvBb3l1C6ZP&U;sD|AoL-sB|1S3Sbii9J9VUeEl0@Lk{1=^J5F1 z>^;8k7m#0-?L9vK^A|yf5=fnZ{=5I_bH_LK*s+jtVXf)Zx-P`Cv?aX~WI~p#Et9Dz zTZ5S1B%lgR9^u)p<7*dy1yayhEWAAECHo3AMXk1U(NRi4uqc9L|3zshXoF-^;sMJw&(UXS!4M&#)nMUhig*N8wR<;+byVlh&S%rJ;3L7=Wr&tR^^KQ4p?usK-w{i z6>4Rx_03SD{!V)(CRXY1EE z9+#4k$9whLQdc)+DRd=s52lELYNPY*v2L!dz|I81(U3Fs<^Ziqu?3?-p8+X9{s zhfB5#xBw&2`}ZKf?bhD9J`9VfP;1&GEMC47z$duALLGf4^8CN@TzO93MXD1u5rW0CNM)5^qLK^Pl+g7k!?5d(Cifa+mB1)xJg3o)Nl(bp#|4KT1t zmjMF{eftwTK`RAKG~B)f7hYq zq@Br1!Kgxk&uXBLnv-0!vRr$yjuUuwp^)=A;^@+&89!MgX#yY{pKOn#aoFZk=PTYkpm$-vFh;)?_z-C9sxQNhopxX05IlV!$1UK`0 zdvpf2tZ9Sr<0Aj-$bEluEo)k;`(e)P+M@x^-^-|x+?{|egXi3t1z@uwp2rFIu3cBC zWuRnn?Ek|iD1|OBV2zC}xaGd8O)*J zLwf)IFVEVyv^F+jE52(W1XN}96y7GA-+LwErRYKj}0KM?HVz$n~+yj^e?Hqsw}ymII^ z{v9|~9iVw2V+9a1_-`3P0*n|K1V8cQ`HvP?XIU%(GYO3gOacYVs8_DcAfLRn>so5z zvULHv2je>AFDGHQ7~=DK7xP20Vi4!;$^q(i6XlB2b(jK`r( z$NLxKk6#R#hdj(|f+>V0{4)CY+(JV4EGj7zz?MSRLPyDhe5}p}ZZnOvcLd0RlA}zE zv_m>}O*H_DMO>Bs1S&~0sNLipM)0c#Q7wa36a;VMQ)VBD2nZnW(s)zIj2DmvZ4iev z?J7!ChSIEn;3;gXaRXQ!F|fsSX=pS>D;oXmi_Wfqrd7=h#!cg;)R~35&U3q)aR$-s zCc3%41Py^s9LWTEzeg*5y72fkGJ8rwCfLIxdpj}A|N8rfh$<;tYdf-DL01kxe2c&$`GL{q?gTg=`c4*R3TWByl zuhD=g9qL`I1ZWmj`IJ~QVD$uR+dac6F+ACI4zJfU5Bhi$vI9Ew8&YZaNRhXLzyXw&>m|y zu22ga%}fcp-1Kc63+MwD#Yp3m=d6{s7stqZ01_QzX%h41GEy4(^2q zv|~XZAv|RZQLi3Km{S2v667eSG-?3ZwF-=~Q_uSayFx%eY?RQSK@XhgFeY8Gno|Ze zQY?!M$l$bfJ_4)EIyFPyYRyJ3TZ$~1XdP)NgVy(sT_7DK8YQXQF>)e=2MgVv*d=%`1R;xPkh;MM`axG3W6bODLHYNK zSDJ!4b#wMi$B^g@Egbi1q6k%FLF`2$tPk%$v-2Q%c6}wz*F;S<%Dj$dx@erg-Ws@0 z0&E4Bryd|OUX&%M{(z^qfK$pWuF;Tm-zM4yiIWvJNyPE60tHy3y16DN1$TQ;f3zs} zkcO#@S~6E!S=2BT#K<6xJ_&Ti$-5#nNC@raG?Jh(_~k0gib*xQwTv=qT0r0^dR+Aj z3btvn6qKnw8-u2u(G3Wkmw>oXe}(#@0&?!so%gU>k6n*tU7

jq)KY^YH#Ns|~av z>sN(1$Di=}aNJ-916H5>2SysEO zhB7bdQiXgP zw0)4enM2YTe?XNY$lKtxf%RHK_~Tvz1jG4zF%-nhtYYtvBpfB$q89F?}0XlO*S_R)uTY!cS=pnX^Y@zTIUh0A-3z&VN1_%CgIpE>0 z4!?xI2d$neoQjDuLhxpr_w`*Z^%?4 zAN_S?WqSX~_6n?5bw7RgGg12?4k9_}+kYHp6HSvXw2$6TV3=uawD1j(975aLcM~2a zE=&$454?X!HQjGo?4mGZVO6o54+>V>!ae=jwjH*A-}b~d+vr05ZV!i9!9-w9`nUfz z`c9*nVA!VRR#}p5n2K*`>Fm1wbKgo#846WMOo2;|9fjWjqBHOQv3&b`iH(DR4z_Ms zZF#?2@tD}W;G4q>#XsI}_1CX)V1qnP_&}^SjxbcO`XDoe;SJQfp=JeEJ0=Oo0GdG# z1BdC@zRX5%5BgDf0^kr5Kuk=9`X-uX+OBQ-84i&opdn1?-de5>5T)Et5jI5$b-GiX zK*kgrAS%u^G#StfN@X17-eo@B))P!T2IwiS$`k2!DJtOxM<_%i0m#qMtwN!ncL}v~ z$O`~VHgl+=pmc=Z%8f74RQOcy3Up9%t!FY#;z?MjQ$t(EF@Lh$-PVz7AVYb7J-kb| zqoo!*aYSBe=3y>;$kwb0=)p*6rb=QyRjceI#F$@yJQJcUQ0nee@ zdS)HRD*O)Q|J%#RH4w~3{pt&l+@8lO5JM&aa9u=ld-v`$KEx(nbVtvkWD#>tE1LPC zSvR0iQ6#v{xq~?FQlRZ1xUw@a%;MqHYy?WkEZTb;iH}nq?kaF>1Hhun}(K2I@7x= zJ=XB+TIdy&XgtwDs5ur3)SdS*lYz4e{Lf`trQpn<8SXnY`kRM*Py|(#DeG#r=57Jy zxSVEz!v3yG${iRI==*m`XoM1nGuanmPd?vRVGN+jzod-?=k)H1u%{rWqaZk&=_Hdh znp$qPAQ8wS5FUiRY0#*1482knIyWfvFF-?FTwpl0 zC!EvJIh7^Q0}0{lQ6f$uV2ipHjZmD`FcTKFxTYSkEhaSAbrmKF4WoFE${s39Kx+Y! zznAWy5L-`~s1&?yQD?cx30g3(yFoB;WkE~jG7NOD=@)8u`VdMy3PnaV9h@#~c>3`E zGiC(11dv*8&W0jTEUU{J$blA}T+rf$aZ-_u06^mWxM%NP6=T;h6ekBhMv!wuPnITuhGVm^nq++icGy~<*zs;&!Os%VCAS5fU-apK ztU_0xBKEj(u%&5Zuy3F3cuA;CE8LiIX!E(cxkKlG7Scd;h3q$j?CmOYutstTnu#os z|15UXKDJr_Suc=-DhmV=t(rv5Rkx?VC2kOW^VhhjS=9{xs6cMzTg1@4);zy4F%-XdLUb!|~QR z=fuDeCoLGf5EMkkkfM77dX1B2NNMSTEsV=uf5|6FytF~4fR|tktyK|se2i&0oJXE9 zO6v6H`tCAiY)}e%UVeejz`KVWqd0v(%9nA-;l%K(yQ8km$4(3~ChLUitumeFJ+kMf zRlY+%1y{7`oGwG3@yY>)RDwpkBB3>&KvPL@$ly|=8G&VKvesxtWhB(fPdYJkQ#O18 z&HmYHCli#I%CeFW$)*yhvwQ$fcxDPBvoWd07=h#*JC}j8hR)+aleYVZ+~sr51PWig zM01b21$=9u{bq`fR$HLS&&@5rzC;~+q*XVf2)<#=KXk|+=i(n3^e_gbF1Im0wAPoz zK{xygSJJuaTmpU`1hbH^?_7lz&P7KcVGe07lvC~ddev2+2*B4d)!Ni8bz>8`>1IB=JwvF-f z9RmO+nj7Ant@0VSLO3^POlMFVgd!&7-MY1}Do7G8C6uOS>rxA~Cy#B!(Gi+#aK4?$ zdFGX_puDLÐKZgA5t;dQKrN9Wdi)thnLLk8c9g(Ht6ixc1Sh+{}^BXz#j}BZO{9 z=)stFIWTa5-L?y4sEdia-J1aLFhf!Nl>o4b3a}8~E}&HHxqA@K1w)NoWE+qqgM@Vq z2Koi=-qHR2$UKB$I22gIFlQF~Xn->z4)_GH5J&BV9vpyz*CnC4Eul1-!P7HjLDFx1 z*XMaJKFsGXLhTH7xS8SGRzOstOSai3V*s-btnVf9;Q&BG&8sg!@OzF~4`bw@{D#ou z%-n5;`NNoXOk8w@{8%up5194(vGVzri7quV?8vnu`5v;(A@vvX=*YmLXfmh+xjy`r z<6qD6fAIid0cr;g7zJLNDNY@_#HXt)nz&i>tPUC_3ySL_w?=dxGL^e5`C*L_v&&=! zPg)S!qoF`q6uV8b#5K7FBkH2yeOR+8ZRLx)XooT6uzX2fw4*oIH&HFCJEYVbpbP;Z1W&V= zhS{CODj0~kw%{<{W3r!Iq9C;Bd^GVejzn64rtUGQBtGe_;w*Dzf@s=6YRQ@=!d)!K z6-bjER0;e6Z0LY!7B5Y1w zh-iH}i~?pRFQs$n`+-YhK%xMHzuU3{D4Re1hG5_UOo(fM#-i5y_lp z1&r0UiGEPaFk%e5+>h~c%dnLbudkmr$vsY(M6T8hqO#MBPCdwdJMkWC3xp<|q}Qxe6d29A#Etf@6_ZNK8X}M(kdG=qo;?Wdm|M$d|a~fm0p| zXcFxiv+HXJ07EJ7{oH22+I(?k8jPX)WEqtv&j&?|SM0jxHvqry|XO9XC3-7RR>>k8T+vjLRq=Xz>ME z`^)NXXD&tz1CH^BT+gEc&iFYvfOl8dUoKKr4^ z@(98Av0#t~2W7hh?jAK;MMhafKvNj>w*Zo{u?5;G%ozfc-2l>O$kU^%C3+K;aVkZT zM~&~6-GT_r2K#D62w7%9X85)QEg&`H~9h2o%PW-Wp1Gpnwkq(mXEx%+K6U*f7- zQgG@!x9};Md4rtTMy@!DVZ<%=E#VjCA4l3nC8E`f%=povf8PIAb;V zrk6mTdZrIn1K37r)6D}eS*yEga4TBE#4!LPNA42pa-chmXlGbBFbpDemOxR656dVP zGORqy-YrbFm<8=$r-AkvWu{&^WZ45-I^dibfI{+XZt=BLkA!vJA}Zl3Kj zqyut*C5ej;>L$JrsYh=o1DIm!Ry36cUkygB zTQjo_x3m**)uA7ja3Y2lbKP_muyV4q^prOW0&{_mexFN?zFU5joc29*c@1Ik0tR_Vbw*>K_5EBB;;cd`ReWdSaGYwrrx0uTg#AruZ^LSatms}yY*OtJdDE0@ql6|D>tS4q;K>G=lJc$O=e?1I7`T3SzX zG&Nd-mQ+|;?sjlMiUTMrdQr(8;-F=fy{LeSq!qZ!2w-q7K4cj_oJB;w3Wyr^w1eJJ zt^i87tb`;~Qz63OdVm-Wl%dKR0*}g|0I?v=noE$>%M2KN)OPf#4i4C;SU}c_&|{}A ztU(!nM(^5F#|jO+(LJ$*ZqqY=B8B@vhoz^HgKi@L0Z12mR_Zc}upfwFifYFVNQfoU`39X@^q0+9aY`JlzZGb0Z( z!>3qPRzs%RWl<&zny*nb!Q+EAzOHgHK#LC-(x!5NvgHVx>SsLZL zh#r(a0ZYCpPcsreCw#uFYJUpRgE4qqf2d^t_}yn5z9`@`K=?Zst|V+8GM28I^06NvbzRi6G69lkX1AgghbcZzT!<(_NQFZCgA;5+mLA=L z(Zjsobng=U%X^~5nXwIQ>ehZ$f+|*74{V(E-~~N4kinwB3qq;^EhMmdV2IHnd^ZW#5C+AT*qh!rQCjxKvL2L!4dH0_b`7~fE5-AIO$5p@gF*lE zv29}yuUs>CNpqmdkk}YjAf`s(G)H?vy<}$flow5nMiZ#LLbhM_e4$-= z%K0g?zCe8sj^4Agujq)SsnVp)xX#9cwg6B#K%a-Uio_6nFyyIg%c~gzK#*_B_U8+V zn8012VMhf<7%GzJfz$bM-SYg$=?B1wp%Nr6fj^BDeQe5?!JonyP}}*E;7>f%3)uZi zqb-7lk$UeUp-t(%)!BY>1;VlR*z%{mF5DjvUKeP1aB}Y7@o2=ePM49n*9zd(h*d_7 z5KTe8847!dOov#G3-ApB$EraOYOj+K4QC>3_9sr)_JN<2bvT*#J{O086S=($J7wSH zdgjZZyQ<4e$g$<5@MtUqOQh=M@XF1=7ITO5Io>7DV6#&4uPoZ-U z8O0KsX^ns=s9lsZIl7;uLpFp~e#aTbfW-&tg_KE+#!SGWHA3ez)HQqxs+=_(cp_00 z{R&?qH;zri?(D096+Xk5=vX)zKAeWacu`s$cp z(WbYC5<9v8ox0kGnoU(5qa63_YS&{QvX(36O)LXHVq$WnAR zfhXr11N7G3T*O@p_aXz2aY(@r2eZ++r8c{{ZI}Qo;^tkyOIM#P5Cdg37gc>(2J2`e zJ>H2JMh*8&f-l7mAdeNHzVoz9{IC-dT!8;Ot7WnlEL8N#TBk%t*hEJ=5wi~~mqWN5 z3SfZ6swIY~#!RM!mU>E3Q;5fqn7v{Oow|=WUELQ-5KAPv1n)Jf%-9z|lENU0*>| zSr1TpSSFzhTS1i>;4xExr_vW_A(Ga$Ko*<;-LpVub~q7opyOTDPOX`gBo&}KU>z?d zOHStx0VU3*52EMa`S?ECzrMW^AdGW2oMK^Ii3=yV&?GA`|gdTEe!_*sU8d@|k;p ziq{PwvKP4r=t|s76(;&68#A=d1(1C6)?(!7cb)W$Hzq@D7oBAWX#(8<#J&=Y^JrN! z2hp<5K>rcOp0|*+mJUe?h{AYdscSoLLBiS1Jkc3g&$t^4usKoMhWrJ*Vo0)|x&S=y z3x!s3JNkb>zb*q)6NGRKQjCD2bQME0j2I0S1hqh+a51+GPDoKL(^M3b*v&CQZUgP> z(Gg>8T$`{M%J7*icNNVA0QC^LxVcD|3wOEa|MBbI70%Gp@W+RJQf-W^8l@&B0^+_rUQ{BpmeT7401C%gA6^~ z?EPHTC{A+}8`a9YKZ zCI3Hb@77yOn%?)_>r+sg07>9!ZnEk41g*t zw~_42*7(z9M42;X0hiHH^%_^^=mKX8)YYH%{^fYZ5!>%x#UNFBOgvQt)!0 zlm$*+V;L}FRJ+u9J`~8ds}PYVt2hS^rn6=W;U`HYR66+~8EVn&3GYw)r^RmFqBW<>IVh{GPXjnb%8S(fGjt)L-`C+uoR0!cWu44QFhkry5%!>~XG|eJSu<|_m zRXIb3Dk2%1Aei9nkRzB8KiX+~hX@tm($MOjR8a}2Hkv*nn;8%9z5N}3sH9k&t(6@3 zq^W=t5cl2703JZGG$k9pngqnlk=M*wWCmdYi8|y7dm$2RYb95G3O*Q<{_ABh6ufk; zrovJ4{7z1CR=};kV^6*hEm5?VRKxE-sAjAc??vPhBIzSy;yG z?%Bi?FAA0RhOG?%0)QBxHz7#;>~)V!>^9()3)|9eRoAj((}onV^V(FcCC28HR(R*t zMGKiVuMO&CCnaA)ZF;%u)yo~5)QfhFrI^y%q&2zaZ1X>y1rst|2V3)IQUu zhY|6eeJDhZKv;Y-&gVkW&@?sze`}taq8f4=&I!>DjbE-d@>96f5HEsS0;!MyUs(2@ zB+n<08vbkQ*cc#SIj>1z=~Wu-?NmK>(;iV1SOD%Ymt4Q5?F=!{Y4P>#E9&8Hu!bV^ zXNuz`b+BrJONNepT&?MHMb-Im$0-h8h@ZMH^(dw|EBh=3rOe5@cdBv!v#r+Lcr+Z^ zw-0GRD^-FL1oqN{&m?>a>~%?=Irydi;?JREFxdY`e{p(vwUDCD*? zM&uoL|3d-0G5lv>%JAR%Mk}@c&Fjsfo(7~xSx|Bq<>O{b%&p(_T>&1~rn}%_7j;!^kxRV4Vy#1aCB4yAfz%MVb29WAgR?z` zn^9NT!GuVzi7fWYWNs}p?#wI~dmIVt^{leeVFH3v3<3FmyE9&fQL5>Z$C>3L*Kp+p z!tl8o4tvZCSi_%0LKbF)1Ds^s>&tj-#niOLQaE#~`UMjIz&JBY#zwEv#YJ2@cbW<289B8GPL9i}1E3C6>orroMO%Ogw`+d#-`s_iNIN=ihy{ zp}8Tz;(W%C50NeGtT4e1!}2B@2tFWCezJ062?(gifJlYddRjUscaYt#cXl@H8X^F= z=bBVN(^9z%$-oi(aQ865y-YSW=IST7Hn^EiB2SU>kx^Amb%XDu2#`C0mFt#jIdq8F z9JJk4!k;^4W0QLP8jB+~cV9}O+uM8fC_sHmVc1Y+FJ);3 z{zY}b>okl*86%Fu)s8@hNCvK)Taq}E#zjd`Sjxr(&LWO{Vc5_sB4*Hvg~O!AP-G#m z;f^bv@9PDX8#MO7rpC;%VimP-?aL5H*Cl_GvhU;>g#mV*P&>;R=g)eucgoRY0i2IfiZc0E1`>?n4 zt#YfVBa@{bTqZV2$0l9wtj69*Qq=GWg4q@k$%xFra^$jWLO1!@7lG=3SlvM*za!Whv6l*5W#%fFwyT{h0$*nf)EL;tVor20U zZPh~SoI~GVMj)}$IBYK@kQ|{w&$iydN(NxlM-EAMP~k!aPL_7_rchqD?NSXs=Z@F4 zOB;&}*%>-c*Y53JSYv`$7#4D?VH1RSoZ&Zx3}fesIm5Z2#}}n6#%^`LX%0hzlLGPX z9=6$q1tP-yE}cIKV;{H%oQ2PXu~Ri`pTL*}V#9N|I|XFNPX z*6!sT7nSK`9ZtnaYn$)XB5zDn%7JekjjUoL`y$BWDi&kgL3>$(Se1@j_&KYq8+2~G znV3VyG?PoxsqE7z#DI1a&ohY8T%FjN>a|D!{yP~)Wwd1}fV(gNyM1@bMmLBVcl1wm zDCZhXFF%u!v|jPNyYpoSU1y1{?h=%f9dHf|&Iuk&o1_bn_wJP)t=86(;d*;|q+y`dy5lqiW&6bcamNs1K@HJ> z$$mX_1NvGRa@cKlOG_7HGma$tVKgQZgI|nd-fL*z>)NGMm zfspj=y-=5E+WM(+D?7Yw^V#2#*UO|@y|(`xUEoYT4A4v#x4i@KJ6+c>m!o#7dv|cz za*$R<@uMN!%2a-|g!X$U83OwG)od4!BH+}j#Q7|ckl^N8wSG}2b(18aZ3l2=Yunvq zT~w9Z%^1t4&Nz@LQHmM@^OA3Q(%jvkVY@J1Z6M%SQ&%6()z>PYqnr2qJQOew8{!#Q zc4vy8qYIq5`)YNS6ZT8M?g<-+7jkiOe{Nz7SEx^`IA%c79TX%lx7HvY%!1_4}Fu+0|IcnKD zDkQwaa}sDpo1fNJ9^OCnQ2F70pt@%mMO6_!ebD3tHBXt5GID{v>_O2Yd z&P@|vu0nd(;VNfdm6zXt=1qBkcGcNRISR6*Y{JZ)3q&3eJ&#5u6i7@rHrlm`>-sWxRsb^Lc;{~&KfOH%XZLyM#PkN#NhPGe1#p)E) z*YG8XD0D}(VnTXJgq6N9N#qme)UV0yft(X}vu||={5}@+%!-09uWOy+OVM^)xYZp^ zD=7FH9Q~oq*&Wvm0)u=(yLU-XbhNNv>JWGVB=m&e3WOAd&z#JQ4;bh`?P?fTgL+K_Eh61*A0Q0gJ~MZ;+0P3Bg}Z=0P; z)y10Is?!*uZiITY$xK;75Z;GPf(q+{aLX*sP{I{c*aU7Y^B@Q@S6ckAY8hOsK5Vl( zhgLwIaNq$U)abBty?HQo4T9LV#!ADm^>t#2{S-%>Er1Nnl#ZN8+KXPl)>1s71F`L4 zv-u{zU15*~VT~E{)~>$tJu``<$u5i%B8yc zFDIS0$F!=NyhThDeE8kTVTcGCq^+qt)Wn;VrC8OIT35Q)BmwqZNlPVAZSIlOP6U<; zaoM0#3cNzzy+fS#WtDkf4xSD~NlF$JLNv(yxY?SjWj7|PM~P^NBiWkqE(Id`D4%Lr z=9nq-K3wC>+3@_k&%ERu4A%AS zc|Rw_FvfaFIu^~gwawCCg|1$ee#kAgA?S+ExaUNc&!Hr*leQU>mp#3Inm zv7YdJ<;76~e3xbGLflXsgVwiJvyfEhK#Q!)Nz60Ixwltn_M~;JEyBvpJEES$%uA_h z6@YsSeo`9v=6OKKyvCf+AHzo3kV&dSPf6NNc9xO~z4d+-O!si57k%4NAsqr}m4DdZ+}P&uch7CE z-aY?k=ZfPy_|-rBm8k2g_RDwgKoi#F+v2(`u+1>qqk5ef%#EU-|O0zN;R-i`z*m ztRK!lg7`!XYWBNL#^c!g#QzmwS3`dIyIX1oWDjEH^x{<`m#^6d=b|S3_%nanq2bb-{Su8E$iRE||1iUXDV1s>;C4kkVG z91TR`bxlpmqF6+NH&+m+i>-<#60HRcoi=5g%;Q|lJG@ypEupmqUBRpe@BJJuW|QT_sEb9hZb3ulPM$@Gd*$uc{_kMO!dOXV+|mHA@*@D`ysu? zhX;D({egH%UqhBWcY;Jii@gDO6dCE6H4f9vt&;WJ$4QNgE0Zi+MFsxbj%>gSBbn|Q zB?DYRrx`dltW*phJA2CE6YmY`vs`wg=RlY{trdW)+bAnqZ>hqY zY2Ks7!BH^G9ayj)rR9%rprWO@9pH5xpnpl`sI2j@4ADhWD+qFp9f4wT~{!{b!M98Y9xpR>t~D$eDuGM znIo9-^B$4^!A^Sc?Za#jKg!gNtn8zcw~8{bSV;o57U@11r`om z;&7*tSa(=j+9#x8^+73NH}62)SaV{NRAcu;lmn7Z;+oQuJcHeOw2#$N+FR3b2MbfH z&u8w9!1O)qB2fkNIHj@Iil{&kJ-e8`0v2hK@=4Do7csro^j?Ru^Ma zZVxm7aPXwQ=zV66MiclPsN2UF5B2EN)B53btAV!;bfRBso3^?x;sIM;r=XWY;UhN{w+QT)@EW_vDeP#z#U)tjO{_arM zJ(UjCb6pNjB=`qYMN|1+<^5{T=B>84hg2xHL4w1-Ar}e;$12llV&|&9>U#bqBX$Hi z)vD_uQs8vgj7Auz#J^G#OwimkdLz%(oUDE5!3QMnl9^dX?lcCVK2qBfS=|PH63xL> zR8Xp#aF!_XiWpgBI~-IDM`JU|fb|xu?FRzj)2!En9~#O+#G!5QBUEel0{-8lEJ(sA zNF-@j8=`h9Y#7PDj;CU$Csdl%8Wp(EDu7X&BsNmvR9ft=!6kFGiXO8fA6u92XTSK% z^)BLU7k~TeC#K*YL16#KKgXHw|2w^lYN#t90q8|_;iXv>Lyd^KD3An*LMH17w9QF% z=YWcxv^11P9nW>h%$ISL`ds^)&fsKTg1TI5Aj<43g%+Cvzu z!*I&-brIW57HAQg6xS?>0=f*uvO3P6l2~1vi4D1Co+MGGiYi|aPEeOy(-@3>h$7)I zyI{ZR$EHa^&X-G0{=<&3L6GDcrWg*_ICJZtfA^XF2JIGTuR41sV(ynP32h)u_;C?1 zwN)`Ced64;jzMIX78;U zfDcoECcqPPv}J(B=rpW}5F_>-Euh*FGUr01*S=J)k48LBD)`lvh+Er*`!$8C#?S`J zV?>mDow`Gw4dkh8J@q!U%f;1WF~bfIyHf~% z*nI5RU+BliR+~ItA3);d^x;cVLeJ96?s+@(U6S{YX#1-F=N!1ZW}bAeM)%h%tBSgSWbUrVvRp{03IK(9s?Q@>I#m zt&)Iar-cMVj;2+4ARI_KW|WZq97CxvF|4U&g2;_k#MT}(>(E`H8-EBKr!?N^bt&8K zrUD(rn8QcEh8ZI_PQirq<1xh_--a18wzjc4dT?ty@Bi_L8Nb9rzXFJ4e}BMF zVT4?@jrR=BZLKdTSo3QLM*PPW#e4VrUzlIBeqFo2qyFgk|K9K$MfscMZ%*I)f%vMp z`PFyvb!$KW;^Fq)cYgP)-}oc?)v?8 zQmrWR3wZh?Tz$3v^}W?MwZS~S&#~!mR}nq6^=d|qs(uZw$ba!~TCA-R)>EW=xKDrk zYGApbo^;Bq?~-#7XDBCs@t5t@t+;Xs0>CplaF&PQ3kv)n(;VYYef4AXl>8$?RIIAY ztN+5X{>wEKfDg=P-~%JRUlDiY_Uq-HU%vXzhl|p-gt#TwoI`eB1#{L}Tv z@zMEUp2_%({5I<~4zU~jI8N>LXY$|qu@Tnu&ibY6Z_I~jI`hTTueE-cln+0J{==d7 zXt^3+#|K+I0$_=K5fEYfVD>Anv9Ub6{2}d$0-JU(Hw*O$F*L$O3tGyi>0sl!^JJJJ z)qd(`LD2vO?wJM*xOCWQkmteJ&FSbMb1TDC?McbuxHtrwv|GxYV0JRu%N!ASGFrNv zqtX%R1oLpU>?I^4V~Q-Ayp42>rkl){WmB&K=RQKB37>fAL&_;l2x#wzTfX+la%i9K z#m&bRz1!&a6JG3Wkcal+HNfwA>G-ih$4EIqTJdm=Pd}IdW2|cm=W|aQ$b?j+nF_-i zV9{kkKlOo(#;LY=s%{+^s=LS!kjz6*Go|engY^VZ!jDUSTHzzELAb_FxAr;C}3@wAxKt?o;-i zX2pw6UJJKf`q&?mvBa8xwUwMpQ&n6us0z0ODs@Wyllx(8lw3xHWz<_+=l>Ww2@#)t zU_-F7D+;p#>ZN~>)E&JR^4a+>|va8{W)?)UX+L@LarsHM246<{Bko?+196j0aLaEOn zbl4s+>INk8ICd*2lSidu$ow!6)3~g|AxRu_$6R`Sqa%nl$b!MzW@f@cP~dkf`hmFa zO-*&PK%k#;5+|D)gXo;3@G;cPVtK?zugV&hN~YTFMIgn}r)nAD5nk}B93ebU$AwML z9*)Y+4xHJOw*^;0lZ&oXBS{&=Wt85q6fey`Jw$zN2;7(T3t*bpg%*HW>l;uVyC$D_ z8t0e)fi5*W*V;yg=rTeMN zS3lx{hKlQe=zr5=o@)ei^BT1Gyhu2+TI?^y4Zid0>z*N{U0Yp?|gXQ`T+pBGHi)jO51ZWH=(jdv8$>446^`9m|-RtCANJC^( zUtzay(VupgVk_{4Wdm zCq$3cfvuW97kSDFfpNc%R5$dG6Wm9sSF~8fxQ|QYWhgjiX7`|ru1yN!;^RG9;rGiy z9lMdeFkv?>w%0qa!{plWtjb2YbtsmpU8KoL*xsw}^LljrAcXaGSon|d|BtSI^4+h# zeQ-P@2CL5rV$0X)t+i7!gxzFjh+AT`PPTb>EYd89lDu_*N^t|E-9U5Sx@xr{p54`E zhpq+fr3E>$kch=M3q_UtKJ_6L*)YNZKwwj~UtNpo+%$oghj*e~?y$zVD=K$Aa! zn8MoE9bu_?)%W{KX=M3_s*Mw;K=n1__W*<|tz{n*KmFT3`Rl*>2k(FSr~m2S{txf}_#eLi`p@2f{g2=O@lXHe zAO7jz{K?<{k6-_f@7MqRllNc$i}zpuukU}#7yN_uzyADxd;j%c{muXUzpVfG_22&w z|AN2&>EHabKmYgt5C8t>|Nf8v<@>Ms!~gXDYyR+0-+%qj-+#?F{u93OWOoWmMf$QY zTG;Rn-Y;@i6TWDogCR$RMmqPEGlLTjv{r5)&g?Do_;(kr*BN1f|+r zZ23EqiEeFqe@PKKkl9eA!SjCQ%|_bmHR4hpuJPL5^tx#b3j3RDbw1JD$F`|gK7zwC zvSU~nq_eh@DshZmY~suNaEgX$%)JB|O`m3I%OK79>bUsRfi@wcZoY62S0v+ zlXl~d4i9_Oiv*H(55_%`IPuW`-`oNVon`#_ft8UUWW&6VQ@gV|CVKrWZG_HotqLwdz_ z61jwPQ4xk6x|4(fDTeeTssRU<<%~f<9}r<4j{P`n_WvcB&~@Xs`0B~fofJMRb%v;H z>n__jz$?~84zz>i7MP74+$O&H{_hq=M8X4~<+b&6$^MTw3E|SN<>K_*@)&%~jCl3L% zw+hH!qycS2FiKnANy(Ry%XUav`_9$x#oYtZl|R&)?~wNq@)v{vR~{Z~ndSe%1cn-yVK^V?)j32Rvhkzlr<3NwofkAA=+E zlrX-=+B~CT+u|kn4>R}995|A%y=#}$mDo_I>)V@`%+jMdr~!smxGb02$5E*9+X>To zT(#l>L*RPCu^->k5-g%9G3``_5$Ddn;v1ROPG%=o5lmPb)OezGnZ4^67$=^e;+eQ5 z-E7!L2I@&{44}OTyU2U#3Sei$1~)r%sG$ERl$J?l&M6&&Ora|OQXyZ>DG z@Dsye;rnIZATQWa6|4dF&)#IsvitA8xzJ*r>%|3Ay@wg#I+@)0xq?dS+0V7aqn6=? zoj0rFth24xU`P`r*scyG)^-3LRlLMy9+hL>$9dYjnoK2Barj>Yl)D&h?Wp^t7lc@b zFO?zFH%nCOl?2$m;F>w5(_57lYhrn0< zkYX}HtR7nVDZwRnA@Xx)%>Pzc$RhYp&1Q>=8;TBV=$QQgq2M_9VGxIk^lED}*Q*2-4b;zF!Y2}OA{IP8XA97{xFiM?_A z$cM z%vd}WOrHWvOcNHeVB8s27HV*S*+RO(%=Q(w6mI7x@y@yTR$G@o3IX}2gRR83e`PcN z7lAUC<9BE0#6S{mnbw}|wfN5Xv98xaapVhYg>S<#kv6VdLNN+lW5raJ`1){|J5 z0~3%S>|ilJBK&~RyL)i=jG|*q!B@Kd(Ud3184KHLXo@(umV~@iw9_?uXB&njG_^cd z^al8qSp8fkUOCs)fm>A6Nx8Na8GvIgmSMzily|J$Jh+(%GUd6*FKi&B#EakJuCcXK zNcd0>*Ldw!z2x8~sp{tbP@-+`6u$!*2g-?a^>+zQCZD2rV=~FfmVniMP?lDJvMUP( z>igl}HvIR&iLan>Lb5s8P28lgT`#ABP#x|Lqsj89XL?6ZTqAXX0Y?;56o*+eh#~0R z7FEdBBZ^i}=zs{kwikZiTM6urfwyOCH@SoJetWkfTp!x7{b|lTrISd}mwo(V{&5o&2ZFTC4h2=a zfnc^IR0Fdc%g?@!`?$|{qkU`F0+MH3bGokGGHQ{d^e|DD4!V@p6hO-uq^k|x$@ zTZCk_A@R(kO#}i%tj$KhpA{zX%!0C57?hl{#jU(U2DkyI*bD(lnZFF^B`AFMFFS&c z<~mQ)+chsJN((&9Upa-2E^rpuC+J7ry^Q;M8Vot!kRTe7=*o~d1|Upo>~T#xU9uP` zBk{*V(M}END0ti(W4h`=;% zIq|8q#<4!(;{uHGQ#Xb~Wjtj9QV;&msZYpfLGIiV?2CumaUPnbwzAVbBNT8R8#F+J zbPE~SHX-gw+4`PR>>32x+IZp^O7K$Yxb*UnEFDGlw)l z`kV*So(-+4d%M-8Z(`&I$1AExqskFS6m-6gcR}OY~QGKAnpHO>9dJ0HWB^z;&q!^&U zAu+7Sjdf#ZRY)sfWIswZaS_)2fwy=()?R-@1UDCr6Re1BDt!Y5<_SK0_8W5iVeZpj zM1pzx{xdrl&T)*j#aSARameQxbr=%1%!^`& zTeS*`O=aZ`LI-xO$q{ul$8EHE-j)#@(b_0LlPq_VmLYq|Z?ZMkP$+JA+84dgJzf%d z@s_d1R*4+?D4H%BYg|$yGuBY*?2~rzxSdIzt&P){#9@hWK*t+3L|Qtd>f$iVvE`01co6tjo36R&xP3gW)QDyel0)kC-q4{I*j)dsQy$&W z(`Iz!|A8OGE$NVFr-4yDi*CUc%#&De0SVTH!;rrefr|vJC6v{W$$*P}kmCuR0446J zg*B;}y~8aMb{bbA7beiN)%82YOBPqlLOLNY&Pns{LCMTOQi8@3IHWoP=4^~^t_wmb zut2l0l=7-9CnPi@#R{LRX!9888!{Vs3_jM6g{+4N-(@C(n>ZC#)oUK`C7UEc9hjFa zmst=W)%eeLNiLIh0uTB1c-SY_9$nzfHi=q|RLUdo!KWr+!O13W7SB~KqHx%OI$srj z?MagmRsmRQGLr0<&N4vzjj6)UEs{!r@Y1= z<0z;1>F!&)YdCXS7|Ur@lhNo{Fw%y@tn-CAk(o!21`MeMTI`Z2cZqMmExp|Ln1zwy zlU0tx@{%riq7I~;Ki>V30q)7OG~nn}s^R&TE}m;uu;2y|O1F>KVwx@keLT6O#B-Jy z`kEdn%baBtmBf$Xx71bI_HLv-A9*mTm-I6B@!_j%fu_8GT>9R~|W$(Lg^TQFK2 z^v1pjqj?=hQ)@=!ojfoww_!8@u@xpeNWGAy4e?d_;jh!%FC}I>z!FYWR{%I;-=u&X zrI!7SA#}JnVJM;zgY}_fMr7)CJ8{S(;(7&gXwvHi8F$}CL@Cspf_eek!MNj+c?WR? zHM+{@YL{;SxHmO<;)iRT*~OmsJ!cltA@4w|v;8&}PPEo}NTx+nBbcz-2m{DQFX9c%?UukpnkJQee0mP=O7nSZ3$RiL5$^Mbw$&oR&$A=2|Uv1u)3Vs zI*U9Ev%H;W!qUhW(>Up>?a@kV=S|zk_<({W%s$}^aE+RFOO>GX9Z7FmJyP2Z+R$?= zO*5xV(Gz@Jmi+NeXy`zuVUv@~#5aEWqh_U%Ykod+1;cYv4DhA&XAAV9z^+_R8H4w9 zK_+A;ejW`lSOQ`gGM}kbuKbJ5%N+yJvZj2^|RFHoy1Y601JC7-I zs1s$eiF-ryTwO9Qf+H5UmEH7)J$0j~Uod}P+54a6NsE=0Eallnji*wNcd8E!jYJJ3 z>QPF=S9G;Ulz<9_Xlo41gqPH<*A9tvS8IapMg7*mINYHUg+}tU4mE_3A>eOyIx~eb zV??ldQjr!nJ8sov-iTK+y}HFN?oA^BFYyoZ3j-q5c*9h5VUiWY?uvo_Cn_RG)_47Am=|eGk8?arY6~^UG#9NL ztP~U9g4DbjMrAV65l4ZLqfYR`7~Oueq9iD&nvn;Mg}Y#E3w$yPYg6 z#~&1I-fIbm`)mdXdUOjt;^%6}kiCRg6%uJ$mrY+W-x8`vY~~;$mlbBEGIP3R%z_3v z=GLZ@%yw&tV+ryazc5%AQEU%(xQQg#tE=kmMMS2a11lZgVg2QsJwAOW9pWebJ zPUD4@#9?%Ay+T=&v~gGxCIYLLz#?{VVa{degt4|kX~R*Q)6!$qMdXb&ixM}g9VEd` zf~l;Swo!7ps8w97?UGRMW*3qQry~>y8&p`2Bf;%P#CdgSBCo_UJQ5x`SH-OBu~Q;m z4TKy5zI@cAfru^pU{}`-#VWYw-j(I4=cyq=18rLnj#KV7s#>DLSz{U#`1?VsqG0r? zO_5S|4VL7cy8uS<95#S`^tEzI!!E@}&dqg&h!Q{HZZBBD4E9W*_TjVf;U2mE-y?k| zzWSwa9q3^{ET2eQC+~gWS7o@8f`=vPt!j$dNll&z4QLHGqKNEr@u;-51z;?Dzem`< z_1>e7NGPN@n)QqXT?DdhvmxI)Y#A?$l^A^4)^XS(KQVy7v^HeT?XBaj?>!Q-8yc9i z*uTO0&`olIMM7m6tk#>Eh^nTxK__DbK%-1a3h+kIl_C~19uLqr#!RiWl}H@UfjfT3DZU;pC#EG*_y!V1vPpF;SHgn>rX?J zE*-A*8)cQw3M6?;Z0o5edO92s!FgoH@e3Ruwj|YomD=_t*nba{3|M*@GJb>_Jck{) zjH-jr2RpzJyS=8{{)1G@G!Z=~IvJo5B`Jj{;Q*?dbc&C6+vg3@-P@0`Sduo;=>po| zn68Rqrdci#WBb){U7LtJu+y_XENidHnb?&&?)xc3j@1i2XutPjB=jNN1#D!Jx>pf? z7^cSWY;(aXUFZJLOT1^h6yBg7orNs+Ew@dSpICX|=kc+%*&WsmBGQ&lF(LW*#r*Nb zS--jDo;^dZp*piF`9H@&by%?0-C55Dc3Z^Kz8hdNjlD^&8OKI#gi5A^($P&E7_o;U z9(%7&>Z~1?cCQce143{oQYPXl6~W;7v#RW}+-#U^%sp^~*v!yAo2k^n1 z{Nv8oIn0#`&oewOdNOy(I0s~-HG+CF#X%!Bt7B}T;%dZzxh))ks@jf=%u6?|Jmme& zJysr<@qb1!A#!|!_rXRWozIv5j zHAB}@1mlCedForUS4XRbM6@8a#cnCJRXVom_9h~7yjy*5#$ayaNe_%?Cs!=G3fFh6 z?}<_7eN;KRVuw{78C&^9NJNVu&hU8!zdxctc}+w)eo=vnV&=$D#;D_@4y%m_IiCr@u$m@ z^oiSRd!5T|lrJ^;SQrKZ7W-XC929gQYstOMUxXAp;^3?Ytl*{phXe7}@}qIlMr1>0 z&w{=$#SJEy?14FSmCn%&pk&CP!u)Y;-DRi5TM*jMKBQ_gK?x^x@+NBx9wK`WyBVU= zP1U28avMYc&JTX@15AMb(O(?@-)a3nfuj;05|@5b*^~iTD@Pub^NKAiP(ekKYUM|$ zM5jS?yx_q$$uuFZHYfTv!se=(in0hs(*)IFh;+=CSTo3YK`t9$G^iRsQD|#7v&Yj~ z_wmoM6Wqf1KF|G$B%wSQJ%wHEMv|Dlti+HdW_{IQsn|j$TKdZ{T(_U4%S026}aoIoh$c84z`kEttHJPP{l#l{(M4Kd;P6 zPDfQe*GLPN-33qj^-&eDDj_!3Vb5oof&IZgoQqESZCE6V`1cN5K-Y^V?Tjtzwu+9Q ze$NwFA~Z1h<4JAaL2GX8nnQww`803qc43!Kie_`Ih&xzvbl;bpnpP)m2at&0{sViP zKfbN>wmP`vc+OvN2VXD9AV}O@Tly^9Bn)|zI+E8iYOZ1m25IRU%1K%PQxbkL)6eK3 zh;&aq072R!ikA zyfBv03cDcAaKQVNCmnWP+8wU(>ANBfGSID!cr=c{m+aa&nE^|rk*ElR}{OGRoz}oT> z^L^wlQ9wdo$g3rLmx_c}aJEWhPK}$BF3@N}+~}shBhEA)#SA@fu;CHnn` z`sjJ~Cu%W3XL83szBju?I7+WD*Q@iv$1moO+uO-GK!%~9_i&jr@7d7>&Wy0}?7F|! z;-yylsd|Jm&zt2EpObbf9(c_H!a0)kr10^5V2B*J-q&eeG}$!iV05C+R~BPXYVXui zq)m9OVtk$oiJ<8XV=4&AeZ#HC~32{?qRMQh92H=eW#gx%+bMu`Li> z`}S|$+%RJ1!YG}NyzxEX>F~pI2Ml@a1&AYywAUA>8Yf5wjdbf?*WJ=tzSmf?X#Hs( z5|S{ZEK*F?MkfxkidkF{Rhfx5p8_PclyH$@P2^2V$e;Kox$d}WOna{cRsChgw2K?G zq$IE78*)fr`i31*`(59I)&=!=uoKTpTV=W$FD|e;YaVKqskJ*)UC<2^D*ZtZNJzQt z#`ZR-sUTTn5LxZUgEdXcnk=lr_71X~nPdR>z&qVz3~WowRxwilPINF^VLWDalf?_# z0mA6&A$G9_*YG`EM~_6;pq<$0ntUR4M;zsFxvd=Hv!^yEg|ge43~G6QZ!s9U#F2hr zFLG4>e2-Zhu{Z89X*kXkG+;pAjgM~nxi17HEsvO0@*ldzArbb_jUD6FcMy9}C0w*U z(hbFvOb$Ba%c`v>CiV-IqQm92Ib7w;`aED9Pv3uLb0i)a4T^PEkLnH!^%MlBL714h zJR%FSrKw$!kSeu~;~82er6N;=l=nsw2R`oT*7{EJYMG&L@ru+j5|*+i*~Hmf#topDtp>KNub4sfzk}xF#kWF>73>apw0@RQe z5=0MQEXUX)lGv*8aP!{$rl_%J6PO$4VV#!YrOWBzur|GIBaM$ z&Y`Y^PcxCu%2kgx4=d$lF1cSgF8|=-^m_4$NsjU*e zY{5H~3EPt45Wn+|XFaN20)4hL*X|s8oe5}}fnDKynkdRWMVpdMMLkb`f`4((G;ZC_ zc(Z^;xt}ls)+EeTbvCtA z!}dhNSU!;}hG1cAhMgA+gHTNFa0$mA9?2-2#f&u{@28Q>Gm1fN*}M`Qkq6B5(Ua71 zh~%B>DV$+#_FlxGXpHhj=GijD&{`}8%(2^dVN@gq6eg@5H;>BIi@ZS8OvX^ucIXlE zg~aft*?6&(f*fhxsf#)ZM$9%X)!{l9!TB4|&P=0@W?{X?YcJL%(2k|a^Ev<%pTmXc ztHv3S^dC}nx6GtgtopeX&K-~N6Fhi+o76hz@0i~!ym&6?yC63pa1P&{-y79IL)pJe}b%rCF0{vh@nz3FZ`(03Q#w z{bqPZ_`34{9Hei3QYN|!k#I8sXmPjlbX?U#IAoI7Kx)!8s=Rau9Cjiho6X7Dl1Zz| z#u;mdnoQ5M^T$*q{?<~{7N1kxh)dc=Y&hzXh(K%oeJ@&Sr$gEG8ZFsV>os0mYOkNx z@=Hrx^9!6it)n$N7wmL`fk)8wnDYS?mUYA24@<7=398l{8*$vWLsjqPLJEGSo7QWIjW$}u5c(jOq5*TJgS8apN?OV)(W;0HtFm7A?; z#;KpjDL#E^ScJA65dM@fn%#IKwW4*pxxXmPYp`*d00TU{+28a@bB#Lte5W%!=OmQa*{HUsXx|pxsbN&$td=5zu2iC{_j9dk_@8K%1t^Kd1*iog36FTWy9SY#lG8GXZ(3=P`AcNBi>EEN#%fr5} zGOb6M6$BZ%MHgpnvmGcG55zfOMN?rtn@y{EUf$uBzRe(EW_(Wq_{Zh5dtMJhh1$qZ zlj9(ZWx;-s;~++$|HdRa#^NPU0~W+IC%7-+pSg*d5vzbff-JlU=(7rYIG-iL!_DE( zJt?jHW*3dbg`{-|=mS392mbg*%#3UW8KS2+T;{X9SWskOUFesoolh^8wVIh=$(#>U zKD*j;0Ej!@k&_pTv?RAsfjux<;e1P+rCv&>!;Gx9h8fmVYC2iDL*ix#zG({Nb|F}g z2`uwijJBz57*4eghq}!Uh0!q(WMf8hZ`&V*-e+`P*gYyNo)6QNZ+h%nslZa{GZFC_ zy^(2g9u9FhEPaR2ih-TiI66i;Z>@qzC?{hwcSDEQ>M%5L6`cySL_-hT-oW2lVD%)T zKp=ng(0kFfMj^9iP~;G1oIOkt$==MbGc;eBu}R*^W)YGmk%>NbECR9#qu5+$8H^#O zc+xG^7(!gk1p^ z(B{NB5GLV-&O;O-VUe^z4$fi?uRVWSzmm?W`o*8qDe^CHl>EhC645~xA`MhX=-}!x znO~t1F!;t5_&~$q`9J^p$r1_cIYIwGXOxq%Aey)(=*1Qd8H;UECG%Xs76T+Dl*1u6 ztR-V&vj@>hTP3ogbS;~pP5oLGGwBpt0&=ZwaSL4fk<)c%iP$OpHC#77af2dOCCp## z_%A>CU~(hhzj@h-JZ(^w6cz0qDr|{NyCVR`w(#zXPVPS3$%4qqaADjU5vgX)6P~gt zP{HxQID7)3MiL+eT;c!<{V;^{yRlPznb2?fuO~ za|t4cB{A`Ds4`DG!1WrDdAP=zv*9@gc&6gOHevM~d3i7#p?H1({SOFXbeC3;?u36X z0vhx}uA#sqy*Z9WkS+mEdOa*%cXanWzEG+z86bKY#0y<)y1LgcI^E??hJ9QpGW)92 z4iM8BPgRBh0xUyPhI;G7;Rv?~p(*H~9Op3TjEt)=HDnzubHZ`cvF(j3C11rBDZqIR z*_rg{&AG*hU2vGNtKe)hYE#!(GKntRd>iyV;yZDf0(~Z|4bQ)e3;>Rn(z7bG=Q1_} z+E?_dM;AEzPOK`LyPdT-s5k=+nIcA%SPG>T3K`bYX%!l%VYbsO9}CWr0efRi%#wIa zYl_%9UpV~1VSUJreE}8I!<}e*c2c1KT6m{IJj^@X<}hpx3bIr;)}p?lRI~3MNulRO z;r$ZjhGD>`@O0K|NC;r-H9p5$#KlzJysZ9@)}rG$YIZ`E>xrcptvP?ywC?G8>XS^M zNQ+umW~PyWc5zOKWV16Lw5A0w%DD*_TCilcCO^uYG0b*mCwVgxJe|8+JRKh*enlRs zTqHtaDN>?}e5tf&FCwpq2yaIl8x)L+h*(lp-MMme*G)`hkr=FWIM?n7LXKIGcHB*; z2T4J0--wU>#s|1Pf%-L+PB>RSjZ&UllApTZ^_h0~_X$~bu!rJ0B;t1oeGUhJ;mvk< zDi{*VYK7$G!ntaC zCWl8Gm=|QdK=Yae##)dHXO2s{>=cutZVJ>*ZN+(5*)DUTJY=xYB0Jh@qnVRQg$oeB z%8YwPHOYBy=V03s4$DeK+`-2NX1|fMLIT&sCz<~ZRtV!V|4in8)Z2p0z_V(5@(#{>G(CklaD6aa_1CUdQoSUh<^-XOio)rWqYd643ca)Jx1RiEpIp#$sEVT^R4VgzpqjD5~<}U zAtpD*!vl$CX?D^sgfA!ja+>94eKJ%{G-U8SJbIVD9xFTtLjJsULf)+`8mzJ}S>E>8 z=fj9z_2U=w$89jPu^7vYyv;4GpnJlG_17(R35GDUnX; zWD^UyV3G}Wv5qfBcGc9RQ{oea!iU>f-n3mBuZF}zsgxpg$eK2C0yXWNkl>R2VznLB z{b?my7m4OwqY#aNu&cqvgUT3M8K9IgD|{R93XHQHBVV@R(t~1cE$=i=q!I0G^1AU^%xehJ3vJJtYe$?`ho~X^c`AmF9`n=( zLx{HY2<+gxALTzv!C-m#Jg4%X!v=kOw=MtS-M3t`xE^+_s54<2dS)gheB-xMg09QH z{lE|UY7GE8fL!AIK7a#pn2lisQBIDjhYwrfC&nBw4E9_PQk^Aa{XY5?zoZCwkh{!K*z=-M_f!1%6^B&d z_n5$@g|nbo@aVcKj}Tfljd0X#+&?YB3?TSj#66)l zq}RF()I971rxXF0l0+P@1c}%8f!8Sl1VCap_n=+)ECFEO`K_Rnu>dKmOqt)l`0mzrOblJ5ne29Huy?`Qqh2Q2kzqaqzdJ z=vQy0{=@#}#!}<>tBkx3d+1XUZHi}EnXK0JVh znPxmuiVEL)y+cdR>;-?bVXkJ|>+xCVCGz1S?z*|s$aFGaE8CN^%9~3q(AZ=SlpY=; z^rbrUAWg(pouD`_i5Q%cNcjz^U`cG)DmV2te$i`PX@Yc?IGM3h<@6-GZrRw4?r!j+ zVCTMUb{7@a(wsFcdNMJ1@GGhsj!Dihww}~*@^M)0copq_BV0A2K`u`Ii%9aFb33pS z#{_d$SfNQ9^5FtecXSmIV0tA=v&6WK#9=kII6S<*q=6ZwT$i-#6a}Fq9O3`j;|qT_ zhtTgZM6%#`LW|af%^L^4ZeF+~SN2_#IGt~BVx#vDsgb?BO9~FXS2q|-3w5LTveQLy z_l0%;)$RRyBttNm@Aim z52)X@*H=71KW^!Q=N~!;ZE;3~_R4F5^%wa}l#AIbbTHwULyz@Pg1l7NP%I6*aLP@xEQ`vxOF?CwfmDce3q4zpj2W^eGR!0uEM;nSs%3^8~EK;3BlVv;td zA|29A72qu^zGPNWQ`(|b?4 zBOAqHgd|Uoj-Z}h;U@H7shcloYEZAQZVL&>NET5qmnaD=8l}#|jW_T*@(#AzoVXyW z&kKl7m6@1*yQ=X63FmHIi$`q^jQ~JkYlhvUuFD#$izPlLCr2G4p0J--9-}qk)U0ct zVFLpDI>ez~B$2@>Dsmj3oYcucB(puSdDLO=Q(8!~d-y*-##(7AMy!>#jyilxZAu++ z2{(7JC%<)#bs#rgRM?#ELF+a0pg&yWwX){*J?PMY>+^dMhuh5LjHzV~k-$N`iH5>c z;f(k?sr{aWNF1}0Kvm;x;gOQGuunGM%(q(95)_b(GJ=nD@}#Lwr?Kf83&~1}L;g@l z0_2jIDXY##tW`u3#sdkde$aQ|;7KeupLEWg!pdC-i(Nlw4%D=(Da~+*KRJFgaoVsf zr9VQ{`(xRD1poQe(}ruv&vx2aE7Wp#b9ON31&LFSi-RRfgZ<&^hU9uv@4^Kjszhhr zM5&IGX{?uS))8Jd z+}A}a+nk^|WYGqBORHyBF?pF?p)eOT77$ZVt6`atM+U2wZ|#-JUDfb^u*^_ zPOpuX&j6pfM7zG_^vIwTof%`^kM-^Dpr>E3)3iI5OO zoy2j_#S8GT=rC{ThtAuz27*#nj0AygWPG^^DJeBJ8u$~(#nSuq=H!UleVmrF&igLM z3@VvJ!RZ-TWj)39CfRB<7)K(lTH*<}0dI`fv~g$N9P`OL9JB0;3gp3Fz2@D6(E2ib z<@D*V*NE2m#?SKfS?7wN?t?^Vw?KyeUv(zIYra|&QdFJQ*Fk?L!cXlQIPPjE_?>Hg|cz$;oH8 zOPn?=&$mZR1&jxu*sJV{C~OJonKI)rPCiM>npJ56$d-LP3{0J!upI)6D{d6X`ffFM zlQ$O`MB2-Y<fEf+BkTGUeV8 z`{(%?%w?E^zb4*0y1pHlh4!W~n` zE#ZsE)LnHv30on(g4Yurlu9zU1FOa$LFJ1=$Qj^rxOMl*|0OQ(J^2B|@vmwt;m#nHPD-xRPX<0!Pp0>d9B)dSJ48K&6mR3A~+wt{$R3n&&maw;Plp%14G%=ImJ_Buhs7iu?HnU&9A?!Rk59hj`I*M$*dE zq=ecL6b7b0d9^A#ADS&5#11ENi@Vb1{h=MTO_fZ67fmS0 zDjSBy0K7$0Zo(n9MmSqFY8K^{V4FL>jbWB(vy6m9mBebghM}skjM**Ddaj+}+`19V z8OEdA1_XWunPVwy!S++tjTbKP%B(zfdA9{|5%k{W97r#uDcU>*7Atjz(n5BQGi;=x zjd+?vVCxjPyG6^yaP|`9U=SoJx@%U;H-FOWHIfi#lF#y_u{l9%cwSU$$mB*zT9}d* zMAIDgfGT2=F1o?U>cT3duHCD_9_ z?a}Ao^sxKNs|TO|@0Wd(Ga)Yc7f+7=-))>0nJCrE$p-A9_4k^B*WEXB!S+TLpzBN+ zlY^tdU{2ge#*Jij8CCmTkBW$@2RIRjw@tgp+_2E%E9gigy# z#v_9r)IQ#ymphMaRf@E>N?znVa!IAg!AJYEvcR6^#?^Bn{a3&9{i`VE0Td2+EvesE zzH{}xGW*5%R&FT!J+6KpFOB#NepUB{XtSrlzd@@UY`YzsgvNeV%W@gI|n z0*js44t&ei6(rqFsQQh;Ab2`IeKtPa#{!iRwQIyB81G(^A)>STD5@c-r3<-3np7^d z)49ounC{%7MCOvdlg5yPi2H#$1Bmqb2k5gI@Rp2p6V`;;lO#iGni37o4EA8q_(h}w z*$f3B{WgXuE^R1zz|lYUmW<3O@!b`K!#ljQtnP3X>HsqwQ9)WW( zm0I?qfK^o4ut=Mef(*=br^)h5laN-!hy;Qj@)pHmyjq>ahf`6N#qsEXb|i@?ls>uC zpCG&%d4xF-^^al80fo> zV=UbIvw>!Y7M32oB)cbw$nAZ9gztMl+fVR$-z*S~BSt{lB{=4c>KVs+$_kN0wq@tz z&Fil}$c|YF2|g1o`Qh%n6*sd(fcQ&NzJgT~x5#9ud03YF-43fg9)5YAMG#IT)h28G z5eG^8S*wQL>|sZQ*gMyvhKth*JevVROF%5r5`d3h;WEk(PGZm7zPC$yRm7-Iqp#%z#@z9r_hqfyinN?10p?2P{kO@wA|YNOfl5h^xw8fUH5RFry2OTA zbjUnn{l*NOcM^B2UEvWkj>I#QJrLb`2PFX1Rs2%ai4n7g%63vw!Eb2Bx)!~Le!p{7qx$zz!0>X2ZX zr~4lh?O|@8*X@70k^{uEt7m#@Zuv(QeMI9J3(NkYxqbfeVzmN!JpsM7=5{UP+F$=n BUhe<^ literal 0 HcmV?d00001 diff --git a/train.sh b/train.sh new file mode 100755 index 0000000..5f3ccab --- /dev/null +++ b/train.sh @@ -0,0 +1,7 @@ +i=0; websocat "ws://localhost:6008/subscribe?cursor=0" | while IFS= read -r line; do + echo "$line" > "training/$i.json" + i=$((i+1)) + if [ "$i" -ge 100000 ]; then + break + fi +done From d2b963ee9eb25a9b9b1bb1d7ee8d894cab13fbdb Mon Sep 17 00:00:00 2001 From: Jaz Date: Sat, 21 Sep 2024 18:06:28 -0700 Subject: [PATCH 3/5] Gitignore bump --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index a52e16a..16be990 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ go.work .env data/ +training/ From 28c1ba29848fd529e85dcd4e923e70739a380753 Mon Sep 17 00:00:00 2001 From: Jaz Date: Sat, 21 Sep 2024 18:53:29 -0700 Subject: [PATCH 4/5] Store both compressed and uncompressed playback buffers, only compress events once, never decompress on the server --- Dockerfile | 3 +++ cmd/client/main.go | 18 +++++++++++-- cmd/jetstream/main.go | 40 +++++++++++++++++++++++++++- cmd/jetstream/server.go | 48 ++++++++++++---------------------- docker-compose.yaml | 1 + pkg/client/client.go | 8 +++--- pkg/consumer/consumer.go | 37 ++++++++++++++++++++------ pkg/consumer/persist.go | 47 ++++++++++++++++++++++----------- dictionary => zstd-dictionary | Bin 9 files changed, 142 insertions(+), 60 deletions(-) rename dictionary => zstd-dictionary (100%) diff --git a/Dockerfile b/Dockerfile index cce3a71..3f5c1d3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,5 +32,8 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica # Copy the binary from the first stage. COPY --from=builder /app/jetstream . +# Copy the zstd dictionary +COPY zstd-dictionary /zstd-dictionary + # Set the startup command to run the binary CMD ["./jetstream"] diff --git a/cmd/client/main.go b/cmd/client/main.go index 26df233..2c45e7a 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "log" "log/slog" "os" @@ -27,9 +28,22 @@ func main() { }))) logger := slog.Default() + // Open the zstd dictionary file if it is set + f, err := os.Open("zstd-dictionary") + if err != nil { + log.Fatalf("failed to open zstd dictionary file: %v", err) + } + + dictBytes, err := io.ReadAll(f) + if err != nil { + f.Close() + log.Fatalf("failed to read zstd dictionary file: %v", err) + } + f.Close() + config := client.DefaultClientConfig() config.WebsocketURL = serverAddr - config.Compress = false + config.Compress = true h := &handler{ seenSeqs: make(map[int64]struct{}), @@ -37,7 +51,7 @@ func main() { scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) - c, err := client.NewClient(config, logger, scheduler) + c, err := client.NewClient(config, logger, scheduler, dictBytes) if err != nil { log.Fatalf("failed to create client: %v", err) } diff --git a/cmd/jetstream/main.go b/cmd/jetstream/main.go index 3e8cc0c..b8e643c 100644 --- a/cmd/jetstream/main.go +++ b/cmd/jetstream/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "log" "log/slog" "net/http" @@ -18,6 +19,7 @@ import ( "github.com/bluesky-social/indigo/events/schedulers/parallel" "github.com/bluesky-social/jetstream/pkg/consumer" "github.com/gorilla/websocket" + "github.com/klauspost/compress/zstd" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel" @@ -69,6 +71,12 @@ func main() { Value: "./data", EnvVars: []string{"JETSTREAM_DATA_DIR"}, }, + &cli.StringFlag{ + Name: "zstd-dictionary-path", + Usage: "path to the zstd dictionary file", + EnvVars: []string{"JETSTREAM_ZSTD_DICTIONARY_PATH"}, + Required: false, + }, &cli.DurationFlag{ Name: "event-ttl", Usage: "time to live for events", @@ -113,6 +121,35 @@ func Jetstream(cctx *cli.Context) error { return fmt.Errorf("failed to parse ws-url: %w", err) } + // Open the zstd dictionary file if it is set + var dictBytes []byte + if cctx.String("zstd-dictionary-path") != "" { + f, err := os.Open(cctx.String("zstd-dictionary-path")) + if err != nil { + return fmt.Errorf("failed to open zstd dictionary file: %w", err) + } + + dictBytes, err = io.ReadAll(f) + if err != nil { + f.Close() + return fmt.Errorf("failed to read zstd dictionary file: %w", err) + } + f.Close() + } + + var enc *zstd.Encoder + if len(dictBytes) > 0 { + enc, err = zstd.NewWriter(nil, zstd.WithEncoderDict(dictBytes)) + if err != nil { + return fmt.Errorf("failed to create zstd encoder: %w", err) + } + } else { + enc, err = zstd.NewWriter(nil) + if err != nil { + return fmt.Errorf("failed to create zstd encoder: %w", err) + } + } + s, err := NewServer(cctx.Float64("max-sub-rate")) if err != nil { return fmt.Errorf("failed to create server: %w", err) @@ -124,6 +161,7 @@ func Jetstream(cctx *cli.Context) error { u.String(), cctx.String("data-dir"), cctx.Duration("event-ttl"), + enc, s.Emit, ) if err != nil { @@ -342,7 +380,7 @@ func Jetstream(cctx *cli.Context) error { c.Shutdown() - err = c.DB.Close() + err = c.UncompressedDB.Close() if err != nil { log.Error("failed to close pebble db", "error", err) } diff --git a/cmd/jetstream/server.go b/cmd/jetstream/server.go index 4240fd0..774c860 100644 --- a/cmd/jetstream/server.go +++ b/cmd/jetstream/server.go @@ -14,9 +14,7 @@ import ( "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/jetstream/pkg/consumer" "github.com/bluesky-social/jetstream/pkg/models" - "github.com/goccy/go-json" "github.com/gorilla/websocket" - "github.com/klauspost/compress/zstd" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/semaphore" @@ -47,7 +45,6 @@ type Subscriber struct { wantedCollections *WantedCollections wantedDids map[string]struct{} rl *rate.Limiter - encoder *zstd.Encoder } type Server struct { @@ -71,7 +68,7 @@ func NewServer(maxSubRate float64) (*Server, error) { var maxConcurrentEmits = int64(100) var cutoverThresholdUS = int64(1_000_000) -func (s *Server) Emit(ctx context.Context, e models.Event) error { +func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []byte) error { ctx, span := tracer.Start(ctx, "Emit") defer span.End() @@ -81,14 +78,7 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { defer s.lk.RUnlock() eventsEmitted.Inc() - - b, err := json.Marshal(e) - if err != nil { - log.Error("failed to marshal event", "error", err) - return fmt.Errorf("failed to marshal event: %w", err) - } - - evtSize := float64(len(b)) + evtSize := float64(len(asJSON)) bytesEmitted.Add(evtSize) collection := "" @@ -96,7 +86,8 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { collection = e.Commit.Collection } - getEncodedEvent := func() []byte { return b } + getJSONEvent := func() []byte { return asJSON } + getCompressedEvent := func() []byte { return compBytes } sem := semaphore.NewWeighted(maxConcurrentEmits) for _, sub := range s.Subscribers { @@ -113,7 +104,14 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { if sub.cursor != nil && sub.seq < e.TimeUS-cutoverThresholdUS { return } - emitToSubscriber(ctx, log, sub, e.TimeUS, e.Did, collection, false, getEncodedEvent) + + // Pick the event valuer for the subscriber + getEventBytes := getJSONEvent + if sub.compress { + getEventBytes = getCompressedEvent + } + + emitToSubscriber(ctx, log, sub, e.TimeUS, e.Did, collection, false, getEventBytes) }(sub) } @@ -127,7 +125,7 @@ func (s *Server) Emit(ctx context.Context, e models.Event) error { return nil } -func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, timeUS int64, did, collection string, playback bool, getEncodedEvent func() []byte) error { +func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, timeUS int64, did, collection string, playback bool, getEventBytes func() []byte) error { if !sub.WantsCollection(collection) { return nil } @@ -143,11 +141,7 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti return nil } - evtBytes := getEncodedEvent() - if sub.compress { - evtBytes = sub.encoder.EncodeAll(evtBytes, nil) - } - + evtBytes := getEventBytes() if playback { // Copy the event bytes so the playback iterator can reuse the buffer evtBytes = append([]byte{}, evtBytes...) @@ -240,15 +234,6 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, compress bool, rl: rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)), } - if compress { - encoder, err := zstd.NewWriter(nil) - if err != nil { - slog.Error("failed to create zstd encoder", "error", err) - return nil, fmt.Errorf("failed to create zstd encoder: %w", err) - } - sub.encoder = encoder - } - s.Subscribers[s.nextSub] = &sub s.nextSub++ @@ -375,10 +360,11 @@ func (s *Server) HandleSubscribe(c echo.Context) error { if cursor != nil { log.Info("replaying events", "cursor", *cursor) playbackRateLimit := s.maxSubRate * 10 + go func() { for { - lastSeq, err := s.Consumer.ReplayEvents(ctx, *cursor, playbackRateLimit, func(ctx context.Context, timeUS int64, did, collection string, getEncodedEvent func() []byte) error { - return emitToSubscriber(ctx, log, sub, timeUS, did, collection, true, getEncodedEvent) + lastSeq, err := s.Consumer.ReplayEvents(ctx, sub.compress, *cursor, playbackRateLimit, func(ctx context.Context, timeUS int64, did, collection string, getEventBytes func() []byte) error { + return emitToSubscriber(ctx, log, sub, timeUS, did, collection, true, getEventBytes) }) if err != nil { log.Error("failed to replay events", "error", err) diff --git a/docker-compose.yaml b/docker-compose.yaml index 9c4671c..18d8b67 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,4 +11,5 @@ services: - ./data:/data environment: - JETSTREAM_DATA_DIR=/data + - JETSTREAM_ZSTD_DICTIONARY_PATH=/zstd-dictionary - JETSTREAM_MAX_SUB_RATE=1_000_000 diff --git a/pkg/client/client.go b/pkg/client/client.go index f7a6114..e873701 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -33,6 +33,7 @@ type Client struct { config *ClientConfig logger *slog.Logger decoder *zstd.Decoder + dictBytes []byte BytesRead atomic.Int64 EventsRead atomic.Int64 shutdown chan chan struct{} @@ -50,7 +51,7 @@ func DefaultClientConfig() *ClientConfig { } } -func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler) (*Client, error) { +func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler, dictBytes []byte) (*Client, error) { if config == nil { config = DefaultClientConfig() } @@ -64,11 +65,12 @@ func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler) ( } if config.Compress { - dec, err := zstd.NewReader(nil) + c.config.ExtraHeaders["Accept-Encoding"] = "zstd" + c.dictBytes = dictBytes + dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(dictBytes)) if err != nil { return nil, fmt.Errorf("failed to create zstd decoder: %w", err) } - c.config.ExtraHeaders["Accept-Encoding"] = "zstd" c.decoder = dec } diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index 5ca1c82..2f0e2cc 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -17,6 +17,7 @@ import ( "github.com/bluesky-social/jetstream/pkg/monotonic" "github.com/cockroachdb/pebble" "github.com/goccy/go-json" + "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -26,8 +27,10 @@ import ( type Consumer struct { SocketURL string Progress *Progress - Emit func(context.Context, models.Event) error - DB *pebble.DB + Emit func(context.Context, *models.Event, []byte, []byte) error + UncompressedDB *pebble.DB + CompressedDB *pebble.DB + encoder *zstd.Encoder EventTTL time.Duration logger *slog.Logger clock *monotonic.Clock @@ -48,10 +51,17 @@ func NewConsumer( socketURL string, dataDir string, eventTTL time.Duration, - emit func(context.Context, models.Event) error, + encoder *zstd.Encoder, + emit func(context.Context, *models.Event, []byte, []byte) error, ) (*Consumer, error) { - dbPath := dataDir + "/jetstream.db" - db, err := pebble.Open(dbPath, &pebble.Options{}) + uDBPath := dataDir + "/jetstream.uncompressed.db" + uDB, err := pebble.Open(uDBPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("failed to open db: %w", err) + } + + cDBPath := dataDir + "/jetstream.compressed.db" + cDB, err := pebble.Open(cDBPath, &pebble.Options{}) if err != nil { return nil, fmt.Errorf("failed to open db: %w", err) } @@ -70,7 +80,9 @@ func NewConsumer( }, EventTTL: eventTTL, Emit: emit, - DB: db, + UncompressedDB: uDB, + CompressedDB: cDB, + encoder: encoder, logger: log, clock: clock, buf: make(chan *models.Event, 10_000), @@ -325,12 +337,21 @@ func (c *Consumer) RunSequencer(ctx context.Context) error { // Assign a time_us to the event e.TimeUS = c.clock.Now() c.sequenced.Inc() - if err := c.PersistEvent(ctx, e); err != nil { + + // Encode the event in JSON and compress it + asJSON, err := json.Marshal(e) + if err != nil { + log.Error("failed to marshal event", "error", err) + return + } + compBytes := c.encoder.EncodeAll(asJSON, nil) + + if err := c.PersistEvent(ctx, e, asJSON, compBytes); err != nil { log.Error("failed to persist event", "error", err) return } c.persisted.Inc() - if err := c.Emit(ctx, *e); err != nil { + if err := c.Emit(ctx, e, asJSON, compBytes); err != nil { log.Error("failed to emit event", "error", err) } c.emitted.Inc() diff --git a/pkg/consumer/persist.go b/pkg/consumer/persist.go index 5d3a0c7..1908d7d 100644 --- a/pkg/consumer/persist.go +++ b/pkg/consumer/persist.go @@ -54,7 +54,7 @@ func (c *Consumer) WriteCursor(ctx context.Context) error { } // Write the cursor JSON to pebble - err = c.DB.Set(cursorKey, data, pebble.Sync) + err = c.UncompressedDB.Set(cursorKey, data, pebble.Sync) if err != nil { return fmt.Errorf("failed to write cursor to pebble: %w", err) } @@ -68,7 +68,7 @@ func (c *Consumer) ReadCursor(ctx context.Context) error { defer span.End() // Read the cursor from pebble - data, closer, err := c.DB.Get(cursorKey) + data, closer, err := c.UncompressedDB.Get(cursorKey) if err != nil { if err == pebble.ErrNotFound { return nil @@ -87,17 +87,10 @@ func (c *Consumer) ReadCursor(ctx context.Context) error { } // PersistEvent persists an event to PebbleDB -func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event) error { +func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event, asJSON, compBytes []byte) error { ctx, span := tracer.Start(ctx, "PersistEvent") defer span.End() - // Persist the event to PebbleDB - data, err := json.Marshal(evt) - if err != nil { - log.Error("failed to marshal event", "error", err) - return fmt.Errorf("failed to marshal event: %w", err) - } - // Key structure for events in PebbleDB // {{event_time_us}}_{{repo}}_{{collection}} var key []byte @@ -107,12 +100,20 @@ func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event) error { key = []byte(fmt.Sprintf("%d_%s", evt.TimeUS, evt.Did)) } - err = c.DB.Set(key, data, pebble.NoSync) + // Write the event to the uncompressed DB + err := c.UncompressedDB.Set(key, asJSON, pebble.NoSync) if err != nil { log.Error("failed to write event to pebble", "error", err) return fmt.Errorf("failed to write event to pebble: %w", err) } + // Compress the event and write it to the compressed DB + err = c.CompressedDB.Set(key, compBytes, pebble.NoSync) + if err != nil { + log.Error("failed to write compressed event to pebble", "error", err) + return fmt.Errorf("failed to write compressed event to pebble: %w", err) + } + return nil } @@ -127,12 +128,19 @@ func (c *Consumer) TrimEvents(ctx context.Context) error { trimKey := []byte(fmt.Sprintf("%d", trimUntil)) // Delete all numeric keys older than the trim key - err := c.DB.DeleteRange([]byte("0"), trimKey, pebble.Sync) + err := c.UncompressedDB.DeleteRange([]byte("0"), trimKey, pebble.Sync) if err != nil { log.Error("failed to delete old events", "error", err) return fmt.Errorf("failed to delete old events: %w", err) } + // Delete all numeric keys older than the trim key in the compressed DB + err = c.CompressedDB.DeleteRange([]byte("0"), trimKey, pebble.Sync) + if err != nil { + log.Error("failed to delete old compressed events", "error", err) + return fmt.Errorf("failed to delete old compressed events: %w", err) + } + return nil } @@ -140,7 +148,7 @@ func (c *Consumer) TrimEvents(ctx context.Context) error { var finalKey = []byte("9700000000000000") // ReplayEvents replays events from PebbleDB -func (c *Consumer) ReplayEvents(ctx context.Context, cursor int64, playbackRateLimit float64, emit func(context.Context, int64, string, string, func() []byte) error) (int64, error) { +func (c *Consumer) ReplayEvents(ctx context.Context, compressed bool, cursor int64, playbackRateLimit float64, emit func(context.Context, int64, string, string, func() []byte) error) (int64, error) { ctx, span := tracer.Start(ctx, "ReplayEvents") defer span.End() @@ -149,10 +157,19 @@ func (c *Consumer) ReplayEvents(ctx context.Context, cursor int64, playbackRateL limiter := rate.NewLimiter(rate.Limit(playbackRateLimit), int(playbackRateLimit)) // Iterate over all events starting from the cursor - iter, err := c.DB.NewIterWithContext(ctx, &pebble.IterOptions{ + var iter *pebble.Iterator + var err error + + iterOptions := &pebble.IterOptions{ LowerBound: []byte(fmt.Sprintf("%d", cursor)), UpperBound: finalKey, - }) + } + + if compressed { + iter, err = c.CompressedDB.NewIterWithContext(ctx, iterOptions) + } else { + iter, err = c.UncompressedDB.NewIterWithContext(ctx, iterOptions) + } if err != nil { log.Error("failed to create iterator", "error", err) return 0, fmt.Errorf("failed to create iterator: %w", err) diff --git a/dictionary b/zstd-dictionary similarity index 100% rename from dictionary rename to zstd-dictionary From 4b4167612d10a89d9d1e600721ca09865234f36f Mon Sep 17 00:00:00 2001 From: Jaz Date: Sat, 21 Sep 2024 23:14:27 -0700 Subject: [PATCH 5/5] Embed zstd dictionary and update readme --- Dockerfile | 3 -- README.md | 10 ++++++ cmd/client/main.go | 16 +-------- cmd/jetstream/main.go | 32 ------------------ docker-compose.yaml | 1 - pkg/client/client.go | 6 ++-- pkg/consumer/consumer.go | 6 +++- pkg/models/models.go | 5 +++ zstd-dictionary => pkg/models/zstd_dictionary | Bin 9 files changed, 23 insertions(+), 56 deletions(-) rename zstd-dictionary => pkg/models/zstd_dictionary (100%) diff --git a/Dockerfile b/Dockerfile index 3f5c1d3..cce3a71 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,8 +32,5 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica # Copy the binary from the first stage. COPY --from=builder /app/jetstream . -# Copy the zstd dictionary -COPY zstd-dictionary /zstd-dictionary - # Set the startup command to run the binary CMD ["./jetstream"] diff --git a/README.md b/README.md index c5548ba..be0fd57 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,16 @@ The following Query Parameters are supported: - An absent cursor or a cursor from the future will result in live-tail operation - When reconnecting, use the `time_us` from your most recently processed event and maybe provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback +### Compression + +Jetstream supports `zstd`-based compression of messages. Jetstream uses a custom dictionary for compression that can be found in `pkg/models/zstd_dictionary` and is required to decode compressed messages from the server. + +`zstd` compressed Jetstream messages are ~56% smaller on average than the raw JSON version of the Jetstream firehose. + +The provided client library uses compression by default, using an embedded copy of the Dictionary from the `models` package. + +### Examples + A simple example that hits the public instance looks like: ```bash diff --git a/cmd/client/main.go b/cmd/client/main.go index 2c45e7a..b8aa480 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log" "log/slog" "os" @@ -28,19 +27,6 @@ func main() { }))) logger := slog.Default() - // Open the zstd dictionary file if it is set - f, err := os.Open("zstd-dictionary") - if err != nil { - log.Fatalf("failed to open zstd dictionary file: %v", err) - } - - dictBytes, err := io.ReadAll(f) - if err != nil { - f.Close() - log.Fatalf("failed to read zstd dictionary file: %v", err) - } - f.Close() - config := client.DefaultClientConfig() config.WebsocketURL = serverAddr config.Compress = true @@ -51,7 +37,7 @@ func main() { scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) - c, err := client.NewClient(config, logger, scheduler, dictBytes) + c, err := client.NewClient(config, logger, scheduler) if err != nil { log.Fatalf("failed to create client: %v", err) } diff --git a/cmd/jetstream/main.go b/cmd/jetstream/main.go index b8e643c..e9ac443 100644 --- a/cmd/jetstream/main.go +++ b/cmd/jetstream/main.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "log" "log/slog" "net/http" @@ -19,7 +18,6 @@ import ( "github.com/bluesky-social/indigo/events/schedulers/parallel" "github.com/bluesky-social/jetstream/pkg/consumer" "github.com/gorilla/websocket" - "github.com/klauspost/compress/zstd" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel" @@ -121,35 +119,6 @@ func Jetstream(cctx *cli.Context) error { return fmt.Errorf("failed to parse ws-url: %w", err) } - // Open the zstd dictionary file if it is set - var dictBytes []byte - if cctx.String("zstd-dictionary-path") != "" { - f, err := os.Open(cctx.String("zstd-dictionary-path")) - if err != nil { - return fmt.Errorf("failed to open zstd dictionary file: %w", err) - } - - dictBytes, err = io.ReadAll(f) - if err != nil { - f.Close() - return fmt.Errorf("failed to read zstd dictionary file: %w", err) - } - f.Close() - } - - var enc *zstd.Encoder - if len(dictBytes) > 0 { - enc, err = zstd.NewWriter(nil, zstd.WithEncoderDict(dictBytes)) - if err != nil { - return fmt.Errorf("failed to create zstd encoder: %w", err) - } - } else { - enc, err = zstd.NewWriter(nil) - if err != nil { - return fmt.Errorf("failed to create zstd encoder: %w", err) - } - } - s, err := NewServer(cctx.Float64("max-sub-rate")) if err != nil { return fmt.Errorf("failed to create server: %w", err) @@ -161,7 +130,6 @@ func Jetstream(cctx *cli.Context) error { u.String(), cctx.String("data-dir"), cctx.Duration("event-ttl"), - enc, s.Emit, ) if err != nil { diff --git a/docker-compose.yaml b/docker-compose.yaml index 18d8b67..9c4671c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,5 +11,4 @@ services: - ./data:/data environment: - JETSTREAM_DATA_DIR=/data - - JETSTREAM_ZSTD_DICTIONARY_PATH=/zstd-dictionary - JETSTREAM_MAX_SUB_RATE=1_000_000 diff --git a/pkg/client/client.go b/pkg/client/client.go index e873701..f748e8b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -33,7 +33,6 @@ type Client struct { config *ClientConfig logger *slog.Logger decoder *zstd.Decoder - dictBytes []byte BytesRead atomic.Int64 EventsRead atomic.Int64 shutdown chan chan struct{} @@ -51,7 +50,7 @@ func DefaultClientConfig() *ClientConfig { } } -func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler, dictBytes []byte) (*Client, error) { +func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler) (*Client, error) { if config == nil { config = DefaultClientConfig() } @@ -66,8 +65,7 @@ func NewClient(config *ClientConfig, logger *slog.Logger, scheduler Scheduler, d if config.Compress { c.config.ExtraHeaders["Accept-Encoding"] = "zstd" - c.dictBytes = dictBytes - dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(dictBytes)) + dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(models.ZSTDDictionary)) if err != nil { return nil, fmt.Errorf("failed to create zstd decoder: %w", err) } diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index 2f0e2cc..78fd38c 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -51,7 +51,6 @@ func NewConsumer( socketURL string, dataDir string, eventTTL time.Duration, - encoder *zstd.Encoder, emit func(context.Context, *models.Event, []byte, []byte) error, ) (*Consumer, error) { uDBPath := dataDir + "/jetstream.uncompressed.db" @@ -73,6 +72,11 @@ func NewConsumer( return nil, fmt.Errorf("failed to create clock: %w", err) } + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderDict(models.ZSTDDictionary)) + if err != nil { + return nil, fmt.Errorf("failed to create zstd encoder: %w", err) + } + c := Consumer{ SocketURL: socketURL, Progress: &Progress{ diff --git a/pkg/models/models.go b/pkg/models/models.go index 31f190d..9c7e115 100644 --- a/pkg/models/models.go +++ b/pkg/models/models.go @@ -3,9 +3,14 @@ package models import ( "github.com/goccy/go-json" + _ "embed" + comatproto "github.com/bluesky-social/indigo/api/atproto" ) +//go:embed zstd_dictionary +var ZSTDDictionary []byte + type Event struct { Did string `json:"did"` TimeUS int64 `json:"time_us"` diff --git a/zstd-dictionary b/pkg/models/zstd_dictionary similarity index 100% rename from zstd-dictionary rename to pkg/models/zstd_dictionary