Skip to content

Commit

Permalink
change read interface back to AsyncRead
Browse files Browse the repository at this point in the history
It maps better to the decompression interfaces. This commit serves as
a preparatory step.
  • Loading branch information
svenrademakers committed Nov 14, 2023
1 parent b44b4cb commit 5f99ed1
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 24 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,6 @@ strip = true
vendored = ["openssl/vendored"]
stubbed = []

[patch.crates-io]
async-compression = { git = 'https://github.com/svenrademakers/async-compression.git' }

2 changes: 1 addition & 1 deletion src/api/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ async fn handle_flash_request(

let size = u64::from_str(size)
.map_err(|_| LegacyResponse::bad_request("`length` parameter is not a number"))?;
DataTransfer::remote(PathBuf::from(&file), size, 256)
DataTransfer::remote(PathBuf::from(&file), size, 16)
};

let transfer = InitializeTransfer::new(process_name, upgrade_command, data_transfer);
Expand Down
23 changes: 11 additions & 12 deletions src/app/upgrade_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::io::BufStream;
use tokio::io::{sink, AsyncBufRead};
use tokio::io::{sink, AsyncRead};
use tokio::sync::watch;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
Expand All @@ -43,7 +43,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;

const TMP_UPGRADE_DIR: &str = "/tmp/os_upgrade";
const BLOCK_WRITE_SIZE: usize = 4194304; // 4Mib
const BLOCK_WRITE_SIZE: usize = BLOCK_READ_SIZE; // 512 KiB, same as read size
const BLOCK_READ_SIZE: usize = 524288; // 512Kib

