From adc4328fd0d5bae7f2dd8a1193c895822e0c7d00 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Mon, 29 Apr 2024 18:54:11 -0700 Subject: [PATCH 1/6] support-trigger-job-once --- ee/tabby-webserver/src/cron/controller.rs | 159 +++++++++++----------- ee/tabby-webserver/src/cron/scheduler.rs | 2 - 2 files changed, 82 insertions(+), 79 deletions(-) diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index f0e31bcbea73..ee6031ca9959 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -1,7 +1,8 @@ -use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use futures::Future; use juniper::ID; +use rand::Rng; use tokio_cron_scheduler::{Job, JobScheduler}; use tracing::{debug, warn}; @@ -10,7 +11,10 @@ use crate::schema::job::JobService; pub struct JobController { scheduler: JobScheduler, service: Arc, - is_oneshot: bool, + job_registry: HashMap< + String, + Arc Pin + Send>> + Send + Sync + 'static>, + >, } impl JobController { @@ -19,24 +23,45 @@ impl JobController { let scheduler = JobScheduler::new() .await .expect("failed to create job scheduler"); - let is_oneshot = std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok(); - if is_oneshot { - warn!( - "Running controller job as oneshot, this should only be used for debugging purpose..." - ); - } Self { scheduler, service, - is_oneshot, + job_registry: HashMap::default(), + } + } + + pub fn schedule(&self, name: &str) { + let func = self + .job_registry + .get(name) + .expect("failed to get job") + .clone(); + let mut rng = rand::thread_rng(); + let delay = rng.gen_range(1..5); + let _ = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(5 + delay)).await; + func().await; + }); + } + + fn run_oneshot(&self) { + warn!( + "Running controller job as oneshot, this should only be used for debugging purpose..." + ); + for name in self.job_registry.keys() { + self.schedule(name); } } pub async fn run(&self) { - self.scheduler - .start() - .await - .expect("failed to start job scheduler") + if std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok() { + self.run_oneshot(); + } else { + self.scheduler + .start() + .await + .expect("failed to start job scheduler") + } } /// Register a new job with the scheduler, the job will be displayed in Jobs dashboard. @@ -70,84 +95,64 @@ impl JobController { .await; } - async fn register_impl(&mut self, is_public: bool, name: &str, schedule: &str, func: T) + async fn register_impl(&mut self, is_public: bool, name: &str, schedule: &str, func: F) where - T: FnMut(&JobContext) -> Pin> + Send>> + F: FnMut(&JobContext) -> Pin> + Send>> + Send + Sync + Clone + 'static, { - if self.is_oneshot { - self.run_oneshot(is_public, name, func).await; - } else { - self.run_schedule(is_public, name, schedule, func).await; - }; - } + let cloned_name = name.to_owned(); + let job_mutex = Arc::new(tokio::sync::Mutex::new(())); + let service = self.service.clone(); + self.job_registry.insert( + cloned_name.clone(), + Arc::new(move || { + let job_mutex = job_mutex.clone(); + let service = service.clone(); + let name = cloned_name.clone(); + let mut func = func.clone(); + + Box::pin(async move { + let Ok(_guard) = job_mutex.try_lock() else { + warn!("Job `{}` overlapped, skipping...", name); + return; + }; + + debug!("Running job `{}`", name); + let context = JobContext::new(is_public, &name, service.clone()).await; + match func(&context).await { + Ok(exit_code) => { + debug!("Job `{}` completed with exit code {}", &name, exit_code); + context.complete(exit_code).await; + } + Err(e) => { + warn!("Job `{}` failed: {}", &name, e); + context.complete(-1).await; + } + }; + }) + }), + ); - async fn run_oneshot(&self, is_public: bool, name: &str, mut func: T) - where - T: FnMut(&JobContext) -> Pin> + Send>> - + Send - + Sync - + Clone - + 'static, - { - let name = name.to_owned(); - let context = JobContext::new(is_public, &name, self.service.clone()).await; - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(5)).await; - - match func(&context).await { - Ok(exit_code) => { - debug!("Job `{}` completed with exit code {}", &name, exit_code); - context.complete(exit_code).await; - } - Err(e) => { - warn!("Job `{}` failed: {}", &name, e); - context.complete(-1).await; - } - } - }); + self.run_schedule(name.to_owned(), schedule).await } - async fn run_schedule(&mut self, is_public: bool, name: &str, schedule: &str, func: T) - where - T: FnMut(&JobContext) -> Pin> + Send>> - + Send - + Sync - + Clone - + 'static, - { - let job_mutex = Arc::new(tokio::sync::Mutex::new(())); - let service = self.service.clone(); - let name = name.to_owned(); - let func = func.clone(); + async fn run_schedule(&mut self, name: String, schedule: &str) { + let func = self + .job_registry + .get_mut(&name) + .expect("failed to get job") + .clone(); + let job = Job::new_async(schedule, move |uuid, mut scheduler| { - let job_mutex = job_mutex.clone(); - let service = service.clone(); let name = name.clone(); - let mut func = func.clone(); + let func = func.clone(); Box::pin(async move { - let Ok(_guard) = job_mutex.try_lock() else { - warn!("Job `{}` overlapped, skipping...", name); - return; - }; - debug!("Running job `{}`", name); - let context = JobContext::new(is_public, &name, service.clone()).await; - match func(&context).await { - Ok(exit_code) => { - debug!("Job `{}` completed with exit code {}", &name, exit_code); - context.complete(exit_code).await; - } - Err(e) => { - warn!("Job `{}` failed: {}", &name, e); - context.complete(-1).await; - } - } - + (*func)().await; if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await { debug!( "Next time for job `{}` is {:?}", diff --git a/ee/tabby-webserver/src/cron/scheduler.rs b/ee/tabby-webserver/src/cron/scheduler.rs index 35137a0723cf..b9885d7bd34c 100644 --- a/ee/tabby-webserver/src/cron/scheduler.rs +++ b/ee/tabby-webserver/src/cron/scheduler.rs @@ -2,7 +2,6 @@ use std::{process::Stdio, sync::Arc}; use anyhow::{Context, Result}; use tokio::io::AsyncBufReadExt; -use tracing::debug; use super::{ controller::{JobContext, JobController}, @@ -29,7 +28,6 @@ async fn run_scheduler_now( worker: Arc, local_port: u16, ) -> Result { - debug!("Running scheduler job..."); let exe = std::env::current_exe()?; let mut child = tokio::process::Command::new(exe) From 817a0a7473319d34bd49cef3c32d1b4187709ad8 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Mon, 29 Apr 2024 19:29:33 -0700 Subject: [PATCH 2/6] cleanup job context --- ee/tabby-webserver/src/cron/controller.rs | 39 ++++----- ee/tabby-webserver/src/cron/mod.rs | 8 +- ee/tabby-webserver/src/handler.rs | 2 +- ee/tabby-webserver/src/schema/job.rs | 8 +- ee/tabby-webserver/src/service/job.rs | 97 +---------------------- 5 files changed, 30 insertions(+), 124 deletions(-) diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index ee6031ca9959..02efb0092b56 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -1,8 +1,8 @@ use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use futures::Future; -use juniper::ID; use rand::Rng; +use tabby_db::DbConn; use tokio_cron_scheduler::{Job, JobScheduler}; use tracing::{debug, warn}; @@ -10,7 +10,7 @@ use crate::schema::job::JobService; pub struct JobController { scheduler: JobScheduler, - service: Arc, + db: DbConn, job_registry: HashMap< String, Arc Pin + Send>> + Send + Sync + 'static>, @@ -18,14 +18,16 @@ pub struct JobController { } impl JobController { - pub async fn new(service: Arc) -> Self { - service.cleanup().await.expect("failed to cleanup jobs"); + pub async fn new(db: DbConn) -> Self { + db.finalize_stale_job_runs() + .await + .expect("failed to cleanup stale jobs"); let scheduler = JobScheduler::new() .await .expect("failed to create job scheduler"); Self { scheduler, - service, + db, job_registry: HashMap::default(), } } @@ -105,12 +107,12 @@ impl JobController { { let cloned_name = name.to_owned(); let job_mutex = Arc::new(tokio::sync::Mutex::new(())); - let service = self.service.clone(); + let db = self.db.clone(); self.job_registry.insert( cloned_name.clone(), Arc::new(move || { let job_mutex = job_mutex.clone(); - let service = service.clone(); + let db = db.clone(); let name = cloned_name.clone(); let mut func = func.clone(); @@ -121,7 +123,7 @@ impl JobController { }; debug!("Running job `{}`", name); - let context = JobContext::new(is_public, &name, service.clone()).await; + let context = JobContext::new(is_public, &name, db.clone()).await; match func(&context).await { Ok(exit_code) => { debug!("Job `{}` completed with exit code {}", &name, exit_code); @@ -170,25 +172,24 @@ impl JobController { #[derive(Clone)] pub struct JobContext { - id: ID, - service: Arc, + id: i64, + db: DbConn, } impl JobContext { - async fn new(public: bool, name: &str, service: Arc) -> Self { + async fn new(public: bool, name: &str, db: DbConn) -> Self { let id = if public { - service - .start(name.to_owned()) + db.create_job_run(name.to_owned()) .await .expect("failed to create job") } else { - ID::from("".to_owned()) + -1 }; - Self { id, service } + Self { id: id as i64, db } } fn is_private(&self) -> bool { - self.id.is_empty() + self.id < 0 } pub async fn stdout_writeline(&self, stdout: String) { @@ -197,7 +198,7 @@ impl JobContext { } let stdout = stdout + "\n"; - match self.service.update_stdout(&self.id, stdout).await { + match self.db.update_job_stdout(self.id, stdout).await { Ok(_) => (), Err(_) => { warn!("Failed to write stdout to job `{}`", self.id); @@ -211,7 +212,7 @@ impl JobContext { } let stderr = stderr + "\n"; - match self.service.update_stderr(&self.id, stderr).await { + match self.db.update_job_stderr(self.id, stderr).await { Ok(_) => (), Err(_) => { warn!("Failed to write stderr to job `{}`", self.id); @@ -224,7 +225,7 @@ impl JobContext { return; } - match self.service.complete(&self.id, exit_code).await { + match self.db.update_job_status(self.id, exit_code).await { Ok(_) => (), Err(_) => { warn!("Failed to complete job `{}`", self.id); diff --git a/ee/tabby-webserver/src/cron/mod.rs b/ee/tabby-webserver/src/cron/mod.rs index d3b8165ebe23..ac3ac305724b 100644 --- a/ee/tabby-webserver/src/cron/mod.rs +++ b/ee/tabby-webserver/src/cron/mod.rs @@ -5,10 +5,10 @@ mod scheduler; use std::sync::Arc; use rand::Rng; +use tabby_db::DbConn; use crate::schema::{ - auth::AuthenticationService, job::JobService, repository::RepositoryService, - worker::WorkerService, + auth::AuthenticationService, repository::RepositoryService, worker::WorkerService, }; #[macro_export] @@ -20,13 +20,13 @@ macro_rules! warn_stderr { } pub async fn run_cron( + db: DbConn, auth: Arc, - job: Arc, worker: Arc, repository: Arc, local_port: u16, ) { - let mut controller = controller::JobController::new(job).await; + let mut controller = controller::JobController::new(db).await; db::register_jobs( &mut controller, auth, diff --git a/ee/tabby-webserver/src/handler.rs b/ee/tabby-webserver/src/handler.rs index 5e9e9a687e04..f29dd2d1bd95 100644 --- a/ee/tabby-webserver/src/handler.rs +++ b/ee/tabby-webserver/src/handler.rs @@ -84,8 +84,8 @@ impl WebserverHandle { ) .await; cron::run_cron( + self.db.clone(), ctx.auth(), - ctx.job(), ctx.worker(), ctx.repository(), local_port, diff --git a/ee/tabby-webserver/src/schema/job.rs b/ee/tabby-webserver/src/schema/job.rs index 46d2ed27f106..8eb15ce97863 100644 --- a/ee/tabby-webserver/src/schema/job.rs +++ b/ee/tabby-webserver/src/schema/job.rs @@ -47,8 +47,9 @@ impl relay::NodeType for JobRun { #[async_trait] pub trait JobService: Send + Sync { - async fn start(&self, name: String) -> Result; - async fn complete(&self, id: &ID, exit_code: i32) -> Result<()>; + // Schedule one job immediately. + async fn schedule(&self, name: String); + async fn list( &self, ids: Option>, @@ -60,7 +61,4 @@ pub trait JobService: Send + Sync { ) -> Result>; async fn compute_stats(&self, jobs: Option>) -> Result; - async fn update_stdout(&self, id: &ID, stdout: String) -> Result<()>; - async fn update_stderr(&self, id: &ID, stderr: String) -> Result<()>; - async fn cleanup(&self) -> Result<()>; } diff --git a/ee/tabby-webserver/src/service/job.rs b/ee/tabby-webserver/src/service/job.rs index 73a3d181e518..48bfc9fdc700 100644 --- a/ee/tabby-webserver/src/service/job.rs +++ b/ee/tabby-webserver/src/service/job.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use juniper::ID; use tabby_db::DbConn; -use super::{graphql_pagination_to_filter, AsID, AsRowid}; +use super::{graphql_pagination_to_filter, AsRowid}; use crate::schema::{ job::{JobRun, JobService, JobStats}, Result, @@ -10,29 +10,7 @@ use crate::schema::{ #[async_trait] impl JobService for DbConn { - async fn start(&self, name: String) -> Result { - Ok(self.create_job_run(name).await.map(|x| x.as_id())?) - } - - async fn update_stdout(&self, id: &ID, stdout: String) -> Result<()> { - self.update_job_stdout(id.as_rowid()?, stdout).await?; - Ok(()) - } - - async fn update_stderr(&self, id: &ID, stderr: String) -> Result<()> { - self.update_job_stderr(id.as_rowid()?, stderr).await?; - Ok(()) - } - - async fn complete(&self, id: &ID, exit_code: i32) -> Result<()> { - self.update_job_status(id.as_rowid()?, exit_code).await?; - Ok(()) - } - - async fn cleanup(&self) -> Result<()> { - (self as &DbConn).finalize_stale_job_runs().await?; - Ok(()) - } + async fn schedule(&self, _name: String) {} async fn list( &self, @@ -66,74 +44,3 @@ impl JobService for DbConn { }) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_job_service() { - let svc: Box = Box::new(DbConn::new_in_memory().await.unwrap()); - - let id = svc.start("test-job".to_owned()).await.unwrap(); - svc.update_stdout(&id, "stdout".to_owned()).await.unwrap(); - svc.update_stderr(&id, "stderr".to_owned()).await.unwrap(); - svc.complete(&id, 0).await.unwrap(); - - let job = svc.list(None, None, None, None, None, None).await.unwrap(); - let job = job.first().unwrap(); - assert_eq!(job.job, "test-job"); - assert_eq!(job.stdout, "stdout"); - assert_eq!(job.stderr, "stderr"); - assert_eq!(job.exit_code, Some(0)); - - let jobs = svc - .list(Some(vec![id]), None, None, None, None, None) - .await - .unwrap(); - assert_eq!(jobs.len(), 1); - - svc.start("another-job".into()).await.unwrap(); - let jobs = svc - .list( - None, - Some(vec!["another-job".into()]), - None, - None, - None, - None, - ) - .await - .unwrap(); - assert_eq!(jobs.len(), 1); - } - - #[tokio::test] - async fn test_job_stats() { - let db = DbConn::new_in_memory().await.unwrap(); - let jobs: Box = Box::new(db); - - let id = jobs.start("test-job".into()).await.unwrap(); - jobs.complete(&id, 0).await.unwrap(); - - let id2 = jobs.start("test-job".into()).await.unwrap(); - jobs.complete(&id2, 1).await.unwrap(); - - jobs.start("pending-job".into()).await.unwrap(); - - let stats = jobs.compute_stats(None).await.unwrap(); - - assert_eq!(stats.success, 1); - assert_eq!(stats.failed, 1); - assert_eq!(stats.pending, 1); - - let stats = jobs - .compute_stats(Some(vec!["test-job".into()])) - .await - .unwrap(); - - assert_eq!(stats.success, 1); - assert_eq!(stats.failed, 1); - assert_eq!(stats.pending, 0); - } -} From 236123b38a08df1e5a9a7749600e76f3a711d3ce Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Mon, 29 Apr 2024 19:47:52 -0700 Subject: [PATCH 3/6] cleanup --- ee/tabby-webserver/src/cron/controller.rs | 36 +++++++++++------------ ee/tabby-webserver/src/cron/db/mod.rs | 30 +++++++------------ ee/tabby-webserver/src/cron/mod.rs | 10 +++++++ ee/tabby-webserver/src/handler.rs | 5 ++++ ee/tabby-webserver/src/schema/job.rs | 2 +- ee/tabby-webserver/src/schema/mod.rs | 4 +++ ee/tabby-webserver/src/service/job.rs | 21 +++++++++++-- ee/tabby-webserver/src/service/mod.rs | 17 +++++++++-- 8 files changed, 82 insertions(+), 43 deletions(-) diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index 02efb0092b56..324102d9edd5 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -6,13 +6,11 @@ use tabby_db::DbConn; use tokio_cron_scheduler::{Job, JobScheduler}; use tracing::{debug, warn}; -use crate::schema::job::JobService; - pub struct JobController { scheduler: JobScheduler, db: DbConn, job_registry: HashMap< - String, + &'static str, Arc Pin + Send>> + Send + Sync + 'static>, >, } @@ -67,7 +65,7 @@ impl JobController { } /// Register a new job with the scheduler, the job will be displayed in Jobs dashboard. - pub async fn register_public(&mut self, name: &str, schedule: &str, func: T) + pub async fn register_public(&mut self, name: &'static str, schedule: &str, func: T) where T: FnMut(&JobContext) -> Pin> + Send>> + Send @@ -79,7 +77,7 @@ impl JobController { } /// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard. - pub async fn register(&mut self, name: &str, schedule: &str, func: T) + pub async fn register(&mut self, name: &'static str, schedule: &str, func: T) where T: FnMut() -> Pin> + Send>> + Send @@ -97,23 +95,26 @@ impl JobController { .await; } - async fn register_impl(&mut self, is_public: bool, name: &str, schedule: &str, func: F) - where + async fn register_impl( + &mut self, + is_public: bool, + name: &'static str, + schedule: &str, + func: F, + ) where F: FnMut(&JobContext) -> Pin> + Send>> + Send + Sync + Clone + 'static, { - let cloned_name = name.to_owned(); let job_mutex = Arc::new(tokio::sync::Mutex::new(())); let db = self.db.clone(); self.job_registry.insert( - cloned_name.clone(), + name, Arc::new(move || { let job_mutex = job_mutex.clone(); let db = db.clone(); - let name = cloned_name.clone(); let mut func = func.clone(); Box::pin(async move { @@ -123,14 +124,14 @@ impl JobController { }; debug!("Running job `{}`", name); - let context = JobContext::new(is_public, &name, db.clone()).await; + let context = JobContext::new(is_public, name, db.clone()).await; match func(&context).await { Ok(exit_code) => { - debug!("Job `{}` completed with exit code {}", &name, exit_code); + debug!("Job `{}` completed with exit code {}", name, exit_code); context.complete(exit_code).await; } Err(e) => { - warn!("Job `{}` failed: {}", &name, e); + warn!("Job `{}` failed: {}", name, e); context.complete(-1).await; } }; @@ -138,18 +139,17 @@ impl JobController { }), ); - self.run_schedule(name.to_owned(), schedule).await + self.add_to_schedule(name, schedule).await } - async fn run_schedule(&mut self, name: String, schedule: &str) { + async fn add_to_schedule(&mut self, name: &'static str, schedule: &str) { let func = self .job_registry - .get_mut(&name) + .get_mut(name) .expect("failed to get job") .clone(); let job = Job::new_async(schedule, move |uuid, mut scheduler| { - let name = name.clone(); let func = func.clone(); Box::pin(async move { debug!("Running job `{}`", name); @@ -177,7 +177,7 @@ pub struct JobContext { } impl JobContext { - async fn new(public: bool, name: &str, db: DbConn) -> Self { + async fn new(public: bool, name: &'static str, db: DbConn) -> Self { let id = if public { db.create_job_run(name.to_owned()) .await diff --git a/ee/tabby-webserver/src/cron/db/mod.rs b/ee/tabby-webserver/src/cron/db/mod.rs index eb63d817c69c..3e3e5aebcecd 100644 --- a/ee/tabby-webserver/src/cron/db/mod.rs +++ b/ee/tabby-webserver/src/cron/db/mod.rs @@ -5,7 +5,7 @@ mod gitlab; use std::sync::Arc; -use super::{controller::JobController, every_ten_minutes, every_two_hours}; +use super::{controller::JobController, every_two_hours}; use crate::schema::{ auth::AuthenticationService, repository::{GithubRepositoryService, GitlabRepositoryService}, @@ -42,26 +42,18 @@ pub async fn register_jobs( .await; controller - .register_public( - "github_repositories", - &every_ten_minutes(), - move |context| { - let context = context.clone(); - let github = github.clone(); - Box::pin(async move { github::refresh_all_repositories(context, github).await }) - }, - ) + .register_public("github_repositories", &every_two_hours(), move |context| { + let context = context.clone(); + let github = github.clone(); + Box::pin(async move { github::refresh_all_repositories(context, github).await }) + }) .await; controller - .register_public( - "gitlab_repositories", - &every_ten_minutes(), - move |context| { - let gitlab = gitlab.clone(); - let context = context.clone(); - Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await }) - }, - ) + .register_public("gitlab_repositories", &every_two_hours(), move |context| { + let gitlab = gitlab.clone(); + let context = context.clone(); + Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await }) + }) .await; } diff --git a/ee/tabby-webserver/src/cron/mod.rs b/ee/tabby-webserver/src/cron/mod.rs index ac3ac305724b..22d93aef1bfb 100644 --- a/ee/tabby-webserver/src/cron/mod.rs +++ b/ee/tabby-webserver/src/cron/mod.rs @@ -20,6 +20,7 @@ macro_rules! warn_stderr { } pub async fn run_cron( + mut schedule_event_receiver: tokio::sync::mpsc::UnboundedReceiver, db: DbConn, auth: Arc, worker: Arc, @@ -37,6 +38,15 @@ pub async fn run_cron( scheduler::register(&mut controller, worker, local_port).await; + let controller = Arc::new(controller); + + let cloned_controller = controller.clone(); + tokio::spawn(async move { + while let Some(name) = schedule_event_receiver.recv().await { + cloned_controller.schedule(&name); + } + }); + controller.run().await } diff --git a/ee/tabby-webserver/src/handler.rs b/ee/tabby-webserver/src/handler.rs index f29dd2d1bd95..bd7791ad46b3 100644 --- a/ee/tabby-webserver/src/handler.rs +++ b/ee/tabby-webserver/src/handler.rs @@ -75,15 +75,20 @@ impl WebserverHandle { is_chat_enabled: bool, local_port: u16, ) -> (Router, Router) { + let (schedule_event_sender, schedule_event_receiver) = + tokio::sync::mpsc::unbounded_channel(); + let ctx = create_service_locator( self.logger(), code, self.repository.clone(), self.db.clone(), is_chat_enabled, + schedule_event_sender, ) .await; cron::run_cron( + schedule_event_receiver, self.db.clone(), ctx.auth(), ctx.worker(), diff --git a/ee/tabby-webserver/src/schema/job.rs b/ee/tabby-webserver/src/schema/job.rs index 8eb15ce97863..97231cd2cd69 100644 --- a/ee/tabby-webserver/src/schema/job.rs +++ b/ee/tabby-webserver/src/schema/job.rs @@ -48,7 +48,7 @@ impl relay::NodeType for JobRun { #[async_trait] pub trait JobService: Send + Sync { // Schedule one job immediately. - async fn schedule(&self, name: String); + fn schedule(&self, name: &str); async fn list( &self, diff --git a/ee/tabby-webserver/src/schema/mod.rs b/ee/tabby-webserver/src/schema/mod.rs index 190d4e62c340..e34fded511a3 100644 --- a/ee/tabby-webserver/src/schema/mod.rs +++ b/ee/tabby-webserver/src/schema/mod.rs @@ -784,6 +784,7 @@ impl Mutation { .github() .create_provider(input.display_name, input.access_token) .await?; + ctx.locator.job().schedule("github_repositories"); Ok(id) } @@ -808,6 +809,7 @@ impl Mutation { .github() .update_provider(input.id, input.display_name, input.access_token) .await?; + ctx.locator.job().schedule("github_repositories"); Ok(true) } @@ -836,6 +838,7 @@ impl Mutation { .gitlab() .create_provider(input.display_name, input.access_token) .await?; + ctx.locator.job().schedule("gitlab_repositories"); Ok(id) } @@ -860,6 +863,7 @@ impl Mutation { .gitlab() .update_provider(input.id, input.display_name, input.access_token) .await?; + ctx.locator.job().schedule("gitlab_repositories"); Ok(true) } diff --git a/ee/tabby-webserver/src/service/job.rs b/ee/tabby-webserver/src/service/job.rs index 48bfc9fdc700..6977fe3e7fbf 100644 --- a/ee/tabby-webserver/src/service/job.rs +++ b/ee/tabby-webserver/src/service/job.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use juniper::ID; use tabby_db::DbConn; +use tracing::error; use super::{graphql_pagination_to_filter, AsRowid}; use crate::schema::{ @@ -8,9 +9,22 @@ use crate::schema::{ Result, }; +struct JobControllerImpl { + db: DbConn, + sender: tokio::sync::mpsc::UnboundedSender, +} + +pub fn create(db: DbConn, sender: tokio::sync::mpsc::UnboundedSender) -> impl JobService { + JobControllerImpl { db, sender } +} + #[async_trait] -impl JobService for DbConn { - async fn schedule(&self, _name: String) {} +impl JobService for JobControllerImpl { + fn schedule(&self, name: &str) { + if let Err(e) = self.sender.send(name.to_owned()) { + error!("failed to send job to scheduler: {}", e); + } + } async fn list( &self, @@ -28,6 +42,7 @@ impl JobService for DbConn { .collect() }); Ok(self + .db .list_job_runs_with_filter(rowids, jobs, limit, skip_id, backwards) .await? .into_iter() @@ -36,7 +51,7 @@ impl JobService for DbConn { } async fn compute_stats(&self, jobs: Option>) -> Result { - let stats = (self as &DbConn).compute_job_stats(jobs).await?; + let stats = self.db.compute_job_stats(jobs).await?; Ok(JobStats { success: stats.success, failed: stats.failed, diff --git a/ee/tabby-webserver/src/service/mod.rs b/ee/tabby-webserver/src/service/mod.rs index 3eb366766887..63d72e2a863f 100644 --- a/ee/tabby-webserver/src/service/mod.rs +++ b/ee/tabby-webserver/src/service/mod.rs @@ -59,6 +59,7 @@ struct ServerContext { license: Arc, repository: Arc, user_event: Arc, + job: Arc, logger: Arc, code: Arc, @@ -73,6 +74,7 @@ impl ServerContext { repository: Arc, db_conn: DbConn, is_chat_enabled_locally: bool, + schedule_event_sender: tokio::sync::mpsc::UnboundedSender, ) -> Self { let mail = Arc::new( new_email_service(db_conn.clone()) @@ -85,6 +87,7 @@ impl ServerContext { .expect("failed to initialize license service"), ); let user_event = Arc::new(user_event::create(db_conn.clone())); + let job = Arc::new(job::create(db_conn.clone(), schedule_event_sender)); Self { client: Client::default(), completion: worker::WorkerGroup::default(), @@ -98,6 +101,7 @@ impl ServerContext { license, repository, user_event, + job, db_conn, logger, code, @@ -288,7 +292,7 @@ impl ServiceLocator for Arc { } fn job(&self) -> Arc { - Arc::new(self.db_conn.clone()) + self.job.clone() } fn repository(&self) -> Arc { @@ -322,9 +326,18 @@ pub async fn create_service_locator( repository: Arc, db: DbConn, is_chat_enabled: bool, + schedule_event_sender: tokio::sync::mpsc::UnboundedSender, ) -> Arc { Arc::new(Arc::new( - ServerContext::new(logger, code, repository, db, is_chat_enabled).await, + ServerContext::new( + logger, + code, + repository, + db, + is_chat_enabled, + schedule_event_sender, + ) + .await, )) } From 72844e03cee37b9ca724d5f12c2a9b5e5479f755 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Mon, 29 Apr 2024 19:51:54 -0700 Subject: [PATCH 4/6] update --- ee/tabby-webserver/src/service/job.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ee/tabby-webserver/src/service/job.rs b/ee/tabby-webserver/src/service/job.rs index 6977fe3e7fbf..2aeb9e738d7d 100644 --- a/ee/tabby-webserver/src/service/job.rs +++ b/ee/tabby-webserver/src/service/job.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use juniper::ID; use tabby_db::DbConn; -use tracing::error; +use tracing::{debug, error}; use super::{graphql_pagination_to_filter, AsRowid}; use crate::schema::{ @@ -21,6 +21,7 @@ pub fn create(db: DbConn, sender: tokio::sync::mpsc::UnboundedSender) -> #[async_trait] impl JobService for JobControllerImpl { fn schedule(&self, name: &str) { + debug!("scheduling job: {}", name); if let Err(e) = self.sender.send(name.to_owned()) { error!("failed to send job to scheduler: {}", e); } From 91cb32a54c63c769d056da26429d6c4d4fdc8520 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Mon, 29 Apr 2024 19:53:38 -0700 Subject: [PATCH 5/6] update --- rules/only-service-can-depend-tabby-db.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/rules/only-service-can-depend-tabby-db.yml b/rules/only-service-can-depend-tabby-db.yml index 2c743d009a29..5a15cff73383 100644 --- a/rules/only-service-can-depend-tabby-db.yml +++ b/rules/only-service-can-depend-tabby-db.yml @@ -6,6 +6,7 @@ files: - ./ee/tabby-webserver/src/** ignores: - ./ee/tabby-webserver/src/service/** +- ./ee/tabby-webserver/src/cron/** - ./ee/tabby-webserver/src/handler.rs rule: pattern: tabby_db From 85e155310bab210604317f2ae9eda0253c437f62 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Mon, 29 Apr 2024 20:37:55 -0700 Subject: [PATCH 6/6] update --- ee/tabby-db/src/job_runs.rs | 4 ++-- ee/tabby-webserver/src/cron/controller.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ee/tabby-db/src/job_runs.rs b/ee/tabby-db/src/job_runs.rs index 205078388f27..6ed1abf5ac61 100644 --- a/ee/tabby-db/src/job_runs.rs +++ b/ee/tabby-db/src/job_runs.rs @@ -30,13 +30,13 @@ pub struct JobStatsDAO { /// db read/write operations for `job_runs` table impl DbConn { - pub async fn create_job_run(&self, job: String) -> Result { + pub async fn create_job_run(&self, job: String) -> Result { let rowid = query!( r#"INSERT INTO job_runs (job, start_ts, stdout, stderr) VALUES (?, DATETIME('now'), '', '')"#, job, ).execute(&self.pool).await?.last_insert_rowid(); - Ok(rowid as i32) + Ok(rowid) } pub async fn update_job_stdout(&self, job_id: i64, stdout: String) -> Result<()> { diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index 324102d9edd5..511ee02b2748 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -185,7 +185,7 @@ impl JobContext { } else { -1 }; - Self { id: id as i64, db } + Self { id, db } } fn is_private(&self) -> bool {