From 6149968e522f52241826fce8a4ac37b06708cbf5 Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Mon, 28 Oct 2024 09:23:12 -0700 Subject: [PATCH] feat(cmd): Add tap and read Commands to CLI Tool (#269) --- .github/workflows/code.yml | 2 +- .../workflows/code_jsonnet.sh | 3 +- build/scripts/config/format.sh | 7 - cmd/README.md | 6 +- cmd/development/substation-bench/main.go | 206 ---------- cmd/development/substation-file/main.go | 194 ---------- .../substation-kinesis-tap/README.md | 55 --- .../substation-kinesis-tap/config.jsonnet | 7 - cmd/substation/build.go | 4 +- cmd/substation/demo.go | 297 +------------- cmd/substation/fmt.go | 4 +- cmd/substation/main.go | 107 ++++- cmd/substation/playground.go | 47 +-- cmd/substation/playground.tmpl | 13 +- cmd/substation/read.go | 364 ++++++++++++++++++ .../main.go => substation/tap.go} | 172 ++++++--- cmd/substation/test.go | 5 +- cmd/substation/vet.go | 7 +- examples/condition/meta/config.jsonnet | 2 +- examples/condition/number/config.jsonnet | 2 +- examples/condition/string/config.jsonnet | 2 +- .../transform/aggregate/sample/config.jsonnet | 2 +- .../aggregate/summarize/config.jsonnet | 2 +- .../transform/array/extend/config.jsonnet | 2 +- .../transform/array/flatten/config.jsonnet | 2 +- .../array/flatten_deep/config.jsonnet | 2 +- examples/transform/array/group/config.jsonnet | 2 +- .../enrich/http_secret/config.jsonnet | 2 +- .../enrich/kvstore_csv/config.jsonnet | 2 +- .../enrich/kvstore_json/config.jsonnet | 2 +- .../enrich/kvstore_set_add/config.jsonnet | 2 +- examples/transform/enrich/mmdb/config.jsonnet | 2 +- .../transform/enrich/urlscan/config.jsonnet | 2 +- examples/transform/format/zip/config.jsonnet | 2 +- .../meta/crash_program/config.jsonnet | 2 +- .../meta/default_value/config.jsonnet | 2 +- .../meta/each_in_array/config.jsonnet | 2 +- .../meta/exactly_once_consumer/config.jsonnet | 2 +- .../meta/exactly_once_producer/config.jsonnet | 2 +- .../meta/exactly_once_system/config.jsonnet | 2 +- .../meta/execution_time/config.jsonnet | 2 +- .../meta/retry_with_backoff/config.jsonnet | 2 +- .../transform/number/clamp/config.jsonnet | 2 +- examples/transform/number/max/config.jsonnet | 2 +- examples/transform/number/min/config.jsonnet | 2 +- .../send/aux_transforms/config.jsonnet | 2 +- .../send/aws_s3_glacier/config.jsonnet | 4 +- examples/transform/send/batch/config.jsonnet | 2 +- .../transform/send/datadog/config.jsonnet | 21 +- examples/transform/send/splunk/config.jsonnet | 9 +- .../transform/send/sumologic/config.jsonnet | 6 +- .../transform/test/config_test/config.jsonnet | 2 +- .../time/str_conversion/config.jsonnet | 2 +- .../utility/generate_ctrl/config.jsonnet | 2 +- .../utility/message_bytes/config.jsonnet | 2 +- .../utility/message_count/config.jsonnet | 2 +- .../utility/message_freshness/config.jsonnet | 2 +- substation_test.jsonnet | 3 +- 58 files changed, 689 insertions(+), 924 deletions(-) rename build/scripts/config/compile.sh => .github/workflows/code_jsonnet.sh (63%) delete mode 100644 build/scripts/config/format.sh delete mode 100644 cmd/development/substation-bench/main.go delete mode 100644 cmd/development/substation-file/main.go delete mode 100644 cmd/development/substation-kinesis-tap/README.md delete mode 100644 cmd/development/substation-kinesis-tap/config.jsonnet create mode 100644 cmd/substation/read.go rename cmd/{development/substation-kinesis-tap/main.go => substation/tap.go} (54%) diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index e2fe4b43..7fde02d5 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -64,4 +64,4 @@ jobs: - name: Compiling run: | go install github.com/google/go-jsonnet/cmd/jsonnet@latest - sh build/scripts/config/compile.sh + sh .github/workflows/code_jsonnet.sh diff --git a/build/scripts/config/compile.sh b/.github/workflows/code_jsonnet.sh similarity index 63% rename from build/scripts/config/compile.sh rename to .github/workflows/code_jsonnet.sh index e09a9232..5655ea2d 100644 --- a/build/scripts/config/compile.sh +++ b/.github/workflows/code_jsonnet.sh @@ -5,5 +5,6 @@ for file in $files do # 'rev | cut | rev' converts "path/to/file.jsonnet" to "path/to/file.json" f=$(echo $file | rev | cut -c 4- | rev) - jsonnet $file > $f + # This is run from the root of the repo. + jsonnet --ext-code-file sub="./substation.libsonnet" $file > $f done diff --git a/build/scripts/config/format.sh b/build/scripts/config/format.sh deleted file mode 100644 index 90adb1a8..00000000 --- a/build/scripts/config/format.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh -files=$(find . \( -name *.jsonnet -o -name *.libsonnet \)) - -for file in $files -do - jsonnetfmt -i $file -done diff --git a/cmd/README.md b/cmd/README.md index 0a29d514..b880a2d1 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -1,11 +1,11 @@ # cmd -This directory contains applications that run Substation. Applications are organized by their deployment target (e.g., AWS Lambda) and the source of the data they process (e.g., file, http). +This directory contains applications that run Substation. ## aws/lambda/ Applications that run on AWS Lambda. -## development/ +## substation/ -Applications that are used for development. +The Substation CLI tool, which is used for managing configurations and local development. diff --git a/cmd/development/substation-bench/main.go b/cmd/development/substation-bench/main.go deleted file mode 100644 index 21a03ffa..00000000 --- a/cmd/development/substation-bench/main.go +++ /dev/null @@ -1,206 +0,0 @@ -// Benchmarks the performance of Substation by sending a configurable number of events -// through the system and reporting the total time taken, the number of events sent, the -// amount of data sent, and the rate of events and data sent per second. -package main - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "os" - "runtime/pprof" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/brexhq/substation/v2" - "github.com/brexhq/substation/v2/message" - - "github.com/brexhq/substation/v2/internal/bufio" - "github.com/brexhq/substation/v2/internal/channel" - "github.com/brexhq/substation/v2/internal/file" -) - -type options struct { - Count int - Concurrency int - ConfigFile string - DataFile string - pprofCPU bool - pprofMemory bool -} - -func main() { - var opts options - - flag.StringVar(&opts.DataFile, "file", "", "File to parse") - flag.IntVar(&opts.Count, "count", 100000, "Number of events to process (default: 100000)") - flag.IntVar(&opts.Concurrency, "concurrency", -1, "Number of concurrent data transformation functions to run (default: number of CPUs available)") - flag.StringVar(&opts.ConfigFile, "config", "", "Substation configuration file (default: empty config)") - flag.BoolVar(&opts.pprofCPU, "cpu", false, "Enable CPU profiling (default: false)") - flag.BoolVar(&opts.pprofMemory, "mem", false, "Enable memory profiling (default: false)") - flag.Parse() - - if opts.DataFile == "" { - fmt.Println("missing required flag -file") - os.Exit(1) - } - - ctx := context.Background() - - fmt.Printf("%s: Configuring Substation\n", time.Now().Format(time.RFC3339Nano)) - var conf []byte - // If no config file is provided, then an empty config is used. - if opts.ConfigFile != "" { - path, err := file.Get(ctx, opts.ConfigFile) - defer os.Remove(path) - - if err != nil { - panic(err) - } - - conf, err = os.ReadFile(path) - if err != nil { - panic(err) - } - } else { - conf = []byte(`{"transforms":[]}`) - } - - cfg := substation.Config{} - if err := json.Unmarshal(conf, &cfg); err != nil { - panic(err) - } - - sub, err := substation.New(ctx, cfg) - if err != nil { - panic(err) - } - - // Collect the sample data for the benchmark. - path, err := file.Get(ctx, opts.DataFile) - defer os.Remove(path) - - if err != nil { - panic(fmt.Errorf("file: %v", err)) - } - - f, err := os.Open(path) - if err != nil { - panic(fmt.Errorf("file: %v", err)) - } - defer f.Close() - - scanner := bufio.NewScanner() - defer scanner.Close() - - if err := scanner.ReadFile(f); err != nil { - panic(err) - } - - fmt.Printf("%s: Loading data into memory\n", time.Now().Format(time.RFC3339Nano)) - var data [][]byte - dataBytes := 0 - for scanner.Scan() { - b := []byte(scanner.Text()) - for i := 0; i < opts.Count; i++ { - data = append(data, b) - dataBytes += len(b) - } - } - - if err := scanner.Err(); err != nil { - panic(err) - } - - if opts.pprofCPU { - f, err := os.Create("./cpu.prof") - if err != nil { - panic(err) - } - defer f.Close() // error handling omitted for example - if err := pprof.StartCPUProfile(f); err != nil { - panic(err) - } - defer pprof.StopCPUProfile() - } - - fmt.Printf("%s: Starting benchmark\n", time.Now().Format(time.RFC3339Nano)) - start := time.Now() - ch := channel.New[*message.Message]() - group, ctx := errgroup.WithContext(ctx) - - group.Go(func() error { - tfGroup, tfCtx := errgroup.WithContext(ctx) - tfGroup.SetLimit(opts.Concurrency) - - for message := range ch.Recv() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - msg := message - tfGroup.Go(func() error { - if _, err := sub.Transform(tfCtx, msg); err != nil { - return err - } - - return nil - }) - } - - if err := tfGroup.Wait(); err != nil { - return err - } - - // ctrl messages flush the pipeline. This must be done - // after all messages have been processed. - ctrl := message.New().AsControl() - if _, err := sub.Transform(ctx, ctrl); err != nil { - return err - } - - return nil - }) - - group.Go(func() error { - defer ch.Close() - - for _, b := range data { - msg := message.New().SetData(b).SkipMissingValues() - ch.Send(msg) - } - - return nil - }) - - // Wait for all goroutines to complete. This includes the goroutines that are - // executing the transform functions. - if err := group.Wait(); err != nil { - panic(err) - } - - fmt.Printf("%s: Ending benchmark\n", time.Now().Format(time.RFC3339Nano)) - - // The benchmark reports the total time taken, the number of events sent, the - // amount of data sent, and the rate of events and data sent per second. - elapsed := time.Since(start) - fmt.Printf("\nBenchmark results:\n") - fmt.Printf("- %d events in %s\n", len(data), elapsed) - fmt.Printf("- %.2f events per second\n", float64(len(data))/elapsed.Seconds()) - fmt.Printf("- %d MB in %s\n", dataBytes/1000/1000, elapsed) - fmt.Printf("- %.2f MB per second\n", float64(dataBytes)/1000/1000/elapsed.Seconds()) - - if opts.pprofMemory { - heap, err := os.Create("./heap.prof") - if err != nil { - panic(err) - } - if err := pprof.WriteHeapProfile(heap); err != nil { - panic(err) - } - } -} diff --git a/cmd/development/substation-file/main.go b/cmd/development/substation-file/main.go deleted file mode 100644 index 96ef9052..00000000 --- a/cmd/development/substation-file/main.go +++ /dev/null @@ -1,194 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "flag" - "fmt" - "io" - "os" - "slices" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/brexhq/substation/v2" - "github.com/brexhq/substation/v2/internal/bufio" - "github.com/brexhq/substation/v2/internal/channel" - "github.com/brexhq/substation/v2/internal/file" - "github.com/brexhq/substation/v2/internal/media" - "github.com/brexhq/substation/v2/message" -) - -type options struct { - File string - Config string -} - -// getConfig contextually retrieves a Substation configuration. -func getConfig(ctx context.Context, cfg string) (io.Reader, error) { - path, err := file.Get(ctx, cfg) - defer os.Remove(path) - - if err != nil { - return nil, err - } - - conf, err := os.Open(path) - if err != nil { - return nil, err - } - defer conf.Close() - - buf := new(bytes.Buffer) - if _, err := io.Copy(buf, conf); err != nil { - return nil, err - } - - return buf, nil -} - -func main() { - var opts options - - timeout := flag.Duration("timeout", 10*time.Second, "Timeout in seconds") - flag.StringVar(&opts.File, "file", "", "File to parse") - flag.StringVar(&opts.Config, "config", "", "Substation configuration file") - flag.Parse() - - ctx, cancel := context.WithTimeout(context.Background(), *timeout) - defer cancel() - - if err := run(ctx, opts); err != nil { - panic(fmt.Errorf("main: %v", err)) - } -} - -func run(ctx context.Context, opts options) error { - c, err := getConfig(ctx, opts.Config) - if err != nil { - return err - } - - cfg := substation.Config{} - if err := json.NewDecoder(c).Decode(&cfg); err != nil { - return err - } - - sub, err := substation.New(ctx, cfg) - if err != nil { - return err - } - - ch := channel.New[*message.Message]() - group, ctx := errgroup.WithContext(ctx) - - group.Go(func() error { - tfGroup, tfCtx := errgroup.WithContext(ctx) - tfGroup.SetLimit(1) - - for message := range ch.Recv() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - msg := message - tfGroup.Go(func() error { - if _, err := sub.Transform(tfCtx, msg); err != nil { - return err - } - - return nil - }) - } - - if err := tfGroup.Wait(); err != nil { - return err - } - - // CTRL messages flush the pipeline. This must be done - // after all messages have been processed. - ctrl := message.New().AsControl() - if _, err := sub.Transform(tfCtx, ctrl); err != nil { - return err - } - - return nil - }) - - // Data ingest. - group.Go(func() error { - defer ch.Close() - - fi, err := file.Get(ctx, opts.File) - if err != nil { - return err - } - defer os.Remove(fi) - - f, err := os.Open(fi) - if err != nil { - return err - } - defer f.Close() - - mediaType, err := media.File(f) - if err != nil { - return err - } - - if _, err := f.Seek(0, 0); err != nil { - return err - } - - // Unsupported media types are sent as binary data. - if !slices.Contains(bufio.MediaTypes, mediaType) { - r, err := io.ReadAll(f) - if err != nil { - return err - } - - msg := message.New().SetData(r).SkipMissingValues() - ch.Send(msg) - - return nil - } - - scanner := bufio.NewScanner() - defer scanner.Close() - - if err := scanner.ReadFile(f); err != nil { - return err - } - - for scanner.Scan() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - b := []byte(scanner.Text()) - msg := message.New().SetData(b).SkipMissingValues() - - ch.Send(msg) - } - - if err := scanner.Err(); err != nil { - return err - } - - return nil - }) - - // Wait for all goroutines to complete. This includes the goroutines that are - // executing the transform functions. - if err := group.Wait(); err != nil { - return err - } - - return nil -} diff --git a/cmd/development/substation-kinesis-tap/README.md b/cmd/development/substation-kinesis-tap/README.md deleted file mode 100644 index 858592bc..00000000 --- a/cmd/development/substation-kinesis-tap/README.md +++ /dev/null @@ -1,55 +0,0 @@ -# substation-kinesis-tap - -`substation-kinesis-tap` is a tool for tapping into and transforming data from an AWS Kinesis Data Stream in real-time with Substation. - -This is intended as a Substation development aid, but it has other uses as well, such as: -- Previewing live data in a stream by printing it to the console (default behavior) -- Sampling live data from a stream and saving it to a local file -- Forwarding live data between data pipeline stages for testing - -Warning: This is a development tool intended to provide temporary access to live data in a Kinesis Data Stream; if you need to process data from a Kinesis Data Stream with strict reliability guarantees, use the [AWS Lambda applications](/cmd/aws/lambda/). - -## Usage - -``` -% ./substation-kinesis-tap -h -Usage of ./substation: - -config string - The Substation configuration file used to transform records (default "./config.json") - -stream-name string - The AWS Kinesis Data Stream to fetch records from - -stream-offset string - Determines the offset of the stream (earliest, latest) (default "earliest") -``` - -Use the `SUBSTATION_DEBUG=1` environment variable to enable debug logging: -``` -% SUBSTATION_DEBUG=1 ./substation-kinesis-tap -stream-name my-stream -DEBU[0000] Retrieved active shards from Kinesis stream. count=2 stream=my-stream -DEBU[0001] Retrieved records from Kinesis shard. count=981 shard=0x140004a6f80 stream=my-stream -DEBU[0002] Retrieved records from Kinesis shard. count=1055 shard=0x140004a6fe0 stream=my-stream -DEBU[0003] Retrieved records from Kinesis shard. count=2333 shard=0x140004a6f80 stream=my-stream -DEBU[0003] Retrieved records from Kinesis shard. count=1110 shard=0x140004a6fe0 stream=my-stream -DEBU[0004] Retrieved records from Kinesis shard. count=2109 shard=0x140004a6f80 stream=my-stream -DEBU[0004] Retrieved records from Kinesis shard. count=1094 shard=0x140004a6fe0 stream=my-stream -^CDEBU[0004] Closed connections to the Kinesis stream. -DEBU[0004] Closed Substation pipeline. -DEBU[0004] Flushed Substation pipeline. -``` - -## Build - -Download, configure, and build the `substation-kinesis-tap` binary with these commands: - -``` -git clone https://github.com/brexhq/substation.git && \ -cd substation/cmd/development/substation-kinesis-tap && \ -jsonnet config.jsonnet > config.json && \ -go build . -``` - -## Authentication - -`substation-kinesis-tap` uses the AWS SDK for Go to authenticate with AWS. The SDK uses the same authentication methods as the AWS CLI, so you can use the same environment variables or configuration files to authenticate. - -For more information, see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html). diff --git a/cmd/development/substation-kinesis-tap/config.jsonnet b/cmd/development/substation-kinesis-tap/config.jsonnet deleted file mode 100644 index 94c97c51..00000000 --- a/cmd/development/substation-kinesis-tap/config.jsonnet +++ /dev/null @@ -1,7 +0,0 @@ -local sub = import '../../../substation.libsonnet'; - -{ - transforms: [ - sub.tf.send.stdout(), - ], -} diff --git a/cmd/substation/build.go b/cmd/substation/build.go index 1971a32e..407d7e01 100644 --- a/cmd/substation/build.go +++ b/cmd/substation/build.go @@ -12,6 +12,8 @@ func init() { rootCmd.AddCommand(buildCmd) buildCmd.PersistentFlags().BoolP("recursive", "R", false, "recursively build all files") buildCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") + buildCmd.Flags().SortFlags = false + buildCmd.PersistentFlags().SortFlags = false } var buildCmd = &cobra.Command{ @@ -35,7 +37,7 @@ and the current directory is used if no arg is provided.`, // Catches an edge case where the user is looking for help. if path == "help" { - fmt.Printf("warning: %q matched no files\n", path) + fmt.Printf("warning: use -h instead.\n") return nil } diff --git a/cmd/substation/demo.go b/cmd/substation/demo.go index 508291bb..c71fd2b7 100644 --- a/cmd/substation/demo.go +++ b/cmd/substation/demo.go @@ -16,289 +16,6 @@ func init() { rootCmd.AddCommand(demoCmd) } -const demoConf = ` -local sub = import '../../substation.libsonnet'; - -{ - transforms: [ - // Move the event to the 'event.original' field. - sub.tf.obj.cp({object: { source_key: '@this', target_key: 'meta event.original' }}), - sub.tf.obj.cp({object: { source_key: 'meta @this' }}), - - // Insert the hash of the original event into the 'event.hash' field. - sub.tf.hash.sha256({obj: { src: 'event.original', trg: 'event.hash'}}), - - // Insert the event dataset into the 'event.dataset' field. - sub.tf.obj.insert({obj: { trg: 'event.dataset' }, value: 'aws.cloudtrail'}), - - // Insert the kind of event into the 'event.kind' field. - sub.tf.obj.insert({obj: { trg: 'event.kind' }, value: 'event'}), - - // Insert the event category into the 'event.category' field. - sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.category') }, value: 'configuration'}), - - // Insert the event type into the 'event.type' field. - sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.type') }, value: 'change'}), - - // Insert the outcome into the 'event.outcome' field. - sub.tf.meta.switch({ cases: [ - { - condition: sub.cnd.num.len.gt({ obj: { src: 'errorCode' }, value: 0 }), - transforms: [ - sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'failure' }), - ], - }, - { - transforms: [ - sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'success' }), - ], - }, - ] }), - - // Copy the event time to the '@timestamp' field. - sub.tf.obj.cp({obj: { src: 'event.original.eventTime', trg: '\\@timestamp' }}), - - // Copy the IP address to the 'source.ip' field. - sub.tf.obj.cp({obj: { src: 'event.original.sourceIPAddress', trg: 'source.ip' }}), - - // Copy the user agent to the 'user_agent.original' field. - sub.tf.obj.cp({obj: { src: 'event.original.userAgent', trg: 'user_agent.original' }}), - - // Copy the region to the 'cloud.region' field. - sub.tf.obj.cp({obj: { src: 'event.original.awsRegion', trg: 'cloud.region' }}), - - // Copy the account ID to the 'cloud.account.id' field. - sub.tf.obj.cp({obj: { src: 'event.original.userIdentity.accountId', trg: 'cloud.account.id' }}), - - // Add the cloud service provider to the 'cloud.provider' field. - sub.tf.obj.insert({obj: { trg: 'cloud.provider' }, value: 'aws'}), - - // Extract the cloud service into the 'cloud.service.name' field. - sub.tf.str.capture({obj: { src: 'event.original.eventSource', trg: 'cloud.service.name' }, pattern: '^(.*)\\.amazonaws\\.com$'}), - - // Make the event pretty before printing to the console. - sub.tf.obj.cp({obj: { src: '@this|@pretty' }}), - sub.tf.send.stdout(), - ], -} -` - -const demoCompiled = ` -{ - "transforms": [ - { - "settings": { - "id": "2bbe3748-28c56e0b", - "object": { - "source_key": "@this", - "target_key": "meta event.original" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-61e51827", - "object": { - "source_key": "meta @this" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "324f1035-f49e5682", - "object": { - "source_key": "event.original", - "target_key": "event.hash" - } - }, - "type": "hash_sha256" - }, - { - "settings": { - "id": "5f4ae672-0478e109", - "object": { - "target_key": "event.dataset" - }, - "value": "aws.cloudtrail" - }, - "type": "object_insert" - }, - { - "settings": { - "id": "5f4ae672-7de9f731", - "object": { - "target_key": "event.kind" - }, - "value": "event" - }, - "type": "object_insert" - }, - { - "settings": { - "id": "5f4ae672-2c1fa54f", - "object": { - "target_key": "event.category.-1" - }, - "value": "configuration" - }, - "type": "object_insert" - }, - { - "settings": { - "id": "5f4ae672-e97ed8b8", - "object": { - "target_key": "event.type.-1" - }, - "value": "change" - }, - "type": "object_insert" - }, - { - "settings": { - "cases": [ - { - "condition": { - "settings": { - "measurement": "byte", - "object": { - "source_key": "errorCode" - }, - "value": 0 - }, - "type": "number_length_greater_than" - }, - "transforms": [ - { - "settings": { - "id": "5f4ae672-c3cc893e", - "object": { - "target_key": "event.outcome" - }, - "value": "failure" - }, - "type": "object_insert" - } - ] - }, - { - "transforms": [ - { - "settings": { - "id": "5f4ae672-87ff6d17", - "object": { - "target_key": "event.outcome" - }, - "value": "success" - }, - "type": "object_insert" - } - ] - } - ], - "id": "b3a47dd1-fddb5674" - }, - "type": "meta_switch" - }, - { - "settings": { - "id": "2bbe3748-e3640864", - "object": { - "source_key": "event.original.eventTime", - "target_key": "\\@timestamp" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-63faf2a6", - "object": { - "source_key": "event.original.sourceIPAddress", - "target_key": "source.ip" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-3b7dfda5", - "object": { - "source_key": "event.original.userAgent", - "target_key": "user_agent.original" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-626bded4", - "object": { - "source_key": "event.original.awsRegion", - "target_key": "cloud.region" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-061dfac7", - "object": { - "source_key": "event.original.userIdentity.accountId", - "target_key": "cloud.account.id" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "5f4ae672-5c9e5d3a", - "object": { - "target_key": "cloud.provider" - }, - "value": "aws" - }, - "type": "object_insert" - }, - { - "settings": { - "count": 0, - "id": "e3bd5484-53bd3692", - "object": { - "source_key": "event.original.eventSource", - "target_key": "cloud.service.name" - }, - "pattern": "^(.*)\\.amazonaws\\.com$" - }, - "type": "string_capture" - }, - { - "settings": { - "id": "2bbe3748-15552062", - "object": { - "source_key": "@this|@pretty" - } - }, - "type": "object_copy" - }, - { - "settings": { - "batch": { - "count": 1000, - "duration": "1m", - "size": 1000000 - }, - "id": "de19b3c9-67c1890d" - }, - "type": "send_stdout" - } - ] -} -` - -const demoEvt = `{"eventVersion":"1.08","userIdentity":{"type":"IAMUser","principalId":"EXAMPLE123456789","arn":"arn:aws:iam::123456789012:user/Alice","accountId":"123456789012","accessKeyId":"ASIAEXAMPLE123","sessionContext":{"attributes":{"mfaAuthenticated":"false","creationDate":"2024-10-01T12:00:00Z"},"sessionIssuer":{"type":"AWS","principalId":"EXAMPLE123456","arn":"arn:aws:iam::123456789012:role/Admin","accountId":"123456789012","userName":"Admin"}}},"eventTime":"2024-10-01T12:30:45Z","eventSource":"s3.amazonaws.com","eventName":"PutBucketPolicy","awsRegion":"us-west-2","sourceIPAddress":"203.0.113.0","userAgent":"aws-sdk-python/1.0.0 Python/3.8.0 Linux/4.15.0","requestParameters":{"bucketName":"example-bucket","policy":"{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"s3:GetObject\",\"Resource\":\"arn:aws:s3:::example-bucket/*\"}]}"}},"responseElements":{"location":"http://example-bucket.s3.amazonaws.com/"},"requestID":"EXAMPLE123456789","eventID":"EXAMPLE-1-2-3-4-5-6","readOnly":false,"resources":[{"ARN":"arn:aws:s3:::example-bucket","accountId":"123456789012","type":"AWS::S3::Bucket"}],"eventType":"AwsApiCall","managementEvent":true,"recipientAccountId":"123456789012"}` - var demoCmd = &cobra.Command{ Use: "demo", Short: "demo substation", @@ -313,9 +30,13 @@ partially normalized to the Elastic Common Schema (ECS). `, Args: cobra.MaximumNArgs(0), RunE: func(cmd *cobra.Command, args []string) error { - cfg := substation.Config{} + conf, err := compileStr(confDemo, nil) + if err != nil { + return err + } - if err := json.Unmarshal([]byte(demoCompiled), &cfg); err != nil { + cfg := substation.Config{} + if err := json.Unmarshal([]byte(conf), &cfg); err != nil { return err } @@ -326,19 +47,19 @@ partially normalized to the Elastic Common Schema (ECS). } msgs := []*message.Message{ - message.New().SetData([]byte(demoEvt)).SkipMissingValues(), + message.New().SetData([]byte(evtDemo)).SkipMissingValues(), message.New().AsControl(), } // Make the input pretty before printing to the console. - fmt.Printf("input:\n%s\n", gjson.Get(demoEvt, "@this|@pretty").String()) + fmt.Printf("input:\n%s\n", gjson.Get(evtDemo, "@this|@pretty").String()) fmt.Printf("output:\n") if _, err := sub.Transform(ctx, msgs...); err != nil { return err } - fmt.Printf("\nconfig:%s\n", demoConf) + fmt.Printf("\nconfig:\n%s\n", confDemo) return nil }, diff --git a/cmd/substation/fmt.go b/cmd/substation/fmt.go index e6ec8bb7..24b9a04b 100644 --- a/cmd/substation/fmt.go +++ b/cmd/substation/fmt.go @@ -13,6 +13,8 @@ func init() { rootCmd.AddCommand(fmtCmd) fmtCmd.PersistentFlags().BoolP("write", "w", false, "write result to (source) file instead of stdout") fmtCmd.PersistentFlags().BoolP("recursive", "R", false, "recursively format all files") + fmtCmd.Flags().SortFlags = false + fmtCmd.PersistentFlags().SortFlags = false } var fmtCmd = &cobra.Command{ @@ -40,7 +42,7 @@ Supported file extensions: .jsonnet, .libsonnet`, // Catches an edge case where the user is looking for help. if path == "help" { - fmt.Printf("warning: %q matched no files\n", path) + fmt.Printf("warning: use -h instead.\n") return nil } diff --git a/cmd/substation/main.go b/cmd/substation/main.go index 34a91bd1..61fc96ec 100644 --- a/cmd/substation/main.go +++ b/cmd/substation/main.go @@ -1,12 +1,15 @@ package main import ( + "io" "os" "path/filepath" "strings" "github.com/google/go-jsonnet" "github.com/spf13/cobra" + + "github.com/brexhq/substation/v2" ) var rootCmd = &cobra.Command{ @@ -14,6 +17,88 @@ var rootCmd = &cobra.Command{ Long: "'substation' is a tool for managing Substation configurations.", } +const ( + // confStdout is the default configuration used by + // read-like commands. It prints any results (<= 100MB) + // to stdout. + confStdout = `local sub = std.extVar('sub'); + +{ + transforms: [ + sub.tf.send.stdout({ batch: { size: 100000000000, count: 1 } }), + ], +}` + + // confDemo is a demo configuration for AWS CloudTrail. + confDemo = `// Every config must import the Substation library. +local sub = std.extVar('sub'); + +{ + transforms: [ + // Move the event to the 'event.original' field. + sub.tf.obj.cp({object: { source_key: '@this', target_key: 'meta event.original' }}), + sub.tf.obj.cp({object: { source_key: 'meta @this' }}), + + // Insert the hash of the original event into the 'event.hash' field. + sub.tf.hash.sha256({obj: { src: 'event.original', trg: 'event.hash'}}), + + // Insert the event dataset into the 'event.dataset' field. + sub.tf.obj.insert({obj: { trg: 'event.dataset' }, value: 'aws.cloudtrail'}), + + // Insert the kind of event into the 'event.kind' field. + sub.tf.obj.insert({obj: { trg: 'event.kind' }, value: 'event'}), + + // Insert the event category into the 'event.category' field. + sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.category') }, value: 'configuration'}), + + // Insert the event type into the 'event.type' field. + sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.type') }, value: 'change'}), + + // Insert the outcome into the 'event.outcome' field. + sub.tf.meta.switch({ cases: [ + { + condition: sub.cnd.num.len.gt({ obj: { src: 'errorCode' }, value: 0 }), + transforms: [ + sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'failure' }), + ], + }, + { + transforms: [ + sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'success' }), + ], + }, + ] }), + + // Copy the event time to the '@timestamp' field. + sub.tf.obj.cp({obj: { src: 'event.original.eventTime', trg: '\\@timestamp' }}), + + // Copy the IP address to the 'source.ip' field. + sub.tf.obj.cp({obj: { src: 'event.original.sourceIPAddress', trg: 'source.ip' }}), + + // Copy the user agent to the 'user_agent.original' field. + sub.tf.obj.cp({obj: { src: 'event.original.userAgent', trg: 'user_agent.original' }}), + + // Copy the region to the 'cloud.region' field. + sub.tf.obj.cp({obj: { src: 'event.original.awsRegion', trg: 'cloud.region' }}), + + // Copy the account ID to the 'cloud.account.id' field. + sub.tf.obj.cp({obj: { src: 'event.original.userIdentity.accountId', trg: 'cloud.account.id' }}), + + // Add the cloud service provider to the 'cloud.provider' field. + sub.tf.obj.insert({obj: { trg: 'cloud.provider' }, value: 'aws'}), + + // Extract the cloud service into the 'cloud.service.name' field. + sub.tf.str.capture({obj: { src: 'event.original.eventSource', trg: 'cloud.service.name' }, pattern: '^(.*)\\.amazonaws\\.com$'}), + + // Make the event pretty before printing to the console. + sub.tf.obj.cp({obj: { src: '@this|@pretty' }}), + sub.tf.send.stdout(), + ], +}` + + evtDemo = `{"eventVersion":"1.08","userIdentity":{"type":"IAMUser","principalId":"EXAMPLE123456789","arn":"arn:aws:iam::123456789012:user/Alice","accountId":"123456789012","accessKeyId":"ASIAEXAMPLE123","sessionContext":{"attributes":{"mfaAuthenticated":"false","creationDate":"2024-10-01T12:00:00Z"},"sessionIssuer":{"type":"AWS","principalId":"EXAMPLE123456","arn":"arn:aws:iam::123456789012:role/Admin","accountId":"123456789012","userName":"Admin"}}},"eventTime":"2024-10-01T12:30:45Z","eventSource":"s3.amazonaws.com","eventName":"PutBucketPolicy","awsRegion":"us-west-2","sourceIPAddress":"203.0.113.0","userAgent":"aws-sdk-python/1.0.0 Python/3.8.0 Linux/4.15.0","requestParameters":{"bucketName":"example-bucket","policy":"{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"s3:GetObject\",\"Resource\":\"arn:aws:s3:::example-bucket/*\"}]}"}},"responseElements":{"location":"http://example-bucket.s3.amazonaws.com/"},"requestID":"EXAMPLE123456789","eventID":"EXAMPLE-1-2-3-4-5-6","readOnly":false,"resources":[{"ARN":"arn:aws:s3:::example-bucket","accountId":"123456789012","type":"AWS::S3::Bucket"}],"eventType":"AwsApiCall","managementEvent":true,"recipientAccountId":"123456789012"}` +) + func init() { // Hides the 'completion' command. rootCmd.AddCommand(&cobra.Command{ @@ -30,13 +115,31 @@ func init() { } // compileFile returns JSON from a Jsonnet file. -func compileFile(f string, extVars map[string]string) (string, error) { +func compileFile(fi string, extVars map[string]string) (string, error) { + f, err := os.Open(fi) + if err != nil { + return "", err + } + defer f.Close() + + s, err := io.ReadAll(f) + if err != nil { + return "", err + } + + return compileStr(string(s), extVars) +} + +// compileStr returns JSON from a Jsonnet string. +func compileStr(s string, extVars map[string]string) (string, error) { vm := jsonnet.MakeVM() + vm.ExtCode("sub", substation.Library) + for k, v := range extVars { vm.ExtVar(k, v) } - res, err := vm.EvaluateFile(f) + res, err := vm.EvaluateAnonymousSnippet("snippet", s) if err != nil { return "", err } diff --git a/cmd/substation/playground.go b/cmd/substation/playground.go index 3c9dde96..77c58176 100644 --- a/cmd/substation/playground.go +++ b/cmd/substation/playground.go @@ -18,12 +18,12 @@ import ( "syscall" "time" + "github.com/google/go-jsonnet/formatter" + "github.com/spf13/cobra" + "github.com/brexhq/substation/v2" "github.com/brexhq/substation/v2/condition" "github.com/brexhq/substation/v2/message" - "github.com/google/go-jsonnet" - "github.com/google/go-jsonnet/formatter" - "github.com/spf13/cobra" ) //go:embed playground.tmpl @@ -34,9 +34,9 @@ func init() { } var playgroundCmd = &cobra.Command{ - Use: "playground", + Use: "play", Short: "start playground", - Long: `'substation playground' starts a local HTTP server for testing Substation configurations.`, + Long: `'substation play' starts a local instance of the Substation playground.`, RunE: runPlayground, } @@ -124,7 +124,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { // If shared data is present, don't include environment variables if sharedData == "" { - data.DefaultEnv = "# Add environment variables here, one per line\n# Example: KEY=VALUE" + data.DefaultEnv = "# Example: KEY=VALUE" } tmpl := template.Must(template.New("index").Parse(playgroundHTML)) @@ -137,11 +137,9 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { func handleDemo(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - cleanedDemoconf := strings.ReplaceAll(demoConf, "local sub = import '../../substation.libsonnet';\n\n", "") - if err := json.NewEncoder(w).Encode(map[string]interface{}{ - "config": cleanedDemoconf, - "input": demoEvt, + "config": confDemo, + "input": evtDemo, }); err != nil { http.Error(w, fmt.Sprintf("Error encoding response: %v", err), http.StatusInternalServerError) } @@ -162,19 +160,14 @@ func handleTest(w http.ResponseWriter, r *http.Request) { return } - combinedConfig := fmt.Sprintf(`local sub = %s; - -%s`, substation.Library, request.Config) - - vm := jsonnet.MakeVM() - jsonString, err := vm.EvaluateAnonymousSnippet("", combinedConfig) + conf, err := compileStr(request.Config, nil) if err != nil { - http.Error(w, fmt.Sprintf("Error evaluating Jsonnet: %v", err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Error compiling config: %v", err), http.StatusBadRequest) return } var cfg customConfig - if err := json.Unmarshal([]byte(jsonString), &cfg); err != nil { + if err := json.Unmarshal([]byte(conf), &cfg); err != nil { http.Error(w, fmt.Sprintf("Invalid configuration: %v", err), http.StatusBadRequest) return } @@ -237,7 +230,7 @@ func handleTest(w http.ResponseWriter, r *http.Request) { testPassed := true for _, msg := range tMsgs { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { continue } @@ -289,19 +282,14 @@ func handleRun(w http.ResponseWriter, r *http.Request) { return } - combinedConfig := fmt.Sprintf(`local sub = %s; - -%s`, substation.Library, request.Config) - - vm := jsonnet.MakeVM() - jsonString, err := vm.EvaluateAnonymousSnippet("", combinedConfig) + conf, err := compileStr(request.Config, nil) if err != nil { - http.Error(w, fmt.Sprintf("Error evaluating Jsonnet: %v", err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Error compiling config: %v", err), http.StatusBadRequest) return } var cfg substation.Config - if err := json.Unmarshal([]byte(jsonString), &cfg); err != nil { + if err := json.Unmarshal([]byte(conf), &cfg); err != nil { http.Error(w, fmt.Sprintf("Invalid configuration: %v", err), http.StatusBadRequest) return } @@ -330,7 +318,7 @@ func handleRun(w http.ResponseWriter, r *http.Request) { var output []string for _, msg := range result { - if !msg.IsControl() { + if !msg.HasFlag(message.IsControl) { output = append(output, string(msg.Data())) } } @@ -402,7 +390,6 @@ func handleShare(w http.ResponseWriter, r *http.Request) { var request struct { Config string `json:"config"` Input string `json:"input"` - Output string `json:"output"` } if err := json.NewDecoder(r.Body).Decode(&request); err != nil { @@ -411,7 +398,7 @@ func handleShare(w http.ResponseWriter, r *http.Request) { } // Combine and encode the data - combined := request.Config + "{substation-separator}" + request.Input + "{substation-separator}" + request.Output + combined := request.Config + "{substation-separator}" + request.Input + "{substation-separator}" encoded := base64.URLEncoding.EncodeToString([]byte(combined)) // Create the shareable URL diff --git a/cmd/substation/playground.tmpl b/cmd/substation/playground.tmpl index 0d78ae7a..c1e6e3f6 100644 --- a/cmd/substation/playground.tmpl +++ b/cmd/substation/playground.tmpl @@ -318,8 +318,7 @@

- Run your configuration, test it, or try a demo. - View examples + Run, test, or share a config. New to Substation? Try the demo to see how it works. Need inspiration? View more examples here.

@@ -330,7 +329,7 @@

Configuration

-

Configure the transformations to be applied to the input event.

+

Configure the transforms.

@@ -342,7 +341,7 @@ -

Paste the message data to be processed by Substation here.

+

Add message data.

@@ -353,7 +352,7 @@
-

The processed message data will appear here after running.

+

See the results.

@@ -366,7 +365,7 @@

Environment Variables

× -

Add environment variables here, one per line (KEY=VALUE). These will not be included when sharing.

+

Add environment variables, one per line (KEY=VALUE). These are not included when sharing the config.