diff --git a/config/config.go b/config/config.go index a64f379827..63db7d2977 100644 --- a/config/config.go +++ b/config/config.go @@ -282,6 +282,7 @@ type Config struct { OpenPolicyAgentConfigTemplate string `yaml:"open-policy-agent-config-template"` OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"` OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"` + OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"` } const ( @@ -498,7 +499,8 @@ func NewConfig() *Config { flag.BoolVar(&cfg.EnableOpenPolicyAgent, "enable-open-policy-agent", false, "enables Open Policy Agent filters") flag.StringVar(&cfg.OpenPolicyAgentConfigTemplate, "open-policy-agent-config-template", "", "file containing a template for an Open Policy Agent configuration file that is interpolated for each OPA filter instance") flag.StringVar(&cfg.OpenPolicyAgentEnvoyMetadata, "open-policy-agent-envoy-metadata", "", "JSON file containing meta-data passed as input for compatibility with Envoy policies in the format") - flag.DurationVar(&cfg.OpenPolicyAgentCleanerInterval, "open-policy-agent-cleaner-interval", openpolicyagent.DefaultCleanIdlePeriod, "JSON file containing meta-data passed as input for compatibility with Envoy policies in the format") + flag.DurationVar(&cfg.OpenPolicyAgentCleanerInterval, "open-policy-agent-cleaner-interval", openpolicyagent.DefaultCleanerInterval, "Duration in seconds to wait before cleaning up unused opa instances") + flag.DurationVar(&cfg.OpenPolicyAgentStartupTimeout, "open-policy-agent-startup-timeout", openpolicyagent.DefaultOpaStartupTimeout, "Maximum duration in seconds to wait for the open policy agent to start up") // TLS client certs flag.StringVar(&cfg.ClientKeyFile, "client-tls-key", "", "TLS Key file for backend connections, multiple keys may be given comma separated - the order must match the certs") @@ -903,6 +905,7 @@ func (c *Config) ToOptions() skipper.Options { 
OpenPolicyAgentConfigTemplate: c.OpenPolicyAgentConfigTemplate, OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata, OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval, + OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout, } for _, rcci := range c.CloneRoute { eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl) diff --git a/config/config_test.go b/config/config_test.go index c833717040..2eeecba8da 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -161,6 +161,7 @@ func defaultConfig() *Config { LuaModules: commaListFlag(), LuaSources: commaListFlag(), OpenPolicyAgentCleanerInterval: 10 * time.Second, + OpenPolicyAgentStartupTimeout: 30 * time.Second, } } diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 38c360a675..3963a00269 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -23,7 +23,6 @@ import ( "github.com/open-policy-agent/opa/storage/inmem" iCache "github.com/open-policy-agent/opa/topdown/cache" opatracing "github.com/open-policy-agent/opa/tracing" - opautil "github.com/open-policy-agent/opa/util" "github.com/opentracing/opentracing-go" "google.golang.org/protobuf/encoding/protojson" @@ -36,7 +35,8 @@ import ( const ( defaultReuseDuration = 30 * time.Second defaultShutdownGracePeriod = 30 * time.Second - DefaultCleanIdlePeriod = 10 * time.Second + DefaultCleanerInterval = 10 * time.Second + DefaultOpaStartupTimeout = 30 * time.Second ) type OpenPolicyAgentRegistry struct { @@ -78,7 +78,7 @@ func WithCleanInterval(interval time.Duration) func(*OpenPolicyAgentRegistry) er func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *OpenPolicyAgentRegistry { registry := &OpenPolicyAgentRegistry{ reuseDuration: defaultReuseDuration, - cleanInterval: DefaultCleanIdlePeriod, + cleanInterval: DefaultCleanerInterval, instances: make(map[string]*OpenPolicyAgentInstance), lastused: 
make(map[*OpenPolicyAgentInstance]time.Time), quit: make(chan struct{}), @@ -96,6 +96,7 @@ func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *O type OpenPolicyAgentInstanceConfig struct { envoyMetadata *ext_authz_v3_core.Metadata configTemplate []byte + startupTimeout time.Duration } func WithConfigTemplate(configTemplate []byte) func(*OpenPolicyAgentInstanceConfig) error { @@ -144,12 +145,21 @@ func WithEnvoyMetadataFile(file string) func(*OpenPolicyAgentInstanceConfig) err } } +func WithStartupTimeout(timeout time.Duration) func(*OpenPolicyAgentInstanceConfig) error { + return func(cfg *OpenPolicyAgentInstanceConfig) error { + cfg.startupTimeout = timeout + return nil + } +} + func (cfg *OpenPolicyAgentInstanceConfig) GetEnvoyMetadata() *ext_authz_v3_core.Metadata { return cfg.envoyMetadata } func NewOpenPolicyAgentConfig(opts ...func(*OpenPolicyAgentInstanceConfig) error) (*OpenPolicyAgentInstanceConfig, error) { - cfg := OpenPolicyAgentInstanceConfig{} + cfg := OpenPolicyAgentInstanceConfig{ + startupTimeout: DefaultOpaStartupTimeout, + } for _, opt := range opts { if err := opt(&cfg); err != nil { @@ -285,14 +295,10 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s return nil, err } - ctx := context.Background() - if err = engine.Start(ctx); err != nil { - return nil, err - } + ctx, cancel := context.WithTimeout(context.Background(), config.startupTimeout) + defer cancel() - err = engine.waitPluginsReady(100*time.Millisecond, 30*time.Second) - if err != nil { - engine.Logger().WithFields(map[string]interface{}{"err": err}).Error("Failed to wait for plugins activation.") + if err = engine.Start(ctx, config.startupTimeout); err != nil { return nil, err } @@ -308,6 +314,7 @@ type OpenPolicyAgentInstance struct { preparedQueryDoOnce *sync.Once interQueryBuiltinCache iCache.InterQueryCache once sync.Once + stopped bool } func envVariablesMap() map[string]string { @@ -343,7 +350,6 @@ func 
interpolateConfigTemplate(configTemplate []byte, bundleName string) ([]byte // New returns a new OPA object. func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgentInstanceConfig, filterName string, bundleName string) (*OpenPolicyAgentInstance, error) { id := uuid.New().String() - opaConfig, err := config.ParseConfig(configBytes, id) if err != nil { return nil, err @@ -382,19 +388,11 @@ func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgent // Start asynchronously starts the policy engine's plugins that download // policies, report status, etc. -func (opa *OpenPolicyAgentInstance) Start(ctx context.Context) error { - return opa.manager.Start(ctx) -} +func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error { + err := opa.manager.Start(ctx) -func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { - opa.once.Do(func() { - opa.manager.Stop(ctx) - }) -} - -func (opa *OpenPolicyAgentInstance) waitPluginsReady(checkInterval, timeout time.Duration) error { - if timeout <= 0 { - return nil + if err != nil { + return err } // check readiness of all plugins @@ -407,9 +405,48 @@ func (opa *OpenPolicyAgentInstance) waitPluginsReady(checkInterval, timeout time return true } - opa.Logger().Debug("Waiting for plugins activation (%v).", timeout) + err = waitFunc(ctx, pluginsReady, 100*time.Millisecond) - return opautil.WaitFunc(pluginsReady, checkInterval, timeout) + if err != nil { + for pluginName, status := range opa.manager.PluginStatus() { + if status != nil && status.State != plugins.StateOK { + opa.Logger().WithFields(map[string]interface{}{ + "plugin_name": pluginName, + "plugin_state": status.State, + "error_message": status.Message, + }).Error("Open policy agent plugin did not start in time") + } + } + opa.Close(ctx) + return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err) + } + return nil +} + +func (opa 
// waitFunc polls fun until it returns true or ctx is done.
//
// fun is evaluated once immediately and then once per interval. nil is
// returned as soon as fun reports true. When ctx is done first, the returned
// error wraps ctx.Err(), so callers can match it with errors.Is.
//
// A non-positive interval is replaced by 100ms because time.NewTicker panics
// for such values.
func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error {
	const fallbackInterval = 100 * time.Millisecond

	if fun() {
		return nil
	}

	if interval <= 0 {
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out while starting: %w", ctx.Err())
		case <-ticker.C:
			if fun() {
				return nil
			}
		}
	}
}
[]byte(fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "discovery": { + "name": "discovery", + "resource": %q, + "service": "test" + } + }`, opaControlPlane.URL(), discoveryBundle)) + + return opaControlPlane, config +} + +func mockControlPlaneWithResourceBundle() (*opasdktest.Server, []byte) { opaControlPlane := opasdktest.MustNewServer( opasdktest.MockBundle("/bundles/test", map[string]string{ "main.rego": ` @@ -130,7 +173,7 @@ func mockControlPlane() (*opasdktest.Server, []byte) { } func TestRegistry(t *testing.T) { - _, config := mockControlPlane() + _, config := mockControlPlaneWithResourceBundle() registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) @@ -182,8 +225,86 @@ func TestRegistry(t *testing.T) { assert.Error(t, err, "should not work after close") } +func TestOpaEngineStartFailureWithTimeout(t *testing.T) { + _, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle") + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + engine, err := New(inmem.New(), config, *cfg, "testfilter", "test") + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), cfg.startupTimeout) + defer cancel() + + err = engine.Start(ctx, cfg.startupTimeout) + assert.True(t, engine.stopped) + assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s") +} + +func TestOpaActivationSuccessWithDiscovery(t *testing.T) { + _, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery") + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.NotNil(t, instance) + assert.NoError(t, err) + assert.Equal(t, 1, 
len(registry.instances)) +} + +func TestOpaActivationFailureWithWrongServiceConfig(t *testing.T) { + configWithUnknownService := []byte(`{ + "discovery": { + "name": "discovery", + "resource": "discovery", + "service": "test" + }}`) + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(configWithUnknownService), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Nil(t, instance) + assert.Contains(t, err.Error(), "invalid configuration for discovery") + assert.Equal(t, 0, len(registry.instances)) +} + +func TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle(t *testing.T) { + _, config := mockControlPlaneWithDiscoveryBundle("/bundles/discovery-with-wrong-bundle") + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Nil(t, instance) + assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded") + assert.Equal(t, 0, len(registry.instances)) +} + +func TestOpaActivationTimeOutWithDiscoveryParsingError(t *testing.T) { + _, config := mockControlPlaneWithDiscoveryBundle("/bundles/discovery-with-parsing-error") + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Nil(t, instance) + assert.Contains(t, 
err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded") + assert.Equal(t, 0, len(registry.instances)) +} + func TestStartup(t *testing.T) { - _, config := mockControlPlane() + _, config := mockControlPlaneWithResourceBundle() registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) @@ -199,7 +320,7 @@ func TestStartup(t *testing.T) { } func TestTracing(t *testing.T) { - _, config := mockControlPlane() + _, config := mockControlPlaneWithResourceBundle() registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) @@ -222,7 +343,7 @@ func TestTracing(t *testing.T) { } func TestEval(t *testing.T) { - _, config := mockControlPlane() + _, config := mockControlPlaneWithResourceBundle() registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) @@ -250,7 +371,7 @@ func TestEval(t *testing.T) { } func TestResponses(t *testing.T) { - _, config := mockControlPlane() + _, config := mockControlPlaneWithResourceBundle() registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) diff --git a/go.mod b/go.mod index 0c62de8cb8..7c6b5526d5 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( github.com/agnivade/levenshtein v1.1.1 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/biter777/processex v0.0.0-20210102170504-01bb369eda71 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/bytecodealliance/wasmtime-go/v3 v3.0.2 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect diff --git a/skipper.go b/skipper.go index e41ca73854..97ff66990f 100644 --- a/skipper.go +++ b/skipper.go @@ -916,6 +916,7 @@ type Options struct { OpenPolicyAgentConfigTemplate string 
OpenPolicyAgentEnvoyMetadata string OpenPolicyAgentCleanerInterval time.Duration + OpenPolicyAgentStartupTimeout time.Duration } func (o *Options) KubernetesDataClientOptions() kubernetes.Options { @@ -1798,7 +1799,9 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { defer opaRegistry.Close() opts := make([]func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error, 0) - opts = append(opts, openpolicyagent.WithConfigTemplateFile(o.OpenPolicyAgentConfigTemplate)) + opts = append(opts, + openpolicyagent.WithConfigTemplateFile(o.OpenPolicyAgentConfigTemplate), + openpolicyagent.WithStartupTimeout(o.OpenPolicyAgentStartupTimeout)) if o.OpenPolicyAgentEnvoyMetadata != "" { opts = append(opts, openpolicyagent.WithEnvoyMetadataFile(o.OpenPolicyAgentEnvoyMetadata)) }