From cc22e38dfb03e61a74f49e1bc9cb4a1d7b03802b Mon Sep 17 00:00:00 2001 From: Dmitry Potepalov Date: Fri, 6 Oct 2023 15:36:21 +0200 Subject: [PATCH] Release extra files in dst after upload Allow the coordinator to release the files that were uploaded during backup creation. Helps to free disk space after the backup was uploaded. Especially useful for Cassandra, where compactions cause us to end up keeping copies of old sstables around. --- astacus/common/ipc.py | 5 ++ astacus/coordinator/cluster.py | 4 +- astacus/coordinator/plugins/base.py | 30 ++++++++ .../coordinator/plugins/cassandra/plugin.py | 1 + astacus/node/api.py | 20 ++++- astacus/node/snapshot.py | 22 ++++++ astacus/node/snapshotter.py | 10 +++ tests/unit/coordinator/plugins/test_base.py | 77 ++++++++++++++++--- tests/unit/node/test_snapshotter.py | 43 ++++++++++- 9 files changed, 196 insertions(+), 16 deletions(-) diff --git a/astacus/common/ipc.py b/astacus/common/ipc.py index cf7ea5f1..aca2bc3d 100644 --- a/astacus/common/ipc.py +++ b/astacus/common/ipc.py @@ -189,6 +189,11 @@ class SnapshotClearRequest(NodeRequest): root_globs: Sequence[str] +class SnapshotReleaseRequest(NodeRequest): + # Files matching these digests will be unlinked in snapshotter's dst + hexdigests: Sequence[str] + + # node.cassandra diff --git a/astacus/coordinator/cluster.py b/astacus/coordinator/cluster.py index 395c81ae..d3f31811 100644 --- a/astacus/coordinator/cluster.py +++ b/astacus/coordinator/cluster.py @@ -67,8 +67,8 @@ async def request_from_nodes( *, caller: str, req: Optional[ipc.NodeRequest] = None, - reqs: Optional[List[ipc.NodeRequest]] = None, - nodes: Optional[List[CoordinatorNode]] = None, + reqs: Optional[Sequence[ipc.NodeRequest]] = None, + nodes: Optional[Sequence[CoordinatorNode]] = None, **kw, ) -> Sequence[Optional[Result]]: """Perform asynchronously parallel request to the node components. 
diff --git a/astacus/coordinator/plugins/base.py b/astacus/coordinator/plugins/base.py index b2f286d7..113a5fb6 100644 --- a/astacus/coordinator/plugins/base.py +++ b/astacus/coordinator/plugins/base.py @@ -191,6 +191,36 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.No return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult) +@dataclasses.dataclass +class SnapshotReleaseStep(Step[List[ipc.NodeResult]]): + """ + Request to release the files we don't need any more in the destination hierarchy. + + Allows to free some disk space before the next backup happens. + """ + + async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.NodeResult]: + snapshot_results = context.get_result(SnapshotStep) + nodes_metadata = await get_nodes_metadata(cluster) + all_nodes_have_release_feature = nodes_metadata and all( + Features.release_snapshot_files.value in n.features for n in nodes_metadata + ) + if not all_nodes_have_release_feature: + logger.info("Skipped SnapshotReleaseStep because some nodes don't support it, node features: %s", nodes_metadata) + return [] + node_requests = [ + ipc.SnapshotReleaseRequest(hexdigests=self._hexdigests_from_hashes(s.hashes)) for s in snapshot_results + ] + start_results = await cluster.request_from_nodes( + "release", method="post", caller="SnapshotReleaseStep", reqs=node_requests + ) + return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult) + + def _hexdigests_from_hashes(self, hashes: Optional[List[ipc.SnapshotHash]]) -> Sequence[str]: + assert hashes is not None + return [h.hexdigest for h in hashes] + + @dataclasses.dataclass class UploadManifestStep(Step[None]): """ diff --git a/astacus/coordinator/plugins/cassandra/plugin.py b/astacus/coordinator/plugins/cassandra/plugin.py index e8046ee3..b9edbcd1 100644 --- a/astacus/coordinator/plugins/cassandra/plugin.py +++ 
b/astacus/coordinator/plugins/cassandra/plugin.py @@ -98,6 +98,7 @@ def get_backup_steps(self, *, context: OperationContext) -> List[Step]: base.ListHexdigestsStep(hexdigest_storage=context.hexdigest_storage), base.UploadBlocksStep(storage_name=context.storage_name), CassandraSubOpStep(op=ipc.CassandraSubOp.remove_snapshot), + base.SnapshotReleaseStep(), base.UploadManifestStep( json_storage=context.json_storage, plugin=ipc.Plugin.cassandra, diff --git a/astacus/node/api.py b/astacus/node/api.py index 8fa5d323..1e60e30c 100644 --- a/astacus/node/api.py +++ b/astacus/node/api.py @@ -6,7 +6,7 @@ from .clear import ClearOp from .download import DownloadOp from .node import Node -from .snapshot import SnapshotOp, UploadOp +from .snapshot import ReleaseOp, SnapshotOp, UploadOp from .state import node_state, NodeState from astacus.common import ipc from astacus.common.magic import StrEnum @@ -35,6 +35,7 @@ class OpName(StrEnum): download = "download" snapshot = "snapshot" upload = "upload" + release = "release" class Features(Enum): @@ -42,6 +43,8 @@ class Features(Enum): validate_file_hashes = "validate_file_hashes" # Added on 2023-06-07 snapshot_groups = "snapshot_groups" + # Added on 2023-10-16 + release_snapshot_files = "release_snapshot_files" def is_allowed(subop: ipc.CassandraSubOp, access_level: CassandraAccessLevel): @@ -149,6 +152,21 @@ def delta_upload_result(*, op_id: int, n: Node = Depends()): return op.result +@router.post("/release") +def release(req: ipc.SnapshotReleaseRequest, n: Node = Depends()): + if not n.state.is_locked: + raise HTTPException(status_code=409, detail="Not locked") + snapshotter = n.get_snapshotter() + assert snapshotter + return ReleaseOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter) + + +@router.get("/release/{op_id}") +def release_result(*, op_id: int, n: Node = Depends()): + op, _ = n.get_op_and_op_info(op_id=op_id, op_name=OpName.release) + return op.result + + @router.post("/download") def 
download(req: ipc.SnapshotDownloadRequest, n: Node = Depends()): if not n.state.is_locked: diff --git a/astacus/node/snapshot.py b/astacus/node/snapshot.py index 533c3391..cafa1e68 100644 --- a/astacus/node/snapshot.py +++ b/astacus/node/snapshot.py @@ -85,3 +85,25 @@ def upload(self) -> None: validate_file_hashes=self.req.validate_file_hashes, ) self.result.progress.done() + + +class ReleaseOp(NodeOp[ipc.SnapshotReleaseRequest, ipc.NodeResult]): + snapshotter: Optional[Snapshotter] = None + + def create_result(self) -> ipc.NodeResult: + return ipc.NodeResult() + + def start(self, snapshotter: Snapshotter) -> NodeOp.StartResult: + logger.info("start_release %r", self.req) + self.snapshotter = snapshotter + return self.start_op(op_name="release", op=self, fun=self.release) + + def release(self) -> None: + assert self.snapshotter + with self.snapshotter.lock: + self.check_op_id() + self.result.progress.add_total(len(self.req.hexdigests)) + for hexdigest in self.req.hexdigests: + self.snapshotter.release(hexdigest) + self.result.progress.add_success() + self.result.progress.done() diff --git a/astacus/node/snapshotter.py b/astacus/node/snapshotter.py index 9f63a123..0af40121 100644 --- a/astacus/node/snapshotter.py +++ b/astacus/node/snapshotter.py @@ -113,6 +113,10 @@ def _remove_snapshotfile(self, snapshotfile: SnapshotFile) -> None: if snapshotfile.hexdigest: self.hexdigest_to_snapshotfiles[snapshotfile.hexdigest].remove(snapshotfile) + def _release_snapshotfile(self, snapshotfile: SnapshotFile) -> None: + dst_path = self.dst / snapshotfile.relative_path + dst_path.unlink(missing_ok=True) + def _snapshotfile_from_path(self, relative_path) -> SnapshotFile: src_path = self.src / relative_path st = src_path.stat() @@ -273,3 +277,9 @@ def _result_cb(*, map_in: SnapshotFile, map_out: SnapshotFile) -> bool: progress.add_success() return changes + + def release(self, hexdigest: str) -> None: + assert self.lock.locked() + assert self.src != self.dst + for snapshotfile in 
self.hexdigest_to_snapshotfiles.get(hexdigest, []): + self._release_snapshotfile(snapshotfile) diff --git a/tests/unit/coordinator/plugins/test_base.py b/tests/unit/coordinator/plugins/test_base.py index 2c1034ee..3ed49748 100644 --- a/tests/unit/coordinator/plugins/test_base.py +++ b/tests/unit/coordinator/plugins/test_base.py @@ -14,6 +14,7 @@ ComputeKeptBackupsStep, ListBackupsStep, ListHexdigestsStep, + SnapshotReleaseStep, SnapshotStep, Step, StepsContext, @@ -26,7 +27,7 @@ from io import BytesIO from pydantic import Field from tests.unit.json_storage import MemoryJsonStorage -from typing import AbstractSet, List, Optional, Sequence +from typing import AbstractSet, Callable, List, Optional, Sequence from unittest import mock import datetime @@ -81,6 +82,17 @@ def fixture_context() -> StepsContext: return StepsContext() +def make_request_check(expected_payload: dict, op_name: str) -> Callable[[httpx.Request], httpx.Response]: + def check_request(request: httpx.Request) -> httpx.Response: + payload = json.loads(request.content) + assert payload == expected_payload + return httpx.Response( + status_code=HTTPStatus.OK, json=Op.StartResult(op_id=1, status_url=f"http://node_1/{op_name}/1").jsondict() + ) + + return check_request + + @pytest.mark.asyncio @pytest.mark.parametrize( "node_features,expected_request", @@ -111,19 +123,13 @@ async def test_upload_step_uses_new_request_if_supported( ) upload_step = UploadBlocksStep(storage_name="fake") with respx.mock: - - def check_request(request: httpx.Request) -> httpx.Response: - payload = json.loads(request.content) - assert payload == expected_request.jsondict() - return httpx.Response( - status_code=HTTPStatus.OK, json=Op.StartResult(op_id=1, status_url="http://node_1/upload/1").jsondict() - ) - - respx.get("http://node_1/metadata").respond( + metadata_request = respx.get("http://node_1/metadata").respond( json=ipc.MetadataResult(version="0.1", features=[feature.value for feature in node_features]).jsondict() ) - 
respx.post("http://node_1/upload").mock(side_effect=check_request) - respx.get("http://node_1/upload/1").respond( + upload_request = respx.post("http://node_1/upload").mock( + side_effect=make_request_check(expected_request.jsondict(), "upload") + ) + status_request = respx.get("http://node_1/upload/1").respond( json=ipc.SnapshotUploadResult( hostname="localhost", az="az1", @@ -133,6 +139,9 @@ def check_request(request: httpx.Request) -> httpx.Response: ).jsondict() ) await upload_step.run_step(cluster=single_node_cluster, context=context) + assert metadata_request.call_count == 1 + assert upload_request.call_count == 1 + assert status_request.called BACKUPS_FOR_RETENTION_TEST = { @@ -232,3 +241,47 @@ async def test_upload_manifest_step_generates_correct_backup_name( step = UploadManifestStep(json_storage=async_json_storage, plugin=ipc.Plugin.files) await step.run_step(cluster=single_node_cluster, context=context) assert "backup-2020-01-07T05:00:00+00:00" in async_json_storage.storage.items + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "node_features,expected_request", + [ + ([], None), + ( + [Features.release_snapshot_files], + ipc.SnapshotReleaseRequest(hexdigests=["aaa", "bbb"]), + ), + ], +) +async def test_snapshot_release_step( + node_features: Sequence[Features], + expected_request: Optional[ipc.SnapshotReleaseRequest], + single_node_cluster: Cluster, + context: StepsContext, +) -> None: + hashes_to_release = [ipc.SnapshotHash(hexdigest="aaa", size=1), ipc.SnapshotHash(hexdigest="bbb", size=2)] + context.set_result(SnapshotStep, [DefaultedSnapshotResult(hashes=hashes_to_release)]) + release_step = SnapshotReleaseStep() + + with respx.mock: + metadata_request = respx.get("http://node_1/metadata").respond( + json=ipc.MetadataResult(version="0.1", features=[feature.value for feature in node_features]).jsondict() + ) + if Features.release_snapshot_files in node_features: + assert expected_request is not None + release_request = 
respx.post("http://node_1/release").mock( + side_effect=make_request_check(expected_request.jsondict(), "release") + ) + status_request = respx.get("http://node_1/release/1").respond( + json=ipc.NodeResult( + hostname="localhost", + az="az1", + progress=Progress(handled=2, total=2, final=True), + ).jsondict() + ) + await release_step.run_step(cluster=single_node_cluster, context=context) + assert metadata_request.call_count == 1 + if Features.release_snapshot_files in node_features: + assert release_request.call_count == 1 + assert status_request.called diff --git a/tests/unit/node/test_snapshotter.py b/tests/unit/node/test_snapshotter.py index 02a56d93..118e1c86 100644 --- a/tests/unit/node/test_snapshotter.py +++ b/tests/unit/node/test_snapshotter.py @@ -4,7 +4,7 @@ """ from astacus.common.progress import Progress from astacus.common.snapshot import SnapshotGroup -from astacus.node.snapshotter import Snapshotter +from astacus.node.snapshotter import hash_hexdigest_readable, Snapshotter from pathlib import Path @@ -21,3 +21,44 @@ def test_snapshotter_with_src_equal_dst_forgets_file_from_previous_snapshot(tmp_ file_after.write_bytes(b"y" * 1024) snapshotter.snapshot(progress=Progress()) assert snapshotter.relative_path_to_snapshotfile.keys() == {Path("file_after")} + + +def assert_kept(hexdigest: str, dst_path: Path, snapshotter: Snapshotter): + assert dst_path.exists() + assert hexdigest in snapshotter.hexdigest_to_snapshotfiles + assert Path(dst_path.name) in snapshotter.relative_path_to_snapshotfile + + +def assert_released(hexdigest: str, dst_path: Path, snapshotter: Snapshotter): + assert not dst_path.exists() + assert hexdigest in snapshotter.hexdigest_to_snapshotfiles + assert Path(dst_path.name) in snapshotter.relative_path_to_snapshotfile + + +def test_snapshotter_release_hash_unlinks_files_but_keeps_metadata(tmp_path: Path) -> None: + src = tmp_path / "src" + dst = tmp_path / "dst" + src.mkdir() + dst.mkdir() + (src / "keep_this").write_text("this will be 
kept") + kept_digest = hash_hexdigest_readable((src / "keep_this").open(mode="rb")) + (src / "release_this").write_text("this will be released") + released_digest = hash_hexdigest_readable((src / "release_this").open(mode="rb")) + snapshotter = Snapshotter(src=src, dst=dst, groups=[SnapshotGroup(root_glob="*", embedded_file_size_max=0)], parallel=1) + + with snapshotter.lock: + snapshotter.snapshot(progress=Progress()) + assert_kept(kept_digest, dst / "keep_this", snapshotter) + assert_kept(released_digest, dst / "release_this", snapshotter) + + snapshotter.release(released_digest) + assert_kept(kept_digest, dst / "keep_this", snapshotter) + assert_released(released_digest, dst / "release_this", snapshotter) + + # re-snapshotting should restore the link + (src / "add_this").write_text("this is added for the next snapshot") + added_digest = hash_hexdigest_readable((src / "add_this").open(mode="rb")) + snapshotter.snapshot(progress=Progress()) + assert_kept(kept_digest, dst / "keep_this", snapshotter) + assert_kept(released_digest, dst / "release_this", snapshotter) + assert_kept(added_digest, dst / "add_this", snapshotter)