From 6a14f50781d4c5d80f31acd70cd7cbccdb7733ae Mon Sep 17 00:00:00 2001 From: divinerapier Date: Fri, 22 Nov 2024 18:26:58 +0800 Subject: [PATCH] io: support aliyun oss backend Signed-off-by: divinerapier --- catalog/catalog.go | 26 ++++++++ catalog/rest.go | 9 +++ go.mod | 2 + go.sum | 4 ++ io/io.go | 2 + io/oss.go | 153 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 196 insertions(+) create mode 100644 io/oss.go diff --git a/catalog/catalog.go b/catalog/catalog.go index 65da7e5..4ab5aee 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -32,6 +32,12 @@ type CatalogType string type AwsProperties map[string]string +type OSSConfig struct { + Endpoint string + AccessKey string + SecretKey string +} + const ( REST CatalogType = "rest" Hive CatalogType = "hive" @@ -122,12 +128,32 @@ func WithPrefix(prefix string) Option[RestCatalog] { } } +func WithOSSEndpoint(endpoint string) Option[RestCatalog] { + return func(o *options) { + o.ossConfig.Endpoint = endpoint + } +} + +func WithOSSAccessKey(accessKey string) Option[RestCatalog] { + return func(o *options) { + o.ossConfig.AccessKey = accessKey + } +} + +func WithOSSSecretKey(secretKey string) Option[RestCatalog] { + return func(o *options) { + o.ossConfig.SecretKey = secretKey + } +} + type Option[T GlueCatalog | RestCatalog] func(*options) type options struct { awsConfig aws.Config awsProperties AwsProperties + ossConfig OSSConfig + tlsConfig *tls.Config credential string oauthToken string diff --git a/catalog/rest.go b/catalog/rest.go index ef9c332..68a9ac9 100644 --- a/catalog/rest.go +++ b/catalog/rest.go @@ -56,6 +56,10 @@ const ( keyRestSigV4Region = "rest.signing-region" keyRestSigV4Service = "rest.signing-name" keyAuthUrl = "rest.authorization-url" + + keyOSSAccessKey = "client.oss-access-key" + keyOSSSecretKey = "client.oss-secret-key" + keyOSSEndpoint = "client.oss-endpoint" ) var ( @@ -356,6 +360,10 @@ func toProps(o *options) iceberg.Properties { if o.authUri != nil { setIf(keyAuthUrl, o.authUri.String()) } + + setIf(keyOSSAccessKey, o.ossConfig.AccessKey) + setIf(keyOSSSecretKey, o.ossConfig.SecretKey) + setIf(keyOSSEndpoint, o.ossConfig.Endpoint) return props } @@ -515,6 +523,7 @@ func (r *RestCatalog) fetchConfig(opts *options) (*options, error) { o := fromProps(cfg) o.awsConfig = opts.awsConfig + o.ossConfig = opts.ossConfig o.tlsConfig = opts.tlsConfig if uri, ok := cfg["uri"]; ok { diff --git a/go.mod b/go.mod index 14a4d21..c343652 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ go 1.23 toolchain go1.23.2 require ( + github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f github.com/aws/aws-sdk-go-v2 v1.32.5 github.com/aws/aws-sdk-go-v2/config v1.28.5 @@ -97,6 +98,7 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/term v0.25.0 // indirect golang.org/x/text v0.19.0 // indirect + golang.org/x/time v0.4.0 // indirect golang.org/x/tools v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/go.sum b/go.sum index ecfdcde..d44adfd 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/alecthomas/participle/v2 v2.1.0 h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwld github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk= github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= +github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f h1:k14GhTGJuvq27vRgLxf4iuufzLt7GeN3UOytJmU7W/A= @@ -243,6 +245,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY= +golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/io/io.go b/io/io.go index abe5971..231383d 100644 --- a/io/io.go +++ b/io/io.go @@ -215,6 +215,8 @@ func inferFileIOFromSchema(path string, props map[string]string) (IO, error) { switch parsed.Scheme { case "s3", "s3a", "s3n": return createS3FileIO(parsed, props) + case "oss": + return createOSSFileIO(parsed, props) case "file", "": return LocalFS{}, nil default: diff --git a/io/oss.go b/io/oss.go new file mode 100644 index 0000000..411c368 --- /dev/null +++ b/io/oss.go @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package io + +import ( + "fmt" + "io" + "io/fs" + "net/url" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" +) + +// Constants for OSS configuration options +const ( + OSSAccessKey = "client.oss-access-key" + OSSSecretKey = "client.oss-secret-key" + OSSEndpoint = "client.oss-endpoint" +) + +func createOSSFileIO(parsed *url.URL, props map[string]string) (IO, error) { + endpoint, ok := props[OSSEndpoint] + if !ok { + endpoint = os.Getenv("OSS_ENDPOINT") + } + if endpoint == "" { + return nil, fmt.Errorf("oss endpoint must be specified") + } + + accessKey := props[OSSAccessKey] + secretKey := props[OSSSecretKey] + + client, err := oss.New(endpoint, accessKey, secretKey) + if err != nil { + return nil, err + } + + bucketName := parsed.Host + bucket, err := client.Bucket(bucketName) + if err != nil { + return nil, err + } + + ossFS := &ossFS{ + bucket: bucket, + } + + preprocess := func(n string) string { + _, after, found := strings.Cut(n, "://") + if found { + n = after + } + return strings.TrimPrefix(n, parsed.Host) + } + + return FSPreProcName(ossFS, preprocess), nil +} + +type ossFS struct { + bucket *oss.Bucket +} + +// Open implements fs.FS +func (o *ossFS) Open(name string) (fs.File, error) { + if !fs.ValidPath(name) { + return nil, &os.PathError{Op: "open", Path: name, Err: os.ErrInvalid} + } + if name == "." { + return &ossFile{ + name: name, + bucket: o.bucket, + }, nil + } + name = strings.TrimPrefix(name, "/") + + obj, err := o.bucket.GetObject(name) + if err != nil { + return nil, err + } + + return &ossFile{ + reader: obj, + name: name, + bucket: o.bucket, + }, nil +} + +type ossFile struct { + mutex sync.Mutex + reader io.ReadCloser + bucket *oss.Bucket + name string +} + +func (f *ossFile) Read(p []byte) (int, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + return f.reader.Read(p) +} + +func (f *ossFile) Close() error { + return f.reader.Close() +} + +func (f *ossFile) Stat() (fs.FileInfo, error) { + header, err := f.bucket.GetObjectMeta(f.name) + if err != nil { + return nil, err + } + + cl, err := strconv.ParseInt(header.Get("Content-Length"), 10, 64) + if err != nil { + return nil, err + } + + return &ossFileInfo{ + name: f.name, + size: cl, + }, nil +} + +type ossFileInfo struct { + name string + size int64 +} + +func (fi *ossFileInfo) Name() string { return fi.name } +func (fi *ossFileInfo) Size() int64 { return fi.size } +func (fi *ossFileInfo) Mode() fs.FileMode { return 0444 } +func (fi *ossFileInfo) ModTime() time.Time { return time.Time{} } +func (fi *ossFileInfo) IsDir() bool { return false } +func (fi *ossFileInfo) Sys() interface{} { return nil }