From d7205dc9eb351fbfc6820b78eb2ede32eb20e3fd Mon Sep 17 00:00:00 2001 From: Thaumy Date: Fri, 29 Nov 2024 14:05:13 +0800 Subject: [PATCH 1/2] update proto def --- src/services/proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/proto b/src/services/proto index f21cf63..d5ae18c 160000 --- a/src/services/proto +++ b/src/services/proto @@ -1 +1 @@ -Subproject commit f21cf63baa6b47e2d51fe563df2575afee521a4b +Subproject commit d5ae18c050863092eee9aebbf1063a08d76a28dd From 86cac5ab81eac25521ba5f892dda93489374d05c Mon Sep 17 00:00:00 2001 From: Thaumy Date: Fri, 29 Nov 2024 16:58:59 +0800 Subject: [PATCH 2/2] supports notifying remote server of task completion --- src/main.rs | 8 +++++++- src/runtime/mod.rs | 13 ++++++++++++- src/services/host_info.rs | 5 +++++ src/services/rpc.rs | 8 +++++++- 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 625db57..d617b95 100644 --- a/src/main.rs +++ b/src/main.rs @@ -68,6 +68,7 @@ fn main() -> Result<()> { if let Some(args) = component_args { let task = Task { + id: None, wasm_component: fs::read(&args[0])?, wasm_component_args: args, end_time: Utc.with_ymd_and_hms(3000, 1, 1, 1, 1, 1).unwrap(), @@ -106,12 +107,17 @@ async fn async_tasks( task_rt.spawn(Some(client.clone()))?; client.send_info().await?; loop { - if let Some(task) = client.heartbeat(task_rt.is_idle()).await? { + let finished_task_id = task_rt.finished_task_id(); + if let Some(task) = client + .heartbeat(task_rt.is_idle(), finished_task_id) + .await? + { let end_time = match Utc.timestamp_millis_opt(task.end_time as _) { LocalResult::Single(t) => t, _ => bail!("Invalid task end time"), }; let task = Task { + id: Some(task.id), wasm_component: task.wasm, wasm_component_args: task.wasm_args, end_time, diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index dc7be1a..a859a6c 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -26,6 +26,7 @@ use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; use std::sync::mpsc::Sender; use std::sync::Arc; +use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; @@ -41,6 +42,7 @@ pub use state::PshState; use crate::services::rpc::RpcClient; pub struct Task { + pub id: Option, pub wasm_component: Vec, pub wasm_component_args: Vec, pub end_time: DateTime, @@ -50,6 +52,7 @@ pub struct TaskRuntime { tx: Sender, rx: Option>, len: Arc, + finished_task_id: Arc>>, } impl TaskRuntime { @@ -60,6 +63,7 @@ impl TaskRuntime { tx, rx: Some(rx), len: Arc::new(AtomicUsize::new(0)), + finished_task_id: Arc::new(Mutex::new(vec![])), }) } @@ -74,6 +78,10 @@ impl TaskRuntime { len == 0 } + pub fn finished_task_id(&mut self) -> Option { + self.finished_task_id.lock().unwrap().pop() + } + pub fn spawn(&mut self, rpc_client: Option) -> Result> { let rx = match self.rx.take() { Some(rx) => rx, @@ -84,6 +92,7 @@ impl TaskRuntime { let data_export_ctx = DataExportCtx { rpc_client }; let len = self.len.clone(); + let finished_task_id = self.finished_task_id.clone(); let handle = thread::spawn(move || { while let Ok(task) = rx.recv() { let mut envs = envs.clone(); @@ -108,7 +117,9 @@ impl TaskRuntime { } Err(e) => eprintln!("{}", e), }; - + if let Some(id) = task.id { + finished_task_id.lock().unwrap().push(id); + } len.fetch_sub(1, Ordering::Release); } }); diff --git a/src/services/host_info.rs b/src/services/host_info.rs index 5f41544..33d6226 100644 --- a/src/services/host_info.rs +++ b/src/services/host_info.rs @@ -58,6 +58,7 @@ impl From for HostInfoRequest { local_ipv6_addr: value.ipv6.map(|v| v.into()), instance_id: value.instance_id, idle: value.idle, + task_id: value.task_id, } } } @@ -73,6 +74,7 @@ impl From<&RawInfo> for HostInfoRequest { local_ipv6_addr: value.ipv6.map(|v| v.into()), instance_id: value.instance_id.clone(), idle: value.idle, + task_id: value.task_id.clone(), } } } @@ -87,6 +89,7 @@ pub struct RawInfo { hostname: Option, instance_id: Option, idle: bool, + task_id: Option, } impl RawInfo { @@ -117,6 +120,7 @@ impl RawInfo { kernel_version: None, instance_id, idle: false, + task_id: None, }; let cpu_hd = CpuHandle::new(); @@ -162,6 +166,7 @@ impl RawInfo { hostname: None, instance_id: self.instance_id.clone(), idle: false, + task_id: None, } } diff --git a/src/services/rpc.rs b/src/services/rpc.rs index 7036bf7..1348432 100644 --- a/src/services/rpc.rs +++ b/src/services/rpc.rs @@ -81,11 +81,16 @@ impl RpcClient { Ok(()) } - pub async fn heartbeat(&mut self, idle: bool) -> Result> { + pub async fn heartbeat( + &mut self, + idle: bool, + finished_task_id: Option, + ) -> Result> { let req: Request = { let raw_info = self.raw_info.to_heartbeat(); let mut req: HostInfoRequest = raw_info.into(); req.idle = idle; + req.task_id = finished_task_id; let mut req = Request::new(req); req.metadata_mut() .insert("authorization", format!("Bearer {}", self.token).parse()?); @@ -215,6 +220,7 @@ mod rpc_tests { local_ipv6_addr: None, instance_id: None, idle: false, + task_id: None, }; let heartbeat = test_send_info(client, info_req);