Skip to content

Commit

Permalink
feat(interactive): Support cancel in interactive (#3310)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #3309

---------

Co-authored-by: Longbin Lai <[email protected]>
  • Loading branch information
lnfjpt and longbinlai authored Nov 7, 2023
1 parent d61c7c7 commit f64e089
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl<T: Send> Clone for MessageSender<T> {
impl<T: Send> Drop for MessageSender<T> {
fn drop(&mut self) {
if !self.is_closed.get() {
warn!("dropping an unclosed 'MessageSender' id = {}", self.id);
trace!("dropping an unclosed 'MessageSender' id = {}", self.id);
self.poison();
self.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<T: Send> Push<T> for ThreadPush<T> {
impl<T> Drop for ThreadPush<T> {
fn drop(&mut self) {
if Arc::strong_count(&self.exhaust) == 2 && !self.exhaust.load(Ordering::SeqCst) {
warn_worker!("{:?}: drop 'ThreadPush' without close;", self.id);
trace_worker!("{:?}: drop 'ThreadPush' without close;", self.id);
// if cfg!(debug_assertions) {
// let bt = backtrace::Backtrace::new();
// error_worker!("caused by:\n{:?}", bt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,23 @@ impl From<std::io::Error> for StartupError {
}
}

#[derive(Debug)]
pub enum CancelError {
JobNotFoundError(u64),
}

impl Display for CancelError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
CancelError::JobNotFoundError(e) => {
write!(f, "fail to find job, job id: {};", e)
}
}
}
}

impl Error for CancelError {}

#[macro_export]
macro_rules! throw_io_error {
() => {{
Expand Down
19 changes: 17 additions & 2 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ pub mod stream;
pub mod utils;
mod worker;

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};

pub use config::{read_from, Configuration, JobConf, ServerConf};
pub use data::Data;
Expand All @@ -65,14 +66,15 @@ pub use worker::Worker;
pub use worker_id::{get_current_worker, get_current_worker_checked, set_current_worker, WorkerId};

use crate::api::Source;
pub use crate::errors::{BuildJobError, JobSubmitError, SpawnJobError, StartupError};
pub use crate::errors::{BuildJobError, CancelError, JobSubmitError, SpawnJobError, StartupError};
use crate::resource::PartitionedResource;
use crate::result::{ResultSink, ResultStream};
use crate::worker_id::WorkerIdIter;

lazy_static! {
static ref SERVER_ID: Mutex<Option<u64>> = Mutex::new(None);
static ref SERVERS: RwLock<Vec<u64>> = RwLock::new(vec![]);
static ref JOB_CANCEL_MAP: RwLock<HashMap<u64, Arc<AtomicBool>>> = RwLock::new(HashMap::new());
pub static ref PROFILE_TIME_FLAG: bool = configure_with_default!(bool, "PROFILE_TIME_FLAG", false);
pub static ref PROFILE_COMM_FLAG: bool = configure_with_default!(bool, "PROFILE_COMM_FLAG", false);
}
Expand Down Expand Up @@ -261,6 +263,9 @@ where
F: FnMut(&mut Worker<DI, DO>) -> Result<(), BuildJobError>,
{
init_env();
let cancel_hook = sink.get_cancel_hook().clone();
let mut lock = JOB_CANCEL_MAP.write().expect("lock poisoned");
lock.insert(conf.job_id, cancel_hook);
let peer_guard = Arc::new(AtomicUsize::new(0));
let conf = Arc::new(conf);
let workers = allocate_local_worker(&conf)?;
Expand Down Expand Up @@ -293,6 +298,16 @@ where
}
}

pub fn cancel_job(job_id: u64) -> Result<(), CancelError> {
let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned");
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
} else {
return Err(CancelError::JobNotFoundError(job_id));
}
Ok(())
}

#[inline]
fn allocate_local_worker(conf: &Arc<JobConf>) -> Result<Option<WorkerIdIter>, BuildJobError> {
let server_conf = conf.servers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ message JobResponse {
bytes resp = 2;
}

message CancelRequest {
uint64 job_id = 1;
}

service JobService {

rpc AddLibrary(BinaryResource) returns(Empty) {}

rpc RemoveLibrary(Name) returns(Empty) {}

rpc Cancel(CancelRequest) returns(Empty) {}

rpc Submit(JobRequest) returns(stream JobResponse) {}
}
6 changes: 6 additions & 0 deletions interactive_engine/executor/engine/pegasus/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ where

type SubmitStream = UnboundedReceiverStream<Result<pb::JobResponse, Status>>;

async fn cancel(&self, req: Request<pb::CancelRequest>) -> Result<Response<Empty>, Status> {
let pb::CancelRequest { job_id } = req.into_inner();
pegasus::cancel_job(job_id);
Ok(Response::new(Empty {}))
}

async fn submit(&self, req: Request<pb::JobRequest>) -> Result<Response<Self::SubmitStream>, Status> {
debug!("accept new request from {:?};", req.remote_addr());
let pb::JobRequest { conf, source, plan, resource } = req.into_inner();
Expand Down

0 comments on commit f64e089

Please sign in to comment.