From d2fdb995c34029705766d181f5dc817b620c05ab Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Sun, 4 Feb 2024 17:26:19 +0200 Subject: [PATCH] Delta Exporter: Azure Support --- .github/workflows/esti.yaml | 8 +- docs/howto/hooks/lua.md | 12 +- esti/catalog_export_test.go | 165 +++++++++-- .../azure/_lakefs_actions/delta_export.yaml | 30 ++ .../delta/data/_lakefs_tables/test-table.yaml | 4 + ...8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet | Bin 0 -> 6434 bytes .../_delta_log/00000000000000000000.json | 4 + .../s3/_lakefs_actions/delta_export.yaml | 31 +++ .../_lakefs_actions/glue_export.yaml | 6 +- .../_lakefs_actions/symlink_export.yaml | 6 +- .../{ => glue}/scripts/glue_exporter.lua | 0 .../{ => glue}/scripts/symlink_exporter.lua | 0 esti/ops/docker-compose-external-db.yaml | 4 +- esti/ops/docker-compose.yaml | 4 +- pkg/actions/lua.go | 9 +- .../lakefs/catalogexport/table_extractor.lua | 2 +- pkg/actions/lua/open.go | 2 + pkg/actions/lua/storage/aws/client.go | 236 ++++++++++++++++ pkg/actions/lua/storage/aws/s3.go | 259 ------------------ pkg/actions/lua/storage/azure/azure.go | 18 ++ pkg/actions/lua/storage/azure/client.go | 125 +++++++++ pkg/actions/lua/storage/client.go | 26 ++ test/lakefsfs_contract/docker-compose.yaml | 2 - test/spark/docker-compose.yaml | 4 - 24 files changed, 644 insertions(+), 313 deletions(-) create mode 100644 esti/export_hooks_files/delta/azure/_lakefs_actions/delta_export.yaml create mode 100644 esti/export_hooks_files/delta/data/_lakefs_tables/test-table.yaml create mode 100644 esti/export_hooks_files/delta/data/tables/test-table/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet create mode 100644 esti/export_hooks_files/delta/data/tables/test-table/_delta_log/00000000000000000000.json create mode 100644 esti/export_hooks_files/delta/s3/_lakefs_actions/delta_export.yaml rename esti/export_hooks_files/{ => glue}/_lakefs_actions/glue_export.yaml (83%) rename esti/export_hooks_files/{ => glue}/_lakefs_actions/symlink_export.yaml (62%) rename esti/export_hooks_files/{ => glue}/scripts/glue_exporter.lua (100%) rename esti/export_hooks_files/{ => glue}/scripts/symlink_exporter.lua (100%) create mode 100644 pkg/actions/lua/storage/aws/client.go delete mode 100644 pkg/actions/lua/storage/aws/s3.go create mode 100644 pkg/actions/lua/storage/azure/azure.go create mode 100644 pkg/actions/lua/storage/azure/client.go create mode 100644 pkg/actions/lua/storage/client.go diff --git a/.github/workflows/esti.yaml b/.github/workflows/esti.yaml index 1940bd92943..f280158465c 100644 --- a/.github/workflows/esti.yaml +++ b/.github/workflows/esti.yaml @@ -900,8 +900,8 @@ jobs: DOCKER_REG: ${{ needs.login-to-amazon-ecr.outputs.registry }} LAKEFS_DATABASE_TYPE: postgres LAKEFS_BLOCKSTORE_TYPE: azure - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: esti - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY2 }} + ESTI_AZURE_STORAGE_ACCOUNT: esti + ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY }} ESTI_BLOCKSTORE_TYPE: azure ESTI_STORAGE_NAMESPACE: https://esti.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }} @@ -954,11 +954,11 @@ jobs: LAKEFS_DATABASE_COSMOSDB_CONTAINER: ${{ github.run_number }}-${{ steps.unique.outputs.value }} LAKEFS_DATABASE_COSMOSDB_KEY: ${{ secrets.LAKEFS_DATABASE_COSMOSDB_READWRITEKEY }} LAKEFS_BLOCKSTORE_TYPE: azure - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: esti4hns - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ 
secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }} ESTI_BLOCKSTORE_TYPE: azure ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }} ESTI_ADLS_IMPORT_BASE_URL: https://esti4hns.adls.core.windows.net/esti-system-testing-data/ + ESTI_AZURE_STORAGE_ACCOUNT: esti4hns + ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }} - name: cleanup cosmos db container if: always() diff --git a/docs/howto/hooks/lua.md b/docs/howto/hooks/lua.md index 9af960c7fd2..70c4ac9f993 100644 --- a/docs/howto/hooks/lua.md +++ b/docs/howto/hooks/lua.md @@ -412,7 +412,7 @@ Parameters: A package used to export Delta Lake tables from lakeFS to an external cloud storage. -### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_names, writer, delta_client, table_descriptors_path)` +### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)` The function used to export Delta Lake tables. The return value is a table with mapping of table names to external table location (from which it is possible to query the data). @@ -420,10 +420,10 @@ The return value is a table with mapping of table names to external table locati Parameters: - `action`: The global action object -- `table_names`: Delta tables name list (e.g. `{"table1", "table2"}`) -- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`) +- `table_def_names`: Delta tables name list (e.g. `{"table1", "table2"}`) +- `write_object`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`) - `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)` -- `table_descriptors_path`: The path under which the table descriptors of the provided `table_names` reside +- `table_descriptors_path`: The path under which the table descriptors of the provided `table_def_names` reside Example: @@ -444,7 +444,7 @@ hooks: local table_descriptors_path = "_lakefs_tables" local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region) local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region) - local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path) + local delta_table_locations = delta_exporter.export_delta_log(action, args.table_defs, sc.put_object, delta_client, table_descriptors_path) for t, loc in pairs(delta_table_locations) do print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. 
"\n") @@ -457,7 +457,7 @@ hooks: lakefs: access_key_id: secret_access_key: - table_names: + table_defs: - mytable ``` diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go index 01e3b09401d..6c56222d12b 100644 --- a/esti/catalog_export_test.go +++ b/esti/catalog_export_test.go @@ -28,14 +28,20 @@ import ( "github.com/treeverse/lakefs/pkg/api/apigen" "github.com/treeverse/lakefs/pkg/api/apiutil" "github.com/treeverse/lakefs/pkg/block" + "github.com/treeverse/lakefs/pkg/block/azure" + "github.com/treeverse/lakefs/pkg/block/params" + lakefscfg "github.com/treeverse/lakefs/pkg/config" "github.com/treeverse/lakefs/pkg/testutil" + "github.com/treeverse/lakefs/pkg/uri" "golang.org/x/exp/slices" "gopkg.in/yaml.v3" ) -//go:embed export_hooks_files/*/* +//go:embed all:export_hooks_files var exportHooksFiles embed.FS +const catalogExportTestMaxRetries = 30 * time.Second + type schemaField struct { Name string `yaml:"name"` Type string `yaml:"type"` @@ -56,18 +62,23 @@ type hiveTableSpec struct { } type exportHooksTestData struct { - SymlinkActionPath string - SymlinkScriptPath string - TableDescriptorPath string - GlueActionPath string - GlueScriptPath string - Branch string - GlueDB string - AccessKeyId string - SecretAccessKey string - Region string - OverrideCommitID string - TableSpec *hiveTableSpec + Repository string + SymlinkActionPath string + SymlinkScriptPath string + TableDescriptorPath string + GlueActionPath string + GlueScriptPath string + Branch string + GlueDB string + AWSAccessKeyID string + AWSSecretAccessKey string + AWSRegion string + AzureAccessKey string + AzureStorageAccount string + OverrideCommitID string + LakeFSAccessKeyID string + LakeFSSecretAccessKey string + TableSpec *hiveTableSpec } func renderTplFileAsStr(t *testing.T, tplData any, rootDir fs.FS, path string) string { @@ -166,7 +177,7 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplD storageURL, err := url.Parse(symlinksPrefix) require.NoError(t, err, "failed extracting bucket name") - s3Client, err := testutil.SetupTestS3Client("https://s3.amazonaws.com", testData.AccessKeyId, testData.SecretAccessKey) + s3Client, err := testutil.SetupTestS3Client("https://s3.amazonaws.com", testData.AWSAccessKeyID, testData.AWSSecretAccessKey) require.NoError(t, err, "failed creating s3 client") @@ -204,9 +215,10 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplD Key: aws.String(symlinkFileKey), }) require.NoErrorf(t, err, "getting symlink file content bucket=%s key=%s", storageURL.Host, symlinkFileKey) - defer objRes.Body.Close() body, err := io.ReadAll(objRes.Body) require.NoError(t, err, "fail reading object data") + require.NoError(t, objRes.Body.Close()) + for _, addr := range strings.Split(string(body), "\n") { if addr != "" { // split returns last \n as empty string storagePhysicalAddrs[addr] = true @@ -234,14 +246,14 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplD return commit.Id, symlinksPrefix } -// TestAWSCatalogExport will verify that symlinks are exported correcrtly and then in a sequential test verify that the glue exporter works well. +// TestAWSCatalogExport will verify that symlinks are exported correctly and then in a sequential test verify that the glue exporter works well. 
// The setup in this test includes: // Symlinks export: lua script, table in _lakefs_tables, action file, mock table data in CSV form // Glue export: lua script, table in _lakefs_tables, action file func TestAWSCatalogExport(t *testing.T) { // skip if blockstore is not not s3 requireBlockstoreType(t, block.BlockstoreTypeS3) - // skip of the following args are not provided + // skip if the following args are not provided accessKeyID := viper.GetString("aws_access_key_id") secretAccessKey := viper.GetString("aws_secret_access_key") glueDB := viper.GetString("glue_export_hooks_database") @@ -259,7 +271,7 @@ func TestAWSCatalogExport(t *testing.T) { ctx, _, repo := setupTest(t) defer tearDownTest(repo) - tmplDir, _ := fs.Sub(exportHooksFiles, "export_hooks_files") + tmplDir, _ := fs.Sub(exportHooksFiles, "export_hooks_files/glue") testData := &exportHooksTestData{ Branch: mainBranch, SymlinkActionPath: "_lakefs_actions/symlink_export.yaml", @@ -268,9 +280,9 @@ func TestAWSCatalogExport(t *testing.T) { GlueActionPath: "_lakefs_actions/glue_export.yaml", TableDescriptorPath: "_lakefs_tables/animals.yaml", GlueDB: glueDB, - Region: glueRegion, - AccessKeyId: accessKeyID, - SecretAccessKey: secretAccessKey, + AWSRegion: glueRegion, + AWSAccessKeyID: accessKeyID, + AWSSecretAccessKey: secretAccessKey, TableSpec: &hiveTableSpec{ Name: "animals", Type: "hive", @@ -320,14 +332,14 @@ func TestAWSCatalogExport(t *testing.T) { // create glue client - glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey, testData.Region) + glueClient, err := setupGlueClient(ctx, testData.AWSAccessKeyID, testData.AWSSecretAccessKey, testData.AWSRegion) require.NoError(t, err, "creating glue client") // wait until table is ready tableName := fmt.Sprintf("%s_%s_%s_%s", testData.TableSpec.Name, repo, mainBranch, commitID[:6]) bo := backoff.NewExponentialBackOff() bo.MaxInterval = 5 * time.Second - bo.MaxElapsedTime = 30 * time.Second + bo.MaxElapsedTime = catalogExportTestMaxRetries getGlueTable := func() error { t.Logf("[retry] get table %s ", tableName) resp, err := glueClient.GetTable(ctx, &glue.GetTableInput{ @@ -383,3 +395,110 @@ func TestAWSCatalogExport(t *testing.T) { require.Equal(t, symlinkPrefix, aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table") }) } + +func setupCatalogExportTestByStorageType(t *testing.T, testData *exportHooksTestData) string { + blockstoreType := viper.GetString(lakefscfg.BlockstoreTypeKey) + + switch blockstoreType { + case block.BlockstoreTypeS3: + testData.AWSAccessKeyID = viper.GetString("aws_access_key_id") + testData.AWSSecretAccessKey = viper.GetString("aws_secret_access_key") + testData.AWSRegion = "us-east-1" + + case block.BlockstoreTypeAzure: + testData.AzureStorageAccount = viper.GetString("azure_storage_account") + testData.AzureAccessKey = viper.GetString("azure_storage_access_key") + + default: + t.Skip("unsupported block adapter: ", blockstoreType) + } + + return blockstoreType +} + +func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit string, testData *exportHooksTestData, blockstoreType string) { + resp, err := client.GetRepositoryWithResponse(ctx, testData.Repository) + require.NoError(t, err) + require.NotNil(t, resp.JSON200) + namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace) + require.NoError(t, err) + keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json" + + switch blockstoreType { + case block.BlockstoreTypeS3: + cfg, err := 
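+			// S3 validation path (descriptive note, not part of the original change): build an S3 client from the
+			// test's AWS credentials and HEAD the exported Delta log object (00000000000000000000.json)
+			// under the repository's storage namespace to confirm the hook wrote it.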
config.LoadDefaultConfig(ctx, + config.WithRegion(testData.AWSRegion), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(testData.AWSAccessKeyID, testData.AWSSecretAccessKey, "")), + ) + require.NoError(t, err) + + clt := s3.NewFromConfig(cfg) + key := fmt.Sprintf(keyTempl, strings.TrimPrefix(namespaceURL.Path, "/"), mainBranch, commit[:6]) + _, err = clt.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(namespaceURL.Host), + Key: aws.String(key)}) + require.NoError(t, err) + + case block.BlockstoreTypeAzure: + azClient, err := azure.BuildAzureServiceClient(params.Azure{ + StorageAccount: testData.AzureStorageAccount, + StorageAccessKey: testData.AzureAccessKey, + }) + require.NoError(t, err) + + containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator) + key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6]) + _, err = azClient.NewContainerClient(containerName).NewBlobClient(key).GetProperties(ctx, nil) + require.NoError(t, err) + + default: + t.Fatal("validation failed on unsupported block adapter") + } +} + +func TestDeltaCatalogExport(t *testing.T) { + ctx, _, repo := setupTest(t) + defer tearDownTest(repo) + + accessKeyID := viper.GetString("access_key_id") + secretAccessKey := viper.GetString("secret_access_key") + testData := &exportHooksTestData{ + Repository: repo, + Branch: mainBranch, + LakeFSAccessKeyID: accessKeyID, + LakeFSSecretAccessKey: secretAccessKey, + } + blockstore := setupCatalogExportTestByStorageType(t, testData) + + tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta") + require.NoError(t, err) + err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + buf, err := fs.ReadFile(tmplDir, path) + if err != nil { + return err + } + uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf)) + if err != nil { + return err + } + require.Equal(t, http.StatusCreated, uploadResp.StatusCode()) + } + return nil + }) + require.NoError(t, err) + + headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{ + "_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, fmt.Sprintf("%s/_lakefs_actions/delta_export.yaml", blockstore)), + }) + + runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1) + require.Equal(t, "completed", runs.Results[0].Status) + + validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore) +} + +// TODO: What to pass in region: `local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, "")` diff --git a/esti/export_hooks_files/delta/azure/_lakefs_actions/delta_export.yaml b/esti/export_hooks_files/delta/azure/_lakefs_actions/delta_export.yaml new file mode 100644 index 00000000000..783bfbef035 --- /dev/null +++ b/esti/export_hooks_files/delta/azure/_lakefs_actions/delta_export.yaml @@ -0,0 +1,30 @@ +name: Delta Exporter +on: + post-commit: + branches: ["{{ .Branch }}*"] +hooks: + - id: delta_exporter + type: lua + properties: + script: | + local azure = require("azure") + local formats = require("formats") + local delta_exporter = require("lakefs/catalogexport/delta_exporter") + + local table_descriptors_path = "_lakefs_tables" + local sc = azure.client(args.azure.storage_account, args.azure.access_key) + local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, "") + local 
delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path) + + for t, loc in pairs(delta_table_locations) do + print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n") + end + args: + azure: + storage_account: "{{ .AzureStorageAccount }}" + access_key: "{{ .AzureAccessKey }}" + lakefs: # provide credentials of a user that has access to the script and Delta Table + access_key_id: "{{ .LakeFSAccessKeyID }}" + secret_access_key: "{{ .LakeFSSecretAccessKey }}" + table_names: + - test-table diff --git a/esti/export_hooks_files/delta/data/_lakefs_tables/test-table.yaml b/esti/export_hooks_files/delta/data/_lakefs_tables/test-table.yaml new file mode 100644 index 00000000000..6d65a1e3f38 --- /dev/null +++ b/esti/export_hooks_files/delta/data/_lakefs_tables/test-table.yaml @@ -0,0 +1,4 @@ +name: test_table +type: delta +catalog: my-catalog +path: tables/test-table \ No newline at end of file diff --git a/esti/export_hooks_files/delta/data/tables/test-table/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet b/esti/export_hooks_files/delta/data/tables/test-table/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..bec00fb61c08b49e6b9d33ede9385da761a39237 GIT binary patch literal 6434 zcmcgxeQX=$8Nc^pr%sxrA>^)3WxyNhQPyDJozM1p3dm=taZ=oprnGU>PT9r2*q5Hq z=I)#?&_IZNs%U8IDzpy?jjh`sRoC%Hg%CoB{V~L*2_ZDthbBG-ni!hG1c*%t?Rj6@ zwUb=OJ_44{_rA~jywAt)dEWPZ-egLd78nn62{9K?mSGvs07d2ar+@e0p4UI;rS3cV zPS4B8H^C_Xd;_{qCx+*K5AWpt*Zl$c`;#vZ-}WZz&))mv2VTXakG;758XnC(yYLF~ z$(Qf>)eF$2ZkSJFSq#Mlnc)$vgXv^C1AZTw4EUl>9j-@(!#hfcRPMM)umVJwcFZUwieE*HMp%Jjq!_e4b z{G#wH$DC@yGjgFDh_Au(20Y(^=lh7FegbbB9vYrg9TfF-m^T9NEpB`8sNaLf!QHkV z5M11Rx--Dg4L51MK>Yd}V&6yHGhXgFPr%c9t0=kv{oFh!1tRVwF`7 zG!E8O6O5x2)7*h}4QdMumbQxaNQ$AFoj^ZYQmu-K26_&vriKOvvz3}|YbZQARniSZ zuVP_`R7=-3I~}9;b=D{QJy;ms9v^xXuz*pj&5peXzvWsW*q|mZ5OY1GmHU^rb$+Un_=rG`u`v{(%QT`f0oIej|0Tr&%rRkSMbMl!Vv9T_eF zRC+$C&zI4))NXWpvWPL8Ur5@RDQ{G2-UW1TQZ?%c>a}FCQYji5lBQUYZWrr#mb7Gf z)vl^}W>0bvl0wrGNo`3poq0=lynBZGf!D}aR?HX1k5WL`B@W8SC~ zQ31W4Ethcs%8Lz&8urj79PAd|gwr3mn#WiU=yeI(`d}+;UahR^2FcTpyzI+f?(((V z!@F?E{x^lLp&e%SW%0%Y8}z^!&gOB@YG`=f9)dS`JIMH3+bbydV7kAv&IfGW!>~8O zCW^cw&8_cl+22{zaFB@u-$m>X5%;a#+;3Ryz(yIahCR^1yhS&2?isYw3reI|NQem` zNeT%vzY7SWC_*zH5~IkM6ncOnMC6bZit<2`yf?~1G$cet=tM)J80kYd2I4X3g@go( zdc>V*q6fyK-Y6v_NfzV;48RiuJrNPzJt8Ee5KM@ILcyCG6{2xq=Yav3M5!0$1`{Al z6wnVmA=#_*pe`za3}A^zTr#D1g9rXTVZ#J4-z6o^UDtxNTd&Tkg@UEoHi7kmhdt-v zz7gQ=8^W-DjCfa^&(q9#z`38Kz_-zQ8myr3B8f!hL{y6Mk{Fi~vKWU9p<)qUii|X@u_)5Uc#zGDK#(I4*GMFm02BjEa6<+TY&8QfW2#T8XJUufM?Lr+qPR1k zZ9zxp^8~WTJJ=^XxO1Q3p1&SL<}S+>sV5Ng6m0!_Pr2*X?w8&v(h9sD?eBOc32cA$E`5~z<}>f@`}NP?N@BjTf6xlNF5as5DyLxJ zsbBUhFFjhl^;Dln+3cv_*VN$mHD$6;$Zn+qvKug{7qSn80!(+K?CG>2#Tr=v)$6-7 z_vQ%q(0(Xiw@SN;4p(fx(!rd8x?q6p9lt;&d=4MgOub?S(-qqZDtVHBs_E=Td)Cvd zwxgBppi(I7X1%Tk#!0+`Op`)ev$R6+aLuwcyw@BsmvpEL%W!n7M(|E8Ujib9LhC0p zmRc?8`QR8VyrQXD*8)AIgF-0OZAZ`Rsu`S|oV*Um=}Ng=Gj(S*ct_2#G&Nre9<6G? 
zVE`xAJZUosQxsnt&U@HP9`0R{ zdp3+S@83vu#oEX8WEq+u?ZF2fz5)y|?1qhzA|`bO8wWB=aSz2>w)mjxz-ZQwjuKnt zo{e&kM6oe=qN`TO{-td50_W?ZDKDa4?e$TAhUaXr&+|Rxy}?7B@9n27pdg@}?C1)P zehnK4pZ&P3h0kVG{l1fp5x=j=ixY1v6(+kR_fv_sTiV>54t2)LoB?H?F~`r%5xDf6 zqL?0Nv|T&)QdwLJ6LPCAZ>QGFzIIBZkZLEj6$0&KION*9&F#OP*4hu7)NZh_u|w4D zswZu87&~w!b(%ziz!u;smK_QVz5QukqPcCo$!_Omk=wp2829O33%09@=RV2Obvui@ z?XH=)Z7V>yPy87FB#x{9@7Uo!&E1_nb2H0wKKAi_@TZHRfWJqXp1xz|^Yy*nf1fW* zKE<@6C`aJIpW78>SxFU@aYZRA=^~+}Qi^g2w}5dHzoFlJLr=mZlfVu_FnJl+#`0LA ztsDT(Bfy?IhWlCEC)3A>z6$P5;AW~h0Gh@U;jj#}18G7`6IRm1>~+@ELQXVlvzg;k zPFyhx<>Pj44QR`l;m-Z_`AUk+oLXzj8?P%NOf7lXa3!@kFAC1=Oq9=);Z!la6b|1$ z6%U7T|Hv^SDucyefeL0}>cn)lkXbSCh>rP%v4;KwV|9H5#5R2=fHSprVrHe3H%|jp z^9}w+`VbnZt|lYh(VzCjKq<5q@JiCyt+mf4>__xnJf^jDCUAr=Wl7P4wAChVr0~-LMm+^W6 z7Lp%8Qc}%D1#y7p2_>~$Uw0t98I)AIgjp4`blT!eqMk1wH&yT}UmoYxnM5roPV results[j]["key"].(string) + }) + + response := map[string]interface{}{ + "is_truncated": resp.IsTruncated, + "next_continuation_token": aws.ToString(resp.NextContinuationToken), + "results": results, + } + + return util.DeepPush(l, response) +} diff --git a/pkg/actions/lua/storage/aws/s3.go b/pkg/actions/lua/storage/aws/s3.go deleted file mode 100644 index 22b63255cac..00000000000 --- a/pkg/actions/lua/storage/aws/s3.go +++ /dev/null @@ -1,259 +0,0 @@ -package aws - -import ( - "context" - "errors" - "fmt" - "io" - "sort" - "strings" - "time" - - "github.com/Shopify/go-lua" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" - "github.com/treeverse/lakefs/pkg/actions/lua/util" -) - -var errDeleteObject = errors.New("delete object failed") - -func newS3Client(ctx context.Context) lua.Function { - return func(l *lua.State) int { - accessKeyID := lua.CheckString(l, 1) - secretAccessKey := lua.CheckString(l, 2) - var region string - if !l.IsNone(3) { - region = lua.CheckString(l, 3) - } - var endpoint string - if !l.IsNone(4) { - endpoint = lua.CheckString(l, 4) - } - c := &S3Client{ - AccessKeyID: accessKeyID, - SecretAccessKey: secretAccessKey, - Endpoint: endpoint, - Region: region, - ctx: ctx, - } - - l.NewTable() - for name, goFn := range functions { - // -1: tbl - l.PushGoFunction(goFn(c)) - // -1: fn, -2:tbl - l.SetField(-2, name) - } - - return 1 - } -} - -type S3Client struct { - AccessKeyID string - SecretAccessKey string - Endpoint string - Region string - ctx context.Context -} - -func (c *S3Client) client() *s3.Client { - cfg, err := config.LoadDefaultConfig(c.ctx, - config.WithRegion(c.Region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(c.AccessKeyID, c.SecretAccessKey, "")), - ) - if err != nil { - panic(err) - } - return s3.NewFromConfig(cfg, func(o *s3.Options) { - if c.Endpoint != "" { - o.BaseEndpoint = aws.String(c.Endpoint) - } - }) -} - -var functions = map[string]func(client *S3Client) lua.Function{ - "get_object": getObject, - "put_object": putObject, - "list_objects": listObjects, - "delete_object": deleteObject, - "delete_recursive": deleteRecursive, -} - -func deleteRecursive(c *S3Client) lua.Function { - return func(l *lua.State) int { - bucketName := lua.CheckString(l, 1) - prefix := lua.CheckString(l, 2) - - client := c.client() - input := &s3.ListObjectsV2Input{ - Bucket: aws.String(bucketName), - Prefix: aws.String(prefix), - } - - var errs error - for { - // list objects to delete and delete them - listObjects, err := client.ListObjectsV2(c.ctx, 
input) - if err != nil { - lua.Errorf(l, "%s", err.Error()) - panic("unreachable") - } - - deleteInput := &s3.DeleteObjectsInput{ - Bucket: &bucketName, - Delete: &types.Delete{}, - } - for _, content := range listObjects.Contents { - deleteInput.Delete.Objects = append(deleteInput.Delete.Objects, types.ObjectIdentifier{Key: content.Key}) - } - deleteObjects, err := client.DeleteObjects(c.ctx, deleteInput) - if err != nil { - errs = errors.Join(errs, err) - break - } - for _, deleteError := range deleteObjects.Errors { - errDel := fmt.Errorf("%w '%s', %s", - errDeleteObject, aws.ToString(deleteError.Key), aws.ToString(deleteError.Message)) - errs = errors.Join(errs, errDel) - } - - if !aws.ToBool(listObjects.IsTruncated) { - break - } - input.ContinuationToken = listObjects.NextContinuationToken - } - if errs != nil { - lua.Errorf(l, "%s", errs.Error()) - panic("unreachable") - } - return 0 - } -} - -func getObject(c *S3Client) lua.Function { - return func(l *lua.State) int { - client := c.client() - key := lua.CheckString(l, 2) - bucket := lua.CheckString(l, 1) - resp, err := client.GetObject(c.ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - }) - if err != nil { - var ( - noSuchBucket *types.NoSuchBucket - noSuchKey *types.NoSuchKey - ) - if errors.As(err, &noSuchBucket) || errors.As(err, &noSuchKey) { - l.PushString("") - l.PushBoolean(false) // exists - return 2 - } - lua.Errorf(l, "%s", err.Error()) - panic("unreachable") - } - data, err := io.ReadAll(resp.Body) - if err != nil { - lua.Errorf(l, "%s", err.Error()) - panic("unreachable") - } - l.PushString(string(data)) - l.PushBoolean(true) // exists - return 2 - } -} - -func putObject(c *S3Client) lua.Function { - return func(l *lua.State) int { - client := c.client() - buf := strings.NewReader(lua.CheckString(l, 3)) - _, err := client.PutObject(c.ctx, &s3.PutObjectInput{ - Body: buf, - Bucket: aws.String(lua.CheckString(l, 1)), - Key: aws.String(lua.CheckString(l, 2)), - }) - if err != nil { - lua.Errorf(l, "%s", err.Error()) - panic("unreachable") - } - return 0 - } -} - -func deleteObject(c *S3Client) lua.Function { - return func(l *lua.State) int { - client := c.client() - _, err := client.DeleteObject(c.ctx, &s3.DeleteObjectInput{ - Bucket: aws.String(lua.CheckString(l, 1)), - Key: aws.String(lua.CheckString(l, 2)), - }) - if err != nil { - lua.Errorf(l, "%s", err.Error()) - panic("unreachable") - } - return 0 - } -} - -func listObjects(c *S3Client) lua.Function { - return func(l *lua.State) int { - client := c.client() - - var prefix, delimiter, continuationToken *string - if !l.IsNone(2) { - prefix = aws.String(lua.CheckString(l, 2)) - } - if !l.IsNone(3) { - continuationToken = aws.String(lua.CheckString(l, 3)) - } - if !l.IsNone(4) { - delimiter = aws.String(lua.CheckString(l, 4)) - } else { - delimiter = aws.String("/") - } - - resp, err := client.ListObjectsV2(c.ctx, &s3.ListObjectsV2Input{ - Bucket: aws.String(lua.CheckString(l, 1)), - ContinuationToken: continuationToken, - Delimiter: delimiter, - Prefix: prefix, - }) - if err != nil { - lua.Errorf(l, "%s", err.Error()) - panic("unreachable") - } - results := make([]map[string]interface{}, 0) - for _, prefix := range resp.CommonPrefixes { - results = append(results, map[string]interface{}{ - "key": *prefix.Prefix, - "type": "prefix", - }) - } - for _, obj := range resp.Contents { - results = append(results, map[string]interface{}{ - "key": *obj.Key, - "type": "object", - "etag": *obj.ETag, - "size": obj.Size, - "last_modified": 
obj.LastModified.Format(time.RFC3339), - }) - } - - // sort it - sort.Slice(results, func(i, j int) bool { - return results[i]["key"].(string) > results[j]["key"].(string) - }) - - response := map[string]interface{}{ - "is_truncated": resp.IsTruncated, - "next_continuation_token": aws.ToString(resp.NextContinuationToken), - "results": results, - } - - return util.DeepPush(l, response) - } -} diff --git a/pkg/actions/lua/storage/azure/azure.go b/pkg/actions/lua/storage/azure/azure.go new file mode 100644 index 00000000000..3b8f9246849 --- /dev/null +++ b/pkg/actions/lua/storage/azure/azure.go @@ -0,0 +1,18 @@ +package azure + +import ( + "context" + + "github.com/Shopify/go-lua" +) + +func Open(l *lua.State, ctx context.Context) { + open := func(l *lua.State) int { + lua.NewLibrary(l, []lua.RegistryFunction{ + {Name: "client", Function: newClient(ctx)}, + }) + return 1 + } + lua.Require(l, "azure", open, false) + l.Pop(1) +} diff --git a/pkg/actions/lua/storage/azure/client.go b/pkg/actions/lua/storage/azure/client.go new file mode 100644 index 00000000000..4a8d01cd573 --- /dev/null +++ b/pkg/actions/lua/storage/azure/client.go @@ -0,0 +1,125 @@ +package azure + +import ( + "context" + "io" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" + "github.com/Shopify/go-lua" + "github.com/treeverse/lakefs/pkg/actions/lua/storage" + "github.com/treeverse/lakefs/pkg/block/azure" + "github.com/treeverse/lakefs/pkg/block/params" + "github.com/treeverse/lakefs/pkg/uri" +) + +type Client struct { + ctx context.Context + client *service.Client +} + +func newClient(ctx context.Context) lua.Function { + return func(l *lua.State) int { + storageAccount := lua.CheckString(l, 1) + accessKey := lua.CheckString(l, 2) + + azClient, err := azure.BuildAzureServiceClient(params.Azure{ + StorageAccount: storageAccount, + StorageAccessKey: accessKey, + }) + if err != nil { + panic(err) + } + + client := &Client{ + ctx: ctx, + client: azClient, + } + + storage.InitStorage(l, client) + return 1 + } +} + +func (c *Client) GetObject(l *lua.State) int { + containerName := lua.CheckString(l, 1) + key := lua.CheckString(l, 2) + blobClient := c.getBlobClient(containerName, key) + downloadResponse, err := blobClient.DownloadStream(c.ctx, &azblob.DownloadStreamOptions{}) + + if err != nil { + if bloberror.HasCode(err, bloberror.BlobNotFound) { + l.PushString("") + l.PushBoolean(false) // exists + return 2 + } + lua.Errorf(l, "%s", err.Error()) + panic("unreachable") + } + + data, err := io.ReadAll(downloadResponse.Body) + if err != nil { + lua.Errorf(l, "%s", err.Error()) + panic("unreachable") + } + l.PushString(string(data)) + l.PushBoolean(true) // exists + return 2 +} + +func (c *Client) PutObject(l *lua.State) int { + // Skipping first argument as it is the host arg (bucket which is irrelevant in Azure) + path := lua.CheckString(l, 2) + buf := strings.NewReader(lua.CheckString(l, 3)) + // Extract containerName and key from path + containerName, key, found := strings.Cut(path, uri.PathSeparator) + if !found { + lua.Errorf(l, "azure client: invalid path, missing container name from path: %s", path) + panic("unreachable") + } + blobClient := c.getBlobClient(containerName, key) + _, err := blobClient.UploadStream(c.ctx, buf, 
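+	// Descriptive note: UploadStream streams the reader's content into the block blob; default
+	// options are used here, which is sufficient for the small exported Delta log JSON files.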
&azblob.UploadStreamOptions{}) + if err != nil { + lua.Errorf(l, "azure client: (container %s) (key %s): %s", containerName, key, err.Error()) + panic("unreachable") + } + return 0 +} + +func (c *Client) DeleteObject(l *lua.State) int { + containerName := lua.CheckString(l, 1) + key := lua.CheckString(l, 2) + blobClient := c.getBlobClient(containerName, key) + _, err := blobClient.Delete(c.ctx, nil) + if err != nil { + lua.Errorf(l, "%s", err.Error()) + panic("unreachable") + } + return 0 +} + +// ListObjects Should be implemented when needed. There are nuances between HNS and BlobStorage which requires understanding the +// Actual use case before implementing the solution +func (c *Client) ListObjects(l *lua.State) int { + lua.Errorf(l, "Not implemented") + panic("unreachable") +} + +// DeleteRecursive Should be implemented when needed. There are nuances between HNS and BlobStorage which requires understanding the +// Actual use case before implementing the solution +func (c *Client) DeleteRecursive(l *lua.State) int { + lua.Errorf(l, "Not implemented") + panic("unreachable") +} + +func (c *Client) getBlobClient(container, blob string) *blockblob.Client { + return c.getContainerClient(container).NewBlockBlobClient(blob) +} + +func (c *Client) getContainerClient(container string) *container.Client { + return c.client.NewContainerClient(container) +} diff --git a/pkg/actions/lua/storage/client.go b/pkg/actions/lua/storage/client.go new file mode 100644 index 00000000000..60873b52065 --- /dev/null +++ b/pkg/actions/lua/storage/client.go @@ -0,0 +1,26 @@ +package storage + +import "github.com/Shopify/go-lua" + +type Client interface { + GetObject(l *lua.State) int + PutObject(l *lua.State) int + DeleteObject(l *lua.State) int + ListObjects(l *lua.State) int + DeleteRecursive(l *lua.State) int +} + +func InitStorage(l *lua.State, client Client) { + l.NewTable() + functions := map[string]lua.Function{ + "get_object": client.GetObject, + "put_object": client.PutObject, + "list_objects": client.ListObjects, + "delete_object": client.DeleteObject, + "delete_recursive": client.DeleteRecursive, + } + for name, goFn := range functions { + l.PushGoFunction(goFn) + l.SetField(-2, name) + } +} diff --git a/test/lakefsfs_contract/docker-compose.yaml b/test/lakefsfs_contract/docker-compose.yaml index 36eba13dd41..935f9518bd0 100644 --- a/test/lakefsfs_contract/docker-compose.yaml +++ b/test/lakefsfs_contract/docker-compose.yaml @@ -54,8 +54,6 @@ services: - LAKEFS_BLOCKSTORE_S3_ENDPOINT=http://s3.local.lakefs.io:9000 - LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE=true - LAKEFS_BLOCKSTORE_GS_CREDENTIALS_JSON - - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT - - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY env_file: - tester.env entrypoint: ["/app/wait-for", "postgres:5432", "--", "/app/lakefs", "run"] diff --git a/test/spark/docker-compose.yaml b/test/spark/docker-compose.yaml index 3220a8f6a35..1044e17f3fd 100644 --- a/test/spark/docker-compose.yaml +++ b/test/spark/docker-compose.yaml @@ -14,8 +14,6 @@ x-lakefs-common: - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY - LAKEFS_BLOCKSTORE_GS_CREDENTIALS_JSON - - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT - - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY - AWS_REGION - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY @@ -121,8 +119,6 @@ services: - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY - AWS_REGION - - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT - - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY - 
TESTER_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE - TESTER_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY extra_hosts:
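
For reference, a minimal sketch of calling the new `azure` Lua package directly from a hook script, based on the signatures introduced in pkg/actions/lua/storage/azure/client.go; the storage account, access key, container and blob names below are placeholders, not values used anywhere in this change.

```lua
local azure = require("azure")

-- azure.client(storage_account, access_key) returns a table exposing put_object, get_object and
-- delete_object (list_objects and delete_recursive currently raise "Not implemented")
local client = azure.client("mystorageaccount", "my-access-key")

local data = "{}" -- placeholder blob content

-- put_object ignores its first (host/bucket) argument and expects "<container>/<key>" as the second;
-- this matches the writer signature expected by delta_exporter.export_delta_log, which is how the
-- Azure delta export hook in this change uses it
client.put_object("", "mycontainer/exported/_delta_log/00000000000000000000.json", data)

-- get_object takes container and key separately and returns the content plus an exists flag
local body, exists = client.get_object("mycontainer", "exported/_delta_log/00000000000000000000.json")

-- delete_object follows the same (container, key) convention as get_object
client.delete_object("mycontainer", "exported/_delta_log/00000000000000000000.json")
```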