Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent: sigterm children #36

Merged
merged 5 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ name = "agent"
path = "src/lib.rs"

[dependencies]
async-trait = "0.1.80"
clap = { version = "4.5.4", features = ["derive", "env"] }
nix = { version = "0.28.0", features = ["signal"] }
once_cell = "1.19.0"
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
Expand Down
9 changes: 7 additions & 2 deletions src/agent/src/agents/debug.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use super::AgentOutput;
use crate::agents::Agent;
use crate::{workload, AgentResult};
use async_trait::async_trait;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;

pub struct DebugAgent {
workload_config: workload::config::Config,
Expand All @@ -14,8 +18,9 @@ impl From<workload::config::Config> for DebugAgent {
}
}

#[async_trait]
impl Agent for DebugAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);
Expand All @@ -39,7 +44,7 @@ impl Agent for DebugAgent {
})
}

fn run(&self) -> AgentResult<AgentOutput> {
async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
Expand Down
9 changes: 7 additions & 2 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::{AgentError, AgentResult};
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;

#[cfg(feature = "debug-agent")]
pub mod debug;
Expand All @@ -12,9 +16,10 @@ pub struct AgentOutput {
pub stderr: String,
}

#[async_trait]
pub trait Agent {
fn prepare(&self) -> AgentResult<AgentOutput>;
fn run(&self) -> AgentResult<AgentOutput>;
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
}

#[derive(Debug, Clone, Deserialize)]
Expand Down
57 changes: 45 additions & 12 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use super::{Agent, AgentOutput};
use crate::{workload, AgentError, AgentResult};
use async_trait::async_trait;
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use std::{fs::create_dir_all, process::Command};
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::sync::Arc;
use tokio::process::Command;
use tokio::sync::Mutex;

#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -21,27 +26,45 @@ pub struct RustAgent {
}

impl RustAgent {
fn build(&self, function_dir: &String) -> AgentResult<AgentOutput> {
async fn build(
&self,
function_dir: &str,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<AgentOutput> {
if self.rust_config.build.release {
let output = Command::new("cargo")
let child = Command::new("cargo")
.arg("build")
.arg("--release")
.current_dir(function_dir)
.output()
.spawn()
.expect("Failed to build function");

{
child_processes.lock().await.insert(child.id().unwrap());
}

let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");

Ok(AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
})
} else {
let output = Command::new("cargo")
let child = Command::new("cargo")
.arg("build")
.current_dir(function_dir)
.output()
.spawn()
.expect("Failed to build function");

let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");

Ok(AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
Expand All @@ -63,8 +86,9 @@ impl From<workload::config::Config> for RustAgent {
}
}

#[async_trait]
impl Agent for RustAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
Expand All @@ -85,15 +109,15 @@ impl Agent for RustAgent {
[package]
name = "{}"
version = "0.1.0"
edition = "2018"
edition = "2021"
"#,
self.workload_config.workload_name
);

std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");

let result = self.build(&function_dir)?;
let result = self.build(&function_dir, child_processes).await?;

if result.exit_code != 0 {
println!("Build failed: {:?}", result);
Expand Down Expand Up @@ -131,11 +155,20 @@ impl Agent for RustAgent {
})
}

fn run(&self) -> AgentResult<AgentOutput> {
let output = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.output()
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.spawn()
.expect("Failed to run function");

{
child_processes.lock().await.insert(child.id().unwrap());
}

let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");

let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
Expand Down
39 changes: 28 additions & 11 deletions src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,63 @@
use super::config::Config;
use crate::{
agent::ExecuteRequest,
agents::{rust, Agent, AgentOutput, Language},
workload::config::Action,
AgentError, AgentResult,
};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;

#[cfg(feature = "debug-agent")]
use crate::agents::debug;

use super::config::Config;

/// Runner for a workload.
/// Will execute the workload based on the inner agent (language).
pub struct Runner {
config: Config,
agent: Box<dyn Agent + Sync + Send>,
child_processes: Arc<Mutex<HashSet<u32>>>,
}

impl Runner {
pub fn new(config: Config) -> Self {
pub fn new(config: Config, child_processes: Arc<Mutex<HashSet<u32>>>) -> Self {
let agent: Box<dyn Agent + Sync + Send> = match config.language {
Language::Rust => Box::new(rust::RustAgent::from(config.clone())),
#[cfg(feature = "debug-agent")]
Language::Debug => Box::new(debug::DebugAgent::from(config.clone())),
};

Runner { config, agent }
Runner {
config,
agent,
child_processes,
}
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result<Self, AgentError> {
pub fn new_from_execute_request(
execute_request: ExecuteRequest,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> Result<Self, AgentError> {
let config = Config::new_from_execute_request(execute_request)?;
Ok(Self::new(config))
Ok(Self::new(config, child_processes))
}

pub fn run(&self) -> AgentResult<AgentOutput> {
pub async fn run(&self) -> AgentResult<AgentOutput> {
let result = match self.config.action {
Action::Prepare => self.agent.prepare()?,
Action::Run => self.agent.run()?,
Action::Prepare => {
self.agent
.prepare(Arc::clone(&self.child_processes))
.await?
}
Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?,
Action::PrepareAndRun => {
let res = self.agent.prepare()?;
let res = self
.agent
.prepare(Arc::clone(&self.child_processes))
.await?;
println!("Prepare result {:?}", res);
self.agent.run()?
self.agent.run(Arc::clone(&self.child_processes)).await?
}
};

Expand Down
26 changes: 24 additions & 2 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use super::runner::Runner;
use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use std::process;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

static CHILD_PROCESSES: Lazy<Arc<Mutex<HashSet<u32>>>> =
Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));

pub struct WorkloadRunnerService;

#[tonic::async_trait]
Expand All @@ -18,11 +24,12 @@ impl WorkloadRunner for WorkloadRunnerService {

let execute_request = req.into_inner();

let runner = Runner::new_from_execute_request(execute_request)
let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone())
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let res = runner
.run()
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let _ = tx
Expand All @@ -42,6 +49,21 @@ impl WorkloadRunner for WorkloadRunnerService {
}

async fn signal(&self, _: Request<SignalRequest>) -> Result<()> {
let child_processes = CHILD_PROCESSES.lock().await;

for &child_id in child_processes.iter() {
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(child_id as i32),
nix::sys::signal::Signal::SIGTERM,
) {
Ok(_) => println!("Sent SIGTERM to child process {}", child_id),
Err(e) => println!(
"Failed to send SIGTERM to child process {}: {}",
child_id, e
),
}
}

process::exit(0);
}
}