
Export Hooks: Symlink Exporter #6636

Merged · 10 commits · Sep 21, 2023
11 changes: 11 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/internal.lua
@@ -1,3 +1,4 @@
local url = require("net/url")
local DEFAULT_SHORT_DIGEST_LEN=6

local function short_digest(digest, len)
@@ -41,7 +42,17 @@ local function ref_from_branch_or_tag(action_info)
end
end

local function parse_storage_uri(uri)
local u = url.parse(uri)
return {
protocol = u.scheme,
bucket = u.host,
        key = (u.path:sub(1, 1) == "/") and u.path:sub(2) or u.path, -- strip the leading "/" from the key
}
end

return {
parse_storage_uri=parse_storage_uri,
short_digest=short_digest,
ref_from_branch_or_tag=ref_from_branch_or_tag,
lakefs_object_pager=lakefs_object_pager,
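A minimal sketch of how the new helper behaves, assuming the net/url semantics used above (the bucket and key values are hypothetical):

local utils = require("lakefs/catalogexport/internal")
local loc = utils.parse_storage_uri("s3://example-bucket/prefix/table")
-- loc.protocol == "s3", loc.bucket == "example-bucket", loc.key == "prefix/table"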
104 changes: 104 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/symlink_exporter.lua
@@ -0,0 +1,104 @@
local extractor = require("lakefs/catalogexport/table_extractor")
local hive = require("lakefs/catalogexport/hive")
local utils = require("lakefs/catalogexport/internal")
local pathlib = require("path")
local strings = require("strings")

--[[
### Default Symlink File(s) structure:

${storageNamespace}
_lakefs/
exported/
${ref}/
${commitId}/
${tableName}/
p1=v1/symlink.txt
p1=v2/symlink.txt
p1=v3/symlink.txt
...
]]
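--[[
For example (hypothetical values): a table named "animals" partitioned by p1,
exported from branch "main" at a commit whose short digest is "abc123",
produces keys such as:
${storageNamespace}/_lakefs/exported/main/abc123/animals/p1=v1/symlink.txt
]]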
local function get_storage_uri_prefix(storage_ns, commit_id, action_info)
local branch_or_tag = utils.ref_from_branch_or_tag(action_info)
local sha = utils.short_digest(commit_id)
    return pathlib.join("/", storage_ns, string.format("_lakefs/exported/%s/%s/", branch_or_tag, sha))
end

--[[
options:
- skip_trim_obj_base_path(boolean): if true, skip removing the table base path prefix from the partition path.
]]
local function export_it(lakefs_client, repo_id, commit_id, table_src_path, options)
local opts = options or {}
local descriptor = extractor.get_table_descriptor(lakefs_client, repo_id, commit_id, table_src_path)
if descriptor.type ~= "hive" then
error("table " .. descriptor.type .. " in path " .. table_src_path .. " not supported")
end
if opts.debug then
print(string.format('%s table `lakefs://%s/%s/%s`', descriptor.type, repo_id, utils.short_digest(commit_id),
descriptor.path))
end
local base_path = descriptor.path
local cols = descriptor.partition_columns
local pager = hive.extract_partition_pager(lakefs_client, repo_id, commit_id, base_path, cols)
return function()
local part_key, entries = pager()
if part_key == nil then
return nil
end
local symlink_data = ""
for _, entry in ipairs(entries) do
symlink_data = symlink_data .. entry.physical_address .. "\n"
end
-- create key suffix for symlink file
local storage_key_suffix = part_key
if #descriptor.partition_columns == 0 then
storage_key_suffix = descriptor.name .. "/" .. "symlink.txt"
else
if not opts.skip_trim_obj_base_path then
storage_key_suffix = strings.replace(part_key, base_path .. "/", "", 1) -- remove base_path prefix from partition path
end
            -- build the full key suffix: <table name>/<partition path>/symlink.txt
storage_key_suffix = pathlib.join("/", descriptor.name, storage_key_suffix, "symlink.txt")
end
return {
key_suffix = storage_key_suffix,
data = symlink_data
}
end
end
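-- Illustrative use of the iterator returned by export_it (mirrors export_s3 below):
--   local it = export_it(lakefs_client, repo_id, commit_id, table_src_path, opts)
--   for symlink in it do
--       -- symlink.key_suffix and symlink.data describe one symlink.txt file
--   end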

--[[
Export symlink files that represent a table to S3.
options:
- debug(boolean)
- override_export_base_uri(string): override the export prefix in S3, e.g. s3://other-bucket/path/
- writer(function(bucket, key, data)): if provided, the S3 client is not used; helpful for debugging
]]
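-- Illustrative writer option (hypothetical), useful for dry runs:
--   local opts = { writer = function(bucket, key, data) print(bucket, key, #data) end }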
Contributor:

1. Document the rest of the function arguments.
2. We don't need to pass lakefs_client - it is not a real client, it is a package.

Contributor Author:

1. 👍
2. 👍

local function export_s3(lakefs_client, s3_client, table_src_path, action_info, options)
local opts = options or {}
local repo_id = action_info.repository_id
local commit_id = action_info.commit_id
local base_prefix = opts.override_export_base_uri or action_info.storage_namespace
Contributor:

If we pass a value, it means we override it.

Suggested change
local base_prefix = opts.override_export_base_uri or action_info.storage_namespace
local base_prefix = opts.export_base_uri or action_info.storage_namespace

Contributor Author:

👍

local export_base_uri = get_storage_uri_prefix(base_prefix, commit_id, action_info)
local storage_uri = utils.parse_storage_uri(export_base_uri)
Contributor:

Let's call it location.

local it = export_it(lakefs_client, repo_id, commit_id, table_src_path, opts)
for symlink in it do
local key = pathlib.join("/", storage_uri.key, symlink.key_suffix)
if opts.writer ~= nil then
opts.writer(storage_uri.bucket, key, symlink.data)
else
s3_client.put_object(storage_uri.bucket, key, symlink.data)
Contributor:

Set the writer you want to work with outside the loop.

Contributor Author:

👍

if opts.debug then
print("S3 success write bucket: " .. storage_uri.bucket .. " path: " .. key)
end
Contributor:

Suggest printing before the operation; in case of a failure, the last log line will point to the data being written.

Contributor Author:

Good idea 👍

end
end
return {
location = storage_uri
}
end

return {
export_s3 = export_s3
}
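For context, a minimal sketch of invoking the exporter from a lakeFS Lua action hook. It assumes the `action` global and the `aws.s3_client` constructor of the lakeFS Lua runtime; the credential keys under `args.aws` and the table path are hypothetical hook configuration:

local aws = require("aws")
local lakefs = require("lakefs")
local exporter = require("lakefs/catalogexport/symlink_exporter")

-- credentials come from the hook's args (hypothetical keys)
local s3 = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
-- export symlinks for the table descriptor at "tables/animals" (hypothetical path);
-- the lakefs package is passed where the code above expects lakefs_client
exporter.export_s3(lakefs, s3, "tables/animals", action, { debug = true })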