From 4f61ad851a8c4bf48c2031b3ae778bc087c64e9e Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Fri, 3 May 2024 09:49:30 +0200 Subject: [PATCH] refactor: use async_trait instead of Pin>> Signed-off-by: Mateo Fernandez --- src/agent/Cargo.toml | 5 +- src/agent/src/agents/debug.rs | 74 ++++++------- src/agent/src/agents/mod.rs | 14 +-- src/agent/src/agents/rust.rs | 172 +++++++++++++++---------------- src/agent/src/workload/runner.rs | 18 ++-- 5 files changed, 134 insertions(+), 149 deletions(-) diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 716f20c..3f531e1 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -8,7 +8,10 @@ name = "agent" path = "src/lib.rs" [dependencies] +async-trait = "0.1.80" clap = { version = "4.5.4", features = ["derive", "env"] } +nix = { version = "0.28.0", features = ["signal"] } +once_cell = "1.19.0" prost = "0.12.4" rand = "0.8.5" serde = { version = "1.0.197", features = ["derive"] } @@ -16,8 +19,6 @@ tokio = { version = "1.37.0", features = ["full"] } tokio-stream = "0.1.15" toml = "0.8.12" tonic = "0.11" -nix = { version = "0.28.0", features = ["signal"] } -once_cell = "1.19.0" [build-dependencies] tonic-build = "0.11" diff --git a/src/agent/src/agents/debug.rs b/src/agent/src/agents/debug.rs index 3161d1c..dbbf241 100644 --- a/src/agent/src/agents/debug.rs +++ b/src/agent/src/agents/debug.rs @@ -1,10 +1,9 @@ use super::AgentOutput; use crate::agents::Agent; use crate::{workload, AgentResult}; +use async_trait::async_trait; use std::collections::HashSet; use std::fs::create_dir_all; -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use std::time::SystemTime; use tokio::sync::Mutex; @@ -19,53 +18,44 @@ impl From for DebugAgent { } } +#[async_trait] impl Agent for DebugAgent { - fn prepare( - &self, - _: &Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let dir = format!("/tmp/{}", self.workload_config.workload_name); - - println!("Function directory: {}", dir); - - create_dir_all(&dir).expect("Unable to create directory"); - - std::fs::write( - format!("{}/debug.txt", &dir), - format!( - "Debug agent for {} - written at {:?}", - self.workload_config.workload_name, - SystemTime::now(), - ), - ) - .expect("Unable to write debug.txt file"); - - Ok(AgentOutput { - exit_code: 0, - stdout: "Build successfully!".into(), - stderr: String::default(), - }) + async fn prepare(&self, _: Arc>>) -> AgentResult { + let dir = format!("/tmp/{}", self.workload_config.workload_name); + + println!("Function directory: {}", dir); + + create_dir_all(&dir).expect("Unable to create directory"); + + std::fs::write( + format!("{}/debug.txt", &dir), + format!( + "Debug agent for {} - written at {:?}", + self.workload_config.workload_name, + SystemTime::now(), + ), + ) + .expect("Unable to write debug.txt file"); + + Ok(AgentOutput { + exit_code: 0, + stdout: "Build successfully!".into(), + stderr: String::default(), }) } - fn run( - &self, - _: &Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let dir = format!("/tmp/{}", self.workload_config.workload_name); + async fn run(&self, _: Arc>>) -> AgentResult { + let dir = format!("/tmp/{}", self.workload_config.workload_name); - let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) - .expect("Unable to read debug.txt file"); + let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) + .expect("Unable to read debug.txt file"); - std::fs::remove_dir_all(dir).expect("Unable to remove directory"); + std::fs::remove_dir_all(dir).expect("Unable to remove directory"); - Ok(AgentOutput { - exit_code: 0, - stdout: content, - stderr: String::default(), - }) + Ok(AgentOutput { + exit_code: 0, + stdout: content, + stderr: String::default(), }) } } diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index eaf789d..9d4032c 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,8 +1,7 @@ use crate::{AgentError, AgentResult}; +use async_trait::async_trait; use serde::Deserialize; use std::collections::HashSet; -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use tokio::sync::Mutex; @@ -17,15 +16,10 @@ pub struct AgentOutput { pub stderr: String, } +#[async_trait] pub trait Agent { - fn prepare<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>>; - fn run<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>>; + async fn prepare(&self, child_processes: Arc>>) -> AgentResult; + async fn run(&self, child_processes: Arc>>) -> AgentResult; } #[derive(Debug, Clone, Deserialize)] diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 6ec41d0..207276e 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -1,11 +1,10 @@ use super::{Agent, AgentOutput}; use crate::{workload, AgentError, AgentResult}; +use async_trait::async_trait; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use std::collections::HashSet; use std::fs::create_dir_all; -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use tokio::process::Command; use tokio::sync::Mutex; @@ -29,8 +28,8 @@ pub struct RustAgent { impl RustAgent { async fn build( &self, - function_dir: &String, - child_processes: &Arc>>, + function_dir: &str, + child_processes: Arc>>, ) -> AgentResult { if self.rust_config.build.release { let child = Command::new("cargo") @@ -40,7 +39,9 @@ impl RustAgent { .spawn() .expect("Failed to build function"); - child_processes.lock().await.insert(child.id().unwrap()); + { + child_processes.lock().await.insert(child.id().unwrap()); + } let output = child .wait_with_output() @@ -85,107 +86,100 @@ impl From for RustAgent { } } +#[async_trait] impl Agent for RustAgent { - fn prepare<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let function_dir = format!( - "/tmp/{}", - Alphanumeric.sample_string(&mut rand::thread_rng(), 16) - ); - - println!("Function directory: {}", function_dir); - - create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); - - std::fs::write( - format!("{}/src/main.rs", &function_dir), - &self.workload_config.code, - ) - .expect("Unable to write main.rs file"); - - let cargo_toml = format!( - r#" + async fn prepare(&self, child_processes: Arc>>) -> AgentResult { + let function_dir = format!( + "/tmp/{}", + Alphanumeric.sample_string(&mut rand::thread_rng(), 16) + ); + + println!("Function directory: {}", function_dir); + + create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); + + std::fs::write( + format!("{}/src/main.rs", &function_dir), + &self.workload_config.code, + ) + .expect("Unable to write main.rs file"); + + let cargo_toml = format!( + r#" [package] name = "{}" version = "0.1.0" - edition = "2018" + edition = "2021" "#, - self.workload_config.workload_name - ); + self.workload_config.workload_name + ); - std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) - .expect("Unable to write Cargo.toml file"); + std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) + .expect("Unable to write Cargo.toml file"); - let result = self.build(&function_dir, child_processes).await?; + let result = self.build(&function_dir, child_processes).await?; - if result.exit_code != 0 { - println!("Build failed: {:?}", result); - return Err(AgentError::BuildFailed(AgentOutput { - exit_code: result.exit_code, - stdout: result.stdout, - stderr: result.stderr, - })); - } - - // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match self.rust_config.build.release { - true => format!( - "{}/target/release/{}", - &function_dir, self.workload_config.workload_name - ), - false => format!( - "{}/target/debug/{}", - &function_dir, self.workload_config.workload_name - ), - }; - - std::fs::copy( - binary_path, - format!("/tmp/{}", self.workload_config.workload_name), - ) - .expect("Unable to copy binary"); - - std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - - Ok(AgentOutput { + if result.exit_code != 0 { + println!("Build failed: {:?}", result); + return Err(AgentError::BuildFailed(AgentOutput { exit_code: result.exit_code, - stdout: "Build successful".to_string(), - stderr: "".to_string(), - }) + stdout: result.stdout, + stderr: result.stderr, + })); + } + + // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match self.rust_config.build.release { + true => format!( + "{}/target/release/{}", + &function_dir, self.workload_config.workload_name + ), + false => format!( + "{}/target/debug/{}", + &function_dir, self.workload_config.workload_name + ), + }; + + std::fs::copy( + binary_path, + format!("/tmp/{}", self.workload_config.workload_name), + ) + .expect("Unable to copy binary"); + + std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); + + Ok(AgentOutput { + exit_code: result.exit_code, + stdout: "Build successful".to_string(), + stderr: "".to_string(), }) } - fn run<'a>( - &'a self, - child_processes: &'a Arc>>, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) - .spawn() - .expect("Failed to run function"); + async fn run(&self, child_processes: Arc>>) -> AgentResult { + let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + .spawn() + .expect("Failed to run function"); + { child_processes.lock().await.insert(child.id().unwrap()); + } - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); - let agent_output = AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }; + let agent_output = AgentOutput { + exit_code: output.status.code().unwrap(), + stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), + stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), + }; - if !output.status.success() { - println!("Run failed: {:?}", agent_output); - return Err(AgentError::BuildFailed(agent_output)); - } + if !output.status.success() { + println!("Run failed: {:?}", agent_output); + return Err(AgentError::BuildFailed(agent_output)); + } - Ok(agent_output) - }) + Ok(agent_output) } } diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index 7e6ce94..1cbeb61 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -1,3 +1,4 @@ +use super::config::Config; use crate::{ agent::ExecuteRequest, agents::{rust, Agent, AgentOutput, Language}, @@ -11,8 +12,6 @@ use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] use crate::agents::debug; -use super::config::Config; - /// Runner for a workload. /// Will execute the workload based on the inner agent (language). pub struct Runner { @@ -46,12 +45,19 @@ impl Runner { pub async fn run(&self) -> AgentResult { let result = match self.config.action { - Action::Prepare => self.agent.prepare(&self.child_processes).await?, - Action::Run => self.agent.run(&self.child_processes).await?, + Action::Prepare => { + self.agent + .prepare(Arc::clone(&self.child_processes)) + .await? + } + Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?, Action::PrepareAndRun => { - let res = self.agent.prepare(&self.child_processes).await?; + let res = self + .agent + .prepare(Arc::clone(&self.child_processes)) + .await?; println!("Prepare result {:?}", res); - self.agent.run(&self.child_processes).await? + self.agent.run(Arc::clone(&self.child_processes)).await? } };