Skip to content

Commit

Permalink
feat: compress backups with gzip
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Oct 25, 2024
1 parent fc2b111 commit 4521ecc
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ anyhow = { workspace = true }
async-broadcast = "0.7.1"
async-channel = { workspace = true }
async-imap = { version = "0.10.2", default-features = false, features = ["runtime-tokio", "compress"] }
async-compression = { version = "0.4.15", default-features = false, features = ["tokio", "gzip"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] }
Expand Down
44 changes: 25 additions & 19 deletions src/imex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,30 +269,24 @@ async fn import_backup(
context.get_dbfile().display()
);

import_backup_stream(context, backup_file, file_size, passphrase).await?;
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
if backup_to_import.extension() == Some(OsStr::new("gz")) {
let backup_file = tokio::io::BufReader::new(backup_file);
let backup_file = async_compression::tokio::bufread::GzipDecoder::new(backup_file);
import_backup_stream(context, backup_file, passphrase).await?;
} else {
import_backup_stream(context, backup_file, passphrase).await?;
}
Ok(())
}

/// Imports backup by reading a tar file from a stream.
///
/// `file_size` is used to calculate the progress
/// and emit progress events.
/// Ideally it is the sum of the entry
/// sizes without the header overhead,
/// but can be estimated as tar file size
/// in which case the progress is underestimated
/// and may not reach 99.9% by the end of import.
/// Underestimating is better than
/// overestimating because the progress
/// jumps to 100% instead of getting stuck at 99.9%
/// for some time.
pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
context: &Context,
backup_file: R,
file_size: u64,
passphrase: String,
) -> Result<()> {
import_backup_stream_inner(context, backup_file, file_size, passphrase)
import_backup_stream_inner(context, backup_file, passphrase)
.await
.0
}
Expand All @@ -319,6 +313,19 @@ struct ProgressReader<R> {
}

impl<R> ProgressReader<R> {
/// Creates a new `ProgressReader`.
///
/// `file_size` is used to calculate the progress
/// and emit progress events.
/// Ideally it is the sum of the entry
/// sizes without the header overhead,
/// but can be estimated as tar file size
/// in which case the progress is underestimated
/// and may not reach 99.9% by the end of import.
/// Underestimating is better than
/// overestimating because the progress
/// jumps to 100% instead of getting stuck at 99.9%
/// for some time.
fn new(r: R, context: Context, file_size: u64) -> Self {
Self {
inner: r,
Expand Down Expand Up @@ -358,10 +365,8 @@ where
async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
context: &Context,
backup_file: R,
file_size: u64,
passphrase: String,
) -> (Result<()>,) {
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
let mut archive = Archive::new(backup_file);

let mut entries = match archive.entries() {
Expand Down Expand Up @@ -461,10 +466,10 @@ fn get_next_backup_path(
tempdbfile.push(format!("{stem}-{i:02}-{addr}.db"));

let mut tempfile = folder.clone();
tempfile.push(format!("{stem}-{i:02}-{addr}.tar.part"));
tempfile.push(format!("{stem}-{i:02}-{addr}.tar.gz.part"));

let mut destfile = folder.clone();
destfile.push(format!("{stem}-{i:02}-{addr}.tar"));
destfile.push(format!("{stem}-{i:02}-{addr}.tar.gz"));

if !tempdbfile.exists() && !tempfile.exists() && !destfile.exists() {
return Ok((tempdbfile, tempfile, destfile));
Expand Down Expand Up @@ -504,6 +509,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
file_size += blob.to_abs_path().metadata()?.len()
}

let file = async_compression::tokio::write::GzipEncoder::new(file);
export_backup_stream(context, &temp_db_path, blobdir, file, file_size)
.await
.context("Exporting backup to file failed")?;
Expand Down
5 changes: 3 additions & 2 deletions src/imex/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tokio_util::sync::CancellationToken;

use crate::chat::add_device_msg;
use crate::context::Context;
use crate::imex::BlobDirContents;
use crate::imex::{BlobDirContents, ProgressReader};
use crate::message::{Message, Viewtype};
use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body;
Expand Down Expand Up @@ -310,7 +310,8 @@ pub async fn get_backup2(
let mut file_size_buf = [0u8; 8];
recv_stream.read_exact(&mut file_size_buf).await?;
let file_size = u64::from_be_bytes(file_size_buf);
import_backup_stream(context, recv_stream, file_size, passphrase)
let recv_stream = ProgressReader::new(recv_stream, context.clone(), file_size);
import_backup_stream(context, recv_stream, passphrase)
.await
.context("Failed to import backup from QUIC stream")?;
info!(context, "Finished importing backup from the stream.");
Expand Down

0 comments on commit 4521ecc

Please sign in to comment.