Table Extractor Hook and _lakefs_tables format #6589

Merged: 30 commits, Sep 19, 2023

Changes from 7 commits

Commits (30)
f51a49e
initial commit
Isan-Rivkin Sep 12, 2023
22c67d1
load lua libs
Isan-Rivkin Sep 12, 2023
a44293e
working table extraction
Isan-Rivkin Sep 12, 2023
a6bec34
fix linter issues
Isan-Rivkin Sep 13, 2023
8755a70
load embedded files dynamicly (#6592)
nopcoder Sep 13, 2023
0327daa
change paths
Isan-Rivkin Sep 13, 2023
3096211
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
83ef166
fix short digest and paging parameters
Isan-Rivkin Sep 13, 2023
4f02c4d
fix review
Isan-Rivkin Sep 13, 2023
9892648
clean
Isan-Rivkin Sep 13, 2023
ac5978f
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
e1d8d03
review comments
Isan-Rivkin Sep 14, 2023
72dc0d1
update review
Isan-Rivkin Sep 14, 2023
ef891ae
refactor tables
Isan-Rivkin Sep 14, 2023
9dbb36f
Extractor
Isan-Rivkin Sep 14, 2023
0429e67
review comments
Isan-Rivkin Sep 14, 2023
ce76e29
fix review comments
Isan-Rivkin Sep 18, 2023
f4c3a96
fix iterator
Isan-Rivkin Sep 18, 2023
ec5ddd6
fix review
Isan-Rivkin Sep 18, 2023
c85c426
minor change
Isan-Rivkin Sep 18, 2023
eb21f45
update whiletrue
Isan-Rivkin Sep 18, 2023
77bd52d
update final
Isan-Rivkin Sep 19, 2023
bdd7c2b
use short_digest default
Isan-Rivkin Sep 19, 2023
82fde2f
fix page size param
Isan-Rivkin Sep 19, 2023
be77969
fix review
Isan-Rivkin Sep 19, 2023
9f1de19
fix
Isan-Rivkin Sep 19, 2023
7e80978
minor fix
Isan-Rivkin Sep 19, 2023
e4e55bd
remove nesting
Isan-Rivkin Sep 19, 2023
f31675d
convert to local
Isan-Rivkin Sep 19, 2023
cbc6ee0
align head
Isan-Rivkin Sep 19, 2023
2 changes: 1 addition & 1 deletion pkg/actions/lua.go
@@ -205,7 +205,7 @@ func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook,
Args: args,
}, nil
} else if !errors.Is(err, errMissingKey) {
// 'script' was provided but is empty or of the wrong type..
// 'script' was provided but is empty or of the wrong type.
return nil, err
}

23 changes: 23 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/common.lua
nopcoder marked this conversation as resolved.
@@ -0,0 +1,23 @@
function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter)
local next_offset = after
local has_more = true
return function()
if not has_more then
return nil
end
local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size)
if code ~= 200 then
error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. resp.message)
end
local objects = resp.results
has_more = resp.pagination.has_more
next_offset = resp.pagination.next_offset
return objects
end
end
Contributor:
To make this an iterator you need to return an object for each call to the callback; the current implementation enables pagination-like iteration. I suggest making this function more generic, lakefs_paginiated_api: it takes a callback that accepts next_offset, and then it works with all our paginated APIs. Also, I don't think this code is related to this function; it should move to a different package.

function lakefs_paginiated_api(api_call, after)
    local next_offset = after or ""
    local has_more = true
    return function()
        if not has_more then
            return nil
        end
        local code, resp = api_call(next_offset)
        if code < 200 or code >= 300 then
            error("lakeFS: API returned non-2xx: " .. code)
        end
        has_more = resp.pagination.has_more
        next_offset = resp.pagination.next_offset
        return resp.results
    end
end
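
For context, a hypothetical call site for this generic pager could look like the following sketch (the repository, commit, prefix, and page size are made-up values; list_objects is the client call already used in the diff above):

local lakefs = require("lakefs")

-- wrap the paginated API in a closure that only takes the offset
local pager = lakefs_paginiated_api(function(next_offset)
    return lakefs.list_objects("my-repo", "my-commit-id", next_offset, "tables/", "", 30)
end)
for results in pager do
    for _, entry in ipairs(results) do
        print(entry.path)
    end
end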

Contributor Author:
fixed 👍


return {
lakefs_object_it=lakefs_object_it,
SHORT_DIGEST_LEN=6,
LAKEFS_DEFAULT_PAGE_SIZE=30,
Contributor:
  • short digest looks like a helper function
  • default page size: prefer a default per type of request rather than a global one. Also not sure catalog export is the place for them. You can have DEFAULT_EXPORT_PAGE_SIZE and DEFAULT_HIVE_PAGE_SIZE.

Contributor Author:
Fixed both!

}
22 changes: 22 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/lib.lua
Contributor:
  1. How is this one used?
  2. Why is it part of lib, or under catalog export?

@@ -0,0 +1,22 @@
local extractor = require("lakefs/catalogexport/table_extractor")
local common = require("lakefs/catalogexport/common")

-- resolve ref value from action.action.event_type
function ref_from_branch_or_tag()
Contributor:
Although action is a global variable injected by the runtime, if you are building common code in lib (which I'm not sure is related to this package), take the table as the first argument.

Contributor Author:
💯 also added locals to the dict.

Contributor:
Something like this: verify there is a commit based on the event type, given that we want to raise an error (alternatively, return an empty ref).

-- Based on the event type, resolve the ref value from the action.
function action_event_ref(action)
    local ref
    if action.event_type == "pre-create-tag" or action.event_type == "post-create-tag" then
        ref = action.tag_id
    elseif action.event_type == "pre-create-branch" or action.event_type == "post-create-branch" then
        ref = action.branch_id
    elseif action.event_type == "post-commit" or action.event_type == "post-merge" then
        ref = action.commit_id
    else
        error("unsupported event type: " .. action.event_type)
    end
    return ref
end
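
A minimal usage sketch, assuming the injected action global carries event_type and the relevant id fields as in the snippets above:

local ref = action_event_ref(action)
-- e.g. for a post-commit event this resolves to action.commit_id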

Contributor Author:
Did something similar, see below 👍

tag_events = { ["pre-create-tag"] = true, ["post-create-tag"] = true }
branch_events = { ["pre-create-branch"] = true, ["post-create-branch"] = true }
commit_events = { ["post-commit"] = true, ["post-merge"] = true }
Contributor:
You are using three tables for a 2-item lookup; use an if instead.

Contributor Author:
👍

local ref
Contributor:
unused

Contributor Author:
👍

if tag_events[action.event_type] then
return action.tag_id, nil
elseif branch_events[action.event_type] or commit_events[action.event_type] then
Contributor:
Why does a commit event get branch_id and not commit_id?

Contributor Author:
Because I already have the commit_id; I need the reference to the HEAD of the current commit (specifically, when creating symlinks both the ref and the commit_id are used).

return action.branch_id, nil
else
return nil, "unsupported event type: " .. action.event_type
end
end

return {
TableExtractor = extractor.TableExtractor,
Contributor:
Why do we export a function from a different module that I can require directly?

Contributor Author:
Because the end goal is for the user to require a single package, and lib.lua will be the interface for that.

Contributor:
If we want a single package, we need to expose it as catalogexport?

local catalogexport = require("lakefs/catalogexport")

catalogexport.Export(...)

ref_from_branch_or_tag=ref_from_branch_or_tag,
}
183 changes: 183 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
@@ -0,0 +1,183 @@
local pathlib = require("path")
local strings = require("strings")
local yaml = require("encoding/yaml")
local lakefs = require("lakefs")
local json = require("encoding/json")
local common = require("lakefs/catalogexport/common")

-- return partition table from path based on columns in partition_cols
function get_partition_values(partition_cols, path)
local vals = {}
splitted_path = strings.split(path, pathlib.default_separator())
for _, part in pairs(splitted_path) do
Contributor:
Use ipairs, as we iterate over the split result, which is an index-based array.

Contributor Author:
deleted code

for _, col in ipairs(partition_cols) do
local prefix = col .. "="
Contributor:
If you swap the inner and outer loops you will perform the prefix concat only once per column.

Contributor Author:
removed that code

if strings.has_prefix(part, prefix) then
vals[col] = part:sub(#prefix + 1)
end
Contributor:
when there is a match we can break

Contributor Author:
removed that code

end
end
return vals
end
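
For intuition, a quick worked example of get_partition_values above (the path and partition columns are hypothetical):

local vals = get_partition_values({"year", "month"}, "tables/animals/year=2023/month=03/part-0001.parquet")
-- vals.year == "2023", vals.month == "03"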

-- extract partition substr from full path
function extract_partition_prefix_from_path(partition_cols, path)
local partitions = get_partition_values(partition_cols, path)
local partitions_list = {}
-- iterate partition_cols to maintain order
for _, col in pairs(partition_cols) do
table.insert(partitions_list, col .. "=" .. partitions[col])
end
local partition_key = table.concat(partitions_list, pathlib.default_separator())
return partition_key
end
Contributor:
If I got it right, extract_partition_prefix_from_path uses get_partition_values, and the end result is getting the part of the object path based on the partitions / columns.

I assume that the columns are ordered based on the same structure as the object path. Also, that the number of partitions must match the path information.

Based on the above, I suggest the following:

function extract_partitions_path(partitions, path)
    local idx = 0
    for _, partition in ipairs(partitions) do
        local pattern = partition .. "=[^/]*"
        local i, j = string.find(path, pattern, idx)
        if i == nil then
            return nil
        end
        idx = j + 1
    end
    return string.sub(path, 1, idx)
end

It should return the relevant part of the path based on the partitioning used to write the data.
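
For example, under the same assumptions (hypothetical path and partition columns), the suggested function would behave like this:

local p = extract_partitions_path({"year", "month"}, "tables/animals/year=2023/month=03/part-0001.parquet")
-- p == "tables/animals/year=2023/month=03/"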

Contributor Author:
Much better, thanks! Did almost exactly that (added a comment in the code) 👍


-- Hive format partition iterator; each result set is a collection of files under the same partition
function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols)
Contributor:
We should have a hive.lua that exposes this function.

Contributor Author:
Outdated. (+ only table extractor uses this code)

local after = ""
local has_more = true
local prefix = base_path
local target_partition = ""
return function()
if not has_more then
return nil
end
local partition_entries = {}
Contributor:
Shouldn't partition_entries be at the same level as target_partition? Otherwise it means the caller will have to handle multiple calls that return the same target.

Contributor Author:
It should be like this, not sure what you mean? From testing it looks fine.

local iter = common.lakefs_object_it(client, repo_id, commit_id, after, prefix, page_size, delimiter)
for entries in iter do
for _, entry in ipairs(entries) do
is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
if not is_hidden and entry.path_type == "object" then
Contributor:
In case of recursive listing, all are objects

Contributor Author:
removed 👍

local partition_key = extract_partition_prefix_from_path(partition_cols, entry.path)
-- first time: if not set, assign current object partition as the target_partition key
if target_partition == "" then
target_partition = partition_key
end
-- break if current entry does not belong to the target_partition
if partition_key ~= target_partition then
-- next time start searching AFTER the last used key
after = partition_entries[#partition_entries].path
Contributor:
Since we are not using yield and instead trust our after offset, this can have a very bad impact on our users: we may end up doing a call per record, where each call pulls a whole page of records, and keep issuing a request per record when values are unique.

Contributor Author:
Agreed, but when we need to we can add coroutine (yield) support to the runtime and improve this; for now I think that's the tradeoff of paging over partitions.

Contributor:
But the issue is not with paging; it is the multiple listings over lakeFS caused by the partitioning.

local partition_result = target_partition
target_partition = partition_key
return partition_result, partition_entries
end
table.insert(partition_entries, {
physical_address = entry.physical_address,
path = entry.path,
size = entry.size_bytes,
checksum = entry.checksum
})
end
end
end
has_more = false
return target_partition, partition_entries
end
end
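
A sketch of how this partition iterator might be consumed (the table path and partition columns are hypothetical; action is the global the runtime injects into hooks):

local it = lakefs_hive_partition_it(lakefs, action.repository_id, action.commit_id, "tables/animals/", 30, "", {"year", "month"})
for partition_key, entries in it do
    print("partition: " .. partition_key .. ", files: " .. #entries)
end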

-- Define the HiveTable class
local HiveTable = {}
HiveTable.__index = HiveTable

-- Factory function to create new instances
function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema)
local self = setmetatable({}, HiveTable)
self.repo_id = repo_id
self.ref_id = ref_id
self.commit_id = commit_id
self._path = path
self._name = name
self.schema = schema
self.partition_cols = partition_cols
self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE
return self
end

-- Define methods
function HiveTable:name()
Contributor:
I don't see a reason to generate a class for the hive table; we can just return a table with this information.

Contributor Author:
removed

return self._name
end

function HiveTable:description()
return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id,
tostring(self._path), self.commit_id:sub(1, common.SHORT_DIGEST_LEN))
end

function HiveTable:path()
return self._path
end

function HiveTable:min_reader_version()
return 1
end
Contributor:
why?

Contributor Author:
deleted


function HiveTable:schema_string()
return json.marshal(self.schema)
Contributor:
A getter method that performs marshaling should be named differently.

Contributor Author:
deleted

end

function HiveTable:partition_columns()
return self.partition_cols
end

function HiveTable:version()
return 0
end

function HiveTable:partition_iterator()
return lakefs_hive_partition_it(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, "",
self.partition_cols)
end

local TableExtractor = {}
TableExtractor.__index = TableExtractor

function TableExtractor.new(repository_id, ref, commit_id)
local self = setmetatable({}, TableExtractor)
self.tables_registry_base = pathlib.join(pathlib.default_separator(), '_lakefs_tables/')
Contributor:
What does this call provide?

Contributor Author:
removed

self.repository_id = repository_id
self.commit_id = commit_id
self.ref = ref
self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE
return self
end

-- list all YAML files in _lakefs_tables
function TableExtractor:list_table_definitions()
Contributor:
This function should be extracted from the module. I don't see any reason to have this as an object initialized with repo, commit, and ref, when listing doesn't use the ref and get_table only uses it in a specific case.

It should be possible to get the list of tables from a plain function. About get_table: it does more than "get", since it passes the ref we provide into HiveTable. Getting a table should just parse the YAML object; the rest is hive-specific and should be handled by a hive function.

Contributor Author:
changed everything :)

local table_entries = {}
local iter = common.lakefs_object_it(lakefs, self.repository_id, self.commit_id, "", self.tables_registry_base,
self._iter_page_size, "")
for entries in iter do
for _, entry in ipairs(entries) do
is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
Contributor:
  • is_hidden should accept a prefix
  • the hidden check amounts to checking a prefix after parsing, and we already parse the path

Contributor Author:
Changed the code a bit now to make sure the rest of the path is considered.
Trying to make it work with only the is_hidden params (suffix, separator) makes it look very confusing and convoluted.

is_yaml = strings.has_suffix(entry.path, ".yaml")
Contributor:
  • use local for temporary variables
  • suggestion: we are not doing lazy evaluation, so if a complex if is hard to read we can extract the check into a variable

Contributor Author:
done 👍

if not is_hidden and is_yaml and entry.path_type == "object" then
table.insert(table_entries, {
physical_address = entry.physical_address,
path = entry.path
})
end
end
end
return table_entries
end

-- return concrete table instance
function TableExtractor:get_table(logical_path)
code, content = lakefs.get_object(self.repository_id, self.commit_id, logical_path)
if code ~= 200 then
error("could not fetch data file: HTTP " .. tostring(code))
end

descriptor = yaml.unmarshal(content)
-- TODO(isan) decide where to handle different table parsing? (delta table / glue table etc)
if descriptor.type == 'hive' then
return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name,
descriptor.partition_columns or {}, descriptor.schema), nil
end
return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. logical_path
end

return {
TableExtractor = TableExtractor,
lakefs_hive_partition_it = lakefs_hive_partition_it
}
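
To make the _lakefs_tables format concrete, here is a minimal sketch of a descriptor that this get_table implementation would accept, based only on the fields the code reads (type, name, path, partition_columns, schema); the table name, path, and schema values are hypothetical:

local yaml = require("encoding/yaml")

-- hypothetical contents of _lakefs_tables/animals.yaml
local content = [[
name: animals
type: hive
path: tables/animals
partition_columns:
  - year
  - month
schema:
  type: struct
  fields:
    - name: year
      type: int
]]
local descriptor = yaml.unmarshal(content)
-- descriptor.type == "hive", so get_table would construct a HiveTable from it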