From c8910f258e3b1bccf300cadfc21e5277cde57b5e Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sat, 26 Oct 2024 09:05:45 -0700 Subject: [PATCH 01/12] feat(cmd): Add tap Command to CLI Tool --- .../substation-kinesis-tap/README.md | 55 ------ .../substation-kinesis-tap/config.jsonnet | 7 - .../main.go => substation/tap.go} | 173 ++++++++++++------ 3 files changed, 117 insertions(+), 118 deletions(-) delete mode 100644 cmd/development/substation-kinesis-tap/README.md delete mode 100644 cmd/development/substation-kinesis-tap/config.jsonnet rename cmd/{development/substation-kinesis-tap/main.go => substation/tap.go} (54%) diff --git a/cmd/development/substation-kinesis-tap/README.md b/cmd/development/substation-kinesis-tap/README.md deleted file mode 100644 index 858592bc..00000000 --- a/cmd/development/substation-kinesis-tap/README.md +++ /dev/null @@ -1,55 +0,0 @@ -# substation-kinesis-tap - -`substation-kinesis-tap` is a tool for tapping into and transforming data from an AWS Kinesis Data Stream in real-time with Substation. - -This is intended as a Substation development aid, but it has other uses as well, such as: -- Previewing live data in a stream by printing it to the console (default behavior) -- Sampling live data from a stream and saving it to a local file -- Forwarding live data between data pipeline stages for testing - -Warning: This is a development tool intended to provide temporary access to live data in a Kinesis Data Stream; if you need to process data from a Kinesis Data Stream with strict reliability guarantees, use the [AWS Lambda applications](/cmd/aws/lambda/). - -## Usage - -``` -% ./substation-kinesis-tap -h -Usage of ./substation: - -config string - The Substation configuration file used to transform records (default "./config.json") - -stream-name string - The AWS Kinesis Data Stream to fetch records from - -stream-offset string - Determines the offset of the stream (earliest, latest) (default "earliest") -``` - -Use the `SUBSTATION_DEBUG=1` environment variable to enable debug logging: -``` -% SUBSTATION_DEBUG=1 ./substation-kinesis-tap -stream-name my-stream -DEBU[0000] Retrieved active shards from Kinesis stream. count=2 stream=my-stream -DEBU[0001] Retrieved records from Kinesis shard. count=981 shard=0x140004a6f80 stream=my-stream -DEBU[0002] Retrieved records from Kinesis shard. count=1055 shard=0x140004a6fe0 stream=my-stream -DEBU[0003] Retrieved records from Kinesis shard. count=2333 shard=0x140004a6f80 stream=my-stream -DEBU[0003] Retrieved records from Kinesis shard. count=1110 shard=0x140004a6fe0 stream=my-stream -DEBU[0004] Retrieved records from Kinesis shard. count=2109 shard=0x140004a6f80 stream=my-stream -DEBU[0004] Retrieved records from Kinesis shard. count=1094 shard=0x140004a6fe0 stream=my-stream -^CDEBU[0004] Closed connections to the Kinesis stream. -DEBU[0004] Closed Substation pipeline. -DEBU[0004] Flushed Substation pipeline. -``` - -## Build - -Download, configure, and build the `substation-kinesis-tap` binary with these commands: - -``` -git clone https://github.com/brexhq/substation.git && \ -cd substation/cmd/development/substation-kinesis-tap && \ -jsonnet config.jsonnet > config.json && \ -go build . -``` - -## Authentication - -`substation-kinesis-tap` uses the AWS SDK for Go to authenticate with AWS. The SDK uses the same authentication methods as the AWS CLI, so you can use the same environment variables or configuration files to authenticate. - -For more information, see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html). diff --git a/cmd/development/substation-kinesis-tap/config.jsonnet b/cmd/development/substation-kinesis-tap/config.jsonnet deleted file mode 100644 index 94c97c51..00000000 --- a/cmd/development/substation-kinesis-tap/config.jsonnet +++ /dev/null @@ -1,7 +0,0 @@ -local sub = import '../../../substation.libsonnet'; - -{ - transforms: [ - sub.tf.send.stdout(), - ], -} diff --git a/cmd/development/substation-kinesis-tap/main.go b/cmd/substation/tap.go similarity index 54% rename from cmd/development/substation-kinesis-tap/main.go rename to cmd/substation/tap.go index 19dfa6fe..09ee2f8e 100644 --- a/cmd/development/substation-kinesis-tap/main.go +++ b/cmd/substation/tap.go @@ -1,15 +1,11 @@ package main import ( - "bytes" "context" - "encoding/json" "errors" - "flag" "fmt" - "io" - "os" "os/signal" + "path/filepath" "runtime" "syscall" "time" @@ -18,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator" + "github.com/spf13/cobra" "golang.org/x/sync/errgroup" "github.com/brexhq/substation/v2" @@ -25,68 +22,132 @@ import ( "github.com/brexhq/substation/v2/internal/channel" iconfig "github.com/brexhq/substation/v2/internal/config" - "github.com/brexhq/substation/v2/internal/file" "github.com/brexhq/substation/v2/internal/log" ) -type options struct { - Config string +func init() { + rootCmd.AddCommand(tapCmd) + tapCmd.PersistentFlags().String("offset", "latest", "the offset to read from (earliest, latest)") + tapCmd.PersistentFlags().String("aws-kinesis-data-stream", "", "ARN of the Kinesis Data Stream to tap") + tapCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") +} + +var tapConfig = fmt.Sprintf(`local sub = %s; - // StreamName is the name of the Kinesis stream to read from. - StreamName string - // StreamOffset is the read offset of the stream (earliest, latest). - StreamOffset string +{ + transforms: [ + sub.tf.send.stdout(), + ], } +`, substation.Library) + +var tapCmd = &cobra.Command{ + Use: "tap [path]", + Short: "tap data streams", + Long: `'substation tap' reads from a data stream. +It supports these data stream sources: + AWS Kinesis Data Streams (--aws-kinesis-data-stream) + +The data stream can be read from either the beginning +(earliest) or the end (latest) using the --offset flag. +Reading the stream can be interrupted by sending an +interrupt signal (ex. Ctrl+C). + +If the config is not already compiled, then it is compiled +before reading the stream ('.jsonnet', '.libsonnet' files are +compiled to JSON). If no config is provided, then the stream +data is sent to stdout. + +Debug logs can be enabled to report the status of reading +from the data stream. Use this environment variable to +enable debug logs: SUBSTATION_DEBUG=true + +WARNING: This command is intended to provide temporary access +to streaming data and should not be used for production workloads. +`, + Example: ` substation tap --aws-kinesis-data-stream arn:aws:kinesis:us-east-1:123456789012:stream/my-stream + substation tap --aws-kinesis-data-stream arn:aws:kinesis:us-east-1:123456789012:stream/my-stream --offset earliest + substation tap /path/to/config.json --aws-kinesis-data-stream arn:aws:kinesis:us-east-1:123456789012:stream/my-stream +`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + // If no path is provided, then a default config is used. + path := "" + if len(args) > 0 { + path = args[0] + } -// getConfig contextually retrieves a Substation configuration. -func getConfig(ctx context.Context, cfg string) (io.Reader, error) { - path, err := file.Get(ctx, cfg) - defer os.Remove(path) + // Catches an edge case where the user is looking for help. + if path == "help" { + fmt.Printf("warning: use -h instead.\n") + return nil + } - if err != nil { - return nil, err - } + ext, err := cmd.PersistentFlags().GetStringToString("ext-str") + if err != nil { + return err + } - conf, err := os.Open(path) - if err != nil { - return nil, err - } - defer conf.Close() + offset, err := cmd.Flags().GetString("offset") + if err != nil { + return err + } - buf := new(bytes.Buffer) - if _, err := io.Copy(buf, conf); err != nil { - return nil, err - } + kinesis, err := cmd.Flags().GetString("aws-kinesis-data-stream") + if err != nil { + return err + } - return buf, nil + if kinesis != "" { + return tapKinesis(path, ext, offset, kinesis) + } + + return fmt.Errorf("no valid data stream source provided") + }, } -func main() { - var opts options +//nolint:gocognit, cyclop, gocyclo // Ignore cognitive and cyclomatic complexity. +func tapKinesis(arg string, extVars map[string]string, offset, stream string) error { + cfg := customConfig{} - flag.StringVar(&opts.Config, "config", "./config.json", "The Substation configuration file used to transform records") - flag.StringVar(&opts.StreamName, "stream-name", "", "The AWS Kinesis Data Stream to fetch records from") - flag.StringVar(&opts.StreamOffset, "stream-offset", "earliest", "Determines the offset of the stream (earliest, latest)") - flag.Parse() + switch filepath.Ext(arg) { + case ".jsonnet", ".libsonnet": + mem, err := compileFile(arg, extVars) + if err != nil { + // This is an error in the Jsonnet syntax. + // The line number and column range are included. + // + // Example: `vet.jsonnet:19:36-38 Unknown variable: st` + fmt.Printf("%v\n", err) - if err := run(context.Background(), opts); err != nil { - panic(fmt.Errorf("main: %v", err)) - } -} + return nil + } -//nolint:gocognit // Ignore cognitive complexity. -func run(ctx context.Context, opts options) error { - cfg := substation.Config{} - c, err := getConfig(ctx, opts.Config) - if err != nil { - return err - } + cfg, err = memConfig(mem) + if err != nil { + return err + } + case ".json": + fi, err := fiConfig(arg) + if err != nil { + return err + } - if err := json.NewDecoder(c).Decode(&cfg); err != nil { - return err + cfg = fi + default: + mem, err := compileStr(tapConfig, extVars) + if err != nil { + return err + } + + cfg, err = memConfig(mem) + if err != nil { + return err + } } - sub, err := substation.New(ctx, cfg) + ctx := context.Background() + sub, err := substation.New(ctx, cfg.Config) if err != nil { return err } @@ -151,22 +212,22 @@ func run(ctx context.Context, opts options) error { client := kinesis.NewFromConfig(awsCfg) resp, err := client.ListShards(ctx, &kinesis.ListShardsInput{ - StreamName: &opts.StreamName, + StreamARN: &stream, }) if err != nil { return err } - log.WithField("stream", opts.StreamName).WithField("count", len(resp.Shards)).Debug("Retrieved active shards from Kinesis stream.") + log.WithField("stream", stream).WithField("count", len(resp.Shards)).Debug("Retrieved active shards from Kinesis stream.") var iType string - switch opts.StreamOffset { + switch offset { case "earliest": iType = "TRIM_HORIZON" case "latest": iType = "LATEST" default: - return fmt.Errorf("invalid offset: %s", opts.StreamOffset) + return fmt.Errorf("invalid offset: %s", stream) } // Each shard is read concurrently using a worker @@ -187,7 +248,7 @@ func run(ctx context.Context, opts options) error { // reached or the context is cancelled. for _, shard := range resp.Shards { iterator, err := client.GetShardIterator(ctx, &kinesis.GetShardIteratorInput{ - StreamName: &opts.StreamName, + StreamARN: &stream, ShardId: shard.ShardId, ShardIteratorType: types.ShardIteratorType(iType), }) @@ -217,7 +278,7 @@ func run(ctx context.Context, opts options) error { } if resp.NextShardIterator == nil { - log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).Debug("Reached end of Kinesis shard.") + log.WithField("stream", stream).WithField("shard", shard.ShardId).Debug("Reached end of Kinesis shard.") break } @@ -234,7 +295,7 @@ func run(ctx context.Context, opts options) error { return err } - log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).WithField("count", len(deagg)).Debug("Retrieved records from Kinesis shard.") + log.WithField("stream", stream).WithField("shard", shard.ShardId).WithField("count", len(deagg)).Debug("Retrieved records from Kinesis shard.") for _, record := range deagg { msg := message.New().SetData(record.Data).SkipMissingValues() From 01c4d4c13cb738da67237abebe4f5e596bea2205 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sat, 26 Oct 2024 09:06:20 -0700 Subject: [PATCH 02/12] refactor(cmd): help Warning, Msg Flags --- cmd/substation/build.go | 2 +- cmd/substation/fmt.go | 2 +- cmd/substation/main.go | 15 +++++++++++++++ cmd/substation/playground.go | 4 ++-- cmd/substation/test.go | 3 ++- cmd/substation/vet.go | 2 +- 6 files changed, 22 insertions(+), 6 deletions(-) diff --git a/cmd/substation/build.go b/cmd/substation/build.go index 1971a32e..a067bc42 100644 --- a/cmd/substation/build.go +++ b/cmd/substation/build.go @@ -35,7 +35,7 @@ and the current directory is used if no arg is provided.`, // Catches an edge case where the user is looking for help. if path == "help" { - fmt.Printf("warning: %q matched no files\n", path) + fmt.Printf("warning: use -h instead.\n") return nil } diff --git a/cmd/substation/fmt.go b/cmd/substation/fmt.go index e6ec8bb7..7f6dffe9 100644 --- a/cmd/substation/fmt.go +++ b/cmd/substation/fmt.go @@ -40,7 +40,7 @@ Supported file extensions: .jsonnet, .libsonnet`, // Catches an edge case where the user is looking for help. if path == "help" { - fmt.Printf("warning: %q matched no files\n", path) + fmt.Printf("warning: use -h instead.\n") return nil } diff --git a/cmd/substation/main.go b/cmd/substation/main.go index 34a91bd1..7727536e 100644 --- a/cmd/substation/main.go +++ b/cmd/substation/main.go @@ -44,6 +44,21 @@ func compileFile(f string, extVars map[string]string) (string, error) { return res, nil } +// compileStr returns JSON from a Jsonnet string. +func compileStr(m string, extVars map[string]string) (string, error) { + vm := jsonnet.MakeVM() + for k, v := range extVars { + vm.ExtVar(k, v) + } + + res, err := vm.EvaluateAnonymousSnippet("snippet", m) + if err != nil { + return "", err + } + + return res, nil +} + // pathVars returns the directory and file name of a file path. func pathVars(p string) (string, string) { dir, fn := filepath.Split(p) diff --git a/cmd/substation/playground.go b/cmd/substation/playground.go index 3c9dde96..bfa46e4d 100644 --- a/cmd/substation/playground.go +++ b/cmd/substation/playground.go @@ -237,7 +237,7 @@ func handleTest(w http.ResponseWriter, r *http.Request) { testPassed := true for _, msg := range tMsgs { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { continue } @@ -330,7 +330,7 @@ func handleRun(w http.ResponseWriter, r *http.Request) { var output []string for _, msg := range result { - if !msg.IsControl() { + if !msg.HasFlag(message.IsControl) { output = append(output, string(msg.Data())) } } diff --git a/cmd/substation/test.go b/cmd/substation/test.go index e4749362..6f68e8ef 100644 --- a/cmd/substation/test.go +++ b/cmd/substation/test.go @@ -54,6 +54,7 @@ func fiConfig(f string) (customConfig, error) { func memConfig(m string) (customConfig, error) { cfg := customConfig{} + if err := json.Unmarshal([]byte(m), &cfg); err != nil { return customConfig{}, err } @@ -135,7 +136,7 @@ production resources, such as any enrichment or send transforms. // Catches an edge case where the user is looking for help. if path == "help" { - fmt.Printf("warning: %q matched no files\n", path) + fmt.Printf("warning: use -h instead.\n") return nil } diff --git a/cmd/substation/vet.go b/cmd/substation/vet.go index 24cd332b..25fb5b9a 100644 --- a/cmd/substation/vet.go +++ b/cmd/substation/vet.go @@ -74,7 +74,7 @@ array: // Catches an edge case where the user is looking for help. if path == "help" { - fmt.Printf("warning: %q matched no files\n", path) + fmt.Printf("warning: use -h instead.\n") return nil } From 462f1c9bd38bcc9135054571912ae54fea852bca Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sat, 26 Oct 2024 09:14:21 -0700 Subject: [PATCH 03/12] docs(cmd): Add SemVer Warning --- cmd/substation/tap.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/substation/tap.go b/cmd/substation/tap.go index 09ee2f8e..bc662982 100644 --- a/cmd/substation/tap.go +++ b/cmd/substation/tap.go @@ -64,6 +64,10 @@ enable debug logs: SUBSTATION_DEBUG=true WARNING: This command is intended to provide temporary access to streaming data and should not be used for production workloads. + +WARNING: This command is "experimental" and does not strictly +adhere to semantic versioning. Refer to the versioning policy +for more information. `, Example: ` substation tap --aws-kinesis-data-stream arn:aws:kinesis:us-east-1:123456789012:stream/my-stream substation tap --aws-kinesis-data-stream arn:aws:kinesis:us-east-1:123456789012:stream/my-stream --offset earliest From e54dd95e5358c6e5af7986cc28fd5f83f4818da6 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 09:49:52 -0700 Subject: [PATCH 04/12] feat(cmd): Add read Command to CLI Tool --- cmd/substation/read.go | 364 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 364 insertions(+) create mode 100644 cmd/substation/read.go diff --git a/cmd/substation/read.go b/cmd/substation/read.go new file mode 100644 index 00000000..7de78112 --- /dev/null +++ b/cmd/substation/read.go @@ -0,0 +1,364 @@ +package main + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "slices" + "strings" + + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/hashicorp/go-retryablehttp" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + + "github.com/brexhq/substation/v2" + "github.com/brexhq/substation/v2/message" + + "github.com/brexhq/substation/v2/internal/bufio" + "github.com/brexhq/substation/v2/internal/channel" + iconfig "github.com/brexhq/substation/v2/internal/config" + "github.com/brexhq/substation/v2/internal/media" +) + +func init() { + rootCmd.AddCommand(readCmd) + readCmd.PersistentFlags().String("file", "", "path to the file to read") + readCmd.PersistentFlags().String("http", "", "http(s) endpoint to read") + readCmd.PersistentFlags().String("aws", "", "aws s3 object to read") + readCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") + readCmd.Flags().SortFlags = false + readCmd.PersistentFlags().SortFlags = false +} + +var readCmd = &cobra.Command{ + Use: "read [path]", + Short: "read files", + Long: `'substation read' reads data from a file. +It supports these file sources: + Local File (--file) + HTTP(S) Endpoint (--http) + AWS S3 Object (--aws) + +If the config is not already compiled, then it is compiled +before reading the stream ('.jsonnet', '.libsonnet' files are +compiled to JSON). If no config is provided, then the stream +data is sent to stdout. + +WARNING: This command is "experimental" and does not strictly +adhere to semantic versioning. Refer to the versioning policy +for more information. +`, + Example: ` substation read --file /path/to/file.json + substation read --http https://example.com + substation read --aws s3://bucket/path/to/file.json + substation read /path/to/config.json --file /path/to/file.json +`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + // If no path is provided, then a default config is used. + path := "" + if len(args) > 0 { + path = args[0] + } + + // Catches an edge case where the user is looking for help. + if path == "help" { + fmt.Printf("warning: use -h instead.\n") + return nil + } + + ext, err := cmd.PersistentFlags().GetStringToString("ext-str") + if err != nil { + return err + } + + var cfg customConfig + + switch filepath.Ext(path) { + case ".jsonnet", ".libsonnet": + mem, err := compileFile(path, ext) + if err != nil { + // This is an error in the Jsonnet syntax. + // The line number and column range are included. + // + // Example: `vet.jsonnet:19:36-38 Unknown variable: st` + fmt.Printf("%v\n", err) + + return nil + } + + cfg, err = memConfig(mem) + if err != nil { + return err + } + case ".json": + fi, err := fiConfig(path) + if err != nil { + return err + } + + cfg = fi + default: + mem, err := compileStr(confStdout, ext) + if err != nil { + return err + } + + cfg, err = memConfig(mem) + if err != nil { + return err + } + } + + switch { + case cmd.Flags().Lookup("file").Changed: + fi, err := cmd.PersistentFlags().GetString("file") + if err != nil { + return err + } + + f, err := readFile(fi) + if err != nil { + return err + } + + return read(cfg, f) + case cmd.Flags().Lookup("http").Changed: + fi, err := cmd.PersistentFlags().GetString("http") + if err != nil { + return err + } + + f, err := readHTTP(fi) + defer func() { // Always clean up the temp file. + _ = f.Close() + _ = os.Remove(f.Name()) + }() + + if err != nil { + return err + } + + if err := read(cfg, f); err != nil { + return err + } + + return nil + case cmd.Flags().Lookup("aws").Changed: + fi, err := cmd.PersistentFlags().GetString("aws") + if err != nil { + return err + } + + f, err := readS3(fi) + defer func() { // Always clean up the temp file. + _ = f.Close() + _ = os.Remove(f.Name()) + }() + + if err != nil { + return err + } + + if err := read(cfg, f); err != nil { + return err + } + + return nil + } + + return fmt.Errorf("no valid file source provided") + }, +} + +func read(cfg customConfig, f *os.File) error { + if f == nil { + return fmt.Errorf("invalid file") + } + + ctx := context.Background() + sub, err := substation.New(ctx, cfg.Config) + if err != nil { + return err + } + + ch := channel.New[*message.Message]() + group, ctx := errgroup.WithContext(ctx) + + group.Go(func() error { + tfGroup, tfCtx := errgroup.WithContext(ctx) + tfGroup.SetLimit(1) // Set to 1 to process messages sequentially. + + for message := range ch.Recv() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + msg := message + tfGroup.Go(func() error { + if _, err := sub.Transform(tfCtx, msg); err != nil { + return err + } + + return nil + }) + } + + if err := tfGroup.Wait(); err != nil { + return err + } + + // ctrl messages flush the pipeline. This must be done + // after all messages have been processed. + ctrl := message.New().AsControl() + if _, err := sub.Transform(tfCtx, ctrl); err != nil { + return err + } + + return nil + }) + + group.Go(func() error { + defer ch.Close() + + mediaType, err := media.File(f) + if err != nil { + return err + } + + if _, err := f.Seek(0, 0); err != nil { + return err + } + + // Unsupported media types are sent as binary data. + if !slices.Contains(bufio.MediaTypes, mediaType) { + r, err := io.ReadAll(f) + if err != nil { + return err + } + + msg := message.New().SetData(r).SkipMissingValues() + ch.Send(msg) + + return nil + } + + scanner := bufio.NewScanner() + defer scanner.Close() + + if err := scanner.ReadFile(f); err != nil { + return err + } + + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + b := []byte(scanner.Text()) + msg := message.New().SetData(b).SkipMissingValues() + + ch.Send(msg) + } + + if err := scanner.Err(); err != nil { + return err + } + + return nil + }) + + if err := group.Wait(); err != nil { + return err + } + + return nil +} + +func readFile(fi string) (*os.File, error) { + if _, err := os.Stat(fi); err != nil { + return nil, fmt.Errorf("invalid file: %s", fi) + } + + f, err := os.Open(fi) + if err != nil { + return nil, err + } + + return f, nil +} + +func readHTTP(fi string) (*os.File, error) { + f, err := os.CreateTemp("", "substation") + if err != nil { + return nil, err + } + + if !strings.HasPrefix(fi, "http://") && !strings.HasPrefix(fi, "https://") { + return f, fmt.Errorf("invalid http endpoint: %s", fi) + } + + resp, err := retryablehttp.Get(fi) + if err != nil { + return f, err + } + defer resp.Body.Close() + + size, err := io.Copy(f, resp.Body) + if err != nil { + return f, err + } + + if size == 0 { + return nil, fmt.Errorf("empty file: %s", fi) + } + + return f, nil +} + +func readS3(fi string) (*os.File, error) { + f, err := os.CreateTemp("", "substation") + if err != nil { + return nil, err + } + + if !strings.HasPrefix(fi, "s3://") { + return f, fmt.Errorf("invalid s3 object: %s", fi) + } + + ctx := context.Background() + awsCfg, err := iconfig.NewAWS(ctx, iconfig.AWS{}) + if err != nil { + return f, err + } + + c := s3.NewFromConfig(awsCfg) + s3downloader := manager.NewDownloader(c) + + // "s3://bucket/key" becomes ["bucket" "key"] + paths := strings.SplitN(strings.TrimPrefix(fi, "s3://"), "/", 2) + + // Download the file from S3. + ctx = context.WithoutCancel(ctx) + size, err := s3downloader.Download(ctx, f, &s3.GetObjectInput{ + Bucket: &paths[0], + Key: &paths[1], + }) + if err != nil { + return f, err + } + + if size == 0 { + return f, fmt.Errorf("empty file: %s", fi) + } + + return f, nil +} From 6b29690ce22b5615194596eb9de84a563a394a23 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 09:50:14 -0700 Subject: [PATCH 05/12] refactor(cmd): Update CLI Tool Commands --- cmd/substation/demo.go | 297 ++--------------------------------- cmd/substation/main.go | 103 +++++++++++- cmd/substation/playground.go | 37 ++--- cmd/substation/tap.go | 11 +- cmd/substation/vet.go | 3 +- 5 files changed, 119 insertions(+), 332 deletions(-) diff --git a/cmd/substation/demo.go b/cmd/substation/demo.go index 508291bb..c71fd2b7 100644 --- a/cmd/substation/demo.go +++ b/cmd/substation/demo.go @@ -16,289 +16,6 @@ func init() { rootCmd.AddCommand(demoCmd) } -const demoConf = ` -local sub = import '../../substation.libsonnet'; - -{ - transforms: [ - // Move the event to the 'event.original' field. - sub.tf.obj.cp({object: { source_key: '@this', target_key: 'meta event.original' }}), - sub.tf.obj.cp({object: { source_key: 'meta @this' }}), - - // Insert the hash of the original event into the 'event.hash' field. - sub.tf.hash.sha256({obj: { src: 'event.original', trg: 'event.hash'}}), - - // Insert the event dataset into the 'event.dataset' field. - sub.tf.obj.insert({obj: { trg: 'event.dataset' }, value: 'aws.cloudtrail'}), - - // Insert the kind of event into the 'event.kind' field. - sub.tf.obj.insert({obj: { trg: 'event.kind' }, value: 'event'}), - - // Insert the event category into the 'event.category' field. - sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.category') }, value: 'configuration'}), - - // Insert the event type into the 'event.type' field. - sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.type') }, value: 'change'}), - - // Insert the outcome into the 'event.outcome' field. - sub.tf.meta.switch({ cases: [ - { - condition: sub.cnd.num.len.gt({ obj: { src: 'errorCode' }, value: 0 }), - transforms: [ - sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'failure' }), - ], - }, - { - transforms: [ - sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'success' }), - ], - }, - ] }), - - // Copy the event time to the '@timestamp' field. - sub.tf.obj.cp({obj: { src: 'event.original.eventTime', trg: '\\@timestamp' }}), - - // Copy the IP address to the 'source.ip' field. - sub.tf.obj.cp({obj: { src: 'event.original.sourceIPAddress', trg: 'source.ip' }}), - - // Copy the user agent to the 'user_agent.original' field. - sub.tf.obj.cp({obj: { src: 'event.original.userAgent', trg: 'user_agent.original' }}), - - // Copy the region to the 'cloud.region' field. - sub.tf.obj.cp({obj: { src: 'event.original.awsRegion', trg: 'cloud.region' }}), - - // Copy the account ID to the 'cloud.account.id' field. - sub.tf.obj.cp({obj: { src: 'event.original.userIdentity.accountId', trg: 'cloud.account.id' }}), - - // Add the cloud service provider to the 'cloud.provider' field. - sub.tf.obj.insert({obj: { trg: 'cloud.provider' }, value: 'aws'}), - - // Extract the cloud service into the 'cloud.service.name' field. - sub.tf.str.capture({obj: { src: 'event.original.eventSource', trg: 'cloud.service.name' }, pattern: '^(.*)\\.amazonaws\\.com$'}), - - // Make the event pretty before printing to the console. - sub.tf.obj.cp({obj: { src: '@this|@pretty' }}), - sub.tf.send.stdout(), - ], -} -` - -const demoCompiled = ` -{ - "transforms": [ - { - "settings": { - "id": "2bbe3748-28c56e0b", - "object": { - "source_key": "@this", - "target_key": "meta event.original" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-61e51827", - "object": { - "source_key": "meta @this" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "324f1035-f49e5682", - "object": { - "source_key": "event.original", - "target_key": "event.hash" - } - }, - "type": "hash_sha256" - }, - { - "settings": { - "id": "5f4ae672-0478e109", - "object": { - "target_key": "event.dataset" - }, - "value": "aws.cloudtrail" - }, - "type": "object_insert" - }, - { - "settings": { - "id": "5f4ae672-7de9f731", - "object": { - "target_key": "event.kind" - }, - "value": "event" - }, - "type": "object_insert" - }, - { - "settings": { - "id": "5f4ae672-2c1fa54f", - "object": { - "target_key": "event.category.-1" - }, - "value": "configuration" - }, - "type": "object_insert" - }, - { - "settings": { - "id": "5f4ae672-e97ed8b8", - "object": { - "target_key": "event.type.-1" - }, - "value": "change" - }, - "type": "object_insert" - }, - { - "settings": { - "cases": [ - { - "condition": { - "settings": { - "measurement": "byte", - "object": { - "source_key": "errorCode" - }, - "value": 0 - }, - "type": "number_length_greater_than" - }, - "transforms": [ - { - "settings": { - "id": "5f4ae672-c3cc893e", - "object": { - "target_key": "event.outcome" - }, - "value": "failure" - }, - "type": "object_insert" - } - ] - }, - { - "transforms": [ - { - "settings": { - "id": "5f4ae672-87ff6d17", - "object": { - "target_key": "event.outcome" - }, - "value": "success" - }, - "type": "object_insert" - } - ] - } - ], - "id": "b3a47dd1-fddb5674" - }, - "type": "meta_switch" - }, - { - "settings": { - "id": "2bbe3748-e3640864", - "object": { - "source_key": "event.original.eventTime", - "target_key": "\\@timestamp" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-63faf2a6", - "object": { - "source_key": "event.original.sourceIPAddress", - "target_key": "source.ip" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-3b7dfda5", - "object": { - "source_key": "event.original.userAgent", - "target_key": "user_agent.original" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-626bded4", - "object": { - "source_key": "event.original.awsRegion", - "target_key": "cloud.region" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "2bbe3748-061dfac7", - "object": { - "source_key": "event.original.userIdentity.accountId", - "target_key": "cloud.account.id" - } - }, - "type": "object_copy" - }, - { - "settings": { - "id": "5f4ae672-5c9e5d3a", - "object": { - "target_key": "cloud.provider" - }, - "value": "aws" - }, - "type": "object_insert" - }, - { - "settings": { - "count": 0, - "id": "e3bd5484-53bd3692", - "object": { - "source_key": "event.original.eventSource", - "target_key": "cloud.service.name" - }, - "pattern": "^(.*)\\.amazonaws\\.com$" - }, - "type": "string_capture" - }, - { - "settings": { - "id": "2bbe3748-15552062", - "object": { - "source_key": "@this|@pretty" - } - }, - "type": "object_copy" - }, - { - "settings": { - "batch": { - "count": 1000, - "duration": "1m", - "size": 1000000 - }, - "id": "de19b3c9-67c1890d" - }, - "type": "send_stdout" - } - ] -} -` - -const demoEvt = `{"eventVersion":"1.08","userIdentity":{"type":"IAMUser","principalId":"EXAMPLE123456789","arn":"arn:aws:iam::123456789012:user/Alice","accountId":"123456789012","accessKeyId":"ASIAEXAMPLE123","sessionContext":{"attributes":{"mfaAuthenticated":"false","creationDate":"2024-10-01T12:00:00Z"},"sessionIssuer":{"type":"AWS","principalId":"EXAMPLE123456","arn":"arn:aws:iam::123456789012:role/Admin","accountId":"123456789012","userName":"Admin"}}},"eventTime":"2024-10-01T12:30:45Z","eventSource":"s3.amazonaws.com","eventName":"PutBucketPolicy","awsRegion":"us-west-2","sourceIPAddress":"203.0.113.0","userAgent":"aws-sdk-python/1.0.0 Python/3.8.0 Linux/4.15.0","requestParameters":{"bucketName":"example-bucket","policy":"{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"s3:GetObject\",\"Resource\":\"arn:aws:s3:::example-bucket/*\"}]}"}},"responseElements":{"location":"http://example-bucket.s3.amazonaws.com/"},"requestID":"EXAMPLE123456789","eventID":"EXAMPLE-1-2-3-4-5-6","readOnly":false,"resources":[{"ARN":"arn:aws:s3:::example-bucket","accountId":"123456789012","type":"AWS::S3::Bucket"}],"eventType":"AwsApiCall","managementEvent":true,"recipientAccountId":"123456789012"}` - var demoCmd = &cobra.Command{ Use: "demo", Short: "demo substation", @@ -313,9 +30,13 @@ partially normalized to the Elastic Common Schema (ECS). `, Args: cobra.MaximumNArgs(0), RunE: func(cmd *cobra.Command, args []string) error { - cfg := substation.Config{} + conf, err := compileStr(confDemo, nil) + if err != nil { + return err + } - if err := json.Unmarshal([]byte(demoCompiled), &cfg); err != nil { + cfg := substation.Config{} + if err := json.Unmarshal([]byte(conf), &cfg); err != nil { return err } @@ -326,19 +47,19 @@ partially normalized to the Elastic Common Schema (ECS). } msgs := []*message.Message{ - message.New().SetData([]byte(demoEvt)).SkipMissingValues(), + message.New().SetData([]byte(evtDemo)).SkipMissingValues(), message.New().AsControl(), } // Make the input pretty before printing to the console. - fmt.Printf("input:\n%s\n", gjson.Get(demoEvt, "@this|@pretty").String()) + fmt.Printf("input:\n%s\n", gjson.Get(evtDemo, "@this|@pretty").String()) fmt.Printf("output:\n") if _, err := sub.Transform(ctx, msgs...); err != nil { return err } - fmt.Printf("\nconfig:%s\n", demoConf) + fmt.Printf("\nconfig:\n%s\n", confDemo) return nil }, diff --git a/cmd/substation/main.go b/cmd/substation/main.go index 7727536e..acb4456d 100644 --- a/cmd/substation/main.go +++ b/cmd/substation/main.go @@ -1,12 +1,15 @@ package main import ( + "io" "os" "path/filepath" "strings" "github.com/google/go-jsonnet" "github.com/spf13/cobra" + + "github.com/brexhq/substation/v2" ) var rootCmd = &cobra.Command{ @@ -14,6 +17,87 @@ var rootCmd = &cobra.Command{ Long: "'substation' is a tool for managing Substation configurations.", } +const ( + // confStdout is the default configuration used by + // read-like commands. It prints any results (<= 100MB) + // to stdout. + confStdout = `local sub = std.extVar('sub'); + +{ + transforms: [ + sub.tf.send.stdout({ batch: { size: 100000000000, count: 1 } }), + ], +}` + + // confDemo is a demo configuration for AWS CloudTrail. + confDemo = `local sub = std.extVar('sub'); + +{ + transforms: [ + // Move the event to the 'event.original' field. + sub.tf.obj.cp({object: { source_key: '@this', target_key: 'meta event.original' }}), + sub.tf.obj.cp({object: { source_key: 'meta @this' }}), + + // Insert the hash of the original event into the 'event.hash' field. + sub.tf.hash.sha256({obj: { src: 'event.original', trg: 'event.hash'}}), + + // Insert the event dataset into the 'event.dataset' field. + sub.tf.obj.insert({obj: { trg: 'event.dataset' }, value: 'aws.cloudtrail'}), + + // Insert the kind of event into the 'event.kind' field. + sub.tf.obj.insert({obj: { trg: 'event.kind' }, value: 'event'}), + + // Insert the event category into the 'event.category' field. + sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.category') }, value: 'configuration'}), + + // Insert the event type into the 'event.type' field. + sub.tf.obj.insert({obj: { trg: std.format('%s.-1', 'event.type') }, value: 'change'}), + + // Insert the outcome into the 'event.outcome' field. + sub.tf.meta.switch({ cases: [ + { + condition: sub.cnd.num.len.gt({ obj: { src: 'errorCode' }, value: 0 }), + transforms: [ + sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'failure' }), + ], + }, + { + transforms: [ + sub.tf.obj.insert({ obj: { trg: 'event.outcome' }, value: 'success' }), + ], + }, + ] }), + + // Copy the event time to the '@timestamp' field. + sub.tf.obj.cp({obj: { src: 'event.original.eventTime', trg: '\\@timestamp' }}), + + // Copy the IP address to the 'source.ip' field. + sub.tf.obj.cp({obj: { src: 'event.original.sourceIPAddress', trg: 'source.ip' }}), + + // Copy the user agent to the 'user_agent.original' field. + sub.tf.obj.cp({obj: { src: 'event.original.userAgent', trg: 'user_agent.original' }}), + + // Copy the region to the 'cloud.region' field. + sub.tf.obj.cp({obj: { src: 'event.original.awsRegion', trg: 'cloud.region' }}), + + // Copy the account ID to the 'cloud.account.id' field. + sub.tf.obj.cp({obj: { src: 'event.original.userIdentity.accountId', trg: 'cloud.account.id' }}), + + // Add the cloud service provider to the 'cloud.provider' field. + sub.tf.obj.insert({obj: { trg: 'cloud.provider' }, value: 'aws'}), + + // Extract the cloud service into the 'cloud.service.name' field. + sub.tf.str.capture({obj: { src: 'event.original.eventSource', trg: 'cloud.service.name' }, pattern: '^(.*)\\.amazonaws\\.com$'}), + + // Make the event pretty before printing to the console. + sub.tf.obj.cp({obj: { src: '@this|@pretty' }}), + sub.tf.send.stdout(), + ], +}` + + evtDemo = `{"eventVersion":"1.08","userIdentity":{"type":"IAMUser","principalId":"EXAMPLE123456789","arn":"arn:aws:iam::123456789012:user/Alice","accountId":"123456789012","accessKeyId":"ASIAEXAMPLE123","sessionContext":{"attributes":{"mfaAuthenticated":"false","creationDate":"2024-10-01T12:00:00Z"},"sessionIssuer":{"type":"AWS","principalId":"EXAMPLE123456","arn":"arn:aws:iam::123456789012:role/Admin","accountId":"123456789012","userName":"Admin"}}},"eventTime":"2024-10-01T12:30:45Z","eventSource":"s3.amazonaws.com","eventName":"PutBucketPolicy","awsRegion":"us-west-2","sourceIPAddress":"203.0.113.0","userAgent":"aws-sdk-python/1.0.0 Python/3.8.0 Linux/4.15.0","requestParameters":{"bucketName":"example-bucket","policy":"{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"s3:GetObject\",\"Resource\":\"arn:aws:s3:::example-bucket/*\"}]}"}},"responseElements":{"location":"http://example-bucket.s3.amazonaws.com/"},"requestID":"EXAMPLE123456789","eventID":"EXAMPLE-1-2-3-4-5-6","readOnly":false,"resources":[{"ARN":"arn:aws:s3:::example-bucket","accountId":"123456789012","type":"AWS::S3::Bucket"}],"eventType":"AwsApiCall","managementEvent":true,"recipientAccountId":"123456789012"}` +) + func init() { // Hides the 'completion' command. rootCmd.AddCommand(&cobra.Command{ @@ -30,28 +114,31 @@ func init() { } // compileFile returns JSON from a Jsonnet file. -func compileFile(f string, extVars map[string]string) (string, error) { - vm := jsonnet.MakeVM() - for k, v := range extVars { - vm.ExtVar(k, v) +func compileFile(fi string, extVars map[string]string) (string, error) { + f, err := os.Open(fi) + if err != nil { + return "", err } + defer f.Close() - res, err := vm.EvaluateFile(f) + s, err := io.ReadAll(f) if err != nil { return "", err } - return res, nil + return compileStr(string(s), extVars) } // compileStr returns JSON from a Jsonnet string. -func compileStr(m string, extVars map[string]string) (string, error) { +func compileStr(s string, extVars map[string]string) (string, error) { vm := jsonnet.MakeVM() + vm.ExtCode("sub", substation.Library) + for k, v := range extVars { vm.ExtVar(k, v) } - res, err := vm.EvaluateAnonymousSnippet("snippet", m) + res, err := vm.EvaluateAnonymousSnippet("snippet", s) if err != nil { return "", err } diff --git a/cmd/substation/playground.go b/cmd/substation/playground.go index bfa46e4d..eaa7cf46 100644 --- a/cmd/substation/playground.go +++ b/cmd/substation/playground.go @@ -18,12 +18,12 @@ import ( "syscall" "time" + "github.com/google/go-jsonnet/formatter" + "github.com/spf13/cobra" + "github.com/brexhq/substation/v2" "github.com/brexhq/substation/v2/condition" "github.com/brexhq/substation/v2/message" - "github.com/google/go-jsonnet" - "github.com/google/go-jsonnet/formatter" - "github.com/spf13/cobra" ) //go:embed playground.tmpl @@ -137,11 +137,9 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { func handleDemo(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - cleanedDemoconf := strings.ReplaceAll(demoConf, "local sub = import '../../substation.libsonnet';\n\n", "") - if err := json.NewEncoder(w).Encode(map[string]interface{}{ - "config": cleanedDemoconf, - "input": demoEvt, + "config": confDemo, + "input": evtDemo, }); err != nil { http.Error(w, fmt.Sprintf("Error encoding response: %v", err), http.StatusInternalServerError) } @@ -162,19 +160,14 @@ func handleTest(w http.ResponseWriter, r *http.Request) { return } - combinedConfig := fmt.Sprintf(`local sub = %s; - -%s`, substation.Library, request.Config) - - vm := jsonnet.MakeVM() - jsonString, err := vm.EvaluateAnonymousSnippet("", combinedConfig) + conf, err := compileStr(request.Config, nil) if err != nil { - http.Error(w, fmt.Sprintf("Error evaluating Jsonnet: %v", err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Error compiling config: %v", err), http.StatusBadRequest) return } var cfg customConfig - if err := json.Unmarshal([]byte(jsonString), &cfg); err != nil { + if err := json.Unmarshal([]byte(conf), &cfg); err != nil { http.Error(w, fmt.Sprintf("Invalid configuration: %v", err), http.StatusBadRequest) return } @@ -289,19 +282,14 @@ func handleRun(w http.ResponseWriter, r *http.Request) { return } - combinedConfig := fmt.Sprintf(`local sub = %s; - -%s`, substation.Library, request.Config) - - vm := jsonnet.MakeVM() - jsonString, err := vm.EvaluateAnonymousSnippet("", combinedConfig) + conf, err := compileStr(request.Config, nil) if err != nil { - http.Error(w, fmt.Sprintf("Error evaluating Jsonnet: %v", err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Error compiling config: %v", err), http.StatusBadRequest) return } var cfg substation.Config - if err := json.Unmarshal([]byte(jsonString), &cfg); err != nil { + if err := json.Unmarshal([]byte(conf), &cfg); err != nil { http.Error(w, fmt.Sprintf("Invalid configuration: %v", err), http.StatusBadRequest) return } @@ -402,7 +390,6 @@ func handleShare(w http.ResponseWriter, r *http.Request) { var request struct { Config string `json:"config"` Input string `json:"input"` - Output string `json:"output"` } if err := json.NewDecoder(r.Body).Decode(&request); err != nil { @@ -411,7 +398,7 @@ func handleShare(w http.ResponseWriter, r *http.Request) { } // Combine and encode the data - combined := request.Config + "{substation-separator}" + request.Input + "{substation-separator}" + request.Output + combined := request.Config + "{substation-separator}" + request.Input + "{substation-separator}" encoded := base64.URLEncoding.EncodeToString([]byte(combined)) // Create the shareable URL diff --git a/cmd/substation/tap.go b/cmd/substation/tap.go index bc662982..5c7d47b5 100644 --- a/cmd/substation/tap.go +++ b/cmd/substation/tap.go @@ -32,15 +32,6 @@ func init() { tapCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") } -var tapConfig = fmt.Sprintf(`local sub = %s; - -{ - transforms: [ - sub.tf.send.stdout(), - ], -} -`, substation.Library) - var tapCmd = &cobra.Command{ Use: "tap [path]", Short: "tap data streams", @@ -139,7 +130,7 @@ func tapKinesis(arg string, extVars map[string]string, offset, stream string) er cfg = fi default: - mem, err := compileStr(tapConfig, extVars) + mem, err := compileStr(confStdout, extVars) if err != nil { return err } diff --git a/cmd/substation/vet.go b/cmd/substation/vet.go index 25fb5b9a..86d2ff7c 100644 --- a/cmd/substation/vet.go +++ b/cmd/substation/vet.go @@ -7,8 +7,9 @@ import ( "path/filepath" "regexp" - "github.com/brexhq/substation/v2" "github.com/spf13/cobra" + + "github.com/brexhq/substation/v2" ) func init() { From df35d4c657feaa76241850419944774580bd0f4d Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 09:50:43 -0700 Subject: [PATCH 06/12] chore: Delete Scripts, Commands --- build/scripts/config/compile.sh | 9 - build/scripts/config/format.sh | 7 - cmd/development/substation-bench/main.go | 206 ----------------------- cmd/development/substation-file/main.go | 194 --------------------- 4 files changed, 416 deletions(-) delete mode 100644 build/scripts/config/compile.sh delete mode 100644 build/scripts/config/format.sh delete mode 100644 cmd/development/substation-bench/main.go delete mode 100644 cmd/development/substation-file/main.go diff --git a/build/scripts/config/compile.sh b/build/scripts/config/compile.sh deleted file mode 100644 index e09a9232..00000000 --- a/build/scripts/config/compile.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh -files=$(find . -name "*.jsonnet") - -for file in $files -do - # 'rev | cut | rev' converts "path/to/file.jsonnet" to "path/to/file.json" - f=$(echo $file | rev | cut -c 4- | rev) - jsonnet $file > $f -done diff --git a/build/scripts/config/format.sh b/build/scripts/config/format.sh deleted file mode 100644 index 90adb1a8..00000000 --- a/build/scripts/config/format.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh -files=$(find . \( -name *.jsonnet -o -name *.libsonnet \)) - -for file in $files -do - jsonnetfmt -i $file -done diff --git a/cmd/development/substation-bench/main.go b/cmd/development/substation-bench/main.go deleted file mode 100644 index 21a03ffa..00000000 --- a/cmd/development/substation-bench/main.go +++ /dev/null @@ -1,206 +0,0 @@ -// Benchmarks the performance of Substation by sending a configurable number of events -// through the system and reporting the total time taken, the number of events sent, the -// amount of data sent, and the rate of events and data sent per second. -package main - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "os" - "runtime/pprof" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/brexhq/substation/v2" - "github.com/brexhq/substation/v2/message" - - "github.com/brexhq/substation/v2/internal/bufio" - "github.com/brexhq/substation/v2/internal/channel" - "github.com/brexhq/substation/v2/internal/file" -) - -type options struct { - Count int - Concurrency int - ConfigFile string - DataFile string - pprofCPU bool - pprofMemory bool -} - -func main() { - var opts options - - flag.StringVar(&opts.DataFile, "file", "", "File to parse") - flag.IntVar(&opts.Count, "count", 100000, "Number of events to process (default: 100000)") - flag.IntVar(&opts.Concurrency, "concurrency", -1, "Number of concurrent data transformation functions to run (default: number of CPUs available)") - flag.StringVar(&opts.ConfigFile, "config", "", "Substation configuration file (default: empty config)") - flag.BoolVar(&opts.pprofCPU, "cpu", false, "Enable CPU profiling (default: false)") - flag.BoolVar(&opts.pprofMemory, "mem", false, "Enable memory profiling (default: false)") - flag.Parse() - - if opts.DataFile == "" { - fmt.Println("missing required flag -file") - os.Exit(1) - } - - ctx := context.Background() - - fmt.Printf("%s: Configuring Substation\n", time.Now().Format(time.RFC3339Nano)) - var conf []byte - // If no config file is provided, then an empty config is used. - if opts.ConfigFile != "" { - path, err := file.Get(ctx, opts.ConfigFile) - defer os.Remove(path) - - if err != nil { - panic(err) - } - - conf, err = os.ReadFile(path) - if err != nil { - panic(err) - } - } else { - conf = []byte(`{"transforms":[]}`) - } - - cfg := substation.Config{} - if err := json.Unmarshal(conf, &cfg); err != nil { - panic(err) - } - - sub, err := substation.New(ctx, cfg) - if err != nil { - panic(err) - } - - // Collect the sample data for the benchmark. - path, err := file.Get(ctx, opts.DataFile) - defer os.Remove(path) - - if err != nil { - panic(fmt.Errorf("file: %v", err)) - } - - f, err := os.Open(path) - if err != nil { - panic(fmt.Errorf("file: %v", err)) - } - defer f.Close() - - scanner := bufio.NewScanner() - defer scanner.Close() - - if err := scanner.ReadFile(f); err != nil { - panic(err) - } - - fmt.Printf("%s: Loading data into memory\n", time.Now().Format(time.RFC3339Nano)) - var data [][]byte - dataBytes := 0 - for scanner.Scan() { - b := []byte(scanner.Text()) - for i := 0; i < opts.Count; i++ { - data = append(data, b) - dataBytes += len(b) - } - } - - if err := scanner.Err(); err != nil { - panic(err) - } - - if opts.pprofCPU { - f, err := os.Create("./cpu.prof") - if err != nil { - panic(err) - } - defer f.Close() // error handling omitted for example - if err := pprof.StartCPUProfile(f); err != nil { - panic(err) - } - defer pprof.StopCPUProfile() - } - - fmt.Printf("%s: Starting benchmark\n", time.Now().Format(time.RFC3339Nano)) - start := time.Now() - ch := channel.New[*message.Message]() - group, ctx := errgroup.WithContext(ctx) - - group.Go(func() error { - tfGroup, tfCtx := errgroup.WithContext(ctx) - tfGroup.SetLimit(opts.Concurrency) - - for message := range ch.Recv() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - msg := message - tfGroup.Go(func() error { - if _, err := sub.Transform(tfCtx, msg); err != nil { - return err - } - - return nil - }) - } - - if err := tfGroup.Wait(); err != nil { - return err - } - - // ctrl messages flush the pipeline. This must be done - // after all messages have been processed. - ctrl := message.New().AsControl() - if _, err := sub.Transform(ctx, ctrl); err != nil { - return err - } - - return nil - }) - - group.Go(func() error { - defer ch.Close() - - for _, b := range data { - msg := message.New().SetData(b).SkipMissingValues() - ch.Send(msg) - } - - return nil - }) - - // Wait for all goroutines to complete. This includes the goroutines that are - // executing the transform functions. - if err := group.Wait(); err != nil { - panic(err) - } - - fmt.Printf("%s: Ending benchmark\n", time.Now().Format(time.RFC3339Nano)) - - // The benchmark reports the total time taken, the number of events sent, the - // amount of data sent, and the rate of events and data sent per second. - elapsed := time.Since(start) - fmt.Printf("\nBenchmark results:\n") - fmt.Printf("- %d events in %s\n", len(data), elapsed) - fmt.Printf("- %.2f events per second\n", float64(len(data))/elapsed.Seconds()) - fmt.Printf("- %d MB in %s\n", dataBytes/1000/1000, elapsed) - fmt.Printf("- %.2f MB per second\n", float64(dataBytes)/1000/1000/elapsed.Seconds()) - - if opts.pprofMemory { - heap, err := os.Create("./heap.prof") - if err != nil { - panic(err) - } - if err := pprof.WriteHeapProfile(heap); err != nil { - panic(err) - } - } -} diff --git a/cmd/development/substation-file/main.go b/cmd/development/substation-file/main.go deleted file mode 100644 index 96ef9052..00000000 --- a/cmd/development/substation-file/main.go +++ /dev/null @@ -1,194 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "flag" - "fmt" - "io" - "os" - "slices" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/brexhq/substation/v2" - "github.com/brexhq/substation/v2/internal/bufio" - "github.com/brexhq/substation/v2/internal/channel" - "github.com/brexhq/substation/v2/internal/file" - "github.com/brexhq/substation/v2/internal/media" - "github.com/brexhq/substation/v2/message" -) - -type options struct { - File string - Config string -} - -// getConfig contextually retrieves a Substation configuration. -func getConfig(ctx context.Context, cfg string) (io.Reader, error) { - path, err := file.Get(ctx, cfg) - defer os.Remove(path) - - if err != nil { - return nil, err - } - - conf, err := os.Open(path) - if err != nil { - return nil, err - } - defer conf.Close() - - buf := new(bytes.Buffer) - if _, err := io.Copy(buf, conf); err != nil { - return nil, err - } - - return buf, nil -} - -func main() { - var opts options - - timeout := flag.Duration("timeout", 10*time.Second, "Timeout in seconds") - flag.StringVar(&opts.File, "file", "", "File to parse") - flag.StringVar(&opts.Config, "config", "", "Substation configuration file") - flag.Parse() - - ctx, cancel := context.WithTimeout(context.Background(), *timeout) - defer cancel() - - if err := run(ctx, opts); err != nil { - panic(fmt.Errorf("main: %v", err)) - } -} - -func run(ctx context.Context, opts options) error { - c, err := getConfig(ctx, opts.Config) - if err != nil { - return err - } - - cfg := substation.Config{} - if err := json.NewDecoder(c).Decode(&cfg); err != nil { - return err - } - - sub, err := substation.New(ctx, cfg) - if err != nil { - return err - } - - ch := channel.New[*message.Message]() - group, ctx := errgroup.WithContext(ctx) - - group.Go(func() error { - tfGroup, tfCtx := errgroup.WithContext(ctx) - tfGroup.SetLimit(1) - - for message := range ch.Recv() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - msg := message - tfGroup.Go(func() error { - if _, err := sub.Transform(tfCtx, msg); err != nil { - return err - } - - return nil - }) - } - - if err := tfGroup.Wait(); err != nil { - return err - } - - // CTRL messages flush the pipeline. This must be done - // after all messages have been processed. - ctrl := message.New().AsControl() - if _, err := sub.Transform(tfCtx, ctrl); err != nil { - return err - } - - return nil - }) - - // Data ingest. - group.Go(func() error { - defer ch.Close() - - fi, err := file.Get(ctx, opts.File) - if err != nil { - return err - } - defer os.Remove(fi) - - f, err := os.Open(fi) - if err != nil { - return err - } - defer f.Close() - - mediaType, err := media.File(f) - if err != nil { - return err - } - - if _, err := f.Seek(0, 0); err != nil { - return err - } - - // Unsupported media types are sent as binary data. - if !slices.Contains(bufio.MediaTypes, mediaType) { - r, err := io.ReadAll(f) - if err != nil { - return err - } - - msg := message.New().SetData(r).SkipMissingValues() - ch.Send(msg) - - return nil - } - - scanner := bufio.NewScanner() - defer scanner.Close() - - if err := scanner.ReadFile(f); err != nil { - return err - } - - for scanner.Scan() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - b := []byte(scanner.Text()) - msg := message.New().SetData(b).SkipMissingValues() - - ch.Send(msg) - } - - if err := scanner.Err(); err != nil { - return err - } - - return nil - }) - - // Wait for all goroutines to complete. This includes the goroutines that are - // executing the transform functions. - if err := group.Wait(); err != nil { - return err - } - - return nil -} From c6816744e12aa1b7b38b6e1c5d0cee11c5b323b0 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 09:51:11 -0700 Subject: [PATCH 07/12] docs: Update Jsonnet Imports --- examples/condition/meta/config.jsonnet | 2 +- examples/condition/number/config.jsonnet | 2 +- examples/condition/string/config.jsonnet | 2 +- .../transform/aggregate/sample/config.jsonnet | 2 +- .../aggregate/summarize/config.jsonnet | 2 +- .../transform/array/extend/config.jsonnet | 2 +- .../transform/array/flatten/config.jsonnet | 2 +- .../array/flatten_deep/config.jsonnet | 2 +- examples/transform/array/group/config.jsonnet | 2 +- .../enrich/http_secret/config.jsonnet | 2 +- .../enrich/kvstore_csv/config.jsonnet | 2 +- .../enrich/kvstore_json/config.jsonnet | 2 +- .../enrich/kvstore_set_add/config.jsonnet | 2 +- examples/transform/enrich/mmdb/config.jsonnet | 2 +- .../transform/enrich/urlscan/config.jsonnet | 2 +- examples/transform/format/zip/config.jsonnet | 2 +- .../meta/crash_program/config.jsonnet | 2 +- .../meta/default_value/config.jsonnet | 2 +- .../meta/each_in_array/config.jsonnet | 2 +- .../meta/exactly_once_consumer/config.jsonnet | 2 +- .../meta/exactly_once_producer/config.jsonnet | 2 +- .../meta/exactly_once_system/config.jsonnet | 2 +- .../meta/execution_time/config.jsonnet | 2 +- .../meta/retry_with_backoff/config.jsonnet | 2 +- .../transform/number/clamp/config.jsonnet | 2 +- examples/transform/number/max/config.jsonnet | 2 +- examples/transform/number/min/config.jsonnet | 2 +- .../send/aux_transforms/config.jsonnet | 2 +- .../send/aws_s3_glacier/config.jsonnet | 4 ++-- examples/transform/send/batch/config.jsonnet | 2 +- .../transform/send/datadog/config.jsonnet | 21 ++++++------------- examples/transform/send/splunk/config.jsonnet | 9 ++++---- .../transform/send/sumologic/config.jsonnet | 6 ++++-- .../transform/test/config_test/config.jsonnet | 2 +- .../time/str_conversion/config.jsonnet | 2 +- .../utility/generate_ctrl/config.jsonnet | 2 +- .../utility/message_bytes/config.jsonnet | 2 +- .../utility/message_count/config.jsonnet | 2 +- .../utility/message_freshness/config.jsonnet | 2 +- substation_test.jsonnet | 3 +-- 40 files changed, 52 insertions(+), 61 deletions(-) diff --git a/examples/condition/meta/config.jsonnet b/examples/condition/meta/config.jsonnet index 19028494..f6d8179f 100644 --- a/examples/condition/meta/config.jsonnet +++ b/examples/condition/meta/config.jsonnet @@ -1,7 +1,7 @@ // This example determines if all values in an array are email addresses // that have the DNS domain "brex.com". This technique can be used to // validate or summarize values in an array. -local sub = import '../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/condition/number/config.jsonnet b/examples/condition/number/config.jsonnet index cbbff57a..f3618437 100644 --- a/examples/condition/number/config.jsonnet +++ b/examples/condition/number/config.jsonnet @@ -1,5 +1,5 @@ // This example shows usage of the 'number.equal_to' and 'number.greater_than' conditions. -local sub = import '../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/condition/string/config.jsonnet b/examples/condition/string/config.jsonnet index 6b7785da..a1237316 100644 --- a/examples/condition/string/config.jsonnet +++ b/examples/condition/string/config.jsonnet @@ -1,5 +1,5 @@ // This example shows usage of the 'string.equal_to' and 'string.greater_than' conditions. -local sub = import '../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/aggregate/sample/config.jsonnet b/examples/transform/aggregate/sample/config.jsonnet index 0904be2e..e1a652cb 100644 --- a/examples/transform/aggregate/sample/config.jsonnet +++ b/examples/transform/aggregate/sample/config.jsonnet @@ -1,7 +1,7 @@ // This example samples data by aggregating events into an array, then // selecting the first event in the array as a sample. The sampling rate // is 1/N, where N is the count of events in the buffer. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/aggregate/summarize/config.jsonnet b/examples/transform/aggregate/summarize/config.jsonnet index 21c03040..8274ad99 100644 --- a/examples/transform/aggregate/summarize/config.jsonnet +++ b/examples/transform/aggregate/summarize/config.jsonnet @@ -1,7 +1,7 @@ // This example reduces data by summarizing multiple network events into a single event, // simulating the behavior of flow records. This technique can be used to reduce // any JSON data that contains common fields, not just network events. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/array/extend/config.jsonnet b/examples/transform/array/extend/config.jsonnet index 22f65db4..a93a3adc 100644 --- a/examples/transform/array/extend/config.jsonnet +++ b/examples/transform/array/extend/config.jsonnet @@ -1,5 +1,5 @@ // This example extends an array by appending and flattening values. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/array/flatten/config.jsonnet b/examples/transform/array/flatten/config.jsonnet index b7ffb6e6..7a1c8fe4 100644 --- a/examples/transform/array/flatten/config.jsonnet +++ b/examples/transform/array/flatten/config.jsonnet @@ -1,5 +1,5 @@ // This example flattens an array of arrays. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/array/flatten_deep/config.jsonnet b/examples/transform/array/flatten_deep/config.jsonnet index 82bfe7d2..c5e3398f 100644 --- a/examples/transform/array/flatten_deep/config.jsonnet +++ b/examples/transform/array/flatten_deep/config.jsonnet @@ -1,5 +1,5 @@ // This example flattens an array of arrays. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/array/group/config.jsonnet b/examples/transform/array/group/config.jsonnet index 007684d3..4b09a8ae 100644 --- a/examples/transform/array/group/config.jsonnet +++ b/examples/transform/array/group/config.jsonnet @@ -1,6 +1,6 @@ // This example groups an array of arrays into an array of objects // based on index and configured keys. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); local files_key = 'meta files'; diff --git a/examples/transform/enrich/http_secret/config.jsonnet b/examples/transform/enrich/http_secret/config.jsonnet index f8995e9b..dc4815d4 100644 --- a/examples/transform/enrich/http_secret/config.jsonnet +++ b/examples/transform/enrich/http_secret/config.jsonnet @@ -3,7 +3,7 @@ // // Test this example using the substation CLI: // SUBSTATION_EXAMPLE_URL=https://www.gutenberg.org/files/2701/old/moby10b.txt substation test config.jsonnet -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // The secret is retrieved from the environment variable named // `SUBSTATION_EXAMPLE_URL` and referenced in subsequent transforms using diff --git a/examples/transform/enrich/kvstore_csv/config.jsonnet b/examples/transform/enrich/kvstore_csv/config.jsonnet index 47153d46..275ce5cd 100644 --- a/examples/transform/enrich/kvstore_csv/config.jsonnet +++ b/examples/transform/enrich/kvstore_csv/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `enrich_kv_store_item_get` transform // to lookup data in a KV store backed by a CSV file. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // This CSV file must be local to the Substation app. Absolute paths are // recommended. Files accessible over HTTPS and hosted in AWS S3 also work. diff --git a/examples/transform/enrich/kvstore_json/config.jsonnet b/examples/transform/enrich/kvstore_json/config.jsonnet index bb56d207..d7225715 100644 --- a/examples/transform/enrich/kvstore_json/config.jsonnet +++ b/examples/transform/enrich/kvstore_json/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `enrich_kv_store_item_get` transform // to lookup data in a KV store backed by a JSON file. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // This JSON file must be local to the Substation app. Absolute paths are // recommended. Files accessible over HTTPS and hosted in AWS S3 also work. diff --git a/examples/transform/enrich/kvstore_set_add/config.jsonnet b/examples/transform/enrich/kvstore_set_add/config.jsonnet index 9d7452fb..2592b9e0 100644 --- a/examples/transform/enrich/kvstore_set_add/config.jsonnet +++ b/examples/transform/enrich/kvstore_set_add/config.jsonnet @@ -1,7 +1,7 @@ // This example shows how to use the `enrich_kv_store_set_add` transform // to track data over time in a KV store. The sample data contains food // orders and is indexed by each customer's email address. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // Default Memory store is used. local mem = sub.kv_store.memory(); diff --git a/examples/transform/enrich/mmdb/config.jsonnet b/examples/transform/enrich/mmdb/config.jsonnet index d3ce76b6..4ce6ee3a 100644 --- a/examples/transform/enrich/mmdb/config.jsonnet +++ b/examples/transform/enrich/mmdb/config.jsonnet @@ -1,4 +1,4 @@ -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); local asn = sub.kv_store.mmdb({ file: 'https://gist.github.com/jshlbrd/59641ccc71ba2873fb204ac44d101640/raw/3ad0e8c09563c614c50de4671caef8c1983cbb4d/GeoLite2-ASN.mmdb' }); diff --git a/examples/transform/enrich/urlscan/config.jsonnet b/examples/transform/enrich/urlscan/config.jsonnet index 0aa8bd9b..42760744 100644 --- a/examples/transform/enrich/urlscan/config.jsonnet +++ b/examples/transform/enrich/urlscan/config.jsonnet @@ -3,7 +3,7 @@ // // Test this example using the substation CLI: // URLSCAN_API_KEY=xx substation test config.jsonnet -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); local headers = { 'API-Key': '${SECRET:URLSCAN}', 'Content-Type': 'application/json' }; diff --git a/examples/transform/format/zip/config.jsonnet b/examples/transform/format/zip/config.jsonnet index 172549db..2d5590c3 100644 --- a/examples/transform/format/zip/config.jsonnet +++ b/examples/transform/format/zip/config.jsonnet @@ -2,7 +2,7 @@ // Add the two data files in this directory to a Zip file and send it to // Substation. You can use this command to create the Zip file: // zip data.zip data.jsonl data.csv -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/meta/crash_program/config.jsonnet b/examples/transform/meta/crash_program/config.jsonnet index 8df3a7f7..a3011794 100644 --- a/examples/transform/meta/crash_program/config.jsonnet +++ b/examples/transform/meta/crash_program/config.jsonnet @@ -1,7 +1,7 @@ // This example shows how to intentionally crash a program if a transform // does not produce an output. This technique can be used to provide strict // guarantees about the result of data transformations. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // `key` is the target of the transform that may not produce an output and is // checked to determine if the transform was successful. diff --git a/examples/transform/meta/default_value/config.jsonnet b/examples/transform/meta/default_value/config.jsonnet index 4eeda314..3a7998c2 100644 --- a/examples/transform/meta/default_value/config.jsonnet +++ b/examples/transform/meta/default_value/config.jsonnet @@ -1,4 +1,4 @@ -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/meta/each_in_array/config.jsonnet b/examples/transform/meta/each_in_array/config.jsonnet index db4333b0..acf93916 100644 --- a/examples/transform/meta/each_in_array/config.jsonnet +++ b/examples/transform/meta/each_in_array/config.jsonnet @@ -1,7 +1,7 @@ // This example shows how to use the `meta.for_each` transform to // modify objects in an array. In this example, keys are removed // and added to each object in the array. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/meta/exactly_once_consumer/config.jsonnet b/examples/transform/meta/exactly_once_consumer/config.jsonnet index 6e2f279b..c46be7a9 100644 --- a/examples/transform/meta/exactly_once_consumer/config.jsonnet +++ b/examples/transform/meta/exactly_once_consumer/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `meta_kv_store_lock` transform to // create an "exactly once" semantic for a pipeline consumer. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // In production environments a distributed KV store should be used. local kv = sub.kv_store.memory(); diff --git a/examples/transform/meta/exactly_once_producer/config.jsonnet b/examples/transform/meta/exactly_once_producer/config.jsonnet index 01051b0f..dc0c3086 100644 --- a/examples/transform/meta/exactly_once_producer/config.jsonnet +++ b/examples/transform/meta/exactly_once_producer/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `meta_kv_store_lock` transform to // create an "exactly once" semantic for a pipeline producer. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // In production environments a distributed KV store should be used. local kv = sub.kv_store.memory(); diff --git a/examples/transform/meta/exactly_once_system/config.jsonnet b/examples/transform/meta/exactly_once_system/config.jsonnet index 2757aa3c..3594a38d 100644 --- a/examples/transform/meta/exactly_once_system/config.jsonnet +++ b/examples/transform/meta/exactly_once_system/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `meta_kv_store_lock` transform to // create an "exactly once" semantic for an entire pipeline system. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // In production environments a distributed KV store should be used. local kv = sub.kv_store.memory(); diff --git a/examples/transform/meta/execution_time/config.jsonnet b/examples/transform/meta/execution_time/config.jsonnet index 69f6c67f..50b467db 100644 --- a/examples/transform/meta/execution_time/config.jsonnet +++ b/examples/transform/meta/execution_time/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `meta_metric_duration` transform to // measure the execution time of other transforms. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); local attr = { AppName: 'example' }; local dest = { type: 'aws_cloudwatch_embedded_metrics' }; diff --git a/examples/transform/meta/retry_with_backoff/config.jsonnet b/examples/transform/meta/retry_with_backoff/config.jsonnet index d9635cb7..e8c09034 100644 --- a/examples/transform/meta/retry_with_backoff/config.jsonnet +++ b/examples/transform/meta/retry_with_backoff/config.jsonnet @@ -1,7 +1,7 @@ // This example shows how to implement retry with backoff behavior for any // transform that does not produce an output. This technique may be useful // when enriching data with external services or asynchronous data pipelines. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // `key` is the target of the transform that may not produce an output and is // checked to determine if the transform was successful. diff --git a/examples/transform/number/clamp/config.jsonnet b/examples/transform/number/clamp/config.jsonnet index 72956274..66068335 100644 --- a/examples/transform/number/clamp/config.jsonnet +++ b/examples/transform/number/clamp/config.jsonnet @@ -1,5 +1,5 @@ // This example shows how to clamp a number to a range. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/number/max/config.jsonnet b/examples/transform/number/max/config.jsonnet index 6e906b54..026484db 100644 --- a/examples/transform/number/max/config.jsonnet +++ b/examples/transform/number/max/config.jsonnet @@ -1,6 +1,6 @@ // This example uses the `number_maximum` transform to return the larger // of two values, where one value is a constant and the other is a message. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/number/min/config.jsonnet b/examples/transform/number/min/config.jsonnet index b5ae1c22..e9ccb353 100644 --- a/examples/transform/number/min/config.jsonnet +++ b/examples/transform/number/min/config.jsonnet @@ -1,6 +1,6 @@ // This example uses the `number_minimum` transform to return the smaller // of two values, where one value is a constant and the other is a message. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/send/aux_transforms/config.jsonnet b/examples/transform/send/aux_transforms/config.jsonnet index bc642b6d..943231f9 100644 --- a/examples/transform/send/aux_transforms/config.jsonnet +++ b/examples/transform/send/aux_transforms/config.jsonnet @@ -2,7 +2,7 @@ // are executed after the data is buffered and before it is sent. The // transforms applied inside of the send transform do not affect the data // sent through the main pipeline. All send transforms use this behavior. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/send/aws_s3_glacier/config.jsonnet b/examples/transform/send/aws_s3_glacier/config.jsonnet index 1f5c3750..0a8ddaf7 100644 --- a/examples/transform/send/aws_s3_glacier/config.jsonnet +++ b/examples/transform/send/aws_s3_glacier/config.jsonnet @@ -2,7 +2,7 @@ // The Glacier Instant Retrieval class is recommended for archival data that is // compatible with Substation's serverless architecture; this class can be read // directly by a Lambda function triggered by an SNS notification. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { transforms: [ @@ -11,7 +11,7 @@ local sub = import '../../../../substation.libsonnet'; // the other values are set to impossibly high values to ensure all events are // written to the same file. batch: { size: 128 * 1000, count: 1000 * 1000, duration: '60m' }, - bucket_name: 'substation', + aws: { arn: 'arn:aws:s3:::substation-bucket' }, // Replace with your S3 bucket ARN. storage_class: 'GLACIER_IR', // Glacier Instant Retrieval. // S3 objects are organized by time to the nearest hour and have a UUID filename. file_path: { time_format: '2006/01/02/15', uuid: true, suffix: '.jsonl.gz' }, diff --git a/examples/transform/send/batch/config.jsonnet b/examples/transform/send/batch/config.jsonnet index ec6a5583..54a09094 100644 --- a/examples/transform/send/batch/config.jsonnet +++ b/examples/transform/send/batch/config.jsonnet @@ -1,7 +1,7 @@ // This example configures send transforms with batch keys to organize // data before it is sent externally. Every send transform supports batching // and optionally grouping JSON objects by a value derived from the object. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/send/datadog/config.jsonnet b/examples/transform/send/datadog/config.jsonnet index cd956ec1..73630ffc 100644 --- a/examples/transform/send/datadog/config.jsonnet +++ b/examples/transform/send/datadog/config.jsonnet @@ -3,7 +3,7 @@ // // More information about the Datadog Logs API can be found here: // https://docs.datadoghq.com/api/latest/logs/#send-logs -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // Datadog has a strict limit of 5MB per payload. Any individual event // larger than 1MB will be truncated on ingest. @@ -22,20 +22,11 @@ local max_count = 1000; sub.tf.agg.to.array({ object: { target_key: 'message' } }), ], url: 'https://http-intake.logs.datadoghq.com/api/v2/logs', - headers: [ - { - key: 'DD-API-KEY', - value: '${SECRET:DD}', - }, - { - key: 'ddsource', - value: 'my-source', - }, - { - key: 'service', - value: 'my-service', - }, - ], + headers: { + 'DD-API-KEY': '${SECRET:DD}', + ddsource: 'my-source', + service: 'my-service', + }, }), ], } diff --git a/examples/transform/send/splunk/config.jsonnet b/examples/transform/send/splunk/config.jsonnet index 5bb7bf65..d2460190 100644 --- a/examples/transform/send/splunk/config.jsonnet +++ b/examples/transform/send/splunk/config.jsonnet @@ -5,7 +5,7 @@ // // More information about the Splunk HEC can be found here: // https://docs.splunk.com/Documentation/SplunkCloud/latest/Data/HECExamples -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // By default the Splunk HEC limits the size of each request to 1MB. local max_size = 1000 * 1000; @@ -21,10 +21,9 @@ local max_size = 1000 * 1000; sub.tf.array.join({ separator: '' }), ], url: 'https://my-instance.cloud.splunk.com:8088/services/collector', - headers: [{ - key: 'Authorization', - value: 'Splunk ${SECRET:SPLUNK}', - }], + headers: { + Authorization: 'Splunk ${SECRET:SPLUNK}', + }, }), ], } diff --git a/examples/transform/send/sumologic/config.jsonnet b/examples/transform/send/sumologic/config.jsonnet index 2e030a9c..03b9fcc7 100644 --- a/examples/transform/send/sumologic/config.jsonnet +++ b/examples/transform/send/sumologic/config.jsonnet @@ -3,7 +3,7 @@ // // More information about Sumo Logic HTTP upload can be found here: // https://help.sumologic.com/docs/send-data/hosted-collectors/http-source/logs-metrics/upload-logs/ -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); // Sumo Logic has a strict limit of 1MB per request. local max_size = 1000 * 1000; @@ -19,7 +19,9 @@ local max_size = 1000 * 1000; // There is no authentication, so the URL should be treated like a secret. url: 'https://endpoint6.collection.us2.sumologic.com/receiver/v1/http/xxxxxxxxxx', // You can override the default source category associated with the URL. - // headers: [{key: 'X-Sumo-Category', value: 'testing/substation'}] + headers: { + 'X-Sumo-Category': 'testing/substation', + }, }), ], } diff --git a/examples/transform/test/config_test/config.jsonnet b/examples/transform/test/config_test/config.jsonnet index 7fa7eb7d..1f5d024d 100644 --- a/examples/transform/test/config_test/config.jsonnet +++ b/examples/transform/test/config_test/config.jsonnet @@ -1,4 +1,4 @@ -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/time/str_conversion/config.jsonnet b/examples/transform/time/str_conversion/config.jsonnet index 5ab554d1..0741ed3a 100644 --- a/examples/transform/time/str_conversion/config.jsonnet +++ b/examples/transform/time/str_conversion/config.jsonnet @@ -1,5 +1,5 @@ // This example shows how to convert time values between string formats. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/utility/generate_ctrl/config.jsonnet b/examples/transform/utility/generate_ctrl/config.jsonnet index 74be6e73..e4c35541 100644 --- a/examples/transform/utility/generate_ctrl/config.jsonnet +++ b/examples/transform/utility/generate_ctrl/config.jsonnet @@ -2,7 +2,7 @@ // generate a control (ctrl) Message based on the amount of data Messages // received by the system. ctrl Messages overrides the settings of the // `aggregate_to_array` transform (and any other transform that supports). -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); { tests: [ diff --git a/examples/transform/utility/message_bytes/config.jsonnet b/examples/transform/utility/message_bytes/config.jsonnet index fe586351..95485645 100644 --- a/examples/transform/utility/message_bytes/config.jsonnet +++ b/examples/transform/utility/message_bytes/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `utility_metric_bytes` transform to // sum the amount of data received and transformed by Substation. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); local attr = { AppName: 'example' }; local dest = { type: 'aws_cloudwatch_embedded_metrics' }; diff --git a/examples/transform/utility/message_count/config.jsonnet b/examples/transform/utility/message_count/config.jsonnet index 1bc6fc04..6287eb26 100644 --- a/examples/transform/utility/message_count/config.jsonnet +++ b/examples/transform/utility/message_count/config.jsonnet @@ -1,6 +1,6 @@ // This example shows how to use the `utility_metric_count` transform to // count the number of messages received and transformed by Substation. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); local attr = { AppName: 'example' }; local dest = { type: 'aws_cloudwatch_embedded_metrics' }; diff --git a/examples/transform/utility/message_freshness/config.jsonnet b/examples/transform/utility/message_freshness/config.jsonnet index ca1a0c00..0d4642e3 100644 --- a/examples/transform/utility/message_freshness/config.jsonnet +++ b/examples/transform/utility/message_freshness/config.jsonnet @@ -8,7 +8,7 @@ // // The transform emits two metrics that describe success and failure, annotated // in the `FreshnessType` attribute. -local sub = import '../../../../substation.libsonnet'; +local sub = std.extVar('sub'); local attr = { AppName: 'example' }; local dest = { type: 'aws_cloudwatch_embedded_metrics' }; diff --git a/substation_test.jsonnet b/substation_test.jsonnet index a13766e4..eb8dba1c 100644 --- a/substation_test.jsonnet +++ b/substation_test.jsonnet @@ -1,7 +1,6 @@ -local sub = import 'substation.libsonnet'; +local sub = std.extVar('sub'); local src = 'source'; -local trg = 'target'; { condition: { From 255c6d07093af02bb166c98bb333b20d77b98599 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 09:58:11 -0700 Subject: [PATCH 08/12] ci: Add Jsonnet Script --- .github/workflows/code.yml | 2 +- .github/workflows/code_jsonnet.sh | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/code_jsonnet.sh diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index e2fe4b43..7fde02d5 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -64,4 +64,4 @@ jobs: - name: Compiling run: | go install github.com/google/go-jsonnet/cmd/jsonnet@latest - sh build/scripts/config/compile.sh + sh .github/workflows/code_jsonnet.sh diff --git a/.github/workflows/code_jsonnet.sh b/.github/workflows/code_jsonnet.sh new file mode 100644 index 00000000..e09a9232 --- /dev/null +++ b/.github/workflows/code_jsonnet.sh @@ -0,0 +1,9 @@ +#!/bin/sh +files=$(find . -name "*.jsonnet") + +for file in $files +do + # 'rev | cut | rev' converts "path/to/file.jsonnet" to "path/to/file.json" + f=$(echo $file | rev | cut -c 4- | rev) + jsonnet $file > $f +done From 9571d36cb622a408d4bd63235966d05a35589024 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 10:01:42 -0700 Subject: [PATCH 09/12] ci: Update Jsonnet ext-code-file --- .github/workflows/code_jsonnet.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/code_jsonnet.sh b/.github/workflows/code_jsonnet.sh index e09a9232..5655ea2d 100644 --- a/.github/workflows/code_jsonnet.sh +++ b/.github/workflows/code_jsonnet.sh @@ -5,5 +5,6 @@ for file in $files do # 'rev | cut | rev' converts "path/to/file.jsonnet" to "path/to/file.json" f=$(echo $file | rev | cut -c 4- | rev) - jsonnet $file > $f + # This is run from the root of the repo. + jsonnet --ext-code-file sub="./substation.libsonnet" $file > $f done From 9b8350bb9a2ef802f36700dcf7aa3bbfcf5f9e8f Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 10:25:32 -0700 Subject: [PATCH 10/12] docs(cmd): Remove Ref to Dev Apps --- cmd/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/README.md b/cmd/README.md index 0a29d514..b880a2d1 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -1,11 +1,11 @@ # cmd -This directory contains applications that run Substation. Applications are organized by their deployment target (e.g., AWS Lambda) and the source of the data they process (e.g., file, http). +This directory contains applications that run Substation. ## aws/lambda/ Applications that run on AWS Lambda. -## development/ +## substation/ -Applications that are used for development. +The Substation CLI tool, which is used for managing configurations and local development. From 90bb6dd710a2476a7d34d06f5438c43feaf78c42 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 10:25:44 -0700 Subject: [PATCH 11/12] style(cmd): No Sort Flags --- cmd/substation/build.go | 2 ++ cmd/substation/fmt.go | 2 ++ cmd/substation/tap.go | 4 +++- cmd/substation/test.go | 2 ++ cmd/substation/vet.go | 2 ++ 5 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cmd/substation/build.go b/cmd/substation/build.go index a067bc42..407d7e01 100644 --- a/cmd/substation/build.go +++ b/cmd/substation/build.go @@ -12,6 +12,8 @@ func init() { rootCmd.AddCommand(buildCmd) buildCmd.PersistentFlags().BoolP("recursive", "R", false, "recursively build all files") buildCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") + buildCmd.Flags().SortFlags = false + buildCmd.PersistentFlags().SortFlags = false } var buildCmd = &cobra.Command{ diff --git a/cmd/substation/fmt.go b/cmd/substation/fmt.go index 7f6dffe9..24b9a04b 100644 --- a/cmd/substation/fmt.go +++ b/cmd/substation/fmt.go @@ -13,6 +13,8 @@ func init() { rootCmd.AddCommand(fmtCmd) fmtCmd.PersistentFlags().BoolP("write", "w", false, "write result to (source) file instead of stdout") fmtCmd.PersistentFlags().BoolP("recursive", "R", false, "recursively format all files") + fmtCmd.Flags().SortFlags = false + fmtCmd.PersistentFlags().SortFlags = false } var fmtCmd = &cobra.Command{ diff --git a/cmd/substation/tap.go b/cmd/substation/tap.go index 5c7d47b5..2a00a7d8 100644 --- a/cmd/substation/tap.go +++ b/cmd/substation/tap.go @@ -27,9 +27,11 @@ import ( func init() { rootCmd.AddCommand(tapCmd) - tapCmd.PersistentFlags().String("offset", "latest", "the offset to read from (earliest, latest)") tapCmd.PersistentFlags().String("aws-kinesis-data-stream", "", "ARN of the Kinesis Data Stream to tap") + tapCmd.PersistentFlags().String("offset", "latest", "the offset to read from (earliest, latest)") tapCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") + tapCmd.Flags().SortFlags = false + tapCmd.PersistentFlags().SortFlags = false } var tapCmd = &cobra.Command{ diff --git a/cmd/substation/test.go b/cmd/substation/test.go index 6f68e8ef..7a47477c 100644 --- a/cmd/substation/test.go +++ b/cmd/substation/test.go @@ -32,6 +32,8 @@ func init() { rootCmd.AddCommand(testCmd) testCmd.PersistentFlags().BoolP("recursive", "R", false, "recursively test all files") testCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") + testCmd.Flags().SortFlags = false + testCmd.PersistentFlags().SortFlags = false } func fiConfig(f string) (customConfig, error) { diff --git a/cmd/substation/vet.go b/cmd/substation/vet.go index 86d2ff7c..0f5a6969 100644 --- a/cmd/substation/vet.go +++ b/cmd/substation/vet.go @@ -16,6 +16,8 @@ func init() { rootCmd.AddCommand(vetCmd) vetCmd.PersistentFlags().BoolP("recursive", "R", false, "recursively vet all files") vetCmd.PersistentFlags().StringToString("ext-str", nil, "set external variables") + vetCmd.Flags().SortFlags = false + vetCmd.PersistentFlags().SortFlags = false } // vetTransformRe captures the transform ID from a Substation error message. From 12da7678c1049bf1e64583b18afb46aec5b76955 Mon Sep 17 00:00:00 2001 From: jshlbrd Date: Sun, 27 Oct 2024 19:42:37 -0700 Subject: [PATCH 12/12] style(cmd): Update CLI --- cmd/substation/main.go | 3 ++- cmd/substation/playground.go | 6 +++--- cmd/substation/playground.tmpl | 13 ++++++------- cmd/substation/tap.go | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cmd/substation/main.go b/cmd/substation/main.go index acb4456d..61fc96ec 100644 --- a/cmd/substation/main.go +++ b/cmd/substation/main.go @@ -30,7 +30,8 @@ const ( }` // confDemo is a demo configuration for AWS CloudTrail. - confDemo = `local sub = std.extVar('sub'); + confDemo = `// Every config must import the Substation library. +local sub = std.extVar('sub'); { transforms: [ diff --git a/cmd/substation/playground.go b/cmd/substation/playground.go index eaa7cf46..77c58176 100644 --- a/cmd/substation/playground.go +++ b/cmd/substation/playground.go @@ -34,9 +34,9 @@ func init() { } var playgroundCmd = &cobra.Command{ - Use: "playground", + Use: "play", Short: "start playground", - Long: `'substation playground' starts a local HTTP server for testing Substation configurations.`, + Long: `'substation play' starts a local instance of the Substation playground.`, RunE: runPlayground, } @@ -124,7 +124,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { // If shared data is present, don't include environment variables if sharedData == "" { - data.DefaultEnv = "# Add environment variables here, one per line\n# Example: KEY=VALUE" + data.DefaultEnv = "# Example: KEY=VALUE" } tmpl := template.Must(template.New("index").Parse(playgroundHTML)) diff --git a/cmd/substation/playground.tmpl b/cmd/substation/playground.tmpl index 0d78ae7a..c1e6e3f6 100644 --- a/cmd/substation/playground.tmpl +++ b/cmd/substation/playground.tmpl @@ -318,8 +318,7 @@

- Run your configuration, test it, or try a demo. - View examples + Run, test, or share a config. New to Substation? Try the demo to see how it works. Need inspiration? View more examples here.

@@ -330,7 +329,7 @@

Configuration

-

Configure the transformations to be applied to the input event.

+

Configure the transforms.

@@ -342,7 +341,7 @@ -

Paste the message data to be processed by Substation here.

+

Add message data.

@@ -353,7 +352,7 @@
-

The processed message data will appear here after running.

+

See the results.

@@ -366,7 +365,7 @@

Environment Variables

× -

Add environment variables here, one per line (KEY=VALUE). These will not be included when sharing.

+

Add environment variables, one per line (KEY=VALUE). These are not included when sharing the config.