From f51a49e871f0de7f59516ab3960f6a0385c86c81 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 12 Sep 2023 17:57:37 +0300 Subject: [PATCH 01/27] initial commit --- .../lua/lakefs/catalog_export/common.lua | 24 ++ pkg/actions/lua/lakefs/catalog_export/lib.lua | 24 ++ .../lakefs/catalog_export/table_extractor.lua | 290 ++++++++++++++++++ 3 files changed, 338 insertions(+) create mode 100644 pkg/actions/lua/lakefs/catalog_export/common.lua create mode 100644 pkg/actions/lua/lakefs/catalog_export/lib.lua create mode 100644 pkg/actions/lua/lakefs/catalog_export/table_extractor.lua diff --git a/pkg/actions/lua/lakefs/catalog_export/common.lua b/pkg/actions/lua/lakefs/catalog_export/common.lua new file mode 100644 index 00000000000..859bba9053b --- /dev/null +++ b/pkg/actions/lua/lakefs/catalog_export/common.lua @@ -0,0 +1,24 @@ + +function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) + local next_offset = after + local has_more = true + return function() + if not has_more then + return nil + end + local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) + if code ~= 200 then + -- TODO(isan) return error to the caller + error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. resp.message) + end + local objects = resp.results + has_more = resp.pagination.has_more + next_offset = resp.pagination.next_offset + return objects + end +end + +return { + lakefs_object_it=lakefs_object_it, + SHORT_DIGEST_LEN=6, +} \ No newline at end of file diff --git a/pkg/actions/lua/lakefs/catalog_export/lib.lua b/pkg/actions/lua/lakefs/catalog_export/lib.lua new file mode 100644 index 00000000000..466fa1fea7e --- /dev/null +++ b/pkg/actions/lua/lakefs/catalog_export/lib.lua @@ -0,0 +1,24 @@ +local extractor = require("lakefs/catalog_export/table_extractor") +local common = require("lakefs/catalog_export/common") + +-- resolve ref value from action.action.event_type +function ref_from_branch_or_tag() + tag_events = { ["pre-create-tag"] = true, ["post-create-tag"] = true } + branch_events = { ["pre-create-branch"] = true, ["post-create-branch"] = true } + commit_events = { ["post-commit"] = true, ["post-merge"] = true } + local ref + if tag_events[action.event_type] then + return action.tag_id, nil + elseif branch_events[action.event_type] or commit_events[action.event_type] then + return action.branch_id, nil + else + return nil, "unsupported event type: " .. 
action.event_type + end +end + +return { + TableExtractor = extractor.TableExtractor, + ref_from_branch_or_tag=ref_from_branch_or_tag, + lakefs_object_it=common.lakefs_object_it, + lakefs_hive_partition_it=extractor.lakefs_hive_partition_it, +} diff --git a/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua b/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua new file mode 100644 index 00000000000..d246d8df230 --- /dev/null +++ b/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua @@ -0,0 +1,290 @@ +local pathlib = require("path") +local strings = require("strings") +local yaml = require("encoding/yaml") +local success, lakefs = pcall(require, "lakefs") +local json = require("encoding/json") +local common = require("lakefs/catalog_export/common") +-- TDOO(isan) configure somewhere else + +-- TODO(isan) this is for development remove before merging +if success then + print("lakefs module is available") +else + print("Error loading lakefs module:", lakefs) +end + +-- Define the HiveTable class +local HiveTable = {} +HiveTable.__index = HiveTable + +-- Factory function to create new instances +function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema) + local self = setmetatable({}, HiveTable) + self.repo_id = repo_id + self.ref_id = ref_id + self.commit_id = commit_id + self._path = path + self._name = name + self.schema = schema + self.partition_cols = partition_cols + return self +end + +-- Define methods +function HiveTable:name() + return self._name +end + +function HiveTable:size() + return nil +end + +function HiveTable:description() + return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id, + tostring(self._path), self.commit_id:sub(1, common.SHORT_DIGEST_LEN)) +end + +function HiveTable:path() + return self._path +end + +function HiveTable:min_reader_version() + return 1 +end + +function HiveTable:schema_string() + return json.marshal(self.schema) +end + +function HiveTable:partition_columns() + return self.partition_cols +end + +function HiveTable:version() + return 0 +end + +function HiveTable:_get_partition_values(path) + local vals = {} + splitted_path = strings.split(path, pathlib.default_separator()) + for _, part in pairs(splitted_path) do + for _, col in ipairs(self.partition_cols) do + local prefix = col .. "=" + if strings.has_prefix(part, prefix) then + vals[col] = part:sub(#prefix + 1) + end + end + end + return vals +end + +-- return partition table from path by based on columns in partition_cols +function get_partition_values(partition_cols, path) + local vals = {} + splitted_path = strings.split(path, pathlib.default_separator()) + for _, part in pairs(splitted_path) do + for _, col in ipairs(partition_cols) do + local prefix = col .. "=" + if strings.has_prefix(part, prefix) then + vals[col] = part:sub(#prefix + 1) + end + end + end + return vals +end +-- extract partition substr from full path +function extract_partition_prefix_from_path(partition_cols, path) + local partitions = get_partition_values(partition_cols, path) + local partitions_list = {} + -- iterate partition_cols to maintain order + for _, col in pairs(partition_cols) do + table.insert(partitions_list, col .. "=" .. 
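        -- Usage sketch (illustrative only): a hook script would resolve the
        -- ref via the `action` global that lakeFS injects into Lua hooks, e.g.
        --
        --   local lib = require("lakefs/catalog_export")
        --   local ref, err = lib.ref_from_branch_or_tag()
        --   if err ~= nil then error(err) end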
partitions[col]) + end + local partition_key = table.concat(partitions_list, pathlib.default_separator()) + return partition_key +end + +function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols) + local after = "" + local has_more = true + local prefix = base_path + local target_partition = "" + return function() + if not has_more then + return nil + end + local partition_entries = {} + local iter = common.lakefs_object_it(client, repo_id, commit_id, after, prefix, page_size, delimiter) + for entries in iter do + for _, entry in ipairs(entries) do + is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) + if not is_hidden and entry.path_type == "object" then + local partition_key = extract_partition_prefix_from_path(partition_cols, entry.path) + -- first time: if not set, assign current object partition as the target_partition key + if target_partition == "" then + target_partition = partition_key + end + -- break if current entry does not belong to the target_partition + if partition_key ~= target_partition then + -- next time start searching AFTER the last used key + after = partition_entries[#partition_entries].path + local partition_result = target_partition + target_partition = partition_key + return partition_result, partition_entries + end + table.insert(partition_entries, { + physical_address = entry.physical_address, + path = entry.path, + size = entry.size_bytes, + checksum = entry.checksum + }) + end + end + end + has_more = false + return target_partition, partition_entries + end +end + + + +function HiveTable:list_objs_and_partitions_test() + local has_more = true + local after = "" + local delimiter = "" + local count = 1 + local partitions_to_files = {} + print("searching objects partitions based in base path " .. self._path) + repeat + local code, resp = lakefs.list_objects(self.repo_id, self.commit_id, after, self._path, delimiter) + if code ~= 200 then + -- TODO(isan) return error to the caller + error("lakeFS: could not list tables in: " .. self._path .. ", error: " .. resp.message) + end + for _, entry in ipairs(resp.results) do + is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) + if not is_hidden and entry.path_type == "object" then + entry_partitions = self:_get_partition_values(entry.path) + -- start mapp + local partitions = {} + for k, v in pairs(entry_partitions) do + table.insert(partitions, k .. "=" .. 
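        -- e.g. partition_cols = {"year", "month"} against a path such as
        -- "tables/sales/year=2023/month=09/part-0.parquet" (made-up values)
        -- accumulates {"year=2023", "month=09"}, which is concatenated below
        -- into the partition key "year=2023/month=09" (assuming the default
        -- separator is "/").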
v) + end + local partition_key = table.concat(partitions, "/") + + if not partitions_to_files[partition_key] then + partitions_to_files[partition_key] = {} + end + + table.insert(partitions_to_files[partition_key], { + physical_address = entry.physical_address, + path = entry.path, + size = entry.size_bytes, + checksum = entry.checksum + }) + -- end map + end + end + -- check if has more pages + has_more = resp.pagination.has_more + after = resp.pagination.next_offset + until not has_more + return partitions_to_files +end + +function HiveTable:list_objects() + local fs = {} + local pre_signed_urls = _signed_urls_by_path(self._lakefs, self.repo_id, self.ref_id, self._path) + for _, s in pairs(pre_signed_urls) do + if string.match(s.path, "%.parquet$") then + local file_proto = { + url = s.physical_address, + id = s.checksum, + entry_partitions = self:_get_partition_values(s.path), + size = s.size_bytes + } + if s.physical_address_expiry then + file_proto.expirationTimestamp = math.floor(s.physical_address_expiry * 1000) + end + table.insert(fs, { + file = file_proto + }) + end + end + return fs +end + +local TableExtractor = {} +TableExtractor.__index = TableExtractor + +function TableExtractor.new(repository_id, ref, commit_id) + local self = setmetatable({}, TableExtractor) + -- TODO(isan) use or remove lakefs client injection + self.lakefs = lakefs + self.tables_registry_base = pathlib.join(pathlib.default_separator(), '_lakefs_tables/') + self.repository_id = repository_id + self.commit_id = commit_id + self.ref = ref + return self +end + +function TableExtractor:_list_objects(repository_id, commit_id, location, handle_entries) + local has_more = true + local after = "" + local delimiter = "" + local count = 1 + repeat + local code, resp = self.lakefs.list_objects(repository_id, commit_id, after, location, delimiter) + -- handle paged entiries + if not handle_entries(code, resp) then + return + end + -- check if has more pages + has_more = resp.pagination.has_more + after = resp.pagination.next_offset + until not has_more +end + +function TableExtractor:list_table_definitions() + local table_entries = {} + self:_list_objects(self.repository_id, self.commit_id, self.tables_registry_base, function(code, resp) + if code ~= 200 then + -- TODO(isan) return error to the caller + error("lakeFS: could not list tables in: " .. self.tables_registry_base .. ", error: " .. resp.message) + end + for _, entry in ipairs(resp.results) do + is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) + is_yaml = strings.has_suffix(entry.path, ".yaml") + if not is_hidden and is_yaml and entry.path_type == "object" then + table.insert(table_entries, { + physical_address = entry.physical_address, + path = entry.path + }) + end + end + return true + end) + return table_entries +end + +function TableExtractor:get_table(logical_path) + code, content = lakefs.get_object(self.repository_id, self.commit_id, logical_path) + if code ~= 200 then + -- TODO(isan) propagate error to the caller + error("could not fetch data file: HTTP " .. tostring(code)) + end + + descriptor = yaml.unmarshal(content) + -- TODO(isan) implement other tables or handle unsupported + if descriptor.type == 'hive' then + return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name, + descriptor.partition_columns or {}, descriptor.schema), nil + end + return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. 
descriptor.path +end + +return { + TableExtractor = TableExtractor, + HiveTable = HiveTable, + lakefs_hive_partition_it = lakefs_hive_partition_it, +} From 22c67d1db2e3eb1131444089f5fbd9d37173e34b Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 12 Sep 2023 18:03:07 +0300 Subject: [PATCH 02/27] load lua libs --- pkg/actions/lua.go | 2 + pkg/actions/lua/lakefs/catalog_export/load.go | 37 +++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 pkg/actions/lua/lakefs/catalog_export/load.go diff --git a/pkg/actions/lua.go b/pkg/actions/lua.go index b64f2e889db..f9e12db02b2 100644 --- a/pkg/actions/lua.go +++ b/pkg/actions/lua.go @@ -13,6 +13,7 @@ import ( "github.com/Shopify/go-lua" lualibs "github.com/treeverse/lakefs/pkg/actions/lua" "github.com/treeverse/lakefs/pkg/actions/lua/lakefs" + catalogexport "github.com/treeverse/lakefs/pkg/actions/lua/lakefs/catalog_export" luautil "github.com/treeverse/lakefs/pkg/actions/lua/util" "github.com/treeverse/lakefs/pkg/auth" "github.com/treeverse/lakefs/pkg/auth/model" @@ -66,6 +67,7 @@ func injectHookContext(l *lua.State, ctx context.Context, user *model.User, endp luautil.DeepPush(l, args) l.SetGlobal("args") lakefs.OpenClient(l, ctx, user, endpoint) + catalogexport.OpenLuaPackage(l, ctx, user, endpoint) } type loggingBuffer struct { diff --git a/pkg/actions/lua/lakefs/catalog_export/load.go b/pkg/actions/lua/lakefs/catalog_export/load.go new file mode 100644 index 00000000000..be9e5254446 --- /dev/null +++ b/pkg/actions/lua/lakefs/catalog_export/load.go @@ -0,0 +1,37 @@ +package catalog_export + +import ( + "context" + "embed" + "io/fs" + "net/http" + + "github.com/Shopify/go-lua" + "github.com/treeverse/lakefs/pkg/auth/model" +) + +//go:embed *.lua +var modulePath embed.FS + +func OpenLuaPackage(l *lua.State, ctx context.Context, user *model.User, server *http.Server) { + loadLuaAsPackage(l, ctx, "lakefs/catalog_export/common", "common.lua", user, server) + loadLuaAsPackage(l, ctx, "lakefs/catalog_export/table_extractor", "table_extractor.lua", user, server) + loadLuaAsPackage(l, ctx, "lakefs/catalog_export", "lib.lua", user, server) +} + +func loadLuaAsPackage(l *lua.State, ctx context.Context, importAlias, scriptName string, user *model.User, server *http.Server) { + lua.Require(l, importAlias, func(l *lua.State) int { + data, err := fs.ReadFile(modulePath, scriptName) + if err != nil { + lua.Errorf(l, err.Error()) + panic("unreachable") + } + if err := lua.DoString(l, string(data)); err != nil { + lua.Errorf(l, err.Error()) + panic("unreachable") + } + return 1 + // TODO(isan) we want it global or not? 
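    -- For reference, a minimal hive descriptor under _lakefs_tables/ that
    -- this function can consume might look like the following (values are
    -- made up; only the field names are taken from the code above):
    --
    --   name: animals
    --   type: hive
    --   path: tables/animals
    --   partition_columns: ['type', 'weight']
    --   schema: { ... }   -- marshaled as-is by HiveTable:schema_string()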
(the `true`) + }, true) + l.Pop(1) +} From a44293ea603e6120b3650b9d23021c114437a042 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 12 Sep 2023 19:24:16 +0300 Subject: [PATCH 03/27] working table extraction --- pkg/actions/lua.go | 2 +- .../lua/lakefs/catalog_export/common.lua | 2 +- pkg/actions/lua/lakefs/catalog_export/load.go | 16 +- .../lakefs/catalog_export/table_extractor.lua | 231 +++++------------- 4 files changed, 72 insertions(+), 179 deletions(-) diff --git a/pkg/actions/lua.go b/pkg/actions/lua.go index f9e12db02b2..18d35c1185f 100644 --- a/pkg/actions/lua.go +++ b/pkg/actions/lua.go @@ -67,7 +67,7 @@ func injectHookContext(l *lua.State, ctx context.Context, user *model.User, endp luautil.DeepPush(l, args) l.SetGlobal("args") lakefs.OpenClient(l, ctx, user, endpoint) - catalogexport.OpenLuaPackage(l, ctx, user, endpoint) + catalogexport.OpenLuaPackage(l, ctx) } type loggingBuffer struct { diff --git a/pkg/actions/lua/lakefs/catalog_export/common.lua b/pkg/actions/lua/lakefs/catalog_export/common.lua index 859bba9053b..173772659cf 100644 --- a/pkg/actions/lua/lakefs/catalog_export/common.lua +++ b/pkg/actions/lua/lakefs/catalog_export/common.lua @@ -8,7 +8,6 @@ function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page end local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) if code ~= 200 then - -- TODO(isan) return error to the caller error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. resp.message) end local objects = resp.results @@ -21,4 +20,5 @@ end return { lakefs_object_it=lakefs_object_it, SHORT_DIGEST_LEN=6, + LAKEFS_DEFAULT_PAGE_SIZE=30, } \ No newline at end of file diff --git a/pkg/actions/lua/lakefs/catalog_export/load.go b/pkg/actions/lua/lakefs/catalog_export/load.go index be9e5254446..68a4de55a2f 100644 --- a/pkg/actions/lua/lakefs/catalog_export/load.go +++ b/pkg/actions/lua/lakefs/catalog_export/load.go @@ -4,22 +4,23 @@ import ( "context" "embed" "io/fs" - "net/http" "github.com/Shopify/go-lua" - "github.com/treeverse/lakefs/pkg/auth/model" ) //go:embed *.lua var modulePath embed.FS -func OpenLuaPackage(l *lua.State, ctx context.Context, user *model.User, server *http.Server) { - loadLuaAsPackage(l, ctx, "lakefs/catalog_export/common", "common.lua", user, server) - loadLuaAsPackage(l, ctx, "lakefs/catalog_export/table_extractor", "table_extractor.lua", user, server) - loadLuaAsPackage(l, ctx, "lakefs/catalog_export", "lib.lua", user, server) +// OpenLuaPackage load lua code as a package in the runtime +func OpenLuaPackage(l *lua.State, ctx context.Context) { + // order here matters each when packages rely on each other + loadLuaAsPackage(l, ctx, "lakefs/catalog_export/common", "common.lua") + loadLuaAsPackage(l, ctx, "lakefs/catalog_export/table_extractor", "table_extractor.lua") + // lib.lua is high level facade for users + loadLuaAsPackage(l, ctx, "lakefs/catalog_export", "lib.lua") } -func loadLuaAsPackage(l *lua.State, ctx context.Context, importAlias, scriptName string, user *model.User, server *http.Server) { +func loadLuaAsPackage(l *lua.State, ctx context.Context, importAlias, scriptName string) { lua.Require(l, importAlias, func(l *lua.State) int { data, err := fs.ReadFile(modulePath, scriptName) if err != nil { @@ -31,7 +32,6 @@ func loadLuaAsPackage(l *lua.State, ctx context.Context, importAlias, scriptName panic("unreachable") } return 1 - // TODO(isan) we want it global or not? 
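        // (With go-lua, that boolean mirrors luaL_requiref's `global` flag:
        // when true, the module is also stored in a global variable named
        // after importAlias, in addition to package.loaded.)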
(the `true`) }, true) l.Pop(1) } diff --git a/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua b/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua index d246d8df230..a77867f5a83 100644 --- a/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua @@ -1,82 +1,9 @@ local pathlib = require("path") local strings = require("strings") local yaml = require("encoding/yaml") -local success, lakefs = pcall(require, "lakefs") +local lakefs = require("lakefs") local json = require("encoding/json") local common = require("lakefs/catalog_export/common") --- TDOO(isan) configure somewhere else - --- TODO(isan) this is for development remove before merging -if success then - print("lakefs module is available") -else - print("Error loading lakefs module:", lakefs) -end - --- Define the HiveTable class -local HiveTable = {} -HiveTable.__index = HiveTable - --- Factory function to create new instances -function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema) - local self = setmetatable({}, HiveTable) - self.repo_id = repo_id - self.ref_id = ref_id - self.commit_id = commit_id - self._path = path - self._name = name - self.schema = schema - self.partition_cols = partition_cols - return self -end - --- Define methods -function HiveTable:name() - return self._name -end - -function HiveTable:size() - return nil -end - -function HiveTable:description() - return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id, - tostring(self._path), self.commit_id:sub(1, common.SHORT_DIGEST_LEN)) -end - -function HiveTable:path() - return self._path -end - -function HiveTable:min_reader_version() - return 1 -end - -function HiveTable:schema_string() - return json.marshal(self.schema) -end - -function HiveTable:partition_columns() - return self.partition_cols -end - -function HiveTable:version() - return 0 -end - -function HiveTable:_get_partition_values(path) - local vals = {} - splitted_path = strings.split(path, pathlib.default_separator()) - for _, part in pairs(splitted_path) do - for _, col in ipairs(self.partition_cols) do - local prefix = col .. 
"=" - if strings.has_prefix(part, prefix) then - vals[col] = part:sub(#prefix + 1) - end - end - end - return vals -end -- return partition table from path by based on columns in partition_cols function get_partition_values(partition_cols, path) @@ -92,6 +19,7 @@ function get_partition_values(partition_cols, path) end return vals end + -- extract partition substr from full path function extract_partition_prefix_from_path(partition_cols, path) local partitions = get_partition_values(partition_cols, path) @@ -104,6 +32,7 @@ function extract_partition_prefix_from_path(partition_cols, path) return partition_key end +-- Hive format partition iterator each result set is a collection of files under the same partition function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols) local after = "" local has_more = true @@ -125,13 +54,13 @@ function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_si target_partition = partition_key end -- break if current entry does not belong to the target_partition - if partition_key ~= target_partition then + if partition_key ~= target_partition then -- next time start searching AFTER the last used key after = partition_entries[#partition_entries].path local partition_result = target_partition target_partition = partition_key return partition_result, partition_entries - end + end table.insert(partition_entries, { physical_address = entry.physical_address, path = entry.path, @@ -141,77 +70,62 @@ function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_si end end end - has_more = false + has_more = false return target_partition, partition_entries end end +-- Define the HiveTable class +local HiveTable = {} +HiveTable.__index = HiveTable +-- Factory function to create new instances +function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema) + local self = setmetatable({}, HiveTable) + self.repo_id = repo_id + self.ref_id = ref_id + self.commit_id = commit_id + self._path = path + self._name = name + self.schema = schema + self.partition_cols = partition_cols + self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE + return self +end -function HiveTable:list_objs_and_partitions_test() - local has_more = true - local after = "" - local delimiter = "" - local count = 1 - local partitions_to_files = {} - print("searching objects partitions based in base path " .. self._path) - repeat - local code, resp = lakefs.list_objects(self.repo_id, self.commit_id, after, self._path, delimiter) - if code ~= 200 then - -- TODO(isan) return error to the caller - error("lakeFS: could not list tables in: " .. self._path .. ", error: " .. resp.message) - end - for _, entry in ipairs(resp.results) do - is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) - if not is_hidden and entry.path_type == "object" then - entry_partitions = self:_get_partition_values(entry.path) - -- start mapp - local partitions = {} - for k, v in pairs(entry_partitions) do - table.insert(partitions, k .. "=" .. 
v) - end - local partition_key = table.concat(partitions, "/") +-- Define methods +function HiveTable:name() + return self._name +end - if not partitions_to_files[partition_key] then - partitions_to_files[partition_key] = {} - end +function HiveTable:description() + return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id, + tostring(self._path), self.commit_id:sub(1, common.SHORT_DIGEST_LEN)) +end - table.insert(partitions_to_files[partition_key], { - physical_address = entry.physical_address, - path = entry.path, - size = entry.size_bytes, - checksum = entry.checksum - }) - -- end map - end - end - -- check if has more pages - has_more = resp.pagination.has_more - after = resp.pagination.next_offset - until not has_more - return partitions_to_files +function HiveTable:path() + return self._path end -function HiveTable:list_objects() - local fs = {} - local pre_signed_urls = _signed_urls_by_path(self._lakefs, self.repo_id, self.ref_id, self._path) - for _, s in pairs(pre_signed_urls) do - if string.match(s.path, "%.parquet$") then - local file_proto = { - url = s.physical_address, - id = s.checksum, - entry_partitions = self:_get_partition_values(s.path), - size = s.size_bytes - } - if s.physical_address_expiry then - file_proto.expirationTimestamp = math.floor(s.physical_address_expiry * 1000) - end - table.insert(fs, { - file = file_proto - }) - end - end - return fs +function HiveTable:min_reader_version() + return 1 +end + +function HiveTable:schema_string() + return json.marshal(self.schema) +end + +function HiveTable:partition_columns() + return self.partition_cols +end + +function HiveTable:version() + return 0 +end + +function HiveTable:partition_iterator() + return lakefs_hive_partition_it(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, "", + self.partition_cols) end local TableExtractor = {} @@ -219,40 +133,21 @@ TableExtractor.__index = TableExtractor function TableExtractor.new(repository_id, ref, commit_id) local self = setmetatable({}, TableExtractor) - -- TODO(isan) use or remove lakefs client injection - self.lakefs = lakefs self.tables_registry_base = pathlib.join(pathlib.default_separator(), '_lakefs_tables/') self.repository_id = repository_id self.commit_id = commit_id self.ref = ref + self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE return self end -function TableExtractor:_list_objects(repository_id, commit_id, location, handle_entries) - local has_more = true - local after = "" - local delimiter = "" - local count = 1 - repeat - local code, resp = self.lakefs.list_objects(repository_id, commit_id, after, location, delimiter) - -- handle paged entiries - if not handle_entries(code, resp) then - return - end - -- check if has more pages - has_more = resp.pagination.has_more - after = resp.pagination.next_offset - until not has_more -end - +-- list all YAML files in _lakefs_tables function TableExtractor:list_table_definitions() local table_entries = {} - self:_list_objects(self.repository_id, self.commit_id, self.tables_registry_base, function(code, resp) - if code ~= 200 then - -- TODO(isan) return error to the caller - error("lakeFS: could not list tables in: " .. self.tables_registry_base .. ", error: " .. 
resp.message) - end - for _, entry in ipairs(resp.results) do + local iter = common.lakefs_object_it(lakefs, self.repository_id, self.commit_id, "", self.tables_registry_base, + self._iter_page_size, "") + for entries in iter do + for _, entry in ipairs(entries) do is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) is_yaml = strings.has_suffix(entry.path, ".yaml") if not is_hidden and is_yaml and entry.path_type == "object" then @@ -262,29 +157,27 @@ function TableExtractor:list_table_definitions() }) end end - return true - end) + end return table_entries end +-- return concrete table instance function TableExtractor:get_table(logical_path) code, content = lakefs.get_object(self.repository_id, self.commit_id, logical_path) if code ~= 200 then - -- TODO(isan) propagate error to the caller error("could not fetch data file: HTTP " .. tostring(code)) end descriptor = yaml.unmarshal(content) - -- TODO(isan) implement other tables or handle unsupported + -- TODO(isan) decide where to handle different table parsing? (delta table / glue table etc) if descriptor.type == 'hive' then return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name, descriptor.partition_columns or {}, descriptor.schema), nil end - return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. descriptor.path + return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. logical_path end return { TableExtractor = TableExtractor, - HiveTable = HiveTable, - lakefs_hive_partition_it = lakefs_hive_partition_it, + lakefs_hive_partition_it = lakefs_hive_partition_it } From a6bec34769dcd2c5574c2f5e4e066a6c71a28c73 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Wed, 13 Sep 2023 10:00:50 +0300 Subject: [PATCH 04/27] fix linter issues --- pkg/actions/lua.go | 4 ++-- .../{catalog_export => catalogexport}/common.lua | 0 .../{catalog_export => catalogexport}/lib.lua | 0 .../{catalog_export => catalogexport}/load.go | 13 ++++++------- .../table_extractor.lua | 0 5 files changed, 8 insertions(+), 9 deletions(-) rename pkg/actions/lua/lakefs/{catalog_export => catalogexport}/common.lua (100%) rename pkg/actions/lua/lakefs/{catalog_export => catalogexport}/lib.lua (100%) rename pkg/actions/lua/lakefs/{catalog_export => catalogexport}/load.go (59%) rename pkg/actions/lua/lakefs/{catalog_export => catalogexport}/table_extractor.lua (100%) diff --git a/pkg/actions/lua.go b/pkg/actions/lua.go index 18d35c1185f..857bc0e9dd4 100644 --- a/pkg/actions/lua.go +++ b/pkg/actions/lua.go @@ -13,7 +13,7 @@ import ( "github.com/Shopify/go-lua" lualibs "github.com/treeverse/lakefs/pkg/actions/lua" "github.com/treeverse/lakefs/pkg/actions/lua/lakefs" - catalogexport "github.com/treeverse/lakefs/pkg/actions/lua/lakefs/catalog_export" + catalogexport "github.com/treeverse/lakefs/pkg/actions/lua/lakefs/catalogexport" luautil "github.com/treeverse/lakefs/pkg/actions/lua/util" "github.com/treeverse/lakefs/pkg/auth" "github.com/treeverse/lakefs/pkg/auth/model" @@ -67,7 +67,7 @@ func injectHookContext(l *lua.State, ctx context.Context, user *model.User, endp luautil.DeepPush(l, args) l.SetGlobal("args") lakefs.OpenClient(l, ctx, user, endpoint) - catalogexport.OpenLuaPackage(l, ctx) + catalogexport.OpenLuaPackage(l) } type loggingBuffer struct { diff --git a/pkg/actions/lua/lakefs/catalog_export/common.lua b/pkg/actions/lua/lakefs/catalogexport/common.lua similarity index 100% rename from pkg/actions/lua/lakefs/catalog_export/common.lua rename to 
pkg/actions/lua/lakefs/catalogexport/common.lua diff --git a/pkg/actions/lua/lakefs/catalog_export/lib.lua b/pkg/actions/lua/lakefs/catalogexport/lib.lua similarity index 100% rename from pkg/actions/lua/lakefs/catalog_export/lib.lua rename to pkg/actions/lua/lakefs/catalogexport/lib.lua diff --git a/pkg/actions/lua/lakefs/catalog_export/load.go b/pkg/actions/lua/lakefs/catalogexport/load.go similarity index 59% rename from pkg/actions/lua/lakefs/catalog_export/load.go rename to pkg/actions/lua/lakefs/catalogexport/load.go index 68a4de55a2f..6c084321449 100644 --- a/pkg/actions/lua/lakefs/catalog_export/load.go +++ b/pkg/actions/lua/lakefs/catalogexport/load.go @@ -1,7 +1,6 @@ -package catalog_export +package catalogexport import ( - "context" "embed" "io/fs" @@ -12,15 +11,15 @@ import ( var modulePath embed.FS // OpenLuaPackage load lua code as a package in the runtime -func OpenLuaPackage(l *lua.State, ctx context.Context) { +func OpenLuaPackage(l *lua.State) { // order here matters each when packages rely on each other - loadLuaAsPackage(l, ctx, "lakefs/catalog_export/common", "common.lua") - loadLuaAsPackage(l, ctx, "lakefs/catalog_export/table_extractor", "table_extractor.lua") + loadLuaAsPackage(l, "lakefs/catalog_export/common", "common.lua") + loadLuaAsPackage(l, "lakefs/catalog_export/table_extractor", "table_extractor.lua") // lib.lua is high level facade for users - loadLuaAsPackage(l, ctx, "lakefs/catalog_export", "lib.lua") + loadLuaAsPackage(l, "lakefs/catalog_export", "lib.lua") } -func loadLuaAsPackage(l *lua.State, ctx context.Context, importAlias, scriptName string) { +func loadLuaAsPackage(l *lua.State, importAlias, scriptName string) { lua.Require(l, importAlias, func(l *lua.State) int { data, err := fs.ReadFile(modulePath, scriptName) if err != nil { diff --git a/pkg/actions/lua/lakefs/catalog_export/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua similarity index 100% rename from pkg/actions/lua/lakefs/catalog_export/table_extractor.lua rename to pkg/actions/lua/lakefs/catalogexport/table_extractor.lua From 8755a7013e9e0051b8130207f88bd7294465680d Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Wed, 13 Sep 2023 19:54:01 +0800 Subject: [PATCH 05/27] load embedded files dynamicly (#6592) --- pkg/actions/lua.go | 4 +- .../lua/lakefs/catalogexport/common.lua | 1 - pkg/actions/lua/lakefs/catalogexport/load.go | 36 ------ pkg/actions/lua/load.go | 117 ++++++++++++++++-- 4 files changed, 107 insertions(+), 51 deletions(-) delete mode 100644 pkg/actions/lua/lakefs/catalogexport/load.go diff --git a/pkg/actions/lua.go b/pkg/actions/lua.go index 857bc0e9dd4..e290bbc76d0 100644 --- a/pkg/actions/lua.go +++ b/pkg/actions/lua.go @@ -13,7 +13,6 @@ import ( "github.com/Shopify/go-lua" lualibs "github.com/treeverse/lakefs/pkg/actions/lua" "github.com/treeverse/lakefs/pkg/actions/lua/lakefs" - catalogexport "github.com/treeverse/lakefs/pkg/actions/lua/lakefs/catalogexport" luautil "github.com/treeverse/lakefs/pkg/actions/lua/util" "github.com/treeverse/lakefs/pkg/auth" "github.com/treeverse/lakefs/pkg/auth/model" @@ -67,7 +66,6 @@ func injectHookContext(l *lua.State, ctx context.Context, user *model.User, endp luautil.DeepPush(l, args) l.SetGlobal("args") lakefs.OpenClient(l, ctx, user, endpoint) - catalogexport.OpenLuaPackage(l) } type loggingBuffer struct { @@ -207,7 +205,7 @@ func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook, Args: args, }, nil } else if !errors.Is(err, errMissingKey) { - // 'script' was provided but is 
empty or of the wrong type.. + // 'script' was provided but is empty or of the wrong type. return nil, err } diff --git a/pkg/actions/lua/lakefs/catalogexport/common.lua b/pkg/actions/lua/lakefs/catalogexport/common.lua index 173772659cf..21d5ac5ee95 100644 --- a/pkg/actions/lua/lakefs/catalogexport/common.lua +++ b/pkg/actions/lua/lakefs/catalogexport/common.lua @@ -1,4 +1,3 @@ - function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) local next_offset = after local has_more = true diff --git a/pkg/actions/lua/lakefs/catalogexport/load.go b/pkg/actions/lua/lakefs/catalogexport/load.go deleted file mode 100644 index 6c084321449..00000000000 --- a/pkg/actions/lua/lakefs/catalogexport/load.go +++ /dev/null @@ -1,36 +0,0 @@ -package catalogexport - -import ( - "embed" - "io/fs" - - "github.com/Shopify/go-lua" -) - -//go:embed *.lua -var modulePath embed.FS - -// OpenLuaPackage load lua code as a package in the runtime -func OpenLuaPackage(l *lua.State) { - // order here matters each when packages rely on each other - loadLuaAsPackage(l, "lakefs/catalog_export/common", "common.lua") - loadLuaAsPackage(l, "lakefs/catalog_export/table_extractor", "table_extractor.lua") - // lib.lua is high level facade for users - loadLuaAsPackage(l, "lakefs/catalog_export", "lib.lua") -} - -func loadLuaAsPackage(l *lua.State, importAlias, scriptName string) { - lua.Require(l, importAlias, func(l *lua.State) int { - data, err := fs.ReadFile(modulePath, scriptName) - if err != nil { - lua.Errorf(l, err.Error()) - panic("unreachable") - } - if err := lua.DoString(l, string(data)); err != nil { - lua.Errorf(l, err.Error()) - panic("unreachable") - } - return 1 - }, true) - l.Pop(1) -} diff --git a/pkg/actions/lua/load.go b/pkg/actions/lua/load.go index 29e4932ee13..2ec07ba0e88 100644 --- a/pkg/actions/lua/load.go +++ b/pkg/actions/lua/load.go @@ -1,19 +1,29 @@ package lua import ( + "bytes" + "embed" + "errors" "fmt" + "io/fs" "os" "path/filepath" "strings" "github.com/Shopify/go-lua" + "github.com/hashicorp/go-multierror" ) const ( pathListSeparator = ';' - defaultPath = "./?.lua" + defaultPath = "?.lua" ) +//go:embed lakefs/catalogexport/*.lua +var luaEmbeddedCode embed.FS + +var ErrNoFile = errors.New("no file") + func findLoader(l *lua.State, name string) { var msg string if l.Field(lua.UpValueIndex(1), "searchers"); !l.IsTable(3) { @@ -35,6 +45,61 @@ func findLoader(l *lua.State, name string) { } } +func findFile(l *lua.State, name, field, dirSep string) (string, error) { + l.Field(lua.UpValueIndex(1), field) + path, ok := l.ToString(-1) + if !ok { + lua.Errorf(l, "'package.%s' must be a string", field) + } + return searchPath(name, path, ".", dirSep) +} + +func checkLoad(l *lua.State, loaded bool, fileName string) int { + if loaded { // Module loaded successfully? + l.PushString(fileName) // Second argument to module. + return 2 // Return open function & file name. + } + m := lua.CheckString(l, 1) + e := lua.CheckString(l, -1) + lua.Errorf(l, "error loading module '%s' from file '%s':\n\t%s", m, fileName, e) + panic("unreachable") +} + +func searcherLua(l *lua.State) int { + name := lua.CheckString(l, 1) + filename, err := findFile(l, name, "path", string(filepath.Separator)) + if err != nil { + return 1 // Module isn't found in this path. 
+ } + + return checkLoad(l, loadFile(l, filename, "") == nil, filename) +} + +func loadFile(l *lua.State, fileName, mode string) error { + fileNameIndex := l.Top() + 1 + fileError := func(what string) error { + fileName, _ := l.ToString(fileNameIndex) + l.PushFString("cannot %s %s", what, fileName[1:]) + l.Remove(fileNameIndex) + return lua.FileError + } + l.PushString("@" + fileName) + data, err := luaEmbeddedCode.ReadFile(fileName) + if err != nil { + return fileError("open") + } + s, _ := l.ToString(-1) + err = l.Load(bytes.NewReader(data), s, mode) + switch { + case err == nil, errors.Is(err, lua.SyntaxError), errors.Is(err, lua.MemoryError): // do nothing + default: + l.SetTop(fileNameIndex) + return fileError("read") + } + l.Remove(fileNameIndex) + return err +} + func searcherPreload(l *lua.State) int { name := lua.CheckString(l, 1) l.Field(lua.RegistryIndex, "_PRELOAD") @@ -46,7 +111,7 @@ func searcherPreload(l *lua.State) int { } func createSearchersTable(l *lua.State) { - searchers := []lua.Function{searcherPreload} + searchers := []lua.Function{searcherPreload, searcherLua} l.CreateTable(len(searchers), 0) for i, s := range searchers { l.PushValue(-2) @@ -55,6 +120,33 @@ func createSearchersTable(l *lua.State) { } } +func searchPath(name, path, sep, dirSep string) (string, error) { + var err error + if sep != "" { + name = strings.ReplaceAll(name, sep, dirSep) // Replace sep by dirSep. + } + path = strings.ReplaceAll(path, string(pathListSeparator), string(filepath.ListSeparator)) + for _, template := range filepath.SplitList(path) { + if template == "" { + continue + } + filename := strings.ReplaceAll(template, "?", name) + if readable(filename) { + return filename, nil + } + err = multierror.Append(err, fmt.Errorf("%w %s", ErrNoFile, filename)) + } + return "", err +} + +func readable(name string) bool { + if !fs.ValidPath(name) { + return false + } + info, err := fs.Stat(luaEmbeddedCode, name) + return err == nil && !info.IsDir() +} + func noEnv(l *lua.State) bool { l.Field(lua.RegistryIndex, "LUA_NOENV") b := l.ToBoolean(-1) @@ -84,15 +176,18 @@ var packageLibrary = []lua.RegistryFunction{ return 3 // Return nil, error message, and where. }}, {Name: "searchpath", Function: func(l *lua.State) int { - _ = lua.CheckString(l, 1) - _ = lua.CheckString(l, 2) - _ = lua.OptString(l, 3, ".") - _ = lua.OptString(l, 4, string(filepath.Separator)) - - l.PushNil() - l.PushString("searchpath not enabled; check your Lua installation") - l.PushString("absent") - return 3 // Return nil, error message, and where. 
+ name := lua.CheckString(l, 1) + path := lua.CheckString(l, 2) + sep := lua.OptString(l, 3, ".") + dirSep := lua.OptString(l, 4, string(filepath.Separator)) + f, err := searchPath(name, path, sep, dirSep) + if err != nil { + l.PushNil() + l.PushString(err.Error()) + return 2 + } + l.PushString(f) + return 1 }}, } From 0327daadb81cbedbe963ccf26b1fff920221dee7 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Wed, 13 Sep 2023 14:57:58 +0300 Subject: [PATCH 06/27] change paths --- pkg/actions/lua/lakefs/catalogexport/lib.lua | 8 +++----- pkg/actions/lua/lakefs/catalogexport/table_extractor.lua | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/lib.lua b/pkg/actions/lua/lakefs/catalogexport/lib.lua index 466fa1fea7e..f780b956e94 100644 --- a/pkg/actions/lua/lakefs/catalogexport/lib.lua +++ b/pkg/actions/lua/lakefs/catalogexport/lib.lua @@ -1,5 +1,5 @@ -local extractor = require("lakefs/catalog_export/table_extractor") -local common = require("lakefs/catalog_export/common") +local extractor = require("lakefs/catalogexport/table_extractor") +local common = require("lakefs/catalogexport/common") -- resolve ref value from action.action.event_type function ref_from_branch_or_tag() @@ -18,7 +18,5 @@ end return { TableExtractor = extractor.TableExtractor, - ref_from_branch_or_tag=ref_from_branch_or_tag, - lakefs_object_it=common.lakefs_object_it, - lakefs_hive_partition_it=extractor.lakefs_hive_partition_it, + ref_from_branch_or_tag=ref_from_branch_or_tag, } diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index a77867f5a83..c5d7db1aa6f 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -3,7 +3,7 @@ local strings = require("strings") local yaml = require("encoding/yaml") local lakefs = require("lakefs") local json = require("encoding/json") -local common = require("lakefs/catalog_export/common") +local common = require("lakefs/catalogexport/common") -- return partition table from path by based on columns in partition_cols function get_partition_values(partition_cols, path) From 83ef166f65fe7522fc32e1ca73778a13602b4013 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Wed, 13 Sep 2023 18:38:51 +0300 Subject: [PATCH 07/27] fix short digest and paging parameters --- pkg/actions/lua/lakefs/catalogexport/common.lua | 9 +++++++-- .../lua/lakefs/catalogexport/table_extractor.lua | 15 +++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/common.lua b/pkg/actions/lua/lakefs/catalogexport/common.lua index 21d5ac5ee95..f5d18163543 100644 --- a/pkg/actions/lua/lakefs/catalogexport/common.lua +++ b/pkg/actions/lua/lakefs/catalogexport/common.lua @@ -1,3 +1,9 @@ +local SHORT_DIGEST_LEN=6 + +function short_digest(digest) + return digest:sub(1, SHORT_DIGEST_LEN) +end + function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) local next_offset = after local has_more = true @@ -18,6 +24,5 @@ end return { lakefs_object_it=lakefs_object_it, - SHORT_DIGEST_LEN=6, - LAKEFS_DEFAULT_PAGE_SIZE=30, + short_digest=short_digest, } \ No newline at end of file diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index c5d7db1aa6f..67088181b3d 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ 
b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -80,7 +80,7 @@ local HiveTable = {} HiveTable.__index = HiveTable -- Factory function to create new instances -function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema) +function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema, iter_page_size) local self = setmetatable({}, HiveTable) self.repo_id = repo_id self.ref_id = ref_id @@ -89,7 +89,7 @@ function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, s self._name = name self.schema = schema self.partition_cols = partition_cols - self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE + self._iter_page_size = iter_page_size return self end @@ -100,7 +100,7 @@ end function HiveTable:description() return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id, - tostring(self._path), self.commit_id:sub(1, common.SHORT_DIGEST_LEN)) + tostring(self._path), common.short_digest(self.commit_id)) end function HiveTable:path() @@ -131,13 +131,16 @@ end local TableExtractor = {} TableExtractor.__index = TableExtractor -function TableExtractor.new(repository_id, ref, commit_id) +function TableExtractor.new(repository_id, ref, commit_id, tables_iter_page_size, export_iter_page_size) local self = setmetatable({}, TableExtractor) self.tables_registry_base = pathlib.join(pathlib.default_separator(), '_lakefs_tables/') self.repository_id = repository_id self.commit_id = commit_id self.ref = ref - self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE + -- object iterator when listing _lakefs_tables/* + self._iter_page_size = tables_iter_page_size + -- object iteration when listing partitions to export (table objects) + self._export_iter_page_size = export_iter_page_size return self end @@ -172,7 +175,7 @@ function TableExtractor:get_table(logical_path) -- TODO(isan) decide where to handle different table parsing? (delta table / glue table etc) if descriptor.type == 'hive' then return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name, - descriptor.partition_columns or {}, descriptor.schema), nil + descriptor.partition_columns or {}, descriptor.schema, self._export_iter_page_size), nil end return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. 
logical_path end From 4f02c4def853facfc5044b22519a98227c0cd6dd Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Wed, 13 Sep 2023 19:11:28 +0300 Subject: [PATCH 08/27] fix review --- .../lua/lakefs/catalogexport/common.lua | 37 +++++++++++++++---- pkg/actions/lua/lakefs/catalogexport/lib.lua | 25 ++++++------- .../lakefs/catalogexport/table_extractor.lua | 14 +++---- 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/common.lua b/pkg/actions/lua/lakefs/catalogexport/common.lua index f5d18163543..544a1e03b9c 100644 --- a/pkg/actions/lua/lakefs/catalogexport/common.lua +++ b/pkg/actions/lua/lakefs/catalogexport/common.lua @@ -4,25 +4,48 @@ function short_digest(digest) return digest:sub(1, SHORT_DIGEST_LEN) end -function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) +function lakefs_paginiated_api(api_call, after) local next_offset = after local has_more = true return function() if not has_more then return nil end - local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) - if code ~= 200 then - error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. resp.message) + local code, resp = api_call(next_offset) + if code < 200 or code >= 300 then + error("lakeFS: api return non-2xx" .. code) end - local objects = resp.results has_more = resp.pagination.has_more next_offset = resp.pagination.next_offset - return objects + return resp.results end end +function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) + return lakefs_paginiated_api(function(next_offset) + return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) + end, after) +end + +-- function lakefs_object_pager_old(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) +-- local next_offset = after +-- local has_more = true +-- return function() +-- if not has_more then +-- return nil +-- end +-- local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) +-- if code ~= 200 then +-- error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. 
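    -- Putting the pieces together (a sketch only, using names defined in
    -- this file; the page sizes here are arbitrary):
    --
    --   local ex = TableExtractor.new(repo_id, ref, commit_id, 30, 30)
    --   for _, entry in ipairs(ex:list_table_definitions()) do
    --       local tbl, err = ex:get_table(entry.path)
    --       if err == nil then
    --           for partition, files in tbl:partition_iterator() do
    --               -- export `files` under `partition`
    --           end
    --       end
    --   end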
resp.message) +-- end +-- local objects = resp.results +-- has_more = resp.pagination.has_more +-- next_offset = resp.pagination.next_offset +-- return objects +-- end +-- end + return { - lakefs_object_it=lakefs_object_it, + lakefs_object_pager=lakefs_object_pager, short_digest=short_digest, } \ No newline at end of file diff --git a/pkg/actions/lua/lakefs/catalogexport/lib.lua b/pkg/actions/lua/lakefs/catalogexport/lib.lua index f780b956e94..973ef48d524 100644 --- a/pkg/actions/lua/lakefs/catalogexport/lib.lua +++ b/pkg/actions/lua/lakefs/catalogexport/lib.lua @@ -1,22 +1,19 @@ -local extractor = require("lakefs/catalogexport/table_extractor") -local common = require("lakefs/catalogexport/common") +local extractor = require("lakefs/catalogexport/table_extractor") +local common = require("lakefs/catalogexport/common") --- resolve ref value from action.action.event_type -function ref_from_branch_or_tag() - tag_events = { ["pre-create-tag"] = true, ["post-create-tag"] = true } - branch_events = { ["pre-create-branch"] = true, ["post-create-branch"] = true } - commit_events = { ["post-commit"] = true, ["post-merge"] = true } - local ref - if tag_events[action.event_type] then - return action.tag_id, nil - elseif branch_events[action.event_type] or commit_events[action.event_type] then - return action.branch_id, nil +-- resolve ref value from action global +function ref_from_branch_or_tag(action_info) + event = action_info.event_type + if event == "pre-create-tag" or event == "post-create-tag" then + return action_info.tag_id, nil + elseif event == "pre-create-branch" or event == "post-create-branch" or "post-commit" or "post-merge" then + return action_info.branch_id, nil else - return nil, "unsupported event type: " .. action.event_type + return nil, "unsupported event type: " .. 
action_info.event_type end end return { TableExtractor = extractor.TableExtractor, - ref_from_branch_or_tag=ref_from_branch_or_tag, + ref_from_branch_or_tag = ref_from_branch_or_tag } diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 67088181b3d..78c78e38f79 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -33,7 +33,7 @@ function extract_partition_prefix_from_path(partition_cols, path) end -- Hive format partition iterator each result set is a collection of files under the same partition -function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols) +function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols) local after = "" local has_more = true local prefix = base_path @@ -43,7 +43,7 @@ function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_si return nil end local partition_entries = {} - local iter = common.lakefs_object_it(client, repo_id, commit_id, after, prefix, page_size, delimiter) + local iter = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, delimiter) for entries in iter do for _, entry in ipairs(entries) do is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) @@ -123,8 +123,8 @@ function HiveTable:version() return 0 end -function HiveTable:partition_iterator() - return lakefs_hive_partition_it(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, "", +function HiveTable:partition_pager() + return lakefs_hive_partition_pager(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, "", self.partition_cols) end @@ -133,7 +133,7 @@ TableExtractor.__index = TableExtractor function TableExtractor.new(repository_id, ref, commit_id, tables_iter_page_size, export_iter_page_size) local self = setmetatable({}, TableExtractor) - self.tables_registry_base = pathlib.join(pathlib.default_separator(), '_lakefs_tables/') + self.tables_registry_base = '_lakefs_tables/' self.repository_id = repository_id self.commit_id = commit_id self.ref = ref @@ -147,7 +147,7 @@ end -- list all YAML files in _lakefs_tables function TableExtractor:list_table_definitions() local table_entries = {} - local iter = common.lakefs_object_it(lakefs, self.repository_id, self.commit_id, "", self.tables_registry_base, + local iter = common.lakefs_object_pager(lakefs, self.repository_id, self.commit_id, "", self.tables_registry_base, self._iter_page_size, "") for entries in iter do for _, entry in ipairs(entries) do @@ -182,5 +182,5 @@ end return { TableExtractor = TableExtractor, - lakefs_hive_partition_it = lakefs_hive_partition_it + lakefs_hive_partition_pager = lakefs_hive_partition_pager } From 98926489e9976ecf6383279faf626ee2046a0e6d Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Wed, 13 Sep 2023 19:11:49 +0300 Subject: [PATCH 09/27] clean --- .../lua/lakefs/catalogexport/common.lua | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/common.lua b/pkg/actions/lua/lakefs/catalogexport/common.lua index 544a1e03b9c..072c423ae41 100644 --- a/pkg/actions/lua/lakefs/catalogexport/common.lua +++ b/pkg/actions/lua/lakefs/catalogexport/common.lua @@ -27,24 +27,6 @@ function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, p end, after) end --- function 
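        -- Callers are expected to pass the `action` global that lakeFS
        -- provides to hooks, e.g.
        -- `local ref, err = ref_from_branch_or_tag(action)` (sketch).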
lakefs_object_pager_old(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) --- local next_offset = after --- local has_more = true --- return function() --- if not has_more then --- return nil --- end --- local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) --- if code ~= 200 then --- error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. resp.message) --- end --- local objects = resp.results --- has_more = resp.pagination.has_more --- next_offset = resp.pagination.next_offset --- return objects --- end --- end - return { lakefs_object_pager=lakefs_object_pager, short_digest=short_digest, From e1d8d03ef1494a728c5a3de90369fcb6711e1c36 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Thu, 14 Sep 2023 08:29:46 +0300 Subject: [PATCH 10/27] review comments --- pkg/actions/lua/lakefs/catalogexport/lib.lua | 6 +++--- .../lakefs/catalogexport/table_extractor.lua | 17 ++++++----------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/lib.lua b/pkg/actions/lua/lakefs/catalogexport/lib.lua index 973ef48d524..3dea18af920 100644 --- a/pkg/actions/lua/lakefs/catalogexport/lib.lua +++ b/pkg/actions/lua/lakefs/catalogexport/lib.lua @@ -5,11 +5,11 @@ local common = require("lakefs/catalogexport/common") function ref_from_branch_or_tag(action_info) event = action_info.event_type if event == "pre-create-tag" or event == "post-create-tag" then - return action_info.tag_id, nil + return action_info.tag_id elseif event == "pre-create-branch" or event == "post-create-branch" or "post-commit" or "post-merge" then - return action_info.branch_id, nil + return action_info.branch_id else - return nil, "unsupported event type: " .. action_info.event_type + error("unsupported event type: " .. 
action_info.event_type) end end diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 78c78e38f79..3f9067614d7 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -107,14 +107,6 @@ function HiveTable:path() return self._path end -function HiveTable:min_reader_version() - return 1 -end - -function HiveTable:schema_string() - return json.marshal(self.schema) -end - function HiveTable:partition_columns() return self.partition_cols end @@ -144,6 +136,11 @@ function TableExtractor.new(repository_id, ref, commit_id, tables_iter_page_size return self end +function TableExtractor:_is_table_obj(entry) + local is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) + local is_yaml = strings.has_suffix(entry.path, ".yaml") + return not is_hidden and is_yaml and entry.path_type == "object" +end -- list all YAML files in _lakefs_tables function TableExtractor:list_table_definitions() local table_entries = {} @@ -151,9 +148,7 @@ function TableExtractor:list_table_definitions() self._iter_page_size, "") for entries in iter do for _, entry in ipairs(entries) do - is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) - is_yaml = strings.has_suffix(entry.path, ".yaml") - if not is_hidden and is_yaml and entry.path_type == "object" then + if self:_is_table_obj(entry) then table.insert(table_entries, { physical_address = entry.physical_address, path = entry.path From 72dc0d1f099129ad9866030198901bf3894b733f Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Thu, 14 Sep 2023 10:56:11 +0300 Subject: [PATCH 11/27] update review --- .../lakefs/catalogexport/table_extractor.lua | 63 +++++++++---------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 3f9067614d7..70df62dd086 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -1,3 +1,4 @@ +local regexp = require("regexp") local pathlib = require("path") local strings = require("strings") local yaml = require("encoding/yaml") @@ -5,35 +6,25 @@ local lakefs = require("lakefs") local json = require("encoding/json") local common = require("lakefs/catalogexport/common") --- return partition table from path by based on columns in partition_cols -function get_partition_values(partition_cols, path) - local vals = {} - splitted_path = strings.split(path, pathlib.default_separator()) - for _, part in pairs(splitted_path) do - for _, col in ipairs(partition_cols) do - local prefix = col .. "=" - if strings.has_prefix(part, prefix) then - vals[col] = part:sub(#prefix + 1) - end +-- extract partition prefix from full path +function extract_partitions_path(partition_cols, path) + local idx = 0 + for _, partition in ipairs(partition_cols) do + local pattern = partition .. 
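        -- Note: in the `elseif` above, the bare "post-commit" and
        -- "post-merge" strings are truthy by themselves, so that branch
        -- matches every non-tag event and this error is effectively
        -- unreachable; each value presumably wants its own
        -- `event == "..."` comparison.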
"=[^/]*" + local re = regexp.compile(pattern) + local match = re.find(path, pattern) + if match == "" then + return nil end + -- expanding the pattern to a match regex because string.find() does not implement pattern matching https://github.com/Shopify/go-lua/blob/main/string.go#L37 + local i, j = string.find(path, match, idx) + idx = j + 1 end - return vals -end - --- extract partition substr from full path -function extract_partition_prefix_from_path(partition_cols, path) - local partitions = get_partition_values(partition_cols, path) - local partitions_list = {} - -- iterate partition_cols to maintain order - for _, col in pairs(partition_cols) do - table.insert(partitions_list, col .. "=" .. partitions[col]) - end - local partition_key = table.concat(partitions_list, pathlib.default_separator()) - return partition_key + return path:sub(1, idx) end -- Hive format partition iterator each result set is a collection of files under the same partition -function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols) +function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) local after = "" local has_more = true local prefix = base_path @@ -43,12 +34,11 @@ function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page return nil end local partition_entries = {} - local iter = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, delimiter) + local iter = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, "") for entries in iter do for _, entry in ipairs(entries) do - is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) - if not is_hidden and entry.path_type == "object" then - local partition_key = extract_partition_prefix_from_path(partition_cols, entry.path) + if not pathlib.is_hidden(pathlib.parse(entry.path).base_name) then + local partition_key = extract_partitions_path(partition_cols, entry.path) -- first time: if not set, assign current object partition as the target_partition key if target_partition == "" then target_partition = partition_key @@ -116,7 +106,7 @@ function HiveTable:version() end function HiveTable:partition_pager() - return lakefs_hive_partition_pager(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, "", + return lakefs_hive_partition_pager(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, self.partition_cols) end @@ -137,10 +127,15 @@ function TableExtractor.new(repository_id, ref, commit_id, tables_iter_page_size end function TableExtractor:_is_table_obj(entry) - local is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name) + if entry == nil or entry.path_type ~= "object" then + return false + end + -- remove _lakefs_tables/ from path + local suffix = entry.path:sub(#self.tables_registry_base, #entry.path) + local is_hidden = pathlib.is_hidden(suffix) local is_yaml = strings.has_suffix(entry.path, ".yaml") - return not is_hidden and is_yaml and entry.path_type == "object" -end + return not is_hidden and is_yaml +end -- list all YAML files in _lakefs_tables function TableExtractor:list_table_definitions() local table_entries = {} @@ -170,9 +165,9 @@ function TableExtractor:get_table(logical_path) -- TODO(isan) decide where to handle different table parsing? 
(delta table / glue table etc) if descriptor.type == 'hive' then return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name, - descriptor.partition_columns or {}, descriptor.schema, self._export_iter_page_size), nil + descriptor.partition_columns or {}, descriptor.schema, self._export_iter_page_size) end - return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. logical_path + error("NotImplemented: table type: " .. descriptor.type .. " path: " .. logical_path) end return { From ef891ae0e3df65641abf9b6e60538ca49d79b47f Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Thu, 14 Sep 2023 12:35:47 +0300 Subject: [PATCH 12/27] refactor tables --- .../lua/lakefs/catalogexport/common.lua | 2 +- pkg/actions/lua/lakefs/catalogexport/hive.lua | 101 +++++++++++ pkg/actions/lua/lakefs/catalogexport/lib.lua | 6 +- .../lakefs/catalogexport/table_extractor.lua | 160 ++---------------- 4 files changed, 125 insertions(+), 144 deletions(-) create mode 100644 pkg/actions/lua/lakefs/catalogexport/hive.lua diff --git a/pkg/actions/lua/lakefs/catalogexport/common.lua b/pkg/actions/lua/lakefs/catalogexport/common.lua index 072c423ae41..901a47f1566 100644 --- a/pkg/actions/lua/lakefs/catalogexport/common.lua +++ b/pkg/actions/lua/lakefs/catalogexport/common.lua @@ -30,4 +30,4 @@ end return { lakefs_object_pager=lakefs_object_pager, short_digest=short_digest, -} \ No newline at end of file +} diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua new file mode 100644 index 00000000000..642cdbfa5be --- /dev/null +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -0,0 +1,101 @@ +local regexp = require("regexp") +local pathlib = require("path") +local common = require("lakefs/catalogexport/common") + +-- extract partition prefix from full path +function extract_partitions_path(partition_cols, path) + local idx = 0 + for _, partition in ipairs(partition_cols) do + local pattern = partition .. 
"=[^/]*" + local re = regexp.compile(pattern) + local match = re.find(path, pattern) + if match == "" then + return nil + end + -- expanding the pattern to a match regex because string.find() does not implement pattern matching https://github.com/Shopify/go-lua/blob/main/string.go#L37 + local i, j = string.find(path, match, idx) + idx = j + 1 + end + return path:sub(1, idx) +end + +-- Hive format partition iterator each result set is a collection of files under the same partition +function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) + local after = "" + local has_more = true + local prefix = base_path + local target_partition = "" + return function() + if not has_more then + return nil + end + local partition_entries = {} + local iter = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, "") + for entries in iter do + for _, entry in ipairs(entries) do + if not pathlib.is_hidden(pathlib.parse(entry.path).base_name) then + local partition_key = extract_partitions_path(partition_cols, entry.path) + -- first time: if not set, assign current object partition as the target_partition key + if target_partition == "" then + target_partition = partition_key + end + -- break if current entry does not belong to the target_partition + if partition_key ~= target_partition then + -- next time start searching AFTER the last used key + after = partition_entries[#partition_entries].path + local partition_result = target_partition + target_partition = partition_key + return partition_result, partition_entries + end + table.insert(partition_entries, { + physical_address = entry.physical_address, + path = entry.path, + size = entry.size_bytes, + checksum = entry.checksum + }) + end + end + end + has_more = false + return target_partition, partition_entries + end +end + +-- Define the HiveTableExtractor class +local HiveTableExtractor = {} +HiveTableExtractor.__index = HiveTableExtractor + +-- Factory function to create new instances +function HiveTableExtractor.new(lakefs_client, repo_id, ref_id, commit_id, descriptor) + local self = setmetatable({}, HiveTableExtractor) + self.lakefs_client = lakefs_client + self.repo_id = repo_id + self.ref_id = ref_id + self.commit_id = commit_id + self.descriptor = descriptor + self.path = descriptor.path + self.name = descriptor.name + self.schema = descriptor.schema + self.partition_cols = descriptor.partition_columns or {} + return self +end + +-- Define methods +function HiveTableExtractor:description() + return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id, + tostring(self.path), common.short_digest(self.commit_id)) +end + +function HiveTableExtractor:version() + return 0 +end + +function HiveTableExtractor:partition_pager(page_size) + return lakefs_hive_partition_pager(self.lakefs_client, self.repo_id, self.commit_id, self.path, page_size, + self.partition_cols) +end + +return { + HiveTableExtractor = HiveTableExtractor, + lakefs_hive_partition_it = lakefs_hive_partition_it +} diff --git a/pkg/actions/lua/lakefs/catalogexport/lib.lua b/pkg/actions/lua/lakefs/catalogexport/lib.lua index 3dea18af920..ec3f834b3c0 100644 --- a/pkg/actions/lua/lakefs/catalogexport/lib.lua +++ b/pkg/actions/lua/lakefs/catalogexport/lib.lua @@ -13,7 +13,11 @@ function ref_from_branch_or_tag(action_info) end end + return { - TableExtractor = extractor.TableExtractor, + -- for testing purposes with scripts + internal = { + extractor = extractor + }, 
ref_from_branch_or_tag = ref_from_branch_or_tag } diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 70df62dd086..741ebc8dafd 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -1,149 +1,30 @@ -local regexp = require("regexp") local pathlib = require("path") local strings = require("strings") local yaml = require("encoding/yaml") -local lakefs = require("lakefs") -local json = require("encoding/json") local common = require("lakefs/catalogexport/common") +local hive = require("lakefs/catalogexport/hive") --- extract partition prefix from full path -function extract_partitions_path(partition_cols, path) - local idx = 0 - for _, partition in ipairs(partition_cols) do - local pattern = partition .. "=[^/]*" - local re = regexp.compile(pattern) - local match = re.find(path, pattern) - if match == "" then - return nil - end - -- expanding the pattern to a match regex because string.find() does not implement pattern matching https://github.com/Shopify/go-lua/blob/main/string.go#L37 - local i, j = string.find(path, match, idx) - idx = j + 1 - end - return path:sub(1, idx) -end - --- Hive format partition iterator each result set is a collection of files under the same partition -function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) - local after = "" - local has_more = true - local prefix = base_path - local target_partition = "" - return function() - if not has_more then - return nil - end - local partition_entries = {} - local iter = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, "") - for entries in iter do - for _, entry in ipairs(entries) do - if not pathlib.is_hidden(pathlib.parse(entry.path).base_name) then - local partition_key = extract_partitions_path(partition_cols, entry.path) - -- first time: if not set, assign current object partition as the target_partition key - if target_partition == "" then - target_partition = partition_key - end - -- break if current entry does not belong to the target_partition - if partition_key ~= target_partition then - -- next time start searching AFTER the last used key - after = partition_entries[#partition_entries].path - local partition_result = target_partition - target_partition = partition_key - return partition_result, partition_entries - end - table.insert(partition_entries, { - physical_address = entry.physical_address, - path = entry.path, - size = entry.size_bytes, - checksum = entry.checksum - }) - end - end - end - has_more = false - return target_partition, partition_entries - end -end - --- Define the HiveTable class -local HiveTable = {} -HiveTable.__index = HiveTable - --- Factory function to create new instances -function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema, iter_page_size) - local self = setmetatable({}, HiveTable) - self.repo_id = repo_id - self.ref_id = ref_id - self.commit_id = commit_id - self._path = path - self._name = name - self.schema = schema - self.partition_cols = partition_cols - self._iter_page_size = iter_page_size - return self -end - --- Define methods -function HiveTable:name() - return self._name -end - -function HiveTable:description() - return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id, - tostring(self._path), common.short_digest(self.commit_id)) -end - -function 
HiveTable:path() - return self._path -end - -function HiveTable:partition_columns() - return self.partition_cols -end - -function HiveTable:version() - return 0 -end +local LAKEFS_TABLES_BASE = "_lakefs_tables/" -function HiveTable:partition_pager() - return lakefs_hive_partition_pager(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, - self.partition_cols) -end - -local TableExtractor = {} -TableExtractor.__index = TableExtractor - -function TableExtractor.new(repository_id, ref, commit_id, tables_iter_page_size, export_iter_page_size) - local self = setmetatable({}, TableExtractor) - self.tables_registry_base = '_lakefs_tables/' - self.repository_id = repository_id - self.commit_id = commit_id - self.ref = ref - -- object iterator when listing _lakefs_tables/* - self._iter_page_size = tables_iter_page_size - -- object iteration when listing partitions to export (table objects) - self._export_iter_page_size = export_iter_page_size - return self -end - -function TableExtractor:_is_table_obj(entry) +function _is_table_obj(entry, tables_base) if entry == nil or entry.path_type ~= "object" then - return false + return false end -- remove _lakefs_tables/ from path - local suffix = entry.path:sub(#self.tables_registry_base, #entry.path) + local suffix = entry.path:sub(#tables_base, #entry.path) local is_hidden = pathlib.is_hidden(suffix) local is_yaml = strings.has_suffix(entry.path, ".yaml") return not is_hidden and is_yaml end --- list all YAML files in _lakefs_tables -function TableExtractor:list_table_definitions() + +-- list all YAML files under _lakefs_tables/* +function list_table_descriptor_files(client, repo_id, commit_id) local table_entries = {} - local iter = common.lakefs_object_pager(lakefs, self.repository_id, self.commit_id, "", self.tables_registry_base, - self._iter_page_size, "") + local page_size = 30 + local iter = common.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE, page_size, "") for entries in iter do for _, entry in ipairs(entries) do - if self:_is_table_obj(entry) then + if _is_table_obj(entry, LAKEFS_TABLES_BASE) then table.insert(table_entries, { physical_address = entry.physical_address, path = entry.path @@ -154,23 +35,18 @@ function TableExtractor:list_table_definitions() return table_entries end --- return concrete table instance -function TableExtractor:get_table(logical_path) - code, content = lakefs.get_object(self.repository_id, self.commit_id, logical_path) +-- table as parsed YAML object +function get_table_descriptor(client, repo_id, commit_id, logical_path) + code, content = client.get_object(repo_id, commit_id, logical_path) if code ~= 200 then error("could not fetch data file: HTTP " .. tostring(code)) end - descriptor = yaml.unmarshal(content) - -- TODO(isan) decide where to handle different table parsing? (delta table / glue table etc) - if descriptor.type == 'hive' then - return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name, - descriptor.partition_columns or {}, descriptor.schema, self._export_iter_page_size) - end - error("NotImplemented: table type: " .. descriptor.type .. " path: " .. 
logical_path) + return descriptor end return { - TableExtractor = TableExtractor, - lakefs_hive_partition_pager = lakefs_hive_partition_pager + list_table_descriptor_files = list_table_descriptor_files, + get_table_descriptor = get_table_descriptor, + HiveTableExtractor = hive.HiveTableExtractor, } From 9dbb36f8b07d0956de1f4b6d0f92179e2c9809ec Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Thu, 14 Sep 2023 18:41:05 +0300 Subject: [PATCH 13/27] Extractor --- .../lua/lakefs/catalogexport/table_extractor.lua | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 741ebc8dafd..a20b083319c 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -6,15 +6,17 @@ local hive = require("lakefs/catalogexport/hive") local LAKEFS_TABLES_BASE = "_lakefs_tables/" +-- check if lakefs entry is a table spec under _lakefs_tables/ function _is_table_obj(entry, tables_base) - if entry == nil or entry.path_type ~= "object" then + if entry.path_type ~= "object" then return false end - -- remove _lakefs_tables/ from path - local suffix = entry.path:sub(#tables_base, #entry.path) - local is_hidden = pathlib.is_hidden(suffix) - local is_yaml = strings.has_suffix(entry.path, ".yaml") - return not is_hidden and is_yaml + local path = entry.path + if strings.has_prefix(path, tables_base) then + -- remove _lakefs_tables/ from path + path = entry.path:sub(#tables_base, #path) + end + return not pathlib.is_hidden(path) and strings.has_suffix(path, ".yaml") end -- list all YAML files under _lakefs_tables/* From 0429e670a586313b4bf514f32554a6f499937bbd Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Thu, 14 Sep 2023 18:46:44 +0300 Subject: [PATCH 14/27] review comments --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 642cdbfa5be..e6633d8f26c 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -4,21 +4,17 @@ local common = require("lakefs/catalogexport/common") -- extract partition prefix from full path function extract_partitions_path(partition_cols, path) - local idx = 0 - for _, partition in ipairs(partition_cols) do - local pattern = partition .. "=[^/]*" - local re = regexp.compile(pattern) - local match = re.find(path, pattern) - if match == "" then - return nil - end - -- expanding the pattern to a match regex because string.find() does not implement pattern matching https://github.com/Shopify/go-lua/blob/main/string.go#L37 - local i, j = string.find(path, match, idx) - idx = j + 1 + -- list of columns to pattern {a,b,c} -> a=*/b=*/c=*/ + local partition_pattern = table.concat(partition_cols, "=[^/]*/") .. 
"=[^/]*/" + local re = regexp.compile(partition_pattern) + local match = re.find(path, partition_pattern) + if match == "" then + return nil end - return path:sub(1, idx) + return match end + -- Hive format partition iterator each result set is a collection of files under the same partition function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) local after = "" From ce76e29501a2cbcc130e02e51beb230cf10c8025 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Mon, 18 Sep 2023 11:30:05 +0300 Subject: [PATCH 15/27] fix review comments --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 34 ++++++++++++------- .../lakefs/catalogexport/table_extractor.lua | 4 +-- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index e6633d8f26c..10f0efefaa8 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -5,7 +5,7 @@ local common = require("lakefs/catalogexport/common") -- extract partition prefix from full path function extract_partitions_path(partition_cols, path) -- list of columns to pattern {a,b,c} -> a=*/b=*/c=*/ - local partition_pattern = table.concat(partition_cols, "=[^/]*/") .. "=[^/]*/" + local partition_pattern = table.concat(partition_cols, "=[^/]*/") .. "=[^/]*/" local re = regexp.compile(partition_pattern) local match = re.find(path, partition_pattern) if match == "" then @@ -14,31 +14,39 @@ function extract_partitions_path(partition_cols, path) return match end - -- Hive format partition iterator each result set is a collection of files under the same partition function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) local after = "" local has_more = true local prefix = base_path local target_partition = "" + local page = {} return function() if not has_more then return nil end local partition_entries = {} - local iter = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, "") - for entries in iter do - for _, entry in ipairs(entries) do - if not pathlib.is_hidden(pathlib.parse(entry.path).base_name) then + repeat + if #page == 0 then + local nextPage = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, "") + page = nextPage() + if page == nil or #page == 0 then -- no more records + has_more = false + return target_partition, partition_entries + else -- set next offset + after = page[#page].path + end + end + repeat + local entry = page[1] + if not pathlib.is_hidden(entry.path) then local partition_key = extract_partitions_path(partition_cols, entry.path) - -- first time: if not set, assign current object partition as the target_partition key + -- first time: if not set, assign current object partition as the target_partition key if target_partition == "" then target_partition = partition_key end -- break if current entry does not belong to the target_partition if partition_key ~= target_partition then - -- next time start searching AFTER the last used key - after = partition_entries[#partition_entries].path local partition_result = target_partition target_partition = partition_key return partition_result, partition_entries @@ -49,11 +57,11 @@ function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page size = entry.size_bytes, checksum = entry.checksum }) + -- remove entry only if its part of the current partition + table.remove(page, 1) end - end - end - 
has_more = false - return target_partition, partition_entries + until page == nil or #page == 0 + until not has_more end end diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index a20b083319c..678ffd47935 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -7,7 +7,7 @@ local hive = require("lakefs/catalogexport/hive") local LAKEFS_TABLES_BASE = "_lakefs_tables/" -- check if lakefs entry is a table spec under _lakefs_tables/ -function _is_table_obj(entry, tables_base) +function is_table_obj(entry, tables_base) if entry.path_type ~= "object" then return false end @@ -26,7 +26,7 @@ function list_table_descriptor_files(client, repo_id, commit_id) local iter = common.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE, page_size, "") for entries in iter do for _, entry in ipairs(entries) do - if _is_table_obj(entry, LAKEFS_TABLES_BASE) then + if is_table_obj(entry, LAKEFS_TABLES_BASE) then table.insert(table_entries, { physical_address = entry.physical_address, path = entry.path From f4c3a9632b0ad0b24407f62678a2de6498b55584 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Mon, 18 Sep 2023 12:54:41 +0300 Subject: [PATCH 16/27] fix iterator --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 63 +++++++++---------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 10f0efefaa8..624810c9586 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -16,52 +16,47 @@ end -- Hive format partition iterator each result set is a collection of files under the same partition function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) - local after = "" - local has_more = true + -- local has_more = true local prefix = base_path local target_partition = "" - local page = {} + local pager = common.lakefs_object_pager(client, repo_id, commit_id, "", prefix, page_size, "") + local page = pager() return function() - if not has_more then + if page == nil then return nil end local partition_entries = {} repeat if #page == 0 then - local nextPage = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, "") - page = nextPage() - if page == nil or #page == 0 then -- no more records - has_more = false + page = pager() + if page == nil then -- no more records return target_partition, partition_entries - else -- set next offset - after = page[#page].path end end - repeat - local entry = page[1] - if not pathlib.is_hidden(entry.path) then - local partition_key = extract_partitions_path(partition_cols, entry.path) - -- first time: if not set, assign current object partition as the target_partition key - if target_partition == "" then - target_partition = partition_key - end - -- break if current entry does not belong to the target_partition - if partition_key ~= target_partition then - local partition_result = target_partition - target_partition = partition_key - return partition_result, partition_entries - end - table.insert(partition_entries, { - physical_address = entry.physical_address, - path = entry.path, - size = entry.size_bytes, - checksum = entry.checksum - }) - -- remove entry only if its part of the current partition - table.remove(page, 1) + -- repeat + local entry = page[1] + if not 
pathlib.is_hidden(entry.path) then + local partition_key = extract_partitions_path(partition_cols, entry.path) + -- first time: if not set, assign current object partition as the target_partition key + if target_partition == "" then + target_partition = partition_key end - until page == nil or #page == 0 - until not has_more + -- break if current entry does not belong to the target_partition + if partition_key ~= target_partition then + local partition_result = target_partition + target_partition = partition_key + return partition_result, partition_entries + end + table.insert(partition_entries, { + physical_address = entry.physical_address, + path = entry.path, + size = entry.size_bytes, + checksum = entry.checksum + }) + -- remove entry only if its part of the current partition + table.remove(page, 1) + end + until not true -- check if has while True end end From ec5ddd62705d4642fd20336c686df5134b089eae Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Mon, 18 Sep 2023 13:34:05 +0300 Subject: [PATCH 17/27] fix review --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 624810c9586..8413ad1a926 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -3,15 +3,18 @@ local pathlib = require("path") local common = require("lakefs/catalogexport/common") -- extract partition prefix from full path -function extract_partitions_path(partition_cols, path) - -- list of columns to pattern {a,b,c} -> a=*/b=*/c=*/ - local partition_pattern = table.concat(partition_cols, "=[^/]*/") .. "=[^/]*/" - local re = regexp.compile(partition_pattern) - local match = re.find(path, partition_pattern) - if match == "" then - return nil +function extract_partitions_path(partitions, path) + local idx = 0 + for _, partition in ipairs(partitions) do + local col_substr = partition .. 
"=" + local i, j = string.find(path, col_substr, idx) + if i == nil then + return "nil" + end + local start_val, end_val = string.find(path, "/", j+1) + idx = end_val end - return match + return string.sub(path, 1, idx) end -- Hive format partition iterator each result set is a collection of files under the same partition From c85c4267b43f64bcc10d65bcd670b53753b2ac02 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Mon, 18 Sep 2023 14:21:06 +0300 Subject: [PATCH 18/27] minor change --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 8413ad1a926..b8c41602ca3 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -19,7 +19,6 @@ end -- Hive format partition iterator each result set is a collection of files under the same partition function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) - -- local has_more = true local prefix = base_path local target_partition = "" local pager = common.lakefs_object_pager(client, repo_id, commit_id, "", prefix, page_size, "") From eb21f455d51ca7697205fa8a97023921ecba9786 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Mon, 18 Sep 2023 14:30:57 +0300 Subject: [PATCH 19/27] update whiletrue --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index b8c41602ca3..48eac7ee16c 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -28,14 +28,13 @@ function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page return nil end local partition_entries = {} - repeat + while(true) do if #page == 0 then page = pager() if page == nil then -- no more records return target_partition, partition_entries end end - -- repeat local entry = page[1] if not pathlib.is_hidden(entry.path) then local partition_key = extract_partitions_path(partition_cols, entry.path) @@ -58,7 +57,7 @@ function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page -- remove entry only if its part of the current partition table.remove(page, 1) end - until not true -- check if has while True + end end end From 77bd52d60c5761f77b7ec012e1429327d5938171 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 12:34:01 +0300 Subject: [PATCH 20/27] update final --- .../lua/lakefs/catalogexport/common.lua | 33 ------------ pkg/actions/lua/lakefs/catalogexport/hive.lua | 49 +++-------------- .../lakefs/catalogexport/internal/utils.lua | 52 +++++++++++++++++++ pkg/actions/lua/lakefs/catalogexport/lib.lua | 23 -------- .../lakefs/catalogexport/table_extractor.lua | 16 +++--- pkg/actions/lua/load.go | 2 +- 6 files changed, 69 insertions(+), 106 deletions(-) delete mode 100644 pkg/actions/lua/lakefs/catalogexport/common.lua create mode 100644 pkg/actions/lua/lakefs/catalogexport/internal/utils.lua delete mode 100644 pkg/actions/lua/lakefs/catalogexport/lib.lua diff --git a/pkg/actions/lua/lakefs/catalogexport/common.lua b/pkg/actions/lua/lakefs/catalogexport/common.lua deleted file mode 100644 index 901a47f1566..00000000000 --- a/pkg/actions/lua/lakefs/catalogexport/common.lua +++ /dev/null @@ -1,33 +0,0 @@ -local SHORT_DIGEST_LEN=6 - -function short_digest(digest) - return digest:sub(1, SHORT_DIGEST_LEN) 
-end - -function lakefs_paginiated_api(api_call, after) - local next_offset = after - local has_more = true - return function() - if not has_more then - return nil - end - local code, resp = api_call(next_offset) - if code < 200 or code >= 300 then - error("lakeFS: api return non-2xx" .. code) - end - has_more = resp.pagination.has_more - next_offset = resp.pagination.next_offset - return resp.results - end -end - -function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) - return lakefs_paginiated_api(function(next_offset) - return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) - end, after) -end - -return { - lakefs_object_pager=lakefs_object_pager, - short_digest=short_digest, -} diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 48eac7ee16c..c2be871380b 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -1,6 +1,5 @@ -local regexp = require("regexp") local pathlib = require("path") -local common = require("lakefs/catalogexport/common") +local utils = require("lakefs/catalogexport/internal/utils") -- extract partition prefix from full path function extract_partitions_path(partitions, path) @@ -9,7 +8,7 @@ function extract_partitions_path(partitions, path) local col_substr = partition .. "=" local i, j = string.find(path, col_substr, idx) if i == nil then - return "nil" + return nil end local start_val, end_val = string.find(path, "/", j+1) idx = end_val @@ -21,7 +20,7 @@ end function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) local prefix = base_path local target_partition = "" - local pager = common.lakefs_object_pager(client, repo_id, commit_id, "", prefix, page_size, "") + local pager = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", prefix, page_size, "") local page = pager() return function() if page == nil then @@ -61,41 +60,9 @@ function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page end end --- Define the HiveTableExtractor class -local HiveTableExtractor = {} -HiveTableExtractor.__index = HiveTableExtractor - --- Factory function to create new instances -function HiveTableExtractor.new(lakefs_client, repo_id, ref_id, commit_id, descriptor) - local self = setmetatable({}, HiveTableExtractor) - self.lakefs_client = lakefs_client - self.repo_id = repo_id - self.ref_id = ref_id - self.commit_id = commit_id - self.descriptor = descriptor - self.path = descriptor.path - self.name = descriptor.name - self.schema = descriptor.schema - self.partition_cols = descriptor.partition_columns or {} - return self -end - --- Define methods -function HiveTableExtractor:description() - return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id, - tostring(self.path), common.short_digest(self.commit_id)) -end - -function HiveTableExtractor:version() - return 0 -end - -function HiveTableExtractor:partition_pager(page_size) - return lakefs_hive_partition_pager(self.lakefs_client, self.repo_id, self.commit_id, self.path, page_size, - self.partition_cols) -end - return { - HiveTableExtractor = HiveTableExtractor, - lakefs_hive_partition_it = lakefs_hive_partition_it -} + TableExtractor = { + DEFAULT_PAGE_SIZE_PARTITION = 30, + lakefs_hive_partition_pager=lakefs_hive_partition_pager, + } +} \ No newline at end of file diff --git 
a/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua b/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua
new file mode 100644
index 00000000000..c1e60c84cc3
--- /dev/null
+++ b/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua
@@ -0,0 +1,52 @@
+local DEFAULT_SHORT_DIGEST_LEN=6
+
+function short_digest(digest, len)
+    return digest:sub(1, len)
+end
+
+-- paginate lakefs api
+function lakefs_paginiated_api(api_call, after)
+    local next_offset = after
+    local has_more = true
+    return function()
+        if not has_more then
+            return nil
+        end
+        local code, resp = api_call(next_offset)
+        if code < 200 or code >= 300 then
+            error("lakeFS: api return non-2xx" .. code)
+        end
+        has_more = resp.pagination.has_more
+        next_offset = resp.pagination.next_offset
+        return resp.results
+    end
+end
+
+-- paginage over lakefs objects
+function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter)
+    return lakefs_paginiated_api(function(next_offset)
+        return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size)
+    end, after)
+end
+
+-- resolve ref value from action global, used as part of setting default table name
+function ref_from_branch_or_tag(action_info)
+    local event = action_info.event_type
+    if event == "pre-create-tag" or event == "post-create-tag" then
+        return action_info.tag_id
+    elseif event == "pre-create-branch" or event == "post-create-branch" or event == "post-commit" or event == "post-merge" then
+        return action_info.branch_id
+    else
+        error("unsupported event type: " .. action_info.event_type)
+    end
+end
+
+return {
+    DEFAULT_SHORT_DIGEST_LEN=DEFAULT_SHORT_DIGEST_LEN,
+    short_digest=short_digest,
+    ref_from_branch_or_tag=ref_from_branch_or_tag,
+    api = {
+        lakefs_object_pager=lakefs_object_pager,
+        lakefs_paginiated_api=lakefs_paginiated_api,
+    }
+}
\ No newline at end of file
diff --git a/pkg/actions/lua/lakefs/catalogexport/lib.lua b/pkg/actions/lua/lakefs/catalogexport/lib.lua
deleted file mode 100644
index ec3f834b3c0..00000000000
--- a/pkg/actions/lua/lakefs/catalogexport/lib.lua
+++ /dev/null
@@ -1,23 +0,0 @@
-local extractor = require("lakefs/catalogexport/table_extractor")
-local common = require("lakefs/catalogexport/common")
-
--- resolve ref value from action global
-function ref_from_branch_or_tag(action_info)
-    event = action_info.event_type
-    if event == "pre-create-tag" or event == "post-create-tag" then
-        return action_info.tag_id
-    elseif event == "pre-create-branch" or event == "post-create-branch" or "post-commit" or "post-merge" then
-        return action_info.branch_id
-    else
-        error("unsupported event type: " ..
action_info.event_type) - end -end - - -return { - -- for testing purposes with scripts - internal = { - extractor = extractor - }, - ref_from_branch_or_tag = ref_from_branch_or_tag -} diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 678ffd47935..e2e7fca5128 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -1,8 +1,8 @@ local pathlib = require("path") local strings = require("strings") local yaml = require("encoding/yaml") -local common = require("lakefs/catalogexport/common") -local hive = require("lakefs/catalogexport/hive") +local utils = require("lakefs/catalogexport/internal/utils") +local HiveTableExtractor = require("lakefs/catalogexport/hive") local LAKEFS_TABLES_BASE = "_lakefs_tables/" @@ -12,7 +12,7 @@ function is_table_obj(entry, tables_base) return false end local path = entry.path - if strings.has_prefix(path, tables_base) then + if strings.has_prefix(path, tables_base) then -- remove _lakefs_tables/ from path path = entry.path:sub(#tables_base, #path) end @@ -23,7 +23,7 @@ end function list_table_descriptor_files(client, repo_id, commit_id) local table_entries = {} local page_size = 30 - local iter = common.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE, page_size, "") + local iter = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE, page_size, "") for entries in iter do for _, entry in ipairs(entries) do if is_table_obj(entry, LAKEFS_TABLES_BASE) then @@ -39,16 +39,16 @@ end -- table as parsed YAML object function get_table_descriptor(client, repo_id, commit_id, logical_path) - code, content = client.get_object(repo_id, commit_id, logical_path) + local code, content = client.get_object(repo_id, commit_id, logical_path) if code ~= 200 then error("could not fetch data file: HTTP " .. 
tostring(code)) end - descriptor = yaml.unmarshal(content) + local descriptor = yaml.unmarshal(content) + descriptor.partition_columns = descriptor.partition_columns or {} return descriptor end return { list_table_descriptor_files = list_table_descriptor_files, get_table_descriptor = get_table_descriptor, - HiveTableExtractor = hive.HiveTableExtractor, -} +} \ No newline at end of file diff --git a/pkg/actions/lua/load.go b/pkg/actions/lua/load.go index 2ec07ba0e88..8f83c2a99db 100644 --- a/pkg/actions/lua/load.go +++ b/pkg/actions/lua/load.go @@ -19,7 +19,7 @@ const ( defaultPath = "?.lua" ) -//go:embed lakefs/catalogexport/*.lua +//go:embed lakefs/catalogexport/*.lua lakefs/catalogexport/internal/*.lua var luaEmbeddedCode embed.FS var ErrNoFile = errors.New("no file") From bdd7c2b01ccbc8f4ce28a6f0c4188324618645a3 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 12:57:44 +0300 Subject: [PATCH 21/27] use short_digest default --- pkg/actions/lua/lakefs/catalogexport/internal/utils.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua b/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua index c1e60c84cc3..05ac4aa30d9 100644 --- a/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua +++ b/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua @@ -1,7 +1,7 @@ local DEFAULT_SHORT_DIGEST_LEN=6 function short_digest(digest, len) - return digest:sub(1, len) + return digest:sub(1, len or DEFAULT_SHORT_DIGEST_LEN) end -- paginate lakefs api From 82fde2f714fe1d88ecd5b9185f97bd6a95163446 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 13:04:50 +0300 Subject: [PATCH 22/27] fix page size param --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index c2be871380b..86f1310fe43 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -1,5 +1,6 @@ local pathlib = require("path") local utils = require("lakefs/catalogexport/internal/utils") +local DEFAULT_PAGE_SIZE = 30 -- extract partition prefix from full path function extract_partitions_path(partitions, path) @@ -17,10 +18,10 @@ function extract_partitions_path(partitions, path) end -- Hive format partition iterator each result set is a collection of files under the same partition -function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols) +function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size) local prefix = base_path local target_partition = "" - local pager = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", prefix, page_size, "") + local pager = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", prefix, page_size or DEFAULT_PAGE_SIZE) local page = pager() return function() if page == nil then @@ -62,7 +63,6 @@ end return { TableExtractor = { - DEFAULT_PAGE_SIZE_PARTITION = 30, lakefs_hive_partition_pager=lakefs_hive_partition_pager, } } \ No newline at end of file From be779695033ad5e1d8dbf73fc51450b74dccd82a Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 16:00:36 +0300 Subject: [PATCH 23/27] fix review --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 18 ++++++++++++------ .../{internal/utils.lua => internal.lua} | 13 +++++-------- .../lakefs/catalogexport/table_extractor.lua | 10 
+++++----- pkg/actions/lua/load.go | 2 +- 4 files changed, 23 insertions(+), 20 deletions(-) rename pkg/actions/lua/lakefs/catalogexport/{internal/utils.lua => internal.lua} (82%) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 86f1310fe43..cbd222f3744 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -1,27 +1,33 @@ local pathlib = require("path") -local utils = require("lakefs/catalogexport/internal/utils") +local utils = require("lakefs/catalogexport/internal") local DEFAULT_PAGE_SIZE = 30 -- extract partition prefix from full path function extract_partitions_path(partitions, path) - local idx = 0 + if partitions == nil or #partitions == 0 then + return "" + end + local idx = 1 for _, partition in ipairs(partitions) do local col_substr = partition .. "=" local i, j = string.find(path, col_substr, idx) if i == nil then return nil end - local start_val, end_val = string.find(path, "/", j+1) - idx = end_val + local separator_idx = string.find(path, "/", j+1) + -- verify / found and there is something in between = ... / + if separator_idx == nil or separator_idx <= (j + 1) then + return nil + end + idx = separator_idx end return string.sub(path, 1, idx) end -- Hive format partition iterator each result set is a collection of files under the same partition function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size) - local prefix = base_path local target_partition = "" - local pager = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", prefix, page_size or DEFAULT_PAGE_SIZE) + local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", base_path,"", page_size or DEFAULT_PAGE_SIZE) local page = pager() return function() if page == nil then diff --git a/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua b/pkg/actions/lua/lakefs/catalogexport/internal.lua similarity index 82% rename from pkg/actions/lua/lakefs/catalogexport/internal/utils.lua rename to pkg/actions/lua/lakefs/catalogexport/internal.lua index 05ac4aa30d9..98ca7f2a749 100644 --- a/pkg/actions/lua/lakefs/catalogexport/internal/utils.lua +++ b/pkg/actions/lua/lakefs/catalogexport/internal.lua @@ -14,7 +14,7 @@ function lakefs_paginiated_api(api_call, after) end local code, resp = api_call(next_offset) if code < 200 or code >= 300 then - error("lakeFS: api return non-2xx" .. code) + error("lakeFS: api return non-2xx" .. 
tostring(code)) end has_more = resp.pagination.has_more next_offset = resp.pagination.next_offset @@ -23,9 +23,9 @@ function lakefs_paginiated_api(api_call, after) end -- paginage over lakefs objects -function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter) +function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, delimiter, page_size) return lakefs_paginiated_api(function(next_offset) - return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size) + return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size or 30) end, after) end @@ -42,11 +42,8 @@ function ref_from_branch_or_tag(action_info) end return { - DEFAULT_SHORT_DIGEST_LEN=DEFAULT_SHORT_DIGEST_LEN, short_digest=short_digest, ref_from_branch_or_tag=ref_from_branch_or_tag, - api = { - lakefs_object_pager=lakefs_object_pager, - lakefs_paginiated_api=lakefs_paginiated_api, - } + lakefs_object_pager=lakefs_object_pager, + lakefs_paginiated_api=lakefs_paginiated_api, } \ No newline at end of file diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index e2e7fca5128..f41906cf91a 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -1,7 +1,7 @@ local pathlib = require("path") local strings = require("strings") local yaml = require("encoding/yaml") -local utils = require("lakefs/catalogexport/internal/utils") +local utils = require("lakefs/catalogexport/internal") local HiveTableExtractor = require("lakefs/catalogexport/hive") local LAKEFS_TABLES_BASE = "_lakefs_tables/" @@ -20,11 +20,11 @@ function is_table_obj(entry, tables_base) end -- list all YAML files under _lakefs_tables/* -function list_table_descriptor_files(client, repo_id, commit_id) +function list_table_descriptor_entries(client, repo_id, commit_id) local table_entries = {} local page_size = 30 - local iter = utils.api.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE, page_size, "") - for entries in iter do + local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE,"", page_size) + for entries in pager do for _, entry in ipairs(entries) do if is_table_obj(entry, LAKEFS_TABLES_BASE) then table.insert(table_entries, { @@ -49,6 +49,6 @@ function get_table_descriptor(client, repo_id, commit_id, logical_path) end return { - list_table_descriptor_files = list_table_descriptor_files, + list_table_descriptor_entries = list_table_descriptor_entries, get_table_descriptor = get_table_descriptor, } \ No newline at end of file diff --git a/pkg/actions/lua/load.go b/pkg/actions/lua/load.go index 8f83c2a99db..2ec07ba0e88 100644 --- a/pkg/actions/lua/load.go +++ b/pkg/actions/lua/load.go @@ -19,7 +19,7 @@ const ( defaultPath = "?.lua" ) -//go:embed lakefs/catalogexport/*.lua lakefs/catalogexport/internal/*.lua +//go:embed lakefs/catalogexport/*.lua var luaEmbeddedCode embed.FS var ErrNoFile = errors.New("no file") From 9f1de19558fa65d00563bf3e9b327bbb86975361 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 16:16:16 +0300 Subject: [PATCH 24/27] fix --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index cbd222f3744..0678ad04acc 100644 --- 
a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -1,5 +1,6 @@ local pathlib = require("path") local utils = require("lakefs/catalogexport/internal") +local strings = require("strings") local DEFAULT_PAGE_SIZE = 30 -- extract partition prefix from full path @@ -8,8 +9,13 @@ function extract_partitions_path(partitions, path) return "" end local idx = 1 - for _, partition in ipairs(partitions) do - local col_substr = partition .. "=" + local is_partition_prefix = strings.has_prefix(path, partitions[1]) + for part_idx, partition in ipairs(partitions) do + local col_substr = "/" .. partition .. "=" + -- if partition is the path prefix and we are the that first partition remove / + if part_idx == 1 and is_partition_prefix then + col_substr = partition .. "=" + end local i, j = string.find(path, col_substr, idx) if i == nil then return nil From 7e8097842939e5d7a81e0b9ce447e9e6ab7ab7c3 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 16:33:39 +0300 Subject: [PATCH 25/27] minor fix --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 0678ad04acc..6edc7b198be 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -40,7 +40,7 @@ function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, part return nil end local partition_entries = {} - while(true) do + while true do if #page == 0 then page = pager() if page == nil then -- no more records From e4e55bd36db5c3533e1e81fd4fb4bb521e51ddb9 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 16:46:24 +0300 Subject: [PATCH 26/27] remove nesting --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 6 ++---- pkg/actions/lua/lakefs/catalogexport/table_extractor.lua | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 6edc7b198be..58757ca3186 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -31,7 +31,7 @@ function extract_partitions_path(partitions, path) end -- Hive format partition iterator each result set is a collection of files under the same partition -function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size) +function extract_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size) local target_partition = "" local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", base_path,"", page_size or DEFAULT_PAGE_SIZE) local page = pager() @@ -74,7 +74,5 @@ function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, part end return { - TableExtractor = { - lakefs_hive_partition_pager=lakefs_hive_partition_pager, - } + extract_partition_pager=extract_partition_pager, } \ No newline at end of file diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index f41906cf91a..30017b75518 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -2,7 +2,6 @@ local pathlib = require("path") local strings = require("strings") local yaml = require("encoding/yaml") local utils = require("lakefs/catalogexport/internal") -local 
HiveTableExtractor = require("lakefs/catalogexport/hive") local LAKEFS_TABLES_BASE = "_lakefs_tables/" From f31675d9bbbf1644dc9811bd620359f12f875d24 Mon Sep 17 00:00:00 2001 From: Isan Rivkin Date: Tue, 19 Sep 2023 16:54:20 +0300 Subject: [PATCH 27/27] convert to local --- pkg/actions/lua/lakefs/catalogexport/hive.lua | 4 ++-- pkg/actions/lua/lakefs/catalogexport/internal.lua | 8 ++++---- pkg/actions/lua/lakefs/catalogexport/table_extractor.lua | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/hive.lua b/pkg/actions/lua/lakefs/catalogexport/hive.lua index 58757ca3186..f628c63d157 100644 --- a/pkg/actions/lua/lakefs/catalogexport/hive.lua +++ b/pkg/actions/lua/lakefs/catalogexport/hive.lua @@ -4,7 +4,7 @@ local strings = require("strings") local DEFAULT_PAGE_SIZE = 30 -- extract partition prefix from full path -function extract_partitions_path(partitions, path) +local function extract_partitions_path(partitions, path) if partitions == nil or #partitions == 0 then return "" end @@ -31,7 +31,7 @@ function extract_partitions_path(partitions, path) end -- Hive format partition iterator each result set is a collection of files under the same partition -function extract_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size) +local function extract_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size) local target_partition = "" local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", base_path,"", page_size or DEFAULT_PAGE_SIZE) local page = pager() diff --git a/pkg/actions/lua/lakefs/catalogexport/internal.lua b/pkg/actions/lua/lakefs/catalogexport/internal.lua index 98ca7f2a749..c33c0eef1c2 100644 --- a/pkg/actions/lua/lakefs/catalogexport/internal.lua +++ b/pkg/actions/lua/lakefs/catalogexport/internal.lua @@ -1,11 +1,11 @@ local DEFAULT_SHORT_DIGEST_LEN=6 -function short_digest(digest, len) +local function short_digest(digest, len) return digest:sub(1, len or DEFAULT_SHORT_DIGEST_LEN) end -- paginate lakefs api -function lakefs_paginiated_api(api_call, after) +local function lakefs_paginiated_api(api_call, after) local next_offset = after local has_more = true return function() @@ -23,14 +23,14 @@ function lakefs_paginiated_api(api_call, after) end -- paginage over lakefs objects -function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, delimiter, page_size) +local function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, delimiter, page_size) return lakefs_paginiated_api(function(next_offset) return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size or 30) end, after) end -- resolve ref value from action global, used as part of setting default table name -function ref_from_branch_or_tag(action_info) +local function ref_from_branch_or_tag(action_info) local event = action_info.event_type if event == "pre-create-tag" or event == "post-create-tag" then return action_info.tag_id diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 30017b75518..c57592adbb9 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -6,7 +6,7 @@ local utils = require("lakefs/catalogexport/internal") local LAKEFS_TABLES_BASE = "_lakefs_tables/" -- check if lakefs entry is a table spec under _lakefs_tables/ -function is_table_obj(entry, tables_base) 
+local function is_table_obj(entry, tables_base) if entry.path_type ~= "object" then return false end @@ -19,7 +19,7 @@ function is_table_obj(entry, tables_base) end -- list all YAML files under _lakefs_tables/* -function list_table_descriptor_entries(client, repo_id, commit_id) +local function list_table_descriptor_entries(client, repo_id, commit_id) local table_entries = {} local page_size = 30 local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE,"", page_size) @@ -37,7 +37,7 @@ function list_table_descriptor_entries(client, repo_id, commit_id) end -- table as parsed YAML object -function get_table_descriptor(client, repo_id, commit_id, logical_path) +local function get_table_descriptor(client, repo_id, commit_id, logical_path) local code, content = client.get_object(repo_id, commit_id, logical_path) if code ~= 200 then error("could not fetch data file: HTTP " .. tostring(code))
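
The end state of the series leaves three library modules: lakefs/catalogexport/internal (lakefs_paginiated_api, lakefs_object_pager, short_digest, ref_from_branch_or_tag), lakefs/catalogexport/table_extractor (list_table_descriptor_entries, get_table_descriptor), and lakefs/catalogexport/hive (extract_partition_pager). The sketch below shows how a hook script could wire them together to walk every table registered under _lakefs_tables/ and page through its Hive partitions. It is illustrative only: the `action` fields used (repository_id, commit_id) and the availability of print output in the hook runtime are assumptions, not something these patches establish.

-- Illustrative sketch only: drives the final API of this series, assuming the
-- standard lakeFS hook globals (an `action` table with repository_id and
-- commit_id fields) and at least one descriptor file under _lakefs_tables/.
local lakefs = require("lakefs")
local extractor = require("lakefs/catalogexport/table_extractor")
local hive = require("lakefs/catalogexport/hive")
local internal = require("lakefs/catalogexport/internal")

local repo_id = action.repository_id -- assumed hook-provided field
local commit_id = action.commit_id   -- assumed hook-provided field
local ref = internal.ref_from_branch_or_tag(action)

-- list every table descriptor registered under _lakefs_tables/
for _, entry in ipairs(extractor.list_table_descriptor_entries(lakefs, repo_id, commit_id)) do
    local descriptor = extractor.get_table_descriptor(lakefs, repo_id, commit_id, entry.path)
    if descriptor.type == "hive" then
        -- page through the table's objects, one partition per iteration
        -- (100 is an arbitrary page size chosen for the example)
        local pager = hive.extract_partition_pager(lakefs, repo_id, commit_id, descriptor.path,
            descriptor.partition_columns, 100)
        for partition_key, files in pager do
            print(ref .. "/" .. descriptor.name .. "@" .. internal.short_digest(commit_id) ..
                ": partition " .. partition_key .. " (" .. tostring(#files) .. " files)")
        end
    end
end

Note the design choice this relies on: extract_partition_pager keeps the underlying lakefs_object_pager and its current page as closure state, so each call yields the next complete partition without re-listing from the beginning of the prefix.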