Release extra files in dst after upload #150

Merged · 1 commit · Oct 19, 2023
5 changes: 5 additions & 0 deletions astacus/common/ipc.py
@@ -189,6 +189,11 @@ class SnapshotClearRequest(NodeRequest):
     root_globs: Sequence[str]


+class SnapshotReleaseRequest(NodeRequest):
+    # Files matching these digests will be unlinked in snapshotter's dst
+    hexdigests: Sequence[str]
+
+
 # node.cassandra

4 changes: 2 additions & 2 deletions astacus/coordinator/cluster.py
@@ -67,8 +67,8 @@ async def request_from_nodes(
         *,
         caller: str,
         req: Optional[ipc.NodeRequest] = None,
-        reqs: Optional[List[ipc.NodeRequest]] = None,
-        nodes: Optional[List[CoordinatorNode]] = None,
+        reqs: Optional[Sequence[ipc.NodeRequest]] = None,
+        nodes: Optional[Sequence[CoordinatorNode]] = None,
         **kw,
     ) -> Sequence[Optional[Result]]:
         """Perform asynchronously parallel request to the node components.

30 changes: 30 additions & 0 deletions astacus/coordinator/plugins/base.py
@@ -191,6 +191,36 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.NodeResult]:
         return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)


+@dataclasses.dataclass
+class SnapshotReleaseStep(Step[List[ipc.NodeResult]]):
+    """
+    Request the nodes to release files we no longer need in the destination hierarchy.
+
+    This allows freeing some disk space before the next backup happens.
+    """
+
+    async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.NodeResult]:
+        snapshot_results = context.get_result(SnapshotStep)
+        nodes_metadata = await get_nodes_metadata(cluster)
+        all_nodes_have_release_feature = nodes_metadata and all(
+            Features.release_snapshot_files.value in n.features for n in nodes_metadata
+        )

Contributor:
nit: nodes_all_have_feature could be a helper.
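A minimal sketch of the helper the reviewer suggests; the name nodes_all_have_feature and its exact signature are assumptions, not part of this PR:

    def nodes_all_have_feature(nodes_metadata: Sequence[ipc.MetadataResult], feature: Features) -> bool:
        # True only when metadata was actually collected and every node
        # advertises the feature in its /metadata response.
        return bool(nodes_metadata) and all(feature.value in n.features for n in nodes_metadata)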
+        if not all_nodes_have_release_feature:
+            logger.info("Skipped SnapshotReleaseStep because some nodes don't support it, node features: %s", nodes_metadata)
+            return []
+        node_requests = [
+            ipc.SnapshotReleaseRequest(hexdigests=self._hexdigests_from_hashes(s.hashes)) for s in snapshot_results
+        ]
+        start_results = await cluster.request_from_nodes(
+            "release", method="post", caller="SnapshotReleaseStep", reqs=node_requests
+        )
+        return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)
+
+    def _hexdigests_from_hashes(self, hashes: Optional[List[ipc.SnapshotHash]]) -> Sequence[str]:
+        assert hashes is not None
+        return [h.hexdigest for h in hashes]
+
+
 @dataclasses.dataclass
 class UploadManifestStep(Step[None]):
     """

1 change: 1 addition & 0 deletions astacus/coordinator/plugins/cassandra/plugin.py
@@ -98,6 +98,7 @@ def get_backup_steps(self, *, context: OperationContext) -> List[Step]:
             base.ListHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
             base.UploadBlocksStep(storage_name=context.storage_name),
             CassandraSubOpStep(op=ipc.CassandraSubOp.remove_snapshot),
+            base.SnapshotReleaseStep(),
             base.UploadManifestStep(
                 json_storage=context.json_storage,
                 plugin=ipc.Plugin.cassandra,

20 changes: 19 additions & 1 deletion astacus/node/api.py
@@ -6,7 +6,7 @@
 from .clear import ClearOp
 from .download import DownloadOp
 from .node import Node
-from .snapshot import SnapshotOp, UploadOp
+from .snapshot import ReleaseOp, SnapshotOp, UploadOp
 from .state import node_state, NodeState
 from astacus.common import ipc
 from astacus.common.magic import StrEnum
@@ -35,13 +35,16 @@ class OpName(StrEnum):
     download = "download"
     snapshot = "snapshot"
     upload = "upload"
+    release = "release"


 class Features(Enum):
     # Added on 2022-11-29, this can be assumed to be supported everywhere after 1 or 2 years
     validate_file_hashes = "validate_file_hashes"
     # Added on 2023-06-07
     snapshot_groups = "snapshot_groups"
+    # Added on 2023-10-16
+    release_snapshot_files = "release_snapshot_files"


 def is_allowed(subop: ipc.CassandraSubOp, access_level: CassandraAccessLevel):
@@ -149,6 +152,21 @@ def delta_upload_result(*, op_id: int, n: Node = Depends()):
     return op.result


+@router.post("/release")
+def release(req: ipc.SnapshotReleaseRequest, n: Node = Depends()):
+    if not n.state.is_locked:
+        raise HTTPException(status_code=409, detail="Not locked")
+    snapshotter = n.get_snapshotter()
+    assert snapshotter
+    return ReleaseOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter)
+
+
+@router.get("/release/{op_id}")
+def release_result(*, op_id: int, n: Node = Depends()):
+    op, _ = n.get_op_and_op_info(op_id=op_id, op_name=OpName.release)
+    return op.result
+
+
 @router.post("/download")
 def download(req: ipc.SnapshotDownloadRequest, n: Node = Depends()):
     if not n.state.is_locked:

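The new endpoint pair follows the same start-then-poll pattern as the other node operations: the POST returns an Op.StartResult whose status_url is polled with GET until progress is final. A rough sketch of client-side usage, assuming a locked node reachable at http://node_1 (illustrative only; the payload shape comes from ipc.SnapshotReleaseRequest above):

    import httpx

    # Start the release operation; a node that is not locked answers 409.
    start = httpx.post("http://node_1/release", json={"hexdigests": ["aaa", "bbb"]})
    start.raise_for_status()

    # Poll the per-op result endpoint from the start response,
    # e.g. http://node_1/release/1, until its progress reports final.
    status_url = start.json()["status_url"]
    result = httpx.get(status_url).json()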
22 changes: 22 additions & 0 deletions astacus/node/snapshot.py
@@ -85,3 +85,25 @@ def upload(self) -> None:
             validate_file_hashes=self.req.validate_file_hashes,
         )
         self.result.progress.done()
+
+
+class ReleaseOp(NodeOp[ipc.SnapshotReleaseRequest, ipc.NodeResult]):
+    snapshotter: Optional[Snapshotter] = None
+
+    def create_result(self) -> ipc.NodeResult:
+        return ipc.NodeResult()
+
+    def start(self, snapshotter: Snapshotter) -> NodeOp.StartResult:
+        logger.info("start_release %r", self.req)
+        self.snapshotter = snapshotter
+        return self.start_op(op_name="release", op=self, fun=self.release)
+
+    def release(self) -> None:
+        assert self.snapshotter
+        with self.snapshotter.lock:
+            self.check_op_id()
+            self.result.progress.add_total(len(self.req.hexdigests))
+            for hexdigest in self.req.hexdigests:
+                self.snapshotter.release(hexdigest)
+                self.result.progress.add_success()
+        self.result.progress.done()

10 changes: 10 additions & 0 deletions astacus/node/snapshotter.py
@@ -113,6 +113,10 @@ def _remove_snapshotfile(self, snapshotfile: SnapshotFile) -> None:
         if snapshotfile.hexdigest:
             self.hexdigest_to_snapshotfiles[snapshotfile.hexdigest].remove(snapshotfile)

+    def _release_snapshotfile(self, snapshotfile: SnapshotFile) -> None:
+        dst_path = self.dst / snapshotfile.relative_path
+        dst_path.unlink(missing_ok=True)
+
     def _snapshotfile_from_path(self, relative_path) -> SnapshotFile:
         src_path = self.src / relative_path
         st = src_path.stat()
@@ -273,3 +277,9 @@ def _result_cb(*, map_in: SnapshotFile, map_out: SnapshotFile) -> bool:
             progress.add_success()

         return changes

+    def release(self, hexdigest: str) -> None:
+        assert self.lock.locked()
+        assert self.src != self.dst
+        for snapshotfile in self.hexdigest_to_snapshotfiles.get(hexdigest, []):
+            self._release_snapshotfile(snapshotfile)

Contributor:
The snapshot will get out of sync with the disk here. Is that so that we don't recompute hashes in case a file is not deleted? We will need to be careful to always re-link all files when creating a new snapshot in the SQLite PR (if I ever get around to finishing that). Also, if an upload is ever run after this step, it looks like it will skip the files.

Contributor (author):
The upload would fail (because it'll attempt to read the files), and presently we never re-upload the same snapshot (except when it fails during backup, but then it's done before the release). In the SQLite case we could mark the released rows in a more explicit fashion if needed. This logic is a bit obscure, but the alternative seems to be "snapshot more often", which is also a bit weird: why would we want to waste CPU cycles computing hashes of files we'd never upload?

Contributor:
I don't think the upload will fail; it will just skip the missing files: https://github.com/Aiven-Open/astacus/blob/master/astacus/node/uploader.py#L41

I like the idea of marking released rows, I'll keep that in mind.
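A hypothetical sketch of the "mark released rows" idea from the thread above; the snapshot_files table, its schema, and the released column are assumptions about the future SQLite-backed snapshotter, not something this PR introduces:

    import sqlite3

    def mark_released(con: sqlite3.Connection, hexdigest: str) -> None:
        # Assumed schema (not in astacus today):
        #   CREATE TABLE snapshot_files (hexdigest TEXT, relative_path TEXT,
        #                                released INTEGER NOT NULL DEFAULT 0)
        # Keep the row, so hashes are not recomputed, but record that the dst
        # link was unlinked instead of letting disk and snapshot silently drift.
        con.execute(
            "UPDATE snapshot_files SET released = 1 WHERE hexdigest = ?",
            (hexdigest,),
        )
        con.commit()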
77 changes: 65 additions & 12 deletions tests/unit/coordinator/plugins/test_base.py
@@ -14,6 +14,7 @@
     ComputeKeptBackupsStep,
     ListBackupsStep,
     ListHexdigestsStep,
+    SnapshotReleaseStep,
     SnapshotStep,
     Step,
     StepsContext,
@@ -26,7 +27,7 @@
 from io import BytesIO
 from pydantic import Field
 from tests.unit.json_storage import MemoryJsonStorage
-from typing import AbstractSet, List, Optional, Sequence
+from typing import AbstractSet, Callable, List, Optional, Sequence
 from unittest import mock

 import datetime
@@ -81,6 +82,17 @@ def fixture_context() -> StepsContext:
     return StepsContext()


+def make_request_check(expected_payload: dict, op_name: str) -> Callable[[httpx.Request], httpx.Response]:
+    def check_request(request: httpx.Request) -> httpx.Response:
+        payload = json.loads(request.content)
+        assert payload == expected_payload
+        return httpx.Response(
+            status_code=HTTPStatus.OK, json=Op.StartResult(op_id=1, status_url=f"http://node_1/{op_name}/1").jsondict()
+        )
+
+    return check_request

 @pytest.mark.asyncio
 @pytest.mark.parametrize(
     "node_features,expected_request",
@@ -111,19 +123,13 @@ async def test_upload_step_uses_new_request_if_supported(
     )
     upload_step = UploadBlocksStep(storage_name="fake")
     with respx.mock:
-
-        def check_request(request: httpx.Request) -> httpx.Response:
-            payload = json.loads(request.content)
-            assert payload == expected_request.jsondict()
-            return httpx.Response(
-                status_code=HTTPStatus.OK, json=Op.StartResult(op_id=1, status_url="http://node_1/upload/1").jsondict()
-            )
-
-        respx.get("http://node_1/metadata").respond(
+        metadata_request = respx.get("http://node_1/metadata").respond(
             json=ipc.MetadataResult(version="0.1", features=[feature.value for feature in node_features]).jsondict()
         )
-        respx.post("http://node_1/upload").mock(side_effect=check_request)
-        respx.get("http://node_1/upload/1").respond(
+        upload_request = respx.post("http://node_1/upload").mock(
+            side_effect=make_request_check(expected_request.jsondict(), "upload")
+        )
+        status_request = respx.get("http://node_1/upload/1").respond(
             json=ipc.SnapshotUploadResult(
                 hostname="localhost",
                 az="az1",
@@ -133,6 +139,9 @@ def check_request(request: httpx.Request) -> httpx.Response:
             ).jsondict()
         )
         await upload_step.run_step(cluster=single_node_cluster, context=context)
+        assert metadata_request.call_count == 1
+        assert upload_request.call_count == 1
+        assert status_request.called


BACKUPS_FOR_RETENTION_TEST = {
@@ -232,3 +241,47 @@ async def test_upload_manifest_step_generates_correct_backup_name(
     step = UploadManifestStep(json_storage=async_json_storage, plugin=ipc.Plugin.files)
     await step.run_step(cluster=single_node_cluster, context=context)
     assert "backup-2020-01-07T05:00:00+00:00" in async_json_storage.storage.items
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "node_features,expected_request",
+    [
+        ([], None),
+        (
+            [Features.release_snapshot_files],
+            ipc.SnapshotReleaseRequest(hexdigests=["aaa", "bbb"]),
+        ),
+    ],
+)
+async def test_snapshot_release_step(
+    node_features: Sequence[Features],
+    expected_request: Optional[ipc.SnapshotReleaseRequest],
+    single_node_cluster: Cluster,
+    context: StepsContext,
+) -> None:
+    hashes_to_release = [ipc.SnapshotHash(hexdigest="aaa", size=1), ipc.SnapshotHash(hexdigest="bbb", size=2)]
+    context.set_result(SnapshotStep, [DefaultedSnapshotResult(hashes=hashes_to_release)])
+    release_step = SnapshotReleaseStep()
+
+    with respx.mock:
+        metadata_request = respx.get("http://node_1/metadata").respond(
+            json=ipc.MetadataResult(version="0.1", features=[feature.value for feature in node_features]).jsondict()
+        )
+        if Features.release_snapshot_files in node_features:
+            assert expected_request is not None
+            release_request = respx.post("http://node_1/release").mock(
+                side_effect=make_request_check(expected_request.jsondict(), "release")
+            )
+            status_request = respx.get("http://node_1/release/1").respond(
+                json=ipc.NodeResult(
+                    hostname="localhost",
+                    az="az1",
+                    progress=Progress(handled=2, total=2, final=True),
+                ).jsondict()
+            )
+        await release_step.run_step(cluster=single_node_cluster, context=context)
+        assert metadata_request.call_count == 1
+        if Features.release_snapshot_files in node_features:
+            assert release_request.call_count == 1
+            assert status_request.called
43 changes: 42 additions & 1 deletion tests/unit/node/test_snapshotter.py
@@ -4,7 +4,7 @@
 """
 from astacus.common.progress import Progress
 from astacus.common.snapshot import SnapshotGroup
-from astacus.node.snapshotter import Snapshotter
+from astacus.node.snapshotter import hash_hexdigest_readable, Snapshotter
 from pathlib import Path


@@ -21,3 +21,44 @@ def test_snapshotter_with_src_equal_dst_forgets_file_from_previous_snapshot(tmp_
     file_after.write_bytes(b"y" * 1024)
     snapshotter.snapshot(progress=Progress())
     assert snapshotter.relative_path_to_snapshotfile.keys() == {Path("file_after")}
+
+
+def assert_kept(hexdigest: str, dst_path: Path, snapshotter: Snapshotter):
+    assert dst_path.exists()
+    assert hexdigest in snapshotter.hexdigest_to_snapshotfiles
+    assert Path(dst_path.name) in snapshotter.relative_path_to_snapshotfile
+
+
+def assert_released(hexdigest: str, dst_path: Path, snapshotter: Snapshotter):
+    assert not dst_path.exists()
+    assert hexdigest in snapshotter.hexdigest_to_snapshotfiles
+    assert Path(dst_path.name) in snapshotter.relative_path_to_snapshotfile
+
+
+def test_snapshotter_release_hash_unlinks_files_but_keeps_metadata(tmp_path: Path) -> None:
+    src = tmp_path / "src"
+    dst = tmp_path / "dst"
+    src.mkdir()
+    dst.mkdir()
+    (src / "keep_this").write_text("this will be kept")
+    kept_digest = hash_hexdigest_readable((src / "keep_this").open(mode="rb"))
+    (src / "release_this").write_text("this will be released")
+    released_digest = hash_hexdigest_readable((src / "release_this").open(mode="rb"))
+    snapshotter = Snapshotter(src=src, dst=dst, groups=[SnapshotGroup(root_glob="*", embedded_file_size_max=0)], parallel=1)
+
+    with snapshotter.lock:
+        snapshotter.snapshot(progress=Progress())
+        assert_kept(kept_digest, dst / "keep_this", snapshotter)
+        assert_kept(released_digest, dst / "release_this", snapshotter)
+
+        snapshotter.release(released_digest)
+        assert_kept(kept_digest, dst / "keep_this", snapshotter)
+        assert_released(released_digest, dst / "release_this", snapshotter)
+
+        # re-snapshotting should restore the link
+        (src / "add_this").write_text("this is added for the next snapshot")
+        added_digest = hash_hexdigest_readable((src / "add_this").open(mode="rb"))
+        snapshotter.snapshot(progress=Progress())
+        assert_kept(kept_digest, dst / "keep_this", snapshotter)
+        assert_kept(released_digest, dst / "release_this", snapshotter)
+        assert_kept(added_digest, dst / "add_this", snapshotter)