diff --git a/src/backups2datalad/adataset.py b/src/backups2datalad/adataset.py
index 22d9d58..f801834 100644
--- a/src/backups2datalad/adataset.py
+++ b/src/backups2datalad/adataset.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from collections import Counter
-from collections.abc import AsyncGenerator, Sequence
+from collections.abc import AsyncGenerator, Iterable, Sequence
 from contextlib import aclosing
 from dataclasses import InitVar, dataclass, field, replace
 from datetime import datetime
@@ -11,6 +11,7 @@
 from pathlib import Path
 import re
 import subprocess
+import tempfile
 import textwrap
 from typing import Any, ClassVar
 
@@ -386,6 +387,28 @@ async def remove(self, path: str) -> None:
                 )
                 await anyio.sleep(delay)
 
+    async def remove_batch(self, paths: Iterable[str]) -> None:
+        # `paths` must be relative to the root of the dataset
+        pathlist = list(paths)
+        if not pathlist:
+            return
+        async with self.lock:
+            # to avoid problems with locking etc.  Same is done in DataLad's
+            # invocation of rm
+            self.ds.repo.precommit()
+            with tempfile.NamedTemporaryFile(mode="w") as fp:
+                for p in pathlist:
+                    print(p, end="\0", file=fp)
+                fp.flush()
+                fp.seek(0)
+                await self.call_git(
+                    "rm",
+                    "-f",
+                    "--ignore-unmatch",
+                    f"--pathspec-from-file={fp.name}",
+                    "--pathspec-file-nul",
+                )
+
     async def update(self, how: str, sibling: str | None = None) -> None:
         await anyio.to_thread.run_sync(
             partial(self.ds.update, how=how, sibling=sibling)
diff --git a/src/backups2datalad/annex.py b/src/backups2datalad/annex.py
index c90f6cf..0752f45 100644
--- a/src/backups2datalad/annex.py
+++ b/src/backups2datalad/annex.py
@@ -140,7 +140,7 @@ async def register_url(self, key: str, url: str) -> None:
             )
             ### TODO: Raise an exception?
 
-    async def list_files(self) -> AsyncGenerator[str, None]:
+    async def list_files(self, path: Path | None = None) -> AsyncGenerator[str, None]:
         async with aclosing(
             stream_null_command(
                 "git",
@@ -150,8 +150,11 @@ async def list_files(self) -> AsyncGenerator[str, None]:
                 "--name-only",
                 "-z",
-                "HEAD",
+                "HEAD" if path is None else f"HEAD:{path}",
                 cwd=self.repo,
             )
         ) as p:
             async for fname in p:
-                yield fname
+                if path is not None:
+                    yield (path / fname).as_posix()
+                else:
+                    yield fname
diff --git a/src/backups2datalad/zarr.py b/src/backups2datalad/zarr.py
index dad6558..3312c32 100644
--- a/src/backups2datalad/zarr.py
+++ b/src/backups2datalad/zarr.py
@@ -5,7 +5,7 @@
 from dataclasses import dataclass, field
 from datetime import datetime
 import os
-from pathlib import Path
+from pathlib import Path, PurePosixPath
 from typing import TYPE_CHECKING
 from urllib.parse import quote, quote_plus
 
@@ -129,11 +129,11 @@ def __post_init__(self) -> None:
     async def run(self) -> None:
         last_sync = self.read_sync_file()
         async with aclosing(self.annex.list_files()) as fileiter:
-            local_paths = {f async for f in fileiter if not is_meta_file(f)}
+            to_delete = {f async for f in fileiter if not is_meta_file(f)}
         async with get_session().create_client(
             "s3", config=AioConfig(signature_version=UNSIGNED)
         ) as client:
-            if not await self.needs_sync(client, last_sync, local_paths):
+            if not await self.needs_sync(client, last_sync, to_delete):
                 self.log.info("backup up to date")
                 return
             self.log.info("sync needed")
@@ -141,6 +141,7 @@ async def run(self) -> None:
             while True:
                 orig_checksum = await self.get_local_checksum()
                 zcc = ZarrChecksumTree()
+                to_update = []
                 async with aclosing(self.aiter_file_entries(client)) as ait:
                     async for entry in ait:
                         if is_meta_file(str(entry)):
@@ -150,7 +151,7 @@ async def run(self) -> None:
                             )
                             self.log.debug("%s: Syncing", entry)
                             zcc.add_leaf(Path(entry.path), entry.size, entry.md5_digest)
-                            local_paths.discard(str(entry))
+                            to_delete.discard(str(entry))
                             if self.mode is ZarrMode.TIMESTAMP:
                                 if (
                                     last_sync is not None
@@ -162,6 +163,7 @@ async def run(self) -> None:
                                     continue
                             self.check_change(f"entry {entry!r} was modified/added")
                             dest = self.repo / str(entry)
+                            conflicted = False
                             if dest.is_dir():
                                 # File path is replacing a directory, which needs
                                 # to be deleted
@@ -173,7 +175,8 @@ async def run(self) -> None:
                                     "%s: deleting conflicting directory at same path",
                                     entry,
                                 )
-                                await self.rmtree(dest, local_paths)
+                                to_delete |= await self.under_tree(dest)
+                                conflicted = True
                             else:
                                 for ep in entry.parents:
                                     pp = self.repo / ep
@@ -189,17 +192,15 @@ async def run(self) -> None:
                                             entry,
                                             ep,
                                         )
-                                        await self.ds.remove(ep)
-                                        local_paths.discard(ep)
-                                        self.report.deleted += 1
+                                        to_delete.add(str(ep))
+                                        conflicted = True
                                         break
                                     elif pp.is_dir():
                                         break
-                            to_update = False
-                            if not (dest.exists() or dest.is_symlink()):
+                            if conflicted or not (dest.exists() or dest.is_symlink()):
                                 self.check_change(f"entry {str(entry)!r} added")
                                 self.log.debug("%s: Not in dataset; will add", entry)
-                                to_update = True
+                                to_update.append(entry)
                                 self.report.added += 1
                             else:
                                 self.log.debug("%s: About to fetch hash from annex", entry)
@@ -216,36 +217,36 @@ async def run(self) -> None:
                                         " modification; will update",
                                         entry,
                                     )
-                                    to_update = True
+                                    to_update.append(entry)
+                                    to_delete.add(str(entry))
                                     self.report.updated += 1
-                            if to_update:
-                                await self.ds.remove(str(entry))
-                                key = await self.annex.mkkey(
-                                    entry.name, entry.size, entry.md5_digest
-                                )
-                                remotes = await self.annex.get_key_remotes(key)
-                                await self.annex.from_key(key, str(entry))
-                                await self.register_url(str(entry), key, entry.bucket_url)
-                                prefix = quote_plus(str(entry))
-                                await self.register_url(
-                                    str(entry),
-                                    key,
-                                    (
-                                        f"{self.api_url}/zarr/{self.zarr_id}/files"
-                                        f"?prefix={prefix}&download=true"
-                                    ),
-                                )
-                                if (
-                                    remotes is not None
-                                    and self.backup_remote is not None
-                                    and self.backup_remote not in remotes
-                                ):
-                                    self.log.info(
-                                        "%s: Not in backup remote %s",
-                                        entry,
-                                        self.backup_remote,
-                                    )
-                await self.prune_deleted(local_paths)
+                await self.prune_deleted(to_delete)
+                for entry in to_update:
+                    key = await self.annex.mkkey(
+                        entry.name, entry.size, entry.md5_digest
+                    )
+                    remotes = await self.annex.get_key_remotes(key)
+                    await self.annex.from_key(key, str(entry))
+                    await self.register_url(str(entry), key, entry.bucket_url)
+                    prefix = quote_plus(str(entry))
+                    await self.register_url(
+                        str(entry),
+                        key,
+                        (
+                            f"{self.api_url}/zarr/{self.zarr_id}/files"
+                            f"?prefix={prefix}&download=true"
+                        ),
+                    )
+                    if (
+                        remotes is not None
+                        and self.backup_remote is not None
+                        and self.backup_remote not in remotes
+                    ):
+                        self.log.info(
+                            "%s: Not in backup remote %s",
+                            entry,
+                            self.backup_remote,
+                        )
                 final_checksum = str(zcc.process())
                 modern_asset = await self.asset.refetch()
                 changed_during_sync = self.asset.modified != modern_asset.modified
@@ -393,29 +394,21 @@ async def needs_sync(
         else:
             return False
 
-    async def rmtree(self, dirpath: Path, local_paths: set[str]) -> None:
-        for p in list(dirpath.iterdir()):
-            if p.is_dir():
-                await self.rmtree(p, local_paths)
-            else:
-                relpath = p.relative_to(self.repo).as_posix()
-                self.log.info("deleting %s", p)
-                await self.ds.remove(relpath)
-                self.report.deleted += 1
-                local_paths.discard(relpath)
-        with suppress(FileNotFoundError):
-            dirpath.rmdir()
-
-    async def prune_deleted(self, local_paths: set[str]) -> None:
-        if local_paths:
-            self.check_change(f"{quantify(len(local_paths), 'file')} deleted from Zarr")
+    async def under_tree(self, dirpath: Path) -> set[str]:
+        reldir = dirpath.relative_to(self.repo)
+        async with aclosing(self.annex.list_files(reldir)) as fileiter:
+            return {f async for f in fileiter}
+
+    async def prune_deleted(self, to_delete: set[str]) -> None:
+        if to_delete:
+            self.check_change(f"{quantify(len(to_delete), 'file')} deleted from Zarr")
             self.log.info("deleting extra files")
-            for path in local_paths:
-                self.log.info("deleting %s", path)
-                await self.ds.remove(path)
-                p = self.repo / path
-                self.report.deleted += 1
-                d = p.parent
+            for p in to_delete:
+                self.log.debug("deleting %s", p)
+            self.report.deleted += len(to_delete)
+            await self.ds.remove_batch(to_delete)
+            for p in to_delete:
+                d = self.repo / PurePosixPath(p).parent
                 while d != self.repo and (not d.exists() or not any(d.iterdir())):
                     with suppress(FileNotFoundError):
                         d.rmdir()
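
For reference, the batched deletion that remove_batch() performs can be sketched standalone as below. This is a minimal sketch, not part of the patch: repo_root, paths, and remove_paths are illustrative names, and only documented git rm options are used (--pathspec-from-file and --pathspec-file-nul require a reasonably recent git, 2.26 or later).

    import subprocess
    import tempfile
    from pathlib import Path


    def remove_paths(repo_root: Path, paths: list[str]) -> None:
        # One `git rm` call for the whole batch instead of one subprocess
        # per path, which is the point of the change above.
        if not paths:
            return
        with tempfile.NamedTemporaryFile(mode="w") as fp:
            for p in paths:
                # NUL-terminate each pathspec so arbitrary file names survive
                print(p, end="\0", file=fp)
            fp.flush()
            subprocess.run(
                [
                    "git",
                    "rm",
                    "-f",
                    "--ignore-unmatch",
                    f"--pathspec-from-file={fp.name}",
                    "--pathspec-file-nul",
                ],
                cwd=repo_root,
                check=True,
            )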
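
Likewise, the subtree listing behind list_files(path=...) and under_tree() can be sketched as below, assuming the underlying command is git ls-tree (consistent with the -r --name-only -z flags visible in the hunk). The key detail is that HEAD:<dir> names a subtree, so the listed names come back relative to <dir> and must be re-prefixed to get repo-relative paths; list_tree is an illustrative helper, not part of the patch.

    import subprocess
    from pathlib import Path, PurePosixPath


    def list_tree(repo_root: Path, subdir: PurePosixPath | None = None) -> list[str]:
        # `git ls-tree -r HEAD` lists the whole tree; `HEAD:<dir>` lists the
        # subtree at <dir>, with names relative to that subtree.
        treeish = "HEAD" if subdir is None else f"HEAD:{subdir}"
        out = subprocess.run(
            ["git", "ls-tree", "-r", "--name-only", "-z", treeish],
            cwd=repo_root,
            capture_output=True,
            text=True,
            check=True,
        ).stdout
        names = [n for n in out.split("\0") if n]
        if subdir is not None:
            # Re-join the prefix so callers always see repo-relative paths
            return [(subdir / n).as_posix() for n in names]
        return names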