diff --git a/app/tier1.go b/app/tier1.go index bcee0e01..808edca5 100644 --- a/app/tier1.go +++ b/app/tier1.go @@ -60,6 +60,7 @@ type Tier1Config struct { StateStoreDefaultTag string BlockType string StateBundleSize uint64 + EnforceCompression bool // refuse incoming requests that do not accept gzip compression (ConnectRPC or GRPC) MaxSubrequests uint64 SubrequestsEndpoint string @@ -199,6 +200,7 @@ func (a *Tier1App) Run() error { a.config.BlockType, subrequestsClientConfig, tier2RequestParameters, + a.config.EnforceCompression, opts..., ) if err != nil { diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index d3a485d6..e54389b6 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -9,6 +9,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v1.11.3 + +### Server-side + +* Fixed: detection of gzip compression on 'connect' protocol (js/ts clients) +* Added: tier1.Config `EnforceCompression` to refuse incoming connections that do not support GZIP compression (default: false) + ## v1.11.2 ### Server-side diff --git a/service/tier1.go b/service/tier1.go index 545f7e6b..afe0b42b 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -72,6 +72,7 @@ type Tier1Service struct { resolveCursor pipeline.CursorResolver getHeadBlock func() (uint64, error) + enforceCompression bool tier2RequestParameters reqctx.Tier2RequestParameters } @@ -129,6 +130,7 @@ func NewTier1( substreamsClientConfig *client.SubstreamsClientConfig, tier2RequestParameters reqctx.Tier2RequestParameters, + enforceCompression bool, opts ...Option, ) (*Tier1Service, error) { @@ -174,6 +176,7 @@ func NewTier1( logger: logger, tier2RequestParameters: tier2RequestParameters, blockExecutionTimeout: 3 * time.Minute, + enforceCompression: enforceCompression, } s.streamFactoryFunc = sf.New @@ -209,6 +212,14 @@ func (s *Tier1Service) Blocks( ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request") defer span.EndWithErr(&err) + var compressed bool + if matchGZIPHeader(req) { + compressed = true + } + if s.enforceCompression && !compressed { + return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Your client does not accept gzip-compressed streams. Check how to enable it on your GRPC or ConnectRPC client")) + } + request := req.Msg if request.Modules == nil { return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request")) @@ -243,16 +254,6 @@ func (s *Tier1Service) Blocks( for i := 0; i < len(moduleNames); i++ { moduleNames[i] = request.Modules.Modules[i].Name } - var compressed bool - for _, vv := range req.Header().Values("grpc-Accept-Encoding") { - for _, v := range strings.Split(vv, ",") { - if v == "gzip" { - compressed = true - break - } - } - } - fields := []zap.Field{ zap.Int64("start_block", request.StartBlockNum), zap.Uint64("stop_block", request.StopBlockNum), @@ -713,3 +714,24 @@ func toConnectError(ctx context.Context, err error) error { // data? return connect.NewError(connect.CodeInternal, err) } + +// must be lowercase +var compressionHeader = map[string]bool{"grpc-accept-encoding": true, "connect-accept-encoding": true} + +const compressionValue = "gzip" + +func matchGZIPHeader(req *connect.Request[pbsubstreamsrpc.Request]) bool { + + for k, v := range req.Header() { + if compressionHeader[strings.ToLower(k)] { + for _, vv := range v { + for _, vvv := range strings.Split(vv, ",") { + if strings.ToLower(vvv) == compressionValue { + return true + } + } + } + } + } + return false +}