Skip to content

Commit

Permalink
FlashService: fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
svenrademakers committed Aug 30, 2023
1 parent 6b7585d commit b7dd936
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 59 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bmcd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ log.workspace = true
simple_logger.workspace = true
tokio = { workspace = true, features = ["net"] }
mime = "0.3.17"
futures = "0.3.28"
44 changes: 36 additions & 8 deletions bmcd/src/flash_service.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
#![allow(dead_code, unused)]
use crate::into_legacy_response::LegacyResponse;
use actix_web::{http::StatusCode, web::Bytes};
use std::{error::Error, fmt::Display, sync::Arc};
use anyhow::Context;
use futures::future::BoxFuture;
use futures::TryFutureExt;
use std::{
collections::hash_map::DefaultHasher,
error::Error,
fmt::Display,
hash::{Hash, Hasher},
sync::Arc,
};
use tokio::{
io::{AsyncRead, BufReader},
sync::mpsc::{channel, error::SendError, Receiver, Sender},
};
use tpi_rs::{
app::bmc_application::BmcApplication,
middleware::{firmware_update::SUPPORTED_DEVICES, NodeId, UsbRoute},
utils::logging_sink,
};
use tpi_rs::{app::flash_application::flash_node, middleware::firmware_update::FlashStatus};
use tpi_rs::{app::flash_application::FlashContext, utils::ReceiverReader};

pub type FlashDoneFut = BoxFuture<'static, anyhow::Result<()>>;
pub struct FlashService {
status: Option<Sender<Bytes>>,
status: Option<(u64, Sender<Bytes>)>,
bmc: Arc<BmcApplication>,
}

