diff --git a/config/config.go b/config/config.go index 63db7d2977..e88ef9a8a9 100644 --- a/config/config.go +++ b/config/config.go @@ -278,11 +278,13 @@ type Config struct { LuaModules *listFlag `yaml:"lua-modules"` LuaSources *listFlag `yaml:"lua-sources"` - EnableOpenPolicyAgent bool `yaml:"enable-open-policy-agent"` - OpenPolicyAgentConfigTemplate string `yaml:"open-policy-agent-config-template"` - OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"` - OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"` - OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"` + EnableOpenPolicyAgent bool `yaml:"enable-open-policy-agent"` + OpenPolicyAgentConfigTemplate string `yaml:"open-policy-agent-config-template"` + OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"` + OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"` + OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"` + OpenPolicyAgentMaxRequestBodySize int64 `yaml:"open-policy-agent-max-request-body-size"` + OpenPolicyAgentMaxMemoryBodyParsing int64 `yaml:"open-policy-agent-max-memory-body-parsing"` } const ( @@ -501,6 +503,8 @@ func NewConfig() *Config { flag.StringVar(&cfg.OpenPolicyAgentEnvoyMetadata, "open-policy-agent-envoy-metadata", "", "JSON file containing meta-data passed as input for compatibility with Envoy policies in the format") flag.DurationVar(&cfg.OpenPolicyAgentCleanerInterval, "open-policy-agent-cleaner-interval", openpolicyagent.DefaultCleanerInterval, "Duration in seconds to wait before cleaning up unused opa instances") flag.DurationVar(&cfg.OpenPolicyAgentStartupTimeout, "open-policy-agent-startup-timeout", openpolicyagent.DefaultOpaStartupTimeout, "Maximum duration in seconds to wait for the open policy agent to start up") + flag.Int64Var(&cfg.OpenPolicyAgentMaxRequestBodySize, "open-policy-agent-max-request-body-size", openpolicyagent.DefaultMaxRequestBodySize, "Maximum number of bytes from a http request body that are passed as input to the policy") + flag.Int64Var(&cfg.OpenPolicyAgentMaxMemoryBodyParsing, "open-policy-agent-max-memory-body-parsing", openpolicyagent.DefaultMaxMemoryBodyParsing, "Total number of bytes used to parse http request bodies across all requests. Once the limit is met, requests will be rejected.") // TLS client certs flag.StringVar(&cfg.ClientKeyFile, "client-tls-key", "", "TLS Key file for backend connections, multiple keys may be given comma separated - the order must match the certs") @@ -901,11 +905,13 @@ func (c *Config) ToOptions() skipper.Options { LuaModules: c.LuaModules.values, LuaSources: c.LuaSources.values, - EnableOpenPolicyAgent: c.EnableOpenPolicyAgent, - OpenPolicyAgentConfigTemplate: c.OpenPolicyAgentConfigTemplate, - OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata, - OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval, - OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout, + EnableOpenPolicyAgent: c.EnableOpenPolicyAgent, + OpenPolicyAgentConfigTemplate: c.OpenPolicyAgentConfigTemplate, + OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata, + OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval, + OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout, + OpenPolicyAgentMaxRequestBodySize: c.OpenPolicyAgentMaxRequestBodySize, + OpenPolicyAgentMaxMemoryBodyParsing: c.OpenPolicyAgentMaxMemoryBodyParsing, } for _, rcci := range c.CloneRoute { eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl) diff --git a/config/config_test.go b/config/config_test.go index 2eeecba8da..2510771f24 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -10,6 +10,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/filters/openpolicyagent" "github.com/zalando/skipper/net" "github.com/zalando/skipper/proxy" "gopkg.in/yaml.v2" @@ -160,8 +161,10 @@ func defaultConfig() *Config { ValidateQueryLog: true, LuaModules: commaListFlag(), LuaSources: commaListFlag(), - OpenPolicyAgentCleanerInterval: 10 * time.Second, + OpenPolicyAgentCleanerInterval: openpolicyagent.DefaultCleanIdlePeriod, OpenPolicyAgentStartupTimeout: 30 * time.Second, + OpenPolicyAgentMaxRequestBodySize: openpolicyagent.DefaultMaxRequestBodySize, + OpenPolicyAgentMaxMemoryBodyParsing: openpolicyagent.DefaultMaxMemoryBodyParsing, } } diff --git a/docs/reference/filters.md b/docs/reference/filters.md index 6a451785ed..3ee8b64e83 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -1844,6 +1844,20 @@ Headers both to the upstream and the downstream service can be manipulated the s This allows both to add and remove unwanted headers in allow/deny cases. +#### opaAuthorizeRequestWithBody + +Requests can also be authorized based on the request body the same way that is supported with the [Open Policy Agent Envoy plugin](https://www.openpolicyagent.org/docs/latest/envoy-primer/#example-input), look for the input attribute `parsed_body` in the upstream documentation. + +This filter has the same parameters that the `opaAuthorizeRequest` filter has. + +A request's body is parsed up to a maximum size with a default of 1MB that can be configured via the `-open-policy-agent-max-request-body-size` command line argument. To avoid OOM errors due to too many concurrent authorized body requests, another flag `-open-policy-agent-max-memory-body-parsing` controls how much memory can be used across all requests with a default of 100MB. If in-flight requests that use body authorization exceed that limit, incoming requests that use the body will be rejected with an internal server error. The number of concurrent requests is + +$$ n_{max-memory-body-parsing} \over min(avg(n_{request-content-length}), n_{max-request-body-size}) $$ + +so if requests on average have 100KB and the maximum memory is set to 100MB, on average 1024 authorized requests can be processed concurrently. + +The filter also honors the `skip-request-body-parse` of the corresponding [configuration](https://www.openpolicyagent.org/docs/latest/envoy-introduction/#configuration) that the OPA plugin uses. + #### opaServeResponse Always serves the response even if the policy allows the request and can customize the response completely. Can be used to re-implement legacy authorization services by already using data in Open Policy Agent but implementing an old REST API. This can also be useful to support Single Page Applications to return the calling users' permissions. @@ -1887,6 +1901,20 @@ For this filter, the data flow looks like this independent of an allow/deny deci ``` +#### opaServeResponseWithReqBody + +If you want to serve requests directly from an Open Policy Agent policy that uses the request body, this can be done by using the `input.parsed_body` attribute the same way that is supported with the [Open Policy Agent Envoy plugin](https://www.openpolicyagent.org/docs/latest/envoy-primer/#example-input). + +This filter has the same parameters that the `opaServeResponse` filter has. + +A request's body is parsed up to a maximum size with a default of 1MB that can be configured via the `-open-policy-agent-max-request-body-size` command line argument. To avoid OOM errors due to too many concurrent authorized body requests, another flag `-open-policy-agent-max-memory-body-parsing` controls how much memory can be used across all requests with a default of 100MB. If in-flight requests that use body authorization exceed that limit, incoming requests that use the body will be rejected with an internal server error. The number of concurrent requests is + +$$ n_{max-memory-body-parsing} \over min(avg(n_{request-content-length}), n_{max-request-body-size}) $$ + +so if requests on average have 100KB and the maximum memory is set to 100MB, on average 1024 authorized requests can be processed concurrently. + +The filter also honors the `skip-request-body-parse` of the corresponding [configuration](https://www.openpolicyagent.org/docs/latest/envoy-introduction/#configuration) that the OPA plugin uses. + ## Cookie Handling ### dropRequestCookie diff --git a/filters/filters.go b/filters/filters.go index b99c14979b..c79fc80978 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -347,7 +347,9 @@ const ( ConsistentHashKeyName = "consistentHashKey" ConsistentHashBalanceFactorName = "consistentHashBalanceFactor" OpaAuthorizeRequestName = "opaAuthorizeRequest" + OpaAuthorizeRequestWithBodyName = "opaAuthorizeRequestWithBody" OpaServeResponseName = "opaServeResponse" + OpaServeResponseWithReqBodyName = "opaServeResponseWithReqBody" // Undocumented filters HealthCheckName = "healthcheck" diff --git a/filters/openpolicyagent/evaluation.go b/filters/openpolicyagent/evaluation.go index cc8ed0042c..aebac29fe1 100644 --- a/filters/openpolicyagent/evaluation.go +++ b/filters/openpolicyagent/evaluation.go @@ -41,7 +41,7 @@ func (opa *OpenPolicyAgentInstance) Eval(ctx context.Context, req *ext_authz_v3. } logger := opa.manager.Logger().WithFields(map[string]interface{}{"decision-id": result.DecisionID}) - input, err = envoyauth.RequestToInput(req, logger, nil, true) + input, err = envoyauth.RequestToInput(req, logger, nil, opa.EnvoyPluginConfig().SkipRequestBodyParse) if err != nil { return nil, fmt.Errorf("failed to convert request to input: %w", err) } diff --git a/filters/openpolicyagent/internal/envoy/envoyplugin.go b/filters/openpolicyagent/internal/envoy/envoyplugin.go index a0a88a940f..6e143ae7e4 100644 --- a/filters/openpolicyagent/internal/envoy/envoyplugin.go +++ b/filters/openpolicyagent/internal/envoy/envoyplugin.go @@ -48,8 +48,10 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) { // PluginConfig represents the plugin configuration. type PluginConfig struct { - Path string `json:"path"` - DryRun bool `json:"dry-run"` + Path string `json:"path"` + DryRun bool `json:"dry-run"` + SkipRequestBodyParse bool `json:"skip-request-body-parse"` + ParsedQuery ast.Body } diff --git a/filters/openpolicyagent/internal/envoy/skipperadapter.go b/filters/openpolicyagent/internal/envoy/skipperadapter.go index 918d9099d7..9f8f0acae9 100644 --- a/filters/openpolicyagent/internal/envoy/skipperadapter.go +++ b/filters/openpolicyagent/internal/envoy/skipperadapter.go @@ -8,7 +8,7 @@ import ( ext_authz_v3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3" ) -func AdaptToExtAuthRequest(req *http.Request, metadata *ext_authz_v3_core.Metadata, contextExtensions map[string]string) *ext_authz_v3.CheckRequest { +func AdaptToExtAuthRequest(req *http.Request, metadata *ext_authz_v3_core.Metadata, contextExtensions map[string]string, rawBody []byte) *ext_authz_v3.CheckRequest { headers := make(map[string]string, len(req.Header)) for h, vv := range req.Header { @@ -25,6 +25,7 @@ func AdaptToExtAuthRequest(req *http.Request, metadata *ext_authz_v3_core.Metada Method: req.Method, Path: req.URL.Path, Headers: headers, + RawBody: rawBody, }, }, ContextExtensions: contextExtensions, diff --git a/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest.go b/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest.go index 17bc4292ec..3b9cf38107 100644 --- a/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest.go +++ b/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest.go @@ -1,10 +1,14 @@ package opaauthorizerequest import ( - ext_authz_v3_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "encoding/json" + "errors" + "io" "net/http" "time" + ext_authz_v3_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "github.com/zalando/skipper/filters" "gopkg.in/yaml.v2" @@ -15,19 +19,31 @@ import ( const responseHeadersKey = "open-policy-agent:decision-response-headers" type spec struct { - registry *openpolicyagent.OpenPolicyAgentRegistry - opts []func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error + registry *openpolicyagent.OpenPolicyAgentRegistry + opts []func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error + name string + bodyParsing bool } func NewOpaAuthorizeRequestSpec(registry *openpolicyagent.OpenPolicyAgentRegistry, opts ...func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error) filters.Spec { return &spec{ registry: registry, opts: opts, + name: filters.OpaAuthorizeRequestName, + } +} + +func NewOpaAuthorizeRequestWithBodySpec(registry *openpolicyagent.OpenPolicyAgentRegistry, opts ...func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error) filters.Spec { + return &spec{ + registry: registry, + opts: opts, + name: filters.OpaAuthorizeRequestWithBodyName, + bodyParsing: true, } } func (s *spec) Name() string { - return filters.OpaAuthorizeRequestName + return s.name } func (s *spec) CreateFilter(args []interface{}) (filters.Filter, error) { @@ -75,6 +91,7 @@ func (s *spec) CreateFilter(args []interface{}) (filters.Filter, error) { opa: opa, registry: s.registry, envoyContextExtensions: envoyContextExtensions, + bodyParsing: s.bodyParsing, }, nil } @@ -82,6 +99,7 @@ type opaAuthorizeRequestFilter struct { opa *openpolicyagent.OpenPolicyAgentInstance registry *openpolicyagent.OpenPolicyAgentRegistry envoyContextExtensions map[string]string + bodyParsing bool } func (f *opaAuthorizeRequestFilter) Request(fc filters.FilterContext) { @@ -89,12 +107,32 @@ func (f *opaAuthorizeRequestFilter) Request(fc filters.FilterContext) { span, ctx := f.opa.StartSpanFromFilterContext(fc) defer span.Finish() - authzreq := envoy.AdaptToExtAuthRequest(req, f.opa.InstanceConfig().GetEnvoyMetadata(), f.envoyContextExtensions) + var rawBodyBytes []byte + if f.bodyParsing { + var body io.ReadCloser + var err error + var finalizer func() + body, rawBodyBytes, finalizer, err = f.opa.ExtractHttpBodyOptionally(req) + defer finalizer() + if err != nil { + f.opa.HandleInvalidDecisionError(fc, span, nil, err, !f.opa.EnvoyPluginConfig().DryRun) + return + } + req.Body = body + } + + authzreq := envoy.AdaptToExtAuthRequest(req, f.opa.InstanceConfig().GetEnvoyMetadata(), f.envoyContextExtensions, rawBodyBytes) start := time.Now() result, err := f.opa.Eval(ctx, authzreq) fc.Metrics().MeasureSince(f.opa.MetricsKey("eval_time"), start) + var jsonErr *json.SyntaxError + if errors.As(err, &jsonErr) { + f.opa.HandleEvaluationError(fc, span, result, err, !f.opa.EnvoyPluginConfig().DryRun, http.StatusBadRequest) + return + } + if err != nil { f.opa.HandleInvalidDecisionError(fc, span, result, err, !f.opa.EnvoyPluginConfig().DryRun) return diff --git a/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest_test.go b/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest_test.go index 84b940249c..33b88f8d36 100644 --- a/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest_test.go +++ b/filters/openpolicyagent/opaauthorizerequest/opaauthorizerequest_test.go @@ -5,6 +5,7 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" opasdktest "github.com/open-policy-agent/opa/sdk/test" @@ -19,9 +20,14 @@ import ( func TestAuthorizeRequestFilter(t *testing.T) { for _, ti := range []struct { msg string + filterName string + extraeskip string bundleName string regoQuery string requestPath string + requestMethod string + requestHeaders http.Header + body string contextExtensions string expectedBody string expectedHeaders http.Header @@ -31,9 +37,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }{ { msg: "Allow Requests", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow", requestPath: "/allow", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusOK, expectedBody: "Welcome!", @@ -43,9 +51,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Allow Matching Context Extension", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_context_extensions", requestPath: "/allow", + requestMethod: "GET", contextExtensions: "com.mycompany.myprop: myvalue", expectedStatus: http.StatusOK, expectedBody: "Welcome!", @@ -66,9 +76,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Simple Forbidden", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow", requestPath: "/forbidden", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusForbidden, expectedHeaders: make(http.Header), @@ -77,9 +89,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Allow With Structured Rules", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_object", requestPath: "/allow/structured", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusOK, expectedBody: "Welcome!", @@ -89,9 +103,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Forbidden With Body", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_object", requestPath: "/forbidden", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusUnauthorized, expectedHeaders: map[string][]string{"X-Ext-Auth-Allow": {"no"}}, @@ -101,9 +117,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Misconfigured Rego Query", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/invalid_path", requestPath: "/allow", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusInternalServerError, expectedBody: "", @@ -113,9 +131,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Wrong Query Data Type", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_wrong_type", requestPath: "/allow", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusInternalServerError, expectedBody: "", @@ -125,9 +145,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Wrong Query Data Type", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_object_invalid_headers_to_remove", requestPath: "/allow", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusInternalServerError, expectedBody: "", @@ -137,9 +159,11 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Wrong Query Data Type", + filterName: "opaAuthorizeRequest", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_object_invalid_headers", requestPath: "/allow", + requestMethod: "GET", contextExtensions: "", expectedStatus: http.StatusInternalServerError, expectedBody: "", @@ -147,6 +171,81 @@ func TestAuthorizeRequestFilter(t *testing.T) { backendHeaders: make(http.Header), removeHeaders: make(http.Header), }, + { + msg: "Allow With Body", + filterName: "opaAuthorizeRequestWithBody", + bundleName: "somebundle.tar.gz", + regoQuery: "envoy/authz/allow_body", + requestMethod: "POST", + body: `{ "target_id" : "123456" }`, + requestHeaders: map[string][]string{"content-type": {"application/json"}}, + requestPath: "/allow_body", + expectedStatus: http.StatusOK, + expectedBody: "Welcome!", + expectedHeaders: make(http.Header), + backendHeaders: make(http.Header), + removeHeaders: make(http.Header), + }, + { + msg: "Forbidden With Body", + filterName: "opaAuthorizeRequestWithBody", + bundleName: "somebundle.tar.gz", + regoQuery: "envoy/authz/allow_body", + requestMethod: "POST", + body: `{ "target_id" : "wrong id" }`, + requestHeaders: map[string][]string{"content-type": {"application/json"}}, + requestPath: "/allow_body", + expectedStatus: http.StatusForbidden, + expectedBody: "", + expectedHeaders: make(http.Header), + backendHeaders: make(http.Header), + removeHeaders: make(http.Header), + }, + { + msg: "GET against body protected endpoint", + filterName: "opaAuthorizeRequestWithBody", + bundleName: "somebundle.tar.gz", + regoQuery: "envoy/authz/allow_body", + requestMethod: "GET", + requestHeaders: map[string][]string{"content-type": {"application/json"}}, + requestPath: "/allow_body", + expectedStatus: http.StatusForbidden, + expectedBody: "", + expectedHeaders: make(http.Header), + backendHeaders: make(http.Header), + removeHeaders: make(http.Header), + }, + { + msg: "Broken Body", + filterName: "opaAuthorizeRequestWithBody", + bundleName: "somebundle.tar.gz", + regoQuery: "envoy/authz/allow_body", + requestMethod: "POST", + body: `{ "target_id" / "wrong id" }`, + requestHeaders: map[string][]string{"content-type": {"application/json"}}, + requestPath: "/allow_body", + expectedStatus: http.StatusBadRequest, + expectedBody: "", + expectedHeaders: make(http.Header), + backendHeaders: make(http.Header), + removeHeaders: make(http.Header), + }, + { + msg: "Chained OPA filter with body", + filterName: "opaAuthorizeRequestWithBody", + extraeskip: `-> opaAuthorizeRequestWithBody("somebundle.tar.gz")`, + bundleName: "somebundle.tar.gz", + regoQuery: "envoy/authz/allow_body", + requestMethod: "POST", + body: `{ "target_id" : "123456" }`, + requestHeaders: map[string][]string{"content-type": {"application/json"}}, + requestPath: "/allow_body", + expectedStatus: http.StatusOK, + expectedBody: "Welcome!", + expectedHeaders: make(http.Header), + backendHeaders: make(http.Header), + removeHeaders: make(http.Header), + }, } { t.Run(ti.msg, func(t *testing.T) { t.Logf("Running test for %v", ti) @@ -154,6 +253,12 @@ func TestAuthorizeRequestFilter(t *testing.T) { w.Write([]byte("Welcome!")) assert.True(t, isHeadersPresent(t, ti.backendHeaders, r.Header), "Enriched request header is absent.") assert.True(t, isHeadersAbsent(t, ti.removeHeaders, r.Header), "Unwanted HTTP Headers present.") + + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, ti.body, string(body)) })) opaControlPlane := opasdktest.MustNewServer( @@ -211,6 +316,12 @@ func TestAuthorizeRequestFilter(t *testing.T) { "allowed": true, "headers": "bogus string instead of object" } + + default allow_body = false + + allow_body { + input.parsed_body.target_id == "123456" + } `, }), ) @@ -242,17 +353,29 @@ func TestAuthorizeRequestFilter(t *testing.T) { opaFactory := openpolicyagent.NewOpenPolicyAgentRegistry() ftSpec := NewOpaAuthorizeRequestSpec(opaFactory, openpolicyagent.WithConfigTemplate(config)) fr.Register(ftSpec) + ftSpec = NewOpaAuthorizeRequestWithBodySpec(opaFactory, openpolicyagent.WithConfigTemplate(config)) + fr.Register(ftSpec) - r := eskip.MustParse(fmt.Sprintf(`* -> opaAuthorizeRequest("%s", "%s") -> "%s"`, ti.bundleName, ti.contextExtensions, clientServer.URL)) + r := eskip.MustParse(fmt.Sprintf(`* -> %s("%s", "%s") %s -> "%s"`, ti.filterName, ti.bundleName, ti.contextExtensions, ti.extraeskip, clientServer.URL)) proxy := proxytest.New(fr, r...) - req, err := http.NewRequest("GET", proxy.URL+ti.requestPath, nil) + var bodyReader io.Reader + if ti.body != "" { + bodyReader = strings.NewReader(ti.body) + } + + req, err := http.NewRequest(ti.requestMethod, proxy.URL+ti.requestPath, bodyReader) for name, values := range ti.removeHeaders { for _, value := range values { req.Header.Add(name, value) //adding the headers to validate removal. } } + for name, values := range ti.requestHeaders { + for _, value := range values { + req.Header.Add(name, value) + } + } assert.NoError(t, err) diff --git a/filters/openpolicyagent/opaserveresponse/opaserveresponse.go b/filters/openpolicyagent/opaserveresponse/opaserveresponse.go index 6134fa0d74..76e8b34501 100644 --- a/filters/openpolicyagent/opaserveresponse/opaserveresponse.go +++ b/filters/openpolicyagent/opaserveresponse/opaserveresponse.go @@ -1,6 +1,7 @@ package opaserveresponse import ( + "io" "time" "github.com/zalando/skipper/filters" @@ -11,19 +12,32 @@ import ( ) type spec struct { - registry *openpolicyagent.OpenPolicyAgentRegistry - opts []func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error + registry *openpolicyagent.OpenPolicyAgentRegistry + opts []func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error + name string + bodyParsing bool } func NewOpaServeResponseSpec(registry *openpolicyagent.OpenPolicyAgentRegistry, opts ...func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error) filters.Spec { return &spec{ - registry: registry, - opts: opts, + registry: registry, + opts: opts, + name: filters.OpaServeResponseName, + bodyParsing: false, + } +} + +func NewOpaServeResponseWithReqBodySpec(registry *openpolicyagent.OpenPolicyAgentRegistry, opts ...func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error) filters.Spec { + return &spec{ + registry: registry, + opts: opts, + name: filters.OpaServeResponseWithReqBodyName, + bodyParsing: true, } } func (s *spec) Name() string { - return filters.OpaServeResponseName + return s.name } func (s *spec) CreateFilter(args []interface{}) (filters.Filter, error) { @@ -70,6 +84,7 @@ func (s *spec) CreateFilter(args []interface{}) (filters.Filter, error) { opa: opa, registry: s.registry, envoyContextExtensions: envoyContextExtensions, + bodyParsing: s.bodyParsing, }, nil } @@ -77,13 +92,29 @@ type opaServeResponseFilter struct { opa *openpolicyagent.OpenPolicyAgentInstance registry *openpolicyagent.OpenPolicyAgentRegistry envoyContextExtensions map[string]string + bodyParsing bool } func (f *opaServeResponseFilter) Request(fc filters.FilterContext) { span, ctx := f.opa.StartSpanFromFilterContext(fc) defer span.Finish() + req := fc.Request() + + var rawBodyBytes []byte + if f.bodyParsing { + var body io.ReadCloser + var err error + var finalizer func() + body, rawBodyBytes, finalizer, err = f.opa.ExtractHttpBodyOptionally(req) + defer finalizer() + if err != nil { + f.opa.ServeInvalidDecisionError(fc, span, nil, err) + return + } + req.Body = body + } - authzreq := envoy.AdaptToExtAuthRequest(fc.Request(), f.opa.InstanceConfig().GetEnvoyMetadata(), f.envoyContextExtensions) + authzreq := envoy.AdaptToExtAuthRequest(fc.Request(), f.opa.InstanceConfig().GetEnvoyMetadata(), f.envoyContextExtensions, rawBodyBytes) start := time.Now() result, err := f.opa.Eval(ctx, authzreq) diff --git a/filters/openpolicyagent/opaserveresponse/opaserveresponse_test.go b/filters/openpolicyagent/opaserveresponse/opaserveresponse_test.go index 7fec7665c8..92d721a098 100644 --- a/filters/openpolicyagent/opaserveresponse/opaserveresponse_test.go +++ b/filters/openpolicyagent/opaserveresponse/opaserveresponse_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "net/url" "path" + "strings" "testing" opasdktest "github.com/open-policy-agent/opa/sdk/test" @@ -21,9 +22,12 @@ import ( func TestAuthorizeRequestFilter(t *testing.T) { for _, ti := range []struct { msg string + filterName string bundleName string regoQuery string requestPath string + requestHeaders http.Header + body string expectedBody string contextExtensions string expectedHeaders http.Header @@ -31,6 +35,7 @@ func TestAuthorizeRequestFilter(t *testing.T) { }{ { msg: "Allow Requests", + filterName: "opaServeResponse", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow", requestPath: "allow", @@ -40,6 +45,7 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Simple Forbidden", + filterName: "opaServeResponse", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow", requestPath: "forbidden", @@ -48,6 +54,7 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Misconfigured Rego Query", + filterName: "opaServeResponse", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/invalid_path", requestPath: "allow", @@ -57,6 +64,7 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Allow With Structured Rules", + filterName: "opaServeResponse", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_object", requestPath: "allow/structured", @@ -84,6 +92,7 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Allow With Structured Body", + filterName: "opaServeResponse", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_object_structured_body", requestPath: "allow/structured", @@ -93,6 +102,7 @@ func TestAuthorizeRequestFilter(t *testing.T) { }, { msg: "Allow with context extensions", + filterName: "opaServeResponse", bundleName: "somebundle.tar.gz", regoQuery: "envoy/authz/allow_object_contextextensions", requestPath: "allow/structured", @@ -101,6 +111,18 @@ func TestAuthorizeRequestFilter(t *testing.T) { expectedHeaders: map[string][]string{"X-Ext-Auth-Allow": {"yes"}}, expectedBody: `{"hello":"world"}`, }, + { + msg: "Use request body", + filterName: "opaServeResponseWithReqBody", + bundleName: "somebundle.tar.gz", + regoQuery: "envoy/authz/allow_object_req_body", + requestPath: "allow/allow_object_req_body", + requestHeaders: map[string][]string{"content-type": {"application/json"}}, + body: `{"hello":"world"}`, + expectedStatus: http.StatusOK, + expectedBody: `{"hello":"world"}`, + expectedHeaders: map[string][]string{}, + }, } { t.Run(ti.msg, func(t *testing.T) { t.Logf("Running test for %v", ti) @@ -168,7 +190,6 @@ func TestAuthorizeRequestFilter(t *testing.T) { } } - allow_object_contextextensions = response { input.parsed_path = [ "allow", "structured" ] response := { @@ -179,6 +200,15 @@ func TestAuthorizeRequestFilter(t *testing.T) { } } + allow_object_req_body = response { + response := { + "allowed": true, + "headers": {}, + "body": json.marshal(input.parsed_body), + "http_status": 200 + } + } + `, }), ) @@ -202,13 +232,20 @@ func TestAuthorizeRequestFilter(t *testing.T) { "plugins": { "envoy_ext_authz_grpc": { "path": %q, - "dry-run": false + "dry-run": false, + "skip-request-body-parse": false } + }, + "decision_logs": { + "console": true } }`, opaControlPlane.URL(), ti.regoQuery)) opaFactory := openpolicyagent.NewOpenPolicyAgentRegistry() ftSpec := NewOpaServeResponseSpec(opaFactory, openpolicyagent.WithConfigTemplate(config)) + fr.Register(ftSpec) + ftSpec = NewOpaServeResponseWithReqBodySpec(opaFactory, openpolicyagent.WithConfigTemplate(config)) + fr.Register(ftSpec) filterArgs := []interface{}{ti.bundleName} if ti.contextExtensions != "" { @@ -218,17 +255,20 @@ func TestAuthorizeRequestFilter(t *testing.T) { _, err := ftSpec.CreateFilter(filterArgs) assert.NoErrorf(t, err, "error in creating filter: %v", err) - fr.Register(ftSpec) - - r := eskip.MustParse(fmt.Sprintf(`* -> opaServeResponse("%s", "%s") -> "%s"`, ti.bundleName, ti.contextExtensions, clientServer.URL)) + r := eskip.MustParse(fmt.Sprintf(`* -> %s("%s", "%s") -> "%s"`, ti.filterName, ti.bundleName, ti.contextExtensions, clientServer.URL)) proxy := proxytest.New(fr, r...) reqURL, err := url.Parse(proxy.URL) assert.NoErrorf(t, err, "Failed to parse url %s: %v", proxy.URL, err) reqURL.Path = path.Join(reqURL.Path, ti.requestPath) - req, err := http.NewRequest("GET", reqURL.String(), nil) + req, err := http.NewRequest("GET", reqURL.String(), strings.NewReader(ti.body)) assert.NoError(t, err) + for name, values := range ti.requestHeaders { + for _, value := range values { + req.Header.Add(name, value) + } + } rsp, err := proxy.Client().Do(req) assert.NoError(t, err) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 6e4b802982..32ab0bbafd 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -3,7 +3,10 @@ package openpolicyagent import ( "bytes" "context" + "errors" "fmt" + "io" + "net/http" "os" "strings" "sync" @@ -14,6 +17,7 @@ import ( "github.com/google/uuid" "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/config" + "github.com/open-policy-agent/opa/dependencies" "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/plugins" "github.com/open-policy-agent/opa/plugins/discovery" @@ -24,6 +28,7 @@ import ( iCache "github.com/open-policy-agent/opa/topdown/cache" opatracing "github.com/open-policy-agent/opa/tracing" "github.com/opentracing/opentracing-go" + "golang.org/x/sync/semaphore" "google.golang.org/protobuf/encoding/protojson" "github.com/zalando/skipper/filters" @@ -33,10 +38,14 @@ import ( ) const ( + DefaultCleanIdlePeriod = 10 * time.Second defaultReuseDuration = 30 * time.Second defaultShutdownGracePeriod = 30 * time.Second - DefaultCleanerInterval = 10 * time.Second DefaultOpaStartupTimeout = 30 * time.Second + + DefaultMaxRequestBodySize = 1 << 20 // 1 MB + DefaultMaxMemoryBodyParsing = 100 * DefaultMaxRequestBodySize + defaultBodyBufferSize = 8192 * 1024 ) type OpenPolicyAgentRegistry struct { @@ -55,6 +64,10 @@ type OpenPolicyAgentRegistry struct { quit chan struct{} reuseDuration time.Duration cleanInterval time.Duration + + maxMemoryBodyParsingSem *semaphore.Weighted + maxRequestBodyBytes int64 + bodyReadBufferSize int64 } type OpenPolicyAgentFilter interface { @@ -68,6 +81,27 @@ func WithReuseDuration(duration time.Duration) func(*OpenPolicyAgentRegistry) er } } +func WithMaxRequestBodyBytes(n int64) func(*OpenPolicyAgentRegistry) error { + return func(cfg *OpenPolicyAgentRegistry) error { + cfg.maxRequestBodyBytes = n + return nil + } +} + +func WithMaxMemoryBodyParsing(n int64) func(*OpenPolicyAgentRegistry) error { + return func(cfg *OpenPolicyAgentRegistry) error { + cfg.maxMemoryBodyParsingSem = semaphore.NewWeighted(n) + return nil + } +} + +func WithReadBodyBufferSize(n int64) func(*OpenPolicyAgentRegistry) error { + return func(cfg *OpenPolicyAgentRegistry) error { + cfg.bodyReadBufferSize = n + return nil + } +} + func WithCleanInterval(interval time.Duration) func(*OpenPolicyAgentRegistry) error { return func(cfg *OpenPolicyAgentRegistry) error { cfg.cleanInterval = interval @@ -77,17 +111,23 @@ func WithCleanInterval(interval time.Duration) func(*OpenPolicyAgentRegistry) er func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *OpenPolicyAgentRegistry { registry := &OpenPolicyAgentRegistry{ - reuseDuration: defaultReuseDuration, - cleanInterval: DefaultCleanerInterval, - instances: make(map[string]*OpenPolicyAgentInstance), - lastused: make(map[*OpenPolicyAgentInstance]time.Time), - quit: make(chan struct{}), + reuseDuration: defaultReuseDuration, + cleanInterval: DefaultCleanIdlePeriod, + instances: make(map[string]*OpenPolicyAgentInstance), + lastused: make(map[*OpenPolicyAgentInstance]time.Time), + quit: make(chan struct{}), + maxRequestBodyBytes: DefaultMaxMemoryBodyParsing, + bodyReadBufferSize: defaultBodyBufferSize, } for _, opt := range opts { opt(registry) } + if registry.maxMemoryBodyParsingSem == nil { + registry.maxMemoryBodyParsingSem = semaphore.NewWeighted(DefaultMaxMemoryBodyParsing) + } + go registry.startCleanerDaemon() return registry @@ -290,7 +330,8 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s return nil, err } - engine, err := New(inmem.New(), configBytes, config, filterName, bundleName) + engine, err := registry.new(inmem.New(), configBytes, config, filterName, bundleName, + registry.maxRequestBodyBytes, registry.bodyReadBufferSize) if err != nil { return nil, err } @@ -315,6 +356,10 @@ type OpenPolicyAgentInstance struct { interQueryBuiltinCache iCache.InterQueryCache once sync.Once stopped bool + registry *OpenPolicyAgentRegistry + + maxBodyBytes int64 + bodyReadBufferSize int64 } func envVariablesMap() map[string]string { @@ -347,8 +392,8 @@ func interpolateConfigTemplate(configTemplate []byte, bundleName string) ([]byte return buf.Bytes(), nil } -// New returns a new OPA object. -func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgentInstanceConfig, filterName string, bundleName string) (*OpenPolicyAgentInstance, error) { +// new returns a new OPA object. +func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgentInstanceConfig, filterName string, bundleName string, maxBodyBytes int64, bodyReadBufferSize int64) (*OpenPolicyAgentInstance, error) { id := uuid.New().String() opaConfig, err := config.ParseConfig(configBytes, id) if err != nil { @@ -372,11 +417,15 @@ func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgent manager.Register("discovery", discovery) opa := &OpenPolicyAgentInstance{ + registry: registry, instanceConfig: instanceConfig, manager: manager, opaConfig: opaConfig, bundleName: bundleName, + maxBodyBytes: maxBodyBytes, + bodyReadBufferSize: bodyReadBufferSize, + preparedQueryDoOnce: new(sync.Once), interQueryBuiltinCache: iCache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig()), } @@ -520,6 +569,119 @@ func (opa *OpenPolicyAgentInstance) MetricsKey(key string) string { return key + "." + opa.bundleName } +var ( + ErrClosed = errors.New("reader closed") + ErrTotalBodyBytesExceeded = errors.New("buffer for in-flight request body authorization in Open Policy Agent exceeded") +) + +type bufferedBodyReader struct { + input io.ReadCloser + maxBufferSize int64 + + bodyBuffer bytes.Buffer + readBuffer []byte + + once sync.Once + err error + closed bool +} + +func newBufferedBodyReader(input io.ReadCloser, maxBufferSize int64, readBufferSize int64) *bufferedBodyReader { + return &bufferedBodyReader{ + input: input, + maxBufferSize: maxBufferSize, + readBuffer: make([]byte, readBufferSize), + } +} + +func (m *bufferedBodyReader) fillBuffer(expectedSize int64) ([]byte, error) { + var err error + + for err == nil && int64(m.bodyBuffer.Len()) < m.maxBufferSize && int64(m.bodyBuffer.Len()) < expectedSize { + var n int + n, err = m.input.Read(m.readBuffer) + + m.bodyBuffer.Write(m.readBuffer[:n]) + } + + if err == io.EOF { + err = nil + } + + return m.bodyBuffer.Bytes(), err +} + +func (m *bufferedBodyReader) Read(p []byte) (int, error) { + if m.closed { + return 0, ErrClosed + } + + if m.err != nil { + return 0, m.err + } + + // First read the buffered body + if m.bodyBuffer.Len() != 0 { + return m.bodyBuffer.Read(p) + } + + // Continue reading from the underlying body reader + return m.input.Read(p) +} + +// Close closes the undelrying reader if it implements io.Closer. +func (m *bufferedBodyReader) Close() error { + var err error + m.once.Do(func() { + m.closed = true + if c, ok := m.input.(io.Closer); ok { + err = c.Close() + } + }) + return err +} + +func bodyUpperBound(contentLength, maxBodyBytes int64) int64 { + if contentLength <= 0 { + return maxBodyBytes + } + + if contentLength < maxBodyBytes { + return contentLength + } + + return maxBodyBytes +} + +func (opa *OpenPolicyAgentInstance) ExtractHttpBodyOptionally(req *http.Request) (io.ReadCloser, []byte, func(), error) { + body := req.Body + + if body != nil && !opa.EnvoyPluginConfig().SkipRequestBodyParse && + req.ContentLength <= int64(opa.maxBodyBytes) { + + bases, err := dependencies.Base(opa.Compiler(), opa.EnvoyPluginConfig().ParsedQuery) + if err != nil { + return req.Body, nil, func() {}, nil + } + + for _, base := range bases { + if base.HasPrefix(ast.MustParseRef("input.parsed_body")) { + wrapper := newBufferedBodyReader(req.Body, opa.maxBodyBytes, opa.bodyReadBufferSize) + + requestedBodyBytes := bodyUpperBound(req.ContentLength, opa.maxBodyBytes) + if !opa.registry.maxMemoryBodyParsingSem.TryAcquire(requestedBodyBytes) { + return req.Body, nil, func() {}, ErrTotalBodyBytesExceeded + } + + rawBody, err := wrapper.fillBuffer(req.ContentLength) + return wrapper, rawBody, func() { opa.registry.maxMemoryBodyParsingSem.Release(requestedBodyBytes) }, err + } + } + } + + return req.Body, nil, func() {}, nil +} + // ParsedQuery is an implementation of the envoyauth.EvalContext interface func (opa *OpenPolicyAgentInstance) ParsedQuery() ast.Body { return opa.EnvoyPluginConfig().ParsedQuery diff --git a/filters/openpolicyagent/openpolicyagent_test.go b/filters/openpolicyagent/openpolicyagent_test.go index 7a9f53e74e..3b3a799e69 100644 --- a/filters/openpolicyagent/openpolicyagent_test.go +++ b/filters/openpolicyagent/openpolicyagent_test.go @@ -1,11 +1,10 @@ package openpolicyagent import ( + "bytes" "context" "encoding/json" "fmt" - "github.com/open-policy-agent/opa/ast" - "github.com/open-policy-agent/opa/storage/inmem" "io" "net/http" "os" @@ -13,12 +12,15 @@ import ( "testing" "time" + "github.com/open-policy-agent/opa/ast" + ext_authz_v3_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" authv3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3" _struct "github.com/golang/protobuf/ptypes/struct" "github.com/open-policy-agent/opa-envoy-plugin/envoyauth" opaconf "github.com/open-policy-agent/opa/config" opasdktest "github.com/open-policy-agent/opa/sdk/test" + "github.com/open-policy-agent/opa/storage/inmem" "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" "github.com/zalando/skipper/filters" @@ -146,6 +148,15 @@ func mockControlPlaneWithResourceBundle() (*opasdktest.Server, []byte) { default allow = false `, }), + opasdktest.MockBundle("/bundles/use_body", map[string]string{ + "main.rego": ` + package envoy.authz + + default allow = false + + allow { input.parsed_body } + `, + }), opasdktest.MockBundle("/bundles/anotherbundlename", map[string]string{ "main.rego": ` package envoy.authz @@ -169,7 +180,8 @@ func mockControlPlaneWithResourceBundle() (*opasdktest.Server, []byte) { "plugins": { "envoy_ext_authz_grpc": { "path": "/envoy/authz/allow", - "dry-run": false + "dry-run": false, + "skip-request-body-parse": false } } }`, opaControlPlane.URL())) @@ -233,10 +245,12 @@ func TestRegistry(t *testing.T) { func TestOpaEngineStartFailureWithTimeout(t *testing.T) { _, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle") + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) assert.NoError(t, err) - engine, err := New(inmem.New(), config, *cfg, "testfilter", "test") + engine, err := registry.new(inmem.New(), config, *cfg, "testfilter", "test", DefaultMaxRequestBodySize, defaultBodyBufferSize) assert.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), cfg.startupTimeout) @@ -495,3 +509,190 @@ func TestResponses(t *testing.T) { assert.True(t, fc.FServed) assert.Equal(t, fc.FResponse.StatusCode, http.StatusInternalServerError) } + +func TestBodyExtraction(t *testing.T) { + + _, config := mockControlPlaneWithResourceBundle() + + for _, ti := range []struct { + msg string + body string + contentLength int64 + maxBodySize int64 + readBodyBuffer int64 + + bodyInPolicy string + }{ + { + msg: "Read body ", + body: `{ "welcome": "world" }`, + maxBodySize: 1024, + readBodyBuffer: defaultBodyBufferSize, + bodyInPolicy: `{ "welcome": "world" }`, + }, + { + msg: "Read body in chunks", + body: `{ "welcome": "world" }`, + maxBodySize: 1024, + readBodyBuffer: 5, + bodyInPolicy: `{ "welcome": "world" }`, + }, + { + msg: "Read body with client sending more data than expected", + body: `{ "welcome": "world" }`, + maxBodySize: 1024, + readBodyBuffer: 5, + contentLength: 5, + bodyInPolicy: `{ "we`, + }, + { + msg: "Read body exhausing max bytes", + body: `{ "welcome": "world" }`, + maxBodySize: 5, + readBodyBuffer: 5, + bodyInPolicy: ``, + }, + } { + t.Run(ti.msg, func(t *testing.T) { + t.Logf("Running test for %v", ti) + + registry := NewOpenPolicyAgentRegistry(WithMaxRequestBodyBytes(ti.maxBodySize), + WithReadBodyBufferSize(ti.readBodyBuffer)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config)) + assert.NoError(t, err) + + inst, err := registry.NewOpenPolicyAgentInstance("use_body", *cfg, "testfilter") + assert.NoError(t, err) + + contentLength := ti.contentLength + if contentLength == 0 { + contentLength = int64(len(ti.body)) + } + + req := http.Request{ + ContentLength: contentLength, + Body: io.NopCloser(bytes.NewReader([]byte(ti.body))), + } + + body, peekBody, finalizer, err := inst.ExtractHttpBodyOptionally(&req) + defer finalizer() + assert.NoError(t, err) + defer body.Close() + + fullBody, err := io.ReadAll(body) + assert.NoError(t, err) + assert.Equal(t, ti.body, string(fullBody), "Full body must be readable") + + assert.Equal(t, ti.bodyInPolicy, string(peekBody), "Body has been read up till maximum") + }) + } +} + +func TestBodyExtractionExhausingTotalBytes(t *testing.T) { + + _, config := mockControlPlaneWithResourceBundle() + + registry := NewOpenPolicyAgentRegistry(WithMaxRequestBodyBytes(21), + WithReadBodyBufferSize(21), + WithMaxMemoryBodyParsing(40)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config)) + assert.NoError(t, err) + + inst, err := registry.NewOpenPolicyAgentInstance("use_body", *cfg, "testfilter") + assert.NoError(t, err) + + req1 := http.Request{ + ContentLength: 21, + Body: io.NopCloser(bytes.NewReader([]byte(`{ "welcome": "world" }`))), + } + + _, _, f1, err := inst.ExtractHttpBodyOptionally(&req1) + assert.NoError(t, err) + + req2 := http.Request{ + ContentLength: 21, + Body: io.NopCloser(bytes.NewReader([]byte(`{ "welcome": "world" }`))), + } + + _, _, f2, err := inst.ExtractHttpBodyOptionally(&req2) + if assert.Error(t, err) { + assert.Equal(t, ErrTotalBodyBytesExceeded, err) + } + + f1() + f2() + + req3 := http.Request{ + ContentLength: 21, + Body: io.NopCloser(bytes.NewReader([]byte(`{ "welcome": "world" }`))), + } + + _, _, f3, err := inst.ExtractHttpBodyOptionally(&req3) + f3() + assert.NoError(t, err) +} + +func TestBodyExtractionEmptyBody(t *testing.T) { + + _, config := mockControlPlaneWithResourceBundle() + + registry := NewOpenPolicyAgentRegistry(WithMaxRequestBodyBytes(21), + WithReadBodyBufferSize(21), + WithMaxMemoryBodyParsing(40)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config)) + assert.NoError(t, err) + + inst, err := registry.NewOpenPolicyAgentInstance("use_body", *cfg, "testfilter") + assert.NoError(t, err) + + req1 := http.Request{ + ContentLength: 0, + Body: nil, + } + + body, bodybytes, f1, err := inst.ExtractHttpBodyOptionally(&req1) + assert.NoError(t, err) + assert.Nil(t, body) + assert.Nil(t, bodybytes) + + f1() +} + +func TestBodyExtractionUnknownBody(t *testing.T) { + + _, config := mockControlPlaneWithResourceBundle() + + registry := NewOpenPolicyAgentRegistry(WithMaxRequestBodyBytes(21), + WithReadBodyBufferSize(21), + WithMaxMemoryBodyParsing(21)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config)) + assert.NoError(t, err) + + inst, err := registry.NewOpenPolicyAgentInstance("use_body", *cfg, "testfilter") + assert.NoError(t, err) + + req1 := http.Request{ + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader([]byte(`{ "welcome": "world" }`))), + } + + _, _, f1, err := inst.ExtractHttpBodyOptionally(&req1) + assert.NoError(t, err) + + req2 := http.Request{ + ContentLength: 3, + Body: io.NopCloser(bytes.NewReader([]byte(`{ }`))), + } + + _, _, f2, err := inst.ExtractHttpBodyOptionally(&req2) + if assert.Error(t, err) { + assert.Equal(t, ErrTotalBodyBytesExceeded, err) + } + + f1() + f2() +} diff --git a/filters/openpolicyagent/response.go b/filters/openpolicyagent/response.go index df613b1716..7a35459bd1 100644 --- a/filters/openpolicyagent/response.go +++ b/filters/openpolicyagent/response.go @@ -16,6 +16,10 @@ func (opa *OpenPolicyAgentInstance) ServeInvalidDecisionError(fc filters.FilterC } func (opa *OpenPolicyAgentInstance) HandleInvalidDecisionError(fc filters.FilterContext, span opentracing.Span, result *envoyauth.EvalResult, err error, serve bool) { + opa.HandleEvaluationError(fc, span, result, err, serve, http.StatusInternalServerError) +} + +func (opa *OpenPolicyAgentInstance) HandleEvaluationError(fc filters.FilterContext, span opentracing.Span, result *envoyauth.EvalResult, err error, serve bool, status int) { fc.Metrics().IncCounter(opa.MetricsKey("decision.err")) span.SetTag("error", true) @@ -39,12 +43,12 @@ func (opa *OpenPolicyAgentInstance) HandleInvalidDecisionError(fc filters.Filter opa.Logger().WithFields(map[string]interface{}{ "err": err, - }).Info("Rejecting request because of an invalid decision") + }).Info("Rejecting request because of an error") } if serve { resp := http.Response{} - resp.StatusCode = http.StatusInternalServerError + resp.StatusCode = status fc.Serve(&resp) } diff --git a/skipper.go b/skipper.go index 3a6dc0344c..63c06f3367 100644 --- a/skipper.go +++ b/skipper.go @@ -912,11 +912,13 @@ type Options struct { // filters. LuaSources []string - EnableOpenPolicyAgent bool - OpenPolicyAgentConfigTemplate string - OpenPolicyAgentEnvoyMetadata string - OpenPolicyAgentCleanerInterval time.Duration - OpenPolicyAgentStartupTimeout time.Duration + EnableOpenPolicyAgent bool + OpenPolicyAgentConfigTemplate string + OpenPolicyAgentEnvoyMetadata string + OpenPolicyAgentCleanerInterval time.Duration + OpenPolicyAgentStartupTimeout time.Duration + OpenPolicyAgentMaxRequestBodySize int64 + OpenPolicyAgentMaxMemoryBodyParsing int64 } func (o *Options) KubernetesDataClientOptions() kubernetes.Options { @@ -1796,7 +1798,10 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { var opaRegistry *openpolicyagent.OpenPolicyAgentRegistry if o.EnableOpenPolicyAgent { - opaRegistry = openpolicyagent.NewOpenPolicyAgentRegistry(openpolicyagent.WithCleanInterval(o.OpenPolicyAgentCleanerInterval)) + opaRegistry := openpolicyagent.NewOpenPolicyAgentRegistry( + openpolicyagent.WithMaxRequestBodyBytes(o.OpenPolicyAgentMaxRequestBodySize), + openpolicyagent.WithMaxMemoryBodyParsing(o.OpenPolicyAgentMaxMemoryBodyParsing), + openpolicyagent.WithCleanInterval(o.OpenPolicyAgentCleanerInterval)) defer opaRegistry.Close() opts := make([]func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error, 0) @@ -1809,7 +1814,9 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { o.CustomFilters = append(o.CustomFilters, opaauthorizerequest.NewOpaAuthorizeRequestSpec(opaRegistry, opts...), + opaauthorizerequest.NewOpaAuthorizeRequestWithBodySpec(opaRegistry, opts...), opaserveresponse.NewOpaServeResponseSpec(opaRegistry, opts...), + opaserveresponse.NewOpaServeResponseWithReqBodySpec(opaRegistry, opts...), ) }