Skip to content

Commit

Permalink
feat(catalog): Add Catalog Registry
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Jan 7, 2025
1 parent 73379f4 commit 4b6b49d
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 47 deletions.
52 changes: 52 additions & 0 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"maps"
"net/url"

"github.com/apache/iceberg-go"
Expand All @@ -45,6 +47,9 @@ var (
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
ErrTableAlreadyExists = errors.New("table already exists")
ErrCatalogNotFound = errors.New("catalog type not registered")
ErrNamespaceNotEmpty = errors.New("namespace is not empty")
)

// WithAwsConfig sets the AWS configuration for the catalog.
Expand Down Expand Up @@ -194,3 +199,50 @@ func TableNameFromIdent(ident table.Identifier) string {
func NamespaceFromIdent(ident table.Identifier) table.Identifier {
return ident[:len(ident)-1]
}

func checkForOverlap(removals []string, updates iceberg.Properties) error {
overlap := []string{}
for _, key := range removals {
if _, ok := updates[key]; ok {
overlap = append(overlap, key)
}
}
if len(overlap) > 0 {
return fmt.Errorf("conflict between removals and updates for keys: %v", overlap)
}
return nil
}

func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, PropertiesUpdateSummary, error) {
if err := checkForOverlap(removals, updates); err != nil {
return nil, PropertiesUpdateSummary{}, err
}

var (
updatedProps = maps.Clone(currentProps)
removed = make([]string, 0, len(removals))
updated = make([]string, 0, len(updates))
)

for _, key := range removals {
if _, exists := updatedProps[key]; exists {
delete(updatedProps, key)
removed = append(removed, key)
}
}

for key, value := range updates {
if updatedProps[key] != value {
updated = append(updated, key)
updatedProps[key] = value
}
}

summary := PropertiesUpdateSummary{
Removed: removed,
Updated: updated,
Missing: iceberg.Difference(removals, removed),
}

return updatedProps, summary, nil
}
89 changes: 50 additions & 39 deletions catalog/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"context"
"errors"
"fmt"
"strconv"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/glue"
"github.com/aws/aws-sdk-go-v2/service/glue/types"
)
Expand Down Expand Up @@ -54,6 +57,50 @@ var (
_ Catalog = (*GlueCatalog)(nil)
)

func init() {
Register("glue", RegistrarFunc(func(_ string, props iceberg.Properties) (Catalog, error) {
awsConfig, err := toAwsConfig(props)
if err != nil {
return nil, err
}

return NewGlueCatalog(WithAwsConfig(awsConfig), WithAwsProperties(AwsProperties(props))), nil
}))
}

func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
opts := make([]func(*config.LoadOptions) error, 0)

for k, v := range p {
switch k {
case "glue.region":
opts = append(opts, config.WithRegion(v))
case "glue.endpoint":
opts = append(opts, config.WithBaseEndpoint(v))
case "glue.max-retries":
maxRetry, err := strconv.Atoi(v)
if err != nil {
return aws.Config{}, err
}
opts = append(opts, config.WithRetryMaxAttempts(maxRetry))
case "glue.retry-mode":
m, err := aws.ParseRetryMode(v)
if err != nil {
return aws.Config{}, err
}
opts = append(opts, config.WithRetryMode(m))
}
}

key, secret, token := p.Get("glue.access-key", ""), p.Get("glue.secret-access-key", ""), p.Get("glue.session-token", "")
if key != "" && secret != "" && token != "" {
opts = append(opts, config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(key, secret, token)))
}

return config.LoadDefaultConfig(context.Background(), opts...)
}

type glueAPI interface {
CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error)
GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error)
Expand Down Expand Up @@ -353,39 +400,9 @@ func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace t
return PropertiesUpdateSummary{}, err
}

overlap := []string{}
for _, key := range removals {
if _, exists := updates[key]; exists {
overlap = append(overlap, key)
}
}
if len(overlap) > 0 {
return PropertiesUpdateSummary{}, fmt.Errorf("conflict between removals and updates for keys: %v", overlap)
}

updatedProperties := make(map[string]string)
if database.Parameters != nil {
for k, v := range database.Parameters {
updatedProperties[k] = v
}
}

// Removals.
removed := []string{}
for _, key := range removals {
if _, exists := updatedProperties[key]; exists {
delete(updatedProperties, key)
removed = append(removed, key)
}
}

