Skip to content

Commit

Permalink
io: support aliyun oss backend
Browse files Browse the repository at this point in the history
Signed-off-by: divinerapier <[email protected]>
  • Loading branch information
divinerapier committed Nov 22, 2024
1 parent c1ffe04 commit 6a14f50
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 0 deletions.
26 changes: 26 additions & 0 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ type CatalogType string

type AwsProperties map[string]string

// OSSConfig groups the settings used to reach an Aliyun OSS backend.
// The zero value carries no endpoint and no credentials.
type OSSConfig struct {
	// Endpoint is the OSS service endpoint.
	Endpoint string
	// AccessKey is the access key half of the credential pair.
	AccessKey string
	// SecretKey is the secret key half of the credential pair.
	SecretKey string
}

const (
REST CatalogType = "rest"
Hive CatalogType = "hive"
Expand Down Expand Up @@ -122,12 +128,32 @@ func WithPrefix(prefix string) Option[RestCatalog] {
}
}

// WithOSSEndpoint returns a RestCatalog option that records endpoint as the
// Endpoint of the catalog's OSS configuration.
func WithOSSEndpoint(endpoint string) Option[RestCatalog] {
	return func(opts *options) {
		opts.ossConfig.Endpoint = endpoint
	}
}

// WithOSSAccessKey returns a RestCatalog option that records accessKey as the
// AccessKey of the catalog's OSS configuration.
func WithOSSAccessKey(accessKey string) Option[RestCatalog] {
	return func(opts *options) {
		opts.ossConfig.AccessKey = accessKey
	}
}

// WithOSSSecretKey returns a RestCatalog option that records secretKey as the
// SecretKey of the catalog's OSS configuration.
func WithOSSSecretKey(secretKey string) Option[RestCatalog] {
	return func(opts *options) {
		opts.ossConfig.SecretKey = secretKey
	}
}

// Option is a functional option: a function that mutates an options value.
// The type parameter T does not appear in the function signature; it only
// tags the option with the catalog type (GlueCatalog or RestCatalog) it is
// declared for.
type Option[T GlueCatalog | RestCatalog] func(*options)