Expand All @@ -25,16 +36,19 @@ impl FlashService {

pub async fn start_transfer(
&mut self,
peer: &str,
filename: String,
size: usize,
node: NodeId,
) -> Result<(), FlashError> {
) -> Result<FlashDoneFut, FlashError> {
if self.status.is_some() {
return Err(FlashError::InProgress);
}

let (sender, receiver) = channel::<Bytes>(128);
let (progress_sender, progress_receiver) = channel(32);
logging_sink(progress_receiver);

let context = FlashContext {
filename,
size,
Expand All @@ -45,14 +59,24 @@ impl FlashService {
};

/// execute the flashing of the image.
tokio::spawn(flash_node(context));
let flash_handle = tokio::spawn(flash_node(context));

self.status = Some(sender);
Ok(())
let mut hasher = DefaultHasher::new();
peer.hash(&mut hasher);
self.status = Some((hasher.finish(), sender));
Ok(Box::pin(async move {
flash_handle
.await
.context("join error waiting for flashing to complete")?
}))
}

pub async fn stream_chunk(&mut self, data: Bytes) -> Result<(), FlashError> {
if let Some(sender) = &self.status {
pub async fn stream_chunk(&mut self, peer: &str, data: Bytes) -> Result<(), FlashError> {
let mut hasher = DefaultHasher::new();
peer.hash(&mut hasher);
let hash = hasher.finish();

if let Some((hash, sender)) = &self.status {
match sender.send(data).await {
Ok(_) => Ok(()),
Err(e) if sender.is_closed() => Err(FlashError::Aborted),
Expand All @@ -62,6 +86,10 @@ impl FlashService {
Err(FlashError::UnexpectedCommand)
}
}

pub fn reset(&mut self) {
self.status = None;
}
}

#[derive(Debug, PartialEq)]
Expand Down
118 changes: 68 additions & 50 deletions bmcd/src/legacy.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,56 @@
//! Routes for legacy API present in versions <= 1.1.0 of the firmware.
use crate::flash_service::FlashService;
use crate::into_legacy_response::LegacyResult;
use crate::into_legacy_response::{IntoLegacyResponse, LegacyResponse};
use actix_web::http::header::{CONTENT_ENCODING, TRANSFER_ENCODING};
use crate::into_legacy_response::{LegacyResult, Null};
use actix_web::guard::{fn_guard, GuardContext};
use actix_web::http::header::CONTENT_TYPE;
use actix_web::http::StatusCode;
use actix_web::web::Bytes;
use actix_web::{web, HttpRequest, HttpResponse};
use actix_web::{web, HttpRequest, Responder};
use anyhow::Context;
use nix::sys::statfs::statfs;
use serde_json::json;
use std::str::FromStr;
use tokio::sync::{mpsc, Mutex};
use tpi_rs::app::bmc_application::{BmcApplication, UsbConfig};
use tpi_rs::middleware::{NodeId, UsbMode, UsbRoute};

type Query = web::Query<std::collections::HashMap<String, String>>;

pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("/api/bmc")
.route(web::get().to(api_entry))
.route(web::post().to(api_post)),
.route(
web::route()
.guard(fn_guard(flash_guard))
.to(handle_flash_request),
)
.route(web::get().to(api_entry)),
);
}

async fn api_entry(bmc: web::Data<BmcApplication>, query: Query) -> HttpResponse {
fn flash_guard(context: &GuardContext<'_>) -> bool {
let is_set = context
.head()
.uri
.query()
.is_some_and(|q| q.contains("opt=set"));
let is_type = context
.head()
.uri
.query()
.is_some_and(|q| q.contains("type=flash"));
is_set && is_type
}

async fn api_entry(bmc: web::Data<BmcApplication>, query: Query) -> impl Responder {
let is_set = match query.get("opt").map(String::as_str) {
Some("set") => true,
Some("get") => false,
_ => return LegacyResponse::bad_request("Missing `opt` parameter").into(),
_ => return LegacyResponse::bad_request("Missing `opt` parameter"),
};

let Some(ty) = query.get("type") else {
return LegacyResponse::bad_request("Missing `opt` parameter").into()
return LegacyResponse::bad_request("Missing `opt` parameter")
};

let bmc = bmc.as_ref();
Expand All @@ -41,19 +59,22 @@ async fn api_entry(bmc: web::Data<BmcApplication>, query: Query) -> HttpResponse
("network", true) => reset_network(bmc).await.legacy_response(),
("nodeinfo", true) => set_node_info().legacy_response(),
("nodeinfo", false) => get_node_info(bmc).legacy_response(),
("node_to_msd", true) => set_node_to_msd(bmc, query).await.legacy_response(),
("node_to_msd", true) => set_node_to_msd(bmc, query).await.into(),
("other", false) => get_system_information().await.legacy_response(),
("power", true) => set_node_power(bmc, query).await.legacy_response(),
("power", false) => get_node_power(bmc).await.legacy_response(),
("sdcard", true) => format_sdcard().legacy_response(),
("sdcard", false) => get_sdcard_info(),
("uart", true) => write_to_uart(bmc, query).legacy_response(),
("uart", false) => read_from_uart(bmc, query).legacy_response(),
("usb", true) => set_usb_mode(bmc, query).await.legacy_response(),
("usb", true) => set_usb_mode(bmc, query).await.into(),
("usb", false) => get_usb_mode(bmc).await.into(),
_ => LegacyResponse::bad_request("Invalid `type` parameter"),
_ => (
StatusCode::BAD_REQUEST,
format!("Invalid `type` parameter {}", ty),
)
.legacy_response(),
}
.into()
}

fn clear_usb_boot(bmc: &BmcApplication) -> impl IntoLegacyResponse {
Expand Down Expand Up @@ -312,41 +333,25 @@ async fn get_usb_mode(bmc: &BmcApplication) -> anyhow::Result<impl IntoLegacyRes
))
}

async fn api_post(
flash: web::Data<Mutex<FlashService>>,
chunk: Bytes,
request: HttpRequest,
query: Query,
) -> HttpResponse {
if query.get("opt").map(String::as_ref) != Some("set") {
return LegacyResponse::bad_request("Invalid `opt` parameter").into();
}

let Some(ty) = query.get("type") else {
return LegacyResponse::bad_request("Missing `type` parameter").into();
};

match ty.as_ref() {
"firmware" => handle_flash_request(flash, request, chunk, query)
.await
.legacy_response(),
"flash" => LegacyResponse::stub(),
_ => LegacyResponse::bad_request("Invalid `type` parameter"),
}
.into()
}

async fn handle_flash_request(
flash: web::Data<Mutex<FlashService>>,
request: HttpRequest,
chunk: Bytes,
query: Query,
) -> LegacyResult<()> {
) -> LegacyResult<Null> {
let mut flash_service = flash.lock().await;

if is_stream_chunck(&request) {
(*flash_service).stream_chunk(chunk).await?;
return Ok(());
(*flash_service)
.stream_chunk(
request
.connection_info()

Check failure on line 348 in bmcd/src/legacy.rs

View workflow job for this annotation

GitHub Actions / clippy

this `RefCell` reference is held across an `await` point

error: this `RefCell` reference is held across an `await` point --> bmcd/src/legacy.rs:347:17 | 347 | / request 348 | | .connection_info() | |______________________________________^ | = help: ensure the reference is dropped before calling `await` note: these are all the `await` points this reference is held through --> bmcd/src/legacy.rs:345:9 | 345 | / (*flash_service) 346 | | .stream_chunk( 347 | | request 348 | | .connection_info() ... | 352 | | ) 353 | | .await?; | |____________________^ = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#await_holding_refcell_ref = note: `-D clippy::await-holding-refcell-ref` implied by `-D warnings`
.peer_addr()
.context("peer_addr unknown")?,
chunk,
)
.await?;
return Ok(Null);
}

let node = get_node_param(&query)?;
Expand All @@ -365,21 +370,34 @@ async fn handle_flash_request(
let size = usize::from_str(size)
.map_err(|_| LegacyResponse::bad_request("`lenght` parameter not a number"))?;

(*flash_service)
.start_transfer(file, size, node)
let on_done = (*flash_service)
.start_transfer(
request
.connection_info()
.peer_addr()
.context("peer_addr unknown")?,
file,
size,
node,
)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, format!("{}", e)).into())
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, format!("{}", e)).legacy_response())?;

