From 602593332495c3a0405e41756ce691b220c3b798 Mon Sep 17 00:00:00 2001 From: ESPIE Date: Wed, 1 May 2024 12:59:48 +0200 Subject: [PATCH 1/5] feat: child processes are now gracefully sigterm-ed on kill signal reception Signed-off-by: ESPIE --- src/agent/Cargo.toml | 2 + src/agent/src/agents/debug.rs | 75 ++++++++++++++++++------------- src/agent/src/agents/mod.rs | 15 ++++++- src/agent/src/agents/rust.rs | 53 +++++++++++++++------- src/agent/src/main.rs | 9 +++- src/agent/src/workload/runner.rs | 22 ++++++--- src/agent/src/workload/service.rs | 15 ++++++- 7 files changed, 135 insertions(+), 56 deletions(-) diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index daa40d6..716f20c 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -16,6 +16,8 @@ tokio = { version = "1.37.0", features = ["full"] } tokio-stream = "0.1.15" toml = "0.8.12" tonic = "0.11" +nix = { version = "0.28.0", features = ["signal"] } +once_cell = "1.19.0" [build-dependencies] tonic-build = "0.11" diff --git a/src/agent/src/agents/debug.rs b/src/agent/src/agents/debug.rs index ceae270..3161d1c 100644 --- a/src/agent/src/agents/debug.rs +++ b/src/agent/src/agents/debug.rs @@ -1,8 +1,13 @@ use super::AgentOutput; use crate::agents::Agent; use crate::{workload, AgentResult}; +use std::collections::HashSet; use std::fs::create_dir_all; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; use std::time::SystemTime; +use tokio::sync::Mutex; pub struct DebugAgent { workload_config: workload::config::Config, @@ -15,42 +20,52 @@ impl From for DebugAgent { } impl Agent for DebugAgent { - fn prepare(&self) -> AgentResult { - let dir = format!("/tmp/{}", self.workload_config.workload_name); - - println!("Function directory: {}", dir); - - create_dir_all(&dir).expect("Unable to create directory"); - - std::fs::write( - format!("{}/debug.txt", &dir), - format!( - "Debug agent for {} - written at {:?}", - self.workload_config.workload_name, - SystemTime::now(), - ), - ) - .expect("Unable to write debug.txt file"); - - Ok(AgentOutput { - exit_code: 0, - stdout: "Build successfully!".into(), - stderr: String::default(), + fn prepare( + &self, + _: &Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let dir = format!("/tmp/{}", self.workload_config.workload_name); + + println!("Function directory: {}", dir); + + create_dir_all(&dir).expect("Unable to create directory"); + + std::fs::write( + format!("{}/debug.txt", &dir), + format!( + "Debug agent for {} - written at {:?}", + self.workload_config.workload_name, + SystemTime::now(), + ), + ) + .expect("Unable to write debug.txt file"); + + Ok(AgentOutput { + exit_code: 0, + stdout: "Build successfully!".into(), + stderr: String::default(), + }) }) } - fn run(&self) -> AgentResult { - let dir = format!("/tmp/{}", self.workload_config.workload_name); + fn run( + &self, + _: &Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let dir = format!("/tmp/{}", self.workload_config.workload_name); - let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) - .expect("Unable to read debug.txt file"); + let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) + .expect("Unable to read debug.txt file"); - std::fs::remove_dir_all(dir).expect("Unable to remove directory"); + std::fs::remove_dir_all(dir).expect("Unable to remove directory"); - Ok(AgentOutput { - exit_code: 0, - stdout: content, - stderr: String::default(), + Ok(AgentOutput { + exit_code: 0, + stdout: content, + stderr: String::default(), + }) }) } } diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index f70d43b..eaf789d 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,5 +1,10 @@ use crate::{AgentError, AgentResult}; use serde::Deserialize; +use std::collections::HashSet; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] pub mod debug; @@ -13,8 +18,14 @@ pub struct AgentOutput { } pub trait Agent { - fn prepare(&self) -> AgentResult; - fn run(&self) -> AgentResult; + fn prepare<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>>; + fn run<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>>; } #[derive(Debug, Clone, Deserialize)] diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index c34f3b4..180188a 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -2,7 +2,12 @@ use super::{Agent, AgentOutput}; use crate::{workload, AgentError, AgentResult}; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; +use std::collections::HashSet; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; use std::{fs::create_dir_all, process::Command}; +use tokio::sync::Mutex; #[derive(Deserialize)] #[serde(rename_all = "kebab-case")] @@ -21,15 +26,23 @@ pub struct RustAgent { } impl RustAgent { - fn build(&self, function_dir: &String) -> AgentResult { + async fn build( + &self, + function_dir: &String, + child_processes: &Arc>>, + ) -> AgentResult { if self.rust_config.build.release { - let output = Command::new("cargo") + let child = Command::new("cargo") .arg("build") .arg("--release") .current_dir(function_dir) - .output() + .spawn() .expect("Failed to build function"); + child_processes.lock().await.insert(child.id()); + + let output = child.wait_with_output().expect("Failed to wait on child"); + Ok(AgentOutput { exit_code: output.status.code().unwrap(), stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), @@ -64,15 +77,21 @@ impl From for RustAgent { } impl Agent for RustAgent { - fn prepare(&self) -> AgentResult { - let function_dir = format!( - "/tmp/{}", - Alphanumeric.sample_string(&mut rand::thread_rng(), 16) - ); + fn prepare<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap(); + + let function_dir = format!( + "/tmp/{}", + Alphanumeric.sample_string(&mut rand::thread_rng(), 16) + ); - println!("Function directory: {}", function_dir); + println!("Function directory: {}", function_dir); - create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); + create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); std::fs::write( format!("{}/src/main.rs", &function_dir), @@ -93,7 +112,7 @@ impl Agent for RustAgent { std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) .expect("Unable to write Cargo.toml file"); - let result = self.build(&function_dir)?; + let result = self.build(&function_dir, child_processes).await?; if result.exit_code != 0 { println!("Build failed: {:?}", result); @@ -131,10 +150,14 @@ impl Agent for RustAgent { }) } - fn run(&self) -> AgentResult { - let output = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) - .output() - .expect("Failed to run function"); + fn run<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + .spawn() + .expect("Failed to run function"); let agent_output = AgentOutput { exit_code: output.status.code().unwrap(), diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index 93437f2..780c36a 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -2,9 +2,16 @@ use agent::{ agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService, }; use clap::Parser; -use std::net::ToSocketAddrs; +use once_cell::sync::Lazy; +use std::collections::HashSet; +use std::sync::Arc; +use std::{net::ToSocketAddrs, path::PathBuf}; +use tokio::sync::Mutex; use tonic::transport::Server; +static CHILD_PROCESSES: Lazy>>> = + Lazy::new(|| Arc::new(Mutex::new(HashSet::new()))); + #[derive(Debug, Parser)] struct Args { #[clap(long, env, default_value = "0.0.0.0")] diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index ae4e175..b6f461c 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -4,6 +4,9 @@ use crate::{ workload::config::Action, AgentError, AgentResult, }; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] use crate::agents::debug; @@ -15,17 +18,22 @@ use super::config::Config; pub struct Runner { config: Config, agent: Box, + child_processes: Arc>>, } impl Runner { - pub fn new(config: Config) -> Self { + pub fn new(config: Config, child_processes: Arc>>) -> Self { let agent: Box = match config.language { Language::Rust => Box::new(rust::RustAgent::from(config.clone())), #[cfg(feature = "debug-agent")] Language::Debug => Box::new(debug::DebugAgent::from(config.clone())), }; - Runner { config, agent } + Runner { + config, + agent, + child_processes, + } } pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result { @@ -33,14 +41,14 @@ impl Runner { Ok(Self::new(config)) } - pub fn run(&self) -> AgentResult { + pub async fn run(&self) -> AgentResult { let result = match self.config.action { - Action::Prepare => self.agent.prepare()?, - Action::Run => self.agent.run()?, + Action::Prepare => self.agent.prepare(&self.child_processes).await?, + Action::Run => self.agent.run(&self.child_processes).await?, Action::PrepareAndRun => { - let res = self.agent.prepare()?; + let res = self.agent.prepare(&self.child_processes).await?; println!("Prepare result {:?}", res); - self.agent.run()? + self.agent.run(&self.child_processes).await? } }; diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 3b05386..38e6e66 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,7 +1,9 @@ use super::runner::Runner; use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; -use std::process; +use std::collections::HashSet; +use std::{process, sync::Arc}; +use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response}; @@ -23,6 +25,7 @@ impl WorkloadRunner for WorkloadRunnerService { let res = runner .run() + .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let _ = tx @@ -42,6 +45,16 @@ impl WorkloadRunner for WorkloadRunnerService { } async fn signal(&self, _: Request) -> Result<()> { + let child_processes = self.child_processes.lock().await; + + for &child_id in child_processes.iter() { + nix::sys::signal::kill( + nix::unistd::Pid::from_raw(child_id as i32), + nix::sys::signal::Signal::SIGTERM, + ) + .unwrap(); + } + process::exit(0); } } From a6a4b4fce7a6b901b4b1af0e5c6b1dc5badab5b0 Mon Sep 17 00:00:00 2001 From: ESPIE Date: Thu, 2 May 2024 12:00:19 +0200 Subject: [PATCH 2/5] chore: merge from main Signed-off-by: ESPIE --- src/agent/src/agents/rust.rs | 114 ++++++++++++++++-------------- src/agent/src/main.rs | 9 +-- src/agent/src/workload/runner.rs | 7 +- src/agent/src/workload/service.rs | 8 ++- 4 files changed, 71 insertions(+), 67 deletions(-) diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 180188a..b8c70ec 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -82,8 +82,6 @@ impl Agent for RustAgent { child_processes: &'a Arc>>, ) -> Pin> + Send + '_>> { Box::pin(async { - let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap(); - let function_dir = format!( "/tmp/{}", Alphanumeric.sample_string(&mut rand::thread_rng(), 16) @@ -93,60 +91,61 @@ impl Agent for RustAgent { create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); - std::fs::write( - format!("{}/src/main.rs", &function_dir), - &self.workload_config.code, - ) - .expect("Unable to write main.rs file"); + std::fs::write( + format!("{}/src/main.rs", &function_dir), + &self.workload_config.code, + ) + .expect("Unable to write main.rs file"); - let cargo_toml = format!( - r#" + let cargo_toml = format!( + r#" [package] name = "{}" version = "0.1.0" edition = "2018" "#, - self.workload_config.workload_name - ); + self.workload_config.workload_name + ); - std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) - .expect("Unable to write Cargo.toml file"); + std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) + .expect("Unable to write Cargo.toml file"); let result = self.build(&function_dir, child_processes).await?; - if result.exit_code != 0 { - println!("Build failed: {:?}", result); - return Err(AgentError::BuildFailed(AgentOutput { - exit_code: result.exit_code, - stdout: result.stdout, - stderr: result.stderr, - })); - } + if result.exit_code != 0 { + println!("Build failed: {:?}", result); + return Err(AgentError::BuildFailed(AgentOutput { + exit_code: result.exit_code, + stdout: result.stdout, + stderr: result.stderr, + })); + } + + // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match self.rust_config.build.release { + true => format!( + "{}/target/release/{}", + &function_dir, self.workload_config.workload_name + ), + false => format!( + "{}/target/debug/{}", + &function_dir, self.workload_config.workload_name + ), + }; + + std::fs::copy( + binary_path, + format!("/tmp/{}", self.workload_config.workload_name), + ) + .expect("Unable to copy binary"); + + std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match self.rust_config.build.release { - true => format!( - "{}/target/release/{}", - &function_dir, self.workload_config.workload_name - ), - false => format!( - "{}/target/debug/{}", - &function_dir, self.workload_config.workload_name - ), - }; - - std::fs::copy( - binary_path, - format!("/tmp/{}", self.workload_config.workload_name), - ) - .expect("Unable to copy binary"); - - std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - - Ok(AgentOutput { - exit_code: result.exit_code, - stdout: "Build successful".to_string(), - stderr: "".to_string(), + Ok(AgentOutput { + exit_code: result.exit_code, + stdout: "Build successful".to_string(), + stderr: "".to_string(), + }) }) } @@ -159,17 +158,22 @@ impl Agent for RustAgent { .spawn() .expect("Failed to run function"); - let agent_output = AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }; + child_processes.lock().await.insert(child.id()); + + let output = child.wait_with_output().expect("Failed to wait on child"); - if !output.status.success() { - println!("Run failed: {:?}", agent_output); - return Err(AgentError::BuildFailed(agent_output)); - } + let agent_output = AgentOutput { + exit_code: output.status.code().unwrap(), + stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), + stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), + }; - Ok(agent_output) + if !output.status.success() { + println!("Run failed: {:?}", agent_output); + return Err(AgentError::BuildFailed(agent_output)); + } + + Ok(agent_output) + }) } } diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index 780c36a..93437f2 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -2,16 +2,9 @@ use agent::{ agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService, }; use clap::Parser; -use once_cell::sync::Lazy; -use std::collections::HashSet; -use std::sync::Arc; -use std::{net::ToSocketAddrs, path::PathBuf}; -use tokio::sync::Mutex; +use std::net::ToSocketAddrs; use tonic::transport::Server; -static CHILD_PROCESSES: Lazy>>> = - Lazy::new(|| Arc::new(Mutex::new(HashSet::new()))); - #[derive(Debug, Parser)] struct Args { #[clap(long, env, default_value = "0.0.0.0")] diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index b6f461c..7e6ce94 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -36,9 +36,12 @@ impl Runner { } } - pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result { + pub fn new_from_execute_request( + execute_request: ExecuteRequest, + child_processes: Arc>>, + ) -> Result { let config = Config::new_from_execute_request(execute_request)?; - Ok(Self::new(config)) + Ok(Self::new(config, child_processes)) } pub async fn run(&self) -> AgentResult { diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 38e6e66..d28d991 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,6 +1,7 @@ use super::runner::Runner; use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; +use once_cell::sync::Lazy; use std::collections::HashSet; use std::{process, sync::Arc}; use tokio::sync::Mutex; @@ -9,6 +10,9 @@ use tonic::{Request, Response}; type Result = std::result::Result, tonic::Status>; +static CHILD_PROCESSES: Lazy>>> = + Lazy::new(|| Arc::new(Mutex::new(HashSet::new()))); + pub struct WorkloadRunnerService; #[tonic::async_trait] @@ -20,7 +24,7 @@ impl WorkloadRunner for WorkloadRunnerService { let execute_request = req.into_inner(); - let runner = Runner::new_from_execute_request(execute_request) + let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone()) .map_err(|e| tonic::Status::internal(e.to_string()))?; let res = runner @@ -45,7 +49,7 @@ impl WorkloadRunner for WorkloadRunnerService { } async fn signal(&self, _: Request) -> Result<()> { - let child_processes = self.child_processes.lock().await; + let child_processes = CHILD_PROCESSES.lock().await; for &child_id in child_processes.iter() { nix::sys::signal::kill( From cd97bd14d5d34a47ec493a911fa551f5ce8371f2 Mon Sep 17 00:00:00 2001 From: ESPIE Date: Thu, 2 May 2024 18:12:52 +0200 Subject: [PATCH 3/5] fix: changed `std::process::Command` to `tokio::process::Command` to be non blocking Signed-off-by: ESPIE --- src/agent/src/agents/rust.rs | 17 ++++++++++------- src/agent/src/workload/service.rs | 8 +++++--- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index b8c70ec..1117cc5 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -6,7 +6,8 @@ use std::collections::HashSet; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::{fs::create_dir_all, process::Command}; +use std::{fs::create_dir_all}; +use tokio::process::Command; use tokio::sync::Mutex; #[derive(Deserialize)] @@ -39,9 +40,9 @@ impl RustAgent { .spawn() .expect("Failed to build function"); - child_processes.lock().await.insert(child.id()); + child_processes.lock().await.insert(child.id().unwrap()); - let output = child.wait_with_output().expect("Failed to wait on child"); + let output = child.wait_with_output().await.expect("Failed to wait on child"); Ok(AgentOutput { exit_code: output.status.code().unwrap(), @@ -49,11 +50,13 @@ impl RustAgent { stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), }) } else { - let output = Command::new("cargo") + let child = Command::new("cargo") .arg("build") .current_dir(function_dir) - .output() + .spawn() .expect("Failed to build function"); + + let output = child.wait_with_output().await.expect("Failed to wait on child"); Ok(AgentOutput { exit_code: output.status.code().unwrap(), @@ -158,9 +161,9 @@ impl Agent for RustAgent { .spawn() .expect("Failed to run function"); - child_processes.lock().await.insert(child.id()); + child_processes.lock().await.insert(child.id().unwrap()); - let output = child.wait_with_output().expect("Failed to wait on child"); + let output = child.wait_with_output().await.expect("Failed to wait on child"); let agent_output = AgentOutput { exit_code: output.status.code().unwrap(), diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index d28d991..3c25477 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -52,11 +52,13 @@ impl WorkloadRunner for WorkloadRunnerService { let child_processes = CHILD_PROCESSES.lock().await; for &child_id in child_processes.iter() { - nix::sys::signal::kill( + match nix::sys::signal::kill( nix::unistd::Pid::from_raw(child_id as i32), nix::sys::signal::Signal::SIGTERM, - ) - .unwrap(); + ) { + Ok(_) => println!("Sent SIGTERM to child process {}", child_id), + Err(e) => println!("Failed to send SIGTERM to child process {}: {}", child_id, e), + } } process::exit(0); From c6c5dc0af40b5add06b54a0a9804d9b374efb607 Mon Sep 17 00:00:00 2001 From: ESPIE Date: Thu, 2 May 2024 18:29:14 +0200 Subject: [PATCH 4/5] chore: cargo fmt Signed-off-by: ESPIE --- src/agent/src/agents/rust.rs | 19 ++++++++++++++----- src/agent/src/workload/service.rs | 5 ++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 1117cc5..6ec41d0 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -3,10 +3,10 @@ use crate::{workload, AgentError, AgentResult}; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use std::collections::HashSet; +use std::fs::create_dir_all; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::{fs::create_dir_all}; use tokio::process::Command; use tokio::sync::Mutex; @@ -42,7 +42,10 @@ impl RustAgent { child_processes.lock().await.insert(child.id().unwrap()); - let output = child.wait_with_output().await.expect("Failed to wait on child"); + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); Ok(AgentOutput { exit_code: output.status.code().unwrap(), @@ -55,8 +58,11 @@ impl RustAgent { .current_dir(function_dir) .spawn() .expect("Failed to build function"); - - let output = child.wait_with_output().await.expect("Failed to wait on child"); + + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); Ok(AgentOutput { exit_code: output.status.code().unwrap(), @@ -163,7 +169,10 @@ impl Agent for RustAgent { child_processes.lock().await.insert(child.id().unwrap()); - let output = child.wait_with_output().await.expect("Failed to wait on child"); + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); let agent_output = AgentOutput { exit_code: output.status.code().unwrap(), diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 3c25477..996d256 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -57,7 +57,10 @@ impl WorkloadRunner for WorkloadRunnerService { nix::sys::signal::Signal::SIGTERM, ) { Ok(_) => println!("Sent SIGTERM to child process {}", child_id), - Err(e) => println!("Failed to send SIGTERM to child process {}: {}", child_id, e), + Err(e) => println!( + "Failed to send SIGTERM to child process {}: {}", + child_id, e + ), } } From 4f61ad851a8c4bf48c2031b3ae778bc087c64e9e Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Fri, 3 May 2024 09:49:30 +0200 Subject: [PATCH 5/5] refactor: use async_trait instead of Pin>> Signed-off-by: Mateo Fernandez --- src/agent/Cargo.toml | 5 +- src/agent/src/agents/debug.rs | 74 ++++++------- src/agent/src/agents/mod.rs | 14 +-- src/agent/src/agents/rust.rs | 172 +++++++++++++++---------------- src/agent/src/workload/runner.rs | 18 ++-- 5 files changed, 134 insertions(+), 149 deletions(-) diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 716f20c..3f531e1 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -8,7 +8,10 @@ name = "agent" path = "src/lib.rs" [dependencies] +async-trait = "0.1.80" clap = { version = "4.5.4", features = ["derive", "env"] } +nix = { version = "0.28.0", features = ["signal"] } +once_cell = "1.19.0" prost = "0.12.4" rand = "0.8.5" serde = { version = "1.0.197", features = ["derive"] } @@ -16,8 +19,6 @@ tokio = { version = "1.37.0", features = ["full"] } tokio-stream = "0.1.15" toml = "0.8.12" tonic = "0.11" -nix = { version = "0.28.0", features = ["signal"] } -once_cell = "1.19.0" [build-dependencies] tonic-build = "0.11" diff --git a/src/agent/src/agents/debug.rs b/src/agent/src/agents/debug.rs index 3161d1c..dbbf241 100644 --- a/src/agent/src/agents/debug.rs +++ b/src/agent/src/agents/debug.rs @@ -1,10 +1,9 @@ use super::AgentOutput; use crate::agents::Agent; use crate::{workload, AgentResult}; +use async_trait::async_trait; use std::collections::HashSet; use std::fs::create_dir_all; -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use std::time::SystemTime; use tokio::sync::Mutex; @@ -19,53 +18,44 @@ impl From for DebugAgent { } } +#[async_trait] impl Agent for DebugAgent { - fn prepare( - &self, - _: &Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let dir = format!("/tmp/{}", self.workload_config.workload_name); - - println!("Function directory: {}", dir); - - create_dir_all(&dir).expect("Unable to create directory"); - - std::fs::write( - format!("{}/debug.txt", &dir), - format!( - "Debug agent for {} - written at {:?}", - self.workload_config.workload_name, - SystemTime::now(), - ), - ) - .expect("Unable to write debug.txt file"); - - Ok(AgentOutput { - exit_code: 0, - stdout: "Build successfully!".into(), - stderr: String::default(), - }) + async fn prepare(&self, _: Arc>>) -> AgentResult { + let dir = format!("/tmp/{}", self.workload_config.workload_name); + + println!("Function directory: {}", dir); + + create_dir_all(&dir).expect("Unable to create directory"); + + std::fs::write( + format!("{}/debug.txt", &dir), + format!( + "Debug agent for {} - written at {:?}", + self.workload_config.workload_name, + SystemTime::now(), + ), + ) + .expect("Unable to write debug.txt file"); + + Ok(AgentOutput { + exit_code: 0, + stdout: "Build successfully!".into(), + stderr: String::default(), }) } - fn run( - &self, - _: &Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let dir = format!("/tmp/{}", self.workload_config.workload_name); + async fn run(&self, _: Arc>>) -> AgentResult { + let dir = format!("/tmp/{}", self.workload_config.workload_name); - let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) - .expect("Unable to read debug.txt file"); + let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) + .expect("Unable to read debug.txt file"); - std::fs::remove_dir_all(dir).expect("Unable to remove directory"); + std::fs::remove_dir_all(dir).expect("Unable to remove directory"); - Ok(AgentOutput { - exit_code: 0, - stdout: content, - stderr: String::default(), - }) + Ok(AgentOutput { + exit_code: 0, + stdout: content, + stderr: String::default(), }) } } diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index eaf789d..9d4032c 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,8 +1,7 @@ use crate::{AgentError, AgentResult}; +use async_trait::async_trait; use serde::Deserialize; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use tokio::sync::Mutex; @@ -17,15 +16,10 @@ pub struct AgentOutput { pub stderr: String, } +#[async_trait] pub trait Agent { - fn prepare<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>>; - fn run<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>>; + async fn prepare(&self, child_processes: Arc>>) -> AgentResult; + async fn run(&self, child_processes: Arc>>) -> AgentResult; } #[derive(Debug, Clone, Deserialize)] diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 6ec41d0..207276e 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -1,11 +1,10 @@ use super::{Agent, AgentOutput}; use crate::{workload, AgentError, AgentResult}; +use async_trait::async_trait; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use std::collections::HashSet; use std::fs::create_dir_all; -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use tokio::process::Command; use tokio::sync::Mutex; @@ -29,8 +28,8 @@ pub struct RustAgent { impl RustAgent { async fn build( &self, - function_dir: &String, - child_processes: &Arc>>, + function_dir: &str, + child_processes: Arc>>, ) -> AgentResult { if self.rust_config.build.release { let child = Command::new("cargo") @@ -40,7 +39,9 @@ impl RustAgent { .spawn() .expect("Failed to build function"); - child_processes.lock().await.insert(child.id().unwrap()); + { + child_processes.lock().await.insert(child.id().unwrap()); + } let output = child .wait_with_output() @@ -85,107 +86,100 @@ impl From for RustAgent { } } +#[async_trait] impl Agent for RustAgent { - fn prepare<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let function_dir = format!( - "/tmp/{}", - Alphanumeric.sample_string(&mut rand::thread_rng(), 16) - ); - - println!("Function directory: {}", function_dir); - - create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); - - std::fs::write( - format!("{}/src/main.rs", &function_dir), - &self.workload_config.code, - ) - .expect("Unable to write main.rs file"); - - let cargo_toml = format!( - r#" + async fn prepare(&self, child_processes: Arc>>) -> AgentResult { + let function_dir = format!( + "/tmp/{}", + Alphanumeric.sample_string(&mut rand::thread_rng(), 16) + ); + + println!("Function directory: {}", function_dir); + + create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); + + std::fs::write( + format!("{}/src/main.rs", &function_dir), + &self.workload_config.code, + ) + .expect("Unable to write main.rs file"); + + let cargo_toml = format!( + r#" [package] name = "{}" version = "0.1.0" - edition = "2018" + edition = "2021" "#, - self.workload_config.workload_name - ); + self.workload_config.workload_name + ); - std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) - .expect("Unable to write Cargo.toml file"); + std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) + .expect("Unable to write Cargo.toml file"); - let result = self.build(&function_dir, child_processes).await?; + let result = self.build(&function_dir, child_processes).await?; - if result.exit_code != 0 { - println!("Build failed: {:?}", result); - return Err(AgentError::BuildFailed(AgentOutput { - exit_code: result.exit_code, - stdout: result.stdout, - stderr: result.stderr, - })); - } - - // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match self.rust_config.build.release { - true => format!( - "{}/target/release/{}", - &function_dir, self.workload_config.workload_name - ), - false => format!( - "{}/target/debug/{}", - &function_dir, self.workload_config.workload_name - ), - }; - - std::fs::copy( - binary_path, - format!("/tmp/{}", self.workload_config.workload_name), - ) - .expect("Unable to copy binary"); - - std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - - Ok(AgentOutput { + if result.exit_code != 0 { + println!("Build failed: {:?}", result); + return Err(AgentError::BuildFailed(AgentOutput { exit_code: result.exit_code, - stdout: "Build successful".to_string(), - stderr: "".to_string(), - }) + stdout: result.stdout, + stderr: result.stderr, + })); + } + + // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match self.rust_config.build.release { + true => format!( + "{}/target/release/{}", + &function_dir, self.workload_config.workload_name + ), + false => format!( + "{}/target/debug/{}", + &function_dir, self.workload_config.workload_name + ), + }; + + std::fs::copy( + binary_path, + format!("/tmp/{}", self.workload_config.workload_name), + ) + .expect("Unable to copy binary"); + + std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); + + Ok(AgentOutput { + exit_code: result.exit_code, + stdout: "Build successful".to_string(), + stderr: "".to_string(), }) } - fn run<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) - .spawn() - .expect("Failed to run function"); + async fn run(&self, child_processes: Arc>>) -> AgentResult { + let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + .spawn() + .expect("Failed to run function"); + { child_processes.lock().await.insert(child.id().unwrap()); + } - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); - let agent_output = AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }; + let agent_output = AgentOutput { + exit_code: output.status.code().unwrap(), + stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), + stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), + }; - if !output.status.success() { - println!("Run failed: {:?}", agent_output); - return Err(AgentError::BuildFailed(agent_output)); - } + if !output.status.success() { + println!("Run failed: {:?}", agent_output); + return Err(AgentError::BuildFailed(agent_output)); + } - Ok(agent_output) - }) + Ok(agent_output) } } diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index 7e6ce94..1cbeb61 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -1,3 +1,4 @@ +use super::config::Config; use crate::{ agent::ExecuteRequest, agents::{rust, Agent, AgentOutput, Language}, @@ -11,8 +12,6 @@ use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] use crate::agents::debug; -use super::config::Config; - /// Runner for a workload. /// Will execute the workload based on the inner agent (language). pub struct Runner { @@ -46,12 +45,19 @@ impl Runner { pub async fn run(&self) -> AgentResult { let result = match self.config.action { - Action::Prepare => self.agent.prepare(&self.child_processes).await?, - Action::Run => self.agent.run(&self.child_processes).await?, + Action::Prepare => { + self.agent + .prepare(Arc::clone(&self.child_processes)) + .await? + } + Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?, Action::PrepareAndRun => { - let res = self.agent.prepare(&self.child_processes).await?; + let res = self + .agent + .prepare(Arc::clone(&self.child_processes)) + .await?; println!("Prepare result {:?}", res); - self.agent.run(&self.child_processes).await? + self.agent.run(Arc::clone(&self.child_processes)).await? } };