Table Extractor Hook and _lakefs_tables format #6589

Merged
merged 30 commits into from
Sep 19, 2023
Changes from 11 commits
f51a49e
initial commit
Isan-Rivkin Sep 12, 2023
22c67d1
load lua libs
Isan-Rivkin Sep 12, 2023
a44293e
working table extraction
Isan-Rivkin Sep 12, 2023
a6bec34
fix linter issues
Isan-Rivkin Sep 13, 2023
8755a70
load embedded files dynamicly (#6592)
nopcoder Sep 13, 2023
0327daa
change paths
Isan-Rivkin Sep 13, 2023
3096211
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
83ef166
fix short digest and paging parameters
Isan-Rivkin Sep 13, 2023
4f02c4d
fix review
Isan-Rivkin Sep 13, 2023
9892648
clean
Isan-Rivkin Sep 13, 2023
ac5978f
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
e1d8d03
review comments
Isan-Rivkin Sep 14, 2023
72dc0d1
update review
Isan-Rivkin Sep 14, 2023
ef891ae
refactor tables
Isan-Rivkin Sep 14, 2023
9dbb36f
Extractor
Isan-Rivkin Sep 14, 2023
0429e67
review comments
Isan-Rivkin Sep 14, 2023
ce76e29
fix review comments
Isan-Rivkin Sep 18, 2023
f4c3a96
fix iterator
Isan-Rivkin Sep 18, 2023
ec5ddd6
fix review
Isan-Rivkin Sep 18, 2023
c85c426
minor change
Isan-Rivkin Sep 18, 2023
eb21f45
update whiletrue
Isan-Rivkin Sep 18, 2023
77bd52d
update final
Isan-Rivkin Sep 19, 2023
bdd7c2b
use short_digest default
Isan-Rivkin Sep 19, 2023
82fde2f
fix page size param
Isan-Rivkin Sep 19, 2023
be77969
fix review
Isan-Rivkin Sep 19, 2023
9f1de19
fix
Isan-Rivkin Sep 19, 2023
7e80978
minor fix
Isan-Rivkin Sep 19, 2023
e4e55bd
remove nesting
Isan-Rivkin Sep 19, 2023
f31675d
convert to local
Isan-Rivkin Sep 19, 2023
cbc6ee0
align head
Isan-Rivkin Sep 19, 2023
2 changes: 1 addition & 1 deletion pkg/actions/lua.go
@@ -205,7 +205,7 @@ func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook,
 			Args: args,
 		}, nil
 	} else if !errors.Is(err, errMissingKey) {
-		// 'script' was provided but is empty or of the wrong type..
+		// 'script' was provided but is empty or of the wrong type.
 		return nil, err
 	}

33 changes: 33 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/common.lua
nopcoder marked this conversation as resolved.
@@ -0,0 +1,33 @@
local SHORT_DIGEST_LEN=6

function short_digest(digest)
    return digest:sub(1, SHORT_DIGEST_LEN)
end

function lakefs_paginiated_api(api_call, after)
    local next_offset = after
    local has_more = true
    return function()
        if not has_more then
            return nil
        end
        local code, resp = api_call(next_offset)
        if code < 200 or code >= 300 then
            error("lakeFS: api return non-2xx" .. code)
        end
        has_more = resp.pagination.has_more
        next_offset = resp.pagination.next_offset
        return resp.results
    end
end

function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter)
Contributor

  1. you are not exporting this function
  2. you are calling list_objects -> lakefs_list_objects_pager

Contributor Author

I don't understand what you're saying, please explain.

Contributor

lakefs_paginiated_api is the money maker: I can use it with multiple APIs supplied by the lakefs package.
The common package under catalogexport uses it only for listing objects, and exposes only that functionality.

Contributor

The lakefs_paginiated_api is still not exported

Contributor Author

When someone needs lakefs_paginiated_api outside of the package, we can expose it, no? What am I missing?

    return lakefs_paginiated_api(function(next_offset)
        return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size)
    end, after)
end

return {
    lakefs_object_pager=lakefs_object_pager,
    short_digest=short_digest,
}
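
To make the point of the pagination thread concrete, here is a minimal, self-contained sketch of the same closure-based pager driven by a different API call. The names `paginated`, `fake_list` and `items` are hypothetical stand-ins invented for this illustration; only the `results` / `pagination.has_more` / `pagination.next_offset` response shape is taken from the code under review.

```lua
-- Generic pager: wraps any call that takes an offset and returns (code, resp).
local function paginated(api_call, after)
    local next_offset = after
    local has_more = true
    return function()
        if not has_more then
            return nil
        end
        local code, resp = api_call(next_offset)
        if code < 200 or code >= 300 then
            error("api returned non-2xx: " .. tostring(code))
        end
        has_more = resp.pagination.has_more
        next_offset = resp.pagination.next_offset
        return resp.results
    end
end

-- Hypothetical in-memory API: serves `items` two at a time.
local items = { "a", "b", "c", "d", "e" }
local function fake_list(after)
    local start = (after == "") and 0 or tonumber(after)
    local page = {}
    for i = start + 1, math.min(start + 2, #items) do
        page[#page + 1] = items[i]
    end
    local last = start + #page
    return 200, {
        results = page,
        pagination = { has_more = last < #items, next_offset = tostring(last) },
    }
end

-- Drain every page into a flat list.
local collected = {}
for page in paginated(fake_list, "") do
    for _, value in ipairs(page) do
        collected[#collected + 1] = value
    end
end
```

Because the pager only depends on the shape of the response, any paginated call with that shape could be plugged in the same way; whether to export it is the design question discussed in the thread.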
19 changes: 19 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/lib.lua
Contributor

  1. How is this one used?
  2. Why is it part of lib? Or under catalog export?

@@ -0,0 +1,19 @@
local extractor = require("lakefs/catalogexport/table_extractor")
local common = require("lakefs/catalogexport/common")

-- resolve ref value from action global
function ref_from_branch_or_tag(action_info)
    event = action_info.event_type
    if event == "pre-create-tag" or event == "post-create-tag" then
        return action_info.tag_id, nil
    elseif event == "pre-create-branch" or event == "post-create-branch" or event == "post-commit" or event == "post-merge" then
        return action_info.branch_id, nil
    else
        return nil, "unsupported event type: " .. action_info.event_type
Contributor

Returning (data, err) is a Go practice, not a Lua one.
Indicate an invalid state by raising an error, or by returning nil or false.

Contributor Author

lol, 💯

    end
end
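
As an illustration of the review suggestion about error handling, the following is a sketch (not the code that was merged) of the same lookup raising an error instead of returning a `(value, err)` pair, with the caller using `pcall` when it wants to recover. The event names and the `tag_id` / `branch_id` fields are taken from the diff; the sample `action_info` tables are made up. The sketch compares every event name explicitly, since a bare string literal inside an `or` chain is always truthy in Lua.

```lua
local function ref_from_branch_or_tag(action_info)
    local event = action_info.event_type
    if event == "pre-create-tag" or event == "post-create-tag" then
        return action_info.tag_id
    elseif event == "pre-create-branch" or event == "post-create-branch"
        or event == "post-commit" or event == "post-merge" then
        return action_info.branch_id
    end
    -- Invalid state: raise instead of returning (nil, message).
    error("unsupported event type: " .. tostring(event))
end

-- Happy path: the value is returned directly.
local ref = ref_from_branch_or_tag({ event_type = "post-commit", branch_id = "main" })

-- Callers that want to recover wrap the call in pcall.
local ok, err = pcall(ref_from_branch_or_tag, { event_type = "pre-merge" })
```

Here `ok` is `false` and `err` carries the message (prefixed with position information added by `error`).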

return {
    TableExtractor = extractor.TableExtractor,
Contributor

Why do we export a function from a different module that I can require directly?

Contributor Author

Because the end goal is for the user to require 1 single package and lib.lua will be the interface for that.

Contributor

If we want a single package, shouldn't we expose it as catalogexport?

local catalogexport = require("lakefs/catalogexport")

catalogexport.Export(...)

    ref_from_branch_or_tag = ref_from_branch_or_tag
}
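
The single-entry-point idea from this thread can be sketched in plain Lua. The module names under `demo/` and the stub `TableExtractor` are hypothetical; they are registered through `package.preload` only so that the example runs without lakeFS, and they do not reflect how the embedded modules are actually loaded.

```lua
-- Hypothetical inner module, standing in for the real table extractor.
package.preload["demo/catalogexport/table_extractor"] = function()
    local TableExtractor = {}
    function TableExtractor.new()
        return { kind = "extractor" }
    end
    return { TableExtractor = TableExtractor }
end

-- Facade module: the one package a user is expected to require.
package.preload["demo/catalogexport"] = function()
    local extractor = require("demo/catalogexport/table_extractor")
    return {
        TableExtractor = extractor.TableExtractor, -- re-exported, not redefined
    }
end

local catalogexport = require("demo/catalogexport")
local instance = catalogexport.TableExtractor.new()
```

The facade hands out the very same table the inner module returns, so users who require the inner module directly get identical behaviour.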
186 changes: 186 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
@@ -0,0 +1,186 @@
local pathlib = require("path")
local strings = require("strings")
local yaml = require("encoding/yaml")
local lakefs = require("lakefs")
local json = require("encoding/json")
local common = require("lakefs/catalogexport/common")

-- return partition table from path by based on columns in partition_cols
function get_partition_values(partition_cols, path)
    local vals = {}
    splitted_path = strings.split(path, pathlib.default_separator())
    for _, part in pairs(splitted_path) do
Contributor

Use ipairs here: the split result is an index-based array.

Contributor Author

deleted code

        for _, col in ipairs(partition_cols) do
            local prefix = col .. "="
Contributor

If you swap the inner and outer loops, you will perform the prefix concatenation once per column.

Contributor Author

removed that code

            if strings.has_prefix(part, prefix) then
                vals[col] = part:sub(#prefix + 1)
            end
Contributor

when there is a match we can break

Contributor Author

removed that code

        end
    end
    return vals
end
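
The three review remarks on this function (use `ipairs`, build the prefix once per column, break on a match) can be combined in one sketch. It is written in plain Lua so it runs on its own: `string.gmatch` and `string.sub` stand in for the `strings` and `path` helpers of the hook runtime, and `/` is assumed as the separator. Note that with `break` the first matching path segment wins, whereas the loop under review keeps the last one.

```lua
local function get_partition_values(partition_cols, path)
    -- Split the path into segments (assumes "/" as the separator).
    local parts = {}
    for part in string.gmatch(path, "[^/]+") do
        parts[#parts + 1] = part
    end

    local vals = {}
    for _, col in ipairs(partition_cols) do
        local prefix = col .. "=" -- built once per column
        for _, part in ipairs(parts) do
            if part:sub(1, #prefix) == prefix then
                vals[col] = part:sub(#prefix + 1)
                break -- stop scanning once this column is resolved
            end
        end
    end
    return vals
end

local vals = get_partition_values({ "type", "weight" },
    "tables/animals/type=cat/weight=5/a.parquet")
```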

-- extract partition substr from full path
function extract_partition_prefix_from_path(partition_cols, path)
    local partitions = get_partition_values(partition_cols, path)
    local partitions_list = {}
    -- iterate partition_cols to maintain order
    for _, col in pairs(partition_cols) do
        table.insert(partitions_list, col .. "=" .. partitions[col])
    end
    local partition_key = table.concat(partitions_list, pathlib.default_separator())
    return partition_key
end
Contributor

If I got it right, extract_partition_prefix_from_path uses get_partition_values, and the end result is the part of the object path that corresponds to the partitions / columns.

I assume that the columns are ordered the same way as they appear in the object path.
Also, that the number of partitions must match the path information.

Based on the above, I suggest the following:

function extract_partitions_path(partitions, path)
    local idx = 0
    for _, partition in ipairs(partitions) do
        local pattern = partition .. "=[^/]*"
        local i, j = string.find(path, pattern, idx)
        if i == nil then
            return nil
        end
        idx = j + 1
    end
    return string.sub(path, 1, idx)
end

It should return the relevant part of the path based on the partitioning used to write the data.

Contributor Author

Much better thanks! Did almost that (added comment in the code) 👍
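
For readers following along, here is the suggested function from the comment above with a small usage example. The function body is the reviewer's suggestion, only made `local`; the sample paths are invented. Two caveats worth keeping in mind: column names are used as Lua patterns without escaping, and the returned prefix includes the separator that follows the last partition value.

```lua
local function extract_partitions_path(partitions, path)
    local idx = 0
    for _, partition in ipairs(partitions) do
        local pattern = partition .. "=[^/]*"
        local i, j = string.find(path, pattern, idx)
        if i == nil then
            return nil -- a partition column is missing from the path
        end
        idx = j + 1
    end
    return string.sub(path, 1, idx)
end

local sample = "tables/animals/type=cat/weight=5/a.parquet"
local prefix = extract_partitions_path({ "type", "weight" }, sample)
local missing = extract_partitions_path({ "type", "color" }, sample)
```

With this input, `prefix` is `tables/animals/type=cat/weight=5/` and `missing` is `nil`.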


-- Hive format partition iterator each result set is a collection of files under the same partition
function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols)
Contributor

This is relevant to several functions here: how do we plan to support delimiter?
As I see it, we can use a delimiter and pass it to list objects as long as it is not the default delimiter.
When using a delimiter for listing we will not get a recursive listing under the prefix, which means we will not get any partition information.

Please verify that we need a recursive listing, and if this is the case I prefer not to pass any delimiter to these APIs.

Contributor Author

Removed the support for delimiter, we only need recursive.
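
The difference between the two listing modes discussed above can be shown with a toy in-memory model. The `list` function and the sample keys are hypothetical and only imitate the general behaviour of a delimiter-based object listing; they are not the lakeFS client.

```lua
-- Toy listing: with a delimiter, keys are collapsed into common prefixes;
-- with an empty delimiter, every object under the prefix is returned.
local function list(keys, prefix, delimiter)
    local results, seen = {}, {}
    for _, key in ipairs(keys) do
        if key:sub(1, #prefix) == prefix then
            local rest = key:sub(#prefix + 1)
            local cut_end
            if delimiter ~= "" then
                local _, e = rest:find(delimiter, 1, true)
                cut_end = e
            end
            if cut_end then
                local common = prefix .. rest:sub(1, cut_end)
                if not seen[common] then
                    seen[common] = true
                    results[#results + 1] = { path = common, path_type = "common_prefix" }
                end
            else
                results[#results + 1] = { path = key, path_type = "object" }
            end
        end
    end
    return results
end

local keys = {
    "t/year=2023/a.parquet",
    "t/year=2023/b.parquet",
    "t/year=2024/c.parquet",
}
local shallow = list(keys, "t/", "/")  -- two common prefixes, no objects
local recursive = list(keys, "t/", "") -- all three objects
```

Only the recursive listing returns the object entries that the partition pager needs to group.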

    local after = ""
    local has_more = true
    local prefix = base_path
    local target_partition = ""
    return function()
        if not has_more then
            return nil
        end
        local partition_entries = {}
Contributor

Should partition_entries be at the same level as target_partition? Otherwise the caller will have to handle multiple calls that return the same target.

Contributor Author

It should be like this, not sure what you mean? From testing it looks fine.

        local iter = common.lakefs_object_pager(client, repo_id, commit_id, after, prefix, page_size, delimiter)
        for entries in iter do
            for _, entry in ipairs(entries) do
                is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
                if not is_hidden and entry.path_type == "object" then
Contributor

In case of recursive listing, all are objects

Contributor Author

removed 👍

                    local partition_key = extract_partition_prefix_from_path(partition_cols, entry.path)
                    -- first time: if not set, assign current object partition as the target_partition key
                    if target_partition == "" then
                        target_partition = partition_key
                    end
                    -- break if current entry does not belong to the target_partition
                    if partition_key ~= target_partition then
                        -- next time start searching AFTER the last used key
                        after = partition_entries[#partition_entries].path
Contributor

Since we are not using yield and rely on our `after`, this can have a very bad impact on our users: when partition values are unique we may issue one call per record, and each of those calls pulls a full page of records.

Contributor Author

Agreed, but when we need it we can add coroutines (yield) to the runtime and improve this. For now I think that's the tradeoff of paging over partitions.

Contributor

But the issue is not with paging; it is the multiple listings over lakeFS caused by partitioning.

                        local partition_result = target_partition
                        target_partition = partition_key
                        return partition_result, partition_entries
                    end
                    table.insert(partition_entries, {
                        physical_address = entry.physical_address,
                        path = entry.path,
                        size = entry.size_bytes,
                        checksum = entry.checksum
                    })
                end
            end
        end
        has_more = false
        return target_partition, partition_entries
    end
end
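
The coroutine-based alternative mentioned in the thread on repeated listings could look roughly like the sketch below. It assumes the standard `coroutine` library is available, which the discussion suggests was not yet the case in the hook runtime, and it assumes entries arrive sorted by path. `partition_groups`, `pages_of` and the sample pages are hypothetical names used only for this illustration. The listing is consumed in a single pass, and each partition is yielded as soon as the key changes.

```lua
-- Groups consecutive entries by key while walking the pages exactly once.
local function partition_groups(pages_iter, key_of)
    return coroutine.wrap(function()
        local current_key, group = nil, {}
        for page in pages_iter do
            for _, entry in ipairs(page) do
                local key = key_of(entry)
                if current_key ~= nil and key ~= current_key then
                    coroutine.yield(current_key, group)
                    group = {}
                end
                current_key = key
                group[#group + 1] = entry
            end
        end
        if current_key ~= nil then
            coroutine.yield(current_key, group) -- flush the last partition
        end
    end)
end

-- Hypothetical stand-in for a paged listing: one array of entries per call.
local function pages_of(pages)
    local i = 0
    return function()
        i = i + 1
        return pages[i]
    end
end

local pages = {
    { { path = "t/k=a/1" }, { path = "t/k=a/2" } },
    { { path = "t/k=a/3" }, { path = "t/k=b/1" } },
    { { path = "t/k=b/2" } },
}
local summary = {}
local function key_of(entry)
    return entry.path:match("k=[^/]*")
end
for key, group in partition_groups(pages_of(pages), key_of) do
    summary[#summary + 1] = key .. ":" .. #group
end
```

A partition that spans a page boundary (here `k=a`) is still returned as one group, without re-listing from an `after` key.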

-- Define the HiveTable class
local HiveTable = {}
HiveTable.__index = HiveTable

-- Factory function to create new instances
function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema, iter_page_size)
    local self = setmetatable({}, HiveTable)
    self.repo_id = repo_id
    self.ref_id = ref_id
    self.commit_id = commit_id
    self._path = path
    self._name = name
    self.schema = schema
    self.partition_cols = partition_cols
    self._iter_page_size = iter_page_size
    return self
end

-- Define methods
function HiveTable:name()
Contributor

Don't see a reason to generate a class for hive table - we can just return a table with this information.

Contributor Author

removed

    return self._name
end

function HiveTable:description()
    return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id,
        tostring(self._path), common.short_digest(self.commit_id))
end

function HiveTable:path()
    return self._path
end

function HiveTable:min_reader_version()
    return 1
end
Contributor

why?

Contributor Author

deleted


function HiveTable:schema_string()
    return json.marshal(self.schema)
Contributor

A getter method that performs a marshal should be named differently.

Contributor Author

deleted

end

function HiveTable:partition_columns()
    return self.partition_cols
end

function HiveTable:version()
    return 0
end

function HiveTable:partition_pager()
    return lakefs_hive_partition_pager(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, "",
        self.partition_cols)
end

local TableExtractor = {}
TableExtractor.__index = TableExtractor

function TableExtractor.new(repository_id, ref, commit_id, tables_iter_page_size, export_iter_page_size)
    local self = setmetatable({}, TableExtractor)
    self.tables_registry_base = '_lakefs_tables/'
    self.repository_id = repository_id
    self.commit_id = commit_id
    self.ref = ref
    -- object iterator when listing _lakefs_tables/*
    self._iter_page_size = tables_iter_page_size
    -- object iteration when listing partitions to export (table objects)
    self._export_iter_page_size = export_iter_page_size
    return self
end

-- list all YAML files in _lakefs_tables
function TableExtractor:list_table_definitions()
Contributor

This module should extract this function. I don't see any reason to make this an object and initialize it with repo, commit and ref, when listing doesn't use ref and only get_table uses it, in one specific case.

It should be possible to get a list of tables from a function.
About get_table: it does more than get, since it passes the ref we provide to HiveTable. I think getting a table is just parsing a YAML object; the rest is related to Hive and should be handled by a Hive function.

Contributor Author

changed everything :)

    local table_entries = {}
    local iter = common.lakefs_object_pager(lakefs, self.repository_id, self.commit_id, "", self.tables_registry_base,
        self._iter_page_size, "")
    for entries in iter do
        for _, entry in ipairs(entries) do
            is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
Contributor

  1. is_hidden should accept a prefix
  2. The hidden check should be a prefix check on the parsed result, since we already parse the path

Contributor Author

Changed the code now a bit to make sure the rest of the path is considered.
Trying to make it work with only is_hidden params (suffix, separator) makes it look very confusing and convoluted.

            is_yaml = strings.has_suffix(entry.path, ".yaml")
Contributor

  1. Use local for temporary variables
  2. Suggestion: we are not doing lazy evaluation here; if a complex `if` is hard to read, we can extract the check

Contributor Author

done 👍

            if not is_hidden and is_yaml and entry.path_type == "object" then
                table.insert(table_entries, {
                    physical_address = entry.physical_address,
                    path = entry.path
                })
            end
        end
    end
    return table_entries
end
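
The two suggestions from the last thread (use `local` for temporaries, extract a hard-to-read condition) might be applied as in this sketch. It is plain Lua so that it runs standalone: `base_name` and the "starts with an underscore" rule are assumptions standing in for the runtime's path parsing and `is_hidden`, and `is_table_descriptor` is a hypothetical helper name.

```lua
-- Assumed stand-in for path parsing: the last segment of the path.
local function base_name(path)
    return path:match("([^/]*)$")
end

-- The extracted check: all temporaries are local, so nothing leaks into _G.
local function is_table_descriptor(entry)
    local name = base_name(entry.path)
    local is_hidden = name:sub(1, 1) == "_" -- assumption: "_" marks hidden files
    local is_yaml = name:sub(-5) == ".yaml"
    return entry.path_type == "object" and is_yaml and not is_hidden
end

local entries = {
    { path = "_lakefs_tables/animals.yaml", path_type = "object" },
    { path = "_lakefs_tables/_draft.yaml", path_type = "object" },
    { path = "_lakefs_tables/readme.md", path_type = "object" },
}
local descriptors = {}
for _, entry in ipairs(entries) do
    if is_table_descriptor(entry) then
        descriptors[#descriptors + 1] = entry.path
    end
end
```

Only `_lakefs_tables/animals.yaml` passes the check, and no `is_hidden` or `is_yaml` global is created along the way.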

-- return concrete table instance
function TableExtractor:get_table(logical_path)
    code, content = lakefs.get_object(self.repository_id, self.commit_id, logical_path)
    if code ~= 200 then
        error("could not fetch data file: HTTP " .. tostring(code))
    end

    descriptor = yaml.unmarshal(content)
    -- TODO(isan) decide where to handle different table parsing? (delta table / glue table etc)
    if descriptor.type == 'hive' then
        return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name,
            descriptor.partition_columns or {}, descriptor.schema, self._export_iter_page_size), nil
    end
    return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. logical_path
end

return {
    TableExtractor = TableExtractor,
    lakefs_hive_partition_pager = lakefs_hive_partition_pager
}