let service = flash.clone();
tokio::spawn(async move {
if let Err(e) = on_done.await {
log::error!("{}", e);
}
service.lock().await.reset();
});

Ok(Null)
}

fn is_stream_chunck(request: &HttpRequest) -> bool {
request
.headers()
.get(TRANSFER_ENCODING)
.get(CONTENT_TYPE)
.map(|v| v.to_str().unwrap())
== Some("chunked")
&& request
.headers()
.get(CONTENT_ENCODING)
.map(|v| v.to_str().unwrap())
== Some(mime::APPLICATION_OCTET_STREAM.essence_str())
== Some("application/octet-stream")
}
4 changes: 3 additions & 1 deletion bmcd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ async fn main() -> anyhow::Result<()> {
let bmc = Data::new(BmcApplication::new().await?);
run_event_listener(bmc.deref().clone())?;

let flash_service = Data::new(Mutex::new(FlashService::new(bmc.deref().clone())));

HttpServer::new(move || {
App::new()
// Shared state: BmcApplication instance
.app_data(bmc.clone())
.app_data(Mutex::new(FlashService::new(bmc.deref().clone())))
.app_data(flash_service.clone())
// Legacy API
.configure(legacy::config)
// Enable logger
Expand Down
15 changes: 15 additions & 0 deletions tpi_rs/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
mod event_listener;
use std::fmt::Display;

#[doc(inline)]
pub use event_listener::*;
mod io;
pub use io::*;
use tokio::sync::mpsc::Receiver;

// for now we print the status updates to console. In the future we would like to pass
// this back to the clients.
pub fn logging_sink<T: Display + Send + 'static>(
mut receiver: Receiver<T>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
log::info!("{}", msg);
}
})
}

0 comments on commit b7dd936

Please sign in to comment.