Skip to content

Commit

Permalink
Merge pull request #152 from Aiven-Open/joelynch/updateable-snapshotg…
Browse files Browse the repository at this point in the history
…roups-2

snapshotter: Use an SQLite backed snapshotter
  • Loading branch information
fingon authored Nov 3, 2023
2 parents 1751e9b + 1082ef8 commit 4ea6911
Show file tree
Hide file tree
Showing 18 changed files with 1,157 additions and 575 deletions.
48 changes: 34 additions & 14 deletions astacus/common/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"""

from .utils import AstacusModel
from pyparsing import Iterable
from typing import Optional
from typing_extensions import Self, TypeVar

import logging
import math
Expand All @@ -15,7 +18,7 @@
_log_1_1 = math.log(1.1)


def increase_worth_reporting(value, new_value=None, *, total=None):
def increase_worth_reporting(value: int, new_value: Optional[int] = None, *, total: int | None = None):
"""Make reporting sparser and sparser as values grow larger
- report every 1.1**N or so
- if we know total, report every percent
Expand All @@ -36,6 +39,9 @@ def increase_worth_reporting(value, new_value=None, *, total=None):
return old_exp != new_exp


T = TypeVar("T")


class Progress(AstacusModel):
"""JSON-encodable progress meter of sorts"""

Expand All @@ -44,17 +50,31 @@ class Progress(AstacusModel):
total: int = 0
final: bool = False

def __repr__(self):
def __repr__(self) -> str:
finished = ", finished" if self.final else ""
return f"{self.handled}/{self.total} handled, {self.failed} failures{finished}"

def start(self, n):
def wrap(self, i: Iterable[T]) -> Iterable[T]:
    """Iterate over i, updating progress as we go.

    If ``i`` supports ``len()``, the returned iterable yields every item of
    ``i``; on first iteration its length is added to the total, each item
    consumed is recorded as a success, and the progress is marked done once
    ``i`` is exhausted.

    If ``i`` has no length (e.g. a plain iterator or generator), progress
    cannot be computed, so ``i`` itself is returned untouched and this
    progress object is not updated at all.
    """
    # This method must NOT contain a ``yield`` itself: in a generator
    # function ``return i`` merely ends the generator (the caller would
    # iterate over nothing), so the unsized fallback has to live in a plain
    # function and the tracking loop in a nested generator.
    try:
        size = len(i)  # type: ignore[arg-type]
    except TypeError:
        # Can't compute progress for this iterator
        return i

    def _tracked() -> Iterable[T]:
        # Side effects stay lazy: nothing is recorded until the caller
        # actually starts iterating.
        self.add_total(size)
        for item in i:
            yield item
            self.add_success()
        self.done()

    return _tracked()

def start(self, n) -> None:
"Optional 'first' step, just for logic handling state (e.g. no progress object reuse desired)"
assert not self.total
logger.info("start")
self.add_total(n)

def add_total(self, n):
def add_total(self, n: int) -> None:
if not n:
return
old_total = self.total
Expand All @@ -63,15 +83,15 @@ def add_total(self, n):
logger.info("add_total %r -> %r", n, self)
assert not self.final

def add_fail(self, n=1, *, info="add_fail"):
def add_fail(self, n: int = 1, *, info: str = "add_fail") -> None:
assert n > 0
old_failed = self.failed
self.failed += n
if increase_worth_reporting(old_failed, self.failed):
logger.info("%s %r -> %r", info, n, self)
assert not self.final

def add_success(self, n=1, *, info="add_success"):
def add_success(self, n: int = 1, *, info: str = "add_success") -> None:
assert n > 0
old_handled = self.handled
self.handled += n
Expand All @@ -80,34 +100,34 @@ def add_success(self, n=1, *, info="add_success"):
logger.info("%s %r -> %r", info, n, self)
assert not self.final

def download_success(self, size):
def download_success(self, size: int) -> None:
self.add_success(size, info="download_success")

def upload_success(self, hexdigest):
def upload_success(self, hexdigest: str) -> None:
self.add_success(info=f"upload_success {hexdigest}")

def upload_missing(self, hexdigest):
def upload_missing(self, hexdigest: str) -> None:
self.add_fail(info=f"upload_missing {hexdigest}")

