Skip to content

Commit

Permalink
flash_service: implement file streaming handler
Browse files Browse the repository at this point in the history
Added a handler in the 'bmcd' that:
* receives a firmware image over the http API endpoint
* buffers these image chunks
* async write chunks to a given node
  • Loading branch information
svenrademakers committed Aug 26, 2023
1 parent 20158a5 commit 898aab2
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bmcd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ tpi_rs = { path = "../tpi_rs" }
anyhow.workspace = true
log.workspace = true
simple_logger.workspace = true
tokio.workspace = true
tokio = { workspace = true, features = ["net"] }
mime = "0.3.17"
108 changes: 108 additions & 0 deletions bmcd/src/flash_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#![allow(dead_code, unused)]
use crate::into_legacy_response::LegacyResponse;
use actix_web::{http::StatusCode, web::Bytes};
use std::{error::Error, fmt::Display, sync::Arc};
use tokio::{
io::{AsyncRead, BufReader},
sync::mpsc::{channel, error::SendError, Receiver, Sender},
};
use tpi_rs::app::flash_application::FlashContext;
use tpi_rs::{
app::bmc_application::BmcApplication,
middleware::{firmware_update::SUPPORTED_DEVICES, NodeId, UsbRoute},
};
use tpi_rs::{app::flash_application::flash_node, middleware::firmware_update::FlashStatus};

pub struct FlashService {
status: Option<Sender<Bytes>>,
bmc: Arc<BmcApplication>,
}

impl FlashService {
pub fn new(bmc: Arc<BmcApplication>) -> Self {
Self { status: None, bmc }
}

pub async fn start_transfer(
&mut self,
filename: String,
size: usize,
node: NodeId,
) -> Result<(), FlashError> {
if self.status.is_some() {
return Err(FlashError::InProgress);
}

let (sender, receiver) = channel::<Bytes>(128);
let (progress_sender, progress_receiver) = channel(32);
let context = FlashContext {
filename,
size,
node,
byte_stream: tokio::fs::File::open("/todo/make/wrapper").await.unwrap(),
bmc: self.bmc.clone(),
progress_sender,
};

/// execute the flashing of the image.
tokio::spawn(flash_node(context));

self.status = Some(sender);
Ok(())
}

pub async fn stream_chunk(&mut self, data: Bytes) -> Result<(), FlashError> {
if let Some(sender) = &self.status {
match sender.send(data).await {
Ok(_) => Ok(()),
Err(e) if sender.is_closed() => Err(FlashError::Aborted),
Err(e) => Err(e.into()),
}
} else {
Err(FlashError::UnexpectedCommand)
}
}
}

#[derive(Debug, PartialEq)]
pub enum FlashError {
InProgress,
UnexpectedCommand,
Aborted,
MpscError(SendError<Bytes>),
}

impl Error for FlashError {}

impl Display for FlashError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

impl From<SendError<Bytes>> for FlashError {
fn from(value: SendError<Bytes>) -> Self {
FlashError::MpscError(value)
}
}

impl From<FlashError> for LegacyResponse {
fn from(value: FlashError) -> Self {
match value {
FlashError::InProgress => (
StatusCode::SERVICE_UNAVAILABLE,
"another flash operation is in progress",
)
.into(),
FlashError::UnexpectedCommand => {
(StatusCode::BAD_REQUEST, "did not expect given request").into()
}
FlashError::MpscError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into(),
FlashError::Aborted => (
StatusCode::INTERNAL_SERVER_ERROR,
"Flashing process aborted",
)
.into(),
}
}
}
77 changes: 66 additions & 11 deletions bmcd/src/legacy.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
//! Routes for legacy API present in versions <= 1.1.0 of the firmware.
use crate::flash_service::FlashService;
use crate::into_legacy_response::LegacyResult;
use crate::into_legacy_response::{IntoLegacyResponse, LegacyResponse};
use actix_web::http::header::{CONTENT_ENCODING, TRANSFER_ENCODING};
use actix_web::http::StatusCode;
use actix_web::{web, HttpResponse};
use actix_web::web::Bytes;
use actix_web::{web, HttpRequest, HttpResponse};
use anyhow::Context;
use nix::sys::statfs::statfs;
use serde_json::json;
use std::str::FromStr;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Mutex};
use tpi_rs::app::bmc_application::{BmcApplication, UsbConfig};
use tpi_rs::middleware::{NodeId, UsbMode, UsbRoute};

