diff --git a/Cargo.lock b/Cargo.lock
index 92db4adeab3b..2c3d29b16dfd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5106,6 +5106,7 @@ dependencies = [
  "octocrab",
  "pin-project",
  "querystring",
+ "rand 0.8.5",
  "regex",
  "reqwest",
  "rust-embed 8.0.0",
diff --git a/crates/tabby-scheduler/src/repository.rs b/crates/tabby-scheduler/src/repository.rs
index ee01a126bdec..9d0c7c42c26d 100644
--- a/crates/tabby-scheduler/src/repository.rs
+++ b/crates/tabby-scheduler/src/repository.rs
@@ -31,7 +31,7 @@ impl RepositoryExt for RepositoryConfig {
         if code != 0 {
             warn!(
                 "Failed to clone `{}`. Please check your repository configuration.",
-                &self.git_url
+                self.canonical_git_url()
             );
             fs::remove_dir_all(&dir).expect("Failed to remove directory");
         }
diff --git a/ee/tabby-webserver/Cargo.toml b/ee/tabby-webserver/Cargo.toml
index 8205406e730b..462488569f28 100644
--- a/ee/tabby-webserver/Cargo.toml
+++ b/ee/tabby-webserver/Cargo.toml
@@ -53,6 +53,7 @@ tabby-search = { path = "../tabby-search" }
 octocrab = "0.38.0"
 fs_extra = "1.3.0"
 gitlab = "0.1610.0"
+rand = "0.8.5"
 
 [dev-dependencies]
 assert_matches = "1.5.0"
diff --git a/ee/tabby-webserver/src/cron/controller.rs b/ee/tabby-webserver/src/cron/controller.rs
new file mode 100644
index 000000000000..f0e31bcbea73
--- /dev/null
+++ b/ee/tabby-webserver/src/cron/controller.rs
@@ -0,0 +1,229 @@
+use std::{pin::Pin, sync::Arc, time::Duration};
+
+use futures::Future;
+use juniper::ID;
+use tokio_cron_scheduler::{Job, JobScheduler};
+use tracing::{debug, warn};
+
+use crate::schema::job::JobService;
+
+pub struct JobController {
+    scheduler: JobScheduler,
+    service: Arc<dyn JobService>,
+    is_oneshot: bool,
+}
+
+impl JobController {
+    pub async fn new(service: Arc<dyn JobService>) -> Self {
+        service.cleanup().await.expect("failed to cleanup jobs");
+        let scheduler = JobScheduler::new()
+            .await
+            .expect("failed to create job scheduler");
+        let is_oneshot = std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok();
+        if is_oneshot {
+            warn!(
+                "Running controller jobs as oneshot; this should only be used for debugging purposes..."
+            );
+        }
+        Self {
+            scheduler,
+            service,
+            is_oneshot,
+        }
+    }
+
+    pub async fn run(&self) {
+        self.scheduler
+            .start()
+            .await
+            .expect("failed to start job scheduler")
+    }
+
+    /// Register a new job with the scheduler; the job will be displayed in the Jobs dashboard.
+    pub async fn register_public<T>(&mut self, name: &str, schedule: &str, func: T)
+    where
+        T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+            + Send
+            + Sync
+            + Clone
+            + 'static,
+    {
+        self.register_impl(true, name, schedule, func).await;
+    }
+
+    /// Register a new job with the scheduler; the job will NOT be displayed in the Jobs dashboard.
+    pub async fn register<T>(&mut self, name: &str, schedule: &str, func: T)
+    where
+        T: FnMut() -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+            + Send
+            + Sync
+            + Clone
+            + 'static,
+    {
+        self.register_impl(false, name, schedule, move |_| {
+            let mut func = func.clone();
+            Box::pin(async move {
+                func().await?;
+                Ok(0)
+            })
+        })
+        .await;
+    }
+
+    async fn register_impl<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
+    where
+        T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+            + Send
+            + Sync
+            + Clone
+            + 'static,
+    {
+        if self.is_oneshot {
+            self.run_oneshot(is_public, name, func).await;
+        } else {
+            self.run_schedule(is_public, name, schedule, func).await;
+        };
+    }
+
+    async fn run_oneshot<T>(&self, is_public: bool, name: &str, mut func: T)
+    where
+        T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+            + Send
+            + Sync
+            + Clone
+            + 'static,
+    {
+        let name = name.to_owned();
+        let context = JobContext::new(is_public, &name, self.service.clone()).await;
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_secs(5)).await;
+
+            match func(&context).await {
+                Ok(exit_code) => {
+                    debug!("Job `{}` completed with exit code {}", &name, exit_code);
+                    context.complete(exit_code).await;
+                }
+                Err(e) => {
+                    warn!("Job `{}` failed: {}", &name, e);
+                    context.complete(-1).await;
+                }
+            }
+        });
+    }
+
+    async fn run_schedule<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
+    where
+        T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+            + Send
+            + Sync
+            + Clone
+            + 'static,
+    {
+        let job_mutex = Arc::new(tokio::sync::Mutex::new(()));
+        let service = self.service.clone();
+        let name = name.to_owned();
+        let func = func.clone();
+        let job = Job::new_async(schedule, move |uuid, mut scheduler| {
+            let job_mutex = job_mutex.clone();
+            let service = service.clone();
+            let name = name.clone();
+            let mut func = func.clone();
+            Box::pin(async move {
+                let Ok(_guard) = job_mutex.try_lock() else {
+                    warn!("Job `{}` overlapped, skipping...", name);
+                    return;
+                };
+
+                debug!("Running job `{}`", name);
+
+                let context = JobContext::new(is_public, &name, service.clone()).await;
+                match func(&context).await {
+                    Ok(exit_code) => {
+                        debug!("Job `{}` completed with exit code {}", &name, exit_code);
+                        context.complete(exit_code).await;
+                    }
+                    Err(e) => {
+                        warn!("Job `{}` failed: {}", &name, e);
+                        context.complete(-1).await;
+                    }
+                }
+
+                if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
+                    debug!(
+                        "Next time for job `{}` is {:?}",
+                        &name,
+                        next_tick.with_timezone(&chrono::Local)
+                    );
+                }
+            })
+        })
+        .expect("failed to create job");
+
+        self.scheduler.add(job).await.expect("failed to add job");
+    }
+}
+
+#[derive(Clone)]
+pub struct JobContext {
+    id: ID,
+    service: Arc<dyn JobService>,
+}
+
+impl JobContext {
+    async fn new(public: bool, name: &str, service: Arc<dyn JobService>) -> Self {
+        let id = if public {
+            service
+                .start(name.to_owned())
+                .await
+                .expect("failed to create job")
+        } else {
+            ID::from("".to_owned())
+        };
+        Self { id, service }
+    }
+
+    fn is_private(&self) -> bool {
+        self.id.is_empty()
+    }
+
+    pub async fn stdout_writeline(&self, stdout: String) {
+        if self.is_private() {
+            return;
+        }
+
+        let stdout = stdout + "\n";
+        match self.service.update_stdout(&self.id, stdout).await {
+            Ok(_) => (),
+            Err(_) => {
+                warn!("Failed to write stdout to job `{}`", self.id);
+            }
+        }
+    }
+
+    pub async fn stderr_writeline(&self, stderr: String) {
+        if self.is_private() {
+            return;
+        }
+
+        let stderr = stderr + "\n";
+        match self.service.update_stderr(&self.id, stderr).await {
+            Ok(_) => (),
+            Err(_) => {
+                warn!("Failed to write stderr to job `{}`", self.id);
+            }
+        }
+    }
+
+    async fn complete(&self, exit_code: i32) {
+        if self.is_private() {
+            return;
+        }
+
+        match self.service.complete(&self.id, exit_code).await {
+            Ok(_) => (),
+            Err(_) => {
+                warn!("Failed to complete job `{}`", self.id);
+            }
+        }
+    }
+}
diff --git a/ee/tabby-webserver/src/cron/db/github.rs b/ee/tabby-webserver/src/cron/db/github.rs
index 1f184ca891c9..e2201fc539ec 100644
--- a/ee/tabby-webserver/src/cron/db/github.rs
+++ b/ee/tabby-webserver/src/cron/db/github.rs
@@ -4,25 +4,38 @@ use anyhow::Result;
 use chrono::Utc;
 use juniper::ID;
 use octocrab::{models::Repository, GitHubError, Octocrab};
-use tracing::warn;
 
-use crate::schema::repository::{GithubRepositoryProvider, GithubRepositoryService};
+use crate::{
+    cron::controller::JobContext,
+    schema::repository::{GithubRepositoryProvider, GithubRepositoryService},
+};
 
-pub async fn refresh_all_repositories(service: Arc<dyn GithubRepositoryService>) -> Result<()> {
+pub async fn refresh_all_repositories(
+    context: JobContext,
+    service: Arc<dyn GithubRepositoryService>,
+) -> Result<i32> {
     for provider in service
         .list_providers(vec![], None, None, None, None)
         .await?
     {
         let start = Utc::now();
-        refresh_repositories_for_provider(service.clone(), provider.id.clone()).await?;
+        context
+            .stdout_writeline(format!(
+                "Refreshing repositories for provider: {}\n",
+                provider.display_name
+            ))
+            .await;
+        refresh_repositories_for_provider(context.clone(), service.clone(), provider.id.clone())
+            .await?;
         service
             .delete_outdated_repositories(provider.id, start)
             .await?;
     }
-    Ok(())
+    Ok(0)
 }
 
 async fn refresh_repositories_for_provider(
+    context: JobContext,
     service: Arc<dyn GithubRepositoryService>,
     provider_id: ID,
 ) -> Result<()> {
@@ -36,18 +49,29 @@ async fn refresh_repositories_for_provider(
             service
                 .update_provider_status(provider.id.clone(), false)
                 .await?;
-            warn!(
-                "GitHub credentials for provider {} are expired or invalid",
-                provider.display_name
-            );
+            context
+                .stderr_writeline(format!(
+                    "GitHub credentials for provider {} are expired or invalid",
+                    provider.display_name
+                ))
+                .await;
             return Err(source.into());
         }
         Err(e) => {
-            warn!("Failed to fetch repositories from github: {e}");
+            context
+                .stderr_writeline(format!("Failed to fetch repositories from github: {}", e))
+                .await;
             return Err(e.into());
         }
     };
     for repo in repos {
+        context
+            .stdout_writeline(format!(
+                "Importing: {}",
+                repo.full_name.as_deref().unwrap_or(&repo.name)
+            ))
+            .await;
+
         let id = repo.id.to_string();
         let Some(url) = repo.git_url else {
             continue;
diff --git a/ee/tabby-webserver/src/cron/db/gitlab.rs b/ee/tabby-webserver/src/cron/db/gitlab.rs
index 00d00f7c2bed..6d73b6dbf25e 100644
--- a/ee/tabby-webserver/src/cron/db/gitlab.rs
+++ b/ee/tabby-webserver/src/cron/db/gitlab.rs
@@ -8,25 +8,38 @@ use gitlab::{
 };
 use juniper::ID;
 use serde::Deserialize;
-use tracing::warn;
 
-use crate::schema::repository::{GitlabRepositoryProvider, GitlabRepositoryService};
+use crate::{
+    cron::controller::JobContext,
+    schema::repository::{GitlabRepositoryProvider, GitlabRepositoryService},
+};
 
-pub async fn refresh_all_repositories(service: Arc<dyn GitlabRepositoryService>) -> Result<()> {
+pub async fn refresh_all_repositories(
+    context: JobContext,
+    service: Arc<dyn GitlabRepositoryService>,
+) -> Result<i32> {
     for provider in service
         .list_providers(vec![], None, None, None, None)
         .await?
     {
         let start = Utc::now();
-        refresh_repositories_for_provider(service.clone(), provider.id.clone()).await?;
+        context
+            .stdout_writeline(format!(
+                "Refreshing repositories for provider: {}\n",
+                provider.display_name
+            ))
+            .await;
+        refresh_repositories_for_provider(context.clone(), service.clone(), provider.id.clone())
+            .await?;
         service
             .delete_outdated_repositories(provider.id, start)
             .await?;
     }
-    Ok(())
+    Ok(0)
 }
 
 async fn refresh_repositories_for_provider(
+    context: JobContext,
     service: Arc<dyn GitlabRepositoryService>,
     provider_id: ID,
 ) -> Result<()> {
@@ -37,18 +50,25 @@ async fn refresh_repositories_for_provider(
             service
                 .update_provider_status(provider.id.clone(), false)
                 .await?;
-            warn!(
-                "GitLab credentials for provider {} are expired or invalid",
-                provider.display_name
-            );
+            context
+                .stderr_writeline(format!(
+                    "GitLab credentials for provider {} are expired or invalid",
+                    provider.display_name
+                ))
+                .await;
             return Err(e);
         }
         Err(e) => {
-            warn!("Failed to fetch repositories from github: {e}");
+            context
+                .stderr_writeline(format!("Failed to fetch repositories from gitlab: {e}"))
+                .await;
             return Err(e);
         }
     };
     for repo in repos {
+        context
+            .stdout_writeline(format!("Importing: {}", &repo.name_with_namespace))
+            .await;
         let id = repo.id.to_string();
         let url = repo.http_url_to_repo;
         let url = url.strip_suffix(".git").unwrap_or(&url);
diff --git a/ee/tabby-webserver/src/cron/db/mod.rs b/ee/tabby-webserver/src/cron/db/mod.rs
index 8f52796b70de..eb63d817c69c 100644
--- a/ee/tabby-webserver/src/cron/db/mod.rs
+++ b/ee/tabby-webserver/src/cron/db/mod.rs
@@ -3,105 +3,65 @@ mod github;
 mod gitlab;
 
-use std::{sync::Arc, time::Duration};
-
-use anyhow::Result;
-use futures::Future;
-use tokio_cron_scheduler::Job;
-use tracing::{debug, error};
+use std::sync::Arc;
 
+use super::{controller::JobController, every_ten_minutes, every_two_hours};
 use crate::schema::{
     auth::AuthenticationService,
-    job::JobService,
     repository::{GithubRepositoryService, GitlabRepositoryService},
 };
 
-const EVERY_TWO_HOURS: &str = "0 0 1/2 * * * *";
-const EVERY_TEN_MINUTES: &str = "0 1/10 * * * *";
-
-async fn service_job<S, F>(
-    name: &str,
-    frequency: &'static str,
-    service: Arc<S>,
-    job: fn(Arc<S>) -> F,
-) -> Result<Job>
-where
-    F: Future<Output = Result<()>> + 'static + Send,
-    S: Send + Sync + 'static + ?Sized,
-{
-    let name = name.to_owned();
-    let job = Job::new_async(frequency, move |_, _| {
-        let name = name.clone();
-        let auth = service.clone();
-        Box::pin(async move {
-            let res = job(auth.clone()).await;
-            if let Err(e) = res {
-                error!("Failed to run `{name}` job: {}", e);
-            }
-        })
-    })?;
-
-    Ok(job)
-}
-
-pub async fn refresh_token_job(auth: Arc<dyn AuthenticationService>) -> Result<Job> {
-    service_job(
-        "cleanup staled refresh token",
-        EVERY_TWO_HOURS,
-        auth,
-        |auth| async move { Ok(auth.delete_expired_token().await?) },
-    )
-    .await
-}
+pub async fn register_jobs(
+    controller: &mut JobController,
+    auth: Arc<dyn AuthenticationService>,
+    github: Arc<dyn GithubRepositoryService>,
+    gitlab: Arc<dyn GitlabRepositoryService>,
+) {
+    let cloned_auth = auth.clone();
+    controller
+        .register(
+            "remove_staled_refresh_token",
+            &every_two_hours(),
+            move || {
+                let auth = cloned_auth.clone();
+                Box::pin(async move { Ok(auth.delete_expired_token().await?) })
+            },
+        )
+        .await;
 
-pub async fn password_reset_job(auth: Arc<dyn AuthenticationService>) -> Result<Job> {
-    service_job(
-        "cleanup staled password reset",
-        EVERY_TWO_HOURS,
-        auth,
-        |auth| async move { Ok(auth.delete_expired_password_resets().await?) },
-    )
-    .await
-}
+    let cloned_auth = auth.clone();
+    controller
+        .register(
+            "remove_staled_password_reset",
+            &every_two_hours(),
+            move || {
+                let auth = cloned_auth.clone();
+                Box::pin(async move { Ok(auth.delete_expired_password_resets().await?) })
+            },
+        )
+        .await;
 
-pub async fn update_integrated_github_repositories_job(
-    github_repository_provider: Arc<dyn GithubRepositoryService>,
-) -> Result<Job> {
-    service_job(
-        "sync github repositories",
-        EVERY_TEN_MINUTES,
-        github_repository_provider,
-        |github_repository_provider| async move {
-            debug!("Syncing github repositories...");
-            github::refresh_all_repositories(github_repository_provider).await
-        },
-    )
-    .await
-}
-
-pub async fn update_integrated_gitlab_repositories_job(
-    gitlab_repository_provider: Arc<dyn GitlabRepositoryService>,
-) -> Result<Job> {
-    service_job(
-        "sync gitlab repositories",
-        EVERY_TEN_MINUTES,
-        gitlab_repository_provider,
-        |gitlab_repository_provider| async move {
-            debug!("Syncing gitlab repositories...");
-            gitlab::refresh_all_repositories(gitlab_repository_provider).await
-        },
-    )
-    .await
-}
+    controller
+        .register_public(
+            "github_repositories",
+            &every_ten_minutes(),
+            move |context| {
+                let context = context.clone();
+                let github = github.clone();
+                Box::pin(async move { github::refresh_all_repositories(context, github).await })
+            },
+        )
+        .await;
 
-pub async fn job_cleanup(jobs: Arc<dyn JobService>) -> Result<Job> {
-    let job_res = Job::new_one_shot_async(Duration::from_secs(0), move |_, _| {
-        let jobs = jobs.clone();
-        Box::pin(async move {
-            if let Err(e) = jobs.cleanup().await {
-                error!("failed to finalize stale job runs: {e}");
-            }
-        })
-    });
-    Ok(job_res?)
+    controller
+        .register_public(
+            "gitlab_repositories",
+            &every_ten_minutes(),
+            move |context| {
+                let gitlab = gitlab.clone();
+                let context = context.clone();
+                Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await })
+            },
+        )
+        .await;
 }
diff --git a/ee/tabby-webserver/src/cron/mod.rs b/ee/tabby-webserver/src/cron/mod.rs
index c61e7cba7539..2e28b3c469d0 100644
--- a/ee/tabby-webserver/src/cron/mod.rs
+++ b/ee/tabby-webserver/src/cron/mod.rs
@@ -1,24 +1,16 @@
+mod controller;
 mod db;
 mod scheduler;
 
 use std::sync::Arc;
 
-use tokio_cron_scheduler::{Job, JobScheduler};
+use rand::Rng;
 
 use crate::schema::{
     auth::AuthenticationService, job::JobService, repository::RepositoryService,
     worker::WorkerService,
 };
 
-async fn new_job_scheduler(jobs: Vec<Job>) -> anyhow::Result<JobScheduler> {
-    let scheduler = JobScheduler::new().await?;
-    for job in jobs {
-        scheduler.add(job).await?;
-    }
-    scheduler.start().await?;
-    Ok(scheduler)
-}
-
 pub async fn run_cron(
     auth: Arc<dyn AuthenticationService>,
     job: Arc<dyn JobService>,
@@ -26,39 +18,30 @@ pub async fn run_cron(
     repository: Arc<dyn RepositoryService>,
     local_port: u16,
 ) {
-    let mut jobs = vec![];
-
-    let job1 = db::refresh_token_job(auth.clone())
-        .await
-        .expect("failed to create refresh token cleanup job");
-    jobs.push(job1);
-
-    let job2 = db::password_reset_job(auth)
-        .await
-        .expect("failed to create password reset token cleanup job");
-    jobs.push(job2);
-
-    let job3 = scheduler::scheduler_job(job.clone(), worker, local_port)
-        .await
-        .expect("failed to create scheduler job");
-    jobs.push(job3);
-
-    let job4 = db::job_cleanup(job)
-        .await
-        .expect("failed to create stale job runs cleanup job");
-    jobs.push(job4);
-
-    let job5 = db::update_integrated_github_repositories_job(repository.github())
-        .await
-        .expect("Failed to create github repository refresh job");
-    jobs.push(job5);
+    let mut controller = controller::JobController::new(job).await;
+    db::register_jobs(
+        &mut controller,
+        auth,
+        repository.github(),
+        repository.gitlab(),
+    )
+    .await;
+
+    scheduler::register(&mut controller, worker, local_port).await;
+
+    controller.run().await
+}
 
-    let job6 = db::update_integrated_gitlab_repositories_job(repository.gitlab())
-        .await
-        .expect("Failed to create gitlab repository refresh job");
-    jobs.push(job6);
+fn every_two_hours() -> String {
+    let mut rng = rand::thread_rng();
+    format!(
+        "{} {} */2 * * *",
+        rng.gen_range(0..59),
+        rng.gen_range(0..59)
+    )
+}
 
-    new_job_scheduler(jobs)
-        .await
-        .expect("failed to start job scheduler");
+fn every_ten_minutes() -> String {
+    let mut rng = rand::thread_rng();
+    format!("{} */10 * * * *", rng.gen_range(0..59))
+}
diff --git a/ee/tabby-webserver/src/cron/scheduler.rs b/ee/tabby-webserver/src/cron/scheduler.rs
index 24522902c067..35137a0723cf 100644
--- a/ee/tabby-webserver/src/cron/scheduler.rs
+++ b/ee/tabby-webserver/src/cron/scheduler.rs
@@ -1,64 +1,36 @@
-use std::{pin::Pin, process::Stdio, sync::Arc};
+use std::{process::Stdio, sync::Arc};
 
 use anyhow::{Context, Result};
-use futures::Future;
 use tokio::io::AsyncBufReadExt;
-use tokio_cron_scheduler::{Job, JobScheduler};
-use tracing::{debug, error, warn};
+use tracing::debug;
 
-use crate::schema::{job::JobService, worker::WorkerService};
+use super::{
+    controller::{JobContext, JobController},
+    every_ten_minutes,
+};
+use crate::schema::worker::WorkerService;
 
-pub async fn scheduler_job(
-    job: Arc<dyn JobService>,
+pub async fn register(
+    controller: &mut JobController,
     worker: Arc<dyn WorkerService>,
     local_port: u16,
-) -> anyhow::Result<Job> {
-    let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(()));
-
-    let scheduler_job =
-        move |uuid, mut scheduler: JobScheduler| -> Pin<Box<dyn Future<Output = ()> + Send>> {
+) {
+    controller
+        .register_public("scheduler", &every_ten_minutes(), move |context| {
+            let context = context.clone();
             let worker = worker.clone();
-            let job = job.clone();
-            let scheduler_mutex = scheduler_mutex.clone();
-            Box::pin(async move {
-                let Ok(_guard) = scheduler_mutex.try_lock() else {
-                    warn!("Scheduler job overlapped, skipping...");
-                    return;
-                };
-
-                if let Err(err) = run_scheduler_now(job, worker, local_port).await {
-                    error!("Failed to run scheduler job, reason: `{}`", err);
-                }
-
-                if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
-                    debug!(
-                        "Next time for scheduler job is {:?}",
-                        next_tick.with_timezone(&chrono::Local)
-                    );
-                }
-            })
-        };
-
-    let job = if std::env::var("TABBY_WEBSERVER_SCHEDULER_ONESHOT").is_ok() {
-        warn!(
-            "Running scheduler job as oneshot, this should only be used for debugging purpose..."
-        );
-        Job::new_one_shot_async(std::time::Duration::from_secs(10), scheduler_job)?
-    } else {
-        Job::new_async("0 1/10 * * * *", scheduler_job)?
-    };
-
-    Ok(job)
+            Box::pin(async move { run_scheduler_now(context, worker, local_port).await })
+        })
+        .await;
 }
 
 async fn run_scheduler_now(
-    job: Arc<dyn JobService>,
+    context: JobContext,
     worker: Arc<dyn WorkerService>,
     local_port: u16,
-) -> Result<()> {
+) -> Result<i32> {
     debug!("Running scheduler job...");
 
     let exe = std::env::current_exe()?;
-    let job_id = job.start("scheduler".to_owned()).await?;
 
     let mut child = tokio::process::Command::new(exe)
         .arg("scheduler")
@@ -73,14 +45,13 @@ async fn run_scheduler_now(
 
     {
         // Pipe stdout
-        let job = job.clone();
-        let job_id = job_id.clone();
         let stdout = child.stdout.take().context("Failed to acquire stdout")?;
+        let ctx = context.clone();
         tokio::spawn(async move {
             let stdout = tokio::io::BufReader::new(stdout);
             let mut stdout = stdout.lines();
             while let Ok(Some(line)) = stdout.next_line().await {
-                let _ = job.update_stdout(&job_id, line + "\n").await;
+                let _ = ctx.stdout_writeline(line).await;
             }
         });
     }
 
     {
         // Pipe stderr
         let stderr = child.stderr.take().context("Failed to acquire stderr")?;
-        let job = job.clone();
-        let job_id = job_id.clone();
+        let ctx = context.clone();
         tokio::spawn(async move {
             let stderr = tokio::io::BufReader::new(stderr);
             let mut stdout = stderr.lines();
             while let Ok(Some(line)) = stdout.next_line().await {
-                let _ = job.update_stderr(&job_id, line + "\n").await;
+                let _ = ctx.stderr_writeline(line).await;
             }
         });
     }
 
     if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) {
-        job.complete(&job_id, exit_code).await?;
+        Ok(exit_code)
     } else {
-        job.complete(&job_id, -1).await?;
+        Ok(-1)
     }
-
-    Ok(())
 }
diff --git a/ee/tabby-webserver/src/schema/mod.rs b/ee/tabby-webserver/src/schema/mod.rs
index ad07b55c413c..73250e3e88cc 100644
--- a/ee/tabby-webserver/src/schema/mod.rs
+++ b/ee/tabby-webserver/src/schema/mod.rs
@@ -423,8 +423,14 @@ impl Query {
         ctx.locator.license().read().await
     }
 
+    // FIXME(meng): This is a temporary solution to expose the list of jobs; we should consider switching to an enum-based approach.
     async fn jobs() -> Result<Vec<String>> {
-        Ok(vec!["scheduler".into()])
+        Ok(
+            vec!["scheduler", "github_repositories", "gitlab_repositories"]
+                .into_iter()
+                .map(Into::into)
+                .collect(),
+        )
     }
 
     async fn daily_stats_in_past_year(
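
Reviewer note (not part of the patch): below is a minimal sketch of how the JobController API introduced in ee/tabby-webserver/src/cron/controller.rs is consumed, mirroring the register_jobs()/run_cron() wiring above. Only JobController, JobContext, every_two_hours(), and every_ten_minutes() come from this diff; the function and job names in the sketch are hypothetical, and it assumes the code lives in a sibling submodule of cron (like db) so the private helpers are importable via super.

use std::sync::Arc;

use super::{controller::JobController, every_ten_minutes, every_two_hours};
use crate::schema::job::JobService;

// Hypothetical wiring, for illustration only.
async fn example_wiring(job_service: Arc<dyn JobService>) {
    let mut controller = JobController::new(job_service).await;

    // Private job: no Jobs-dashboard entry. The closure returns
    // anyhow::Result<()>; the controller maps success to exit code 0.
    controller
        .register("example_cleanup", &every_two_hours(), move || {
            Box::pin(async move { Ok(()) })
        })
        .await;

    // Public job: JobService::start() creates a dashboard row, and the
    // JobContext streams stdout/stderr lines into it; the closure returns
    // the job's exit code as anyhow::Result<i32>.
    controller
        .register_public("example_sync", &every_ten_minutes(), move |context| {
            let context = context.clone();
            Box::pin(async move {
                context.stdout_writeline("sync started".to_owned()).await;
                Ok(0)
            })
        })
        .await;

    // Starts the underlying tokio-cron-scheduler loop. When
    // TABBY_WEBSERVER_CONTROLLER_ONESHOT is set, each registered job instead
    // runs once after a 5-second delay (the run_oneshot() debugging path).
    controller.run().await;
}

Note that every_two_hours()/every_ten_minutes() randomize the second (and minute) fields per process, which spreads the firing times of the registered jobs instead of having them all start on the same tick, as the old fixed EVERY_TWO_HOURS/EVERY_TEN_MINUTES cron expressions did.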