From e036ddf7aeef222f461965f0d0fcd14d84b70248 Mon Sep 17 00:00:00 2001
From: isan_rivkin
Date: Tue, 19 Sep 2023 17:10:14 +0300
Subject: [PATCH] Table Extractor Hook and _lakefs_tables format (#6589)

---
 pkg/actions/lua.go                            |   2 +-
 pkg/actions/lua/lakefs/catalogexport/hive.lua |  90 ++++++++++++
 .../lua/lakefs/catalogexport/internal.lua     |  55 +++++++
 .../lakefs/catalogexport/table_extractor.lua  |  60 ++++++++
 pkg/actions/lua/load.go                       | 134 ++++++++++++++++--
 5 files changed, 329 insertions(+), 12 deletions(-)
 create mode 100644 pkg/actions/lua/lakefs/catalogexport/hive.lua
 create mode 100644 pkg/actions/lua/lakefs/catalogexport/internal.lua
 create mode 100644 pkg/actions/lua/lakefs/catalogexport/table_extractor.lua

diff --git a/pkg/actions/lua.go b/pkg/actions/lua.go
index 984c766c9c5..383813b2725 100644
--- a/pkg/actions/lua.go
+++ b/pkg/actions/lua.go
@@ -209,7 +209,7 @@ func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook,
 			Args: args,
 		}, nil
 	} else if !errors.Is(err, errMissingKey) {
-		// 'script' was provided but is empty or of the wrong type..
+		// 'script' was provided but is empty or of the wrong type.
 		return nil, err
 	}
 
diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua
new file mode 100644
index 00000000000..f628c63d157
--- /dev/null
+++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua
@@ -0,0 +1,90 @@
+local pathlib = require("path")
+local utils = require("lakefs/catalogexport/internal")
+local strings = require("strings")
+local DEFAULT_PAGE_SIZE = 30
+
+-- Extract the partition prefix from a full object path.
+-- partitions: ordered list of partition column names (may be nil or empty).
+-- path: object path expected to contain a "<column>=<value>/" segment per column, in order.
+-- Returns "" when there are no partition columns, nil when a column segment is missing or
+-- has an empty value, otherwise the path prefix up to and including the "/" that ends the
+-- last partition segment.
+local function extract_partitions_path(partitions, path)
+    if partitions == nil or #partitions == 0 then
+        return ""
+    end
+    local idx = 1
+    local is_partition_prefix = strings.has_prefix(path, partitions[1])
+    for part_idx, partition in ipairs(partitions) do
+        local col_substr = "/" .. partition .. "="
+        -- if the first partition is the path prefix there is no leading / to match
+        if part_idx == 1 and is_partition_prefix then
+            col_substr = partition .. "="
+        end
+        -- plain search: column names are literal text, never patterns
+        local i, j = string.find(path, col_substr, idx, true)
+        if i == nil then
+            return nil
+        end
+        local separator_idx = string.find(path, "/", j + 1, true)
+        -- verify / found and there is something in between = ... /
+        if separator_idx == nil or separator_idx <= (j + 1) then
+            return nil
+        end
+        idx = separator_idx
+    end
+    return string.sub(path, 1, idx)
+end
+
+-- Hive format partition iterator: each result set is the collection of files under the same partition.
+-- Returns an iterator function yielding (partition_key, entries) per call and nil when exhausted;
+-- each entry is a table with physical_address, path, size and checksum. Hidden objects are skipped.
+local function extract_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size)
+    local target_partition = ""
+    local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", base_path,"", page_size or DEFAULT_PAGE_SIZE)
+    local page = pager()
+    return function()
+        if page == nil then
+            return nil
+        end
+        local partition_entries = {}
+        while true do
+            -- refill until a non-empty page is available, so page[1] below is never nil
+            while #page == 0 do
+                page = pager()
+                if page == nil then -- no more records
+                    return target_partition, partition_entries
+                end
+            end
+            local entry = page[1]
+            if pathlib.is_hidden(entry.path) then
+                -- drop hidden objects: left at the head of the page they would spin this loop forever
+                table.remove(page, 1)
+            else
+                local partition_key = extract_partitions_path(partition_cols, entry.path)
+                -- first time: if not set, assign current object partition as the target_partition key
+                if target_partition == "" then
+                    target_partition = partition_key
+                end
+                -- break if current entry does not belong to the target_partition
+                if partition_key ~= target_partition then
+                    local partition_result = target_partition
+                    target_partition = partition_key
+                    return partition_result, partition_entries
+                end
+                table.insert(partition_entries, {
+                    physical_address = entry.physical_address,
+                    path = entry.path,
+                    size = entry.size_bytes,
+                    checksum = entry.checksum
+                })
+                -- remove entry only if it is part of the current partition
+                table.remove(page, 1)
+            end
+        end
+    end
+end
+
+return {
+    extract_partition_pager=extract_partition_pager,
+}
\ No newline at end of file
diff --git a/pkg/actions/lua/lakefs/catalogexport/internal.lua b/pkg/actions/lua/lakefs/catalogexport/internal.lua
new file mode 100644
index 00000000000..c33c0eef1c2
--- /dev/null
+++ b/pkg/actions/lua/lakefs/catalogexport/internal.lua
@@ -0,0 +1,55 @@
+local DEFAULT_SHORT_DIGEST_LEN=6
+
+-- Return the first len characters of digest (len defaults to DEFAULT_SHORT_DIGEST_LEN).
+local function short_digest(digest, len)
+    return digest:sub(1, len or DEFAULT_SHORT_DIGEST_LEN)
+end
+
+-- Paginate a lakeFS API call.
+-- api_call(next_offset) must return (status code, response); after is the offset of the first call.
+-- Returns an iterator yielding the results of one page per call and nil after the last page.
+-- Raises an error when the API answers with a non-2xx status code.
+local function lakefs_paginiated_api(api_call, after)
+    local next_offset = after
+    local has_more = true
+    return function()
+        if not has_more then
+            return nil
+        end
+        local code, resp = api_call(next_offset)
+        if code < 200 or code >= 300 then
+            error("lakeFS: api return non-2xx " .. tostring(code))
+        end
+        has_more = resp.pagination.has_more
+        next_offset = resp.pagination.next_offset
+        return resp.results
+    end
+end
+
+-- Paginate over lakeFS objects using lakefs_client.list_objects (page_size defaults to 30).
+local function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, delimiter, page_size)
+    return lakefs_paginiated_api(function(next_offset)
+        return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size or 30)
+    end, after)
+end
+
+-- Resolve the ref value from the action global, used as part of setting the default table name.
+-- Tag events resolve to tag_id; branch creation, post-commit and post-merge events to branch_id.
+-- Raises an error for any other event type.
+local function ref_from_branch_or_tag(action_info)
+    local event = action_info.event_type
+    if event == "pre-create-tag" or event == "post-create-tag" then
+        return action_info.tag_id
+    elseif event == "pre-create-branch" or event == "post-create-branch" or event == "post-commit" or event == "post-merge" then
+        return action_info.branch_id
+    else
+        error("unsupported event type: " .. tostring(event))
+    end
+end
+
+return {
+    short_digest=short_digest,
+    ref_from_branch_or_tag=ref_from_branch_or_tag,
+    lakefs_object_pager=lakefs_object_pager,
+    lakefs_paginiated_api=lakefs_paginiated_api,
+}
\ No newline at end of file
diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
new file mode 100644
index 00000000000..c57592adbb9
--- /dev/null
+++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
@@ -0,0 +1,60 @@
+local pathlib = require("path")
+local strings = require("strings")
+local yaml = require("encoding/yaml")
+local utils = require("lakefs/catalogexport/internal")
+
+local LAKEFS_TABLES_BASE = "_lakefs_tables/"
+
+-- Check if a lakeFS entry is a table spec under _lakefs_tables/: an object whose path,
+-- relative to tables_base, is not hidden and ends with ".yaml".
+local function is_table_obj(entry, tables_base)
+    if entry.path_type ~= "object" then
+        return false
+    end
+    local path = entry.path
+    if strings.has_prefix(path, tables_base) then
+        -- remove _lakefs_tables/ from path: sub() is 1-based and inclusive, so start right after the prefix
+        path = path:sub(#tables_base + 1)
+    end
+    return not pathlib.is_hidden(path) and strings.has_suffix(path, ".yaml")
+end
+
+-- List all YAML files under _lakefs_tables/*.
+-- Returns a list of tables holding the physical_address and path of every table descriptor.
+local function list_table_descriptor_entries(client, repo_id, commit_id)
+    local table_entries = {}
+    local page_size = 30
+    local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE,"", page_size)
+    for entries in pager do
+        for _, entry in ipairs(entries) do
+            if is_table_obj(entry, LAKEFS_TABLES_BASE) then
+                table.insert(table_entries, {
+                    physical_address = entry.physical_address,
+                    path = entry.path
+                })
+            end
+        end
+    end
+    return table_entries
+end
+
+-- Fetch the object at logical_path and return the table descriptor as a parsed YAML object.
+-- partition_columns defaults to an empty list when the descriptor does not define it.
+-- Raises an error when the object cannot be fetched or does not parse to a table.
+local function get_table_descriptor(client, repo_id, commit_id, logical_path)
+    local code, content = client.get_object(repo_id, commit_id, logical_path)
+    if code ~= 200 then
+        error("could not fetch data file: HTTP " .. tostring(code))
+    end
+    local descriptor = yaml.unmarshal(content)
+    if type(descriptor) ~= "table" then
+        error("invalid table descriptor, expected a YAML mapping: " .. tostring(logical_path))
+    end
+    descriptor.partition_columns = descriptor.partition_columns or {}
+    return descriptor
+end
+
+return {
+    list_table_descriptor_entries = list_table_descriptor_entries,
+    get_table_descriptor = get_table_descriptor,
+}
\ No newline at end of file
diff --git a/pkg/actions/lua/load.go b/pkg/actions/lua/load.go
index 29e4932ee13..2ec07ba0e88 100644
--- a/pkg/actions/lua/load.go
+++ b/pkg/actions/lua/load.go
@@ -1,19 +1,29 @@
 package lua
 
 import (
+	"bytes"
+	"embed"
+	"errors"
 	"fmt"
+	"io/fs"
 	"os"
 	"path/filepath"
 	"strings"
 
 	"github.com/Shopify/go-lua"
+	"github.com/hashicorp/go-multierror"
 )
 
 const (
 	pathListSeparator = ';'
-	defaultPath       = "./?.lua"
+	defaultPath       = "?.lua"
 )
 
+//go:embed lakefs/catalogexport/*.lua
+var luaEmbeddedCode embed.FS
+
+var ErrNoFile = errors.New("no file")
+
 func findLoader(l *lua.State, name string) {
 	var msg string
 	if l.Field(lua.UpValueIndex(1), "searchers"); !l.IsTable(3) {
@@ -35,6 +45,71 @@ func findLoader(l *lua.State, name string) {
 	}
 }
 
+// findFile looks up module name in the search path stored in package.<field>
+// and returns the first matching embedded file.
+func findFile(l *lua.State, name, field, dirSep string) (string, error) {
+	l.Field(lua.UpValueIndex(1), field)
+	path, ok := l.ToString(-1)
+	if !ok {
+		lua.Errorf(l, "'package.%s' must be a string", field)
+	}
+	return searchPath(name, path, ".", dirSep)
+}
+
+// checkLoad finishes a searcher call: on success it pushes fileName and
+// returns 2 (loader and file name), otherwise it raises a Lua load error.
+func checkLoad(l *lua.State, loaded bool, fileName string) int {
+	if loaded { // Module loaded successfully?
+		l.PushString(fileName) // Second argument to module.
+		return 2               // Return open function & file name.
+	}
+	m := lua.CheckString(l, 1)
+	e := lua.CheckString(l, -1)
+	lua.Errorf(l, "error loading module '%s' from file '%s':\n\t%s", m, fileName, e)
+	panic("unreachable")
+}
+
+// searcherLua is the package searcher for Lua modules embedded in the binary.
+// embed.FS paths always use "/", so that is the directory separator used here.
+func searcherLua(l *lua.State) int {
+	name := lua.CheckString(l, 1)
+	filename, err := findFile(l, name, "path", "/")
+	if err != nil {
+		l.PushString(err.Error()) // Report which files were tried.
+		return 1                  // Module isn't found in this path.
+	}
+
+	return checkLoad(l, loadFile(l, filename, "") == nil, filename)
+}
+
+// loadFile reads fileName from the embedded file system and loads it as a Lua
+// chunk named "@"+fileName. It returns l.Load's result, or lua.FileError (with
+// a "cannot open/read" message pushed) when the file cannot be opened or read.
+func loadFile(l *lua.State, fileName, mode string) error {
+	fileNameIndex := l.Top() + 1
+	fileError := func(what string) error {
+		fileName, _ := l.ToString(fileNameIndex)
+		l.PushFString("cannot %s %s", what, fileName[1:])
+		l.Remove(fileNameIndex)
+		return lua.FileError
+	}
+	l.PushString("@" + fileName)
+	data, err := luaEmbeddedCode.ReadFile(fileName)
+	if err != nil {
+		return fileError("open")
+	}
+	s, _ := l.ToString(-1)
+	err = l.Load(bytes.NewReader(data), s, mode)
+	switch {
+	case err == nil, errors.Is(err, lua.SyntaxError), errors.Is(err, lua.MemoryError): // do nothing
+	default:
+		l.SetTop(fileNameIndex)
+		return fileError("read")
+	}
+	l.Remove(fileNameIndex)
+	return err
+}
+
 func searcherPreload(l *lua.State) int {
 	name := lua.CheckString(l, 1)
 	l.Field(lua.RegistryIndex, "_PRELOAD")
@@ -46,7 +121,7 @@ func searcherPreload(l *lua.State) int {
 }
 
 func createSearchersTable(l *lua.State) {
-	searchers := []lua.Function{searcherPreload}
+	searchers := []lua.Function{searcherPreload, searcherLua}
 	l.CreateTable(len(searchers), 0)
 	for i, s := range searchers {
 		l.PushValue(-2)
@@ -55,6 +130,40 @@ func createSearchersTable(l *lua.State) {
 	}
 }
 
+// searchPath returns the first embedded file obtained by substituting name for
+// "?" in the templates of path (separated by pathListSeparator). Occurrences of
+// sep in name are replaced by dirSep first. The error lists every file tried.
+func searchPath(name, path, sep, dirSep string) (string, error) {
+	var err error
+	if sep != "" {
+		name = strings.ReplaceAll(name, sep, dirSep) // Replace sep by dirSep.
+	}
+	path = strings.ReplaceAll(path, string(pathListSeparator), string(filepath.ListSeparator))
+	for _, template := range filepath.SplitList(path) {
+		if template == "" {
+			continue
+		}
+		filename := strings.ReplaceAll(template, "?", name)
+		if readable(filename) {
+			return filename, nil
+		}
+		err = multierror.Append(err, fmt.Errorf("%w %s", ErrNoFile, filename))
+	}
+	if err == nil {
+		err = ErrNoFile // Path held no usable template; never report success.
+	}
+	return "", err
+}
+
+// readable reports whether name is a valid fs path to an embedded non-directory entry.
+func readable(name string) bool {
+	if !fs.ValidPath(name) {
+		return false
+	}
+	info, err := fs.Stat(luaEmbeddedCode, name)
+	return err == nil && !info.IsDir()
+}
+
 func noEnv(l *lua.State) bool {
 	l.Field(lua.RegistryIndex, "LUA_NOENV")
 	b := l.ToBoolean(-1)
@@ -84,15 +193,18 @@ var packageLibrary = []lua.RegistryFunction{
 		return 3 // Return nil, error message, and where.
 	}},
 	{Name: "searchpath", Function: func(l *lua.State) int {
-		_ = lua.CheckString(l, 1)
-		_ = lua.CheckString(l, 2)
-		_ = lua.OptString(l, 3, ".")
-		_ = lua.OptString(l, 4, string(filepath.Separator))
-
-		l.PushNil()
-		l.PushString("searchpath not enabled; check your Lua installation")
-		l.PushString("absent")
-		return 3 // Return nil, error message, and where.
+		name := lua.CheckString(l, 1)
+		path := lua.CheckString(l, 2)
+		sep := lua.OptString(l, 3, ".")
+		dirSep := lua.OptString(l, 4, "/") // Embedded files always use "/".
+		f, err := searchPath(name, path, sep, dirSep)
+		if err != nil {
+			l.PushNil()
+			l.PushString(err.Error())
+			return 2
+		}
+		l.PushString(f)
+		return 1
 	}},
 }
 