def upload_failure(self, hexdigest):
def upload_failure(self, hexdigest: str) -> None:
self.add_fail(info=f"upload_failure {hexdigest}")

def done(self):
def done(self) -> None:
assert self.total is not None and self.handled <= self.total
assert not self.final
self.final = True
logger.info("done %r", self)

@property
def finished_successfully(self):
def finished_successfully(self) -> bool:
return self.final and not self.failed and self.handled == self.total

@property
def finished_failed(self):
def finished_failed(self) -> bool:
return self.final and not self.finished_successfully

@classmethod
def merge(cls, progresses):
def merge(cls, progresses: Iterable[Self]) -> Self:
p = cls()
for progress in progresses:
p.handled += progress.handled
Expand Down
19 changes: 9 additions & 10 deletions astacus/node/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .clear import ClearOp
from .download import DownloadOp
from .node import Node
from .snapshot import ReleaseOp, SnapshotOp, UploadOp
from .snapshot_op import ReleaseOp, SnapshotOp, UploadOp
from .state import node_state, NodeState
from astacus.common import ipc
from astacus.common.magic import StrEnum
Expand Down Expand Up @@ -126,9 +126,8 @@ def delta_snapshot_result(*, op_id: int, n: Node = Depends()):
def upload(req: ipc.SnapshotUploadRequestV20221129, n: Node = Depends()):
if not n.state.is_locked:
raise HTTPException(status_code=409, detail="Not locked")
snapshotter = n.get_snapshotter()
assert snapshotter
return UploadOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter)
snapshot_ = n.get_or_create_snapshot()
return UploadOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshot_)


@router.get("/upload/{op_id}")
Expand All @@ -141,9 +140,8 @@ def upload_result(*, op_id: int, n: Node = Depends()):
def delta_upload(req: ipc.SnapshotUploadRequestV20221129, n: Node = Depends()):
if not n.state.is_locked:
raise HTTPException(status_code=409, detail="Not locked")
snapshotter = n.get_delta_snapshotter()
assert snapshotter
return UploadOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter)
snapshot_ = n.get_or_create_delta_snapshot()
return UploadOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshot_)


@router.get("/delta/upload/{op_id}")
Expand All @@ -156,7 +154,8 @@ def delta_upload_result(*, op_id: int, n: Node = Depends()):
def release(req: ipc.SnapshotReleaseRequest, n: Node = Depends()):
if not n.state.is_locked:
raise HTTPException(status_code=409, detail="Not locked")
snapshotter = n.get_snapshotter()
# Groups not needed here.
snapshotter = n.get_snapshotter(groups=[])
assert snapshotter
return ReleaseOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter)

Expand Down Expand Up @@ -274,9 +273,9 @@ def groups_from_snapshot_req(req: SnapshotReq) -> Sequence[SnapshotGroup]:

def snapshotter_from_snapshot_req(req: SnapshotReq, n: Node) -> Snapshotter:
groups = groups_from_snapshot_req(req)
return n.get_or_create_snapshotter(groups)
return n.get_snapshotter(groups)


def delta_snapshotter_from_snapshot_req(req: SnapshotReq, n: Node) -> Snapshotter:
groups = groups_from_snapshot_req(req)
return n.get_or_create_delta_snapshotter(groups)
return n.get_delta_snapshotter(groups)
15 changes: 6 additions & 9 deletions astacus/node/clear.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from astacus.common.progress import Progress
from typing import Optional

import contextlib
import logging

logger = logging.getLogger(__name__)
Expand All @@ -28,22 +27,20 @@ def create_result(self) -> ipc.NodeResult:

def start(self, snapshotter: Snapshotter, *, is_snapshot_outdated: bool) -> NodeOp.StartResult:
logger.info("start_clear %r", self.req)
self.is_snaphot_outdated = is_snapshot_outdated
self.snapshotter = snapshotter
return self.start_op(op_name="clear", op=self, fun=self.clear)

