Skip to content

Commit

Permalink
feat: s3 support for the blob sync (#1449)
Browse files Browse the repository at this point in the history
## This PR
Intent of this pr is to add S3 bucket support to the existing "blob"
sync.

### Related Issues
fixes #1376 

### Notes
Marking as a draft for now until I can find my aws creds and live-test
it.

### Follow-up Tasks
integration testing is yet to be performed

### How to test
unit tests provided

Signed-off-by: Dave Josephsen <[email protected]>
  • Loading branch information
djosephsen authored Dec 11, 2024
1 parent 431fbb4 commit a9f7261
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 86 deletions.
21 changes: 21 additions & 0 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,26 @@ require (
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.30.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.27 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand Down Expand Up @@ -84,6 +104,7 @@ require (
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand Down
82 changes: 2 additions & 80 deletions core/go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions core/pkg/sync/blob/blob_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob" // needed to initialize Azure Blob Storage driver
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
_ "gocloud.dev/blob/s3blob" // needed to initialize s3 driver
)

type Sync struct {
Expand Down
38 changes: 36 additions & 2 deletions core/pkg/sync/builder/syncbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
syncProviderHTTP = "http"
syncProviderGcs = "gcs"
syncProviderAzblob = "azblob"
syncProviderS3 = "s3"
)

var (
Expand All @@ -43,6 +44,7 @@ var (
regFile *regexp.Regexp
regGcs *regexp.Regexp
regAzblob *regexp.Regexp
regS3 *regexp.Regexp
)

func init() {
Expand All @@ -54,6 +56,7 @@ func init() {
regFile = regexp.MustCompile("^file:")
regGcs = regexp.MustCompile("^gs://.+?/")
regAzblob = regexp.MustCompile("^azblob://.+?/")
regS3 = regexp.MustCompile("^s3://.+?/")
}

type ISyncBuilder interface {
Expand Down Expand Up @@ -119,12 +122,15 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo
case syncProviderAzblob:
logger.Debug(fmt.Sprintf("using blob sync-provider with azblob driver for: %s", sourceConfig.URI))
return sb.newAzblob(sourceConfig, logger)
case syncProviderS3:
logger.Debug(fmt.Sprintf("using blob sync-provider with s3 driver for: %s", sourceConfig.URI))
return sb.newS3(sourceConfig, logger), nil

default:
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with "+
"'%s', '%s', '%s', '%s', '%s', '%s', '%s' or '%s'",
"'%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s' or '%s'",
sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo,
syncProviderKubernetes, syncProviderHTTP, syncProviderGrpc, syncProviderGcs, syncProviderAzblob)
syncProviderKubernetes, syncProviderHTTP, syncProviderGrpc, syncProviderGcs, syncProviderAzblob, syncProviderS3)
}
}

Expand Down Expand Up @@ -284,6 +290,34 @@ func (sb *SyncBuilder) newAzblob(config sync.SourceConfig, logger *logger.Logger
}, nil
}

func (sb *SyncBuilder) newS3(config sync.SourceConfig, logger *logger.Logger) *blobSync.Sync {
// Extract bucket uri and object name from the full URI:
// gs://bucket/path/to/object results in gs://bucket/ as bucketUri and
// path/to/object as an object name.
bucketURI := regS3.FindString(config.URI)
objectName := regS3.ReplaceAllString(config.URI, "")

// Defaults to 5 seconds if interval is not set.
var interval uint32 = 5
if config.Interval != 0 {
interval = config.Interval
}

return &blobSync.Sync{
Bucket: bucketURI,
Object: objectName,

BlobURLMux: blob.DefaultURLMux(),

Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "s3"),
),
Interval: interval,
Cron: cron.New(),
}
}

