Commit 97579bc
Merge pull request #32 from dandi/gh-31
Delete old Zarr entries in a single `git rm` call
yarikoptic authored Feb 19, 2024
2 parents 6771254 + 0d2da8b commit 97579bc
Showing 3 changed files with 85 additions and 65 deletions.
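The core of the change, as a standalone sketch: instead of one `git rm` subprocess per deleted file, the paths are written NUL-separated to a temporary file and handed to a single `git rm` via `--pathspec-from-file`/`--pathspec-file-nul`. (`repo_dir` and `paths` here are hypothetical; the real implementation is the async `remove_batch` method added below.)

```python
import subprocess
import tempfile


def git_rm_batch(repo_dir: str, paths: list[str]) -> None:
    # Write one NUL-terminated pathspec per entry to a temp file and
    # delete everything with a single `git rm` invocation.
    with tempfile.NamedTemporaryFile(mode="w") as fp:
        for p in paths:
            print(p, end="\0", file=fp)
        fp.flush()
        subprocess.run(
            [
                "git",
                "rm",
                "-f",
                "--ignore-unmatch",  # don't fail on already-gone paths
                f"--pathspec-from-file={fp.name}",
                "--pathspec-file-nul",
            ],
            cwd=repo_dir,
            check=True,
        )
```

(`--pathspec-from-file` support in `git rm` requires Git 2.26 or later.)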
25 changes: 24 additions & 1 deletion src/backups2datalad/adataset.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from collections import Counter
-from collections.abc import AsyncGenerator, Sequence
+from collections.abc import AsyncGenerator, Iterable, Sequence
 from contextlib import aclosing
 from dataclasses import InitVar, dataclass, field, replace
 from datetime import datetime
@@ -11,6 +11,7 @@
 from pathlib import Path
 import re
 import subprocess
+import tempfile
 import textwrap
 from typing import Any, ClassVar
 
@@ -386,6 +387,28 @@ async def remove(self, path: str) -> None:
                 )
                 await anyio.sleep(delay)
 
+    async def remove_batch(self, paths: Iterable[str]) -> None:
+        # `paths` must be relative to the root of the dataset
+        pathlist = list(paths)
+        if not pathlist:
+            return
+        async with self.lock:
+            # Pre-commit first to avoid problems with locking etc.; the
+            # same is done in DataLad's own invocation of rm.
+            self.ds.repo.precommit()
+            with tempfile.NamedTemporaryFile(mode="w") as fp:
+                for p in pathlist:
+                    print(p, end="\0", file=fp)
+                fp.flush()
+                fp.seek(0)
+                await self.call_git(
+                    "rm",
+                    "-f",
+                    "--ignore-unmatch",
+                    f"--pathspec-from-file={fp.name}",
+                    "--pathspec-file-nul",
+                )
+
     async def update(self, how: str, sibling: str | None = None) -> None:
         await anyio.to_thread.run_sync(
             partial(self.ds.update, how=how, sibling=sibling)
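A usage sketch for the new method (`ds` stands in for whatever `AsyncDataset` instance the backup code holds, and the entry names are hypothetical; paths must be relative to the dataset root, and an empty iterable returns without spawning git at all):

```python
import anyio


async def main() -> None:
    ds = ...  # an AsyncDataset for the backup dataset (assumed in scope)
    # All three entries are staged for deletion by one `git rm` call,
    # serialized under ds.lock so concurrent git operations don't collide.
    await ds.remove_batch(["0/0/0", "0/0/1", "1/2/3"])


anyio.run(main)
```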
8 changes: 6 additions & 2 deletions src/backups2datalad/annex.py
@@ -140,7 +140,7 @@ async def register_url(self, key: str, url: str) -> None:
             )
             ### TODO: Raise an exception?
 
-    async def list_files(self) -> AsyncGenerator[str, None]:
+    async def list_files(self, path: Path | None = None) -> AsyncGenerator[str, None]:
         async with aclosing(
             stream_null_command(
                 "git",
@@ -150,8 +150,12 @@ async def list_files(self) -> AsyncGenerator[str, None]:
                 "--name-only",
                 "-z",
                 "HEAD",
+                *([str(path)] if path is not None else []),
                 cwd=self.repo,
             )
         ) as p:
             async for fname in p:
-                yield fname
+                if path is not None:
+                    yield (path / fname).as_posix()
+                else:
+                    yield fname
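When a subdirectory is passed, `list_files` narrows the underlying git listing to that subtree and re-joins each yielded name onto the prefix, so callers always receive repo-root-relative POSIX paths. A minimal consumer sketch (the `annex` object and `repo` root are assumptions; it mirrors the `under_tree` helper added in zarr.py below):

```python
from contextlib import aclosing
from pathlib import Path


async def files_under(annex, repo: Path, dirpath: Path) -> set[str]:
    # annex is assumed to expose the list_files() generator shown above;
    # dirpath is an absolute path inside the repository.
    reldir = dirpath.relative_to(repo)
    async with aclosing(annex.list_files(reldir)) as fileiter:
        return {f async for f in fileiter}
```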
117 changes: 55 additions & 62 deletions src/backups2datalad/zarr.py
@@ -5,7 +5,7 @@
 from dataclasses import dataclass, field
 from datetime import datetime
 import os
-from pathlib import Path
+from pathlib import Path, PurePosixPath
 from typing import TYPE_CHECKING
 from urllib.parse import quote, quote_plus
 
@@ -129,18 +129,19 @@ def __post_init__(self) -> None:
     async def run(self) -> None:
         last_sync = self.read_sync_file()
         async with aclosing(self.annex.list_files()) as fileiter:
-            local_paths = {f async for f in fileiter if not is_meta_file(f)}
+            to_delete = {f async for f in fileiter if not is_meta_file(f)}
         async with get_session().create_client(
             "s3", config=AioConfig(signature_version=UNSIGNED)
         ) as client:
-            if not await self.needs_sync(client, last_sync, local_paths):
+            if not await self.needs_sync(client, last_sync, to_delete):
                 self.log.info("backup up to date")
                 return
             self.log.info("sync needed")
             i = 0
             while True:
                 orig_checksum = await self.get_local_checksum()
                 zcc = ZarrChecksumTree()
+                to_update = []
                 async with aclosing(self.aiter_file_entries(client)) as ait:
                     async for entry in ait:
                         if is_meta_file(str(entry)):
@@ -150,7 +151,7 @@ async def run(self) -> None:
                             )
                         self.log.debug("%s: Syncing", entry)
                         zcc.add_leaf(Path(entry.path), entry.size, entry.md5_digest)
-                        local_paths.discard(str(entry))
+                        to_delete.discard(str(entry))
                         if self.mode is ZarrMode.TIMESTAMP:
                             if (
                                 last_sync is not None
@@ -162,6 +163,7 @@ async def run(self) -> None:
                                 continue
                         self.check_change(f"entry {entry!r} was modified/added")
                         dest = self.repo / str(entry)
+                        conflicted = False
                         if dest.is_dir():
                             # File path is replacing a directory, which needs
                             # to be deleted
@@ -173,7 +175,8 @@ async def run(self) -> None:
                                 "%s: deleting conflicting directory at same path",
                                 entry,
                             )
-                            await self.rmtree(dest, local_paths)
+                            to_delete |= await self.under_tree(dest)
+                            conflicted = True
                         else:
                             for ep in entry.parents:
                                 pp = self.repo / ep
@@ -189,17 +192,15 @@ async def run(self) -> None:
                                         entry,
                                         ep,
                                     )
-                                    await self.ds.remove(ep)
-                                    local_paths.discard(ep)
-                                    self.report.deleted += 1
+                                    to_delete.add(str(ep))
+                                    conflicted = True
                                     break
                                 elif pp.is_dir():
                                     break
-                        to_update = False
-                        if not (dest.exists() or dest.is_symlink()):
+                        if conflicted or not (dest.exists() or dest.is_symlink()):
                             self.check_change(f"entry {str(entry)!r} added")
                             self.log.debug("%s: Not in dataset; will add", entry)
-                            to_update = True
+                            to_update.append(entry)
                             self.report.added += 1
                         else:
                             self.log.debug("%s: About to fetch hash from annex", entry)
@@ -216,36 +217,36 @@ async def run(self) -> None:
                                 " modification; will update",
                                 entry,
                             )
-                            to_update = True
+                            to_update.append(entry)
+                            to_delete.add(str(entry))
                             self.report.updated += 1
-                        if to_update:
-                            await self.ds.remove(str(entry))
-                            key = await self.annex.mkkey(
-                                entry.name, entry.size, entry.md5_digest
-                            )
-                            remotes = await self.annex.get_key_remotes(key)
-                            await self.annex.from_key(key, str(entry))
-                            await self.register_url(str(entry), key, entry.bucket_url)
-                            prefix = quote_plus(str(entry))
-                            await self.register_url(
-                                str(entry),
-                                key,
-                                (
-                                    f"{self.api_url}/zarr/{self.zarr_id}/files"
-                                    f"?prefix={prefix}&download=true"
-                                ),
-                            )
-                            if (
-                                remotes is not None
-                                and self.backup_remote is not None
-                                and self.backup_remote not in remotes
-                            ):
-                                self.log.info(
-                                    "%s: Not in backup remote %s",
-                                    entry,
-                                    self.backup_remote,
-                                )
-                await self.prune_deleted(local_paths)
+                await self.prune_deleted(to_delete)
+                for entry in to_update:
+                    key = await self.annex.mkkey(
+                        entry.name, entry.size, entry.md5_digest
+                    )
+                    remotes = await self.annex.get_key_remotes(key)
+                    await self.annex.from_key(key, str(entry))
+                    await self.register_url(str(entry), key, entry.bucket_url)
+                    prefix = quote_plus(str(entry))
+                    await self.register_url(
+                        str(entry),
+                        key,
+                        (
+                            f"{self.api_url}/zarr/{self.zarr_id}/files"
+                            f"?prefix={prefix}&download=true"
+                        ),
+                    )
+                    if (
+                        remotes is not None
+                        and self.backup_remote is not None
+                        and self.backup_remote not in remotes
+                    ):
+                        self.log.info(
+                            "%s: Not in backup remote %s",
+                            entry,
+                            self.backup_remote,
+                        )
                 final_checksum = str(zcc.process())
                 modern_asset = await self.asset.refetch()
                 changed_during_sync = self.asset.modified != modern_asset.modified
@@ -393,29 +394,21 @@ async def needs_sync(
         else:
             return False
 
-    async def rmtree(self, dirpath: Path, local_paths: set[str]) -> None:
-        for p in list(dirpath.iterdir()):
-            if p.is_dir():
-                await self.rmtree(p, local_paths)
-            else:
-                relpath = p.relative_to(self.repo).as_posix()
-                self.log.info("deleting %s", p)
-                await self.ds.remove(relpath)
-                self.report.deleted += 1
-                local_paths.discard(relpath)
-        with suppress(FileNotFoundError):
-            dirpath.rmdir()
-
-    async def prune_deleted(self, local_paths: set[str]) -> None:
-        if local_paths:
-            self.check_change(f"{quantify(len(local_paths), 'file')} deleted from Zarr")
+    async def under_tree(self, dirpath: Path) -> set[str]:
+        reldir = dirpath.relative_to(self.repo)
+        async with aclosing(self.annex.list_files(reldir)) as fileiter:
+            return {f async for f in fileiter}
+
+    async def prune_deleted(self, to_delete: set[str]) -> None:
+        if to_delete:
+            self.check_change(f"{quantify(len(to_delete), 'file')} deleted from Zarr")
             self.log.info("deleting extra files")
-            for path in local_paths:
-                self.log.info("deleting %s", path)
-                await self.ds.remove(path)
-                p = self.repo / path
-                self.report.deleted += 1
-                d = p.parent
+            for p in to_delete:
+                self.log.debug("deleting %s", p)
+            self.report.deleted += len(to_delete)
+            await self.ds.remove_batch(to_delete)
+            for p in to_delete:
+                d = self.repo / PurePosixPath(p).parent
             while d != self.repo and (not d.exists() or not any(d.iterdir())):
                 with suppress(FileNotFoundError):
                     d.rmdir()
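After the single batched removal, directories emptied by the deletions still need pruning. A standalone sketch of that pass (the upward `d = d.parent` step falls on a line the diff truncates, so its placement here is an assumption):

```python
from contextlib import suppress
from pathlib import Path, PurePosixPath


def prune_empty_dirs(repo: Path, deleted: set[str]) -> None:
    # Walk each deleted path's parents upward, removing directories that
    # no longer exist or are now empty, and stop at the repo root.
    for p in deleted:
        d = repo / PurePosixPath(p).parent
        while d != repo and (not d.exists() or not any(d.iterdir())):
            with suppress(FileNotFoundError):
                d.rmdir()
            d = d.parent
```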
