From de746ae3fe3682c4d022d2c56bf06329c576cad2 Mon Sep 17 00:00:00 2001
From: juniorrhis1
Date: Fri, 1 Mar 2024 10:15:05 -0300
Subject: [PATCH] chore: Add streaming support in S3 and Azure Blob (#3)

* chore: Add streaming support in S3 and Azure Blob

* chore: Update docs

* chore: Remove code

* fix: Treat error

* chore: Enable the exclusion of all metadata
---
 internal/impl/aws/output_s3.go                 | 38 ++++++++++++-
 internal/impl/azure/auth.go                    |  9 ++-
 internal/impl/azure/output_blob_storage.go     | 57 +++++++++++++++----
 internal/metadata/exclude_filter.go            |  3 +-
 .../components/inputs/azure_blob_storage.md    |  9 +++
 .../components/inputs/azure_queue_storage.md   |  6 ++
 .../components/inputs/azure_table_storage.md   |  9 +++
 website/docs/components/outputs/aws_s3.md      | 17 ++++++
 .../components/outputs/azure_blob_storage.md   | 26 +++++++++
 .../components/outputs/azure_queue_storage.md  |  9 +++
 .../components/outputs/azure_table_storage.md  |  9 +++
 .../docs/components/outputs/elasticsearch.md   | 25 ++++++++
 12 files changed, 199 insertions(+), 18 deletions(-)

diff --git a/internal/impl/aws/output_s3.go b/internal/impl/aws/output_s3.go
index 183b6b0d9c..2195f63f96 100644
--- a/internal/impl/aws/output_s3.go
+++ b/internal/impl/aws/output_s3.go
@@ -4,7 +4,9 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"net/url"
+	"os"
 	"sort"
 	"strings"
 	"time"
@@ -25,6 +27,7 @@ const (
 	s3oFieldBucket             = "bucket"
 	s3oFieldForcePathStyleURLs = "force_path_style_urls"
 	s3oFieldPath               = "path"
+	s3oFieldLocalFilePath      = "local_file_path"
 	s3oFieldTags               = "tags"
 	s3oFieldContentType        = "content_type"
 	s3oFieldContentEncoding    = "content_encoding"
@@ -49,6 +52,7 @@ type s3oConfig struct {
 	Bucket string

 	Path              *service.InterpolatedString
+	LocalFilePath     *service.InterpolatedString
 	Tags              []s3TagPair
 	ContentType       *service.InterpolatedString
 	ContentEncoding   *service.InterpolatedString
@@ -79,6 +83,10 @@ func s3oConfigFromParsed(pConf *service.ParsedConfig) (conf s3oConfig, err error
 		return
 	}

+	if conf.LocalFilePath, err = pConf.FieldInterpolatedString(s3oFieldLocalFilePath); err != nil {
+		return
+	}
+
 	var tagMap map[string]*service.InterpolatedString
 	if tagMap, err = pConf.FieldInterpolatedStringMap(s3oFieldTags); err != nil {
 		return
@@ -208,6 +216,10 @@ output:
 			Example(`${!count("files")}-${!timestamp_unix_nano()}.txt`).
 			Example(`${!meta("kafka_key")}.json`).
 			Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`),
+		service.NewInterpolatedStringField(s3oFieldLocalFilePath).
+			Description("The path of the local file to upload.").
+			Default(``).
+			Example(`/tmp/file.json`),
 		service.NewInterpolatedStringMapField(s3oFieldTags).
 			Description("Key/value pairs to store with the object as tags.").
 			Default(map[string]any{}).
@@ -381,7 +393,7 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat return fmt.Errorf("storage class interpolation: %w", err) } - mBytes, err := m.AsBytes() + uploadBody, err := a.getUploadBody(m) if err != nil { return err } @@ -389,7 +401,7 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat uploadInput := &s3manager.UploadInput{ Bucket: &a.conf.Bucket, Key: aws.String(key), - Body: bytes.NewReader(mBytes), + Body: uploadBody, ContentType: aws.String(contentType), ContentEncoding: contentEncoding, CacheControl: cacheControl, @@ -432,6 +444,28 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat }) } +func (a *amazonS3Writer) getUploadBody(m *service.Message) (io.Reader, error) { + localFilePath, err := a.conf.LocalFilePath.TryString(m) + if err != nil { + return nil, fmt.Errorf("local file path interpolation error: %w", err) + } + + if localFilePath != "" { + file, err := os.Open(localFilePath) + if err != nil { + return nil, fmt.Errorf("local file read error: %w", err) + } + return file, nil + } + + mBytes, err := m.AsBytes() + if err != nil { + return nil, err + } + + return bytes.NewReader(mBytes), nil +} + func (a *amazonS3Writer) Close(context.Context) error { return nil } diff --git a/internal/impl/azure/auth.go b/internal/impl/azure/auth.go index 388b14df7c..9741482438 100644 --- a/internal/impl/azure/auth.go +++ b/internal/impl/azure/auth.go @@ -31,14 +31,17 @@ func azureComponentSpec(forBlobStorage bool) *service.ConfigSpec { Default(""), service.NewStringField(bscFieldStorageAccessKey). Description("The storage account access key. This field is ignored if `"+bscFieldStorageConnectionString+"` is set."). - Default(""), + Default(""). + Secret(), service.NewStringField(bscFieldStorageConnectionString). Description("A storage account connection string. This field is required if `"+bscFieldStorageAccount+"` and `"+bscFieldStorageAccessKey+"` / `"+bscFieldStorageSASToken+"` are not set."). - Default(""), + Default(""). + Secret(), ) spec = spec.Field(service.NewStringField(bscFieldStorageSASToken). Description("The storage account SAS token. This field is ignored if `" + bscFieldStorageConnectionString + "` or `" + bscFieldStorageAccessKey + "` are set."). - Default("")). + Default(""). + Secret()). 
LintRule(`root = if this.storage_connection_string != "" && !this.storage_connection_string.contains("AccountName=") && !this.storage_connection_string.contains("UseDevelopmentStorage=true;") && this.storage_account == "" { [ "storage_account must be set if storage_connection_string does not contain the \"AccountName\" parameter" ] }`) return spec } diff --git a/internal/impl/azure/output_blob_storage.go b/internal/impl/azure/output_blob_storage.go index d9ec9547bf..aa68c68ae0 100644 --- a/internal/impl/azure/output_blob_storage.go +++ b/internal/impl/azure/output_blob_storage.go @@ -5,6 +5,8 @@ import ( "context" "errors" "fmt" + "io" + "os" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" @@ -19,6 +21,7 @@ const ( // Blob Storage Output Fields bsoFieldContainer = "container" bsoFieldPath = "path" + bsoFieldLocalFilePath = "local_file_path" bsoFieldBlobType = "blob_type" bsoFieldPublicAccessLevel = "public_access_level" ) @@ -27,6 +30,7 @@ type bsoConfig struct { client *azblob.Client Container *service.InterpolatedString Path *service.InterpolatedString + LocalFilePath *service.InterpolatedString BlobType *service.InterpolatedString PublicAccessLevel *service.InterpolatedString } @@ -50,6 +54,9 @@ func bsoConfigFromParsed(pConf *service.ParsedConfig) (conf bsoConfig, err error if conf.Path, err = pConf.FieldInterpolatedString(bsoFieldPath); err != nil { return } + if conf.LocalFilePath, err = pConf.FieldInterpolatedString(bsoFieldLocalFilePath); err != nil { + return + } if conf.BlobType, err = pConf.FieldInterpolatedString(bsoFieldBlobType); err != nil { return } @@ -89,6 +96,10 @@ If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+ Example(`${!meta("kafka_key")}.json`). Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`). Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`), + service.NewInterpolatedStringField(bsoFieldLocalFilePath). + Description("The path of the local file to upload."). + Example(`/tmp/file.json`). + Default(``), service.NewInterpolatedStringEnumField(bsoFieldBlobType, "BLOCK", "APPEND"). Description("Block and Append blobs are comprised of blocks, and each blob can support up to 50,000 blocks. The default value is `+\"`BLOCK`\"+`.`"). Advanced(). 
@@ -138,12 +149,16 @@ func (a *azureBlobStorageWriter) Connect(ctx context.Context) error { return nil } -func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, blobName, blobType string, message []byte) error { +func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, blobName, blobType string, msg *service.Message) error { + uploadBody, err := a.getUploadBody(msg) + if err != nil { + return err + } + containerClient := a.conf.client.ServiceClient().NewContainerClient(containerName) - var err error if blobType == "APPEND" { appendBlobClient := containerClient.NewAppendBlobClient(blobName) - _, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(bytes.NewReader(message)), nil) + _, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(uploadBody), nil) if err != nil { if isErrorCode(err, bloberror.BlobNotFound) { _, err := appendBlobClient.Create(ctx, nil) @@ -152,7 +167,7 @@ func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, } // Try to upload the message again now that we created the blob - _, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(bytes.NewReader(message)), nil) + _, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(uploadBody), nil) if err != nil { return fmt.Errorf("failed retrying to append block to blob: %w", err) } @@ -161,7 +176,7 @@ func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, } } } else { - _, err = containerClient.NewBlockBlobClient(blobName).UploadStream(ctx, bytes.NewReader(message), nil) + _, err = containerClient.NewBlockBlobClient(blobName).UploadStream(ctx, uploadBody, nil) if err != nil { return fmt.Errorf("failed to push block to blob: %w", err) } @@ -199,12 +214,7 @@ func (a *azureBlobStorageWriter) Write(ctx context.Context, msg *service.Message return fmt.Errorf("blob type interpolation error: %s", err) } - mBytes, err := msg.AsBytes() - if err != nil { - return err - } - - if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes); err != nil { + if err := a.uploadBlob(ctx, containerName, blobName, blobType, msg); err != nil { if isErrorCode(err, bloberror.ContainerNotFound) { var accessLevel string if accessLevel, err = a.conf.PublicAccessLevel.TryString(msg); err != nil { @@ -217,7 +227,7 @@ func (a *azureBlobStorageWriter) Write(ctx context.Context, msg *service.Message } } - if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes); err != nil { + if err := a.uploadBlob(ctx, containerName, blobName, blobType, msg); err != nil { return fmt.Errorf("error retrying to upload blob: %s", err) } } else { @@ -227,6 +237,29 @@ func (a *azureBlobStorageWriter) Write(ctx context.Context, msg *service.Message return nil } +func (a *azureBlobStorageWriter) getUploadBody(m *service.Message) (io.ReadSeeker, error) { + localFilePath, err := a.conf.LocalFilePath.TryString(m) + if err != nil { + return nil, fmt.Errorf("local file path interpolation error: %w", err) + } + + if localFilePath != "" { + file, err := os.Open(localFilePath) + if err != nil { + return nil, fmt.Errorf("local file read error: %w", err) + } + + return file, nil + } + + mBytes, err := m.AsBytes() + if err != nil { + return nil, err + } + + return bytes.NewReader(mBytes), nil +} + func (a *azureBlobStorageWriter) Close(context.Context) error { return nil } diff --git a/internal/metadata/exclude_filter.go b/internal/metadata/exclude_filter.go index e8070fe5a5..5aa8bcb6fd 100644 --- a/internal/metadata/exclude_filter.go +++ 
b/internal/metadata/exclude_filter.go @@ -1,6 +1,7 @@ package metadata import ( + "slices" "strings" "github.com/benthosdev/benthos/v4/internal/docs" @@ -46,7 +47,7 @@ type ExcludeFilter struct { // true otherwise. It also returns true if no filters are configured. func (f *ExcludeFilter) Match(str string) bool { for _, prefix := range f.excludePrefixes { - if strings.HasPrefix(str, prefix) { + if strings.HasPrefix(str, prefix) || slices.Contains(f.excludePrefixes, "*") { return false } } diff --git a/website/docs/components/inputs/azure_blob_storage.md b/website/docs/components/inputs/azure_blob_storage.md index 0803d2b7e8..48f50e13d5 100644 --- a/website/docs/components/inputs/azure_blob_storage.md +++ b/website/docs/components/inputs/azure_blob_storage.md @@ -110,6 +110,9 @@ Default: `""` ### `storage_access_key` The storage account access key. This field is ignored if `storage_connection_string` is set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -118,6 +121,9 @@ Default: `""` ### `storage_connection_string` A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -126,6 +132,9 @@ Default: `""` ### `storage_sas_token` The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` diff --git a/website/docs/components/inputs/azure_queue_storage.md b/website/docs/components/inputs/azure_queue_storage.md index 4ff05f95e8..a56b65bb40 100644 --- a/website/docs/components/inputs/azure_queue_storage.md +++ b/website/docs/components/inputs/azure_queue_storage.md @@ -84,6 +84,9 @@ Default: `""` ### `storage_access_key` The storage account access key. This field is ignored if `storage_connection_string` is set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -92,6 +95,9 @@ Default: `""` ### `storage_connection_string` A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` diff --git a/website/docs/components/inputs/azure_table_storage.md b/website/docs/components/inputs/azure_table_storage.md index f6c8ab9de9..1f9fd5ce0b 100644 --- a/website/docs/components/inputs/azure_table_storage.md +++ b/website/docs/components/inputs/azure_table_storage.md @@ -84,6 +84,9 @@ Default: `""` ### `storage_access_key` The storage account access key. This field is ignored if `storage_connection_string` is set. 
+:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -92,6 +95,9 @@ Default: `""` ### `storage_connection_string` A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -100,6 +106,9 @@ Default: `""` ### `storage_sas_token` The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` diff --git a/website/docs/components/outputs/aws_s3.md b/website/docs/components/outputs/aws_s3.md index 214e6dd984..2edb653eb4 100644 --- a/website/docs/components/outputs/aws_s3.md +++ b/website/docs/components/outputs/aws_s3.md @@ -33,6 +33,7 @@ output: aws_s3: bucket: "" # No default (required) path: ${!count("files")}-${!timestamp_unix_nano()}.txt + local_file_path: "" tags: {} content_type: application/octet-stream metadata: @@ -55,6 +56,7 @@ output: aws_s3: bucket: "" # No default (required) path: ${!count("files")}-${!timestamp_unix_nano()}.txt + local_file_path: "" tags: {} content_type: application/octet-stream content_encoding: "" @@ -184,6 +186,21 @@ path: ${!meta("kafka_key")}.json path: ${!json("doc.namespace")}/${!json("doc.id")}.json ``` +### `local_file_path` + +The path of the local file to upload. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `string` +Default: `""` + +```yml +# Examples + +local_file_path: /tmp/file.json +``` + ### `tags` Key/value pairs to store with the object as tags. diff --git a/website/docs/components/outputs/azure_blob_storage.md b/website/docs/components/outputs/azure_blob_storage.md index 9517a7fdd8..79a22a66f5 100644 --- a/website/docs/components/outputs/azure_blob_storage.md +++ b/website/docs/components/outputs/azure_blob_storage.md @@ -40,6 +40,7 @@ output: storage_sas_token: "" container: messages-${!timestamp("2006")} # No default (required) path: ${!count("files")}-${!timestamp_unix_nano()}.txt + local_file_path: "" max_in_flight: 64 ``` @@ -57,6 +58,7 @@ output: storage_sas_token: "" container: messages-${!timestamp("2006")} # No default (required) path: ${!count("files")}-${!timestamp_unix_nano()}.txt + local_file_path: "" blob_type: BLOCK public_access_level: PRIVATE max_in_flight: 64 @@ -99,6 +101,9 @@ Default: `""` ### `storage_access_key` The storage account access key. This field is ignored if `storage_connection_string` is set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -107,6 +112,9 @@ Default: `""` ### `storage_connection_string` A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set. 
+:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -115,6 +123,9 @@ Default: `""` ### `storage_sas_token` The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -153,6 +164,21 @@ path: ${!meta("kafka_key")}.json path: ${!json("doc.namespace")}/${!json("doc.id")}.json ``` +### `local_file_path` + +The path of the local file to upload. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `string` +Default: `""` + +```yml +# Examples + +local_file_path: /tmp/file.json +``` + ### `blob_type` Block and Append blobs are comprised of blocks, and each blob can support up to 50,000 blocks. The default value is `+"`BLOCK`"+`.` diff --git a/website/docs/components/outputs/azure_queue_storage.md b/website/docs/components/outputs/azure_queue_storage.md index 10355b14ef..5a755e5d6a 100644 --- a/website/docs/components/outputs/azure_queue_storage.md +++ b/website/docs/components/outputs/azure_queue_storage.md @@ -100,6 +100,9 @@ Default: `""` ### `storage_access_key` The storage account access key. This field is ignored if `storage_connection_string` is set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -108,6 +111,9 @@ Default: `""` ### `storage_connection_string` A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -116,6 +122,9 @@ Default: `""` ### `storage_sas_token` The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` diff --git a/website/docs/components/outputs/azure_table_storage.md b/website/docs/components/outputs/azure_table_storage.md index 48ea8d557f..23f9190a43 100644 --- a/website/docs/components/outputs/azure_table_storage.md +++ b/website/docs/components/outputs/azure_table_storage.md @@ -140,6 +140,9 @@ Default: `""` ### `storage_access_key` The storage account access key. This field is ignored if `storage_connection_string` is set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -148,6 +151,9 @@ Default: `""` ### `storage_connection_string` A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set. 
+:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` @@ -156,6 +162,9 @@ Default: `""` ### `storage_sas_token` The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: Type: `string` diff --git a/website/docs/components/outputs/elasticsearch.md b/website/docs/components/outputs/elasticsearch.md index cfdd6e0a33..69d2856de1 100644 --- a/website/docs/components/outputs/elasticsearch.md +++ b/website/docs/components/outputs/elasticsearch.md @@ -56,6 +56,8 @@ output: id: ${!count("elastic_ids")}-${!timestamp_unix()} type: "" routing: "" + stored_script: "" + script_params: "" sniff: true healthcheck: true timeout: 5s @@ -185,6 +187,29 @@ This field supports [interpolation functions](/docs/configuration/interpolation# Type: `string` Default: `""` +### `stored_script` + +The id of the [stored script](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html#script-stored-scripts). Ignored if action is not `update` or `upsert` +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `string` +Default: `""` + +### `script_params` + +A [Bloblang query](/docs/guides/bloblang/about/) with the script params. Ignored if action is not `update` or `upsert` + + +Type: `string` +Default: `""` + +```yml +# Examples + +script_params: root.doc = this +``` + ### `sniff` Prompts Benthos to sniff for brokers to connect to when establishing a connection.
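Below is a minimal usage sketch combining the new `local_file_path` field with the `*` metadata exclusion enabled by the `exclude_filter.go` change. It assumes each message body carries the path of a file on disk; the bucket name and object key are illustrative. When `local_file_path` resolves to a non-empty string, the writer opens that file and streams it as the upload body instead of buffering the message bytes.

```yml
output:
  aws_s3:
    bucket: example-bucket                        # illustrative bucket name
    path: uploads/${!timestamp_unix_nano()}.json  # illustrative object key
    # When this resolves to a non-empty string, the file at that path is
    # opened and streamed as the upload body instead of the message contents.
    local_file_path: ${!content()}
    metadata:
      exclude_prefixes: [ "*" ]                   # "*" now excludes all metadata
```

The `azure_blob_storage` output gains the same `local_file_path` field, with the same fallback to the message contents when it resolves to an empty string.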
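For the Elasticsearch documentation additions, a hedged sketch of how the two new fields might be combined, assuming a stored script named `increment-counter` already exists in the cluster and that the usual `urls`/`index`/`action`/`id` fields are configured as normal. Both fields are ignored unless `action` is `update` or `upsert`.

```yml
output:
  elasticsearch:
    urls: [ "http://localhost:9200" ]       # illustrative address
    index: example-index
    action: update
    id: ${!json("id")}
    stored_script: increment-counter        # hypothetical stored script id
    script_params: root.count = this.count  # Bloblang query producing the script params
```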