Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Table Extractor Hook and _lakefs_tables format #6589

Merged
merged 30 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f51a49e
initial commit
Isan-Rivkin Sep 12, 2023
22c67d1
load lua libs
Isan-Rivkin Sep 12, 2023
a44293e
working table extraction
Isan-Rivkin Sep 12, 2023
a6bec34
fix linter issues
Isan-Rivkin Sep 13, 2023
8755a70
load embedded files dynamicly (#6592)
nopcoder Sep 13, 2023
0327daa
change paths
Isan-Rivkin Sep 13, 2023
3096211
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
83ef166
fix short digest and paging parameters
Isan-Rivkin Sep 13, 2023
4f02c4d
fix review
Isan-Rivkin Sep 13, 2023
9892648
clean
Isan-Rivkin Sep 13, 2023
ac5978f
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
e1d8d03
review comments
Isan-Rivkin Sep 14, 2023
72dc0d1
update review
Isan-Rivkin Sep 14, 2023
ef891ae
refactor tables
Isan-Rivkin Sep 14, 2023
9dbb36f
Extractor
Isan-Rivkin Sep 14, 2023
0429e67
review comments
Isan-Rivkin Sep 14, 2023
ce76e29
fix review comments
Isan-Rivkin Sep 18, 2023
f4c3a96
fix iterator
Isan-Rivkin Sep 18, 2023
ec5ddd6
fix review
Isan-Rivkin Sep 18, 2023
c85c426
minor change
Isan-Rivkin Sep 18, 2023
eb21f45
update whiletrue
Isan-Rivkin Sep 18, 2023
77bd52d
update final
Isan-Rivkin Sep 19, 2023
bdd7c2b
use short_digest default
Isan-Rivkin Sep 19, 2023
82fde2f
fix page size param
Isan-Rivkin Sep 19, 2023
be77969
fix review
Isan-Rivkin Sep 19, 2023
9f1de19
fix
Isan-Rivkin Sep 19, 2023
7e80978
minor fix
Isan-Rivkin Sep 19, 2023
e4e55bd
remove nesting
Isan-Rivkin Sep 19, 2023
f31675d
convert to local
Isan-Rivkin Sep 19, 2023
cbc6ee0
align head
Isan-Rivkin Sep 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/actions/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook,
Args: args,
}, nil
} else if !errors.Is(err, errMissingKey) {
// 'script' was provided but is empty or of the wrong type..
// 'script' was provided but is empty or of the wrong type.
return nil, err
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/hive.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
local pathlib = require("path")
local utils = require("lakefs/catalogexport/internal/utils")

-- Extract the partition prefix of `path`: the substring covering every
-- "<col>=<value>/" segment for the given partition columns, in order.
-- Returns nil when any partition column is missing from the path.
function extract_partitions_path(partitions, path)
    local idx = 0
    for _, partition in ipairs(partitions) do
        local col_substr = partition .. "="
        -- plain find (4th arg true): column names may contain Lua pattern
        -- magic characters (e.g. "-"), which must be matched literally
        local i, j = string.find(path, col_substr, idx, true)
        if i == nil then
            return nil
        end
        -- advance past the value: everything up to (and including) the next "/"
        local _, slash_idx = string.find(path, "/", j + 1, true)
        if slash_idx == nil then
            -- partition value is the last path component (no trailing "/");
            -- any remaining columns cannot appear past the end of the string
            idx = #path
        else
            idx = slash_idx
        end
    end
    return string.sub(path, 1, idx)
end

-- Hive-format partition pager: each call of the returned iterator yields the
-- next partition key plus the collection of (non-hidden) objects that share
-- that partition prefix. Assumes objects are listed in lexicographic order,
-- so all entries of a partition arrive consecutively.
function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols)
    local target_partition = ""
    local pager = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", base_path, page_size, "")
    local page = pager()
    return function()
        if page == nil then
            return nil
        end
        local partition_entries = {}
        while true do
            if #page == 0 then
                page = pager()
                if page == nil then -- no more records: flush the last partition
                    return target_partition, partition_entries
                end
            end
            local entry = page[1]
            if pathlib.is_hidden(entry.path) then
                -- BUG FIX: hidden entries were never removed from the page,
                -- which made this loop spin forever; drop them and move on
                table.remove(page, 1)
            else
                local partition_key = extract_partitions_path(partition_cols, entry.path)
                -- first object seen: adopt its partition as the current target
                if target_partition == "" then
                    target_partition = partition_key
                end
                -- entry belongs to the next partition: emit the finished one
                if partition_key ~= target_partition then
                    local partition_result = target_partition
                    target_partition = partition_key
                    return partition_result, partition_entries
                end
                table.insert(partition_entries, {
                    physical_address = entry.physical_address,
                    path = entry.path,
                    size = entry.size_bytes,
                    checksum = entry.checksum
                })
                -- consume the entry only once it is counted in the current partition
                table.remove(page, 1)
            end
        end
    end
