Export Hooks: Symlink Exporter #6636

Merged · 10 commits · Sep 21, 2023
pkg/actions/lua/lakefs/catalogexport/symlink_exporter.lua (93 additions, 0 deletions)
@@ -0,0 +1,93 @@
local extractor = require("lakefs/catalogexport/table_extractor")
local hive = require("lakefs/catalogexport/hive")
local utils = require("lakefs/catalogexport/internal")
local url = require("net/url")
local pathlib = require("path")
local strings = require("strings")

-- make it a function so that @nopcoder can ask why we bloat the code?
local function parse_storage_uri(uri)
Contributor:

Yes, you are bloating the code. Please don't take a table and assign each field to a local just to return them; if you are not doing anything with a field, just use the member. I don't need to meet another local and remember what it does.

Also, you can see that the author of url.parse already returns a struct/table, and the beauty is that the member names describe each field. The current function splits them into local variables, and we lose that.

From the Go code, path can't be nil, and we need to match it against the separator before we drop it.

I suggest the following code (plus internal.lua):

local function parse_storage_uri(uri)
    local u = url.parse(uri)
    return { 
        protocol = u.scheme,
        bucket = u.host,
        key = (u.path:sub(0, 1) == "/") and u.path:sub(2) or u.path,
    }
end
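
For illustration, a quick check of what this table-returning version yields (the URI here is hypothetical; the field behavior follows url.parse as used elsewhere in this file):

local parsed = parse_storage_uri("s3://example-bucket/prefix/tables/animals")
-- parsed.protocol == "s3"
-- parsed.bucket   == "example-bucket"
-- parsed.key      == "prefix/tables/animals"  (leading slash stripped)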

Contributor Author:

💯

    local parsed_uri = url.parse(uri)
    local storage = parsed_uri.scheme
    local bucket = parsed_uri.host
    local key = parsed_uri.path
    if key ~= nil then
        key = key:sub(2) -- remove the leading slash
    end
    return storage, bucket, key
end

--[[
### Default Symlink File(s) structure:

${storageNamespace}
    _lakefs/
        exported/
            ${ref}/
                ${commitId}/
                    ${tableName}/
                        p1=v1/symlink.txt
                        p1=v2/symlink.txt
                        p1=v3/symlink.txt
                        ...
]]
local function gen_storage_uri_prefix(storage_ns, commit_id, action_info)
Contributor:

  1. Why is this function exposed? We don't call it here, and I'd prefer not to give it to our users; it looks internal, since it writes under _lakefs. It looks like something our code needs to format just before writing the data.
  2. If we plan for the user to call it and pass it to one of the writers, I suggest accepting the action_info in the writer and extracting the data and formatting the storage ns there, plus everything you need to know about the default layout under the repository to write to.
  3. If it's just string manipulation and doesn't generate anything, I suggest naming it format or fmt.

Contributor Author:

  1. 👍
  2. 👍
  3. 👍

    local branch_or_tag = utils.ref_from_branch_or_tag(action_info)
    return pathlib.join("/", storage_ns, "_lakefs/exported", branch_or_tag, utils.short_digest(commit_id), "/")
end
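
As a worked example (ref and digest values are hypothetical), the generated prefix follows the layout documented above:

-- gen_storage_uri_prefix(storage_ns, commit_id, action_info)
-- with ref "main" and short digest "abc123de" yields a prefix ending in:
--   .../_lakefs/exported/main/abc123de/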

local function new_std_table_writer(export_base_uri)
    local storage, bucket, key_prefix = parse_storage_uri(export_base_uri)
    return function(suffix_key, data)
        local key = pathlib.join("/", key_prefix, suffix_key)
        print("[STD Writer Put Object] Bucket " .. bucket .. " key: " .. key .. " content: \n" .. data)
    end
end

local function new_s3_table_writer(client, export_base_uri)
    local storage, bucket, key_prefix = parse_storage_uri(export_base_uri)
    return function(suffix_key, data)
        local key = pathlib.join("/", key_prefix, suffix_key)
        client.put_object(bucket, key, data)
        print("S3 Put: bucket: " .. bucket .. " key: " .. key)
    end
end
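
A minimal usage sketch of the two writers (URI and key values are hypothetical):

-- the std writer only prints, which makes it handy for dry runs
local put = new_std_table_writer("s3://bucket/repo/_lakefs/exported/main/abc123de/")
put("my_table/p1=v1/symlink.txt", "s3://bucket/data/obj1\n")
-- new_s3_table_writer returns a callback with the same signature,
-- but actually writes via client.put_object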
Contributor:

I assume these are two types of callbacks that we can pass as blockstore_write.

  1. If this is the case, they need to have the same signature.
  2. As the package developer, I'm sure it will be easy for you to understand how to use them; for the user, I think it will be easier to just call export_s3 (see my comment on export).
  3. I suggest calling them callbacks, if that's what they are.

Contributor Author (@Isan-Rivkin, Sep 21, 2023):

I assume these are two types of callbacks that we can pass as blockstore_write

Nope, they return callbacks that have the same signature, function(suffix_key, data) (see the usage script I added in the PR description):

writer = exporter.new_s3_table_writer(s3, action, args.override_export_uri)
exporter.export(lakefs, repo_id, commit_id, table_path, writer, trim_obj_base_path)

Contributor Author:

no more writer callback


local function export(lakefs_client, repo_id, commit_id, table_src_path, blockstore_write, trim_obj_base_path)
Contributor:

The export function accepts a lot of arguments, and one of them is a specific callback that does two things:

  1. Format the target path (which is the same code for both callbacks).
  2. Write the information (S3 or stdout). I suggest turning export into an iterator so you can implement export_s3 as a simple loop that grabs the path information and writes it to the S3 bucket, without a callback (same for every target); see the sketch after this list.
  3. I don't think export should handle an argument like trim_obj_base_path - the caller doesn't know what to pass. That's another reason not to expose this function and to let the user call export_s3, which knows how to handle the path. For conditional formatting of the export output, I suggest passing a table argument with optional members, one of which can be this flag; or better, as an iterator it can return a table of the parts, and the caller (s3 or std) can format the information as needed.
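
A minimal sketch of the iterator shape being suggested (export_iter and export_s3 are illustrative names, not code from this PR):

-- Illustrative only: instead of taking a writer callback, export yields
-- (key_suffix, symlink_data) pairs and each target becomes a plain consuming loop.
local function export_iter(lakefs_client, repo_id, commit_id, table_src_path)
    return coroutine.wrap(function()
        -- walk partitions exactly as export() does below, and for each partition:
        --   coroutine.yield(storage_key_suffix, symlink_data)
    end)
end

local function export_s3(s3_client, export_base_uri, lakefs_client, repo_id, commit_id, table_src_path)
    local storage, bucket, key_prefix = parse_storage_uri(export_base_uri)
    for suffix, data in export_iter(lakefs_client, repo_id, commit_id, table_src_path) do
        s3_client.put_object(bucket, pathlib.join("/", key_prefix, suffix), data)
    end
end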

Contributor Author:

changed it as we discussed!

    local descriptor = extractor.get_table_descriptor(lakefs_client, repo_id, commit_id, table_src_path)
    if descriptor.type == "hive" then
        local base_path = descriptor.path
        local pager = hive.extract_partition_pager(lakefs_client, repo_id, commit_id, base_path,
            descriptor.partition_columns)
        local sha = utils.short_digest(commit_id)
        print(string.format('%s table `lakefs://%s/%s/%s`', descriptor.type, repo_id, sha, descriptor.path))
Contributor:

debug print?

Contributor Author:

Nope, it's a log line that I believe provides helpful feedback in the Action logs. Thoughts?

Contributor Author:

added debug flag

        for part_key, entries in pager do
            local symlink_data = ""
            for _, entry in ipairs(entries) do
                symlink_data = symlink_data .. entry.physical_address .. "\n"
            end
            -- create key suffix for symlink file
            local storage_key_suffix = part_key
            if #descriptor.partition_columns == 0 then
                storage_key_suffix = pathlib.join("/", descriptor.name, "symlink.txt")
            else
                if trim_obj_base_path then
                    storage_key_suffix = strings.replace(part_key, base_path .. "/", "", 1) -- remove base_path prefix from partition path
                end
                -- append partition path to suffix
                storage_key_suffix = pathlib.join("/", descriptor.name, storage_key_suffix, "symlink.txt")
            end
            -- write symlink file to blockstore
            blockstore_write(storage_key_suffix, symlink_data)
        end
    else
        error("table " .. descriptor.type .. " in path " .. table_src_path .. " not supported")
    end
end
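
Putting it together, a hypothetical hook-script usage based on the snippet from the PR description (s3, lakefs, args, and the id variables come from the hook environment; the exact wiring is illustrative):

local exporter = require("lakefs/catalogexport/symlink_exporter")
-- dry run: the std writer prints instead of writing to the blockstore
local writer = exporter.new_std_table_writer(args.override_export_uri)
exporter.export(lakefs, repo_id, commit_id, table_path, writer, true)
-- real export: swap in the S3 writer, same callback shape
-- local s3_writer = exporter.new_s3_table_writer(s3, args.override_export_uri)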

return {
    export = export,
    new_s3_table_writer = new_s3_table_writer,
    new_std_table_writer = new_std_table_writer,
    gen_storage_uri_prefix = gen_storage_uri_prefix
}