Refactor compare_and_extract_chunks and improve test coverage
To do:
- Remove additional comments after approval.

Thank you for helping me with my first file system operations contribution!
alighazi288 committed Jan 15, 2025
1 parent e81ef60 commit 6358ce6
Showing 2 changed files with 125 additions and 91 deletions.
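
For context, the refactored compare_and_extract_chunks performs a two-pass, in-place patch of an existing file: it first hashes the chunks already on disk, then fetches and rewrites only the chunks that differ from the archive. Below is a minimal, self-contained sketch of that idea; the function name, the fetch_chunks callable, and the SHA-256 hash are illustrative placeholders, not borg's actual key.id_hash or pipeline API.

import hashlib


def patch_file_in_place(path, archived_chunks, fetch_chunks):
    """Sketch only: archived_chunks is a list of (chunk_id, size) pairs from the archive;
    fetch_chunks(ids) returns the corresponding chunk payloads, in order."""
    # Pass 1: hash what is already on disk, chunk by chunk.
    needed = []  # (offset, chunk_id, size) of every chunk that must be re-fetched
    with open(path, "rb") as f:
        offset = 0
        for chunk_id, size in archived_chunks:
            f.seek(offset)
            data = f.read(size)
            if len(data) != size or hashlib.sha256(data).digest() != chunk_id:
                needed.append((offset, chunk_id, size))
            offset += size

    # Pass 2: fetch only the mismatched chunks and overwrite them in place.
    payloads = fetch_chunks([chunk_id for _, chunk_id, _ in needed])
    with open(path, "rb+") as f:
        for (offset, _, _), data in zip(needed, payloads):
            f.seek(offset)
            f.write(data)
        f.truncate(sum(size for _, size in archived_chunks))

Compared with a full re-extract, only the byte ranges that actually changed are fetched and written, which is the point of the optimization.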
96 changes: 70 additions & 26 deletions src/borg/archive.py
@@ -719,38 +719,82 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
# In this case, we *want* to extract twice, because there is no other way.
pass

def compare_and_extract_chunks(self, item, fs_path):
def compare_and_extract_chunks(self, item, fs_path, st=None, *, pi=None, sparse=False):
"""Compare file chunks and patch if needed. Returns True if patching succeeded."""
try:
st = os.stat(fs_path, follow_symlinks=False)
if not stat.S_ISREG(st.st_mode):
return False

with open(fs_path, "rb+") as fs_file:
chunk_offset = 0
for chunk_entry in item.chunks:
chunkid_A = chunk_entry.id
size = chunk_entry.size
if st is None or not stat.S_ISREG(st.st_mode):
return False

fs_file.seek(chunk_offset)
data_F = fs_file.read(size)
try:
# First pass: Build fs chunks list
fs_chunks = []
offset = 0
with backup_io("open"):
fs_file = open(fs_path, "rb")
with fs_file:
for chunk in item.chunks:
with backup_io("seek"):
fs_file.seek(offset)
with backup_io("read"):
data = fs_file.read(chunk.size)
if len(data) != chunk.size:
fs_chunks.append(None)
else:
fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=chunk.size))
offset += chunk.size

needs_update = True
if len(data_F) == size:
chunkid_F = self.key.id_hash(data_F)
needs_update = chunkid_A != chunkid_F
# Compare chunks and collect needed chunk IDs
needed_chunks = []
for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
if fs_chunk is None or fs_chunk.id != item_chunk.id:
needed_chunks.append(item_chunk)

if needs_update:
chunk_data = b"".join(self.pipeline.fetch_many([chunkid_A], ro_type=ROBJ_FILE_STREAM))
fs_file.seek(chunk_offset)
fs_file.write(chunk_data)
if not needed_chunks:
return True

chunk_offset += size
# Fetch all needed chunks and iterate through ALL of them
chunk_data_iter = self.pipeline.fetch_many(
[chunk.id for chunk in needed_chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM
)

fs_file.truncate(item.size)
return True
# Second pass: Update file and consume EVERY chunk from the iterator
offset = 0
item_chunk_size = 0
with backup_io("open"):
fs_file = open(fs_path, "rb+")
with fs_file:
for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
with backup_io("seek"):
fs_file.seek(offset)
if fs_chunk is not None and fs_chunk.id == item_chunk.id:
offset += item_chunk.size
item_chunk_size += item_chunk.size
else:
chunk_data = next(chunk_data_iter)
if pi:
pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])

with backup_io("write"):
if sparse and not chunk_data.strip(b"\0"):
fs_file.seek(len(chunk_data), 1) # Seek over sparse section
offset += len(chunk_data)
else:
fs_file.write(chunk_data)
offset += len(chunk_data)
item_chunk_size += len(chunk_data)
with backup_io("truncate_and_attrs"):
fs_file.truncate(item.size)
fs_file.flush()
self.restore_attrs(fs_path, item, fd=fs_file.fileno())

# Size verification like extract_item
if "size" in item and item.size != item_chunk_size:
raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {item_chunk_size}")

# Damaged chunks check like extract_item
if "chunks_healthy" in item and not item.chunks_healthy:
raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")

except (OSError, Exception):
return True
except OSError:
return False

def extract_item(
@@ -855,7 +899,7 @@ def make_parent(path):
with self.extract_helper(item, path, hlm) as hardlink_set:
if hardlink_set:
return
if self.compare_and_extract_chunks(item, path):
if self.compare_and_extract_chunks(item, path, pi=pi, sparse=sparse):
return

with backup_io("open"):
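One detail worth noting from the write loop above: when sparse is enabled and a fetched chunk is all zeros, the code seeks past it instead of writing, so that region of the file stays (or becomes) a hole. Here is a standalone illustration of that pattern, independent of borg; the file name and helper are made up for the example.

import os


def write_chunks(path, chunks, sparse=True):
    """Write chunks back to back; optionally skip all-zero chunks so the file can stay sparse."""
    with open(path, "wb") as f:
        for data in chunks:
            if sparse and not data.strip(b"\0"):
                f.seek(len(data), 1)  # advance past the hole instead of writing zeros
            else:
                f.write(data)
        f.truncate(sum(len(c) for c in chunks))  # keep the logical size even if the tail is a hole


# A 1 MiB hole between two small data blocks:
write_chunks("demo.bin", [b"head", b"\0" * 1024 * 1024, b"tail"])
print(os.stat("demo.bin").st_size)  # 1048584 bytes logical size; disk usage depends on the filesystem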
120 changes: 55 additions & 65 deletions src/borg/testsuite/archive_test.py
@@ -426,27 +426,31 @@ def __init__(self):
extractor.pipeline = cache
extractor.key = key
extractor.cwd = str(tmpdir)
extractor.restore_attrs = Mock()

# Track fetched chunks across tests
fetched_chunks = []

def create_mock_chunks(test_data, chunk_size=512):
"""Helper function to create mock chunks from test data"""
def create_mock_chunks(item_data, chunk_size=4):
"""Helper function to create mock chunks from archive data"""
chunks = []
for i in range(0, len(test_data), chunk_size):
chunk_data = test_data[i : i + chunk_size]
for i in range(0, len(item_data), chunk_size):
chunk_data = item_data[i : i + chunk_size]
chunk_id = key.id_hash(chunk_data)
chunks.append(Mock(id=chunk_id, size=len(chunk_data)))
cache.objects[chunk_id] = chunk_data

item = Mock(chunks=chunks, size=len(test_data))
target_path = str(tmpdir.join("test.txt"))
return item, target_path
item = Mock(spec=["chunks", "size", "__contains__", "get"])
item.chunks = chunks # Use actual list for chunks
item.size = len(item_data)
item.__contains__ = lambda self, item: item == "size"

def mock_fetch_many(chunk_ids, ro_type):
return item, str(tmpdir.join("test.txt"))

def mock_fetch_many(chunk_ids, is_preloaded=True, ro_type=None):
"""Helper function to track and mock chunk fetching"""
fetched_chunks.extend(chunk_ids)
return [cache.objects[chunk_id] for chunk_id in chunk_ids]
return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])

def clear_fetched_chunks():
"""Helper function to clear tracked chunks between tests"""
@@ -462,99 +466,85 @@ def get_fetched_chunks():


@pytest.mark.parametrize(
"name, test_data, initial_data, expected_fetched_chunks, expected_success",
"name, item_data, fs_data, expected_fetched_chunks",
[
(
"no_changes",
b"A" * 512, # One complete chunk, no changes needed
b"A" * 512, # Identical content
b"1111", # One complete chunk, no changes needed
b"1111", # Identical content
0, # No chunks should be fetched
True,
),
(
"single_chunk_change",
b"A" * 512 + b"B" * 512, # Two chunks
b"A" * 512 + b"X" * 512, # Second chunk different
b"11112222", # Two chunks
b"1111XXXX", # Second chunk different
1, # Only second chunk should be fetched
True,
),
(
"cross_boundary_change",
b"A" * 512 + b"B" * 512, # Two chunks
b"A" * 500 + b"X" * 24, # Change crosses chunk boundary
b"11112222", # Two chunks
b"111XX22", # Change crosses chunk boundary
2, # Both chunks need update
True,
),
(
"exact_multiple_chunks",
b"A" * 512 + b"B" * 512 + b"C" * 512, # Three complete chunks
b"A" * 512 + b"X" * 512 + b"C" * 512, # Middle chunk different
b"11112222333", # Three chunks (last one partial)
b"1111XXXX333", # Middle chunk different
1, # Only middle chunk fetched
True,
),
(
"first_chunk_change",
b"A" * 512 + b"B" * 512, # Two chunks
b"X" * 512 + b"B" * 512, # First chunk different
b"11112222", # Two chunks
b"XXXX2222", # First chunk different
1, # Only first chunk should be fetched
True,
),
(
"all_chunks_different",
b"A" * 512 + b"B" * 512, # Two chunks
b"X" * 512 + b"Y" * 512, # Both chunks different
b"11112222", # Two chunks
b"XXXXYYYY", # Both chunks different
2, # Both chunks should be fetched
True,
),
(
"partial_last_chunk",
b"A" * 512 + b"B" * 100, # One full chunk + partial
b"A" * 512 + b"X" * 100, # Partial chunk different
b"111122", # One full chunk + partial
b"1111XX", # Partial chunk different
1, # Only second chunk should be fetched
True,
),
],
)
def test_compare_and_extract_chunks(
setup_extractor, name, test_data, initial_data, expected_fetched_chunks, expected_success
):
def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
"""Test chunk comparison and extraction"""
extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
clear_fetched_chunks()

item, target_path = create_mock_chunks(test_data, chunk_size=512)
chunk_size = 4
item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)

original_chunk_ids = [chunk.id for chunk in item.chunks]

# Write initial file state
with open(target_path, "wb") as f:
f.write(initial_data)

result = extractor.compare_and_extract_chunks(item, target_path)
assert result == expected_success

if expected_success:
# Verify only the expected chunks were fetched
fetched_chunks = get_fetched_chunks()
assert (
len(fetched_chunks) == expected_fetched_chunks
), f"Expected {expected_fetched_chunks} chunks to be fetched, got {len(fetched_chunks)}"

# For single chunk changes, verify it's the correct chunk
if expected_fetched_chunks == 1:
# Find which chunk should have changed by comparing initial_data with test_data
for i, (orig_chunk, mod_chunk) in enumerate(
zip(
[test_data[i : i + 512] for i in range(0, len(test_data), 512)],
[initial_data[i : i + 512] for i in range(0, len(initial_data), 512)],
)
):
if orig_chunk != mod_chunk:
assert (
fetched_chunks[0] == original_chunk_ids[i]
), f"Wrong chunk fetched. Expected chunk at position {i}"
break

# Verify final content
with open(target_path, "rb") as f:
assert f.read() == test_data
f.write(fs_data)

st = os.stat(target_path)
result = extractor.compare_and_extract_chunks(item, target_path, st=st) # Pass st parameter
assert result

# Verify only the expected chunks were fetched
fetched_chunks = get_fetched_chunks()
assert len(fetched_chunks) == expected_fetched_chunks

# For single chunk changes, verify it's the correct chunk
if expected_fetched_chunks == 1:
item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]

# Find which chunk should have changed by comparing item_data with fs_data
for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
if item_chunk != fs_chunk:
assert fetched_chunks[0] == original_chunk_ids[i]
break

# Verify final content
with open(target_path, "rb") as f:
assert f.read() == item_data
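
For reference, with the 4-byte chunk size used by these tests, b"11112222" splits into b"1111" and b"2222", while the cross-boundary case b"111XX22" splits into b"111X" and b"X22", so both chunks mismatch and both must be re-fetched. A quick standalone check of that slicing (plain Python, not part of the test file):

def split_chunks(data: bytes, chunk_size: int = 4) -> list[bytes]:
    """Slice data into fixed-size chunks, the same way create_mock_chunks slices item_data."""
    return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]


item_chunks = split_chunks(b"11112222")  # [b'1111', b'2222']
fs_chunks = split_chunks(b"111XX22")     # [b'111X', b'X22']
changed = [i for i, (a, b) in enumerate(zip(item_chunks, fs_chunks)) if a != b]
print(changed)  # [0, 1] -> the cross-boundary edit dirties both chunks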