end

return {
nopcoder marked this conversation as resolved.
Show resolved Hide resolved
TableExtractor = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the package level - why do we need TableExtractor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to group it under extractor because there is also export functionality that is right now missing but will be added there and with that more functionality related to extraction.

DEFAULT_PAGE_SIZE_PARTITION = 30,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. no need to expose this one - we should use it in case we do not provide page_size for the call
  2. we can just expose the pager or short version of the function:
local hive = require('lakefs/catalogexport/hive')
hive.partition_pager(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't exist anymore rebase

lakefs_hive_partition_pager=lakefs_hive_partition_pager,
}
}
52 changes: 52 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/internal/utils.lua
Copy link
Contributor

@nopcoder nopcoder Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not plan to add more internal stuff - I think temporary internal.lua under catalogexport is enough

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file is under internal path - do we plan to keep internal folder?

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- default number of leading characters kept by short_digest
local DEFAULT_SHORT_DIGEST_LEN = 6

-- Return the first `len` characters of `digest`.
-- BUG FIX: a nil `len` previously fell through to sub(1, nil), which returns
-- the WHOLE string; now it applies DEFAULT_SHORT_DIGEST_LEN as intended.
function short_digest(digest, len)
    return digest:sub(1, len or DEFAULT_SHORT_DIGEST_LEN)
end

-- Wrap a lakeFS API call in a stateful iterator over result pages.
-- api_call(next_offset) must return (http_code, response); the response
-- carries pagination.{has_more,next_offset} and a results array.
-- Raises an error on any non-2xx status code.
-- NOTE: the exported name keeps the historical "paginiated" spelling, since
-- it is part of the module's public interface.
function lakefs_paginiated_api(api_call, after)
    local next_offset = after
    local has_more = true
    return function()
        if not has_more then
            return nil
        end
        local code, resp = api_call(next_offset)
        if code < 200 or code >= 300 then
            -- BUG FIX: separate the status code from the message (was
            -- concatenated with no delimiter, e.g. "non-2xx404")
            error("lakeFS: api return non-2xx: " .. tostring(code))
        end
        has_more = resp.pagination.has_more
        next_offset = resp.pagination.next_offset
        return resp.results
    end
end

-- Page through lakeFS objects under `prefix` at a given commit.
-- Returns an iterator that yields one page of object entries per call.
function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter)
    local list_page = function(next_offset)
        return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size)
    end
    return lakefs_paginiated_api(list_page, after)
end

-- Resolve the ref (tag id or branch id) from the hook's action metadata;
-- used as part of building a default table name.
-- Raises an error for unsupported event types.
function ref_from_branch_or_tag(action_info)
    local event = action_info.event_type
    if event == "pre-create-tag" or event == "post-create-tag" then
        return action_info.tag_id
    elseif event == "pre-create-branch" or event == "post-create-branch"
        or event == "post-commit" or event == "post-merge" then
        -- BUG FIX: the original wrote `or "post-commit" or "post-merge"` as
        -- bare truthy strings, making this branch ALWAYS taken and the error
        -- branch below unreachable; each value must be compared to `event`
        return action_info.branch_id
    else
        error("unsupported event type: " .. event)
    end
end

return {
DEFAULT_SHORT_DIGEST_LEN=DEFAULT_SHORT_DIGEST_LEN,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to expose this one - you use it internally

short_digest=short_digest,
ref_from_branch_or_tag=ref_from_branch_or_tag,
api = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kiss - why another table level? if you like to prefix all the api related helper with api_ change the function. all these functions already under internal/utils.
which I think it is too much as internal.lua as the catalogexport is enough.

lakefs_object_pager=lakefs_object_pager,
lakefs_paginiated_api=lakefs_paginiated_api,
}
}
54 changes: 54 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
local pathlib = require("path")
local strings = require("strings")
local yaml = require("encoding/yaml")
local utils = require("lakefs/catalogexport/internal/utils")
local HiveTableExtractor = require("lakefs/catalogexport/hive")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case we extracted an object I'll say it looks ok - but it is a package with function.
I don't mind you switch it to be an object (as all are tables)


