Skip to content

Commit

Permalink
Table Extractor Hook and _lakefs_tables format (#6589)
Browse files Browse the repository at this point in the history
  • Loading branch information
Isan-Rivkin authored Sep 19, 2023
1 parent 772649d commit e036ddf
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/actions/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook,
Args: args,
}, nil
} else if !errors.Is(err, errMissingKey) {
// 'script' was provided but is empty or of the wrong type..
// 'script' was provided but is empty or of the wrong type.
return nil, err
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/hive.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
local pathlib = require("path")
local utils = require("lakefs/catalogexport/internal")
local strings = require("strings")
local DEFAULT_PAGE_SIZE = 30

-- extract partition prefix from full path
local function extract_partitions_path(partitions, path)
if partitions == nil or #partitions == 0 then
return ""
end
local idx = 1
local is_partition_prefix = strings.has_prefix(path, partitions[1])
for part_idx, partition in ipairs(partitions) do
local col_substr = "/" .. partition .. "="
-- if partition is the path prefix and we are the that first partition remove /
if part_idx == 1 and is_partition_prefix then
col_substr = partition .. "="
end
local i, j = string.find(path, col_substr, idx)
if i == nil then
return nil
end
local separator_idx = string.find(path, "/", j+1)
-- verify / found and there is something in between = ... /
if separator_idx == nil or separator_idx <= (j + 1) then
return nil
end
idx = separator_idx
end
return string.sub(path, 1, idx)
end

-- Hive format partition iterator each result set is a collection of files under the same partition
local function extract_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size)
local target_partition = ""
local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", base_path,"", page_size or DEFAULT_PAGE_SIZE)
local page = pager()
return function()
if page == nil then
return nil
end
local partition_entries = {}
while true do
if #page == 0 then
page = pager()
if page == nil then -- no more records
return target_partition, partition_entries
end
end
local entry = page[1]
if not pathlib.is_hidden(entry.path) then
local partition_key = extract_partitions_path(partition_cols, entry.path)
-- first time: if not set, assign current object partition as the target_partition key
if target_partition == "" then
target_partition = partition_key
end
-- break if current entry does not belong to the target_partition
if partition_key ~= target_partition then
local partition_result = target_partition
target_partition = partition_key
return partition_result, partition_entries
end
table.insert(partition_entries, {
physical_address = entry.physical_address,
path = entry.path,
size = entry.size_bytes,
checksum = entry.checksum
})
-- remove entry only if its part of the current partition
table.remove(page, 1)
end
end
end
end

return {
extract_partition_pager=extract_partition_pager,
}
49 changes: 49 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/internal.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
local DEFAULT_SHORT_DIGEST_LEN=6

local function short_digest(digest, len)
return digest:sub(1, len or DEFAULT_SHORT_DIGEST_LEN)
end

-- paginate lakefs api
local function lakefs_paginiated_api(api_call, after)
local next_offset = after
local has_more = true
return function()
if not has_more then
return nil
end
local code, resp = api_call(next_offset)
if code < 200 or code >= 300 then
error("lakeFS: api return non-2xx" .. tostring(code))
end
has_more = resp.pagination.has_more
next_offset = resp.pagination.next_offset
return resp.results
end
end

-- paginage over lakefs objects
local function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, delimiter, page_size)
return lakefs_paginiated_api(function(next_offset)
return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size or 30)
end, after)
end

-- resolve ref value from action global, used as part of setting default table name
local function ref_from_branch_or_tag(action_info)
local event = action_info.event_type
if event == "pre-create-tag" or event == "post-create-tag" then
return action_info.tag_id
elseif event == "pre-create-branch" or event == "post-create-branch" or "post-commit" or "post-merge" then
return action_info.branch_id
else
error("unsupported event type: " .. action_info.event_type)
end
end

return {
short_digest=short_digest,
ref_from_branch_or_tag=ref_from_branch_or_tag,
lakefs_object_pager=lakefs_object_pager,
lakefs_paginiated_api=lakefs_paginiated_api,
}
53 changes: 53 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
local pathlib = require("path")
local strings = require("strings")
local yaml = require("encoding/yaml")
local utils = require("lakefs/catalogexport/internal")

local LAKEFS_TABLES_BASE = "_lakefs_tables/"

