Skip to content

Commit

Permalink
streaming_data_service:
Browse files Browse the repository at this point in the history
* Includes patch for tokio::io::Take which caused overflows when
  steaming files over 4GB
* Decreased lock contention of the streaming_data_service state by
  exposing the "Sender" of the data. (`take_sender()`)
* Upgraded from peer validation to handle validation. Only data that is
  send to the correct handle endpoint is allowed.
* various fixes and improvements
  • Loading branch information
svenrademakers committed Oct 18, 2023
1 parent 00ccc41 commit a28a981
Show file tree
Hide file tree
Showing 11 changed files with 549 additions and 477 deletions.
220 changes: 149 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ serde_with = "3.3.0"
thiserror = "1.0.49"
tokio-stream = "0.1.14"
humansize = "2.1.3"
actix-multipart = "0.6.1"
async-trait = "0.1.74"

[dev-dependencies]
tempdir = "0.3.7"
Expand All @@ -60,3 +62,8 @@ strip = true
[features]
vendored = ["openssl/vendored"]

[patch.crates-io]
# this patch needs to be removed as soon as the given rev lands in a release
tokio = { git = "https://github.com/tokio-rs/tokio", rev="654a3d5acf37841d74dca411ec7a7cc70495e1cd" }


86 changes: 43 additions & 43 deletions src/api/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Routes for legacy API present in versions <= 1.1.0 of the firmware.
use super::streaming_data_service::TransferType;
use crate::api::into_legacy_response::LegacyResponse;
use crate::api::into_legacy_response::{LegacyResult, Null};
use crate::api::streaming_data_service::StreamingDataService;
use crate::app::bmc_application::{BmcApplication, Encoding, UsbConfig};
use crate::app::firmware_runner::FirmwareRunner;
use crate::app::transfer_action::{TransferType, UpgradeAction, UpgradeType};
use crate::hal::{NodeId, UsbMode, UsbRoute};
use crate::utils::logging_sink;
use actix_multipart::Multipart;
use actix_web::guard::{fn_guard, GuardContext};
use actix_web::http::StatusCode;
use actix_web::web::Bytes;
use actix_web::{get, web, HttpRequest, Responder};
use actix_web::{get, post, web, Responder};
use anyhow::Context;
use serde_json::json;
use std::collections::HashMap;
use std::ops::Deref;
use std::str::FromStr;
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
type Query = web::Query<std::collections::HashMap<String, String>>;

/// version 1:
Expand All @@ -56,22 +56,26 @@ pub fn config(cfg: &mut web::ServiceConfig) {
.guard(fn_guard(flash_guard))
.to(handle_flash_request),
)
.route(web::post().guard(fn_guard(flash_guard)).to(handle_chunk))
.route(web::get().to(api_entry)),
);
)
.service(handle_file_upload);
}

pub fn info_config(cfg: &mut web::ServiceConfig) {
cfg.service(info_handler);
}

fn flash_status_guard(context: &GuardContext<'_>) -> bool {
let Some(query) = context.head().uri.query() else { return false; };
let Some(query) = context.head().uri.query() else {
return false;
};
query.contains("opt=get") && (query.contains("type=flash") || query.contains("type=firmware"))
}

fn flash_guard(context: &GuardContext<'_>) -> bool {
let Some(query) = context.head().uri.query() else { return false; };
let Some(query) = context.head().uri.query() else {
return false;
};
query.contains("opt=set") && (query.contains("type=flash") || query.contains("type=firmware"))
}

Expand Down Expand Up @@ -460,25 +464,24 @@ async fn handle_flash_status(flash: web::Data<StreamingDataService>) -> LegacyRe
async fn handle_flash_request(
ss: web::Data<StreamingDataService>,
bmc: web::Data<BmcApplication>,
request: HttpRequest,
mut query: Query,
) -> LegacyResult<Null> {
) -> LegacyResult<String> {
let file = query
.get("file")
.ok_or(LegacyResponse::bad_request(
"Invalid `file` query parameter",
))?
.to_string();

let peer: String = request
.connection_info()
.peer_addr()
.map(Into::into)
.context("peer_addr unknown")?;

let (firmware_request, process_name) = match query.get_mut("type").map(|c| c.as_str()) {
Some("firmware") => (true, "upgrade os task".to_string()),
Some("flash") => (false, "node flash service".to_string()),
let (process_name, upgrade_type) = match query.get_mut("type").map(|c| c.as_str()) {
Some("firmware") => ("os upgrade service".to_string(), UpgradeType::OsUpgrade),
Some("flash") => {
let node = get_node_param(&query)?;
(
format!("{node} upgrade service"),
UpgradeType::Module(node, bmc.clone().into_inner()),
)
}
_ => panic!("programming error: `type` should equal 'firmware' or 'flash'"),
};

Expand All @@ -492,34 +495,31 @@ async fn handle_flash_request(

let size = u64::from_str(size)
.map_err(|_| LegacyResponse::bad_request("`length` parameter is not a number"))?;
TransferType::Remote(peer, size)
TransferType::Remote(file, size)
};

let handle = ss.request_transfer(process_name, transfer_type).await?;
let context = FirmwareRunner::new(file.into(), handle);

if firmware_request {
ss.execute_worker(context.os_update()).await?;
} else {
let node = get_node_param(&query)?;
ss.execute_worker(context.flash_node(bmc.clone().into_inner(), node))
.await?;
}

Ok(Null)
let action = UpgradeAction::new(upgrade_type, transfer_type);
let handle = ss.request_transfer(process_name, action).await?;
let json = json!({"handle": handle});
Ok(json.to_string())
}

async fn handle_chunk(
flash: web::Data<StreamingDataService>,
request: HttpRequest,
chunk: Bytes,
) -> LegacyResult<Null> {
let peer: String = request
.connection_info()
.peer_addr()
.map(Into::into)
.context("peer_addr unknown")?;
#[post("/api/bmc/upload/{handle}")]
async fn handle_file_upload(
handle: web::Path<u32>,
ss: web::Data<StreamingDataService>,
mut payload: Multipart,
) -> impl Responder {
let sender = ss.take_sender(*handle).await?;
let Some(Ok(mut field)) = payload.next().await else {
return Err(LegacyResponse::bad_request("Multipart form invalid"));
};

while let Some(Ok(chunk)) = field.next().await {
if sender.send(chunk).await.is_err() {
return Err((StatusCode::GONE, "upload cancelled").into());
}
}

flash.put_chunk(peer, chunk).await?;
Ok(Null)
}
Loading

0 comments on commit a28a981

Please sign in to comment.