Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Isan-Rivkin committed Sep 21, 2023
1 parent f61b402 commit 4931400
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions pkg/actions/lua/lakefs/catalogexport/symlink_exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local hive = require("lakefs/catalogexport/hive")
local utils = require("lakefs/catalogexport/internal")
local pathlib = require("path")
local strings = require("strings")
local lakefs = require("lakefs")

--[[
### Default Symlink File(s) structure:
Expand All @@ -21,16 +22,19 @@ local strings = require("strings")
local function get_storage_uri_prefix(storage_ns, commit_id, action_info)
local branch_or_tag = utils.ref_from_branch_or_tag(action_info)
local sha = utils.short_digest(commit_id)
return pathlib.join("/", storage_ns, string.format("_lakefs/exported/%s/%s/",branch_or_tag, sha))
return pathlib.join("/", storage_ns, string.format("_lakefs/exported/%s/%s/", branch_or_tag, sha))
end

--[[
options:
@repo_id: repository id
@commit_id: commit id of the current table
@table_src_path: path to the table spec (e.g. _lakefs_tables/my_table.yaml)
@options:
- skip_trim_obj_base_path(boolean) if true will skip removing the prefix path before the partition path.
]]
local function export_it(lakefs_client, repo_id, commit_id, table_src_path, options)
local function export_it(repo_id, commit_id, table_src_path, options)
local opts = options or {}
local descriptor = extractor.get_table_descriptor(lakefs_client, repo_id, commit_id, table_src_path)
local descriptor = extractor.get_table_descriptor(lakefs, repo_id, commit_id, table_src_path)
if descriptor.type ~= "hive" then
error("table " .. descriptor.type .. " in path " .. table_src_path .. " not supported")
end
Expand All @@ -40,7 +44,7 @@ local function export_it(lakefs_client, repo_id, commit_id, table_src_path, opti
end
local base_path = descriptor.path
local cols = descriptor.partition_columns
local pager = hive.extract_partition_pager(lakefs_client, repo_id, commit_id, base_path, cols)
local pager = hive.extract_partition_pager(lakefs, repo_id, commit_id, base_path, cols)
return function()
local part_key, entries = pager()
if part_key == nil then
Expand Down Expand Up @@ -70,32 +74,32 @@ end

--[[
Export symlinks that represent a table to S3
options:
@s3_client: configured client
@table_src_path: object path to the table spec (_lakefs_tables/my_table.yaml)
@action_info: the global action object
@options:
- debug(boolean)
- override_export_base_uri(string): override the prefix in S3 i.e s3://other-bucket/path/
- export_base_uri(string): override the prefix in S3, e.g. s3://other-bucket/path/
- writer(function(bucket, key, data)): if passed, it is called instead of the S3 client's put_object; helpful for debugging
]]
local function export_s3(lakefs_client, s3_client, table_src_path, action_info, options)
local function export_s3(s3_client, table_src_path, action_info, options)
local opts = options or {}
local repo_id = action_info.repository_id
local commit_id = action_info.commit_id
local base_prefix = opts.override_export_base_uri or action_info.storage_namespace
local base_prefix = opts.export_base_uri or action_info.storage_namespace
local export_base_uri = get_storage_uri_prefix(base_prefix, commit_id, action_info)
local storage_uri = utils.parse_storage_uri(export_base_uri)
local it = export_it(lakefs_client, repo_id, commit_id, table_src_path, opts)
local location = utils.parse_storage_uri(export_base_uri)
local put_obj = opts.writer or s3_client.put_object
local it = export_it(repo_id, commit_id, table_src_path, opts)
for symlink in it do
local key = pathlib.join("/", storage_uri.key, symlink.key_suffix)
if opts.writer ~= nil then
opts.writer(storage_uri.bucket, key, symlink.data)
else
s3_client.put_object(storage_uri.bucket, key, symlink.data)
if opts.debug then
print("S3 success write bucket: " .. storage_uri.bucket .. " path: " .. key)
end
local key = pathlib.join("/", location.key, symlink.key_suffix)
if opts.debug then
print("S3 writting bucket: " .. location.bucket .. " path: " .. key)
end
put_obj(location.bucket, key, symlink.data)
end
return {
location = storage_uri
location = location
}
end

Expand Down

0 comments on commit 4931400

Please sign in to comment.