Skip to content

Commit

Permalink
streaming_data_service: improvements
Browse files Browse the repository at this point in the history
* The BMC now restores the USB and power state of the given node even if a
  transfer was aborted prematurely.
* The multi-part HTTP handler cancels the node flash process if only a partial
  byte stream was received.
  • Loading branch information
svenrademakers committed Nov 30, 2023
1 parent 1a2c646 commit 086fdf7
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 27 deletions.
17 changes: 15 additions & 2 deletions src/api/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use actix_web::{get, post, web, HttpResponse, Responder};
use anyhow::Context;
use async_compression::tokio::bufread::GzipEncoder;
use async_compression::Level;
use humansize::{format_size, DECIMAL};
use serde_json::json;
use std::collections::HashMap;
use std::ops::Deref;
Expand Down Expand Up @@ -568,7 +569,7 @@ async fn handle_flash_request(
DataTransfer::remote(PathBuf::from(&file), size, 16)
};

let transfer = InitializeTransfer::new(process_name, upgrade_command, data_transfer);
let transfer = InitializeTransfer::new(process_name, upgrade_command, data_transfer, true);
let handle = ss.request_transfer(transfer.try_into()?).await?;
let json = json!({"handle": handle});
Ok(json.to_string())
Expand All @@ -586,12 +587,14 @@ async fn handle_file_upload(
ss: web::Data<StreamingDataService>,
mut payload: Multipart,
) -> impl Responder {
let sender = ss.take_sender(*handle).await?;
let (sender, size) = ss.take_sender(*handle).await?;
let Some(Ok(mut field)) = payload.next().await else {
return Err(LegacyResponse::bad_request("Multipart form invalid"));
};

let mut bytes_send: u64 = 0;
while let Some(Ok(chunk)) = field.next().await {
let length = chunk.len();
if sender.send(chunk).await.is_err() {
// when the channel gets dropped, give the worker some time to shutdown so that the
// actual error message can be bubbled up.
Expand All @@ -603,6 +606,16 @@ async fn handle_file_upload(
)
.into());
}

bytes_send += length as u64;
}

if bytes_send != size {
ss.cancel_all().await;
return Err(LegacyResponse::bad_request(format!(
"missing {} bytes",
format_size(size - bytes_send, DECIMAL)
)));
}

Ok(Null)
Expand Down
5 changes: 5 additions & 0 deletions src/app/transfer_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@ pub struct InitializeTransfer {
transfer_name: String,
data_transfer: DataTransfer,
upgrade_command: UpgradeCommand,
do_crc_validation: bool,
}

impl InitializeTransfer {
pub fn new(
transfer_name: String,
upgrade_command: UpgradeCommand,
data_transfer: DataTransfer,
do_crc_validation: bool,
) -> Self {
Self {
transfer_name,
data_transfer,
upgrade_command,
do_crc_validation,
}
}
}
Expand All @@ -52,6 +55,8 @@ impl TryInto<TransferRequest> for InitializeTransfer {
let cancel_child = cancel.child_token();
let (written_sender, written_receiver) = watch::channel(0u64);
let worker = self.upgrade_command.run(UpgradeWorker::new(
None,
self.do_crc_validation,
self.data_transfer,
cancel_child,
written_sender,
Expand Down
82 changes: 64 additions & 18 deletions src/app/upgrade_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
use anyhow::bail;
use core::time::Duration;
use crc::{Crc, CRC_64_REDIS};
use humansize::DECIMAL;
use humansize::{format_size, DECIMAL};
use std::io::{Error, ErrorKind};
use std::path::PathBuf;
use std::process::Command;
Expand All @@ -48,18 +48,24 @@ const BLOCK_READ_SIZE: usize = 524288; // 512Kib
// Contains a collection of functions that execute some business flow in
// relation to file transfers in the BMC. See `flash_node` and `os_update`.
pub struct UpgradeWorker {
// Expected checksum of the image, if known up front. When `None`, the checksum
// computed while writing is used as the reference during validation.
crc: Option<u64>,
// When true, the written data is read back from the node and its checksum is
// compared against the expected one.
do_crc_validation: bool,
// Source of the data; the worker obtains its reader from this.
data_transfer: DataTransfer,
// Passed to the copy routines; presumably aborts an in-flight copy when
// cancelled -- confirm against `copy_or_cancel`.
cancel: CancellationToken,
// Handed to the write monitor; presumably publishes the number of bytes
// written so far -- confirm against `WriteMonitor`.
written_sender: watch::Sender<u64>,
}

impl UpgradeWorker {
pub fn new(
crc: Option<u64>,
do_crc_validation: bool,
data_transfer: DataTransfer,
cancel: CancellationToken,
written_sender: watch::Sender<u64>,
) -> Self {
Self {
crc,
do_crc_validation,
data_transfer,
cancel,
written_sender,
Expand All @@ -71,44 +77,84 @@ impl UpgradeWorker {
bmc: Arc<BmcApplication>,
node: NodeId,
) -> anyhow::Result<()> {
let source = self.data_transfer.reader().await?;
let device = bmc
.configure_node_for_fwupgrade(node, UsbRoute::Bmc, SUPPORTED_DEVICES.keys())
.await?;
let mut buf_stream = BufStream::with_capacity(BLOCK_READ_SIZE, BLOCK_WRITE_SIZE, device);

let result = async move {
let reader = self.data_transfer.reader().await?;
let mut buf_stream =
BufStream::with_capacity(BLOCK_READ_SIZE, BLOCK_WRITE_SIZE, device);
let (bytes_written, written_crc) =
self.try_write_node(node, reader, &mut buf_stream).await?;

if self.do_crc_validation {
buf_stream.seek(std::io::SeekFrom::Start(0)).await?;
flush_file_caches().await?;
self.try_validate_crc(
node,
self.crc.unwrap_or(written_crc),
buf_stream.take(bytes_written),
)
.await?;
}

Ok::<(), anyhow::Error>(())
}
.await;

if let Ok(()) = result {
log::info!("Flashing {node} successful, restoring USB & power settings.");
}

// disregarding the result, set the BMC in the finalized state.
bmc.activate_slot(node.to_inverse_bitfield(), node.to_bitfield())
.await?;
bmc.usb_boot(node, false).await?;
bmc.configure_usb(bmc.get_usb_mode().await).await?;
result
}

async fn try_write_node(
&mut self,
node: NodeId,
source_reader: impl AsyncRead + 'static + Unpin,
mut node_writer: &mut (impl AsyncWrite + 'static + Unpin),
) -> anyhow::Result<(u64, u64)> {
log::info!("started writing to {node}");
let crc = Crc::<u64>::new(&CRC_64_REDIS);
let mut write_watcher = WriteMonitor::new(&mut buf_stream, &mut self.written_sender, &crc);
let bytes_written = copy_or_cancel(source, &mut write_watcher, &self.cancel).await?;
let img_checksum = write_watcher.crc();

log::info!("Verifying checksum of written data to {node}");
buf_stream.seek(std::io::SeekFrom::Start(0)).await?;
flush_file_caches().await?;
let mut write_watcher = WriteMonitor::new(&mut node_writer, &mut self.written_sender, &crc);
let bytes_written = copy_or_cancel(source_reader, &mut write_watcher, &self.cancel).await?;
log::info!("Wrote {}", format_size(bytes_written, DECIMAL));
Ok((bytes_written, write_watcher.crc()))
}

async fn try_validate_crc(
&mut self,
node: NodeId,
expected_crc: u64,
node_reader: impl AsyncRead + 'static + Unpin,
) -> anyhow::Result<()> {
log::info!("Verifying checksum of data on node {node}");
let (mut sender, receiver) = watch::channel(0u64);
tokio::spawn(progress_printer(receiver));

let crc = Crc::<u64>::new(&CRC_64_REDIS);
let mut sink = WriteMonitor::new(sink(), &mut sender, &crc);
copy_or_cancel(&mut buf_stream.take(bytes_written), &mut sink, &self.cancel).await?;
copy_or_cancel(node_reader, &mut sink, &self.cancel).await?;
let dev_checksum = sink.crc();

if img_checksum != dev_checksum {
if expected_crc != dev_checksum {
log::error!(
"Source and destination checksum mismatch: {:#x} != {:#x}",
img_checksum,
expected_crc,
dev_checksum
);

bail!(FwUpdateError::ChecksumMismatch)
}

log::info!("Flashing {node} successful, restoring USB & power settings.");
bmc.activate_slot(node.to_inverse_bitfield(), node.to_bitfield())
.await?;
bmc.usb_boot(node, false).await?;
bmc.configure_usb(bmc.get_usb_mode().await).await
Ok(())
}

pub async fn os_update(mut self) -> anyhow::Result<()> {
Expand Down
11 changes: 8 additions & 3 deletions src/streaming_data_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ impl StreamingDataService {
/// unknown
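/// * `Err(StreamingServiceError::SenderTaken)` if the sender was already taken
/// * `Ok((sender, size))` on success, where `size` is the size stored in the transfer context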
pub async fn take_sender(&self, id: u32) -> Result<mpsc::Sender<Bytes>, StreamingServiceError> {
pub async fn take_sender(
&self,
id: u32,
) -> Result<(mpsc::Sender<Bytes>, u64), StreamingServiceError> {
let mut status = self.status.lock().await;
let StreamingState::Transferring(ref mut context) = *status else {
return Err(StreamingServiceError::WrongState(
Expand All @@ -184,10 +187,12 @@ impl StreamingDataService {
return Err(StreamingServiceError::HandlesDoNotMatch);
}

context
let sender = context
.data_sender
.take()
.ok_or(StreamingServiceError::SenderTaken)
.ok_or(StreamingServiceError::SenderTaken)?;

Ok((sender, context.size))
}

/// Return a borrow to the current status of the flash service
Expand Down
10 changes: 6 additions & 4 deletions src/streaming_data_service/data_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub enum DataTransfer {
file_name: PathBuf,
size: u64,
sender: Option<mpsc::Sender<bytes::Bytes>>,
receiver: mpsc::Receiver<bytes::Bytes>,
receiver: Option<mpsc::Receiver<bytes::Bytes>>,
},
}

Expand All @@ -55,7 +55,7 @@ impl DataTransfer {
file_name,
size,
sender: Some(sender),
receiver,
receiver: Some(receiver),
}
}
}
Expand Down Expand Up @@ -93,7 +93,7 @@ impl DataTransfer {
}
}

pub async fn reader(self) -> anyhow::Result<impl AsyncRead + Sync + Send + Unpin> {
pub async fn reader(&mut self) -> anyhow::Result<impl AsyncRead + Sync + Send + Unpin> {
match self {
DataTransfer::Local { path } => {
let file = OpenOptions::new()
Expand All @@ -110,7 +110,9 @@ impl DataTransfer {
sender: _,
receiver,
} => {
let stream = ReceiverStream::new(receiver).map(Ok::<bytes::Bytes, io::Error>);
let stream =
ReceiverStream::new(receiver.take().expect("cannot take reader twice"))
.map(Ok::<bytes::Bytes, io::Error>);
Ok(with_xz_support(&file_name, StreamReader::new(stream)))

Check failure on line 116 in src/streaming_data_service/data_transfer.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

error: this expression creates a reference which is immediately dereferenced by the compiler --> src/streaming_data_service/data_transfer.rs:116:36 | 116 | Ok(with_xz_support(&file_name, StreamReader::new(stream))) | ^^^^^^^^^^ help: change this to: `file_name` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
}
}
Expand Down

0 comments on commit 086fdf7

Please sign in to comment.