diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go index e55d145adb6..38a4dd8a5bf 100644 --- a/esti/catalog_export_test.go +++ b/esti/catalog_export_test.go @@ -3,8 +3,12 @@ package esti import ( "bytes" "context" + "embed" + "encoding/csv" + "errors" "fmt" "io" + "io/fs" "net/http" "net/url" "strings" @@ -25,9 +29,13 @@ import ( "github.com/treeverse/lakefs/pkg/api/apiutil" "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/testutil" + "golang.org/x/exp/slices" "gopkg.in/yaml.v3" ) +//go:embed export_hooks_files/*/* +var exportHooksFiles embed.FS + type schemaField struct { Name string `yaml:"name"` Type string `yaml:"type"` @@ -48,83 +56,23 @@ type hiveTableSpec struct { } type exportHooksTestData struct { + SymlinkActionPath string SymlinkScriptPath string TableDescriptorPath string + GlueActionPath string GlueScriptPath string Branch string GlueDB string AccessKeyId string SecretAccessKey string + Region string OverrideCommitID string TableSpec *hiveTableSpec } -const glueExportScript = ` -local aws = require("aws") -local exporter = require("lakefs/catalogexport/glue_exporter") -action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created -local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) -exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true}) -` - -const glueExporterAction = ` -name: Glue Exporter -on: - post-commit: - branches: ["{{ .Branch }}*"] -hooks: - - id: glue_exporter - type: lua - properties: - script_path: "{{ .GlueScriptPath }}" - args: - aws: - aws_access_key_id: "{{ .AccessKeyId }}" - aws_secret_access_key: "{{ .SecretAccessKey }}" - aws_region: us-east-1 - table_source: '{{ .TableDescriptorPath }}' - catalog: - db_name: "{{ .GlueDB }}" - table_input: - StorageDescriptor: - InputFormat: "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat" - OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat" - SerdeInfo: - SerializationLibrary: "org.apache.hadoop.hive.serde2.OpenCSVSerde" - Parameters: - separatorChar: "," - Parameters: - classification: "csv" - "skip.header.line.count": "1"` - -const symlinkExporterScript = ` -local exporter = require("lakefs/catalogexport/symlink_exporter") -local aws = require("aws") -local table_path = args.table_source -local s3 = aws.s3_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) -exporter.export_s3(s3, table_path, action, {debug=true}) -` - -const symlinkExporterAction = ` -name: Symlink S3 Exporter -on: - post-commit: - branches: ["{{ .Branch }}*"] -hooks: - - id: symlink_exporter - type: lua - properties: - script_path: "{{ .SymlinkScriptPath }}" - args: - aws: - aws_access_key_id: "{{ .AccessKeyId }}" - aws_secret_access_key: "{{ .SecretAccessKey }}" - aws_region: us-east-1 - table_source: '{{ .TableDescriptorPath }}'` - -func renderTplAsStr(t *testing.T, tplData any, name, content string) string { +func renderTplFileAsStr(t *testing.T, tplData any, rootDir fs.FS, path string) string { t.Helper() - tpl, err := template.New(name).Parse(content) + tpl, err := template.ParseFS(rootDir, path) require.NoError(t, err, "rendering template") var doc bytes.Buffer err = tpl.Execute(&doc, tplData) @@ -132,9 +80,9 @@ func renderTplAsStr(t *testing.T, tplData any, name, content string) string { return doc.String() } -func setupGlueClient(ctx context.Context, accessKeyID, 
secretAccessKey string) (*glue.Client, error) {
+func setupGlueClient(ctx context.Context, accessKeyID, secretAccessKey, region string) (*glue.Client, error) {
 	cfg, err := config.LoadDefaultConfig(ctx,
-		config.WithRegion("us-east-1"),
+		config.WithRegion(region),
 		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")),
 	)
 	if err != nil {
@@ -160,25 +108,29 @@ func uploadAndCommitObjects(t *testing.T, ctx context.Context, repo, branch stri
 	require.Equal(t, http.StatusCreated, commitResp.StatusCode())
 	return commitResp.JSON201
 }
-func genCSVData(cols []string, n int) string {
-	csvBlob := ""
-	// generate header
-	for _, c := range cols {
-		csvBlob += c + ","
-	}
-	csvBlob = strings.TrimSuffix(csvBlob, ",") + "\n"
-	// generate rows
+
+// genCSVData generates a CSV blob with a header row followed by n data rows; each cell value is rowNum+colNum
+func genCSVData(columns []string, n int) string {
+	// Create a new CSV writer.
+	buf := new(bytes.Buffer)
+	writer := csv.NewWriter(buf)
+	// Write the header row.
+	headerRow := columns
+	writer.Write(headerRow)
+
 	for rowNum := 0; rowNum < n; rowNum++ {
-		row := ""
-		for i := range cols {
-			row += fmt.Sprintf("%d,", rowNum+i)
+		dataRow := []string{}
+		for colNum := range columns {
+			dataRow = append(dataRow, fmt.Sprintf("%d", rowNum+colNum))
 		}
-		csvBlob += strings.TrimSuffix(row, ",") + "\n"
+		writer.Write(dataRow)
 	}
-	return csvBlob
+	writer.Flush()
+	csvContent := buf.String()
+	return csvContent
 }
-func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tablePaths map[string]string, testData *exportHooksTestData) (string, string) {
+func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplDir fs.FS, tablePaths map[string]string, testData *exportHooksTestData) (string, string) {
 	t.Helper()

 	tableYaml, err := yaml.Marshal(&testData.TableSpec)
@@ -188,17 +140,13 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 	}

 	hookFiles := map[string]string{
-		testData.SymlinkScriptPath:   symlinkExporterScript,
-		testData.TableDescriptorPath: renderTplAsStr(t, testData.TableSpec, "table", string(tableYaml)),
+		testData.SymlinkScriptPath:   renderTplFileAsStr(t, testData, tmplDir, testData.SymlinkScriptPath),
+		testData.TableDescriptorPath: string(tableYaml),
+		testData.SymlinkActionPath:   renderTplFileAsStr(t, testData, tmplDir, testData.SymlinkActionPath),
 	}

-	// upload table objects and action script
-	uploadAndCommitObjects(t, ctx, repo, mainBranch, hookFiles, tablePaths)
-
-	// upload action
-	commit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
-		"_lakefs_actions/animals_symlink.yaml": renderTplAsStr(t, testData, "action", symlinkExporterAction),
-	})
+	// upload all files (action, lua script, table descriptor, table data) in a single commit
+	commit := uploadAndCommitObjects(t, ctx, repo, mainBranch, tablePaths, hookFiles)

 	// wait until actions finish running
 	runs := waitForListRepositoryRunsLen(ctx, t, repo, commit.Id, 1)
@@ -232,7 +180,7 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 			})
 			require.NoErrorf(t, err, "listing symlink files in storage: %s", symlinksPrefix)
 			if len(listResp.Contents) == 0 {
-				return fmt.Errorf("no objects found")
+				return errors.New("no symlink files found in blockstore")
 			}
 			for _, f := range listResp.Contents {
 				symlinkLocations = append(symlinkLocations, aws.ToString(f.Key))
@@ -287,7 +235,18 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
 // Symlinks export: lua script, table in 
_lakefs_tables, action file, mock table data in CSV form
 // Glue export: lua script, table in _lakefs_tables, action file
 func TestAWSCatalogExport(t *testing.T) {
+	// skip if the blockstore is not s3
 	requireBlockstoreType(t, block.BlockstoreTypeS3)
+	// skip if any of the required configuration values are not provided
+	accessKeyID := viper.GetString("aws_access_key_id")
+	secretAccessKey := viper.GetString("aws_secret_access_key")
+	glueDB := viper.GetString("glue_export_hooks_database")
+	glueRegion := viper.GetString("glue_export_region")
+	requiredArgs := []string{accessKeyID, secretAccessKey, glueDB, glueRegion}
+	if slices.Contains(requiredArgs, "") {
+		t.Skip("missing one of the required configuration values: aws_access_key_id, aws_secret_access_key, glue_export_hooks_database, glue_export_region")
+	}
+
 	var (
 		glueTable *types.Table
 		commitID  string
@@ -296,14 +255,18 @@ func TestAWSCatalogExport(t *testing.T) {
 	ctx, _, repo := setupTest(t)
 	defer tearDownTest(repo)
+	tmplDir, _ := fs.Sub(exportHooksFiles, "export_hooks_files")
 	testData := &exportHooksTestData{
 		Branch:              mainBranch,
+		SymlinkActionPath:   "_lakefs_actions/symlink_export.yaml",
 		SymlinkScriptPath:   "scripts/symlink_exporter.lua",
 		GlueScriptPath:      "scripts/glue_exporter.lua",
+		GlueActionPath:      "_lakefs_actions/glue_export.yaml",
 		TableDescriptorPath: "_lakefs_tables/animals.yaml",
-		GlueDB:              viper.GetString("glue_export_hooks_database"),
-		AccessKeyId:         viper.GetString("aws_access_key_id"),
-		SecretAccessKey:     viper.GetString("aws_secret_access_key"),
+		GlueDB:              glueDB,
+		Region:              glueRegion,
+		AccessKeyId:         accessKeyID,
+		SecretAccessKey:     secretAccessKey,
 		TableSpec: &hiveTableSpec{
 			Name:      "animals",
 			Type:      "hive",
@@ -333,18 +296,15 @@ func TestAWSCatalogExport(t *testing.T) {
 			testData.TableSpec.Path + "/type=dog/weight=10/b.csv": genCSVData(columns, 3),
 			testData.TableSpec.Path + "/type=dog/weight=10/a.csv": genCSVData(columns, 3),
 		}
-		commitID, symlinkPrefix = testSymlinkS3Exporter(t, ctx, repo, tablePaths, testData)
+		commitID, symlinkPrefix = testSymlinkS3Exporter(t, ctx, repo, tmplDir, tablePaths, testData)
 		t.Logf("commit id %s symlinks prefix %s", commitID, symlinkPrefix)
 	})
 	t.Run("glue_exporter", func(t *testing.T) {
 		// override commit ID to make the export table point to the previous commit of data
 		testData.OverrideCommitID = commitID
-		luaScript := renderTplAsStr(t, testData, "export_script", glueExportScript)
-		actionFileBlob := renderTplAsStr(t, testData, "export_action", glueExporterAction)
-
 		headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
-			testData.GlueScriptPath:            luaScript,
-			"_lakefs_actions/glue_export.yaml": actionFileBlob,
+			testData.GlueScriptPath: renderTplFileAsStr(t, testData, tmplDir, testData.GlueScriptPath),
+			testData.GlueActionPath: renderTplFileAsStr(t, testData, tmplDir, testData.GlueActionPath),
 		})

 		// wait for action to finish
@@ -353,7 +313,7 @@ func TestAWSCatalogExport(t *testing.T) {

 		// create glue client
-		glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey)
+		glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey, testData.Region)
 		require.NoError(t, err, "creating glue client")

 		// wait until table is ready
@@ -414,5 +374,6 @@ func TestAWSCatalogExport(t *testing.T) {
 		require.Equal(t, "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat", aws.ToString(glueTable.StorageDescriptor.InputFormat), "wrong table input format")
 		require.Equal(t, "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", aws.ToString(glueTable.StorageDescriptor.OutputFormat), "wrong table output format")
 		require.Equal(t, symlinkPrefix, 
aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table")
+		require.Contains(t, aws.ToString(glueTable.Name), testData.TableSpec.Name, "glue table name should be derived from the table spec name")
 	})
 }
diff --git a/esti/export_hooks_files/_lakefs_actions/glue_export.yaml b/esti/export_hooks_files/_lakefs_actions/glue_export.yaml
new file mode 100644
index 00000000000..2ce5964bc42
--- /dev/null
+++ b/esti/export_hooks_files/_lakefs_actions/glue_export.yaml
@@ -0,0 +1,28 @@
+name: Glue Exporter
+on:
+  post-commit:
+    branches: ["{{ .Branch }}*"]
+hooks:
+  - id: glue_exporter
+    type: lua
+    properties:
+      script_path: "{{ .GlueScriptPath }}"
+      args:
+        aws:
+          aws_access_key_id: "{{ .AccessKeyId }}"
+          aws_secret_access_key: "{{ .SecretAccessKey }}"
+          aws_region: "{{ .Region }}"
+        table_source: '{{ .TableDescriptorPath }}'
+        catalog:
+          db_name: "{{ .GlueDB }}"
+          table_input:
+            StorageDescriptor:
+              InputFormat: "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat"
+              OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+              SerdeInfo:
+                SerializationLibrary: "org.apache.hadoop.hive.serde2.OpenCSVSerde"
+                Parameters:
+                  separatorChar: ","
+            Parameters:
+              classification: "csv"
+              "skip.header.line.count": "1"
\ No newline at end of file
diff --git a/esti/export_hooks_files/_lakefs_actions/symlink_export.yaml b/esti/export_hooks_files/_lakefs_actions/symlink_export.yaml
new file mode 100644
index 00000000000..220d5ba8bfa
--- /dev/null
+++ b/esti/export_hooks_files/_lakefs_actions/symlink_export.yaml
@@ -0,0 +1,15 @@
+name: Symlink S3 Exporter
+on:
+  post-commit:
+    branches: ["{{ .Branch }}*"]
+hooks:
+  - id: symlink_exporter
+    type: lua
+    properties:
+      script_path: "{{ .SymlinkScriptPath }}"
+      args:
+        aws:
+          aws_access_key_id: "{{ .AccessKeyId }}"
+          aws_secret_access_key: "{{ .SecretAccessKey }}"
+          aws_region: "{{ .Region }}"
+        table_source: '{{ .TableDescriptorPath }}'
\ No newline at end of file
diff --git a/esti/export_hooks_files/scripts/glue_exporter.lua b/esti/export_hooks_files/scripts/glue_exporter.lua
new file mode 100644
index 00000000000..4ad77919f3d
--- /dev/null
+++ b/esti/export_hooks_files/scripts/glue_exporter.lua
@@ -0,0 +1,5 @@
+local aws = require("aws")
+local exporter = require("lakefs/catalogexport/glue_exporter")
+action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created
+local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region)
+exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true})
\ No newline at end of file
diff --git a/esti/export_hooks_files/scripts/symlink_exporter.lua b/esti/export_hooks_files/scripts/symlink_exporter.lua
new file mode 100644
index 00000000000..7064d2dd2a0
--- /dev/null
+++ b/esti/export_hooks_files/scripts/symlink_exporter.lua
@@ -0,0 +1,6 @@
+
+local exporter = require("lakefs/catalogexport/symlink_exporter")
+local aws = require("aws")
+local table_path = args.table_source
+local s3 = aws.s3_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region)
+exporter.export_s3(s3, table_path, action, {debug=true})
\ No newline at end of file
diff --git a/pkg/testutil/setup.go b/pkg/testutil/setup.go
index 01ab58f3330..a1f50a75004 100644
--- a/pkg/testutil/setup.go
+++ b/pkg/testutil/setup.go
@@ -55,6 +55,7 @@ func SetupTestingEnv(params *SetupTestingEnvParams) (logging.Logger, apigen.Clie
 		logger.WithError(err).Fatal("Failed to get CWD")
 	}
 	viper.SetDefault("glue_export_hooks_database", 
"export-hooks-esti") + viper.SetDefault("glue_export_region", "us-east-1") viper.SetDefault("lakectl_dir", filepath.Join(currDir, "..")) viper.SetDefault("azure_storage_account", "") viper.SetDefault("azure_storage_access_key", "")