-- check if lakefs entry is a table spec under _lakefs_tables/
local function is_table_obj(entry, tables_base)
if entry.path_type ~= "object" then
return false
end
local path = entry.path
if strings.has_prefix(path, tables_base) then
-- remove _lakefs_tables/ from path
path = entry.path:sub(#tables_base, #path)
end
return not pathlib.is_hidden(path) and strings.has_suffix(path, ".yaml")
end

-- list all YAML files under _lakefs_tables/*
local function list_table_descriptor_entries(client, repo_id, commit_id)
local table_entries = {}
local page_size = 30
local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", LAKEFS_TABLES_BASE,"", page_size)
for entries in pager do
for _, entry in ipairs(entries) do
if is_table_obj(entry, LAKEFS_TABLES_BASE) then
table.insert(table_entries, {
physical_address = entry.physical_address,
path = entry.path
})
end
end
end
return table_entries
end

-- table as parsed YAML object
local function get_table_descriptor(client, repo_id, commit_id, logical_path)
local code, content = client.get_object(repo_id, commit_id, logical_path)
if code ~= 200 then
error("could not fetch data file: HTTP " .. tostring(code))
end
local descriptor = yaml.unmarshal(content)
descriptor.partition_columns = descriptor.partition_columns or {}
return descriptor
end

return {
list_table_descriptor_entries = list_table_descriptor_entries,
get_table_descriptor = get_table_descriptor,
}
117 changes: 106 additions & 11 deletions pkg/actions/lua/load.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
package lua

import (
"bytes"
"embed"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"

"github.com/Shopify/go-lua"
"github.com/hashicorp/go-multierror"
)

const (
pathListSeparator = ';'
defaultPath = "./?.lua"
defaultPath = "?.lua"
)

//go:embed lakefs/catalogexport/*.lua
var luaEmbeddedCode embed.FS

var ErrNoFile = errors.New("no file")

func findLoader(l *lua.State, name string) {
var msg string
if l.Field(lua.UpValueIndex(1), "searchers"); !l.IsTable(3) {
Expand All @@ -35,6 +45,61 @@ func findLoader(l *lua.State, name string) {
}
}

func findFile(l *lua.State, name, field, dirSep string) (string, error) {
l.Field(lua.UpValueIndex(1), field)
path, ok := l.ToString(-1)
if !ok {
lua.Errorf(l, "'package.%s' must be a string", field)
}
return searchPath(name, path, ".", dirSep)
}

func checkLoad(l *lua.State, loaded bool, fileName string) int {
if loaded { // Module loaded successfully?
l.PushString(fileName) // Second argument to module.
return 2 // Return open function & file name.
}
m := lua.CheckString(l, 1)
e := lua.CheckString(l, -1)
lua.Errorf(l, "error loading module '%s' from file '%s':\n\t%s", m, fileName, e)
panic("unreachable")
}

func searcherLua(l *lua.State) int {
name := lua.CheckString(l, 1)
filename, err := findFile(l, name, "path", string(filepath.Separator))
if err != nil {
return 1 // Module isn't found in this path.
}

return checkLoad(l, loadFile(l, filename, "") == nil, filename)
}

func loadFile(l *lua.State, fileName, mode string) error {
fileNameIndex := l.Top() + 1
fileError := func(what string) error {
fileName, _ := l.ToString(fileNameIndex)
l.PushFString("cannot %s %s", what, fileName[1:])
l.Remove(fileNameIndex)
return lua.FileError
}
l.PushString("@" + fileName)
data, err := luaEmbeddedCode.ReadFile(fileName)
if err != nil {
return fileError("open")
}
s, _ := l.ToString(-1)
err = l.Load(bytes.NewReader(data), s, mode)
switch {
case err == nil, errors.Is(err, lua.SyntaxError), errors.Is(err, lua.MemoryError): // do nothing
default:
l.SetTop(fileNameIndex)
return fileError("read")
}
l.Remove(fileNameIndex)
return err
}

func searcherPreload(l *lua.State) int {
name := lua.CheckString(l, 1)
l.Field(lua.RegistryIndex, "_PRELOAD")
Expand All @@ -46,7 +111,7 @@ func searcherPreload(l *lua.State) int {
}

func createSearchersTable(l *lua.State) {
searchers := []lua.Function{searcherPreload}
searchers := []lua.Function{searcherPreload, searcherLua}
l.CreateTable(len(searchers), 0)
for i, s := range searchers {
l.PushValue(-2)
Expand All @@ -55,6 +120,33 @@ func createSearchersTable(l *lua.State) {
}
}

func searchPath(name, path, sep, dirSep string) (string, error) {
var err error
if sep != "" {
name = strings.ReplaceAll(name, sep, dirSep) // Replace sep by dirSep.
}
path = strings.ReplaceAll(path, string(pathListSeparator), string(filepath.ListSeparator))
for _, template := range filepath.SplitList(path) {
if template == "" {
continue
}
filename := strings.ReplaceAll(template, "?", name)
if readable(filename) {
return filename, nil
}
err = multierror.Append(err, fmt.Errorf("%w %s", ErrNoFile, filename))
}
return "", err
}

func readable(name string) bool {
if !fs.ValidPath(name) {
return false
}
info, err := fs.Stat(luaEmbeddedCode, name)
return err == nil && !info.IsDir()
}

func noEnv(l *lua.State) bool {
l.Field(lua.RegistryIndex, "LUA_NOENV")
b := l.ToBoolean(-1)
Expand Down Expand Up @@ -84,15 +176,18 @@ var packageLibrary = []lua.RegistryFunction{
return 3 // Return nil, error message, and where.
}},
{Name: "searchpath", Function: func(l *lua.State) int {
_ = lua.CheckString(l, 1)
_ = lua.CheckString(l, 2)
_ = lua.OptString(l, 3, ".")
_ = lua.OptString(l, 4, string(filepath.Separator))

l.PushNil()
l.PushString("searchpath not enabled; check your Lua installation")
l.PushString("absent")
return 3 // Return nil, error message, and where.
name := lua.CheckString(l, 1)
path := lua.CheckString(l, 2)
sep := lua.OptString(l, 3, ".")
dirSep := lua.OptString(l, 4, string(filepath.Separator))
f, err := searchPath(name, path, sep, dirSep)
if err != nil {
l.PushNil()
l.PushString(err.Error())
return 2
}
l.PushString(f)
return 1
}},
}

Expand Down

0 comments on commit e036ddf

Please sign in to comment.