type options struct {
awsConfig aws.Config
awsProperties AwsProperties

ossConfig OSSConfig

tlsConfig *tls.Config
credential string
oauthToken string
Expand Down
9 changes: 9 additions & 0 deletions catalog/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const (
keyRestSigV4Region = "rest.signing-region"
keyRestSigV4Service = "rest.signing-name"
keyAuthUrl = "rest.authorization-url"

keyOSSAccessKey = "client.oss-access-key"
keyOSSSecretKey = "client.oss-secret-key"
keyOSSEndpoint = "client.oss-endpoint"
)

var (
Expand Down Expand Up @@ -356,6 +360,10 @@ func toProps(o *options) iceberg.Properties {
if o.authUri != nil {
setIf(keyAuthUrl, o.authUri.String())
}

setIf(keyOSSAccessKey, o.ossConfig.AccessKey)
setIf(keyOSSSecretKey, o.ossConfig.SecretKey)
setIf(keyOSSEndpoint, o.ossConfig.Endpoint)
return props
}

Expand Down Expand Up @@ -515,6 +523,7 @@ func (r *RestCatalog) fetchConfig(opts *options) (*options, error) {

o := fromProps(cfg)
o.awsConfig = opts.awsConfig
o.ossConfig = opts.ossConfig
o.tlsConfig = opts.tlsConfig

if uri, ok := cfg["uri"]; ok {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go 1.23
toolchain go1.23.2

require (
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f
github.com/aws/aws-sdk-go-v2 v1.32.5
github.com/aws/aws-sdk-go-v2/config v1.28.5
Expand Down Expand Up @@ -97,6 +98,7 @@ require (
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.4.0 // indirect
golang.org/x/tools v0.26.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/alecthomas/participle/v2 v2.1.0 h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwld
github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=
github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f h1:k14GhTGJuvq27vRgLxf4iuufzLt7GeN3UOytJmU7W/A=
Expand Down Expand Up @@ -243,6 +245,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY=
golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
2 changes: 2 additions & 0 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func inferFileIOFromSchema(path string, props map[string]string) (IO, error) {
switch parsed.Scheme {
case "s3", "s3a", "s3n":
return createS3FileIO(parsed, props)
case "oss":
return createOSSFileIO(parsed, props)
case "file", "":
return LocalFS{}, nil
default:
Expand Down
153 changes: 153 additions & 0 deletions io/oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package io

import (
"fmt"
"io"
"io/fs"
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
)

// Property keys that createOSSFileIO looks up when configuring OSS access.
const (
	// OSSEndpoint is the property key holding the OSS service endpoint.
	OSSEndpoint = "client.oss-endpoint"
	// OSSAccessKey is the property key holding the OSS access key.
	OSSAccessKey = "client.oss-access-key"
	// OSSSecretKey is the property key holding the OSS secret key.
	OSSSecretKey = "client.oss-secret-key"
)

// createOSSFileIO builds an IO on top of the OSS bucket named by the host
// component of parsed.
//
// The endpoint comes from props[OSSEndpoint]; when that property is missing
// or empty, the OSS_ENDPOINT environment variable is consulted instead, and
// an error is returned if neither supplies a value. The access key and secret
// key are read from props[OSSAccessKey] and props[OSSSecretKey] and passed to
// oss.New as-is.
//
// The name pre-processor handed to FSPreProcName drops everything up to and
// including "://" (when present) and then a leading bucket name, so that
// "oss://bucket/key" and "bucket/key" both become "/key".
func createOSSFileIO(parsed *url.URL, props map[string]string) (IO, error) {
	// Test the value rather than key presence: a property that is present
	// but empty must still fall back to the environment.
	endpoint := props[OSSEndpoint]
	if endpoint == "" {
		endpoint = os.Getenv("OSS_ENDPOINT")
	}
	if endpoint == "" {
		return nil, fmt.Errorf("oss endpoint must be specified")
	}

	client, err := oss.New(endpoint, props[OSSAccessKey], props[OSSSecretKey])
	if err != nil {
		return nil, fmt.Errorf("creating oss client for endpoint %q: %w", endpoint, err)
	}

	bucketName := parsed.Host
	bucket, err := client.Bucket(bucketName)
	if err != nil {
		return nil, fmt.Errorf("opening oss bucket %q: %w", bucketName, err)
	}

	// Named fsys (not ossFS) so the local does not shadow the type.
	fsys := &ossFS{bucket: bucket}

	preprocess := func(n string) string {
		if _, after, found := strings.Cut(n, "://"); found {
			n = after
		}
		return strings.TrimPrefix(n, bucketName)
	}

	return FSPreProcName(fsys, preprocess), nil
}

// ossFS serves objects out of a single OSS bucket through its Open method.
type ossFS struct {
	// bucket is the OSS bucket handle every Open call reads from.
	bucket *oss.Bucket
}

// Open implements fs.FS.
//
// name must satisfy fs.ValidPath; otherwise an *fs.PathError wrapping
// fs.ErrInvalid is returned. For "." a handle is returned without fetching
// any object body. For every other name the object with that key is fetched
// from the bucket, and a fetch failure is reported as an *fs.PathError whose
// Err is the error returned by the OSS client.
func (o *ossFS) Open(name string) (fs.File, error) {
	if !fs.ValidPath(name) {
		return nil, &fs.PathError{Op: "open", Path: name, Err: fs.ErrInvalid}
	}

	// The root is not an object, so there is no body to request for it.
	if name == "." {
		return &ossFile{name: name, bucket: o.bucket}, nil
	}

	// fs.ValidPath has already rejected names that start with a slash, so
	// name can be used as the object key without further trimming.
	body, err := o.bucket.GetObject(name)
	if err != nil {
		// fs.FS asks Open to report failures as *fs.PathError; the
		// original OSS error remains reachable through errors.Unwrap/As.
		return nil, &fs.PathError{Op: "open", Path: name, Err: err}
	}

	return &ossFile{reader: body, name: name, bucket: o.bucket}, nil
}

// ossFile is the fs.File that ossFS.Open returns for a single name.
type ossFile struct {
	// name is the object key this file was opened with ("." for the root).
	name string
	// bucket is the bucket Stat queries for the object's metadata.
	bucket *oss.Bucket
	// reader streams the object body; Open leaves it nil for ".".
	reader io.ReadCloser
	// mutex serializes calls to Read.
	mutex sync.Mutex
}

// Read implements io.Reader by reading from the object body. Calls are
// serialized by f.mutex.
//
// A file opened without a body (the handle Open returns for ".") has a nil
// reader; Read then returns an *fs.PathError wrapping fs.ErrInvalid instead
// of panicking on the nil interface.
func (f *ossFile) Read(p []byte) (int, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if f.reader == nil {
		return 0, &fs.PathError{Op: "read", Path: f.name, Err: fs.ErrInvalid}
	}
	return f.reader.Read(p)
}

// Close releases the object body. A file that was opened without a body
// (the handle Open returns for ".") has nothing to release, so Close returns
// nil for it instead of panicking on the nil reader.
func (f *ossFile) Close() error {
	if f.reader == nil {
		return nil
	}
	return f.reader.Close()
}

// Stat fetches the object's metadata from the bucket and reports it as an
// fs.FileInfo. The size is parsed from the Content-Length header; an error
// from the metadata request or from parsing that header is returned as-is.
func (f *ossFile) Stat() (fs.FileInfo, error) {
	meta, err := f.bucket.GetObjectMeta(f.name)
	if err != nil {
		return nil, err
	}

	contentLength := meta.Get("Content-Length")
	size, err := strconv.ParseInt(contentLength, 10, 64)
	if err != nil {
		return nil, err
	}

	info := &ossFileInfo{name: f.name, size: size}
	return info, nil
}

// ossFileInfo is the fs.FileInfo reported for an OSS object.
type ossFileInfo struct {
	name string // object key the info describes
	size int64  // object size in bytes
}

// Name returns the last slash-separated element of the object key.
// fs.FileInfo documents Name as the base name of the file, so the directory
// part of the key is dropped rather than returned verbatim.
func (fi *ossFileInfo) Name() string {
	if i := strings.LastIndexByte(fi.name, '/'); i >= 0 {
		return fi.name[i+1:]
	}
	return fi.name
}

// Size returns the object size in bytes.
func (fi *ossFileInfo) Size() int64 { return fi.size }

// Mode always reports a read-only regular file.
func (fi *ossFileInfo) Mode() fs.FileMode { return 0o444 }

// ModTime always returns the zero time; no modification time is tracked.
func (fi *ossFileInfo) ModTime() time.Time { return time.Time{} }

// IsDir always reports false.
func (fi *ossFileInfo) IsDir() bool { return false }

// Sys always returns nil; there is no underlying data source to expose.
func (fi *ossFileInfo) Sys() any { return nil }

0 comments on commit 6a14f50

Please sign in to comment.