Skip to content

Commit

Permalink
handle 'add' entries accordingly
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan-Rosenberg committed Dec 9, 2024
1 parent 8f16dbd commit a1a0729
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ local utils = require("lakefs/catalogexport/internal")
local extractor = require("lakefs/catalogexport/table_extractor")
local strings = require("strings")
local url = require("net/url")

local function isTableNotEmpty(t)
return next(t) ~= nil
end

--[[
delta_log_entry_key_generator returns a closure that returns a Delta Lake version key according to the Delta Lake
protocol: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
Expand Down Expand Up @@ -87,6 +92,7 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
]]
local table_log = {}
local keyGenerator = delta_log_entry_key_generator()
local unfound_paths = {}
for _, key in ipairs(sortedKeys) do
local content = t[key]
local entry_log = {}
Expand Down Expand Up @@ -128,9 +134,14 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
elseif entry.remove ~= nil then
entry.remove.path = physical_path
end
elseif code == 404 and entry.remove ~= nil then
-- If the object is not found, and the entry is a remove entry, we can assume it was vacuumed
print(string.format("REMOVE entry object `%s` wasn't found. Assuming vacuum.", unescaped_path))
elseif code == 404 then
if entry.remove ~= nil then
-- If the object is not found, and the entry is a remove entry, we can assume it was vacuumed
print(string.format("Object with path '%s' of a `remove` entry wasn't found. Assuming vacuum.", unescaped_path))
unfound_paths[unescaped_path] = nil
else
unfound_paths[unescaped_path] = true
end
else
error("failed stat_object with code: " .. tostring(code) .. ", and path: " .. unescaped_path)
end
Expand All @@ -141,6 +152,17 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
table_log[keyGenerator()] = entry_log
end

if isTableNotEmpty(unfound_paths) then
local unfound_paths_str = ""
for p, v in pairs(unfound_paths) do
if v ~= nil then
unfound_paths_str = pathlib.join(" ", unfound_paths_str, p)
print(p)
end
end
error("The following objects were not found: " .. unfound_paths)
end

local table_export_prefix = utils.get_storage_uri_prefix(ns, commit_id, action)
local table_physical_path = pathlib.join("/", table_export_prefix, table_name)
local table_log_physical_path = pathlib.join("/", table_physical_path, "_delta_log")
Expand Down

0 comments on commit a1a0729

Please sign in to comment.