Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta Exporter action: Handle vacuumed objects correctly #8409

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ local utils = require("lakefs/catalogexport/internal")
local extractor = require("lakefs/catalogexport/table_extractor")
local strings = require("strings")
local url = require("net/url")

local function isTableNotEmpty(t)
return next(t) ~= nil
end

--[[
delta_log_entry_key_generator returns a closure that returns a Delta Lake version key according to the Delta Lake
protocol: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
Expand Down Expand Up @@ -40,7 +45,7 @@ end

delta_client:
- get_table: function(repo, ref, prefix)

path_transformer: function(path) used for transforming path scheme (ex: Azure https to abfss)

]]
Expand Down Expand Up @@ -87,6 +92,7 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
]]
local table_log = {}
local keyGenerator = delta_log_entry_key_generator()
local unfound_paths = {}
for _, key in ipairs(sortedKeys) do
local content = t[key]
local entry_log = {}
Expand Down Expand Up @@ -128,6 +134,14 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
elseif entry.remove ~= nil then
entry.remove.path = physical_path
end
elseif code == 404 then
if entry.remove ~= nil then
-- If the object is not found, and the entry is a remove entry, we can assume it was vacuumed
print(string.format("Object with path '%s' of a `remove` entry wasn't found. Assuming vacuum.", unescaped_path))
unfound_paths[unescaped_path] = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this redundant?

Copy link
Contributor Author

@Jonathan-Rosenberg Jonathan-Rosenberg Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it was an add entry (meaning the clause beneath) and a stat object failed, there might still be a remove entry with the same path further ahead which will cause a vacuum to delete it, and in that case we don't want to fail the process, thus we add it to the unfound_paths table.
This line indicates that if a remove occurred and a stat object failed, we should treat any previously added entries as vacuumed objects.
The test on line 155 checks if there were any failed add entry path stat object requests (meaning that they weren't removed by a remove entry), and if that's the case throw an error.

else
unfound_paths[unescaped_path] = true
end
else
error("failed stat_object with code: " .. tostring(code) .. ", and path: " .. unescaped_path)
end
Expand All @@ -138,6 +152,17 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
table_log[keyGenerator()] = entry_log
end

if isTableNotEmpty(unfound_paths) then
local unfound_paths_str = ""
for p, v in pairs(unfound_paths) do
if v ~= nil then
unfound_paths_str = pathlib.join(" ", unfound_paths_str, p)
print(p)
end
end
error("The following objects were not found: " .. unfound_paths)
end

local table_export_prefix = utils.get_storage_uri_prefix(ns, commit_id, action)
local table_physical_path = pathlib.join("/", table_export_prefix, table_name)
local table_log_physical_path = pathlib.join("/", table_physical_path, "_delta_log")
Expand Down
Loading