Expand Down Expand Up @@ -309,22 +312,74 @@ async fn get_usb_mode(bmc: &BmcApplication) -> anyhow::Result<impl IntoLegacyRes
))
}

async fn api_post(query: Query) -> HttpResponse {
if query.get("opt") != Some(&"set".to_owned()) {
return LegacyResponse::Error(StatusCode::BAD_REQUEST, "Invalid `opt` parameter").into();
async fn api_post(
flash: web::Data<Mutex<FlashService>>,
chunk: Bytes,
request: HttpRequest,
query: Query,
) -> HttpResponse {
if query.get("opt").map(String::as_ref) != Some("set") {
return LegacyResponse::bad_request("Invalid `opt` parameter").into();
}

let Some(ty) = query.get("type") else {
return LegacyResponse::Error(StatusCode::BAD_REQUEST, "Missing `type` parameter").into();
return LegacyResponse::bad_request("Missing `type` parameter").into();
};

match ty.as_ref() {
"firmware" => stub(),
"flash" => stub(),
_ => LegacyResponse::Error(StatusCode::BAD_REQUEST, "Invalid `type` parameter").into(),
"firmware" => handle_flash_request(flash, request, chunk, query)
.await
.legacy_response(),
"flash" => LegacyResponse::stub(),
_ => LegacyResponse::bad_request("Invalid `type` parameter"),
}
.into()
}

async fn handle_flash_request(
flash: web::Data<Mutex<FlashService>>,
request: HttpRequest,
chunk: Bytes,
query: Query,
) -> LegacyResult<()> {
let mut flash_service = flash.lock().await;

if is_stream_chunck(&request) {
(*flash_service).stream_chunk(chunk).await?;
return Ok(());
}

let node = get_node_param(&query)?;
let file = query
.get("file")
.ok_or(LegacyResponse::bad_request(
"Invalid `file` query parameter",
))?
.to_string();

let size = query.get("length").ok_or((
StatusCode::LENGTH_REQUIRED,
"Invalid `length` query parameter",
))?;

let size = usize::from_str(size)
.map_err(|_| LegacyResponse::bad_request("`lenght` parameter not a number"))?;

(*flash_service)
.start_transfer(file, size, node)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, format!("{}", e)).into())
}

fn stub() -> HttpResponse {
LegacyResponse::Success(None).into()
fn is_stream_chunck(request: &HttpRequest) -> bool {
request
.headers()
.get(TRANSFER_ENCODING)
.map(|v| v.to_str().unwrap())
== Some("chunked")
&& request
.headers()
.get(CONTENT_ENCODING)
.map(|v| v.to_str().unwrap())
== Some(mime::APPLICATION_OCTET_STREAM.essence_str())
}
4 changes: 4 additions & 0 deletions bmcd/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::flash_service::FlashService;
use actix_files::Files;
use actix_web::{middleware, web::Data, App, HttpServer};
use log::LevelFilter;
use std::ops::Deref;
use tokio::sync::Mutex;
use tpi_rs::app::{bmc_application::BmcApplication, event_application::run_event_listener};
mod flash_service;
mod into_legacy_response;
mod legacy;

Expand All @@ -17,6 +20,7 @@ async fn main() -> anyhow::Result<()> {
App::new()
// Shared state: BmcApplication instance
.app_data(bmc.clone())
.app_data(Mutex::new(FlashService::new(bmc.deref().clone())))
// Legacy API
.configure(legacy::config)
// Enable logger
Expand Down

0 comments on commit 898aab2

Please sign in to comment.