// Contains collection of functions that execute some business flow in relation
Expand Down Expand Up @@ -74,7 +74,7 @@ impl UpgradeWorker {
) -> anyhow::Result<()> {
log::info!("Start OS install");
let size = self.data_transfer.size()?;
let source = self.data_transfer.buf_reader().await?;
let source = self.data_transfer.reader().await?;
let device = bmc
.configure_node_for_fwupgrade(node, UsbRoute::Bmc, SUPPORTED_DEVICES.keys())
.await?;
Expand All @@ -83,7 +83,7 @@ impl UpgradeWorker {
log::info!("started writing to {node}");
let crc = Crc::<u64>::new(&CRC_64_REDIS);
let mut write_watcher = WriteMonitor::new(&mut buf_stream, &mut self.written_sender, &crc);
pedantic_buf_copy(source, &mut write_watcher, size, &self.cancel).await?;
pedantic_copy(source, &mut write_watcher, size, &self.cancel).await?;
let img_checksum = write_watcher.crc();

log::info!("Verifying checksum of written data to {node}");
Expand All @@ -94,7 +94,7 @@ impl UpgradeWorker {
tokio::spawn(progress_printer(receiver));

let mut write_watcher = WriteMonitor::new(sink(), &mut sender, &crc);
pedantic_buf_copy(
pedantic_copy(
&mut buf_stream.take(size),
&mut write_watcher,
size,
Expand Down Expand Up @@ -124,7 +124,7 @@ impl UpgradeWorker {
log::info!("start os update");
let size = self.data_transfer.size()?;
let file_name = self.data_transfer.file_name()?.to_owned();
let source = self.data_transfer.buf_reader().await?;
let source = self.data_transfer.reader().await?;

let mut os_update_img = PathBuf::from(TMP_UPGRADE_DIR);
os_update_img.push(&file_name);
Expand All @@ -140,7 +140,7 @@ impl UpgradeWorker {

let crc = Crc::<u64>::new(&CRC_64_REDIS);
let mut writer = WriteMonitor::new(&mut file, &mut self.written_sender, &crc);
pedantic_buf_copy(source, &mut writer, size, &self.cancel).await?;
pedantic_copy(source, &mut writer, size, &self.cancel).await?;
log::info!("crc os_update image: {}.", writer.crc());

let result = spawn_blocking(move || {
Expand All @@ -164,17 +164,17 @@ impl UpgradeWorker {

/// Copies bytes from `reader` to `writer` until the reader is exhausted. This function
/// returns an `io::Error(Interrupted)` in case a cancel was issued.
async fn pedantic_buf_copy<L, W>(
async fn pedantic_copy<L, W>(
mut reader: L,
mut writer: &mut W,
size: u64,
cancel: &CancellationToken,
) -> std::io::Result<()>
where
L: AsyncBufRead + std::marker::Unpin,
L: AsyncRead + std::marker::Unpin,
W: AsyncWrite + std::marker::Unpin,
{
let copy_task = tokio::io::copy_buf(&mut reader, &mut writer);
let copy_task = tokio::io::copy(&mut reader, &mut writer);
let cancel = cancel.cancelled();

let bytes_copied;
Expand All @@ -195,8 +195,7 @@ fn validate_size(len: u64, total_size: u64) -> std::io::Result<()> {
ErrorKind::UnexpectedEof,
format!("missing {} bytes", format_size(total_size - len, DECIMAL)),
)),
Ordering::Greater => panic!("reads are capped to self.size"),
Ordering::Equal => Ok(()),
Ordering::Greater | Ordering::Equal => Ok(()),
}
}

Expand Down
15 changes: 6 additions & 9 deletions src/streaming_data_service/data_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::io::Seek;
use std::path::Path;
use std::{io::ErrorKind, path::PathBuf};
use tokio::fs::OpenOptions;
use tokio::io::AsyncBufRead;
use tokio::io::AsyncRead;
use tokio::io::BufReader;

Check warning on line 24 in src/streaming_data_service/data_transfer.rs

View workflow job for this annotation

GitHub Actions / cargo-test

unused import: `tokio::io::BufReader`

Check failure on line 24 in src/streaming_data_service/data_transfer.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `tokio::io::BufReader`

error: unused import: `tokio::io::BufReader` --> src/streaming_data_service/data_transfer.rs:24:5 | 24 | use tokio::io::BufReader; | ^^^^^^^^^^^^^^^^^^^^ | = note: `-D unused-imports` implied by `-D warnings`
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -90,13 +90,13 @@ impl DataTransfer {
}
}

pub async fn buf_reader(self) -> anyhow::Result<impl AsyncBufRead + Sync + Send + Unpin> {
pub async fn reader(self) -> anyhow::Result<impl AsyncRead + Sync + Send + Unpin> {
match self {
DataTransfer::Local { path } => OpenOptions::new()
.read(true)
.open(&path)
.await
.map(|x| Box::new(BufReader::new(x)) as Box<dyn AsyncBufRead + Sync + Send + Unpin>)
.map(|x| Box::new(x) as Box<dyn AsyncRead + Sync + Send + Unpin>)
.with_context(|| path.to_string_lossy().to_string()),
DataTransfer::Remote {
file_name,
Expand All @@ -108,13 +108,10 @@ impl DataTransfer {
let buf_reader = StreamReader::new(stream);
if is_lzma_file(&file_name) {
log::info!("enabled lzma decoder for {}", file_name.to_string_lossy());
//let decoder = XzDecoder::new(buf_reader);
Ok(
Box::new(BufReader::with_capacity(1024 * 1024 * 4, buf_reader))
as Box<dyn AsyncBufRead + Sync + Send + Unpin>,
)
let decoder = XzDecoder::with_memlimit(buf_reader, 66 * 1024 * 1024);
Ok(Box::new(decoder) as Box<dyn AsyncRead + Sync + Send + Unpin>)
} else {
Ok(Box::new(buf_reader) as Box<dyn AsyncBufRead + Sync + Send + Unpin>)
Ok(Box::new(buf_reader) as Box<dyn AsyncRead + Sync + Send + Unpin>)
}
}
}
Expand Down

0 comments on commit 5f99ed1

Please sign in to comment.