Skip to content

Commit

Permalink
chore: Add streaming support in S3 and Azure Blob (#3)
Browse files Browse the repository at this point in the history
* chore: Add streaming support in S3 and Azure Blob

* chore: Update docs

* chore: Remove code

* fix: Treat error

* chore: Enable the exclusion of all metadata
  • Loading branch information
juniorrhis1 authored Mar 1, 2024
1 parent 81da9a5 commit de746ae
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 18 deletions.
38 changes: 36 additions & 2 deletions internal/impl/aws/output_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"os"
"sort"
"strings"
"time"
Expand All @@ -25,6 +27,7 @@ const (
s3oFieldBucket = "bucket"
s3oFieldForcePathStyleURLs = "force_path_style_urls"
s3oFieldPath = "path"
s3oFieldLocalFilePath = "local_file_path"
s3oFieldTags = "tags"
s3oFieldContentType = "content_type"
s3oFieldContentEncoding = "content_encoding"
Expand All @@ -49,6 +52,7 @@ type s3oConfig struct {
Bucket string

Path *service.InterpolatedString
LocalFilePath *service.InterpolatedString
Tags []s3TagPair
ContentType *service.InterpolatedString
ContentEncoding *service.InterpolatedString
Expand Down Expand Up @@ -79,6 +83,10 @@ func s3oConfigFromParsed(pConf *service.ParsedConfig) (conf s3oConfig, err error
return
}

if conf.LocalFilePath, err = pConf.FieldInterpolatedString(s3oFieldLocalFilePath); err != nil {
return
}

var tagMap map[string]*service.InterpolatedString
if tagMap, err = pConf.FieldInterpolatedStringMap(s3oFieldTags); err != nil {
return
Expand Down Expand Up @@ -208,6 +216,10 @@ output:
Example(`${!count("files")}-${!timestamp_unix_nano()}.txt`).
Example(`${!meta("kafka_key")}.json`).
Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`),
service.NewInterpolatedStringField(s3oFieldLocalFilePath).
Description("The path of the local file to upload.").
Default(``).
Example(`/tmp/file.json`),
service.NewInterpolatedStringMapField(s3oFieldTags).
Description("Key/value pairs to store with the object as tags.").
Default(map[string]any{}).
Expand Down Expand Up @@ -381,15 +393,15 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat
return fmt.Errorf("storage class interpolation: %w", err)
}

mBytes, err := m.AsBytes()
uploadBody, err := a.getUploadBody(m)
if err != nil {
return err
}

uploadInput := &s3manager.UploadInput{
Bucket: &a.conf.Bucket,
Key: aws.String(key),
Body: bytes.NewReader(mBytes),
Body: uploadBody,
ContentType: aws.String(contentType),
ContentEncoding: contentEncoding,
CacheControl: cacheControl,
Expand Down Expand Up @@ -432,6 +444,28 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat
})
}

// getUploadBody resolves the body to upload for a single message: when the
// local_file_path interpolation yields a non-empty path, the named file is
// opened and streamed from disk; otherwise the message payload itself is
// wrapped in an in-memory reader.
//
// NOTE(review): in the local-file case the returned *os.File is handed to
// the S3 uploader as a plain io.Reader and is never explicitly closed —
// confirm whether the caller should close it after the upload completes.
func (a *amazonS3Writer) getUploadBody(m *service.Message) (io.Reader, error) {
	path, err := a.conf.LocalFilePath.TryString(m)
	if err != nil {
		return nil, fmt.Errorf("local file path interpolation error: %w", err)
	}

	// No local file configured: upload the message contents directly.
	if path == "" {
		payload, err := m.AsBytes()
		if err != nil {
			return nil, err
		}
		return bytes.NewReader(payload), nil
	}

	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("local file read error: %w", err)
	}
	return f, nil
}

// Close is a no-op: the writer holds no resources of its own that require
// teardown, so it always reports success.
func (a *amazonS3Writer) Close(_ context.Context) error {
	return nil
}
9 changes: 6 additions & 3 deletions internal/impl/azure/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ func azureComponentSpec(forBlobStorage bool) *service.ConfigSpec {
Default(""),
service.NewStringField(bscFieldStorageAccessKey).
Description("The storage account access key. This field is ignored if `"+bscFieldStorageConnectionString+"` is set.").
Default(""),
Default("").
Secret(),
service.NewStringField(bscFieldStorageConnectionString).
Description("A storage account connection string. This field is required if `"+bscFieldStorageAccount+"` and `"+bscFieldStorageAccessKey+"` / `"+bscFieldStorageSASToken+"` are not set.").
Default(""),
Default("").
Secret(),
)
spec = spec.Field(service.NewStringField(bscFieldStorageSASToken).
Description("The storage account SAS token. This field is ignored if `" + bscFieldStorageConnectionString + "` or `" + bscFieldStorageAccessKey + "` are set.").
Default("")).
Default("").
Secret()).
LintRule(`root = if this.storage_connection_string != "" && !this.storage_connection_string.contains("AccountName=") && !this.storage_connection_string.contains("UseDevelopmentStorage=true;") && this.storage_account == "" { [ "storage_account must be set if storage_connection_string does not contain the \"AccountName\" parameter" ] }`)
return spec
}
Expand Down
57 changes: 45 additions & 12 deletions internal/impl/azure/output_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"errors"
"fmt"
"io"
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
Expand All @@ -19,6 +21,7 @@ const (
// Blob Storage Output Fields
bsoFieldContainer = "container"
bsoFieldPath = "path"
bsoFieldLocalFilePath = "local_file_path"
bsoFieldBlobType = "blob_type"
bsoFieldPublicAccessLevel = "public_access_level"
)
Expand All @@ -27,6 +30,7 @@ type bsoConfig struct {
client *azblob.Client
Container *service.InterpolatedString
Path *service.InterpolatedString
LocalFilePath *service.InterpolatedString
BlobType *service.InterpolatedString
PublicAccessLevel *service.InterpolatedString
}
Expand All @@ -50,6 +54,9 @@ func bsoConfigFromParsed(pConf *service.ParsedConfig) (conf bsoConfig, err error
if conf.Path, err = pConf.FieldInterpolatedString(bsoFieldPath); err != nil {
return
}
if conf.LocalFilePath, err = pConf.FieldInterpolatedString(bsoFieldLocalFilePath); err != nil {
return
}
if conf.BlobType, err = pConf.FieldInterpolatedString(bsoFieldBlobType); err != nil {
return
}
Expand Down Expand Up @@ -89,6 +96,10 @@ If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+
Example(`${!meta("kafka_key")}.json`).
Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`).
Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`),
service.NewInterpolatedStringField(bsoFieldLocalFilePath).
Description("The path of the local file to upload.").
Example(`/tmp/file.json`).
Default(``),
service.NewInterpolatedStringEnumField(bsoFieldBlobType, "BLOCK", "APPEND").
Description("Block and Append blobs are comprised of blocks, and each blob can support up to 50,000 blocks. The default value is `+\"`BLOCK`\"+`.`").
Advanced().
Expand Down Expand Up @@ -138,12 +149,16 @@ func (a *azureBlobStorageWriter) Connect(ctx context.Context) error {
return nil
}

func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, blobName, blobType string, message []byte) error {
func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, blobName, blobType string, msg *service.Message) error {
uploadBody, err := a.getUploadBody(msg)
if err != nil {
return err
}

containerClient := a.conf.client.ServiceClient().NewContainerClient(containerName)
var err error
if blobType == "APPEND" {
appendBlobClient := containerClient.NewAppendBlobClient(blobName)
_, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(bytes.NewReader(message)), nil)
_, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(uploadBody), nil)
if err != nil {
if isErrorCode(err, bloberror.BlobNotFound) {
_, err := appendBlobClient.Create(ctx, nil)
Expand All @@ -152,7 +167,7 @@ func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName,
}

// Try to upload the message again now that we created the blob
_, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(bytes.NewReader(message)), nil)
_, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(uploadBody), nil)
if err != nil {
return fmt.Errorf("failed retrying to append block to blob: %w", err)
}
Expand All @@ -161,7 +176,7 @@ func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName,
}
}
} else {
_, err = containerClient.NewBlockBlobClient(blobName).UploadStream(ctx, bytes.NewReader(message), nil)
_, err = containerClient.NewBlockBlobClient(blobName).UploadStream(ctx, uploadBody, nil)
if err != nil {
return fmt.Errorf("failed to push block to blob: %w", err)
}
Expand Down Expand Up @@ -199,12 +214,7 @@ func (a *azureBlobStorageWriter) Write(ctx context.Context, msg *service.Message
return fmt.Errorf("blob type interpolation error: %s", err)
}

mBytes, err := msg.AsBytes()
if err != nil {
return err
}

if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes); err != nil {
if err := a.uploadBlob(ctx, containerName, blobName, blobType, msg); err != nil {
if isErrorCode(err, bloberror.ContainerNotFound) {
var accessLevel string
if accessLevel, err = a.conf.PublicAccessLevel.TryString(msg); err != nil {
Expand All @@ -217,7 +227,7 @@ func (a *azureBlobStorageWriter) Write(ctx context.Context, msg *service.Message
}
}

if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes); err != nil {
if err := a.uploadBlob(ctx, containerName, blobName, blobType, msg); err != nil {
return fmt.Errorf("error retrying to upload blob: %s", err)
}
} else {
Expand All @@ -227,6 +237,29 @@ func (a *azureBlobStorageWriter) Write(ctx context.Context, msg *service.Message
return nil
}

// getUploadBody resolves the body to upload for a single message: when the
// local_file_path interpolation yields a non-empty path, the named file is
// opened and streamed from disk; otherwise the message payload itself is
// wrapped in an in-memory reader. Both branches satisfy io.ReadSeeker so
// the blob client can rewind the stream if it needs to.
//
// NOTE(review): in the local-file case the returned *os.File is never
// explicitly closed — confirm whether the caller should close it after the
// upload completes. Also note the append-retry path in uploadBlob reads the
// same reader twice; verify it seeks back to the start before the retry.
func (a *azureBlobStorageWriter) getUploadBody(m *service.Message) (io.ReadSeeker, error) {
	path, err := a.conf.LocalFilePath.TryString(m)
	if err != nil {
		return nil, fmt.Errorf("local file path interpolation error: %w", err)
	}

	// No local file configured: upload the message contents directly.
	if path == "" {
		payload, err := m.AsBytes()
		if err != nil {
			return nil, err
		}
		return bytes.NewReader(payload), nil
	}

	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("local file read error: %w", err)
	}
	return f, nil
}

// Close is a no-op: the writer holds no resources of its own that require
// teardown, so it always reports success.
func (a *azureBlobStorageWriter) Close(_ context.Context) error {
	return nil
}
Expand Down
3 changes: 2 additions & 1 deletion internal/metadata/exclude_filter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metadata

import (
"slices"
"strings"

"github.com/benthosdev/benthos/v4/internal/docs"
Expand Down Expand Up @@ -46,7 +47,7 @@ type ExcludeFilter struct {
// true otherwise. It also returns true if no filters are configured.
func (f *ExcludeFilter) Match(str string) bool {
for _, prefix := range f.excludePrefixes {
if strings.HasPrefix(str, prefix) {
if strings.HasPrefix(str, prefix) || slices.Contains(f.excludePrefixes, "*") {
return false
}
}
Expand Down
9 changes: 9 additions & 0 deletions website/docs/components/inputs/azure_blob_storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ Default: `""`
### `storage_access_key`

The storage account access key. This field is ignored if `storage_connection_string` is set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand All @@ -118,6 +121,9 @@ Default: `""`
### `storage_connection_string`

A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand All @@ -126,6 +132,9 @@ Default: `""`
### `storage_sas_token`

The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand Down
6 changes: 6 additions & 0 deletions website/docs/components/inputs/azure_queue_storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ Default: `""`
### `storage_access_key`

The storage account access key. This field is ignored if `storage_connection_string` is set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand All @@ -92,6 +95,9 @@ Default: `""`
### `storage_connection_string`

A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand Down
9 changes: 9 additions & 0 deletions website/docs/components/inputs/azure_table_storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ Default: `""`
### `storage_access_key`

The storage account access key. This field is ignored if `storage_connection_string` is set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand All @@ -92,6 +95,9 @@ Default: `""`
### `storage_connection_string`

A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand All @@ -100,6 +106,9 @@ Default: `""`
### `storage_sas_token`

The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set.
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::


Type: `string`
Expand Down
17 changes: 17 additions & 0 deletions website/docs/components/outputs/aws_s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ output:
aws_s3:
bucket: "" # No default (required)
path: ${!count("files")}-${!timestamp_unix_nano()}.txt
local_file_path: ""
tags: {}
content_type: application/octet-stream
metadata:
Expand All @@ -55,6 +56,7 @@ output:
aws_s3:
bucket: "" # No default (required)
path: ${!count("files")}-${!timestamp_unix_nano()}.txt
local_file_path: ""
tags: {}
content_type: application/octet-stream
content_encoding: ""
Expand Down Expand Up @@ -184,6 +186,21 @@ path: ${!meta("kafka_key")}.json
path: ${!json("doc.namespace")}/${!json("doc.id")}.json
```

### `local_file_path`

The path of the local file to upload.
This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries).


Type: `string`
Default: `""`

```yml
# Examples
local_file_path: /tmp/file.json
```

### `tags`

Key/value pairs to store with the object as tags.
Expand Down
Loading

0 comments on commit de746ae

Please sign in to comment.