From 086fdf775920b3323da89f94bebfc2eeda341ac6 Mon Sep 17 00:00:00 2001 From: Sven Rademakers Date: Thu, 30 Nov 2023 17:08:43 +0000 Subject: [PATCH] streaming_data_service: improvements * The BMC will restore USB and power state of the given node even if a transfer was aborted prematurely. * The multi-part HTTP handler cancels the node flash process if a partial byte-stream was received. --- src/api/legacy.rs | 17 ++++- src/app/transfer_action.rs | 5 ++ src/app/upgrade_worker.rs | 82 ++++++++++++++++----- src/streaming_data_service.rs | 11 ++- src/streaming_data_service/data_transfer.rs | 10 ++- 5 files changed, 98 insertions(+), 27 deletions(-) diff --git a/src/api/legacy.rs b/src/api/legacy.rs index 79b94f9..76be053 100644 --- a/src/api/legacy.rs +++ b/src/api/legacy.rs @@ -31,6 +31,7 @@ use actix_web::{get, post, web, HttpResponse, Responder}; use anyhow::Context; use async_compression::tokio::bufread::GzipEncoder; use async_compression::Level; +use humansize::{format_size, DECIMAL}; use serde_json::json; use std::collections::HashMap; use std::ops::Deref; @@ -568,7 +569,7 @@ async fn handle_flash_request( DataTransfer::remote(PathBuf::from(&file), size, 16) }; - let transfer = InitializeTransfer::new(process_name, upgrade_command, data_transfer); + let transfer = InitializeTransfer::new(process_name, upgrade_command, data_transfer, true); let handle = ss.request_transfer(transfer.try_into()?).await?; let json = json!({"handle": handle}); Ok(json.to_string()) @@ -586,12 +587,14 @@ async fn handle_file_upload( ss: web::Data, mut payload: Multipart, ) -> impl Responder { - let sender = ss.take_sender(*handle).await?; + let (sender, size) = ss.take_sender(*handle).await?; let Some(Ok(mut field)) = payload.next().await else { return Err(LegacyResponse::bad_request("Multipart form invalid")); }; + let mut bytes_send: u64 = 0; while let Some(Ok(chunk)) = field.next().await { + let length = chunk.len(); if sender.send(chunk).await.is_err() { // when the channel gets dropped, give the worker some time to shutdown so that the // actual error message can be bubbled up. @@ -603,6 +606,16 @@ async fn handle_file_upload( ) .into()); } + + bytes_send += length as u64; + } + + if bytes_send != size { + ss.cancel_all().await; + return Err(LegacyResponse::bad_request(format!( + "missing {} bytes", + format_size(size - bytes_send, DECIMAL) + ))); } Ok(Null) diff --git a/src/app/transfer_action.rs b/src/app/transfer_action.rs index 731bd2b..71dd571 100644 --- a/src/app/transfer_action.rs +++ b/src/app/transfer_action.rs @@ -26,6 +26,7 @@ pub struct InitializeTransfer { transfer_name: String, data_transfer: DataTransfer, upgrade_command: UpgradeCommand, + do_crc_validation: bool, } impl InitializeTransfer { @@ -33,11 +34,13 @@ impl InitializeTransfer { transfer_name: String, upgrade_command: UpgradeCommand, data_transfer: DataTransfer, + do_crc_validation: bool, ) -> Self { Self { transfer_name, data_transfer, upgrade_command, + do_crc_validation, } } } @@ -52,6 +55,8 @@ impl TryInto for InitializeTransfer { let cancel_child = cancel.child_token(); let (written_sender, written_receiver) = watch::channel(0u64); let worker = self.upgrade_command.run(UpgradeWorker::new( + None, + self.do_crc_validation, self.data_transfer, cancel_child, written_sender, diff --git a/src/app/upgrade_worker.rs b/src/app/upgrade_worker.rs index 2f95470..fa31acc 100644 --- a/src/app/upgrade_worker.rs +++ b/src/app/upgrade_worker.rs @@ -22,7 +22,7 @@ use crate::{ use anyhow::bail; use core::time::Duration; use crc::{Crc, CRC_64_REDIS}; -use humansize::DECIMAL; +use humansize::{format_size, DECIMAL}; use std::io::{Error, ErrorKind}; use std::path::PathBuf; use std::process::Command; @@ -48,6 +48,8 @@ const BLOCK_READ_SIZE: usize = 524288; // 512Kib // Contains collection of functions that execute some business flow in relation // to file transfers in the BMC. See `flash_node` and `os_update`. pub struct UpgradeWorker { + crc: Option, + do_crc_validation: bool, data_transfer: DataTransfer, cancel: CancellationToken, written_sender: watch::Sender, @@ -55,11 +57,15 @@ pub struct UpgradeWorker { impl UpgradeWorker { pub fn new( + crc: Option, + do_crc_validation: bool, data_transfer: DataTransfer, cancel: CancellationToken, written_sender: watch::Sender, ) -> Self { Self { + crc, + do_crc_validation, data_transfer, cancel, written_sender, @@ -71,44 +77,84 @@ impl UpgradeWorker { bmc: Arc, node: NodeId, ) -> anyhow::Result<()> { - let source = self.data_transfer.reader().await?; let device = bmc .configure_node_for_fwupgrade(node, UsbRoute::Bmc, SUPPORTED_DEVICES.keys()) .await?; - let mut buf_stream = BufStream::with_capacity(BLOCK_READ_SIZE, BLOCK_WRITE_SIZE, device); + let result = async move { + let reader = self.data_transfer.reader().await?; + let mut buf_stream = + BufStream::with_capacity(BLOCK_READ_SIZE, BLOCK_WRITE_SIZE, device); + let (bytes_written, written_crc) = + self.try_write_node(node, reader, &mut buf_stream).await?; + + if self.do_crc_validation { + buf_stream.seek(std::io::SeekFrom::Start(0)).await?; + flush_file_caches().await?; + self.try_validate_crc( + node, + self.crc.unwrap_or(written_crc), + buf_stream.take(bytes_written), + ) + .await?; + } + + Ok::<(), anyhow::Error>(()) + } + .await; + + if let Ok(()) = result { + log::info!("Flashing {node} successful, restoring USB & power settings."); + } + + // disregarding the result, set the BMC in the finalized state. + bmc.activate_slot(node.to_inverse_bitfield(), node.to_bitfield()) + .await?; + bmc.usb_boot(node, false).await?; + bmc.configure_usb(bmc.get_usb_mode().await).await?; + result + } + + async fn try_write_node( + &mut self, + node: NodeId, + source_reader: impl AsyncRead + 'static + Unpin, + mut node_writer: &mut (impl AsyncWrite + 'static + Unpin), + ) -> anyhow::Result<(u64, u64)> { log::info!("started writing to {node}"); let crc = Crc::::new(&CRC_64_REDIS); - let mut write_watcher = WriteMonitor::new(&mut buf_stream, &mut self.written_sender, &crc); - let bytes_written = copy_or_cancel(source, &mut write_watcher, &self.cancel).await?; - let img_checksum = write_watcher.crc(); - - log::info!("Verifying checksum of written data to {node}"); - buf_stream.seek(std::io::SeekFrom::Start(0)).await?; - flush_file_caches().await?; + let mut write_watcher = WriteMonitor::new(&mut node_writer, &mut self.written_sender, &crc); + let bytes_written = copy_or_cancel(source_reader, &mut write_watcher, &self.cancel).await?; + log::info!("Wrote {}", format_size(bytes_written, DECIMAL)); + Ok((bytes_written, write_watcher.crc())) + } + async fn try_validate_crc( + &mut self, + node: NodeId, + expected_crc: u64, + node_reader: impl AsyncRead + 'static + Unpin, + ) -> anyhow::Result<()> { + log::info!("Verifying checksum of data on node {node}"); let (mut sender, receiver) = watch::channel(0u64); tokio::spawn(progress_printer(receiver)); + let crc = Crc::::new(&CRC_64_REDIS); let mut sink = WriteMonitor::new(sink(), &mut sender, &crc); - copy_or_cancel(&mut buf_stream.take(bytes_written), &mut sink, &self.cancel).await?; + copy_or_cancel(node_reader, &mut sink, &self.cancel).await?; let dev_checksum = sink.crc(); - if img_checksum != dev_checksum { + if expected_crc != dev_checksum { log::error!( "Source and destination checksum mismatch: {:#x} != {:#x}", - img_checksum, + expected_crc, dev_checksum ); bail!(FwUpdateError::ChecksumMismatch) } - log::info!("Flashing {node} successful, restoring USB & power settings."); - bmc.activate_slot(node.to_inverse_bitfield(), node.to_bitfield()) - .await?; - bmc.usb_boot(node, false).await?; - bmc.configure_usb(bmc.get_usb_mode().await).await + Ok(()) } pub async fn os_update(mut self) -> anyhow::Result<()> { diff --git a/src/streaming_data_service.rs b/src/streaming_data_service.rs index ee2f6b7..a7743dd 100644 --- a/src/streaming_data_service.rs +++ b/src/streaming_data_service.rs @@ -171,7 +171,10 @@ impl StreamingDataService { /// unknown /// * 'Err(StreamingServiceError::SenderTaken(_)' /// * Ok(()) on success - pub async fn take_sender(&self, id: u32) -> Result, StreamingServiceError> { + pub async fn take_sender( + &self, + id: u32, + ) -> Result<(mpsc::Sender, u64), StreamingServiceError> { let mut status = self.status.lock().await; let StreamingState::Transferring(ref mut context) = *status else { return Err(StreamingServiceError::WrongState( @@ -184,10 +187,12 @@ impl StreamingDataService { return Err(StreamingServiceError::HandlesDoNotMatch); } - context + let sender = context .data_sender .take() - .ok_or(StreamingServiceError::SenderTaken) + .ok_or(StreamingServiceError::SenderTaken)?; + + Ok((sender, context.size)) } /// Return a borrow to the current status of the flash service diff --git a/src/streaming_data_service/data_transfer.rs b/src/streaming_data_service/data_transfer.rs index b68ca21..bd82b32 100644 --- a/src/streaming_data_service/data_transfer.rs +++ b/src/streaming_data_service/data_transfer.rs @@ -39,7 +39,7 @@ pub enum DataTransfer { file_name: PathBuf, size: u64, sender: Option>, - receiver: mpsc::Receiver, + receiver: Option>, }, } @@ -55,7 +55,7 @@ impl DataTransfer { file_name, size, sender: Some(sender), - receiver, + receiver: Some(receiver), } } } @@ -93,7 +93,7 @@ impl DataTransfer { } } - pub async fn reader(self) -> anyhow::Result { + pub async fn reader(&mut self) -> anyhow::Result { match self { DataTransfer::Local { path } => { let file = OpenOptions::new() @@ -110,7 +110,9 @@ impl DataTransfer { sender: _, receiver, } => { - let stream = ReceiverStream::new(receiver).map(Ok::); + let stream = + ReceiverStream::new(receiver.take().expect("cannot take reader twice")) + .map(Ok::); Ok(with_xz_support(&file_name, StreamReader::new(stream))) } }