From 568be7a89a7034a45d5dcd53a0017276b1ba5609 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 28 Apr 2024 14:23:33 -0700 Subject: [PATCH] support running with TABBY_WEBSERVER_CONTROLLER_ONESHOT --- ee/tabby-webserver/src/cron/controller.rs | 56 ++++++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index 61ccf0805f22..f0e31bcbea73 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc, time::Duration}; use futures::Future; use juniper::ID; @@ -10,6 +10,7 @@ use crate::schema::job::JobService; pub struct JobController { scheduler: JobScheduler, service: Arc, + is_oneshot: bool, } impl JobController { @@ -18,7 +19,17 @@ impl JobController { let scheduler = JobScheduler::new() .await .expect("failed to create job scheduler"); - Self { scheduler, service } + let is_oneshot = std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok(); + if is_oneshot { + warn!( + "Running controller job as oneshot, this should only be used for debugging purpose..." + ); + } + Self { + scheduler, + service, + is_oneshot, + } } pub async fn run(&self) { @@ -60,6 +71,47 @@ impl JobController { } async fn register_impl(&mut self, is_public: bool, name: &str, schedule: &str, func: T) + where + T: FnMut(&JobContext) -> Pin> + Send>> + + Send + + Sync + + Clone + + 'static, + { + if self.is_oneshot { + self.run_oneshot(is_public, name, func).await; + } else { + self.run_schedule(is_public, name, schedule, func).await; + }; + } + + async fn run_oneshot(&self, is_public: bool, name: &str, mut func: T) + where + T: FnMut(&JobContext) -> Pin> + Send>> + + Send + + Sync + + Clone + + 'static, + { + let name = name.to_owned(); + let context = JobContext::new(is_public, &name, self.service.clone()).await; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(5)).await; + + match func(&context).await { + Ok(exit_code) => { + debug!("Job `{}` completed with exit code {}", &name, exit_code); + context.complete(exit_code).await; + } + Err(e) => { + warn!("Job `{}` failed: {}", &name, e); + context.complete(-1).await; + } + } + }); + } + + async fn run_schedule(&mut self, is_public: bool, name: &str, schedule: &str, func: T) where T: FnMut(&JobContext) -> Pin> + Send>> + Send