type IK8sClientBuilder interface {
GetK8sClient() (dynamic.Interface, error)
}
Expand Down
59 changes: 59 additions & 0 deletions core/pkg/sync/builder/syncbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ func Test_SyncsFromFromConfig(t *testing.T) {
URI: "azblob://bucket/path/to/file",
Provider: syncProviderAzblob,
},
{
URI: "s3://bucket/path/to/file",
Provider: syncProviderS3,
},
},
},
wantSyncs: []sync.ISync{
Expand All @@ -252,6 +256,7 @@ func Test_SyncsFromFromConfig(t *testing.T) {
&kubernetes.Sync{},
&blob.Sync{},
&blob.Sync{},
&blob.Sync{},
},
wantErr: false,
},
Expand Down Expand Up @@ -418,3 +423,57 @@ func Test_AzblobConfig(t *testing.T) {
})
}
}

func Test_S3Config(t *testing.T) {
lg := logger.NewLogger(nil, false)
defaultInterval := uint32(5)
tests := []struct {
name string
uri string
interval uint32
expectedBucket string
expectedObject string
expectedInterval uint32
}{
{
name: "simple path",
uri: "s3://bucket/path/to/object",
interval: 10,
expectedBucket: "s3://bucket/",
expectedObject: "path/to/object",
expectedInterval: 10,
},
{
name: "default interval",
uri: "s3://bucket/path/to/object",
expectedBucket: "s3://bucket/",
expectedObject: "path/to/object",
expectedInterval: defaultInterval,
},
{
name: "no object set", // Blob syncer will return error when fetching
uri: "s3://bucket/",
expectedBucket: "s3://bucket/",
expectedObject: "",
expectedInterval: defaultInterval,
},
{
name: "malformed uri", // Blob syncer will return error when opening bucket
uri: "malformed",
expectedBucket: "",
expectedObject: "malformed",
expectedInterval: defaultInterval,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s3Sync := NewSyncBuilder().newS3(sync.SourceConfig{
URI: tt.uri,
Interval: tt.interval,
}, lg)
require.Equal(t, tt.expectedBucket, s3Sync.Bucket)
require.Equal(t, tt.expectedObject, s3Sync.Object)
require.Equal(t, int(tt.expectedInterval), int(s3Sync.Interval))
})
}
}
5 changes: 5 additions & 0 deletions core/pkg/sync/builder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func ParseSyncProviderURIs(uris []string) ([]sync.SourceConfig, error) {
URI: uri,
Provider: syncProviderAzblob,
})
case regS3.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: uri,
Provider: syncProviderS3,
})
default:
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
"'http(s)://', 'grpc(s)://', 'gs://', 'azblob://' or 'core.openfeature.dev'", uri)
Expand Down
12 changes: 11 additions & 1 deletion core/pkg/sync/builder/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestParseSource(t *testing.T) {
{"uri":"host:port","provider":"grpc"},
{"uri":"default/my-crd","provider":"kubernetes"},
{"uri":"gs://bucket-name/path/to/file","provider":"gcs"},
{"uri":"azblob://bucket-name/path/to/file","provider":"azblob"}
{"uri":"azblob://bucket-name/path/to/file","provider":"azblob"},
{"uri":"s3://bucket-name/path/to/file","provider":"s3"}
]`,
expectErr: false,
out: []sync.SourceConfig{
Expand Down Expand Up @@ -59,6 +60,10 @@ func TestParseSource(t *testing.T) {
URI: "azblob://bucket-name/path/to/file",
Provider: syncProviderAzblob,
},
{
URI: "s3://bucket-name/path/to/file",
Provider: syncProviderS3,
},
},
},
"multiple-syncs-with-options": {
Expand Down Expand Up @@ -194,6 +199,7 @@ func TestParseSyncProviderURIs(t *testing.T) {
"core.openfeature.dev/default/my-crd",
"gs://bucket-name/path/to/file",
"azblob://bucket-name/path/to/file",
"s3://bucket-name/path/to/file",
},
expectErr: false,
out: []sync.SourceConfig{
Expand Down Expand Up @@ -227,6 +233,10 @@ func TestParseSyncProviderURIs(t *testing.T) {
URI: "azblob://bucket-name/path/to/file",
Provider: syncProviderAzblob,
},
{
URI: "s3://bucket-name/path/to/file",
Provider: syncProviderS3,
},
},
},
"empty": {
Expand Down
2 changes: 1 addition & 1 deletion schemas
2 changes: 1 addition & 1 deletion test-harness

0 comments on commit a9f7261

Please sign in to comment.