From 5f99ed1513c6a2e794615bf71cb9bb69d098b72e Mon Sep 17 00:00:00 2001 From: Sven Rademakers Date: Tue, 14 Nov 2023 22:34:51 +0000 Subject: [PATCH] change read interface back to AsyncRead It maps better to the decompression interfaces. This commit functions as a prepare step. --- Cargo.lock | 3 +-- Cargo.toml | 3 +++ src/api/legacy.rs | 2 +- src/app/upgrade_worker.rs | 23 ++++++++++----------- src/streaming_data_service/data_transfer.rs | 15 ++++++-------- 5 files changed, 22 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e81d111..df4c49b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,8 +416,7 @@ checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" [[package]] name = "async-compression" version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f658e2baef915ba0f26f1f7c42bfb8e12f532a01f449a090ded75ae7a07e9ba2" +source = "git+https://github.com/svenrademakers/async-compression.git#7d93375d56e63d9137df02e1711dbc3a2af1a86a" dependencies = [ "flate2", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 0798bd6..38a0cac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,3 +69,6 @@ strip = true vendored = ["openssl/vendored"] stubbed = [] +[patch.crates-io] +async-compression = { git = 'https://github.com/svenrademakers/async-compression.git' } + diff --git a/src/api/legacy.rs b/src/api/legacy.rs index 7255ec7..55cb287 100644 --- a/src/api/legacy.rs +++ b/src/api/legacy.rs @@ -565,7 +565,7 @@ async fn handle_flash_request( let size = u64::from_str(size) .map_err(|_| LegacyResponse::bad_request("`length` parameter is not a number"))?; - DataTransfer::remote(PathBuf::from(&file), size, 256) + DataTransfer::remote(PathBuf::from(&file), size, 16) }; let transfer = InitializeTransfer::new(process_name, upgrade_command, data_transfer); diff --git a/src/app/upgrade_worker.rs b/src/app/upgrade_worker.rs index 88fb1bb..0abbbf4 100644 --- a/src/app/upgrade_worker.rs +++ 
b/src/app/upgrade_worker.rs @@ -32,7 +32,7 @@ use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; use tokio::io::AsyncSeekExt; use tokio::io::BufStream; -use tokio::io::{sink, AsyncBufRead}; +use tokio::io::{sink, AsyncRead}; use tokio::sync::watch; use tokio::task::spawn_blocking; use tokio::time::sleep; @@ -43,7 +43,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; const TMP_UPGRADE_DIR: &str = "/tmp/os_upgrade"; -const BLOCK_WRITE_SIZE: usize = 4194304; // 4Mib +const BLOCK_WRITE_SIZE: usize = BLOCK_READ_SIZE; // 512Kib const BLOCK_READ_SIZE: usize = 524288; // 512Kib // Contains collection of functions that execute some business flow in relation @@ -74,7 +74,7 @@ impl UpgradeWorker { ) -> anyhow::Result<()> { log::info!("Start OS install"); let size = self.data_transfer.size()?; - let source = self.data_transfer.buf_reader().await?; + let source = self.data_transfer.reader().await?; let device = bmc .configure_node_for_fwupgrade(node, UsbRoute::Bmc, SUPPORTED_DEVICES.keys()) .await?; @@ -83,7 +83,7 @@ log::info!("started writing to {node}"); let crc = Crc::<u64>::new(&CRC_64_REDIS); let mut write_watcher = WriteMonitor::new(&mut buf_stream, &mut self.written_sender, &crc); - pedantic_buf_copy(source, &mut write_watcher, size, &self.cancel).await?; + pedantic_copy(source, &mut write_watcher, size, &self.cancel).await?; let img_checksum = write_watcher.crc(); log::info!("Verifying checksum of written data to {node}"); @@ -94,7 +94,7 @@ tokio::spawn(progress_printer(receiver)); let mut write_watcher = WriteMonitor::new(sink(), &mut sender, &crc); - pedantic_buf_copy( + pedantic_copy( &mut buf_stream.take(size), &mut write_watcher, size, @@ -124,7 +124,7 @@ log::info!("start os update"); let size = self.data_transfer.size()?; let file_name = self.data_transfer.file_name()?.to_owned(); - let source = self.data_transfer.buf_reader().await?; + let source = self.data_transfer.reader().await?; let mut 
os_update_img = PathBuf::from(TMP_UPGRADE_DIR); os_update_img.push(&file_name); @@ -140,7 +140,7 @@ let crc = Crc::<u64>::new(&CRC_64_REDIS); let mut writer = WriteMonitor::new(&mut file, &mut self.written_sender, &crc); - pedantic_buf_copy(source, &mut writer, size, &self.cancel).await?; + pedantic_copy(source, &mut writer, size, &self.cancel).await?; log::info!("crc os_update image: {}.", writer.crc()); let result = spawn_blocking(move || { @@ -164,17 +164,17 @@ impl UpgradeWorker { /// Copies bytes from `reader` to `writer` until the reader is exhausted. This function /// returns an `io::Error(Interrupted)` in case a cancel was issued. -async fn pedantic_buf_copy<L, W>( +async fn pedantic_copy<L, W>( mut reader: L, mut writer: &mut W, size: u64, cancel: &CancellationToken, ) -> std::io::Result<()> where - L: AsyncBufRead + std::marker::Unpin, + L: AsyncRead + std::marker::Unpin, W: AsyncWrite + std::marker::Unpin, { - let copy_task = tokio::io::copy_buf(&mut reader, &mut writer); + let copy_task = tokio::io::copy(&mut reader, &mut writer); let cancel = cancel.cancelled(); let bytes_copied; @@ -195,8 +195,7 @@ fn validate_size(len: u64, total_size: u64) -> std::io::Result<()> { ErrorKind::UnexpectedEof, format!("missing {} bytes", format_size(total_size - len, DECIMAL)), )), - Ordering::Greater => panic!("reads are capped to self.size"), - Ordering::Equal => Ok(()), + Ordering::Greater | Ordering::Equal => Ok(()), } } diff --git a/src/streaming_data_service/data_transfer.rs b/src/streaming_data_service/data_transfer.rs index d9762e9..3d4864f 100644 --- a/src/streaming_data_service/data_transfer.rs +++ b/src/streaming_data_service/data_transfer.rs @@ -20,7 +20,7 @@ use std::io::Seek; use std::path::Path; use std::{io::ErrorKind, path::PathBuf}; use tokio::fs::OpenOptions; -use tokio::io::AsyncBufRead; +use tokio::io::AsyncRead; use tokio::io::BufReader; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -90,13 +90,13 @@ impl DataTransfer { } 
} - pub async fn buf_reader(self) -> anyhow::Result { + pub async fn reader(self) -> anyhow::Result { match self { DataTransfer::Local { path } => OpenOptions::new() .read(true) .open(&path) .await - .map(|x| Box::new(BufReader::new(x)) as Box) + .map(|x| Box::new(x) as Box) .with_context(|| path.to_string_lossy().to_string()), DataTransfer::Remote { file_name, @@ -108,13 +108,10 @@ impl DataTransfer { let buf_reader = StreamReader::new(stream); if is_lzma_file(&file_name) { log::info!("enabled lzma decoder for {}", file_name.to_string_lossy()); - //let decoder = XzDecoder::new(buf_reader); - Ok( - Box::new(BufReader::with_capacity(1024 * 1024 * 4, buf_reader)) - as Box, - ) + let decoder = XzDecoder::with_memlimit(buf_reader, 66 * 1024 * 1024); + Ok(Box::new(decoder) as Box) } else { - Ok(Box::new(buf_reader) as Box) + Ok(Box::new(buf_reader) as Box) } } }