From 99aae8edffd892d73a9c0991bd57b73826a48191 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sat, 27 Apr 2024 23:47:49 -0700 Subject: [PATCH 1/8] refactor(webserver): extract JobController --- ee/tabby-webserver/src/cron/controller.rs | 165 ++++++++++++++++++++++ ee/tabby-webserver/src/cron/db/github.rs | 44 ++++-- ee/tabby-webserver/src/cron/db/gitlab.rs | 40 ++++-- ee/tabby-webserver/src/cron/db/mod.rs | 121 +++++----------- ee/tabby-webserver/src/cron/mod.rs | 59 ++------ ee/tabby-webserver/src/cron/scheduler.rs | 77 +++------- ee/tabby-webserver/src/schema/mod.rs | 8 +- 7 files changed, 304 insertions(+), 210 deletions(-) create mode 100644 ee/tabby-webserver/src/cron/controller.rs diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs new file mode 100644 index 000000000000..ceb00efaeefb --- /dev/null +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -0,0 +1,165 @@ +use std::{pin::Pin, sync::Arc}; + +use futures::Future; +use juniper::ID; +use tokio_cron_scheduler::{Job, JobScheduler}; +use tracing::{debug, warn}; + +use crate::schema::job::JobService; + +pub struct JobController { + scheduler: JobScheduler, + service: Arc, +} + +impl JobController { + pub async fn new(service: Arc) -> Self { + service.cleanup().await.expect("failed to cleanup jobs"); + let scheduler = JobScheduler::new() + .await + .expect("failed to create job scheduler"); + Self { scheduler, service } + } + + pub async fn run(&self) { + self.scheduler + .start() + .await + .expect("failed to start job scheduler") + } + + /// Register a new job with the scheduler, the job will be displayed in Jobs dashboard. + pub async fn register_public(&mut self, name: &str, schedule: &str, func: T) + where + T: FnMut(&JobContext) -> Pin> + Send>> + + Send + + Sync + + Clone + + 'static, + { + let job_mutex = Arc::new(tokio::sync::Mutex::new(())); + let service = self.service.clone(); + let name = name.to_owned(); + let func = func.clone(); + let job = Job::new_async(schedule, move |uuid, mut scheduler| { + let job_mutex = job_mutex.clone(); + let service = service.clone(); + let name = name.clone(); + let mut func = func.clone(); + Box::pin(async move { + let Ok(_guard) = job_mutex.try_lock() else { + warn!("Scheduler job overlapped, skipping..."); + return; + }; + + debug!("Running public job `{}`", name); + + let Ok(id) = service.start(name.clone()).await else { + warn!("failed to create job `{}`", &name); + return; + }; + + let context = JobContext::new(id.clone(), service.clone()); + match func(&context).await { + Ok(exit_code) => { + debug!("Job `{}` completed with exit code {}", &name, exit_code); + let _ = service.complete(&id, exit_code).await; + } + Err(e) => { + warn!("Job `{}` failed: {}", &name, e); + let _ = service.complete(&id, -1).await; + } + } + + if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await { + debug!( + "Next time for job `{}` is {:?}", + &name, + next_tick.with_timezone(&chrono::Local) + ); + } + }) + }) + .expect("failed to create job"); + + self.scheduler.add(job).await.expect("failed to add job"); + } + + /// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard. + pub async fn register(&mut self, name: &str, schedule: &str, func: T) + where + T: FnMut() -> Pin> + Send>> + + Send + + Sync + + Clone + + 'static, + { + let job_mutex = Arc::new(tokio::sync::Mutex::new(())); + let func = func.clone(); + let name = name.to_owned(); + let job = Job::new_async(schedule, move |uuid, mut scheduler| { + let job_mutex = job_mutex.clone(); + let name = name.clone(); + let mut func = func.clone(); + Box::pin(async move { + let Ok(_guard) = job_mutex.try_lock() else { + warn!("Scheduler job overlapped, skipping..."); + return; + }; + + debug!("Running job `{}`", name); + match func().await { + Ok(_) => { + debug!("Job `{}` completed", name); + } + Err(e) => { + warn!("Job `{}` failed: {}", name, e); + } + } + + if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await { + debug!( + "Next time for job `{}` is {:?}", + &name, + next_tick.with_timezone(&chrono::Local) + ); + } + }) + }) + .expect("failed to create job"); + + self.scheduler.add(job).await.expect("failed to add job"); + } +} + +#[derive(Clone)] +pub struct JobContext { + id: ID, + service: Arc, +} + +impl JobContext { + pub fn new(id: ID, service: Arc) -> Self { + Self { id, service } + } + + pub async fn stdout_writeline(&self, stdout: String) { + let stdout = stdout + "\n"; + match self.service.update_stdout(&self.id, stdout).await { + Ok(_) => (), + Err(_) => { + warn!("Failed to write stdout to job `{}`", self.id); + } + } + } + + pub async fn stderr_writeline(&self, stderr: String) { + let stderr = stderr + "\n"; + match self.service.update_stderr(&self.id, stderr).await { + Ok(_) => (), + Err(_) => { + warn!("Failed to write stderr to job `{}`", self.id); + } + } + } +} diff --git a/ee/tabby-webserver/src/cron/db/github.rs b/ee/tabby-webserver/src/cron/db/github.rs index 1f184ca891c9..e2201fc539ec 100644 --- a/ee/tabby-webserver/src/cron/db/github.rs +++ b/ee/tabby-webserver/src/cron/db/github.rs @@ -4,25 +4,38 @@ use anyhow::Result; use chrono::Utc; use juniper::ID; use octocrab::{models::Repository, GitHubError, Octocrab}; -use tracing::warn; -use crate::schema::repository::{GithubRepositoryProvider, GithubRepositoryService}; +use crate::{ + cron::controller::JobContext, + schema::repository::{GithubRepositoryProvider, GithubRepositoryService}, +}; -pub async fn refresh_all_repositories(service: Arc) -> Result<()> { +pub async fn refresh_all_repositories( + context: JobContext, + service: Arc, +) -> Result { for provider in service .list_providers(vec![], None, None, None, None) .await? { let start = Utc::now(); - refresh_repositories_for_provider(service.clone(), provider.id.clone()).await?; + context + .stdout_writeline(format!( + "Refreshing repositories for provider: {}\n", + provider.display_name + )) + .await; + refresh_repositories_for_provider(context.clone(), service.clone(), provider.id.clone()) + .await?; service .delete_outdated_repositories(provider.id, start) .await?; } - Ok(()) + Ok(0) } async fn refresh_repositories_for_provider( + context: JobContext, service: Arc, provider_id: ID, ) -> Result<()> { @@ -36,18 +49,29 @@ async fn refresh_repositories_for_provider( service .update_provider_status(provider.id.clone(), false) .await?; - warn!( - "GitHub credentials for provider {} are expired or invalid", - provider.display_name - ); + context + .stderr_writeline(format!( + "GitHub credentials for provider {} are expired or invalid", + provider.display_name + )) + .await; return Err(source.into()); } Err(e) => { - warn!("Failed to fetch repositories from github: {e}"); + context + .stderr_writeline(format!("Failed to fetch repositories from github: {}", e)) + .await; return Err(e.into()); } }; for repo in repos { + context + .stdout_writeline(format!( + "Importing: {}", + repo.full_name.as_deref().unwrap_or(&repo.name) + )) + .await; + let id = repo.id.to_string(); let Some(url) = repo.git_url else { continue; diff --git a/ee/tabby-webserver/src/cron/db/gitlab.rs b/ee/tabby-webserver/src/cron/db/gitlab.rs index 00d00f7c2bed..6d73b6dbf25e 100644 --- a/ee/tabby-webserver/src/cron/db/gitlab.rs +++ b/ee/tabby-webserver/src/cron/db/gitlab.rs @@ -8,25 +8,38 @@ use gitlab::{ }; use juniper::ID; use serde::Deserialize; -use tracing::warn; -use crate::schema::repository::{GitlabRepositoryProvider, GitlabRepositoryService}; +use crate::{ + cron::controller::JobContext, + schema::repository::{GitlabRepositoryProvider, GitlabRepositoryService}, +}; -pub async fn refresh_all_repositories(service: Arc) -> Result<()> { +pub async fn refresh_all_repositories( + context: JobContext, + service: Arc, +) -> Result { for provider in service .list_providers(vec![], None, None, None, None) .await? { let start = Utc::now(); - refresh_repositories_for_provider(service.clone(), provider.id.clone()).await?; + context + .stdout_writeline(format!( + "Refreshing repositories for provider: {}\n", + provider.display_name + )) + .await; + refresh_repositories_for_provider(context.clone(), service.clone(), provider.id.clone()) + .await?; service .delete_outdated_repositories(provider.id, start) .await?; } - Ok(()) + Ok(0) } async fn refresh_repositories_for_provider( + context: JobContext, service: Arc, provider_id: ID, ) -> Result<()> { @@ -37,18 +50,25 @@ async fn refresh_repositories_for_provider( service .update_provider_status(provider.id.clone(), false) .await?; - warn!( - "GitLab credentials for provider {} are expired or invalid", - provider.display_name - ); + context + .stderr_writeline(format!( + "GitLab credentials for provider {} are expired or invalid", + provider.display_name + )) + .await; return Err(e); } Err(e) => { - warn!("Failed to fetch repositories from github: {e}"); + context + .stderr_writeline(format!("Failed to fetch repositories from gitlab: {e}")) + .await; return Err(e); } }; for repo in repos { + context + .stdout_writeline(format!("Importing: {}", &repo.name_with_namespace)) + .await; let id = repo.id.to_string(); let url = repo.http_url_to_repo; let url = url.strip_suffix(".git").unwrap_or(&url); diff --git a/ee/tabby-webserver/src/cron/db/mod.rs b/ee/tabby-webserver/src/cron/db/mod.rs index 8f52796b70de..1b1c24aadc14 100644 --- a/ee/tabby-webserver/src/cron/db/mod.rs +++ b/ee/tabby-webserver/src/cron/db/mod.rs @@ -3,105 +3,52 @@ mod github; mod gitlab; -use std::{sync::Arc, time::Duration}; - -use anyhow::Result; -use futures::Future; -use tokio_cron_scheduler::Job; -use tracing::{debug, error}; +use std::sync::Arc; +use super::controller::JobController; use crate::schema::{ auth::AuthenticationService, - job::JobService, repository::{GithubRepositoryService, GitlabRepositoryService}, }; const EVERY_TWO_HOURS: &str = "0 0 1/2 * * * *"; const EVERY_TEN_MINUTES: &str = "0 1/10 * * * *"; -async fn service_job( - name: &str, - frequency: &'static str, - service: Arc, - job: fn(Arc) -> F, -) -> Result -where - F: Future> + 'static + Send, - S: Send + Sync + 'static + ?Sized, -{ - let name = name.to_owned(); - let job = Job::new_async(frequency, move |_, _| { - let name = name.clone(); - let auth = service.clone(); - Box::pin(async move { - let res = job(auth.clone()).await; - if let Err(e) = res { - error!("Failed to run `{name}` job: {}", e); - } +pub async fn register_jobs( + controller: &mut JobController, + auth: Arc, + github: Arc, + gitlab: Arc, +) { + let cloned_auth = auth.clone(); + controller + .register("remove_staled_refresh_token", EVERY_TWO_HOURS, move || { + let auth = cloned_auth.clone(); + Box::pin(async move { Ok(auth.delete_expired_token().await?) }) }) - })?; - - Ok(job) -} + .await; -pub async fn refresh_token_job(auth: Arc) -> Result { - service_job( - "cleanup staled refresh token", - EVERY_TWO_HOURS, - auth, - |auth| async move { Ok(auth.delete_expired_token().await?) }, - ) - .await -} - -pub async fn password_reset_job(auth: Arc) -> Result { - service_job( - "cleanup staled password reset", - EVERY_TWO_HOURS, - auth, - |auth| async move { Ok(auth.delete_expired_password_resets().await?) }, - ) - .await -} - -pub async fn update_integrated_github_repositories_job( - github_repository_provider: Arc, -) -> Result { - service_job( - "sync github repositories", - EVERY_TEN_MINUTES, - github_repository_provider, - |github_repository_provider| async move { - debug!("Syncing github repositories..."); - github::refresh_all_repositories(github_repository_provider).await - }, - ) - .await -} + let cloned_auth = auth.clone(); + controller + .register("remove_staled_password_reset", EVERY_TWO_HOURS, move || { + let auth = cloned_auth.clone(); + Box::pin(async move { Ok(auth.delete_expired_password_resets().await?) }) + }) + .await; -pub async fn update_integrated_gitlab_repositories_job( - gitlab_repository_provider: Arc, -) -> Result { - service_job( - "sync gitlab repositories", - EVERY_TEN_MINUTES, - gitlab_repository_provider, - |gitlab_repository_provider| async move { - debug!("Syncing gitlab repositories..."); - gitlab::refresh_all_repositories(gitlab_repository_provider).await - }, - ) - .await -} + controller + .register_public("github_repositories", EVERY_TEN_MINUTES, move |context| { + let context = context.clone(); + let github = github.clone(); + Box::pin(async move { github::refresh_all_repositories(context, github).await }) + }) + .await; -pub async fn job_cleanup(jobs: Arc) -> Result { - let job_res = Job::new_one_shot_async(Duration::from_secs(0), move |_, _| { - let jobs = jobs.clone(); - Box::pin(async move { - if let Err(e) = jobs.cleanup().await { - error!("failed to finalize stale job runs: {e}"); - } + controller + .register_public("gitlab_repositories", EVERY_TEN_MINUTES, move |context| { + let gitlab = gitlab.clone(); + let context = context.clone(); + Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await }) }) - }); - Ok(job_res?) + .await; } diff --git a/ee/tabby-webserver/src/cron/mod.rs b/ee/tabby-webserver/src/cron/mod.rs index c61e7cba7539..12d8ba9dd63f 100644 --- a/ee/tabby-webserver/src/cron/mod.rs +++ b/ee/tabby-webserver/src/cron/mod.rs @@ -1,24 +1,14 @@ +mod controller; mod db; mod scheduler; use std::sync::Arc; -use tokio_cron_scheduler::{Job, JobScheduler}; - use crate::schema::{ auth::AuthenticationService, job::JobService, repository::RepositoryService, worker::WorkerService, }; -async fn new_job_scheduler(jobs: Vec) -> anyhow::Result { - let scheduler = JobScheduler::new().await?; - for job in jobs { - scheduler.add(job).await?; - } - scheduler.start().await?; - Ok(scheduler) -} - pub async fn run_cron( auth: Arc, job: Arc, @@ -26,39 +16,16 @@ pub async fn run_cron( repository: Arc, local_port: u16, ) { - let mut jobs = vec![]; - - let job1 = db::refresh_token_job(auth.clone()) - .await - .expect("failed to create refresh token cleanup job"); - jobs.push(job1); - - let job2 = db::password_reset_job(auth) - .await - .expect("failed to create password reset token cleanup job"); - jobs.push(job2); - - let job3 = scheduler::scheduler_job(job.clone(), worker, local_port) - .await - .expect("failed to create scheduler job"); - jobs.push(job3); - - let job4 = db::job_cleanup(job) - .await - .expect("failed to create stale job runs cleanup job"); - jobs.push(job4); - - let job5 = db::update_integrated_github_repositories_job(repository.github()) - .await - .expect("Failed to create github repository refresh job"); - jobs.push(job5); - - let job6 = db::update_integrated_gitlab_repositories_job(repository.gitlab()) - .await - .expect("Failed to create gitlab repository refresh job"); - jobs.push(job6); - - new_job_scheduler(jobs) - .await - .expect("failed to start job scheduler"); + let mut controller = controller::JobController::new(job).await; + db::register_jobs( + &mut controller, + auth, + repository.github(), + repository.gitlab(), + ) + .await; + + scheduler::register(&mut controller, worker, local_port).await; + + controller.run().await } diff --git a/ee/tabby-webserver/src/cron/scheduler.rs b/ee/tabby-webserver/src/cron/scheduler.rs index 24522902c067..daf11b045d81 100644 --- a/ee/tabby-webserver/src/cron/scheduler.rs +++ b/ee/tabby-webserver/src/cron/scheduler.rs @@ -1,64 +1,33 @@ -use std::{pin::Pin, process::Stdio, sync::Arc}; +use std::{process::Stdio, sync::Arc}; use anyhow::{Context, Result}; -use futures::Future; use tokio::io::AsyncBufReadExt; -use tokio_cron_scheduler::{Job, JobScheduler}; -use tracing::{debug, error, warn}; +use tracing::debug; -use crate::schema::{job::JobService, worker::WorkerService}; +use super::controller::{JobContext, JobController}; +use crate::schema::worker::WorkerService; -pub async fn scheduler_job( - job: Arc, +pub async fn register( + controller: &mut JobController, worker: Arc, local_port: u16, -) -> anyhow::Result { - let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(())); - - let scheduler_job = - move |uuid, mut scheduler: JobScheduler| -> Pin + Send>> { +) { + controller + .register_public("scheduler", "0 1/10 * * * *", move |context| { + let context = context.clone(); let worker = worker.clone(); - let job = job.clone(); - let scheduler_mutex = scheduler_mutex.clone(); - Box::pin(async move { - let Ok(_guard) = scheduler_mutex.try_lock() else { - warn!("Scheduler job overlapped, skipping..."); - return; - }; - - if let Err(err) = run_scheduler_now(job, worker, local_port).await { - error!("Failed to run scheduler job, reason: `{}`", err); - } - - if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await { - debug!( - "Next time for scheduler job is {:?}", - next_tick.with_timezone(&chrono::Local) - ); - } - }) - }; - - let job = if std::env::var("TABBY_WEBSERVER_SCHEDULER_ONESHOT").is_ok() { - warn!( - "Running scheduler job as oneshot, this should only be used for debugging purpose..." - ); - Job::new_one_shot_async(std::time::Duration::from_secs(10), scheduler_job)? - } else { - Job::new_async("0 1/10 * * * *", scheduler_job)? - }; - - Ok(job) + Box::pin(async move { run_scheduler_now(context, worker, local_port).await }) + }) + .await; } async fn run_scheduler_now( - job: Arc, + context: JobContext, worker: Arc, local_port: u16, -) -> Result<()> { +) -> Result { debug!("Running scheduler job..."); let exe = std::env::current_exe()?; - let job_id = job.start("scheduler".to_owned()).await?; let mut child = tokio::process::Command::new(exe) .arg("scheduler") @@ -73,14 +42,13 @@ async fn run_scheduler_now( { // Pipe stdout - let job = job.clone(); - let job_id = job_id.clone(); let stdout = child.stdout.take().context("Failed to acquire stdout")?; + let ctx = context.clone(); tokio::spawn(async move { let stdout = tokio::io::BufReader::new(stdout); let mut stdout = stdout.lines(); while let Ok(Some(line)) = stdout.next_line().await { - let _ = job.update_stdout(&job_id, line + "\n").await; + let _ = ctx.stdout_writeline(line).await; } }); } @@ -88,21 +56,18 @@ async fn run_scheduler_now( { // Pipe stderr let stderr = child.stderr.take().context("Failed to acquire stderr")?; - let job = job.clone(); - let job_id = job_id.clone(); + let ctx = context.clone(); tokio::spawn(async move { let stderr = tokio::io::BufReader::new(stderr); let mut stdout = stderr.lines(); while let Ok(Some(line)) = stdout.next_line().await { - let _ = job.update_stderr(&job_id, line + "\n").await; + let _ = ctx.stderr_writeline(line).await; } }); } if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) { - job.complete(&job_id, exit_code).await?; + Ok(exit_code) } else { - job.complete(&job_id, -1).await?; + Ok(-1) } - - Ok(()) } diff --git a/ee/tabby-webserver/src/schema/mod.rs b/ee/tabby-webserver/src/schema/mod.rs index ad07b55c413c..73250e3e88cc 100644 --- a/ee/tabby-webserver/src/schema/mod.rs +++ b/ee/tabby-webserver/src/schema/mod.rs @@ -423,8 +423,14 @@ impl Query { ctx.locator.license().read().await } + // FIXME(meng): This is a temporary solution to expose the list of jobs, we should consider switching to a enum based approach. async fn jobs() -> Result> { - Ok(vec!["scheduler".into()]) + Ok( + vec!["scheduler", "github_repositories", "gitlab_repositories"] + .into_iter() + .map(Into::into) + .collect(), + ) } async fn daily_stats_in_past_year( From c32ed1da5987f94b1f1454f4d2a44f1c9d71a3de Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sat, 27 Apr 2024 23:54:40 -0700 Subject: [PATCH 2/8] update --- ee/tabby-webserver/src/cron/controller.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index ceb00efaeefb..2c8121707331 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -48,7 +48,7 @@ impl JobController { let mut func = func.clone(); Box::pin(async move { let Ok(_guard) = job_mutex.try_lock() else { - warn!("Scheduler job overlapped, skipping..."); + warn!("Job `{}` overlapped, skipping...", name); return; }; @@ -103,7 +103,7 @@ impl JobController { let mut func = func.clone(); Box::pin(async move { let Ok(_guard) = job_mutex.try_lock() else { - warn!("Scheduler job overlapped, skipping..."); + warn!("Job `{}` overlapped, skipping...", name); return; }; From 6835c96201baf522331ee9d46147dccf4a0a1817 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 28 Apr 2024 01:18:43 -0700 Subject: [PATCH 3/8] using randomized cronls --- ee/tabby-webserver/src/cron/db/mod.rs | 4 ++-- ee/tabby-webserver/src/cron/scheduler.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ee/tabby-webserver/src/cron/db/mod.rs b/ee/tabby-webserver/src/cron/db/mod.rs index 1b1c24aadc14..8f456a5c142c 100644 --- a/ee/tabby-webserver/src/cron/db/mod.rs +++ b/ee/tabby-webserver/src/cron/db/mod.rs @@ -11,8 +11,8 @@ use crate::schema::{ repository::{GithubRepositoryService, GitlabRepositoryService}, }; -const EVERY_TWO_HOURS: &str = "0 0 1/2 * * * *"; -const EVERY_TEN_MINUTES: &str = "0 1/10 * * * *"; +const EVERY_TWO_HOURS: &str = "0 0 */2 * * * *"; +const EVERY_TEN_MINUTES: &str = "0 */10 * * * *"; pub async fn register_jobs( controller: &mut JobController, diff --git a/ee/tabby-webserver/src/cron/scheduler.rs b/ee/tabby-webserver/src/cron/scheduler.rs index daf11b045d81..d8f282c83bbe 100644 --- a/ee/tabby-webserver/src/cron/scheduler.rs +++ b/ee/tabby-webserver/src/cron/scheduler.rs @@ -13,7 +13,7 @@ pub async fn register( local_port: u16, ) { controller - .register_public("scheduler", "0 1/10 * * * *", move |context| { + .register_public("scheduler", "0 */10 * * * *", move |context| { let context = context.clone(); let worker = worker.clone(); Box::pin(async move { run_scheduler_now(context, worker, local_port).await }) From e3e29136e330f6da9bb662150d3dc1860393c40e Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 28 Apr 2024 01:21:22 -0700 Subject: [PATCH 4/8] randomize --- Cargo.lock | 1 + ee/tabby-webserver/Cargo.toml | 1 + ee/tabby-webserver/src/cron/db/mod.rs | 17 ++++++++--------- ee/tabby-webserver/src/cron/mod.rs | 12 ++++++++++++ ee/tabby-webserver/src/cron/scheduler.rs | 7 +++++-- 5 files changed, 27 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92db4adeab3b..2c3d29b16dfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5106,6 +5106,7 @@ dependencies = [ "octocrab", "pin-project", "querystring", + "rand 0.8.5", "regex", "reqwest", "rust-embed 8.0.0", diff --git a/ee/tabby-webserver/Cargo.toml b/ee/tabby-webserver/Cargo.toml index 8205406e730b..462488569f28 100644 --- a/ee/tabby-webserver/Cargo.toml +++ b/ee/tabby-webserver/Cargo.toml @@ -53,6 +53,7 @@ tabby-search = { path = "../tabby-search" } octocrab = "0.38.0" fs_extra = "1.3.0" gitlab = "0.1610.0" +rand = "0.8.5" [dev-dependencies] assert_matches = "1.5.0" diff --git a/ee/tabby-webserver/src/cron/db/mod.rs b/ee/tabby-webserver/src/cron/db/mod.rs index 8f456a5c142c..d2f506252042 100644 --- a/ee/tabby-webserver/src/cron/db/mod.rs +++ b/ee/tabby-webserver/src/cron/db/mod.rs @@ -5,15 +5,14 @@ mod gitlab; use std::sync::Arc; -use super::controller::JobController; +use rand::Rng; + +use super::{controller::JobController, every_ten_minutes, every_two_hours}; use crate::schema::{ auth::AuthenticationService, repository::{GithubRepositoryService, GitlabRepositoryService}, }; -const EVERY_TWO_HOURS: &str = "0 0 */2 * * * *"; -const EVERY_TEN_MINUTES: &str = "0 */10 * * * *"; - pub async fn register_jobs( controller: &mut JobController, auth: Arc, @@ -22,7 +21,7 @@ pub async fn register_jobs( ) { let cloned_auth = auth.clone(); controller - .register("remove_staled_refresh_token", EVERY_TWO_HOURS, move || { + .register("remove_staled_refresh_token", &every_two_hours(), move || { let auth = cloned_auth.clone(); Box::pin(async move { Ok(auth.delete_expired_token().await?) }) }) @@ -30,14 +29,14 @@ pub async fn register_jobs( let cloned_auth = auth.clone(); controller - .register("remove_staled_password_reset", EVERY_TWO_HOURS, move || { + .register("remove_staled_password_reset", &every_two_hours(), move || { let auth = cloned_auth.clone(); Box::pin(async move { Ok(auth.delete_expired_password_resets().await?) }) }) .await; controller - .register_public("github_repositories", EVERY_TEN_MINUTES, move |context| { + .register_public("github_repositories", &every_ten_minutes(), move |context| { let context = context.clone(); let github = github.clone(); Box::pin(async move { github::refresh_all_repositories(context, github).await }) @@ -45,10 +44,10 @@ pub async fn register_jobs( .await; controller - .register_public("gitlab_repositories", EVERY_TEN_MINUTES, move |context| { + .register_public("gitlab_repositories", &every_ten_minutes(), move |context| { let gitlab = gitlab.clone(); let context = context.clone(); Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await }) }) .await; -} +} \ No newline at end of file diff --git a/ee/tabby-webserver/src/cron/mod.rs b/ee/tabby-webserver/src/cron/mod.rs index 12d8ba9dd63f..73992298fcd4 100644 --- a/ee/tabby-webserver/src/cron/mod.rs +++ b/ee/tabby-webserver/src/cron/mod.rs @@ -4,6 +4,8 @@ mod scheduler; use std::sync::Arc; +use rand::Rng; + use crate::schema::{ auth::AuthenticationService, job::JobService, repository::RepositoryService, worker::WorkerService, @@ -29,3 +31,13 @@ pub async fn run_cron( controller.run().await } + +fn every_two_hours() -> String { + let mut rng = rand::thread_rng(); + format!("{} {} */2 * * * *", rng.gen_range(0..59), rng.gen_range(0..59)) +} + +fn every_ten_minutes() -> String { + let mut rng = rand::thread_rng(); + format!("{} {} */10 * * * *", rng.gen_range(0..59), rng.gen_range(0..59)) +} \ No newline at end of file diff --git a/ee/tabby-webserver/src/cron/scheduler.rs b/ee/tabby-webserver/src/cron/scheduler.rs index d8f282c83bbe..35137a0723cf 100644 --- a/ee/tabby-webserver/src/cron/scheduler.rs +++ b/ee/tabby-webserver/src/cron/scheduler.rs @@ -4,7 +4,10 @@ use anyhow::{Context, Result}; use tokio::io::AsyncBufReadExt; use tracing::debug; -use super::controller::{JobContext, JobController}; +use super::{ + controller::{JobContext, JobController}, + every_ten_minutes, +}; use crate::schema::worker::WorkerService; pub async fn register( @@ -13,7 +16,7 @@ pub async fn register( local_port: u16, ) { controller - .register_public("scheduler", "0 */10 * * * *", move |context| { + .register_public("scheduler", &every_ten_minutes(), move |context| { let context = context.clone(); let worker = worker.clone(); Box::pin(async move { run_scheduler_now(context, worker, local_port).await }) From 5c9d1c7622a0391657049f679a182467d87213ab Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 28 Apr 2024 01:23:08 -0700 Subject: [PATCH 5/8] update --- ee/tabby-webserver/src/cron/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ee/tabby-webserver/src/cron/mod.rs b/ee/tabby-webserver/src/cron/mod.rs index 73992298fcd4..c9a0f06ede6a 100644 --- a/ee/tabby-webserver/src/cron/mod.rs +++ b/ee/tabby-webserver/src/cron/mod.rs @@ -34,10 +34,10 @@ pub async fn run_cron( fn every_two_hours() -> String { let mut rng = rand::thread_rng(); - format!("{} {} */2 * * * *", rng.gen_range(0..59), rng.gen_range(0..59)) + format!("{} {} */2 * * *", rng.gen_range(0..59), rng.gen_range(0..59)) } fn every_ten_minutes() -> String { let mut rng = rand::thread_rng(); - format!("{} {} */10 * * * *", rng.gen_range(0..59), rng.gen_range(0..59)) + format!("{} */10 * * * *", rng.gen_range(0..59)) } \ No newline at end of file From 2c29f0bb2eb4a8a340bfd58527e80141436394b8 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 28 Apr 2024 14:05:24 -0700 Subject: [PATCH 6/8] simplify register/register_public implementation --- ee/tabby-webserver/src/cron/controller.rs | 114 ++++++++++++---------- ee/tabby-webserver/src/cron/db/mod.rs | 56 +++++++---- ee/tabby-webserver/src/cron/mod.rs | 8 +- 3 files changed, 104 insertions(+), 74 deletions(-) diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index 2c8121707331..61ccf0805f22 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -37,68 +37,43 @@ impl JobController { + Clone + 'static, { - let job_mutex = Arc::new(tokio::sync::Mutex::new(())); - let service = self.service.clone(); - let name = name.to_owned(); - let func = func.clone(); - let job = Job::new_async(schedule, move |uuid, mut scheduler| { - let job_mutex = job_mutex.clone(); - let service = service.clone(); - let name = name.clone(); + self.register_impl(true, name, schedule, func).await; + } + + /// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard. + pub async fn register(&mut self, name: &str, schedule: &str, func: T) + where + T: FnMut() -> Pin> + Send>> + + Send + + Sync + + Clone + + 'static, + { + self.register_impl(false, name, schedule, move |_| { let mut func = func.clone(); Box::pin(async move { - let Ok(_guard) = job_mutex.try_lock() else { - warn!("Job `{}` overlapped, skipping...", name); - return; - }; - - debug!("Running public job `{}`", name); - - let Ok(id) = service.start(name.clone()).await else { - warn!("failed to create job `{}`", &name); - return; - }; - - let context = JobContext::new(id.clone(), service.clone()); - match func(&context).await { - Ok(exit_code) => { - debug!("Job `{}` completed with exit code {}", &name, exit_code); - let _ = service.complete(&id, exit_code).await; - } - Err(e) => { - warn!("Job `{}` failed: {}", &name, e); - let _ = service.complete(&id, -1).await; - } - } - - if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await { - debug!( - "Next time for job `{}` is {:?}", - &name, - next_tick.with_timezone(&chrono::Local) - ); - } + func().await?; + Ok(0) }) }) - .expect("failed to create job"); - - self.scheduler.add(job).await.expect("failed to add job"); + .await; } - /// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard. - pub async fn register(&mut self, name: &str, schedule: &str, func: T) + async fn register_impl(&mut self, is_public: bool, name: &str, schedule: &str, func: T) where - T: FnMut() -> Pin> + Send>> + T: FnMut(&JobContext) -> Pin> + Send>> + Send + Sync + Clone + 'static, { let job_mutex = Arc::new(tokio::sync::Mutex::new(())); - let func = func.clone(); + let service = self.service.clone(); let name = name.to_owned(); + let func = func.clone(); let job = Job::new_async(schedule, move |uuid, mut scheduler| { let job_mutex = job_mutex.clone(); + let service = service.clone(); let name = name.clone(); let mut func = func.clone(); Box::pin(async move { @@ -108,12 +83,16 @@ impl JobController { }; debug!("Running job `{}`", name); - match func().await { - Ok(_) => { - debug!("Job `{}` completed", name); + + let context = JobContext::new(is_public, &name, service.clone()).await; + match func(&context).await { + Ok(exit_code) => { + debug!("Job `{}` completed with exit code {}", &name, exit_code); + context.complete(exit_code).await; } Err(e) => { - warn!("Job `{}` failed: {}", name, e); + warn!("Job `{}` failed: {}", &name, e); + context.complete(-1).await; } } @@ -139,11 +118,27 @@ pub struct JobContext { } impl JobContext { - pub fn new(id: ID, service: Arc) -> Self { + async fn new(public: bool, name: &str, service: Arc) -> Self { + let id = if public { + service + .start(name.to_owned()) + .await + .expect("failed to create job") + } else { + ID::from("".to_owned()) + }; Self { id, service } } + fn is_private(&self) -> bool { + self.id.is_empty() + } + pub async fn stdout_writeline(&self, stdout: String) { + if self.is_private() { + return; + } + let stdout = stdout + "\n"; match self.service.update_stdout(&self.id, stdout).await { Ok(_) => (), @@ -154,6 +149,10 @@ impl JobContext { } pub async fn stderr_writeline(&self, stderr: String) { + if self.is_private() { + return; + } + let stderr = stderr + "\n"; match self.service.update_stderr(&self.id, stderr).await { Ok(_) => (), @@ -162,4 +161,17 @@ impl JobContext { } } } + + async fn complete(&self, exit_code: i32) { + if self.is_private() { + return; + } + + match self.service.complete(&self.id, exit_code).await { + Ok(_) => (), + Err(_) => { + warn!("Failed to complete job `{}`", self.id); + } + } + } } diff --git a/ee/tabby-webserver/src/cron/db/mod.rs b/ee/tabby-webserver/src/cron/db/mod.rs index d2f506252042..eb63d817c69c 100644 --- a/ee/tabby-webserver/src/cron/db/mod.rs +++ b/ee/tabby-webserver/src/cron/db/mod.rs @@ -5,8 +5,6 @@ mod gitlab; use std::sync::Arc; -use rand::Rng; - use super::{controller::JobController, every_ten_minutes, every_two_hours}; use crate::schema::{ auth::AuthenticationService, @@ -21,33 +19,49 @@ pub async fn register_jobs( ) { let cloned_auth = auth.clone(); controller - .register("remove_staled_refresh_token", &every_two_hours(), move || { - let auth = cloned_auth.clone(); - Box::pin(async move { Ok(auth.delete_expired_token().await?) }) - }) + .register( + "remove_staled_refresh_token", + &every_two_hours(), + move || { + let auth = cloned_auth.clone(); + Box::pin(async move { Ok(auth.delete_expired_token().await?) }) + }, + ) .await; let cloned_auth = auth.clone(); controller - .register("remove_staled_password_reset", &every_two_hours(), move || { - let auth = cloned_auth.clone(); - Box::pin(async move { Ok(auth.delete_expired_password_resets().await?) }) - }) + .register( + "remove_staled_password_reset", + &every_two_hours(), + move || { + let auth = cloned_auth.clone(); + Box::pin(async move { Ok(auth.delete_expired_password_resets().await?) }) + }, + ) .await; controller - .register_public("github_repositories", &every_ten_minutes(), move |context| { - let context = context.clone(); - let github = github.clone(); - Box::pin(async move { github::refresh_all_repositories(context, github).await }) - }) + .register_public( + "github_repositories", + &every_ten_minutes(), + move |context| { + let context = context.clone(); + let github = github.clone(); + Box::pin(async move { github::refresh_all_repositories(context, github).await }) + }, + ) .await; controller - .register_public("gitlab_repositories", &every_ten_minutes(), move |context| { - let gitlab = gitlab.clone(); - let context = context.clone(); - Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await }) - }) + .register_public( + "gitlab_repositories", + &every_ten_minutes(), + move |context| { + let gitlab = gitlab.clone(); + let context = context.clone(); + Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await }) + }, + ) .await; -} \ No newline at end of file +} diff --git a/ee/tabby-webserver/src/cron/mod.rs b/ee/tabby-webserver/src/cron/mod.rs index c9a0f06ede6a..2e28b3c469d0 100644 --- a/ee/tabby-webserver/src/cron/mod.rs +++ b/ee/tabby-webserver/src/cron/mod.rs @@ -34,10 +34,14 @@ pub async fn run_cron( fn every_two_hours() -> String { let mut rng = rand::thread_rng(); - format!("{} {} */2 * * *", rng.gen_range(0..59), rng.gen_range(0..59)) + format!( + "{} {} */2 * * *", + rng.gen_range(0..59), + rng.gen_range(0..59) + ) } fn every_ten_minutes() -> String { let mut rng = rand::thread_rng(); format!("{} */10 * * * *", rng.gen_range(0..59)) -} \ No newline at end of file +} From 2cc45924ae4d43e65f8b8a43511d1a751c75a281 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 28 Apr 2024 14:23:33 -0700 Subject: [PATCH 7/8] support running with TABBY_WEBSERVER_CONTROLLER_ONESHOT --- ee/tabby-webserver/src/cron/controller.rs | 56 ++++++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs index 61ccf0805f22..f0e31bcbea73 100644 --- a/ee/tabby-webserver/src/cron/controller.rs +++ b/ee/tabby-webserver/src/cron/controller.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc, time::Duration}; use futures::Future; use juniper::ID; @@ -10,6 +10,7 @@ use crate::schema::job::JobService; pub struct JobController { scheduler: JobScheduler, service: Arc, + is_oneshot: bool, } impl JobController { @@ -18,7 +19,17 @@ impl JobController { let scheduler = JobScheduler::new() .await .expect("failed to create job scheduler"); - Self { scheduler, service } + let is_oneshot = std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok(); + if is_oneshot { + warn!( + "Running controller job as oneshot, this should only be used for debugging purpose..." + ); + } + Self { + scheduler, + service, + is_oneshot, + } } pub async fn run(&self) { @@ -60,6 +71,47 @@ impl JobController { } async fn register_impl(&mut self, is_public: bool, name: &str, schedule: &str, func: T) + where + T: FnMut(&JobContext) -> Pin> + Send>> + + Send + + Sync + + Clone + + 'static, + { + if self.is_oneshot { + self.run_oneshot(is_public, name, func).await; + } else { + self.run_schedule(is_public, name, schedule, func).await; + }; + } + + async fn run_oneshot(&self, is_public: bool, name: &str, mut func: T) + where + T: FnMut(&JobContext) -> Pin> + Send>> + + Send + + Sync + + Clone + + 'static, + { + let name = name.to_owned(); + let context = JobContext::new(is_public, &name, self.service.clone()).await; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(5)).await; + + match func(&context).await { + Ok(exit_code) => { + debug!("Job `{}` completed with exit code {}", &name, exit_code); + context.complete(exit_code).await; + } + Err(e) => { + warn!("Job `{}` failed: {}", &name, e); + context.complete(-1).await; + } + } + }); + } + + async fn run_schedule(&mut self, is_public: bool, name: &str, schedule: &str, func: T) where T: FnMut(&JobContext) -> Pin> + Send>> + Send From 26a18b7085c7d3eb8bac97f54fa298a8967879a4 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 28 Apr 2024 14:29:42 -0700 Subject: [PATCH 8/8] update --- crates/tabby-scheduler/src/repository.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/tabby-scheduler/src/repository.rs b/crates/tabby-scheduler/src/repository.rs index ee01a126bdec..9d0c7c42c26d 100644 --- a/crates/tabby-scheduler/src/repository.rs +++ b/crates/tabby-scheduler/src/repository.rs @@ -31,7 +31,7 @@ impl RepositoryExt for RepositoryConfig { if code != 0 { warn!( "Failed to clone `{}`. Please check your repository configuration.", - &self.git_url + self.canonical_git_url() ); fs::remove_dir_all(&dir).expect("Failed to remove directory"); }