local LAKEFS_TABLES_BASE = "_lakefs_tables/"

-- Check whether a lakeFS entry is a table spec under _lakefs_tables/:
-- it must be an object, not hidden, and have a .yaml suffix.
function is_table_obj(entry, tables_base)
    if entry.path_type ~= "object" then
        return false
    end
    local path = entry.path
    if strings.has_prefix(path, tables_base) then
        -- remove the tables_base prefix from the path.
        -- BUG FIX: sub(#tables_base, ...) is off by one (Lua sub is 1-based
        -- and inclusive), leaving the prefix's trailing "/" attached so
        -- is_hidden inspected the wrong leading segment
        path = entry.path:sub(#tables_base + 1)
    end
    return not pathlib.is_hidden(path) and strings.has_suffix(path, ".yaml")
end

-- list all YAML files under _lakefs_tables/*
function list_table_descriptor_files(client, repo_id, commit_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest rename files with something else, as we usually reference them as objects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe entries instead of files as we return an entry with logical and physical location

local table_entries = {}
local page_size = 30
local iter = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE, page_size, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pager

for entries in iter do
for _, entry in ipairs(entries) do
if is_table_obj(entry, LAKEFS_TABLES_BASE) then
table.insert(table_entries, {
physical_address = entry.physical_address,
path = entry.path
})
end
end
end
return table_entries
end

-- Fetch a table descriptor object from lakeFS and parse it as YAML.
-- Raises an error when the object cannot be fetched (non-200 status).
function get_table_descriptor(client, repo_id, commit_id, logical_path)
    local code, content = client.get_object(repo_id, commit_id, logical_path)
    if code ~= 200 then
        error("could not fetch data file: HTTP " .. tostring(code))
    end
    local spec = yaml.unmarshal(content)
    -- normalize: a missing partition_columns field becomes an empty list
    spec.partition_columns = spec.partition_columns or {}
    return spec
end

-- module exports: helpers for discovering and reading table descriptors
return {
    list_table_descriptor_files = list_table_descriptor_files,
    get_table_descriptor = get_table_descriptor,
}
117 changes: 106 additions & 11 deletions pkg/actions/lua/load.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
package lua

import (
"bytes"
"embed"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"

"github.com/Shopify/go-lua"
"github.com/hashicorp/go-multierror"
)

const (
pathListSeparator = ';'
defaultPath = "./?.lua"
defaultPath = "?.lua"
)

//go:embed lakefs/catalogexport/*.lua lakefs/catalogexport/internal/*.lua
var luaEmbeddedCode embed.FS

var ErrNoFile = errors.New("no file")

func findLoader(l *lua.State, name string) {
var msg string
if l.Field(lua.UpValueIndex(1), "searchers"); !l.IsTable(3) {
Expand All @@ -35,6 +45,61 @@ func findLoader(l *lua.State, name string) {
}
}

// findFile resolves a module name to a file path using the search template
// stored in 'package.<field>' (e.g. "package.path"). Raises a Lua error when
// the field is not a string.
func findFile(l *lua.State, name, field, dirSep string) (string, error) {
	l.Field(lua.UpValueIndex(1), field)
	searchTemplate, ok := l.ToString(-1)
	if !ok {
		lua.Errorf(l, "'package.%s' must be a string", field)
	}
	return searchPath(name, searchTemplate, ".", dirSep)
}

// checkLoad finalizes a module load attempt: on success it pushes the file
// name (second value returned to require) and returns 2; on failure it
// raises a Lua error describing the module and file involved.
func checkLoad(l *lua.State, loaded bool, fileName string) int {
	if !loaded {
		moduleName := lua.CheckString(l, 1)
		loadErr := lua.CheckString(l, -1)
		lua.Errorf(l, "error loading module '%s' from file '%s':\n\t%s", moduleName, fileName, loadErr)
		panic("unreachable")
	}
	l.PushString(fileName) // second argument to the module
	return 2               // return open function & file name
}

// searcherLua is a package searcher that resolves module names against
// package.path and loads the matching embedded Lua file.
func searcherLua(l *lua.State) int {
	moduleName := lua.CheckString(l, 1)
	fileName, err := findFile(l, moduleName, "path", string(filepath.Separator))
	if err != nil {
		// module isn't found in this path
		return 1
	}
	loadedOK := loadFile(l, fileName, "") == nil
	return checkLoad(l, loadedOK, fileName)
}

// loadFile reads an embedded Lua file and loads it as a chunk via l.Load,
// leaving the loaded chunk on the stack on success (per lua.Load semantics).
// mode is forwarded to l.Load ("" = accept text or binary chunks).
// On failure it pushes a "cannot open/read <file>" message and returns
// lua.FileError; syntax/memory errors from l.Load are returned as-is.
func loadFile(l *lua.State, fileName, mode string) error {
	// stack slot that will hold the "@<fileName>" chunk name pushed below
	fileNameIndex := l.Top() + 1
	// fileError pushes "cannot <what> <name>" (name without the leading '@')
	// and returns the canonical file-error sentinel
	fileError := func(what string) error {
		fileName, _ := l.ToString(fileNameIndex)
		l.PushFString("cannot %s %s", what, fileName[1:])
		l.Remove(fileNameIndex)
		return lua.FileError
	}
	// the '@' prefix marks the chunk name as a file name in Lua error messages
	l.PushString("@" + fileName)
	data, err := luaEmbeddedCode.ReadFile(fileName)
	if err != nil {
		return fileError("open")
	}
	s, _ := l.ToString(-1) // the chunk name pushed above
	err = l.Load(bytes.NewReader(data), s, mode)
	switch {
	case err == nil, errors.Is(err, lua.SyntaxError), errors.Is(err, lua.MemoryError): // do nothing
	default:
		// any other load failure: discard everything above the chunk name
		l.SetTop(fileNameIndex)
		return fileError("read")
	}
	l.Remove(fileNameIndex)
	return err
}

func searcherPreload(l *lua.State) int {
name := lua.CheckString(l, 1)
l.Field(lua.RegistryIndex, "_PRELOAD")
Expand All @@ -46,7 +111,7 @@ func searcherPreload(l *lua.State) int {
}

func createSearchersTable(l *lua.State) {
searchers := []lua.Function{searcherPreload}
searchers := []lua.Function{searcherPreload, searcherLua}
l.CreateTable(len(searchers), 0)
for i, s := range searchers {
l.PushValue(-2)
Expand All @@ -55,6 +120,33 @@ func createSearchersTable(l *lua.State) {
}
}

// searchPath expands each '?' template in the list-separated path with the
// module name and returns the first candidate that is a readable embedded
// file. When nothing matches, it returns an accumulated ErrNoFile-wrapping
// error listing every candidate tried.
func searchPath(name, path, sep, dirSep string) (string, error) {
	if sep != "" {
		// rewrite module separators (e.g. '.') into directory separators
		name = strings.ReplaceAll(name, sep, dirSep)
	}
	// normalize Lua's ';' list separator to the platform's list separator
	normalized := strings.ReplaceAll(path, string(pathListSeparator), string(filepath.ListSeparator))
	var err error
	for _, template := range filepath.SplitList(normalized) {
		if template == "" {
			continue
		}
		candidate := strings.ReplaceAll(template, "?", name)
		if readable(candidate) {
			return candidate, nil
		}
		err = multierror.Append(err, fmt.Errorf("%w %s", ErrNoFile, candidate))
	}
	return "", err
}

// readable reports whether name is a valid embed.FS path that refers to an
// existing non-directory entry in the embedded Lua code.
func readable(name string) bool {
	if !fs.ValidPath(name) {
		return false
	}
	info, statErr := fs.Stat(luaEmbeddedCode, name)
	if statErr != nil {
		return false
	}
	return !info.IsDir()
}

func noEnv(l *lua.State) bool {
l.Field(lua.RegistryIndex, "LUA_NOENV")
b := l.ToBoolean(-1)
Expand Down Expand Up @@ -84,15 +176,18 @@ var packageLibrary = []lua.RegistryFunction{
return 3 // Return nil, error message, and where.
}},
{Name: "searchpath", Function: func(l *lua.State) int {
_ = lua.CheckString(l, 1)
_ = lua.CheckString(l, 2)
_ = lua.OptString(l, 3, ".")
_ = lua.OptString(l, 4, string(filepath.Separator))

l.PushNil()
l.PushString("searchpath not enabled; check your Lua installation")
l.PushString("absent")
return 3 // Return nil, error message, and where.
name := lua.CheckString(l, 1)
path := lua.CheckString(l, 2)
sep := lua.OptString(l, 3, ".")
dirSep := lua.OptString(l, 4, string(filepath.Separator))
f, err := searchPath(name, path, sep, dirSep)
if err != nil {
l.PushNil()
l.PushString(err.Error())
return 2
}
l.PushString(f)
return 1
}},
}

Expand Down
Loading