From c6575cac0d369b0c7530ee1bcc59fee366cab698 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 3 Oct 2023 17:20:58 +0300 Subject: [PATCH 01/10] lua test --- pkg/actions/lua_test.go | 5 ++ .../catalogexport_hive_partition_pager.lua | 69 +++++++++++++++++++ .../catalogexport_hive_partition_pager.output | 10 +++ 3 files changed, 84 insertions(+) create mode 100644 pkg/actions/testdata/lua/catalogexport_hive_partition_pager.lua create mode 100644 pkg/actions/testdata/lua/catalogexport_hive_partition_pager.output diff --git a/pkg/actions/lua_test.go b/pkg/actions/lua_test.go index f55c1605ce8..7e67a298fb4 100644 --- a/pkg/actions/lua_test.go +++ b/pkg/actions/lua_test.go @@ -357,6 +357,11 @@ func TestLuaRunTable(t *testing.T) { Input: "testdata/lua/strings_partition.lua", Output: "testdata/lua/strings_partition.output", }, + { + Name: "catalogexport_hive_partition_pager", + Input: "testdata/lua/catalogexport_hive_partition_pager.lua", + Output: "testdata/lua/catalogexport_hive_partition_pager.output", + }, } for _, testCase := range tests { diff --git a/pkg/actions/testdata/lua/catalogexport_hive_partition_pager.lua b/pkg/actions/testdata/lua/catalogexport_hive_partition_pager.lua new file mode 100644 index 00000000000..2c8c48bb84b --- /dev/null +++ b/pkg/actions/testdata/lua/catalogexport_hive_partition_pager.lua @@ -0,0 +1,69 @@ +local hive = require("lakefs/catalogexport/hive") + +-- helper function to slice table array +function table.slice(tbl, first, last, step) + local sliced = {} + + for i = first or 1, last or #tbl, step or 1 do + sliced[#sliced + 1] = tbl[i] + end + + return sliced +end + +-- lakefs mock package + +local lakefs = { + list_objects = function(repo_id, commit_id, next_offset, prefix, delimiter, page_size) + local fs = { + [action.repository_id] = { + [action.commit_id] = {{ + physical_address = "s3://bucket/a1/b1/b", + path = "letters/a=1/b=1/b.csv" + }, { + physical_address = "s3://bucket/a2/b2/a", + path = "letters/a=2/b=2/a.csv" + }, { + physical_address = "s3://bucket/a2/b2/b", + path = "letters/a=2/b=2/b.csv" + }, { + physical_address = "", + path = "letters/a=2/b=3/_SUCCESS" + }, { + physical_address = "s3://bucket/a2/b3/a", + path = "letters/a=2/b=3/a.csv" + }, { + physical_address = "s3://bucket/a3/b4/a", + path = "letters/a=3/b=4/a.csv" + }, { + physical_address = "s3://bucket/a3/b4/b", + path = "letters/a=3/b=4/b.csv" + }} + } + } + local all_entries = fs[repo_id][commit_id] + if next_offset == "" then + next_offset = 1 + end + local end_idx = next_offset + page_size + return 200, { + results = table.slice(all_entries, next_offset, end_idx), + pagination = { + has_more = end_idx < #all_entries, + next_offset = end_idx + 1 + } + } + end +} + +local page = 2 +local partitions = {"a", "b"} +local prefix = "letters/" +local pager = hive.extract_partition_pager(lakefs, action.repository_id, action.commit_id, prefix, partitions, page) + +for part_key, entries in pager do + print("# partition: " .. part_key) + for _, entry in ipairs(entries) do + print("path: " .. entry.path .. " physical: " .. 
entry.physical_address) + end +end diff --git a/pkg/actions/testdata/lua/catalogexport_hive_partition_pager.output b/pkg/actions/testdata/lua/catalogexport_hive_partition_pager.output new file mode 100644 index 00000000000..8f37ce39658 --- /dev/null +++ b/pkg/actions/testdata/lua/catalogexport_hive_partition_pager.output @@ -0,0 +1,10 @@ +# partition: letters/a=1/b=1/ +path: letters/a=1/b=1/b.csv physical: s3://bucket/a1/b1/b +# partition: letters/a=2/b=2/ +path: letters/a=2/b=2/a.csv physical: s3://bucket/a2/b2/a +path: letters/a=2/b=2/b.csv physical: s3://bucket/a2/b2/b +# partition: letters/a=2/b=3/ +path: letters/a=2/b=3/a.csv physical: s3://bucket/a2/b3/a +# partition: letters/a=3/b=4/ +path: letters/a=3/b=4/a.csv physical: s3://bucket/a3/b4/a +path: letters/a=3/b=4/b.csv physical: s3://bucket/a3/b4/b \ No newline at end of file From 02ac8efd5c12cb2928da2a5b592fdd93a4b970a7 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Sun, 8 Oct 2023 10:46:02 +0300 Subject: [PATCH 02/10] initial commit --- esti/catalog_export_test.go | 240 ++++++++++++++++++++++++++++++++++++ 1 file changed, 240 insertions(+) create mode 100644 esti/catalog_export_test.go diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go new file mode 100644 index 00000000000..894694df009 --- /dev/null +++ b/esti/catalog_export_test.go @@ -0,0 +1,240 @@ +package esti + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "testing" + "text/template" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/glue" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/cenkalti/backoff/v4" + "github.com/spf13/viper" + "github.com/stretchr/testify/require" + "github.com/treeverse/lakefs/pkg/api/apigen" + "github.com/treeverse/lakefs/pkg/api/apiutil" + "github.com/treeverse/lakefs/pkg/testutil" +) + +const symlinkExporterScript = ` +local exporter = require("lakefs/catalogexport/symlink_exporter") +local aws = require("aws") +local table_path = args.table_source +local s3 = aws.s3_client(args.s3.aws_access_key_id, args.s3.aws_secret_access_key, args.s3.aws_region) +exporter.export_s3(s3, table_path, action, {debug=true}) +` +const animalsTableSpec = ` +name: '{{ .Name }}' +type: hive +path: '{{ .TablePrefix }}' +partition_columns: ['type', 'weight'] +schema: + type: struct + fields: + - name: weight + type: integer + nullable: false + - name: name + type: string + nullable: false + metadata: {} + - name: type + type: string + nullable: true + metadata: + comment: axolotl, cat, dog, fish etc` + +const symlinkExporterAction = ` +name: Symlink S3 Exporter +on: + post-commit: + branches: ["main*"] +hooks: + - id: symlink_exporter + type: lua + properties: + script_path: scripts/symlink_exporter.lua + args: + s3: + aws_access_key_id: "{{ .AccessKeyId }}" + aws_secret_access_key: "{{ .SecretAccessKey }}" + aws_region: us-east-1 + table_source: '_lakefs_tables/{{ .Name }}.yaml'` + +func renderTpl(t *testing.T, tplData any, name, content string) string { + t.Helper() + tpl, err := template.New(name).Parse(content) + require.NoError(t, err, "rendering template") + var doc bytes.Buffer + err = tpl.Execute(&doc, tplData) + require.NoError(t, err) + return doc.String() +} + +func setupGlueClient(ctx context.Context, accessKeyID, secretAccessKey string) (*glue.Client, error) { + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion("us-east-1"), + 
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")), + ) + if err != nil { + return nil, err + } + return glue.NewFromConfig(cfg), nil +} + +func uploadAndCommitObjects(t *testing.T, ctx context.Context, repo, branch string, objects map[string]string) { + for path, obj := range objects { + resp, err := uploadContent(ctx, repo, branch, path, obj) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, resp.StatusCode()) + } + commitResp, err := client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{ + Message: "Table Data", + }) +} + +func TestSymlinkS3Exporter(t *testing.T) { + // setup + ctx, _, repo := setupTest(t) + defer tearDownTest(repo) + tablePrefix := "tables/animals" + tablePaths := map[string]string{ + // tables/animals + tablePrefix + "/type=axolotl/weight=22/a.csv": "blob", + tablePrefix + "/type=axolotl/weight=22/b.csv": "blob", + tablePrefix + "/type=axolotl/weight=22/c.csv": "blob", + tablePrefix + "/type=axolotl/weight=12/a.csv": "blob", + tablePrefix + "/type=axolotl/weight=12/_hidden": "blob", + tablePrefix + "/type=cat/weight=33/a.csv": "blob", + tablePrefix + "/type=dog/weight=10/a.csv": "blob", + tablePrefix + "/type=dog/weight=10/b.csv": "blob", + } + // render actions based on templates + actionData := struct { + Name string + TablePrefix string + AccessKeyId, SecretAccessKey string + }{ + Name: "animals", + TablePrefix: tablePrefix, + AccessKeyId: viper.GetString("aws_access_key_id"), + SecretAccessKey: viper.GetString("aws_secret_access_key"), + } + hookFiles := map[string]string{ + "scripts/symlink_exporter.lua": symlinkExporterScript, + "_lakefs_tables/animals.yaml": renderTpl(t, actionData, "table", animalsTableSpec), + } + + uploadObjects(t, ctx, repo, mainBranch, hookFiles) + // table data + uploadObjects(t, ctx, repo, mainBranch, tablePaths) + + commitResp, err := client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{ + Message: "Table Data", + }) + require.NoError(t, err, "failed to commit table data content") + require.Equal(t, http.StatusCreated, commitResp.StatusCode()) + // upload action + + resp, err = uploadContent(ctx, repo, mainBranch, "_lakefs_actions/animals_symlink.yaml", renderTpl(t, actionData, "action", symlinkExporterAction)) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, resp.StatusCode()) + commitResp, err = client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{ + Message: "Action commit", + }) + require.NoError(t, err, "failed to commit action content") + require.Equal(t, http.StatusCreated, commitResp.StatusCode()) + + // wait until actions finish running + runs := waitForListRepositoryRunsLen(ctx, t, repo, commitResp.JSON201.Id, 1) + require.Equal(t, "completed", runs.Results[0].Status, "action result not finished") + + // list symlink.txt files from blockstore + + repoResponse, err := client.GetRepositoryWithResponse(ctx, repo) + require.NoError(t, err, "could not get repository information") + require.Equal(t, repoResponse.StatusCode(), http.StatusOK, "could not get repository information") + namespace := repoResponse.JSON200.StorageNamespace + + //testesti/rand_qtvU6OdSpF/ckei2hn6i1efi86dngh0/testsymlinkexporter/_lakefs/exported/main/9a196e/animals/type=axolotl/weight=22/symlink.tx + symlinksPrefix := fmt.Sprintf("%s/_lakefs/exported/%s/%s/animals/", namespace, mainBranch, commitResp.JSON201.Id[:6]) + storageURL, 
err := url.Parse(symlinksPrefix) + require.NoError(t, err, "failed extracting bucket name") + + s3Client, err := testutil.SetupTestS3Client("https://s3.amazonaws.com", actionData.AccessKeyId, actionData.SecretAccessKey) + + require.NoError(t, err, "failed creating s3 client") + + // we need to retry for the result + var symlinkLocations []string + + bo := backoff.NewExponentialBackOff() + bo.MaxInterval = 5 * time.Second + bo.MaxElapsedTime = 30 * time.Second + listS3Func := func() error { + listResp, err := s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(storageURL.Host), + Prefix: aws.String(storageURL.Path[1:]), // trim leading slash + }) + require.NoErrorf(t, err, "listing symlink files in storage: %s", symlinksPrefix) + if len(listResp.Contents) == 0 { + return fmt.Errorf("no objects found") + } + for _, f := range listResp.Contents { + symlinkLocations = append(symlinkLocations, aws.ToString(f.Key)) + } + return nil + } + + err = backoff.Retry(listS3Func, bo) + + require.NoErrorf(t, err, "failed listing symlink files %s", symlinksPrefix) + require.NotEmptyf(t, symlinkLocations, "no symlink files found in blockstore: %s", symlinksPrefix) + + // get the symlink files and compare their physical address to lakefs result + storagePhysicalAddrs := map[string]bool{} + for _, symlinkFileKey := range symlinkLocations { + objRes, err := s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(storageURL.Host), + Key: aws.String(symlinkFileKey), + }) + require.NoErrorf(t, err, "getting symlink file content bucket=%s key=%s", storageURL.Host, symlinkFileKey) + defer objRes.Body.Close() + body, err := io.ReadAll(objRes.Body) + require.NoError(t, err, "fail reading object data") + for _, addr := range strings.Split(string(body), "\n") { + if addr != "" { // split returns last \n as empty string + storagePhysicalAddrs[addr] = true + } + } + } + + lakeFSObjs, err := client.ListObjectsWithResponse(ctx, repo, commitResp.JSON201.Id, &apigen.ListObjectsParams{ + Prefix: apiutil.Ptr(apigen.PaginationPrefix(tablePrefix)), + }) + require.NoError(t, err, "failed listing lakefs objects") + + // test that all lakeFS entries are exported and represented correctly in symlink files + lakefsPhysicalAddrs := map[string]bool{} + for _, entry := range lakeFSObjs.JSON200.Results { + if !strings.Contains(entry.Path, "_hidden") { + lakefsPhysicalAddrs[entry.PhysicalAddress] = true + } + } + + require.Equal(t, lakefsPhysicalAddrs, storagePhysicalAddrs, "mismatch between lakefs exported objects in symlink files") + + // glue test + glueClient, err := setupGlueClient(ctx, actionData.AccessKeyId, actionData.SecretAccessKey) + require.NoError(t, err, "creating glue client") + glueClient.CreateTable() +} From 7cdcebd1a88f5b5459dbc1c8c5626eeb3b9f2add Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Wed, 18 Oct 2023 09:42:27 +0300 Subject: [PATCH 03/10] added esti test --- esti/catalog_export_test.go | 345 +++++++++++++++++++++++++++--------- 1 file changed, 260 insertions(+), 85 deletions(-) diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go index 894694df009..735ec1742dd 100644 --- a/esti/catalog_export_test.go +++ b/esti/catalog_export_test.go @@ -16,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/glue" + "github.com/aws/aws-sdk-go-v2/service/glue/types" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/cenkalti/backoff/v4" "github.com/spf13/viper" @@ -23,54 +24,106 @@ import ( 
"github.com/treeverse/lakefs/pkg/api/apigen" "github.com/treeverse/lakefs/pkg/api/apiutil" "github.com/treeverse/lakefs/pkg/testutil" + "gopkg.in/yaml.v3" ) +type schemaField struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + IsNullable bool `yaml:"nullable"` + Metadata map[string]string `yaml:"metadata,omitempty"` +} +type tableSchema struct { + Type string `yaml:"type"` + Fields []schemaField `yaml:"fields"` +} + +type hiveTableSpec struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Path string `yaml:"path"` + PartitionColumns []string `yaml:"partition_columns"` + Schema tableSchema `yaml:"schema"` +} + +type exportHooksTestData struct { + // Name string + // TablePrefix string + SymlinkScriptPath string + TableDescriptorPath string + GlueScriptPath string + Branch string + GlueDB string + AccessKeyId string + SecretAccessKey string + OverrideCommitID string + TableSpec *hiveTableSpec +} + +const glueExportScript = ` +local aws = require("aws") +local exporter = require("lakefs/catalogexport/glue_exporter") +action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created +local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) +exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true}) +` + +const glueExporterAction = ` +name: Glue Exporter +on: + post-commit: + branches: ["{{ .Branch }}*"] +hooks: + - id: glue_exporter + type: lua + properties: + script_path: "{{ .GlueScriptPath }}" + args: + aws: + aws_access_key_id: "{{ .AccessKeyId }}" + aws_secret_access_key: "{{ .SecretAccessKey }}" + aws_region: us-east-1 + table_source: '{{ .TableDescriptorPath }}' + catalog: + db_name: "{{ .GlueDB }}" + table_input: + StorageDescriptor: + InputFormat: "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat" + OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat" + SerdeInfo: + SerializationLibrary: "org.apache.hadoop.hive.serde2.OpenCSVSerde" + Parameters: + separatorChar: "," + Parameters: + classification: "csv" + "skip.header.line.count": "1"` + const symlinkExporterScript = ` local exporter = require("lakefs/catalogexport/symlink_exporter") local aws = require("aws") local table_path = args.table_source -local s3 = aws.s3_client(args.s3.aws_access_key_id, args.s3.aws_secret_access_key, args.s3.aws_region) +local s3 = aws.s3_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) exporter.export_s3(s3, table_path, action, {debug=true}) ` -const animalsTableSpec = ` -name: '{{ .Name }}' -type: hive -path: '{{ .TablePrefix }}' -partition_columns: ['type', 'weight'] -schema: - type: struct - fields: - - name: weight - type: integer - nullable: false - - name: name - type: string - nullable: false - metadata: {} - - name: type - type: string - nullable: true - metadata: - comment: axolotl, cat, dog, fish etc` const symlinkExporterAction = ` name: Symlink S3 Exporter on: post-commit: - branches: ["main*"] + branches: ["{{ .Branch }}*"] hooks: - id: symlink_exporter type: lua properties: - script_path: scripts/symlink_exporter.lua + script_path: "{{ .SymlinkScriptPath }}" args: - s3: + aws: aws_access_key_id: "{{ .AccessKeyId }}" aws_secret_access_key: "{{ .SecretAccessKey }}" aws_region: us-east-1 - table_source: '_lakefs_tables/{{ .Name }}.yaml'` + table_source: '{{ .TableDescriptorPath }}'` -func renderTpl(t *testing.T, tplData any, name, content string) 
string {
+func renderTplAsStr(t *testing.T, tplData any, name, content string) string {
 	t.Helper()
 	tpl, err := template.New(name).Parse(content)
 	require.NoError(t, err, "rendering template")
@@ -91,71 +144,65 @@ func setupGlueClient(ctx context.Context, accessKeyID, secretAccessKey string) (
 	return glue.NewFromConfig(cfg), nil
 }
 
-func uploadAndCommitObjects(t *testing.T, ctx context.Context, repo, branch string, objects map[string]string) {
-	for path, obj := range objects {
-		resp, err := uploadContent(ctx, repo, branch, path, obj)
-		require.NoError(t, err)
-		require.Equal(t, http.StatusCreated, resp.StatusCode())
+func uploadAndCommitObjects(t *testing.T, ctx context.Context, repo, branch string, objectsGroups ...map[string]string) *apigen.Commit {
+	t.Helper()
+	for _, objects := range objectsGroups {
+		for path, obj := range objects {
+			resp, err := uploadContent(ctx, repo, branch, path, obj)
+			require.NoError(t, err)
+			require.Equal(t, http.StatusCreated, resp.StatusCode())
+		}
 	}
+
 	commitResp, err := client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{
 		Message: "Table Data",
 	})
+	require.NoErrorf(t, err, "failed committing uploaded objects to lakefs://%s/%s", repo, branch)
+	require.Equal(t, http.StatusCreated, commitResp.StatusCode())
+	return commitResp.JSON201
 }
-
-func TestSymlinkS3Exporter(t *testing.T) {
-	// setup
-	ctx, _, repo := setupTest(t)
-	defer tearDownTest(repo)
-	tablePrefix := "tables/animals"
-	tablePaths := map[string]string{
-		// tables/animals
-		tablePrefix + "/type=axolotl/weight=22/a.csv":   "blob",
-		tablePrefix + "/type=axolotl/weight=22/b.csv":   "blob",
-		tablePrefix + "/type=axolotl/weight=22/c.csv":   "blob",
-		tablePrefix + "/type=axolotl/weight=12/a.csv":   "blob",
-		tablePrefix + "/type=axolotl/weight=12/_hidden": "blob",
-		tablePrefix + "/type=cat/weight=33/a.csv":       "blob",
-		tablePrefix + "/type=dog/weight=10/a.csv":       "blob",
-		tablePrefix + "/type=dog/weight=10/b.csv":       "blob",
+func genCSVData(cols []string, n int) string {
+	csvBlob := ""
+	// generate header
+	for _, c := range cols {
+		csvBlob += c + ","
 	}
-	// render actions based on templates
-	actionData := struct {
-		Name                         string
-		TablePrefix                  string
-		AccessKeyId, SecretAccessKey string
-	}{
-		Name:            "animals",
-		TablePrefix:     tablePrefix,
-		AccessKeyId:     viper.GetString("aws_access_key_id"),
-		SecretAccessKey: viper.GetString("aws_secret_access_key"),
+	csvBlob = strings.TrimSuffix(csvBlob, ",") + "\n"
+	// generate rows
+	for rowNum := 0; rowNum < n; rowNum++ {
+		row := ""
+		for i := range cols {
+			row += fmt.Sprintf("%d,", rowNum+i)
+		}
+		csvBlob += strings.TrimSuffix(row, ",") + "\n"
 	}
+	return csvBlob
+}
+
+func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tablePaths map[string]string, testData *exportHooksTestData) (string, string) {
+	t.Helper()
+
+	tableYaml, err := yaml.Marshal(&testData.TableSpec)
+
+	if err != nil {
+		require.NoError(t, err, "failed marshaling table spec to YAML")
+	}
+
 	hookFiles := map[string]string{
-		"scripts/symlink_exporter.lua": symlinkExporterScript,
-		"_lakefs_tables/animals.yaml":  renderTpl(t, actionData, "table", animalsTableSpec),
+		testData.SymlinkScriptPath:   symlinkExporterScript,
+		testData.TableDescriptorPath: renderTplAsStr(t, testData.TableSpec, "table", string(tableYaml)),
 	}
 
-	uploadObjects(t, ctx, repo, mainBranch, hookFiles)
-	// table data
-	uploadObjects(t, ctx, repo, mainBranch, tablePaths)
+	// upload table objects and action script
+	uploadAndCommitObjects(t, ctx, repo, 
mainBranch, hookFiles, tablePaths) - commitResp, err := client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{ - Message: "Table Data", - }) - require.NoError(t, err, "failed to commit table data content") - require.Equal(t, http.StatusCreated, commitResp.StatusCode()) // upload action - - resp, err = uploadContent(ctx, repo, mainBranch, "_lakefs_actions/animals_symlink.yaml", renderTpl(t, actionData, "action", symlinkExporterAction)) - require.NoError(t, err) - require.Equal(t, http.StatusCreated, resp.StatusCode()) - commitResp, err = client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{ - Message: "Action commit", + commit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{ + "_lakefs_actions/animals_symlink.yaml": renderTplAsStr(t, testData, "action", symlinkExporterAction), }) - require.NoError(t, err, "failed to commit action content") - require.Equal(t, http.StatusCreated, commitResp.StatusCode()) // wait until actions finish running - runs := waitForListRepositoryRunsLen(ctx, t, repo, commitResp.JSON201.Id, 1) + runs := waitForListRepositoryRunsLen(ctx, t, repo, commit.Id, 1) require.Equal(t, "completed", runs.Results[0].Status, "action result not finished") // list symlink.txt files from blockstore @@ -165,12 +212,11 @@ func TestSymlinkS3Exporter(t *testing.T) { require.Equal(t, repoResponse.StatusCode(), http.StatusOK, "could not get repository information") namespace := repoResponse.JSON200.StorageNamespace - //testesti/rand_qtvU6OdSpF/ckei2hn6i1efi86dngh0/testsymlinkexporter/_lakefs/exported/main/9a196e/animals/type=axolotl/weight=22/symlink.tx - symlinksPrefix := fmt.Sprintf("%s/_lakefs/exported/%s/%s/animals/", namespace, mainBranch, commitResp.JSON201.Id[:6]) + symlinksPrefix := fmt.Sprintf("%s/_lakefs/exported/%s/%s/animals/", namespace, mainBranch, commit.Id[:6]) storageURL, err := url.Parse(symlinksPrefix) require.NoError(t, err, "failed extracting bucket name") - s3Client, err := testutil.SetupTestS3Client("https://s3.amazonaws.com", actionData.AccessKeyId, actionData.SecretAccessKey) + s3Client, err := testutil.SetupTestS3Client("https://s3.amazonaws.com", testData.AccessKeyId, testData.SecretAccessKey) require.NoError(t, err, "failed creating s3 client") @@ -218,9 +264,10 @@ func TestSymlinkS3Exporter(t *testing.T) { } } - lakeFSObjs, err := client.ListObjectsWithResponse(ctx, repo, commitResp.JSON201.Id, &apigen.ListObjectsParams{ - Prefix: apiutil.Ptr(apigen.PaginationPrefix(tablePrefix)), + lakeFSObjs, err := client.ListObjectsWithResponse(ctx, repo, commit.Id, &apigen.ListObjectsParams{ + Prefix: apiutil.Ptr(apigen.PaginationPrefix(testData.TableSpec.Path)), }) + require.NoError(t, err, "failed listing lakefs objects") // test that all lakeFS entries are exported and represented correctly in symlink files @@ -233,8 +280,136 @@ func TestSymlinkS3Exporter(t *testing.T) { require.Equal(t, lakefsPhysicalAddrs, storagePhysicalAddrs, "mismatch between lakefs exported objects in symlink files") - // glue test - glueClient, err := setupGlueClient(ctx, actionData.AccessKeyId, actionData.SecretAccessKey) - require.NoError(t, err, "creating glue client") - glueClient.CreateTable() + return commit.Id, symlinksPrefix +} + +func TestAWSCatalogExport(t *testing.T) { + var ( + glueTable *types.Table + commitID string + symlinkPrefix string + ) + ctx, _, repo := setupTest(t) + defer tearDownTest(repo) + + testData := &exportHooksTestData{ + Branch: mainBranch, + 
SymlinkScriptPath: "scripts/symlink_exporter.lua", + GlueScriptPath: "scripts/glue_exporter.lua", + TableDescriptorPath: "_lakefs_tables/animals.yaml", + GlueDB: "testexport1", + AccessKeyId: viper.GetString("aws_access_key_id"), + SecretAccessKey: viper.GetString("aws_secret_access_key"), + TableSpec: &hiveTableSpec{ + Name: "animals", + Type: "hive", + Path: "tables/animals", + PartitionColumns: []string{"type", "weight"}, + Schema: tableSchema{ + Type: "struct", + Fields: []schemaField{ + {Name: "type", Type: "string", Metadata: map[string]string{"comment": "axolotl, cat, dog, fish etc"}}, + {Name: "weight", Type: "integer"}, + {Name: "name", Type: "string"}, + {Name: "color", Type: "string", IsNullable: true}, + }, + }, + }, + } + + t.Run("symlink_exporter", func(t *testing.T) { + var columns = []string{"name", "color"} + tablePaths := map[string]string{ + testData.TableSpec.Path + "/type=axolotl/weight=22/a.csv": genCSVData(columns, 3), + testData.TableSpec.Path + "/type=axolotl/weight=22/b.csv": genCSVData(columns, 3), + testData.TableSpec.Path + "/type=axolotl/weight=22/c.csv": genCSVData(columns, 3), + testData.TableSpec.Path + "/type=axolotl/weight=12/a.csv": genCSVData(columns, 3), + testData.TableSpec.Path + "/type=axolotl/weight=12/_hidden": "blob", + testData.TableSpec.Path + "/type=cat/weight=33/a.csv": genCSVData(columns, 3), + testData.TableSpec.Path + "/type=dog/weight=10/b.csv": genCSVData(columns, 3), + testData.TableSpec.Path + "/type=dog/weight=10/a.csv": genCSVData(columns, 3), + } + commitID, symlinkPrefix = testSymlinkS3Exporter(t, ctx, repo, tablePaths, testData) + t.Logf("commit id %s symlinks prefix %s", commitID, symlinkPrefix) + }) + t.Run("glue_exporter", func(t *testing.T) { + // override commit ID to make the export table point to the previous commit of data + testData.OverrideCommitID = commitID + luaScript := renderTplAsStr(t, testData, "export_script", glueExportScript) + actionFileBlob := renderTplAsStr(t, testData, "export_action", glueExporterAction) + + headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{ + testData.GlueScriptPath: luaScript, + "_lakefs_actions/glue_export.yaml": actionFileBlob, + }) + + // wait for action to finish + runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1) + require.Equal(t, "completed", runs.Results[0].Status, "action result not finished") + + // create glue client + + glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey) + require.NoError(t, err, "creating glue client") + + // wait until table is ready + tableName := fmt.Sprintf("%s_%s_%s_%s", testData.TableSpec.Name, repo, mainBranch, commitID[:6]) + bo := backoff.NewExponentialBackOff() + bo.MaxInterval = 5 * time.Second + bo.MaxElapsedTime = 30 * time.Second + getGlueTable := func() error { + t.Logf("[retry] get table %s ", tableName) + resp, err := glueClient.GetTable(ctx, &glue.GetTableInput{ + DatabaseName: aws.String(testData.GlueDB), + Name: aws.String(tableName), + }) + if err != nil { + return fmt.Errorf("glue getTable %s/%s: %w", testData.GlueDB, tableName, err) + } + glueTable = resp.Table + return nil + } + err = backoff.Retry(getGlueTable, bo) + require.NoError(t, err, "glue table not created in time") + + // delete table when the test is over + + t.Cleanup(func() { + _, err = glueClient.DeleteTable(ctx, &glue.DeleteTableInput{ + DatabaseName: aws.String(testData.GlueDB), + Name: aws.String(tableName), + }) + if err != nil { + t.Errorf("failed cleanup of glue table %s/%s ", 
testData.GlueDB, tableName)
+			}
+		})
+
+		// verify table partitions
+		require.Equal(t, len(testData.TableSpec.PartitionColumns), len(glueTable.PartitionKeys), "number of partitions created in glue doesn't match the table spec")
+		partitionsSet := map[string]bool{}
+		for _, partCol := range glueTable.PartitionKeys {
+			name := aws.ToString(partCol.Name)
+			require.Contains(t, testData.TableSpec.PartitionColumns, name, "glue partition not in table spec")
+			partitionsSet[name] = true
+		}
+
+		// verify table columns
+		glueCols := map[string]bool{}
+		for _, col := range glueTable.StorageDescriptor.Columns {
+			glueCols[aws.ToString(col.Name)] = true
+		}
+		for _, expCol := range testData.TableSpec.Schema.Fields {
+			// if not a partition, regular column compare
+			if !partitionsSet[expCol.Name] {
+				require.Contains(t, glueCols, expCol.Name, "column not found in glue table schema")
+			}
+		}
+
+		// verify table storage properties
+
+		require.Equal(t, "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat", aws.ToString(glueTable.StorageDescriptor.InputFormat), "wrong table input format")
+		require.Equal(t, "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", aws.ToString(glueTable.StorageDescriptor.OutputFormat), "wrong table output format")
+		require.Equal(t, symlinkPrefix, aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table")
+		require.Equal(t, "matilda", *glueTable.Name, "not matila yo")
+	})
 }

From dd945cf830ac8aaee2060d70efd8af33c214b615 Mon Sep 17 00:00:00 2001
From: Isan Rivkin
Date: Wed, 18 Oct 2023 10:09:14 +0300
Subject: [PATCH 04/10] update test info

---
 esti/catalog_export_test.go | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go
index 735ec1742dd..10d3dd750e0 100644
--- a/esti/catalog_export_test.go
+++ b/esti/catalog_export_test.go
@@ -47,8 +47,6 @@ type hiveTableSpec struct {
 }
 
 type exportHooksTestData struct {
-	// Name string
-	// TablePrefix string
 	SymlinkScriptPath   string
 	TableDescriptorPath string
 	GlueScriptPath      string
@@ -203,7 +201,7 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 
 	// wait until actions finish running
 	runs := waitForListRepositoryRunsLen(ctx, t, repo, commit.Id, 1)
-	require.Equal(t, "completed", runs.Results[0].Status, "action result not finished")
+	require.Equal(t, "completed", runs.Results[0].Status, "symlink action result not finished")
 
 	// list symlink.txt files from blockstore
 
@@ -345,7 +343,7 @@ func TestAWSCatalogExport(t *testing.T) {
 
 	// wait for action to finish
 	runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1)
-	require.Equal(t, "completed", runs.Results[0].Status, "action result not finished")
+	require.Equal(t, "completed", runs.Results[0].Status, "glue action result not finished")
 
 	// create glue client
 

From b816fabb01429d84e7fdd540a21233e82eb4588f Mon Sep 17 00:00:00 2001
From: Isan Rivkin
Date: Wed, 18 Oct 2023 10:31:22 +0300
Subject: [PATCH 05/10] rerun roles

---
 esti/catalog_export_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go
index 10d3dd750e0..220c1f149d2 100644
--- a/esti/catalog_export_test.go
+++ b/esti/catalog_export_test.go
@@ -295,7 +295,7 @@ func TestAWSCatalogExport(t *testing.T) {
 		SymlinkScriptPath:   "scripts/symlink_exporter.lua",
 		GlueScriptPath:      "scripts/glue_exporter.lua",
 		TableDescriptorPath: "_lakefs_tables/animals.yaml",
-		GlueDB:              "testexport1",
+		GlueDB:              "export-hooks-esti", // TODO(isan) make variable
 		AccessKeyId:         viper.GetString("aws_access_key_id"),
 		SecretAccessKey:     viper.GetString("aws_secret_access_key"),
 		TableSpec: &hiveTableSpec{

From fbcb5fa69074ded9df0c4724438614e3a9f2cc06 Mon Sep 17 00:00:00 2001
From: Isan Rivkin
Date: Wed, 18 Oct 2023 10:43:05 +0300
Subject: [PATCH 06/10] fix require assert table name

---
 esti/catalog_export_test.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go
index 220c1f149d2..befce72eae0 100644
--- a/esti/catalog_export_test.go
+++ b/esti/catalog_export_test.go
@@ -408,6 +408,5 @@ func TestAWSCatalogExport(t *testing.T) {
 	require.Equal(t, "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat", aws.ToString(glueTable.StorageDescriptor.InputFormat), "wrong table input format")
 	require.Equal(t, "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", aws.ToString(glueTable.StorageDescriptor.OutputFormat), "wrong table output format")
 	require.Equal(t, symlinkPrefix, aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table")
-	require.Equal(t, "matilda", *glueTable.Name, "not matila yo")
 	})
 }

From a092640fdad657530cc9c495f3464f5be868bf43 Mon Sep 17 00:00:00 2001
From: Isan Rivkin
Date: Wed, 18 Oct 2023 10:59:21 +0300
Subject: [PATCH 07/10] configure skip if not aws

---
 esti/catalog_export_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go
index befce72eae0..5b9f2488162 100644
--- a/esti/catalog_export_test.go
+++ b/esti/catalog_export_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/treeverse/lakefs/pkg/api/apigen"
 	"github.com/treeverse/lakefs/pkg/api/apiutil"
+	"github.com/treeverse/lakefs/pkg/block"
 	"github.com/treeverse/lakefs/pkg/testutil"
 	"gopkg.in/yaml.v3"
 )
@@ -282,6 +283,7 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 }
 
 func TestAWSCatalogExport(t *testing.T) {
+	requireBlockstoreType(t, block.BlockstoreTypeS3)
 	var (
 		glueTable *types.Table
 		commitID  string

From e6d2f3af9be50c4180e867701f0aa3695ad51c4d Mon Sep 17 00:00:00 2001
From: Isan Rivkin
Date: Wed, 18 Oct 2023 11:12:48 +0300
Subject: [PATCH 08/10] fix tests

---
 esti/catalog_export_test.go | 6 +++++-
 pkg/testutil/setup.go       | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go
index 5b9f2488162..e55d145adb6 100644
--- a/esti/catalog_export_test.go
+++ b/esti/catalog_export_test.go
@@ -282,6 +282,10 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 	return commit.Id, symlinksPrefix
 }
 
+// TestAWSCatalogExport will verify that symlinks are exported correctly and then in a sequential test verify that the glue exporter works well.
+// The setup in this test includes: +// Symlinks export: lua script, table in _lakefs_tables, action file, mock table data in CSV form +// Glue export: lua script, table in _lakefs_tables, action file func TestAWSCatalogExport(t *testing.T) { requireBlockstoreType(t, block.BlockstoreTypeS3) var ( @@ -297,7 +301,7 @@ func TestAWSCatalogExport(t *testing.T) { SymlinkScriptPath: "scripts/symlink_exporter.lua", GlueScriptPath: "scripts/glue_exporter.lua", TableDescriptorPath: "_lakefs_tables/animals.yaml", - GlueDB: "export-hooks-esti", // TODO(isan) make variable + GlueDB: viper.GetString("glue_export_hooks_database"), AccessKeyId: viper.GetString("aws_access_key_id"), SecretAccessKey: viper.GetString("aws_secret_access_key"), TableSpec: &hiveTableSpec{ diff --git a/pkg/testutil/setup.go b/pkg/testutil/setup.go index 900e53356f2..01ab58f3330 100644 --- a/pkg/testutil/setup.go +++ b/pkg/testutil/setup.go @@ -54,10 +54,10 @@ func SetupTestingEnv(params *SetupTestingEnvParams) (logging.Logger, apigen.Clie if err != nil { logger.WithError(err).Fatal("Failed to get CWD") } + viper.SetDefault("glue_export_hooks_database", "export-hooks-esti") viper.SetDefault("lakectl_dir", filepath.Join(currDir, "..")) viper.SetDefault("azure_storage_account", "") viper.SetDefault("azure_storage_access_key", "") - err = viper.ReadInConfig() if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) { logger.WithError(err).Fatal("Failed to read configuration") From b35911f11f802bdf82cbb85d1a71af4f123d540b Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Thu, 19 Oct 2023 14:49:57 +0300 Subject: [PATCH 09/10] fix all review comments --- esti/catalog_export_test.go | 163 +++++++----------- .../_lakefs_actions/glue_export.yaml | 28 +++ .../_lakefs_actions/symlink_export.yaml | 15 ++ .../scripts/glue_exporter.lua | 5 + .../scripts/symlink_exporter.lua | 6 + pkg/testutil/setup.go | 1 + 6 files changed, 117 insertions(+), 101 deletions(-) create mode 100644 esti/export_hooks_files/_lakefs_actions/glue_export.yaml create mode 100644 esti/export_hooks_files/_lakefs_actions/symlink_export.yaml create mode 100644 esti/export_hooks_files/scripts/glue_exporter.lua create mode 100644 esti/export_hooks_files/scripts/symlink_exporter.lua diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go index e55d145adb6..38a4dd8a5bf 100644 --- a/esti/catalog_export_test.go +++ b/esti/catalog_export_test.go @@ -3,8 +3,12 @@ package esti import ( "bytes" "context" + "embed" + "encoding/csv" + "errors" "fmt" "io" + "io/fs" "net/http" "net/url" "strings" @@ -25,9 +29,13 @@ import ( "github.com/treeverse/lakefs/pkg/api/apiutil" "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/testutil" + "golang.org/x/exp/slices" "gopkg.in/yaml.v3" ) +//go:embed export_hooks_files/*/* +var exportHooksFiles embed.FS + type schemaField struct { Name string `yaml:"name"` Type string `yaml:"type"` @@ -48,83 +56,23 @@ type hiveTableSpec struct { } type exportHooksTestData struct { + SymlinkActionPath string SymlinkScriptPath string TableDescriptorPath string + GlueActionPath string GlueScriptPath string Branch string GlueDB string AccessKeyId string SecretAccessKey string + Region string OverrideCommitID string TableSpec *hiveTableSpec } -const glueExportScript = ` -local aws = require("aws") -local exporter = require("lakefs/catalogexport/glue_exporter") -action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created -local glue = 
aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) -exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true}) -` - -const glueExporterAction = ` -name: Glue Exporter -on: - post-commit: - branches: ["{{ .Branch }}*"] -hooks: - - id: glue_exporter - type: lua - properties: - script_path: "{{ .GlueScriptPath }}" - args: - aws: - aws_access_key_id: "{{ .AccessKeyId }}" - aws_secret_access_key: "{{ .SecretAccessKey }}" - aws_region: us-east-1 - table_source: '{{ .TableDescriptorPath }}' - catalog: - db_name: "{{ .GlueDB }}" - table_input: - StorageDescriptor: - InputFormat: "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat" - OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat" - SerdeInfo: - SerializationLibrary: "org.apache.hadoop.hive.serde2.OpenCSVSerde" - Parameters: - separatorChar: "," - Parameters: - classification: "csv" - "skip.header.line.count": "1"` - -const symlinkExporterScript = ` -local exporter = require("lakefs/catalogexport/symlink_exporter") -local aws = require("aws") -local table_path = args.table_source -local s3 = aws.s3_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) -exporter.export_s3(s3, table_path, action, {debug=true}) -` - -const symlinkExporterAction = ` -name: Symlink S3 Exporter -on: - post-commit: - branches: ["{{ .Branch }}*"] -hooks: - - id: symlink_exporter - type: lua - properties: - script_path: "{{ .SymlinkScriptPath }}" - args: - aws: - aws_access_key_id: "{{ .AccessKeyId }}" - aws_secret_access_key: "{{ .SecretAccessKey }}" - aws_region: us-east-1 - table_source: '{{ .TableDescriptorPath }}'` - -func renderTplAsStr(t *testing.T, tplData any, name, content string) string { +func renderTplFileAsStr(t *testing.T, tplData any, rootDir fs.FS, path string) string { t.Helper() - tpl, err := template.New(name).Parse(content) + tpl, err := template.ParseFS(rootDir, path) require.NoError(t, err, "rendering template") var doc bytes.Buffer err = tpl.Execute(&doc, tplData) @@ -132,9 +80,9 @@ func renderTplAsStr(t *testing.T, tplData any, name, content string) string { return doc.String() } -func setupGlueClient(ctx context.Context, accessKeyID, secretAccessKey string) (*glue.Client, error) { +func setupGlueClient(ctx context.Context, accessKeyID, secretAccessKey, region string) (*glue.Client, error) { cfg, err := config.LoadDefaultConfig(ctx, - config.WithRegion("us-east-1"), + config.WithRegion(region), config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")), ) if err != nil { @@ -160,25 +108,29 @@ func uploadAndCommitObjects(t *testing.T, ctx context.Context, repo, branch stri require.Equal(t, http.StatusCreated, commitResp.StatusCode()) return commitResp.JSON201 } -func genCSVData(cols []string, n int) string { - csvBlob := "" - // generate header - for _, c := range cols { - csvBlob += c + "," - } - csvBlob = strings.TrimSuffix(csvBlob, ",") + "\n" - // generate rows + +// genCSVData will create n+rows (with header) each cell value is rowNum+colNum +func genCSVData(columns []string, n int) string { + // Create a new CSV writer. + buf := new(bytes.Buffer) + writer := csv.NewWriter(buf) + // Write the header row. 
+	headerRow := columns
+	writer.Write(headerRow)
+
+	for rowNum := 0; rowNum < n; rowNum++ {
+		dataRow := []string{}
+		for colNum := range columns {
+			dataRow = append(dataRow, fmt.Sprintf("%d", rowNum+colNum))
+		}
+		writer.Write(dataRow)
+	}
+	writer.Flush()
+	csvContent := buf.String()
+	return csvContent
 }
 
-func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tablePaths map[string]string, testData *exportHooksTestData) (string, string) {
+func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplDir fs.FS, tablePaths map[string]string, testData *exportHooksTestData) (string, string) {
 	t.Helper()
 
 	tableYaml, err := yaml.Marshal(&testData.TableSpec)
@@ -188,17 +140,13 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 	}
 
 	hookFiles := map[string]string{
-		testData.SymlinkScriptPath:   symlinkExporterScript,
-		testData.TableDescriptorPath: renderTplAsStr(t, testData.TableSpec, "table", string(tableYaml)),
+		testData.SymlinkScriptPath:   renderTplFileAsStr(t, testData, tmplDir, testData.SymlinkScriptPath),
+		testData.TableDescriptorPath: string(tableYaml),
+		testData.SymlinkActionPath:   renderTplFileAsStr(t, testData, tmplDir, testData.SymlinkActionPath),
 	}
 
-	// upload table objects and action script
-	uploadAndCommitObjects(t, ctx, repo, mainBranch, hookFiles, tablePaths)
-
-	// upload action
-	commit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
-		"_lakefs_actions/animals_symlink.yaml": renderTplAsStr(t, testData, "action", symlinkExporterAction),
-	})
+	// upload all files (hook, lua, table data)
+	commit := uploadAndCommitObjects(t, ctx, repo, mainBranch, tablePaths, hookFiles)
 
 	// wait until actions finish running
 	runs := waitForListRepositoryRunsLen(ctx, t, repo, commit.Id, 1)
@@ -232,7 +180,7 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 		})
 		require.NoErrorf(t, err, "listing symlink files in storage: %s", symlinksPrefix)
 		if len(listResp.Contents) == 0 {
-			return fmt.Errorf("no objects found")
+			return errors.New("no symlink files found in blockstore")
 		}
 		for _, f := range listResp.Contents {
 			symlinkLocations = append(symlinkLocations, aws.ToString(f.Key))
 		}
@@ -287,7 +235,18 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 // Symlinks export: lua script, table in _lakefs_tables, action file, mock table data in CSV form
 // Glue export: lua script, table in _lakefs_tables, action file
 func TestAWSCatalogExport(t *testing.T) {
+	// skip if blockstore is not s3
 	requireBlockstoreType(t, block.BlockstoreTypeS3)
+	// skip of the following args are not provided
+	accessKeyID := viper.GetString("aws_access_key_id")
+	secretKeyID := viper.GetString("aws_secret_access_key")
+	glueDB := viper.GetString("glue_export_hooks_database")
+	glueRegion := viper.GetString("glue_export_region")
+	requiredArgs := []string{accessKeyID, secretKeyID, glueDB, glueRegion}
+	if slices.Contains(requiredArgs, "") {
+		t.Skip("one of the required args is empty")
+	}
+
 	var (
 		glueTable     *types.Table
 		commitID      string
 		symlinkPrefix string
 	)
 	ctx, _, repo := setupTest(t)
 	defer tearDownTest(repo)
 
+	tmplDir, _ := fs.Sub(exportHooksFiles, "export_hooks_files")
 	testData := &exportHooksTestData{
 		Branch:              mainBranch,
+		SymlinkActionPath:   "_lakefs_actions/symlink_export.yaml",
 		SymlinkScriptPath:   
"scripts/symlink_exporter.lua", GlueScriptPath: "scripts/glue_exporter.lua", + GlueActionPath: "_lakefs_actions/glue_export.yaml", TableDescriptorPath: "_lakefs_tables/animals.yaml", - GlueDB: viper.GetString("glue_export_hooks_database"), - AccessKeyId: viper.GetString("aws_access_key_id"), - SecretAccessKey: viper.GetString("aws_secret_access_key"), + GlueDB: glueDB, + Region: glueDB, + AccessKeyId: accessKeyID, + SecretAccessKey: secretKeyID, TableSpec: &hiveTableSpec{ Name: "animals", Type: "hive", @@ -333,18 +296,15 @@ func TestAWSCatalogExport(t *testing.T) { testData.TableSpec.Path + "/type=dog/weight=10/b.csv": genCSVData(columns, 3), testData.TableSpec.Path + "/type=dog/weight=10/a.csv": genCSVData(columns, 3), } - commitID, symlinkPrefix = testSymlinkS3Exporter(t, ctx, repo, tablePaths, testData) + commitID, symlinkPrefix = testSymlinkS3Exporter(t, ctx, repo, tmplDir, tablePaths, testData) t.Logf("commit id %s symlinks prefix %s", commitID, symlinkPrefix) }) t.Run("glue_exporter", func(t *testing.T) { // override commit ID to make the export table point to the previous commit of data testData.OverrideCommitID = commitID - luaScript := renderTplAsStr(t, testData, "export_script", glueExportScript) - actionFileBlob := renderTplAsStr(t, testData, "export_action", glueExporterAction) - headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{ - testData.GlueScriptPath: luaScript, - "_lakefs_actions/glue_export.yaml": actionFileBlob, + testData.GlueScriptPath: renderTplFileAsStr(t, testData, tmplDir, testData.GlueScriptPath), + testData.GlueActionPath: renderTplFileAsStr(t, testData, tmplDir, testData.GlueActionPath), }) // wait for action to finish @@ -353,7 +313,7 @@ func TestAWSCatalogExport(t *testing.T) { // create glue client - glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey) + glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey, testData.Region) require.NoError(t, err, "creating glue client") // wait until table is ready @@ -414,5 +374,6 @@ func TestAWSCatalogExport(t *testing.T) { require.Equal(t, "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat", aws.ToString(glueTable.StorageDescriptor.InputFormat), "wrong table input format") require.Equal(t, "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", aws.ToString(glueTable.StorageDescriptor.OutputFormat), "wrong table output format") require.Equal(t, symlinkPrefix, aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table") + require.Equal(t, "matilda", *glueTable.Name, "oopsie") }) } diff --git a/esti/export_hooks_files/_lakefs_actions/glue_export.yaml b/esti/export_hooks_files/_lakefs_actions/glue_export.yaml new file mode 100644 index 00000000000..2ce5964bc42 --- /dev/null +++ b/esti/export_hooks_files/_lakefs_actions/glue_export.yaml @@ -0,0 +1,28 @@ +name: Glue Exporter +on: + post-commit: + branches: ["{{ .Branch }}*"] +hooks: + - id: glue_exporter + type: lua + properties: + script_path: "{{ .GlueScriptPath }}" + args: + aws: + aws_access_key_id: "{{ .AccessKeyId }}" + aws_secret_access_key: "{{ .SecretAccessKey }}" + aws_region: "{{ .Region }}" + table_source: '{{ .TableDescriptorPath }}' + catalog: + db_name: "{{ .GlueDB }}" + table_input: + StorageDescriptor: + InputFormat: "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat" + OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat" + SerdeInfo: + SerializationLibrary: 
"org.apache.hadoop.hive.serde2.OpenCSVSerde" + Parameters: + separatorChar: "," + Parameters: + classification: "csv" + "skip.header.line.count": "1" \ No newline at end of file diff --git a/esti/export_hooks_files/_lakefs_actions/symlink_export.yaml b/esti/export_hooks_files/_lakefs_actions/symlink_export.yaml new file mode 100644 index 00000000000..220d5ba8bfa --- /dev/null +++ b/esti/export_hooks_files/_lakefs_actions/symlink_export.yaml @@ -0,0 +1,15 @@ +name: Symlink S3 Exporter +on: + post-commit: + branches: ["{{ .Branch }}*"] +hooks: + - id: symlink_exporter + type: lua + properties: + script_path: "{{ .SymlinkScriptPath }}" + args: + aws: + aws_access_key_id: "{{ .AccessKeyId }}" + aws_secret_access_key: "{{ .SecretAccessKey }}" + aws_region: "{{ .Region }}" + table_source: '{{ .TableDescriptorPath }}' \ No newline at end of file diff --git a/esti/export_hooks_files/scripts/glue_exporter.lua b/esti/export_hooks_files/scripts/glue_exporter.lua new file mode 100644 index 00000000000..4ad77919f3d --- /dev/null +++ b/esti/export_hooks_files/scripts/glue_exporter.lua @@ -0,0 +1,5 @@ +local aws = require("aws") +local exporter = require("lakefs/catalogexport/glue_exporter") +action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created +local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) +exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true}) \ No newline at end of file diff --git a/esti/export_hooks_files/scripts/symlink_exporter.lua b/esti/export_hooks_files/scripts/symlink_exporter.lua new file mode 100644 index 00000000000..7064d2dd2a0 --- /dev/null +++ b/esti/export_hooks_files/scripts/symlink_exporter.lua @@ -0,0 +1,6 @@ + +local exporter = require("lakefs/catalogexport/symlink_exporter") +local aws = require("aws") +local table_path = args.table_source +local s3 = aws.s3_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) +exporter.export_s3(s3, table_path, action, {debug=true}) \ No newline at end of file diff --git a/pkg/testutil/setup.go b/pkg/testutil/setup.go index 01ab58f3330..a1f50a75004 100644 --- a/pkg/testutil/setup.go +++ b/pkg/testutil/setup.go @@ -55,6 +55,7 @@ func SetupTestingEnv(params *SetupTestingEnvParams) (logging.Logger, apigen.Clie logger.WithError(err).Fatal("Failed to get CWD") } viper.SetDefault("glue_export_hooks_database", "export-hooks-esti") + viper.SetDefault("glue_export_region", "us-east-1") viper.SetDefault("lakectl_dir", filepath.Join(currDir, "..")) viper.SetDefault("azure_storage_account", "") viper.SetDefault("azure_storage_access_key", "") From 14b4ac2ddd13196c59af958a00cbdbb60d117103 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Thu, 19 Oct 2023 15:05:40 +0300 Subject: [PATCH 10/10] prep for merge --- esti/catalog_export_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go index 38a4dd8a5bf..9642324ff81 100644 --- a/esti/catalog_export_test.go +++ b/esti/catalog_export_test.go @@ -239,10 +239,10 @@ func TestAWSCatalogExport(t *testing.T) { requireBlockstoreType(t, block.BlockstoreTypeS3) // skip of the following args are not provided accessKeyID := viper.GetString("aws_access_key_id") - secretKeyID := viper.GetString("aws_secret_access_key") + secretAccessKey := viper.GetString("aws_secret_access_key") glueDB := 
viper.GetString("glue_export_hooks_database")
 	glueRegion := viper.GetString("glue_export_region")
-	requiredArgs := []string{accessKeyID, secretKeyID, glueDB, glueRegion}
+	requiredArgs := []string{accessKeyID, secretAccessKey, glueDB, glueRegion}
 	if slices.Contains(requiredArgs, "") {
 		t.Skip("one of the required args is empty")
 	}
@@ -264,9 +264,9 @@ func TestAWSCatalogExport(t *testing.T) {
 		GlueActionPath:      "_lakefs_actions/glue_export.yaml",
 		TableDescriptorPath: "_lakefs_tables/animals.yaml",
 		GlueDB:              glueDB,
-		Region:              glueDB,
+		Region:              glueRegion,
 		AccessKeyId:         accessKeyID,
-		SecretAccessKey:     secretKeyID,
+		SecretAccessKey:     secretAccessKey,
 		TableSpec: &hiveTableSpec{
 			Name:             "animals",
 			Type:             "hive",
@@ -374,6 +374,5 @@ func TestAWSCatalogExport(t *testing.T) {
 	require.Equal(t, "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat", aws.ToString(glueTable.StorageDescriptor.InputFormat), "wrong table input format")
 	require.Equal(t, "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", aws.ToString(glueTable.StorageDescriptor.OutputFormat), "wrong table output format")
 	require.Equal(t, symlinkPrefix, aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table")
-	require.Equal(t, "matilda", *glueTable.Name, "oopsie")
 	})
 }
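
A closing note on two patterns this series leans on. First, both subtests poll an eventually consistent external system -- S3 for the symlink files, Glue for the created table -- so instead of a fixed sleep they wrap the check in a bounded exponential backoff. The sketch below isolates that pattern; waitForVisible and checkOnce are hypothetical names introduced for illustration, but the cenkalti/backoff/v4 calls and the 5s/30s bounds match the ones the tests use:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// waitForVisible retries checkOnce with exponentially growing delays;
// the gap between attempts is capped at 5s and the whole wait at 30s,
// the same bounds the esti tests use for the S3 listing and glue.GetTable.
func waitForVisible(checkOnce func() error) error {
	bo := backoff.NewExponentialBackOff()
	bo.MaxInterval = 5 * time.Second
	bo.MaxElapsedTime = 30 * time.Second
	return backoff.Retry(checkOnce, bo)
}

func main() {
	attempts := 0
	err := waitForVisible(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("artifact not visible yet") // returning an error triggers another retry
		}
		return nil
	})
	fmt.Printf("err=%v after %d attempts\n", err, attempts)
}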
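
Second, patch 09 moves the hook definitions out of Go string constants and into template files rendered over an fs.FS (renderTplFileAsStr). Below is a minimal sketch of that mechanism; it substitutes testing/fstest.MapFS for the test's //go:embed directory so it runs standalone, and the render helper plus the inlined YAML snippet are illustrative only, not part of the lakeFS API:

package main

import (
	"bytes"
	"fmt"
	"io/fs"
	"testing/fstest"
	"text/template"
)

// render parses a single template file from any fs.FS and executes it with
// data, mirroring what renderTplFileAsStr does with the embedded hook files.
func render(root fs.FS, path string, data any) (string, error) {
	tpl, err := template.ParseFS(root, path)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tpl.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	// The real test embeds export_hooks_files/*/* and narrows it with fs.Sub;
	// a MapFS keeps this sketch self-contained.
	root := fstest.MapFS{
		"_lakefs_actions/symlink_export.yaml": &fstest.MapFile{
			Data: []byte(`branches: ["{{ .Branch }}*"]`),
		},
	}
	out, err := render(root, "_lakefs_actions/symlink_export.yaml", struct{ Branch string }{Branch: "main"})
	fmt.Println(out, err) // branches: ["main*"] <nil>
}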
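
Finally, the Lua fixture in patch 01 exercises hive.extract_partition_pager, which walks an object listing and groups entries by their Hive-style partition prefix (one symlink.txt per partition). The Go sketch below shows the same key-extraction idea on the fixture's paths; partitionPrefix is a hypothetical helper written for illustration, not code from the repository:

package main

import (
	"fmt"
	"strings"
)

// partitionPrefix derives the Hive partition key of an object path, e.g.
// "letters/a=2/b=3/a.csv" with columns [a b] yields "letters/a=2/b=3/".
// Entries that share a key belong in the same symlink file, as in the
// catalogexport_hive_partition_pager.output fixture.
func partitionPrefix(path, prefix string, cols []string) (string, bool) {
	rest := strings.TrimPrefix(path, prefix)
	parts := strings.Split(rest, "/")
	if len(parts) < len(cols)+1 { // one segment per partition column plus the file name
		return "", false
	}
	for i, col := range cols {
		if !strings.HasPrefix(parts[i], col+"=") {
			return "", false
		}
	}
	return prefix + strings.Join(parts[:len(cols)], "/") + "/", true
}

func main() {
	paths := []string{
		"letters/a=1/b=1/b.csv",
		"letters/a=2/b=2/a.csv",
		"letters/a=2/b=2/b.csv",
	}
	groups := map[string][]string{}
	for _, p := range paths {
		if key, ok := partitionPrefix(p, "letters/", []string{"a", "b"}); ok {
			groups[key] = append(groups[key], p)
		}
	}
	fmt.Println(groups) // two partition keys, grouped as in the Lua test's output
}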