Table Extractor Hook and _lakefs_tables format #6589

Merged · 30 commits · Sep 19, 2023
Changes from 3 commits
Commits
f51a49e
initial commit
Isan-Rivkin Sep 12, 2023
22c67d1
load lua libs
Isan-Rivkin Sep 12, 2023
a44293e
working table extraction
Isan-Rivkin Sep 12, 2023
a6bec34
fix linter issues
Isan-Rivkin Sep 13, 2023
8755a70
load embedded files dynamicly (#6592)
nopcoder Sep 13, 2023
0327daa
change paths
Isan-Rivkin Sep 13, 2023
3096211
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
83ef166
fix short digest and paging parameters
Isan-Rivkin Sep 13, 2023
4f02c4d
fix review
Isan-Rivkin Sep 13, 2023
9892648
clean
Isan-Rivkin Sep 13, 2023
ac5978f
Merge branch 'master' into 6573-table-extractor
Isan-Rivkin Sep 13, 2023
e1d8d03
review comments
Isan-Rivkin Sep 14, 2023
72dc0d1
update review
Isan-Rivkin Sep 14, 2023
ef891ae
refactor tables
Isan-Rivkin Sep 14, 2023
9dbb36f
Extractor
Isan-Rivkin Sep 14, 2023
0429e67
review comments
Isan-Rivkin Sep 14, 2023
ce76e29
fix review comments
Isan-Rivkin Sep 18, 2023
f4c3a96
fix iterator
Isan-Rivkin Sep 18, 2023
ec5ddd6
fix review
Isan-Rivkin Sep 18, 2023
c85c426
minor change
Isan-Rivkin Sep 18, 2023
eb21f45
update whiletrue
Isan-Rivkin Sep 18, 2023
77bd52d
update final
Isan-Rivkin Sep 19, 2023
bdd7c2b
use short_digest default
Isan-Rivkin Sep 19, 2023
82fde2f
fix page size param
Isan-Rivkin Sep 19, 2023
be77969
fix review
Isan-Rivkin Sep 19, 2023
9f1de19
fix
Isan-Rivkin Sep 19, 2023
7e80978
minor fix
Isan-Rivkin Sep 19, 2023
e4e55bd
remove nesting
Isan-Rivkin Sep 19, 2023
f31675d
convert to local
Isan-Rivkin Sep 19, 2023
cbc6ee0
align head
Isan-Rivkin Sep 19, 2023
2 changes: 2 additions & 0 deletions pkg/actions/lua.go
@@ -13,6 +13,7 @@ import (
"github.com/Shopify/go-lua"
lualibs "github.com/treeverse/lakefs/pkg/actions/lua"
"github.com/treeverse/lakefs/pkg/actions/lua/lakefs"
catalogexport "github.com/treeverse/lakefs/pkg/actions/lua/lakefs/catalog_export"
luautil "github.com/treeverse/lakefs/pkg/actions/lua/util"
"github.com/treeverse/lakefs/pkg/auth"
"github.com/treeverse/lakefs/pkg/auth/model"
@@ -66,6 +67,7 @@ func injectHookContext(l *lua.State, ctx context.Context, user *model.User, endp
luautil.DeepPush(l, args)
l.SetGlobal("args")
lakefs.OpenClient(l, ctx, user, endpoint)
catalogexport.OpenLuaPackage(l, ctx)
}

type loggingBuffer struct {
24 changes: 24 additions & 0 deletions pkg/actions/lua/lakefs/catalog_export/common.lua
@@ -0,0 +1,24 @@

function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter)
local next_offset = after
local has_more = true
return function()
if not has_more then
return nil
end
local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size)
if code ~= 200 then
error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. resp.message)
end
local objects = resp.results
has_more = resp.pagination.has_more
next_offset = resp.pagination.next_offset
return objects
end
end

return {
lakefs_object_it=lakefs_object_it,
SHORT_DIGEST_LEN=6,
LAKEFS_DEFAULT_PAGE_SIZE=30,
}
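
A minimal usage sketch of the iterator above (illustrative only: the repository name, commit digest, and prefix are placeholders; it assumes the hook runtime's lakefs client package, as used by table_extractor.lua below):

local lakefs = require("lakefs")
local common = require("lakefs/catalog_export/common")
-- page through every object under a prefix at a given commit
local iter = common.lakefs_object_it(lakefs, "example-repo", "<commit-digest>", "", "tables/", common.LAKEFS_DEFAULT_PAGE_SIZE, "")
for objects in iter do
    for _, obj in ipairs(objects) do
        print(obj.path)
    end
end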
24 changes: 24 additions & 0 deletions pkg/actions/lua/lakefs/catalog_export/lib.lua
@@ -0,0 +1,24 @@
local extractor = require("lakefs/catalog_export/table_extractor")
local common = require("lakefs/catalog_export/common")

-- resolve the ref value based on action.event_type
function ref_from_branch_or_tag()
local tag_events = { ["pre-create-tag"] = true, ["post-create-tag"] = true }
local branch_events = { ["pre-create-branch"] = true, ["post-create-branch"] = true }
local commit_events = { ["post-commit"] = true, ["post-merge"] = true }
if tag_events[action.event_type] then
return action.tag_id, nil
elseif branch_events[action.event_type] or commit_events[action.event_type] then
return action.branch_id, nil
else
return nil, "unsupported event type: " .. action.event_type
end
end

return {
TableExtractor = extractor.TableExtractor,
ref_from_branch_or_tag=ref_from_branch_or_tag,
lakefs_object_it=common.lakefs_object_it,
lakefs_hive_partition_it=extractor.lakefs_hive_partition_it,
}
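
A sketch of resolving the ref from inside a hook script via this facade (the module alias matches the loader in load.go below; it assumes the script runs inside a lakeFS action where the action global used above is populated):

local exporter = require("lakefs/catalog_export")
local ref, err = exporter.ref_from_branch_or_tag()
if err ~= nil then
    error(err)
end
print("resolved ref: " .. ref)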
37 changes: 37 additions & 0 deletions pkg/actions/lua/lakefs/catalog_export/load.go
@@ -0,0 +1,37 @@
package catalog_export

import (
"context"
"embed"
"io/fs"

"github.com/Shopify/go-lua"
)

//go:embed *.lua
var modulePath embed.FS
Contributor: can we generalize this one and have fs for all our lua files. Adding modulePath to the lua loader search path will load our code too.

Contributor Author: fixed by you 💪

// OpenLuaPackage loads Lua code as packages into the runtime
func OpenLuaPackage(l *lua.State, ctx context.Context) {
// order matters here, since the packages rely on each other
loadLuaAsPackage(l, ctx, "lakefs/catalog_export/common", "common.lua")
loadLuaAsPackage(l, ctx, "lakefs/catalog_export/table_extractor", "table_extractor.lua")
// lib.lua is a high-level facade for users
loadLuaAsPackage(l, ctx, "lakefs/catalog_export", "lib.lua")
}

func loadLuaAsPackage(l *lua.State, ctx context.Context, importAlias, scriptName string) {
lua.Require(l, importAlias, func(l *lua.State) int {
data, err := fs.ReadFile(modulePath, scriptName)
if err != nil {
lua.Errorf(l, err.Error())
panic("unreachable")
}
if err := lua.DoString(l, string(data)); err != nil {
lua.Errorf(l, err.Error())
panic("unreachable")
}
return 1
}, true)
l.Pop(1)
}
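
Once OpenLuaPackage is wired into the hook runtime (see the lua.go change above), an action script can require any of the registered aliases; a brief, illustrative sketch:

local common = require("lakefs/catalog_export/common")          -- common.lua
local table_extractor = require("lakefs/catalog_export/table_extractor")  -- table_extractor.lua
local exporter = require("lakefs/catalog_export")                -- lib.lua, the high-level facade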
183 changes: 183 additions & 0 deletions pkg/actions/lua/lakefs/catalog_export/table_extractor.lua
@@ -0,0 +1,183 @@
local pathlib = require("path")
local strings = require("strings")
local yaml = require("encoding/yaml")
local lakefs = require("lakefs")
local json = require("encoding/json")
local common = require("lakefs/catalog_export/common")

-- return a table of partition values extracted from path, based on the columns in partition_cols
function get_partition_values(partition_cols, path)
local vals = {}
local splitted_path = strings.split(path, pathlib.default_separator())
for _, part in pairs(splitted_path) do
for _, col in ipairs(partition_cols) do
local prefix = col .. "="
if strings.has_prefix(part, prefix) then
vals[col] = part:sub(#prefix + 1)
end
end
end
return vals
end

-- extract the partition prefix substring from a full path
function extract_partition_prefix_from_path(partition_cols, path)
local partitions = get_partition_values(partition_cols, path)
local partitions_list = {}
-- iterate partition_cols to maintain order
for _, col in pairs(partition_cols) do
table.insert(partitions_list, col .. "=" .. partitions[col])
end
local partition_key = table.concat(partitions_list, pathlib.default_separator())
return partition_key
end
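
-- Worked example (illustrative values, not part of the module):
-- for partition_cols = {"year", "month"} and
-- path = "tables/events/year=2023/month=09/part-0001.parquet",
-- get_partition_values returns { year = "2023", month = "09" } and
-- extract_partition_prefix_from_path returns "year=2023/month=09".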

-- Hive-format partition iterator: each result set is a collection of files under the same partition
function lakefs_hive_partition_it(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols)
local after = ""
local has_more = true
local prefix = base_path
local target_partition = ""
return function()
if not has_more then
return nil
end
local partition_entries = {}
local iter = common.lakefs_object_it(client, repo_id, commit_id, after, prefix, page_size, delimiter)
for entries in iter do
for _, entry in ipairs(entries) do
local is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
if not is_hidden and entry.path_type == "object" then
local partition_key = extract_partition_prefix_from_path(partition_cols, entry.path)
-- first time: if not set, assign current object partition as the target_partition key
if target_partition == "" then
target_partition = partition_key
end
-- yield the accumulated partition once the current entry no longer belongs to target_partition
if partition_key ~= target_partition then
-- next time start searching AFTER the last used key
after = partition_entries[#partition_entries].path
local partition_result = target_partition
target_partition = partition_key
return partition_result, partition_entries
end
table.insert(partition_entries, {
physical_address = entry.physical_address,
path = entry.path,
size = entry.size_bytes,
checksum = entry.checksum
})
end
end
end
has_more = false
return target_partition, partition_entries
end
end
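
-- Usage sketch (illustrative: the repository, commit digest, prefix, and partition columns are placeholders):
--   local it = lakefs_hive_partition_it(lakefs, "example-repo", "<commit-digest>", "tables/events/", 30, "", {"year", "month"})
--   for partition_key, files in it do
--       print(partition_key .. ": " .. #files .. " files")
--   end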

-- Define the HiveTable class
local HiveTable = {}
HiveTable.__index = HiveTable

-- Factory function to create new instances
function HiveTable.new(repo_id, ref_id, commit_id, path, name, partition_cols, schema)
local self = setmetatable({}, HiveTable)
self.repo_id = repo_id
self.ref_id = ref_id
self.commit_id = commit_id
self._path = path
self._name = name
self.schema = schema
self.partition_cols = partition_cols
self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE
return self
end

-- Define methods
function HiveTable:name()
return self._name
end

function HiveTable:description()
return string.format('Hive table representation for `lakefs://%s/%s/%s` digest: %s', self.repo_id, self.ref_id,
tostring(self._path), self.commit_id:sub(1, common.SHORT_DIGEST_LEN))
end

function HiveTable:path()
return self._path
end

function HiveTable:min_reader_version()
return 1
end

function HiveTable:schema_string()
return json.marshal(self.schema)
end

function HiveTable:partition_columns()
return self.partition_cols
end

function HiveTable:version()
return 0
end

function HiveTable:partition_iterator()
return lakefs_hive_partition_it(lakefs, self.repo_id, self.commit_id, self._path, self._iter_page_size, "",
self.partition_cols)
end

local TableExtractor = {}
TableExtractor.__index = TableExtractor

function TableExtractor.new(repository_id, ref, commit_id)
local self = setmetatable({}, TableExtractor)
self.tables_registry_base = pathlib.join(pathlib.default_separator(), '_lakefs_tables/')
self.repository_id = repository_id
self.commit_id = commit_id
self.ref = ref
self._iter_page_size = common.LAKEFS_DEFAULT_PAGE_SIZE
return self
end

-- list all YAML files in _lakefs_tables
function TableExtractor:list_table_definitions()
local table_entries = {}
local iter = common.lakefs_object_it(lakefs, self.repository_id, self.commit_id, "", self.tables_registry_base,
self._iter_page_size, "")
for entries in iter do
for _, entry in ipairs(entries) do
local is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
local is_yaml = strings.has_suffix(entry.path, ".yaml")
if not is_hidden and is_yaml and entry.path_type == "object" then
table.insert(table_entries, {
physical_address = entry.physical_address,
path = entry.path
})
end
end
end
return table_entries
end

-- return a concrete table instance built from the descriptor at logical_path
function TableExtractor:get_table(logical_path)
local code, content = lakefs.get_object(self.repository_id, self.commit_id, logical_path)
if code ~= 200 then
error("could not fetch data file: HTTP " .. tostring(code))
end

local descriptor = yaml.unmarshal(content)
-- TODO(isan) decide where to handle different table parsing? (delta table / glue table etc)
if descriptor.type == 'hive' then
return HiveTable.new(self.repository_id, self.ref, self.commit_id, descriptor.path, descriptor.name,
descriptor.partition_columns or {}, descriptor.schema), nil
end
return nil, "NotImplemented: table type: " .. descriptor.type .. " path: " .. logical_path
end

return {
TableExtractor = TableExtractor,
lakefs_hive_partition_it = lakefs_hive_partition_it
}
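
Putting it together, a hook script could drive the extractor end to end. The sketch below is illustrative only: the repository, ref, and commit digest are placeholders, and each _lakefs_tables/*.yaml descriptor it expects carries the fields get_table reads above (type, name, path, partition_columns, schema):

local exporter = require("lakefs/catalog_export")

local table_extractor = exporter.TableExtractor.new("example-repo", "main", "<commit-digest>")
-- every *.yaml descriptor found under _lakefs_tables/
for _, entry in ipairs(table_extractor:list_table_definitions()) do
    local tbl, err = table_extractor:get_table(entry.path)
    if err ~= nil then
        error(err)
    end
    print(tbl:description())
    -- each iteration yields one partition key and the files under it
    for partition_key, files in tbl:partition_iterator() do
        print(partition_key .. ": " .. #files .. " files")
    end
end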