Skip to content

Commit

Permalink
fix all review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Isan-Rivkin committed Oct 19, 2023
1 parent e6d2f3a commit b35911f
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 101 deletions.
163 changes: 62 additions & 101 deletions esti/catalog_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package esti
import (
"bytes"
"context"
"embed"
"encoding/csv"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"strings"
Expand All @@ -25,9 +29,13 @@ import (
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/testutil"
"golang.org/x/exp/slices"
"gopkg.in/yaml.v3"
)

//go:embed export_hooks_files/*/*
var exportHooksFiles embed.FS

type schemaField struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Expand All @@ -48,93 +56,33 @@ type hiveTableSpec struct {
}

type exportHooksTestData struct {
SymlinkActionPath string
SymlinkScriptPath string
TableDescriptorPath string
GlueActionPath string
GlueScriptPath string
Branch string
GlueDB string
AccessKeyId string
SecretAccessKey string
Region string
OverrideCommitID string
TableSpec *hiveTableSpec
}

const glueExportScript = `
local aws = require("aws")
local exporter = require("lakefs/catalogexport/glue_exporter")
action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created
local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region)
exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true})
`

const glueExporterAction = `
name: Glue Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: glue_exporter
type: lua
properties:
script_path: "{{ .GlueScriptPath }}"
args:
aws:
aws_access_key_id: "{{ .AccessKeyId }}"
aws_secret_access_key: "{{ .SecretAccessKey }}"
aws_region: us-east-1
table_source: '{{ .TableDescriptorPath }}'
catalog:
db_name: "{{ .GlueDB }}"
table_input:
StorageDescriptor:
InputFormat: "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat"
OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
SerdeInfo:
SerializationLibrary: "org.apache.hadoop.hive.serde2.OpenCSVSerde"
Parameters:
separatorChar: ","
Parameters:
classification: "csv"
"skip.header.line.count": "1"`

const symlinkExporterScript = `
local exporter = require("lakefs/catalogexport/symlink_exporter")
local aws = require("aws")
local table_path = args.table_source
local s3 = aws.s3_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region)
exporter.export_s3(s3, table_path, action, {debug=true})
`

const symlinkExporterAction = `
name: Symlink S3 Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: symlink_exporter
type: lua
properties:
script_path: "{{ .SymlinkScriptPath }}"
args:
aws:
aws_access_key_id: "{{ .AccessKeyId }}"
aws_secret_access_key: "{{ .SecretAccessKey }}"
aws_region: us-east-1
table_source: '{{ .TableDescriptorPath }}'`

func renderTplAsStr(t *testing.T, tplData any, name, content string) string {
func renderTplFileAsStr(t *testing.T, tplData any, rootDir fs.FS, path string) string {
t.Helper()
tpl, err := template.New(name).Parse(content)
tpl, err := template.ParseFS(rootDir, path)
require.NoError(t, err, "rendering template")
var doc bytes.Buffer
err = tpl.Execute(&doc, tplData)
require.NoError(t, err)
return doc.String()
}

func setupGlueClient(ctx context.Context, accessKeyID, secretAccessKey string) (*glue.Client, error) {
func setupGlueClient(ctx context.Context, accessKeyID, secretAccessKey, region string) (*glue.Client, error) {
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion("us-east-1"),
config.WithRegion(region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")),
)
if err != nil {
Expand All @@ -160,25 +108,29 @@ func uploadAndCommitObjects(t *testing.T, ctx context.Context, repo, branch stri
require.Equal(t, http.StatusCreated, commitResp.StatusCode())
return commitResp.JSON201
}
func genCSVData(cols []string, n int) string {
csvBlob := ""
// generate header
for _, c := range cols {
csvBlob += c + ","
}
csvBlob = strings.TrimSuffix(csvBlob, ",") + "\n"
// generate rows

// genCSVData will create n+rows (with header) each cell value is rowNum+colNum
func genCSVData(columns []string, n int) string {
// Create a new CSV writer.
buf := new(bytes.Buffer)
writer := csv.NewWriter(buf)
// Write the header row.
headerRow := columns
writer.Write(headerRow)

for rowNum := 0; rowNum < n; rowNum++ {
row := ""
for i := range cols {
row += fmt.Sprintf("%d,", rowNum+i)
dataRow := []string{}
for colNum := range columns {
dataRow = append(dataRow, fmt.Sprintf("%d", rowNum+colNum))
}
csvBlob += strings.TrimSuffix(row, ",") + "\n"
writer.Write(dataRow)
}
return csvBlob
writer.Flush()
csvContent := buf.String()
return csvContent
}

func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tablePaths map[string]string, testData *exportHooksTestData) (string, string) {
func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, tmplDir fs.FS, tablePaths map[string]string, testData *exportHooksTestData) (string, string) {
t.Helper()

tableYaml, err := yaml.Marshal(&testData.TableSpec)
Expand All @@ -188,17 +140,13 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
}

hookFiles := map[string]string{
testData.SymlinkScriptPath: symlinkExporterScript,
testData.TableDescriptorPath: renderTplAsStr(t, testData.TableSpec, "table", string(tableYaml)),
testData.SymlinkScriptPath: renderTplFileAsStr(t, testData, tmplDir, testData.SymlinkScriptPath),
testData.TableDescriptorPath: string(tableYaml),
testData.SymlinkActionPath: renderTplFileAsStr(t, testData, tmplDir, testData.SymlinkActionPath),
}

// upload table objects and action script
uploadAndCommitObjects(t, ctx, repo, mainBranch, hookFiles, tablePaths)

// upload action
commit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
"_lakefs_actions/animals_symlink.yaml": renderTplAsStr(t, testData, "action", symlinkExporterAction),
})
// upload all files (hook, lua, table data)
commit := uploadAndCommitObjects(t, ctx, repo, mainBranch, tablePaths, hookFiles)

// wait until actions finish running
runs := waitForListRepositoryRunsLen(ctx, t, repo, commit.Id, 1)
Expand Down Expand Up @@ -232,7 +180,7 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
})
require.NoErrorf(t, err, "listing symlink files in storage: %s", symlinksPrefix)
if len(listResp.Contents) == 0 {
return fmt.Errorf("no objects found")
return errors.New("no symlink files found in blockstore")
}
for _, f := range listResp.Contents {
symlinkLocations = append(symlinkLocations, aws.ToString(f.Key))
Expand Down Expand Up @@ -287,7 +235,18 @@ func testSymlinkS3Exporter(t *testing.T, ctx context.Context, repo string, table
// Symlinks export: lua script, table in _lakefs_tables, action file, mock table data in CSV form
// Glue export: lua script, table in _lakefs_tables, action file
func TestAWSCatalogExport(t *testing.T) {
// skip if blockstore is not not s3
requireBlockstoreType(t, block.BlockstoreTypeS3)
// skip of the following args are not provided
accessKeyID := viper.GetString("aws_access_key_id")
secretKeyID := viper.GetString("aws_secret_access_key")
glueDB := viper.GetString("glue_export_hooks_database")
glueRegion := viper.GetString("glue_export_region")
requiredArgs := []string{accessKeyID, secretKeyID, glueDB, glueRegion}
if slices.Contains(requiredArgs, "") {
t.Skip("One of the required Args empty")
}

var (
glueTable *types.Table
commitID string
Expand All @@ -296,14 +255,18 @@ func TestAWSCatalogExport(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

tmplDir, _ := fs.Sub(exportHooksFiles, "export_hooks_files")
testData := &exportHooksTestData{
Branch: mainBranch,
SymlinkActionPath: "_lakefs_actions/symlink_export.yaml",
SymlinkScriptPath: "scripts/symlink_exporter.lua",
GlueScriptPath: "scripts/glue_exporter.lua",
GlueActionPath: "_lakefs_actions/glue_export.yaml",
TableDescriptorPath: "_lakefs_tables/animals.yaml",
GlueDB: viper.GetString("glue_export_hooks_database"),
AccessKeyId: viper.GetString("aws_access_key_id"),
SecretAccessKey: viper.GetString("aws_secret_access_key"),
GlueDB: glueDB,
Region: glueDB,
AccessKeyId: accessKeyID,
SecretAccessKey: secretKeyID,
TableSpec: &hiveTableSpec{
Name: "animals",
Type: "hive",
Expand Down Expand Up @@ -333,18 +296,15 @@ func TestAWSCatalogExport(t *testing.T) {
testData.TableSpec.Path + "/type=dog/weight=10/b.csv": genCSVData(columns, 3),
testData.TableSpec.Path + "/type=dog/weight=10/a.csv": genCSVData(columns, 3),
}
commitID, symlinkPrefix = testSymlinkS3Exporter(t, ctx, repo, tablePaths, testData)
commitID, symlinkPrefix = testSymlinkS3Exporter(t, ctx, repo, tmplDir, tablePaths, testData)
t.Logf("commit id %s symlinks prefix %s", commitID, symlinkPrefix)
})
t.Run("glue_exporter", func(t *testing.T) {
// override commit ID to make the export table point to the previous commit of data
testData.OverrideCommitID = commitID
luaScript := renderTplAsStr(t, testData, "export_script", glueExportScript)
actionFileBlob := renderTplAsStr(t, testData, "export_action", glueExporterAction)

headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
testData.GlueScriptPath: luaScript,
"_lakefs_actions/glue_export.yaml": actionFileBlob,
testData.GlueScriptPath: renderTplFileAsStr(t, testData, tmplDir, testData.GlueScriptPath),
testData.GlueActionPath: renderTplFileAsStr(t, testData, tmplDir, testData.GlueActionPath),
})

// wait for action to finish
Expand All @@ -353,7 +313,7 @@ func TestAWSCatalogExport(t *testing.T) {

// create glue client

glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey)
glueClient, err := setupGlueClient(ctx, testData.AccessKeyId, testData.SecretAccessKey, testData.Region)
require.NoError(t, err, "creating glue client")

// wait until table is ready
Expand Down Expand Up @@ -414,5 +374,6 @@ func TestAWSCatalogExport(t *testing.T) {
require.Equal(t, "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat", aws.ToString(glueTable.StorageDescriptor.InputFormat), "wrong table input format")
require.Equal(t, "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", aws.ToString(glueTable.StorageDescriptor.OutputFormat), "wrong table output format")
require.Equal(t, symlinkPrefix, aws.ToString(glueTable.StorageDescriptor.Location)+"/", "wrong s3 location in glue table")
require.Equal(t, "matilda", *glueTable.Name, "oopsie")
})
}
28 changes: 28 additions & 0 deletions esti/export_hooks_files/_lakefs_actions/glue_export.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Glue Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: glue_exporter
type: lua
properties:
script_path: "{{ .GlueScriptPath }}"
args:
aws:
aws_access_key_id: "{{ .AccessKeyId }}"
aws_secret_access_key: "{{ .SecretAccessKey }}"
aws_region: "{{ .Region }}"
table_source: '{{ .TableDescriptorPath }}'
catalog:
db_name: "{{ .GlueDB }}"
table_input:
StorageDescriptor:
InputFormat: "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat"
OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
SerdeInfo:
SerializationLibrary: "org.apache.hadoop.hive.serde2.OpenCSVSerde"
Parameters:
separatorChar: ","
Parameters:
classification: "csv"
"skip.header.line.count": "1"
15 changes: 15 additions & 0 deletions esti/export_hooks_files/_lakefs_actions/symlink_export.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: Symlink S3 Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: symlink_exporter
type: lua
properties:
script_path: "{{ .SymlinkScriptPath }}"
args:
aws:
aws_access_key_id: "{{ .AccessKeyId }}"
aws_secret_access_key: "{{ .SecretAccessKey }}"
aws_region: "{{ .Region }}"
table_source: '{{ .TableDescriptorPath }}'
5 changes: 5 additions & 0 deletions esti/export_hooks_files/scripts/glue_exporter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
local aws = require("aws")
local exporter = require("lakefs/catalogexport/glue_exporter")
action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created
local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region)
exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true})
6 changes: 6 additions & 0 deletions esti/export_hooks_files/scripts/symlink_exporter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

local exporter = require("lakefs/catalogexport/symlink_exporter")
local aws = require("aws")
local table_path = args.table_source
local s3 = aws.s3_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region)
exporter.export_s3(s3, table_path, action, {debug=true})
1 change: 1 addition & 0 deletions pkg/testutil/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func SetupTestingEnv(params *SetupTestingEnvParams) (logging.Logger, apigen.Clie
logger.WithError(err).Fatal("Failed to get CWD")
}
viper.SetDefault("glue_export_hooks_database", "export-hooks-esti")
viper.SetDefault("glue_export_region", "us-east-1")
viper.SetDefault("lakectl_dir", filepath.Join(currDir, ".."))
viper.SetDefault("azure_storage_account", "")
viper.SetDefault("azure_storage_access_key", "")
Expand Down

0 comments on commit b35911f

Please sign in to comment.