tracker/snapshot_file_content: avoid useless uploads after desync
When the replicating (writer) Verneuil VFS notices that there is
spooled metadata, but that it doesn't match the sqlite file, we
currently always copy everything from scratch.  This makes sense
for long-lived processes, but isn't great (still correct, just
wasteful!) for short-lived ones that run between regular writes
to the sqlite db.

This commit detects when we have a valid spooled manifest that
doesn't match the file on disk.  In that case, we force a full
scan of the sqlite file, but still avoid uploading any chunk that
matches what was already in the manifest.
pkhuong committed Oct 5, 2024
1 parent 8deba19 commit d4380b6
Showing 1 changed file with 41 additions and 23 deletions.
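To make the new control flow concrete before the diff: the sketch below is a minimal, self-contained rendition of the three-way decision, under the assumption that staleness boils down to two booleans. Manifest and Chunk are stand-in types, and judge, matches_db, and after_write are illustrative names; only the CurrentManifest variants mirror the real code in src/tracker/snapshot_file_contents.rs.

// Minimal sketch, not the crate's API: stand-in types, illustrative names.
struct Manifest;
struct Chunk;

enum CurrentManifest {
    // Nothing changed: don't generate a new snapshot at all.
    UpToDate,
    // No usable spooled manifest: snapshot from scratch.
    FromScratch,
    // Stale manifest: rescan the whole file, but skip uploading
    // chunks whose fingerprints still match.
    Desync(Manifest, Option<Chunk>),
    // Fresh manifest: use it as the initial snapshot.
    Initial(Manifest, Option<Chunk>),
}

fn judge(
    spooled: Option<(Manifest, Option<Chunk>)>,
    matches_db: bool,
    after_write: bool,
) -> CurrentManifest {
    match spooled {
        // No spooled manifest at all: no old fingerprints to reuse.
        None => CurrentManifest::FromScratch,
        // Manifest matches and we didn't just write: nothing to do.
        Some(_) if matches_db && !after_write => CurrentManifest::UpToDate,
        // Valid manifest that no longer matches the db file: the
        // desync case this commit optimizes.
        Some((m, base)) if !matches_db => CurrentManifest::Desync(m, base),
        // Manifest matches, but we just wrote: normal incremental path.
        Some((m, base)) => CurrentManifest::Initial(m, base),
    }
}

The payoff is the Desync arm: previously that case collapsed into snapshotting from scratch, so a short-lived process would re-upload every chunk even when most fingerprints were unchanged.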
src/tracker/snapshot_file_contents.rs (41 additions, 23 deletions)
@@ -246,8 +246,10 @@ impl Tracker {
 enum CurrentManifest {
     // Don't generate a new snapshot at all.
     UpToDate,
     // Snapshot from scratch.
-    Desync,
+    FromScratch,
+    // Snapshot from scratch, but the previous manifest
+    // had some chunk fingerprints that could still match.
+    Desync(Manifest, Option<Arc<Chunk>>),
     // Use the previous manifest as an initial snapshot.
     Initial(Manifest, Option<Arc<Chunk>>),
 }
@@ -262,12 +264,14 @@ enum CurrentManifest {
 impl Tracker {
     /// Loads the current manifest and figures out what to do with it.
     fn judge_current_manifest(&self, version_id: &[u8]) -> CurrentManifest {
-        let mut current_manifest: Option<(Manifest, Option<Arc<Chunk>>)> = self
+        let current_manifest: Option<(Manifest, Option<Arc<Chunk>>)> = self
             .read_current_manifest()
             .map_err(|e| chain_info!(e, "failed to read staged manifest file"))
             .ok()
             .flatten();
 
+        let desync: bool;
+
         // If we're snapshotting after a write, we always want to go
         // through the whole process. We made some changes, let's
         // guarantee we try and publish them. That's important because,
@@ -326,10 +330,8 @@ impl Tracker {
                 up_to_date = false;
             }
 
-            // If the manifest isn't up to date, we can't use it.
-            if !up_to_date {
-                current_manifest = None;
-            }
+            // If the manifest isn't up to date, remember that.
+            desync = !up_to_date;
         } else {
             // If we're doing this opportunistically (not after a write)
             // and the staging manifest seems up to date, there's nothing
@@ -344,14 +346,20 @@ impl Tracker {
             }
 
             // If we think there's work to do after a read
-            // transaction, assume the worst, and rebuild
-            // the snapshot from scratch.
-            current_manifest = None;
+            // transaction, assume the worst, and rescan
+            // the whole file.
+            desync = true;
         }
 
         match current_manifest {
-            None => CurrentManifest::Desync,
-            Some((manifest, base)) => CurrentManifest::Initial(manifest, base),
+            None => CurrentManifest::FromScratch,
+            Some((manifest, base)) => {
+                if desync {
+                    CurrentManifest::Desync(manifest, base)
+                } else {
+                    CurrentManifest::Initial(manifest, base)
+                }
+            }
         }
     }

@@ -363,6 +371,7 @@ impl Tracker {
     fn snapshot_chunks(
         &self,
         base: Option<Vec<Fingerprint>>,
+        force_refresh: bool,
     ) -> Result<(u64, Vec<Fingerprint>, Vec<BundledChunk>, usize)> {
         use rand::Rng;
         let mut rng = rand::thread_rng();
@@ -402,12 +411,18 @@ impl Tracker {
                 .is_some();
             let delta = (grown || wrote_past_end) as u64;
 
-            // We definitely don't know anything about what's at or
-            // after chunk index `fprints.len()`. We also don't
-            // want to go out of bounds if the new file shrunk.
-            backfill_begin = (fprints.len() as u64)
-                .clamp(0, num_chunks)
-                .saturating_sub(delta);
+            if force_refresh {
+                // Assume all the chunks in the manifests exist, but confirm
+                // that they match what we want.
+                backfill_begin = 0;
+            } else {
+                // We definitely don't know anything about what's at or
+                // after chunk index `fprints.len()`. We also don't
+                // want to go out of bounds if the new file shrunk.
+                backfill_begin = (fprints.len() as u64)
+                    .clamp(0, num_chunks)
+                    .saturating_sub(delta);
+            }
             fprints
         } else {
             backfill_begin = 0;
@@ -551,6 +566,7 @@ impl Tracker {
         header_fprint: Fingerprint,
         version_id: Vec<u8>,
         current_manifest: Option<(Manifest, Option<Arc<Chunk>>)>,
+        force_refresh: bool,
     ) -> Result<(usize, Vec<Fingerprint>, Option<Fingerprint>)> {
         use std::os::unix::fs::MetadataExt;
 
@@ -585,7 +601,8 @@ impl Tracker {
         // Try to get an initial list of chunks to work off.
         let base_fprints = Self::base_chunk_fprints(current_manifest.as_ref().map(|x| &x.0));
 
-        let (len, mut chunks, bundled_chunks, mut copied) = self.snapshot_chunks(base_fprints)?;
+        let (len, mut chunks, bundled_chunks, mut copied) =
+            self.snapshot_chunks(base_fprints, force_refresh)?;
 
         let (ctime, ctime_ns) = match self.file.metadata() {
             Ok(meta) => (meta.ctime(), meta.ctime_nsec() as i32),
@@ -724,11 +741,12 @@ impl Tracker {
                 e => chain_warn!(e, "failed to force populate version xattr", path=?self.path));
         }
 
-        let current_manifest: Option<(Manifest, Option<Arc<Chunk>>)> =
+        let (current_manifest, force_refresh): (Option<(Manifest, Option<Arc<Chunk>>)>, bool) =
             match self.judge_current_manifest(&version_id) {
                 CurrentManifest::UpToDate => return Ok(()),
-                CurrentManifest::Desync => None,
-                CurrentManifest::Initial(manifest, base) => Some((manifest, base)),
+                CurrentManifest::FromScratch => (None, true),
+                CurrentManifest::Desync(manifest, base) => (Some((manifest, base)), true),
+                CurrentManifest::Initial(manifest, base) => (Some((manifest, base)), false),
             };
 
         // We don't *have* to overwrite the .metadata file, but we
@@ -740,7 +758,7 @@ impl Tracker {
         // Publish an updated snapshot, and remember the chunks we
         // care about.
         let (copied, chunks, base_chunk) =
-            self.stage_new_snapshot(header_fprint, version_id, current_manifest)?;
+            self.stage_new_snapshot(header_fprint, version_id, current_manifest, force_refresh)?;
 
         let published = self.maybe_update_ready_buffer(&chunks)?;

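A closing sketch of the backfill computation the diff changes in snapshot_chunks: backfill_start and its parameters are illustrative names that mirror the locals in the diff (fprints.len(), num_chunks, delta), not functions exported by the crate.

// Standalone sketch; assumes the chunk scan starts at the returned index.
fn backfill_start(force_refresh: bool, known: u64, num_chunks: u64, delta: u64) -> u64 {
    if force_refresh {
        // Desync: rescan from chunk 0. Chunks whose fingerprints still
        // match the stale manifest are verified but not re-uploaded.
        0
    } else {
        // Incremental: trust the manifest for the `known` chunks it
        // describes; clamp in case the file shrank, and back up by
        // `delta` if the file grew or we wrote past the old end.
        known.clamp(0, num_chunks).saturating_sub(delta)
    }
}

For example, backfill_start(false, 10, 8, 0) yields 8 for a file that shrank, while backfill_start(true, 10, 8, 0) pins the scan to 0 so every surviving chunk is re-verified against the old fingerprints before anything is uploaded.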
