Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export Hooks: Symlink Exporter #6636

Merged
merged 10 commits into from
Sep 21, 2023
Merged
11 changes: 11 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/internal.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local url = require("net/url")
local DEFAULT_SHORT_DIGEST_LEN=6

local function short_digest(digest, len)
Expand Down Expand Up @@ -41,7 +42,17 @@ local function ref_from_branch_or_tag(action_info)
end
end

--[[
    Split a storage URI into its addressable parts.
    @uri: storage URI string, parsed with url.parse
    returns a table with:
    - protocol: the URI scheme
    - bucket: the URI host
    - key: the URI path with a single leading "/" removed (if present)
]]
local function parse_storage_uri(uri)
    local parsed = url.parse(uri)
    local object_key = parsed.path
    -- drop one leading slash so the key is relative to the bucket
    if object_key:sub(1, 1) == "/" then
        object_key = object_key:sub(2)
    end
    return {
        protocol = parsed.scheme,
        bucket = parsed.host,
        key = object_key,
    }
end

return {
parse_storage_uri=parse_storage_uri,
short_digest=short_digest,
ref_from_branch_or_tag=ref_from_branch_or_tag,
lakefs_object_pager=lakefs_object_pager,
Expand Down
108 changes: 108 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/symlink_exporter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
local extractor = require("lakefs/catalogexport/table_extractor")
local hive = require("lakefs/catalogexport/hive")
local utils = require("lakefs/catalogexport/internal")
local pathlib = require("path")
local strings = require("strings")
local lakefs = require("lakefs")

--[[
    ### Default Symlink File(s) structure:

    ${storageNamespace}
        _lakefs/
            exported/
                ${ref}/
                    ${commitId}/
                        ${tableName}/
                            p1=v1/symlink.txt
                            p1=v2/symlink.txt
                            p1=v3/symlink.txt
                            ...
]]
--[[
    Build the URI prefix under which symlink files are written.
    @storage_ns: base URI to export under
    @commit_id: commit id; shortened with utils.short_digest before use
    @action_info: the global action object, used to resolve the branch or tag name
    returns storage_ns joined with "_lakefs/exported/${ref}/${short commit id}/"
]]
local function get_storage_uri_prefix(storage_ns, commit_id, action_info)
    local ref = utils.ref_from_branch_or_tag(action_info)
    local short_commit = utils.short_digest(commit_id)
    local exported_dir = string.format("_lakefs/exported/%s/%s/", ref, short_commit)
    return pathlib.join("/", storage_ns, exported_dir)
end

--[[
    Create an iterator over the symlink files of a hive table.
    @repo_id: repository id
    @commit_id: commit id of the current table
    @table_src_path: path to table spec (i.e _lakefs_tables/my_table.yaml)
    @options:
    - skip_trim_obj_base_path(boolean) if true will skip removing the prefix path before the partition path.
    - debug(boolean) if true will print the table type and location being exported.
    returns a function; each call yields a table { key_suffix = <relative symlink path>, data = <symlink file content> }
    for the next partition, or nil once the partition pager is exhausted.
    raises an error when the table descriptor type is not "hive".
]]
local function export_it(repo_id, commit_id, table_src_path, options)
    local opts = options or {}
    local descriptor = extractor.get_table_descriptor(lakefs, repo_id, commit_id, table_src_path)
    if descriptor.type ~= "hive" then
        -- tostring keeps the intended message even when the descriptor has no type at all
        error("table " .. tostring(descriptor.type) .. " in path " .. table_src_path .. " not supported")
    end
    if opts.debug then
        print(string.format('%s table `lakefs://%s/%s/%s`', descriptor.type, repo_id, utils.short_digest(commit_id),
            descriptor.path))
    end
    local base_path = descriptor.path
    local cols = descriptor.partition_columns
    local pager = hive.extract_partition_pager(lakefs, repo_id, commit_id, base_path, cols)
    return function()
        local part_key, entries = pager()
        if part_key == nil then
            return nil
        end
        -- one physical address per line; collect into a buffer and join once
        -- instead of growing a string with repeated concatenation
        local lines = {}
        for i, entry in ipairs(entries) do
            lines[i] = entry.physical_address .. "\n"
        end
        local symlink_data = table.concat(lines)
        -- create key suffix for symlink file
        local storage_key_suffix = part_key
        if #descriptor.partition_columns == 0 then
            -- unpartitioned table: a single symlink file directly under the table name
            storage_key_suffix = descriptor.name .. "/" .. "symlink.txt"
        else
            if not opts.skip_trim_obj_base_path then
                -- remove base_path prefix from partition path
                -- NOTE(review): presumably the trailing 1 limits this to a single replacement - confirm against the strings module
                storage_key_suffix = strings.replace(part_key, base_path .. "/", "", 1)
            end
            -- append the partition path and file name to the table name
            storage_key_suffix = pathlib.join("/", descriptor.name, storage_key_suffix, "symlink.txt")
        end
        return {
            key_suffix = storage_key_suffix,
            data = symlink_data
        }
    end
end

--[[
    Export the symlink files that represent a table to S3.
    @s3_client: configured client
    @table_src_path: object path to the table spec (_lakefs_tables/my_table.yaml)
    @action_info: the global action object (repository_id, commit_id and storage_namespace are read from it)
    @options:
    - debug(boolean)
    - export_base_uri(string): override the prefix in S3 i.e s3://other-bucket/path/
    - writer(function(bucket, key, data)): if passed then will not use s3 client, helpful for debug
    - any other option is forwarded as-is to the symlink iterator (export_it)
    returns a table { location = { protocol, bucket, key } } describing the export prefix.
]]
local function export_s3(s3_client, table_src_path, action_info, options)
    local opts = options or {}
    local repo_id = action_info.repository_id
    local commit_id = action_info.commit_id
    local base_prefix = opts.export_base_uri or action_info.storage_namespace
    local export_base_uri = get_storage_uri_prefix(base_prefix, commit_id, action_info)
    local location = utils.parse_storage_uri(export_base_uri)
    -- a custom writer takes precedence; s3_client is only touched when no writer is given
    local put_object = opts.writer or s3_client.put_object
    local it = export_it(repo_id, commit_id, table_src_path, opts)
    for symlink in it do
        local key = pathlib.join("/", location.key, symlink.key_suffix)
        if opts.debug then
            print("S3 writing bucket: " .. location.bucket .. " key: " .. key)
        end
        -- write through the selected writer (previously called an undefined global)
        put_object(location.bucket, key, symlink.data)
    end
    return {
        location = location
    }
end

-- Public interface of the symlink exporter module.
local exports = {}
exports.export_s3 = export_s3
return exports