diff --git a/cmd/substation/demo.go b/cmd/substation/demo.go new file mode 100644 index 00000000..0965035d --- /dev/null +++ b/cmd/substation/demo.go @@ -0,0 +1,136 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/brexhq/substation/v2" + "github.com/brexhq/substation/v2/message" + "github.com/google/go-jsonnet" + "github.com/spf13/cobra" + "github.com/tidwall/gjson" +) + +func init() { + rootCmd.AddCommand(demoCmd) +} + +const demoConf = ` +local sub = import '../../substation.libsonnet'; + +{ + transforms: [ + // Move the event to the 'event.original' field. + sub.tf.obj.cp({object: { source_key: '@this', target_key: 'meta event.original' }}), + sub.tf.obj.cp({object: { source_key: 'meta @this' }}), + + // Insert the hash of the original event into the 'event.hash' field. + sub.tf.hash.sha256({obj: { src: 'event.original', trg: 'event.hash'}}), + + // Insert the event dataset into the 'event.dataset' field. + sub.tf.obj.insert({obj: { trg: 'event.dataset' }, value: 'aws.cloudtrail'}), + + // Insert the kind of event into the 'event.kind' field. + sub.tf.obj.insert({obj: { trg: 'event.kind' }, value: 'event'}), + + // Insert the event category into the 'event.category' field. + sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.category') }, value: 'configuration'}), + + // Insert the event type into the 'event.type' field. + sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.type') }, value: 'change'}), + + // Insert the outcome into the 'event.outcome' field. + sub.tf.meta.switch({ cases: [ + { + condition: sub.cnd.num.len.gt({ obj: { src: 'errorCode' }, value: 0 }), + transforms: [ + sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'failure' }), + ], + }, + { + transforms: [ + sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'success' }), + ], + }, + ] }), + + // Copy the event time to the '@timestamp' field. + sub.tf.obj.cp({obj: { src: 'event.original.eventTime', trg: '\\@timestamp' }}), + + // Copy the IP address to the 'source.ip' field. + sub.tf.obj.cp({obj: { src: 'event.original.sourceIPAddress', trg: 'source.ip' }}), + + // Copy the user agent to the 'user_agent.original' field. + sub.tf.obj.cp({obj: { src: 'event.original.userAgent', trg: 'user_agent.original' }}), + + // Copy the region to the 'cloud.region' field. + sub.tf.obj.cp({obj: { src: 'event.original.awsRegion', trg: 'cloud.region' }}), + + // Copy the account ID to the 'cloud.account.id' field. + sub.tf.obj.cp({obj: { src: 'event.original.userIdentity.accountId', trg: 'cloud.account.id' }}), + + // Add the cloud service provider to the 'cloud.provider' field. + sub.tf.obj.insert({obj: { trg: 'cloud.provider' }, value: 'aws'}), + + // Extract the cloud service into the 'cloud.service.name' field. + sub.tf.str.capture({obj: { src: 'event.original.eventSource', trg: 'cloud.service.name' }, pattern: '^(.*)\\.amazonaws\\.com$'}), + + // Make the event pretty before printing to the console. + sub.tf.obj.cp({obj: { src: '@this|@pretty' }}), + sub.tf.send.stdout(), + ], +} +` + +var demoCmd = &cobra.Command{ + Use: "demo", + Short: "demo substation", + Long: `'substation demo' shows how Substation transforms data. +It prints an anonymized CloudTrail event (input) and the +transformed result (output) to the console. The event is +partially normalized to the Elastic Common Schema (ECS). +`, + // Examples: + // substation demo + Example: ` substation demo +`, + Args: cobra.MaximumNArgs(0), + RunE: func(cmd *cobra.Command, args []string) error { + cfg := substation.Config{} + + vm := jsonnet.MakeVM() + res, err := vm.EvaluateAnonymousSnippet("demo", demoConf) + if err != nil { + return err + } + + if err := json.Unmarshal([]byte(res), &cfg); err != nil { + return err + } + + ctx := context.Background() // This doesn't need to be canceled. + sub, err := substation.New(ctx, cfg) + if err != nil { + return err + } + + evt := `{"eventVersion":"1.08","userIdentity":{"type":"IAMUser","principalId":"EXAMPLE123456789","arn":"arn:aws:iam::123456789012:user/Alice","accountId":"123456789012","accessKeyId":"ASIAEXAMPLE123","sessionContext":{"attributes":{"mfaAuthenticated":"false","creationDate":"2024-10-01T12:00:00Z"},"sessionIssuer":{"type":"AWS","principalId":"EXAMPLE123456","arn":"arn:aws:iam::123456789012:role/Admin","accountId":"123456789012","userName":"Admin"}}},"eventTime":"2024-10-01T12:30:45Z","eventSource":"s3.amazonaws.com","eventName":"PutBucketPolicy","awsRegion":"us-west-2","sourceIPAddress":"203.0.113.0","userAgent":"aws-sdk-python/1.0.0 Python/3.8.0 Linux/4.15.0","requestParameters":{"bucketName":"example-bucket","policy":"{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"s3:GetObject\",\"Resource\":\"arn:aws:s3:::example-bucket/*\"}]}"}},"responseElements":{"location":"http://example-bucket.s3.amazonaws.com/"},"requestID":"EXAMPLE123456789","eventID":"EXAMPLE-1-2-3-4-5-6","readOnly":false,"resources":[{"ARN":"arn:aws:s3:::example-bucket","accountId":"123456789012","type":"AWS::S3::Bucket"}],"eventType":"AwsApiCall","managementEvent":true,"recipientAccountId":"123456789012"}` + msgs := []*message.Message{ + message.New().SetData([]byte(evt)), + message.New().AsControl(), + } + + // Make the input pretty before printing to the console. + fmt.Printf("input:\n%s\n", gjson.Get(evt, "@this|@pretty").String()) + fmt.Printf("output:\n") + + if _, err := sub.Transform(ctx, msgs...); err != nil { + return err + } + + fmt.Printf("\nconfig:%s\n", demoConf) + + return nil + }, +} diff --git a/cmd/substation/main.go b/cmd/substation/main.go new file mode 100644 index 00000000..ba618d1f --- /dev/null +++ b/cmd/substation/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "os" + + "github.com/google/go-jsonnet" + "github.com/spf13/cobra" +) + +var rootCmd = &cobra.Command{ + Use: "substation", + Long: "'substation' is a tool for managing Substation configurations.", +} + +func init() { + // Hides the 'completion' command. + rootCmd.AddCommand(&cobra.Command{ + Use: "completion", + Short: "generate the autocompletion script for the specified shell", + Hidden: true, + }) + + // Hides the 'help' command. + rootCmd.SetHelpCommand(&cobra.Command{ + Use: "no-help", + Hidden: true, + }) +} + +// buildFile returns JSON from a Jsonnet file. +func buildFile(f string, extVars map[string]string) (string, error) { + vm := jsonnet.MakeVM() + for k, v := range extVars { + vm.ExtVar(k, v) + } + + res, err := vm.EvaluateFile(f) + if err != nil { + return "", err + } + + return res, nil +} + +func main() { + err := rootCmd.Execute() + if err != nil { + os.Exit(1) + } +} diff --git a/cmd/substation/test.go b/cmd/substation/test.go new file mode 100644 index 00000000..68dbb3df --- /dev/null +++ b/cmd/substation/test.go @@ -0,0 +1,360 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "github.com/spf13/cobra" + + "github.com/brexhq/substation/v2" + "github.com/brexhq/substation/v2/condition" + "github.com/brexhq/substation/v2/config" + "github.com/brexhq/substation/v2/message" +) + +// customConfig wraps the Substation config with support for tests. +type customConfig struct { + substation.Config + + Tests []struct { + Name string `json:"name"` + Transforms []config.Config `json:"transforms"` + Condition config.Config `json:"condition"` + } `json:"tests"` +} + +func init() { + rootCmd.AddCommand(testCmd) + testCmd.PersistentFlags().BoolP("recursive", "R", false, "recursively test all files") + testCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") +} + +func fiConfig(f string) (customConfig, error) { + fi, err := os.Open(f) + if err != nil { + if err == io.EOF { + return customConfig{}, nil + } + + return customConfig{}, err + } + + cfg := customConfig{} + if err := json.NewDecoder(fi).Decode(&cfg); err != nil { + return customConfig{}, err + } + + return cfg, nil +} + +func memConfig(m string) (customConfig, error) { + cfg := customConfig{} + if err := json.Unmarshal([]byte(m), &cfg); err != nil { + return customConfig{}, err + } + + return cfg, nil +} + +func test(ctx context.Context, file string, cfg customConfig) error { + start := time.Now() + + // These configurations are not valid. + if len(cfg.Transforms) == 0 { + return nil + } + + if len(cfg.Tests) == 0 { + fmt.Printf("?\t%s\t[no tests]\n", file) + + return nil + } + + var failedFile bool // Tracks if any test in a file failed. + for _, test := range cfg.Tests { + // cnd asserts that the test is successful. + cnd, err := condition.New(ctx, test.Condition) + if err != nil { + fmt.Printf("FAIL\t%s\t[test error]\n", file) + + //nolint:nilerr // errors should not disrupt the test. + return nil + } + + // setup creates the test messages that are tested. + setup, err := substation.New(ctx, substation.Config{ + Transforms: test.Transforms, + }) + if err != nil { + fmt.Printf("?\t%s\t[test error]\n", file) + + //nolint:nilerr // errors should not disrupt the test. + return nil + } + + // tester contains the config that will be tested. + // This has to be done for each test to ensure + // that there is no state shared between tests. + tester, err := substation.New(ctx, cfg.Config) + if err != nil { + fmt.Printf("?\t%s\t[config error]\n", file) + + //nolint:nilerr // errors should not disrupt the test. + return nil + } + + sMsgs, err := setup.Transform(ctx, message.New().AsControl()) + if err != nil { + fmt.Printf("?\t%s\t[test error]\n", file) + + //nolint:nilerr // errors should not disrupt the test. + return nil + } + + tMsgs, err := tester.Transform(ctx, sMsgs...) + if err != nil { + fmt.Printf("?\t%s\t[config error]\n", file) + + //nolint:nilerr // errors should not disrupt the test. + return nil + } + + for _, msg := range tMsgs { + // Skip control messages because they contain no data. + if msg.IsControl() { + continue + } + + ok, err := cnd.Condition(ctx, msg) + if err != nil { + fmt.Printf("?\t%s\t[test error]\n", file) + + //nolint:nilerr // errors should not disrupt the test. + return nil + } + + if !ok { + fmt.Printf("%s\n%s\n%s\n", + fmt.Sprintf("--- FAIL: %s", test.Name), + fmt.Sprintf(" message:\t%s", msg), + fmt.Sprintf(" condition:\t%s", cnd), + ) + + failedFile = true + } + } + } + + if failedFile { + fmt.Printf("FAIL\t%s\t%s\t\n", file, time.Since(start).Round(time.Microsecond)) + } else { + fmt.Printf("ok\t%s\t%s\t\n", file, time.Since(start).Round(time.Microsecond)) + } + + return nil +} + +var testCmd = &cobra.Command{ + Use: "test [path to configs]", + Short: "test configs", + Long: `'substation test' runs all tests in a configuration file. +It prints a summary of the test results in the format: + + ok path/to/config1.json 220µs + ? path/to/config2.json [no tests] + FAIL path/to/config3.json 349µs + ... + +If the file is not already compiled, then it is compiled before +testing ('.jsonnet', '.libsonnet' files are compiled to JSON). +The 'recursive' flag can be used to test all files in a directory, +and the current directory is used if no arg is provided. + +Tests are executed individually against configured transforms. +Each test executes on user-defined messages and is considered +successful if a condition returns true for every message. + +For example, this config contains two tests: + +{ + tests: [ + { + name: 'my-passing-test', + // Generates the test message '{"a": true}' which + // is run through the configured transforms and + // then checked against the condition. + transforms: [ + sub.tf.test.message({ value: {a: true} }), + ], + // Checks if key 'x' == 'true'. + condition: sub.cnd.str.eq({ object: {source_key: 'x'}, value: 'true' }), + }, + { + name: 'my-failing-test', + transforms: [ + sub.tf.test.message({ value: {a: true} }), + ], + // Checks if key 'y' == 'true'. + condition: sub.cnd.str.eq({ object: {source_key: 'y'}, value: 'true' }), + }, + ], + // Copies the value of key 'a' to key 'x'. + transforms: [ + sub.tf.obj.cp({ object: { source_key: 'a', target_key: 'x' } }), + ], +} + +WARNING: It is not recommended to test any configs that mutate +production resources, such as any enrichment or send transforms. +`, + // Examples: + // substation test [-R] + // substation test [-R] /path/to/configs + // substation test /path/to/config.json + // substation test /path/to/config.jsonnet + // substation test /path/to/my.libsonnet + Example: ` substation test [-R] + substation test [-R] /path/to/configs + substation test /path/to/config.json + substation test /path/to/config.jsonnet + substation test /path/to/my.libsonnet +`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() // This doesn't need to be canceled. + + // 'test' defaults to the current directory. + arg, err := os.Getwd() + if err != nil { + return err + } + + if len(args) > 0 { + arg = args[0] + } + + // Catches an edge case where the user is looking for help. + if arg == "help" { + fmt.Printf("warning: \"%s\" matched no files\n", arg) + return nil + } + + fi, err := os.Stat(arg) + if err != nil { + return err + } + + // If the arg is a file, then test only that file. + if !fi.IsDir() { + var cfg customConfig + + switch filepath.Ext(arg) { + case ".jsonnet", ".libsonnet": + m, err := cmd.PersistentFlags().GetStringToString("ext-str") + if err != nil { + return err + } + + // If the Jsonnet cannot compile, then the file is invalid. + mem, err := buildFile(arg, m) + if err != nil { + fmt.Printf("?\t%s\t[config error]\n", arg) + + return nil + } + + cfg, err = memConfig(mem) + if err != nil { + return err + } + case ".json": + cfg, err = fiConfig(arg) + if err != nil { + return err + } + default: + fmt.Printf("warning: \"%s\" matched no files\n", arg) + } + + if err := test(ctx, arg, cfg); err != nil { + return err + } + + return nil + } + + var entries []string + // Walk to get all valid files in the directory. + // + // These are assumed to be Substation configuration files, + // and are validated before attempting to run tests. + if err := filepath.WalkDir(arg, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + + if filepath.Ext(path) == ".json" || + filepath.Ext(path) == ".jsonnet" || + filepath.Ext(path) == ".libsonnet" { + entries = append(entries, path) + } + + // Skip directories, except the one provided as an argument, if + // the 'recursive' flag is not set. + if d.IsDir() && path != arg && !cmd.Flag("recursive").Changed { + return filepath.SkipDir + } + + return nil + }); err != nil { + return err + } + + if len(entries) == 0 { + fmt.Printf("warning: \"%s\" matched no files\n", arg) + + return nil + } + + for _, entry := range entries { + var cfg customConfig + + switch filepath.Ext(entry) { + case ".jsonnet", ".libsonnet": + m, err := cmd.PersistentFlags().GetStringToString("ext-str") + if err != nil { + return err + } + + // If the Jsonnet cannot compile, then the file is invalid. + mem, err := buildFile(entry, m) + if err != nil { + fmt.Printf("?\t%s\t[config error]\n", entry) + + continue + } + + cfg, err = memConfig(mem) + if err != nil { + return err + } + case ".json": + cfg, err = fiConfig(entry) + if err != nil { + return err + } + } + + if err := test(ctx, entry, cfg); err != nil { + return err + } + } + + return nil + }, +} diff --git a/go.mod b/go.mod index c9f4a913..751718fa 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/klauspost/compress v1.17.9 github.com/oschwald/maxminddb-golang v1.13.0 github.com/sirupsen/logrus v1.9.3 + github.com/spf13/cobra v1.8.1 github.com/tidwall/gjson v1.17.1 github.com/tidwall/sjson v1.2.5 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 @@ -57,10 +58,13 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect github.com/aws/smithy-go v1.20.4 // indirect + github.com/google/go-jsonnet v0.20.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/itchyny/timefmt-go v0.1.6 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect @@ -70,4 +74,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect google.golang.org/grpc v1.64.1 // indirect google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + sigs.k8s.io/yaml v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index 68f77d0d..9e6f1ac5 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,7 @@ github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/brexhq/substation v1.7.1 h1:v5SwdexouuTLzLdwSVJ/RbkJyt+lN+2a5WOw+l11ULI= github.com/brexhq/substation v1.7.1/go.mod h1:4GFx9JFQVZX17xjA7fY0tMaLnsPzhVKb8YzLNeymm2I= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -95,6 +96,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g= +github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= @@ -107,6 +110,8 @@ github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISH github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g= github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM= github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= @@ -127,8 +132,13 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= @@ -174,3 +184,5 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= +sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= diff --git a/substation.libsonnet b/substation.libsonnet index 8570f557..9ddfdde0 100644 --- a/substation.libsonnet +++ b/substation.libsonnet @@ -1102,6 +1102,18 @@ local helpers = { settings: std.prune(std.mergePatch(default, helpers.abbv(settings))), }, }, + test: { + message(settings={}): { + local type = 'test_message', + local default = { + id: helpers.id(type, settings), + value: null, + }, + + type: type, + settings: std.prune(std.mergePatch(default, helpers.abbv(settings))), + }, + }, time: { from: { str(settings={}): $.transform.time.from.string(settings=settings), diff --git a/transform/test_message.go b/transform/test_message.go new file mode 100644 index 00000000..90672314 --- /dev/null +++ b/transform/test_message.go @@ -0,0 +1,57 @@ +package transform + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/brexhq/substation/v2/config" + "github.com/brexhq/substation/v2/message" + + iconfig "github.com/brexhq/substation/v2/internal/config" +) + +type testMessageConfig struct { + Value interface{} `json:"value"` + + ID string `json:"id"` +} + +func (c *testMessageConfig) Decode(in interface{}) error { + return iconfig.Decode(in, c) +} + +func newTestMessage(_ context.Context, cfg config.Config) (*testMessage, error) { + conf := testMessageConfig{} + if err := conf.Decode(cfg.Settings); err != nil { + return nil, fmt.Errorf("transform test_message: %v", err) + } + + if conf.ID == "" { + conf.ID = "test_message" + } + + tf := testMessage{ + conf: conf, + } + + return &tf, nil +} + +type testMessage struct { + conf testMessageConfig +} + +func (tf *testMessage) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { + if msg.IsControl() { + m := message.New().SetData(anyToBytes(tf.conf.Value)) + return []*message.Message{m, msg}, nil + } + + return []*message.Message{msg}, nil +} + +func (tf *testMessage) String() string { + b, _ := json.Marshal(tf.conf) + return string(b) +} diff --git a/transform/transform.go b/transform/transform.go index 7aa22479..b90fc6f2 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -177,6 +177,9 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint return newStringSplit(ctx, cfg) case "string_uuid": return newStringUUID(ctx, cfg) + // Test transforms. + case "test_message": + return newTestMessage(ctx, cfg) // Time transforms. case "time_from_string": return newTimeFromString(ctx, cfg)