// Updates.
updated := []string{}
for key, value := range updates {
if updatedProperties[key] != value {
updatedProperties[key] = value
updated = append(updated, key)
}
updatedProperties, propertiesUpdateSummary, err := getUpdatedPropsAndUpdateSummary(database.Parameters, removals, updates)
if err != nil {
return PropertiesUpdateSummary{}, err
}

_, err = c.glueSvc.UpdateDatabase(ctx, &glue.UpdateDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName), DatabaseInput: &types.DatabaseInput{
Expand All @@ -396,12 +413,6 @@ func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace t
return PropertiesUpdateSummary{}, fmt.Errorf("failed to update namespace properties %s: %w", databaseName, err)
}

propertiesUpdateSummary := PropertiesUpdateSummary{
Removed: removed,
Updated: updated,
Missing: iceberg.Difference(removals, removed),
}

return propertiesUpdateSummary, nil
}

Expand Down
142 changes: 142 additions & 0 deletions catalog/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package catalog

import (
"fmt"
"maps"
"net/url"
"slices"
"strings"
"sync"

"github.com/apache/iceberg-go"
)

type registry map[string]Registrar

func (r registry) getKeys() []string {
regMutex.Lock()
defer regMutex.Unlock()
return slices.Collect(maps.Keys(r))
}

func (r registry) set(catalogType string, reg Registrar) {
regMutex.Lock()
defer regMutex.Unlock()
r[catalogType] = reg
}

func (r registry) get(catalogType string) (Registrar, bool) {
regMutex.Lock()
defer regMutex.Unlock()
reg, ok := r[catalogType]
return reg, ok
}

func (r registry) remove(catalogType string) {
regMutex.Lock()
defer regMutex.Unlock()
delete(r, catalogType)
}

var (
regMutex sync.Mutex
defaultRegistry = registry{}
)

// Registrar is a factory for creating Catalog instances, used for registering to use
// with LoadCatalog.
type Registrar interface {
GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error)
}

type RegistrarFunc func(string, iceberg.Properties) (Catalog, error)

func (f RegistrarFunc) GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error) {
return f(catalogURI, props)
}

// Register adds the new catalog type to the registry. If the catalog type is already registered, it will be replaced.
func Register(catalogType string, reg Registrar) {
if reg == nil {
panic("catalog: RegisterCatalog catalog factory is nil")
}
defaultRegistry.set(catalogType, reg)
}

// Unregister removes the requested catalog factory from the registry.
func Unregister(catalogType string) {
defaultRegistry.remove(catalogType)
}

// GetRegsisteredCatalogs returns the list of registered catalog names that can
// be looked up via LoadCatalog.
func GetRegisteredCatalogs() []string {
return defaultRegistry.getKeys()
}

// Load allows loading a specific catalog by URI and properties.
//
// This is utilized alongside RegisterCatalog/UnregisterCatalog to not only allow
// easier catalog loading but also to allow for custom catalog implementations to
// be registered and loaded external to this module.
//
// The URI is used to determine the catalog type by first checking if it contains
// the string "://" indicating the presence of a scheme. If so, the schema is used
// to lookup the registered catalog. i.e. "glue://..." would return the Glue catalog
// implementation, passing the URI and properties to NewGlueCatalog. If no scheme is
// present, then the URI is used as-is to lookup the catalog factory function.
//
// Currently the following catalogs are registered by default:
//
// - "glue" for AWS Glue Data Catalog, the rest of the URI is ignored, all configuration
// should be provided using the properties. "glue.region", "glue.endpoint",
// "glue.max-retries", etc. Default AWS credentials are used if found, or can be
// overridden by setting "glue.access-key", "glue.secret-access-key", and "glue.session-token".
//
// - "rest" for a REST API catalog, if the properties have a "uri" key, then that will be used
// as the REST endpoint, otherwise the URI is used as the endpoint. The REST catalog also
// registers "http" and "https" so that Load with an http/s URI will automatically
// load the REST Catalog.
//
// - "sql" for SQL catalogs. The registered generic SQL catalog loader looks for the following
// properties to create the connection: The value of "sql.driver" will be used to call `sql.Open`.
// the DSN to pass to `sql.Open` is set by the "uri" property. Finally, the "sql.dialect" property
// will be used which SQL dialect to use for queries and must be one of the supported ones.
// In addition, "catalog.name" can be set to specify the catalog name, otherwise it will just default
// to "sql".
func Load(catalogURI string, props iceberg.Properties) (Catalog, error) {
var catalogType string
if strings.Contains(catalogURI, "://") {
parsed, err := url.Parse(catalogURI)
if err != nil {
return nil, err
}
catalogType = parsed.Scheme
} else {
catalogType = catalogURI
}

cat, ok := defaultRegistry.get(catalogType)
if !ok {
return nil, fmt.Errorf("%w: %s", ErrCatalogNotFound, catalogType)
}

return cat.GetCatalog(catalogURI, props)
}
Loading

0 comments on commit 4b6b49d

Please sign in to comment.