diff --git a/interactive_engine/executor/engine/pegasus/common/src/channel.rs b/interactive_engine/executor/engine/pegasus/common/src/channel.rs index e34f2ffdfbe7..24a0695a6e9d 100644 --- a/interactive_engine/executor/engine/pegasus/common/src/channel.rs +++ b/interactive_engine/executor/engine/pegasus/common/src/channel.rs @@ -136,7 +136,7 @@ impl Clone for MessageSender { impl Drop for MessageSender { fn drop(&mut self) { if !self.is_closed.get() { - warn!("dropping an unclosed 'MessageSender' id = {}", self.id); + trace!("dropping an unclosed 'MessageSender' id = {}", self.id); self.poison(); self.close(); } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/data_plane/intra_thread.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/data_plane/intra_thread.rs index 4cd792b3c495..1e34c2d88b99 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/data_plane/intra_thread.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/data_plane/intra_thread.rs @@ -82,7 +82,7 @@ impl Push for ThreadPush { impl Drop for ThreadPush { fn drop(&mut self) { if Arc::strong_count(&self.exhaust) == 2 && !self.exhaust.load(Ordering::SeqCst) { - warn_worker!("{:?}: drop 'ThreadPush' without close;", self.id); + trace_worker!("{:?}: drop 'ThreadPush' without close;", self.id); // if cfg!(debug_assertions) { // let bt = backtrace::Backtrace::new(); // error_worker!("caused by:\n{:?}", bt); diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs index d2ba8261d8db..a23b59448c44 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs @@ -340,6 +340,23 @@ impl From for StartupError { } } +#[derive(Debug)] +pub enum CancelError { + JobNotFoundError(u64), +} + +impl Display for CancelError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + CancelError::JobNotFoundError(e) => { + write!(f, "fail to find job, job id: {};", e) + } + } + } +} + +impl Error for CancelError {} + #[macro_export] macro_rules! throw_io_error { () => {{ diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs index 631cd9e2a0ae..57e02d60ddfc 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs @@ -51,9 +51,10 @@ pub mod stream; pub mod utils; mod worker; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; pub use config::{read_from, Configuration, JobConf, ServerConf}; pub use data::Data; @@ -65,7 +66,7 @@ pub use worker::Worker; pub use worker_id::{get_current_worker, get_current_worker_checked, set_current_worker, WorkerId}; use crate::api::Source; -pub use crate::errors::{BuildJobError, JobSubmitError, SpawnJobError, StartupError}; +pub use crate::errors::{BuildJobError, CancelError, JobSubmitError, SpawnJobError, StartupError}; use crate::resource::PartitionedResource; use crate::result::{ResultSink, ResultStream}; use crate::worker_id::WorkerIdIter; @@ -73,6 +74,7 @@ use crate::worker_id::WorkerIdIter; lazy_static! { static ref SERVER_ID: Mutex> = Mutex::new(None); static ref SERVERS: RwLock> = RwLock::new(vec![]); + static ref JOB_CANCEL_MAP: RwLock>> = RwLock::new(HashMap::new()); pub static ref PROFILE_TIME_FLAG: bool = configure_with_default!(bool, "PROFILE_TIME_FLAG", false); pub static ref PROFILE_COMM_FLAG: bool = configure_with_default!(bool, "PROFILE_COMM_FLAG", false); } @@ -261,6 +263,9 @@ where F: FnMut(&mut Worker) -> Result<(), BuildJobError>, { init_env(); + let cancel_hook = sink.get_cancel_hook().clone(); + let mut lock = JOB_CANCEL_MAP.write().expect("lock poisoned"); + lock.insert(conf.job_id, cancel_hook); let peer_guard = Arc::new(AtomicUsize::new(0)); let conf = Arc::new(conf); let workers = allocate_local_worker(&conf)?; @@ -293,6 +298,16 @@ where } } +pub fn cancel_job(job_id: u64) -> Result<(), CancelError> { + let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned"); + if let Some(cancel_hook) = hook.get_mut(&job_id) { + cancel_hook.store(true, Ordering::SeqCst); + } else { + return Err(CancelError::JobNotFoundError(job_id)); + } + Ok(()) +} + #[inline] fn allocate_local_worker(conf: &Arc) -> Result, BuildJobError> { let server_conf = conf.servers(); diff --git a/interactive_engine/executor/engine/pegasus/server/proto/job_service.proto b/interactive_engine/executor/engine/pegasus/server/proto/job_service.proto index 4b8c1d7ee652..5426205c4934 100644 --- a/interactive_engine/executor/engine/pegasus/server/proto/job_service.proto +++ b/interactive_engine/executor/engine/pegasus/server/proto/job_service.proto @@ -62,11 +62,17 @@ message JobResponse { bytes resp = 2; } +message CancelRequest { + uint64 job_id = 1; +} + service JobService { rpc AddLibrary(BinaryResource) returns(Empty) {} rpc RemoveLibrary(Name) returns(Empty) {} + rpc Cancel(CancelRequest) returns(Empty) {} + rpc Submit(JobRequest) returns(stream JobResponse) {} } diff --git a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs index ee8b9140fb91..7ecbdb9aec46 100644 --- a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs +++ b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs @@ -157,6 +157,12 @@ where type SubmitStream = UnboundedReceiverStream>; + async fn cancel(&self, req: Request) -> Result, Status> { + let pb::CancelRequest { job_id } = req.into_inner(); + pegasus::cancel_job(job_id); + Ok(Response::new(Empty {})) + } + async fn submit(&self, req: Request) -> Result, Status> { debug!("accept new request from {:?};", req.remote_addr()); let pb::JobRequest { conf, source, plan, resource } = req.into_inner();