def clear(self) -> None:
assert self.snapshotter
# 'snapshotter' is global; ensure we have sole access to it
assert self.snapshotter is not None
with self.snapshotter.lock:
self.check_op_id()
if self.is_snaphot_outdated:
self.snapshotter.snapshot(progress=Progress())
files = set(self.snapshotter.relative_path_to_snapshotfile.keys())
self.snapshotter.perform_snapshot(progress=Progress())
progress = self.result.progress
progress.start(len(files))
for relative_path in files:
progress.start(len(self.snapshotter.snapshot))
for relative_path in self.snapshotter.snapshot.get_all_paths():
absolute_path = self.config.root / relative_path
with contextlib.suppress(FileNotFoundError):
absolute_path.unlink()
absolute_path.unlink(missing_ok=True)
progress.add_success()
progress.done()
1 change: 1 addition & 0 deletions astacus/node/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class NodeConfig(AstacusModel):

# Same as root_link for the delta snapshotter.
delta_root_link: Optional[Path]
db_path: Optional[Path]

# These can be either globally or locally set
object_storage: Optional[RohmuConfig] = None
Expand Down
16 changes: 8 additions & 8 deletions astacus/node/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from typing import Callable, Dict, List, Optional, Sequence

import base64
import contextlib
import getpass
import logging
import os
Expand All @@ -37,17 +36,18 @@ def __init__(
super().__init__(storage=storage)
self.dst = dst
self.snapshotter = snapshotter
self.snapshot = snapshotter.snapshot
self.parallel = parallel
self.copy_dst_owner = copy_dst_owner

def _snapshotfile_already_exists(self, snapshotfile: ipc.SnapshotFile) -> bool:
relative_path = snapshotfile.relative_path
existing_snapshotfile = self.snapshotter.relative_path_to_snapshotfile.get(relative_path)
existing_snapshotfile = self.snapshot.get_file(snapshotfile.relative_path)
return existing_snapshotfile is not None and existing_snapshotfile.equals_excluding_mtime(snapshotfile)

def _download_snapshotfile(self, snapshotfile: ipc.SnapshotFile) -> None:
if self._snapshotfile_already_exists(snapshotfile):
return

relative_path = snapshotfile.relative_path
download_path = self.dst / relative_path
download_path.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -71,6 +71,7 @@ def _download_snapshotfiles_from_storage(self, snapshotfiles: Sequence[ipc.Snaps
def _copy_snapshotfile(self, snapshotfile_src: ipc.SnapshotFile, snapshotfile: ipc.SnapshotFile) -> None:
if self._snapshotfile_already_exists(snapshotfile):
return

src_path = self.dst / snapshotfile_src.relative_path
dst_path = self.dst / snapshotfile.relative_path
dst_path.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -91,7 +92,7 @@ def download_from_storage(
if snapshotfile.hexdigest:
hexdigest_to_snapshotfiles.setdefault(snapshotfile.hexdigest, []).append(snapshotfile)

self.snapshotter.snapshot(progress=Progress())
self.snapshotter.perform_snapshot(progress=Progress())
# TBD: Error checking, what to do if we're told to restore to existing directory?
progress.start(sum(1 + snapshotfile.file_size for snapshotfile in snapshotstate.files))
for snapshotfile in snapshotstate.files:
Expand All @@ -118,10 +119,9 @@ def _cb(*, map_in: Sequence[ipc.SnapshotFile], map_out: Sequence[ipc.SnapshotFil
return

# Delete files that were not supposed to exist
for relative_path in set(self.snapshotter.relative_path_to_snapshotfile.keys()).difference(valid_relative_path_set):
for relative_path in set(self.snapshot.get_all_paths()).difference(valid_relative_path_set):
absolute_path = self.dst / relative_path
with contextlib.suppress(FileNotFoundError):
absolute_path.unlink()
absolute_path.unlink(missing_ok=True)

if self.copy_dst_owner:
# Adjust owner of created files and folders to be like the owner of dst
Expand Down Expand Up @@ -170,7 +170,7 @@ def start(self, snapshotter: Snapshotter) -> NodeOp.StartResult:
return self.start_op(op_name="download", op=self, fun=self.download)

def download(self) -> None:
assert self.snapshotter
assert self.snapshotter is not None
# Actual 'restore from backup'
manifest = ipc.BackupManifest.parse_obj(self.storage.download_json(self.req.backup_name))
snapshotstate = manifest.snapshot_results[self.req.snapshot_index].state
Expand Down
Loading

0 comments on commit 4ea6911

Please sign in to comment.