Skip to content

Commit

Permalink
Adding Azure Blob Storage output support (#47)
Browse files Browse the repository at this point in the history
* Adding Azure Blob Storage output support

* linting

* Lint

* adding some comments, and making the linter happy

* more linting..

* go mod tidy

* Updating the name from ABS to azure_blob_storage

* update changelog

* adding better package name and root commands
  • Loading branch information
P1llus authored Jan 17, 2023
1 parent 0607895 commit d35c829
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ This project adheres to [Semantic Versioning](http://semver.org/).

### Removed

## [0.9.0]

### Added

- Added support for azure blob storage output: [#46](https://github.com/elastic/stream/pull/46)

## [0.8.0]

### Added
Expand Down
6 changes: 6 additions & 0 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/stream/pkg/output"

// Register outputs.
_ "github.com/elastic/stream/pkg/output/azureblobstorage"
_ "github.com/elastic/stream/pkg/output/gcppubsub"
_ "github.com/elastic/stream/pkg/output/kafka"
_ "github.com/elastic/stream/pkg/output/lumberjack"
Expand Down Expand Up @@ -78,6 +79,11 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Subscription, "gcppubsub-subscription", "subscription", "GCP Pubsub subscription name")
rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag")

// GCS output flags.
rootCmd.PersistentFlags().StringVar(&opts.AzureBlobStorageOptions.Container, "azure-blob-storage-container", "testcontainer", "Azure Blob Storage container name")
rootCmd.PersistentFlags().StringVar(&opts.AzureBlobStorageOptions.Blob, "azure-blob-storage-blob", "testblob", "Azure Blob Storage blob name")
rootCmd.PersistentFlags().StringVar(&opts.AzureBlobStorageOptions.Port, "azure-blob-storage-port", "10000", "HTTP port used to connect to the blob storage, used for emulators and CI")

// Kafka Pubsub output flags.
rootCmd.PersistentFlags().StringVar(&opts.KafkaOptions.Topic, "kafka-topic", "test", "Kafka topic name")

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
cloud.google.com/go/pubsub v1.25.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1
github.com/Shopify/sarama v1.36.0
github.com/elastic/go-concert v0.2.0
github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f
Expand All @@ -26,6 +27,8 @@ require (
cloud.google.com/go v0.104.0 // indirect
cloud.google.com/go/compute v1.9.0 // indirect
cloud.google.com/go/iam v0.4.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,16 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4 h1:pqrAR74b6EoR4kcxF7L7Wg2B8Jgil9UUZtMvxhEFqWo=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1 h1:BMTdr+ib5ljLa9MxTJK8x/Ds0MbBb4MfuW5BL0zMJnI=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1/go.mod h1:c6WvOhtmjNUWbLfOG1qxM/q0SPvQNSVJvolm+C52dIU=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
Expand Down Expand Up @@ -120,6 +128,7 @@ github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxG
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
github.com/docker/cli v20.10.17+incompatible h1:eO2KS7ZFeov5UJeaDmIs1NFEDRf32PaqRpvoEkKBy5M=
github.com/docker/cli v20.10.17+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
Expand Down Expand Up @@ -170,6 +179,7 @@ github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -249,6 +259,7 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
github.com/googleapis/enterprise-certificate-proxy v0.1.0 h1:zO8WHNx/MYiAKJ3d5spxZXZE6KHmIQGQcAzwUzV7qQw=
Expand Down Expand Up @@ -321,6 +332,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 h1:hRGSmZu7j271trc9sneMrpOW7GN5ngLm8YUZIPzf394=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
Expand Down Expand Up @@ -350,6 +362,7 @@ github.com/ory/dockertest/v3 v3.9.1/go.mod h1:42Ir9hmvaAPm0Mgibk6mBPi7SFvTXxEcnz
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.16 h1:kQPfno+wyx6C5572ABwV+Uo3pDFzQ7yhyGchSyRda0c=
github.com/pierrec/lz4/v4 v4.1.16/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
63 changes: 63 additions & 0 deletions pkg/output/azureblobstorage/azure_blob_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package azureblobstorage

import (
"context"
"errors"
"fmt"

"github.com/elastic/stream/pkg/output"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func init() {
output.Register("azureblobstorage", New)
}

type Output struct {
opts *output.Options
client *azblob.Client
}

func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("azure blob storage address is required")
}
// A connection string is used for multiple reasons, its easier to bypass the URL endpoint, and the hardcoded credentials can easily be passed.
// These credentials are the defaults for the Azurite Emulator, which is why they can simply be hardcoded.
connectionString := fmt.Sprintf("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://%s:%s/devstoreaccount1;", opts.Addr, opts.AzureBlobStorageOptions.Port)
serviceClient, _ := azblob.NewClientFromConnectionString(connectionString, nil)

return &Output{opts: opts, client: serviceClient}, nil
}

func (o *Output) DialContext(ctx context.Context) error {
if err := o.createContainer(ctx); err != nil {
return err
}
return nil
}

// Close is not needed as there is no client to close
func (*Output) Close() error {
return nil
}

func (o *Output) Write(b []byte) (int, error) {
_, err := o.client.UploadBuffer(context.Background(), o.opts.AzureBlobStorageOptions.Container, o.opts.AzureBlobStorageOptions.Blob, b, nil)
if err != nil {
return 0, fmt.Errorf("failed to upload file to blob: %w", err)
}
return len(b), nil
}

func (o *Output) createContainer(ctx context.Context) error {
_, err := o.client.CreateContainer(ctx, o.opts.AzureBlobStorageOptions.Container, nil)
if err != nil {
return err
}
return nil
}
124 changes: 124 additions & 0 deletions pkg/output/azureblobstorage/azure_blob_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

package azureblobstorage

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"
"gotest.tools/assert"

"github.com/elastic/stream/pkg/output"
)

const (
emulatorHost = "0.0.0.0"
emulatorPort = "10000"
container = "testcontainer"
blob = "testblob"
)

func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "mcr.microsoft.com/azure-storage/azurite",
Tag: "latest",
PortBindings: map[docker.Port][]docker.PortBinding{
emulatorPort: {{HostIP: emulatorHost, HostPort: emulatorPort}},
},
ExposedPorts: []string{emulatorPort},
}, func(config *docker.HostConfig) {
// set AutoRemove to true so that stopped container goes away by itself
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
if err != nil {
log.Fatalf("Could not start resource: %s", err)
}

if err := pool.Retry(func() error {
// Disable HTTP keep-alives to ensure no extra goroutines hang around.
httpClient := http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
// Sanity check the emulator.
resp, err := httpClient.Get(fmt.Sprintf("http://%s:%s/devstoreaccount1", emulatorHost, emulatorPort))
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusBadRequest {
return fmt.Errorf("unexpected status code: %v", resp.StatusCode)
}

return nil
}); err != nil {
_ = pool.Purge(resource)
log.Fatalf("Could not connect to the Azure Blob Storage instance: %s", err)
}

code := m.Run()

_ = pool.Purge(resource)

os.Exit(code)
}

