Skip to content

Commit

Permalink
Merge pull request #83 from Thaumy/main
Browse files Browse the repository at this point in the history
Supports notifying remote server of task completion
  • Loading branch information
saying121 authored Nov 29, 2024
2 parents 75345a7 + 86cac5a commit 0d4cc47
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 4 deletions.
8 changes: 7 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ fn main() -> Result<()> {

if let Some(args) = component_args {
let task = Task {
id: None,
wasm_component: fs::read(&args[0])?,
wasm_component_args: args,
end_time: Utc.with_ymd_and_hms(3000, 1, 1, 1, 1, 1).unwrap(),
Expand Down Expand Up @@ -106,12 +107,17 @@ async fn async_tasks(
task_rt.spawn(Some(client.clone()))?;
client.send_info().await?;
loop {
if let Some(task) = client.heartbeat(task_rt.is_idle()).await? {
let finished_task_id = task_rt.finished_task_id();
if let Some(task) = client
.heartbeat(task_rt.is_idle(), finished_task_id)
.await?
{
let end_time = match Utc.timestamp_millis_opt(task.end_time as _) {
LocalResult::Single(t) => t,
_ => bail!("Invalid task end time"),
};
let task = Task {
id: Some(task.id),
wasm_component: task.wasm,
wasm_component_args: task.wasm_args,
end_time,
Expand Down
13 changes: 12 additions & 1 deletion src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;

Expand All @@ -41,6 +42,7 @@ pub use state::PshState;
use crate::services::rpc::RpcClient;

pub struct Task {
pub id: Option<String>,
pub wasm_component: Vec<u8>,
pub wasm_component_args: Vec<String>,
pub end_time: DateTime<Utc>,
Expand All @@ -50,6 +52,7 @@ pub struct TaskRuntime {
tx: Sender<Task>,
rx: Option<Receiver<Task>>,
len: Arc<AtomicUsize>,
finished_task_id: Arc<Mutex<Vec<String>>>,
}

impl TaskRuntime {
Expand All @@ -60,6 +63,7 @@ impl TaskRuntime {
tx,
rx: Some(rx),
len: Arc::new(AtomicUsize::new(0)),
finished_task_id: Arc::new(Mutex::new(vec![])),
})
}

Expand All @@ -74,6 +78,10 @@ impl TaskRuntime {
len == 0
}

pub fn finished_task_id(&mut self) -> Option<String> {
self.finished_task_id.lock().unwrap().pop()
}

pub fn spawn(&mut self, rpc_client: Option<RpcClient>) -> Result<JoinHandle<()>> {
let rx = match self.rx.take() {
Some(rx) => rx,
Expand All @@ -84,6 +92,7 @@ impl TaskRuntime {
let data_export_ctx = DataExportCtx { rpc_client };

let len = self.len.clone();
let finished_task_id = self.finished_task_id.clone();
let handle = thread::spawn(move || {
while let Ok(task) = rx.recv() {
let mut envs = envs.clone();
Expand All @@ -108,7 +117,9 @@ impl TaskRuntime {
}
Err(e) => eprintln!("{}", e),
};

if let Some(id) = task.id {
finished_task_id.lock().unwrap().push(id);
}
len.fetch_sub(1, Ordering::Release);
}
});
Expand Down
5 changes: 5 additions & 0 deletions src/services/host_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl From<RawInfo> for HostInfoRequest {
local_ipv6_addr: value.ipv6.map(|v| v.into()),
instance_id: value.instance_id,
idle: value.idle,
task_id: value.task_id,
}
}
}
Expand All @@ -73,6 +74,7 @@ impl From<&RawInfo> for HostInfoRequest {
local_ipv6_addr: value.ipv6.map(|v| v.into()),
instance_id: value.instance_id.clone(),
idle: value.idle,
task_id: value.task_id.clone(),
}
}
}
Expand All @@ -87,6 +89,7 @@ pub struct RawInfo {
hostname: Option<String>,
instance_id: Option<String>,
idle: bool,
task_id: Option<String>,
}

impl RawInfo {
Expand Down Expand Up @@ -117,6 +120,7 @@ impl RawInfo {
kernel_version: None,
instance_id,
idle: false,
task_id: None,
};

let cpu_hd = CpuHandle::new();
Expand Down Expand Up @@ -162,6 +166,7 @@ impl RawInfo {
hostname: None,
instance_id: self.instance_id.clone(),
idle: false,
task_id: None,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/services/proto
Submodule proto updated 1 files
+6 −3 psh.proto
8 changes: 7 additions & 1 deletion src/services/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,16 @@ impl RpcClient {
Ok(())
}

pub async fn heartbeat(&mut self, idle: bool) -> Result<Option<pb::Task>> {
pub async fn heartbeat(
&mut self,
idle: bool,
finished_task_id: Option<String>,
) -> Result<Option<pb::Task>> {
let req: Request<HostInfoRequest> = {
let raw_info = self.raw_info.to_heartbeat();
let mut req: HostInfoRequest = raw_info.into();
req.idle = idle;
req.task_id = finished_task_id;
let mut req = Request::new(req);
req.metadata_mut()
.insert("authorization", format!("Bearer {}", self.token).parse()?);
Expand Down Expand Up @@ -215,6 +220,7 @@ mod rpc_tests {
local_ipv6_addr: None,
instance_id: None,
idle: false,
task_id: None,
};

let heartbeat = test_send_info(client, info_req);
Expand Down

0 comments on commit 0d4cc47

Please sign in to comment.