Skip to content

Commit

Permalink
Export Hooks: Symlink Exporter (#6636)
Browse files Browse the repository at this point in the history
  • Loading branch information
Isan-Rivkin authored Sep 21, 2023
1 parent 0506441 commit b2a4e50
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/internal.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local url = require("net/url")
local DEFAULT_SHORT_DIGEST_LEN=6

local function short_digest(digest, len)
Expand Down Expand Up @@ -41,7 +42,17 @@ local function ref_from_branch_or_tag(action_info)
end
end

local function parse_storage_uri(uri)
local u = url.parse(uri)
return {
protocol = u.scheme,
bucket = u.host,
key = (u.path:sub(0, 1) == "/") and u.path:sub(2) or u.path,
}
end

return {
parse_storage_uri=parse_storage_uri,
short_digest=short_digest,
ref_from_branch_or_tag=ref_from_branch_or_tag,
lakefs_object_pager=lakefs_object_pager,
Expand Down
108 changes: 108 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/symlink_exporter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
local extractor = require("lakefs/catalogexport/table_extractor")
local hive = require("lakefs/catalogexport/hive")
local utils = require("lakefs/catalogexport/internal")
local pathlib = require("path")
local strings = require("strings")
local lakefs = require("lakefs")

--[[
### Default Symlink File(s) structure:
${storageNamespace}
_lakefs/
exported/
${ref}/
${commitId}/
${tableName}/
p1=v1/symlink.txt
p1=v2/symlink.txt
p1=v3/symlink.txt
...
]]
local function get_storage_uri_prefix(storage_ns, commit_id, action_info)
local branch_or_tag = utils.ref_from_branch_or_tag(action_info)
local sha = utils.short_digest(commit_id)
return pathlib.join("/", storage_ns, string.format("_lakefs/exported/%s/%s/", branch_or_tag, sha))
end

--[[
@repo_id: repository id
@commit_id: commit id of the current table
@table_src_path: path to table spec (i.e _lakefs_tables/my_table.yaml)
@options:
- skip_trim_obj_base_path(boolean) if true will skip removing the prefix path before the partition path.
]]
local function export_it(repo_id, commit_id, table_src_path, options)
local opts = options or {}
local descriptor = extractor.get_table_descriptor(lakefs, repo_id, commit_id, table_src_path)
if descriptor.type ~= "hive" then
error("table " .. descriptor.type .. " in path " .. table_src_path .. " not supported")
end
if opts.debug then
print(string.format('%s table `lakefs://%s/%s/%s`', descriptor.type, repo_id, utils.short_digest(commit_id),
descriptor.path))
end
local base_path = descriptor.path
local cols = descriptor.partition_columns
local pager = hive.extract_partition_pager(lakefs, repo_id, commit_id, base_path, cols)
return function()
local part_key, entries = pager()
if part_key == nil then
return nil
end
local symlink_data = ""
for _, entry in ipairs(entries) do
symlink_data = symlink_data .. entry.physical_address .. "\n"
end
-- create key suffix for symlink file
local storage_key_suffix = part_key
if #descriptor.partition_columns == 0 then
storage_key_suffix = descriptor.name .. "/" .. "symlink.txt"
else
if not opts.skip_trim_obj_base_path then
storage_key_suffix = strings.replace(part_key, base_path .. "/", "", 1) -- remove base_path prefix from partition path
end
-- append to partition path to suffix
storage_key_suffix = pathlib.join("/", descriptor.name, storage_key_suffix, "symlink.txt")
end
return {
key_suffix = storage_key_suffix,
data = symlink_data
}
end
end

--[[
export a Symlinks that represent a table to S3
@s3_client: configured client
@table_src_path: object path to the table spec (_lakefs_tables/my_table.yaml)
@action_info: the global action object
@options:
- debug(boolean)
- export_base_uri(string): override the prefix in S3 i.e s3://other-bucket/path/
- writer(function(bucket, key, data)): if passed then will not use s3 client, helpful for debug
]]
local function export_s3(s3_client, table_src_path, action_info, options)
local opts = options or {}
local repo_id = action_info.repository_id
local commit_id = action_info.commit_id
local base_prefix = opts.export_base_uri or action_info.storage_namespace
local export_base_uri = get_storage_uri_prefix(base_prefix, commit_id, action_info)
local location = utils.parse_storage_uri(export_base_uri)
local put_object = opts.writer or s3_client.put_object
local it = export_it(repo_id, commit_id, table_src_path, opts)
for symlink in it do
local key = pathlib.join("/", location.key, symlink.key_suffix)
if opts.debug then
print("S3 writing bucket: " .. location.bucket .. " key: " .. key)
end
put_obj(location.bucket, key, symlink.data)
end
return {
location = location
}
end

return {
export_s3 = export_s3
}

0 comments on commit b2a4e50

Please sign in to comment.