func TestAzureBlobStorage(t *testing.T) {
out, err := New(&output.Options{
Addr: emulatorHost,
AzureBlobStorageOptions: output.AzureBlobStorageOptions{
Container: container,
Blob: blob,
Port: emulatorPort,
},
})
require.NoError(t, err)

err = out.DialContext(context.Background())
require.NoError(t, err)

event := map[string]interface{}{
"message": "hello world!",
}
data, err := json.Marshal(event)
require.NoError(t, err)

n, err := out.Write(data)
require.NoError(t, err)
assert.Equal(t, len(data), n)

connectionString := fmt.Sprintf("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://%s:%s/devstoreaccount1;", emulatorHost, emulatorPort)
ctx, cancel := context.WithCancel(context.Background())
serviceClient, _ := azblob.NewClientFromConnectionString(connectionString, nil)

blobDownloadResponse, err := serviceClient.DownloadStream(ctx, container, blob, nil)
require.NoError(t, err)

reader := blobDownloadResponse.Body
downloadData, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, string(data), string(downloadData))

err = reader.Close()
require.NoError(t, err)

t.Cleanup(cancel)
}
7 changes: 7 additions & 0 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Options struct {
WebhookOptions
GCPPubsubOptions
KafkaOptions
AzureBlobStorageOptions
LumberjackOptions
}

Expand All @@ -40,6 +41,12 @@ type KafkaOptions struct {
Topic string // Topic name. Will create it if not exists.
}

type AzureBlobStorageOptions struct {
Container string // Container name. Will create it if it does not exists.
Blob string // Blob name to use, will be created inside the container.
Port string // Need port number for tests, to update the connection string
}

type LumberjackOptions struct {
ParseJSON bool // Parse the input bytes as JSON and send structured data. By default, input bytes are sent in a 'message' field.
}

0 comments on commit d35c829

Please sign in to comment.