Delta Exporter: Azure Support
N-o-Z committed Feb 6, 2024
1 parent c4be2a0 commit a5d5c5d
Showing 20 changed files with 629 additions and 297 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/esti.yaml
@@ -902,6 +902,8 @@ jobs:
LAKEFS_BLOCKSTORE_TYPE: azure
LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: esti
LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY2 }}
ESTI_AZURE_STORAGE_ACCOUNT: esti
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY2 }}
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}

@@ -959,6 +961,8 @@ jobs:
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_ADLS_IMPORT_BASE_URL: https://esti4hns.adls.core.windows.net/esti-system-testing-data/
ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}

- name: cleanup cosmos db container
if: always()
12 changes: 6 additions & 6 deletions docs/howto/hooks/lua.md
@@ -412,18 +412,18 @@ Parameters:

A package used to export Delta Lake tables from lakeFS to an external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_names, writer, delta_client, table_descriptors_path)`
### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)`

The function used to export Delta Lake tables.
The return value is a table mapping each table name to its external table location (from which the data can be queried).

Parameters:

- `action`: The global action object
- `table_names`: Delta tables name list (e.g. `{"table1", "table2"}`)
- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `table_def_names`: Delta tables name list (e.g. `{"table1", "table2"}`)
- `write_object`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_names` reside
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_def_names` reside
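
For reference, a table descriptor kept under `table_descriptors_path` might look like the following minimal sketch. The field names and values here are illustrative only, modeled on the `test_table` fixture added in this commit rather than on the existing docs:

```yaml
# _lakefs_tables/mytable.yaml -- illustrative sketch of a Delta table descriptor
name: mytable            # table name as referenced by the exporter
type: delta              # marks the table as a Delta Lake table
catalog: my-catalog      # catalog name, as used in this commit's test fixture
path: tables/mytable     # table path within the repository (mirrors the fixture's layout)
```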

Example:

@@ -444,7 +444,7 @@ hooks:
local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_defs, sc.put_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
@@ -457,7 +457,7 @@
lakefs:
access_key_id: <LAKEFS_ACCESS_KEY_ID>
secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
table_names:
table_defs:
- mytable
```
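
Because this commit adds Azure support to the exporter, the same hook can target Azure Blob Storage. The following is a minimal sketch adapted from the Azure esti fixture introduced in this commit: the `azure.client(storage_account, access_key)` helper and its `put_object` writer are taken from that fixture, the placeholder credentials and branch pattern are illustrative, and the empty region string passed to `formats.delta_client` mirrors the fixture (see the TODO left in the test code about what to pass there):

```yaml
name: Delta Exporter (Azure)
on:
  post-commit:
    branches: ["main*"]
hooks:
  - id: delta_exporter
    type: lua
    properties:
      script: |
        local azure = require("azure")
        local formats = require("formats")
        local delta_exporter = require("lakefs/catalogexport/delta_exporter")

        local table_descriptors_path = "_lakefs_tables"
        local sc = azure.client(args.azure.storage_account, args.azure.access_key)
        local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, "")
        local delta_table_locations = delta_exporter.export_delta_log(action, args.table_defs, sc.put_object, delta_client, table_descriptors_path)
        for t, loc in pairs(delta_table_locations) do
          print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
        end
      args:
        azure:
          storage_account: <AZURE_STORAGE_ACCOUNT>
          access_key: <AZURE_STORAGE_ACCESS_KEY>
        lakefs: # provide credentials of a user that has access to the script and Delta Table
          access_key_id: <LAKEFS_ACCESS_KEY_ID>
          secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
        table_defs:
          - mytable
```

The `table_defs` key follows the docs example above (the commit's fixture names this argument `table_names`); what matters is only that the args key matches what the embedded script reads.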

150 changes: 129 additions & 21 deletions esti/catalog_export_test.go
@@ -7,6 +7,9 @@ import (
"encoding/csv"
"errors"
"fmt"
"github.com/treeverse/lakefs/pkg/block/azure"
"github.com/treeverse/lakefs/pkg/block/params"
"github.com/treeverse/lakefs/pkg/uri"
"io"
"io/fs"
"net/http"
@@ -28,12 +31,13 @@ import (
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/block"
lakefscfg "github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/testutil"
"golang.org/x/exp/slices"
"gopkg.in/yaml.v3"
)

//go:embed export_hooks_files/*/*
//go:embed all:export_hooks_files
var exportHooksFiles embed.FS

type schemaField struct {
@@ -56,18 +60,23 @@ type hiveTableSpec struct {
}

type exportHooksTestData struct {
SymlinkActionPath string
SymlinkScriptPath string
TableDescriptorPath string
GlueActionPath string
GlueScriptPath string
Branch string
GlueDB string
AccessKeyId string
SecretAccessKey string
Region string
OverrideCommitID string
TableSpec *hiveTableSpec
Repository string
SymlinkActionPath string
SymlinkScriptPath string
TableDescriptorPath string
GlueActionPath string
GlueScriptPath string
Branch string
GlueDB string
AWSAccessKeyID string
AWSSecretAccessKey string
AWSRegion string
AzureAccessKey string
AzureStorageAccount string
OverrideCommitID string
LakeFSAccessKeyID string
LakeFSSecretAccessKey string
TableSpec *hiveTableSpec
}

func renderTplFileAsStr(t *testing.T, tplData any, rootDir fs.FS, path string) string {
@@ -166,7 +175,7 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplD
storageURL, err := url.Parse(symlinksPrefix)
require.NoError(t, err, "failed extracting bucket name")

s3Client, err := testutil.SetupTestS3Client("https://s3.amazonaws.com", testData.AccessKeyId, testData.SecretAccessKey)
s3Client, err := testutil.SetupTestS3Client("https://s3.amazonaws.com", testData.AWSAccessKeyID, testData.AWSSecretAccessKey)

require.NoError(t, err, "failed creating s3 client")

@@ -234,14 +243,14 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplD
return commit.Id, symlinksPrefix
}

// TestAWSCatalogExport will verify that symlinks are exported correcrtly and then in a sequential test verify that the glue exporter works well.
// TestAWSCatalogExport will verify that symlinks are exported correctly and then in a sequential test verify that the glue exporter works well.
// The setup in this test includes:
// Symlinks export: lua script, table in _lakefs_tables, action file, mock table data in CSV form
// Glue export: lua script, table in _lakefs_tables, action file
func TestAWSCatalogExport(t *testing.T) {
// skip if blockstore is not s3
requireBlockstoreType(t, block.BlockstoreTypeS3)
// skip of the following args are not provided
// skip if the following args are not provided
accessKeyID := viper.GetString("aws_access_key_id")
secretAccessKey := viper.GetString("aws_secret_access_key")
glueDB := viper.GetString("glue_export_hooks_database")
@@ -259,7 +268,7 @@ func TestAWSCatalogExport(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

tmplDir, _ := fs.Sub(exportHooksFiles, "export_hooks_files")
tmplDir, _ := fs.Sub(exportHooksFiles, "export_hooks_files/glue")
testData := &exportHooksTestData{
Branch: mainBranch,
SymlinkActionPath: "_lakefs_actions/symlink_export.yaml",
@@ -268,9 +277,9 @@ func TestAWSCatalogExport(t *testing.T) {
GlueActionPath: "_lakefs_actions/glue_export.yaml",
TableDescriptorPath: "_lakefs_tables/animals.yaml",
GlueDB: glueDB,
Region: glueRegion,
AccessKeyId: accessKeyID,
SecretAccessKey: secretAccessKey,
AWSRegion: glueRegion,
AWSAccessKeyID: accessKeyID,
AWSSecretAccessKey: secretAccessKey,
TableSpec: &hiveTableSpec{
Name: "animals",
Type: "hive",
@@ -320,7 +329,7 @@ func TestAWSCatalogExport(t *testing.T) {

// create glue client

glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey, testData.Region)
glueClient, err := setupGlueClient(ctx, testData.AWSAccessKeyID, testData.AWSSecretAccessKey, testData.AWSRegion)
require.NoError(t, err, "creating glue client")

// wait until table is ready
@@ -383,3 +392,102 @@ func TestAWSCatalogExport(t *testing.T) {
require.Equal(t, symlinkPrefix, aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table")
})
}

func setupCatalogExportTestByStorageType(t *testing.T, testData *exportHooksTestData) string {
blockstoreType := viper.GetString(lakefscfg.BlockstoreTypeKey)

switch blockstoreType {
case block.BlockstoreTypeS3:
testData.AWSAccessKeyID = viper.GetString("aws_access_key_id")
testData.AWSSecretAccessKey = viper.GetString("aws_secret_access_key")
testData.AWSRegion = "us-east-1"

case block.BlockstoreTypeAzure:
testData.AzureStorageAccount = viper.GetString("azure_storage_account")
testData.AzureAccessKey = viper.GetString("azure_storage_access_key")
default:
t.Skip("unsupported block adapter: ", blockstoreType)
}

return blockstoreType
}

func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit string, testData *exportHooksTestData, blockstoreType string) {
resp, err := client.GetRepositoryWithResponse(ctx, testData.Repository)
require.NoError(t, err)
require.NotNil(t, resp.JSON200)
namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace)
require.NoError(t, err)
keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json"

switch blockstoreType {
case block.BlockstoreTypeS3:
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion(testData.AWSRegion),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(testData.AWSAccessKeyID, testData.AWSSecretAccessKey, "")),
)
require.NoError(t, err)
clt := s3.NewFromConfig(cfg)
waiter := s3.NewObjectExistsWaiter(clt)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(namespaceURL.Path, "/"), mainBranch, commit[:6])
err = waiter.Wait(ctx, &s3.HeadObjectInput{
Bucket: aws.String(namespaceURL.Host),
Key: aws.String(key),
}, 60*time.Second)
require.NoError(t, err)

case block.BlockstoreTypeAzure:
azClient, err := azure.BuildAzureServiceClient(params.Azure{
StorageAccount: testData.AzureStorageAccount,
StorageAccessKey: testData.AzureAccessKey,
})
require.NoError(t, err)
containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6])
_, err = azClient.NewContainerClient(containerName).NewBlobClient(key).GetProperties(ctx, nil)
require.NoError(t, err)
default:
t.Fatal("validation failed on unsupported block adapter")
}
}

func TestDeltaCatalogExport(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

testData := &exportHooksTestData{
Repository: repo,
Branch: mainBranch,
LakeFSAccessKeyID: "AKIAIOSFDNN7EXAMPLEQ",
LakeFSSecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
}
blockstore := setupCatalogExportTestByStorageType(t, testData)

tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta")
require.NoError(t, err)
err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
buf, err := fs.ReadFile(tmplDir, path)
if err != nil {
return err
}
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
if err != nil {
return err
}
require.Equal(t, http.StatusCreated, uploadResp.StatusCode())
}
return nil
})
require.NoError(t, err)

headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
"_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, fmt.Sprintf("%s/_lakefs_actions/delta_export.yaml", blockstore)),
})
validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore)
}

// TODO: What to pass in region: `local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, "")`
@@ -0,0 +1,30 @@
name: Delta Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: delta_exporter
type: lua
properties:
script: |
local azure = require("azure")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local table_descriptors_path = "_lakefs_tables"
local sc = azure.client(args.azure.storage_account, args.azure.access_key)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, "")
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
end
args:
azure:
storage_account: "{{ .AzureStorageAccount }}"
access_key: "{{ .AzureAccessKey }}"
lakefs: # provide credentials of a user that has access to the script and Delta Table
access_key_id: "{{ .LakeFSAccessKeyID }}"
secret_access_key: "{{ .LakeFSSecretAccessKey }}"
table_names:
- test-table
@@ -0,0 +1,4 @@
name: test_table
type: delta
catalog: my-catalog
path: tables/test-table
Binary file not shown.
@@ -0,0 +1,4 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"db5e0917-1716-4b0f-a009-c25e5b7304a1","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"registration_dttm\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"email\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ip_address\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"cc\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"country\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthdate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"comments\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"__index_level_0__\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1707066829815,"configuration":{}}}
{"add":{"path":"0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet","partitionValues":{},"size":6434,"modificationTime":1707066829810,"dataChange":true,"stats":"{\"numRecords\": 11, \"minValues\": {\"registration_dttm\": \"2016-02-03T00:35:29\", \"id\": 137, \"first_name\": \"Alan\", \"last_name\": \"Cox\", \"email\": \"[email protected]\", \"gender\": \"Female\", \"ip_address\": \"115.51.190.173\", \"cc\": \"\", \"country\": \"Brazil\", \"birthdate\": \"\", \"salary\": 32792.52, \"title\": \"\", \"comments\": \"\", \"__index_level_0__\": 136}, \"maxValues\": {\"registration_dttm\": \"2016-02-03T23:47:37\", \"id\": 860, \"first_name\": \"Sharon\", \"last_name\": \"Vasquez\", \"email\": \"[email protected]\", \"gender\": \"Male\", \"ip_address\": \"46.161.44.180\", \"cc\": \"630472604816980983\", \"country\": \"Philippines\", \"birthdate\": \"9/27/1984\", \"salary\": 278553.57, \"title\": \"Systems Administrator I\", \"comments\": \"\", \"__index_level_0__\": 859}, \"nullCount\": {\"registration_dttm\": 0, \"id\": 0, \"first_name\": 0, \"last_name\": 0, \"email\": 0, \"gender\": 0, \"ip_address\": 0, \"cc\": 0, \"country\": 0, \"birthdate\": 0, \"salary\": 0, \"title\": 0, \"comments\": 0, \"__index_level_0__\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1707066829820,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"configuration\":{},\"created_time\":1707066829815,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"db5e0917-1716-4b0f-a009-c25e5b7304a1\",\"name\":null,\"partition_columns\":[],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"registration_dttm\",\"nullable\":true,\"type\":\"timestamp\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"first_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"last_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"email\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"gender\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"ip_address\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"cc\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"country\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"birthdate\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"salary\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"title\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"comments\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"__index_level_0__\",\"nullable\":true,\"type\":\"long\"}],\"type\":\"struct\"}}","mode":"ErrorIfExists","location":"s3a://delta-lake-demo/main/data"},"clientVersion":"delta-rs.0.17.0"}}
31 changes: 31 additions & 0 deletions esti/export_hooks_files/delta/s3/_lakefs_actions/delta_export.yaml
@@ -0,0 +1,31 @@
name: Delta Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: delta_exporter
type: lua
properties:
script: |
local aws = require("aws")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
end
args:
aws:
access_key_id: "{{ .AWSAccessKeyID }}"
secret_access_key: "{{ .AWSSecretAccessKey }}"
region: "{{ .AWSRegion }}"
lakefs: # provide credentials of a user that has access to the script and Delta Table
access_key_id: "{{ .LakeFSAccessKeyID }}"
secret_access_key: "{{ .LakeFSSecretAccessKey }}"
table_names:
- test-table
@@ -9,9 +9,9 @@ hooks:
script_path: "{{ .GlueScriptPath }}"
args:
aws:
aws_access_key_id: "{{ .AccessKeyId }}"
aws_secret_access_key: "{{ .SecretAccessKey }}"
aws_region: "{{ .Region }}"
aws_access_key_id: "{{ .AWSAccessKeyID }}"
aws_secret_access_key: "{{ .AWSSecretAccessKey }}"
aws_region: "{{ .AWSRegion }}"
table_source: '{{ .TableDescriptorPath }}'
catalog:
db_name: "{{ .GlueDB }}"
@@ -9,7 +9,7 @@ hooks:
script_path: "{{ .SymlinkScriptPath }}"
args:
aws:
aws_access_key_id: "{{ .AccessKeyId }}"
aws_secret_access_key: "{{ .SecretAccessKey }}"
aws_region: "{{ .Region }}"
aws_access_key_id: "{{ .AWSAccessKeyID }}"
aws_secret_access_key: "{{ .AWSSecretAccessKey }}"
aws_region: "{{ .AWSRegion }}"
table_source: '{{ .TableDescriptorPath }}'