From 185fe74ab4ecdabf97b7e38130a1d99cd435a1d2 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Fri, 3 May 2024 19:41:38 -0700 Subject: [PATCH] refactor(webserver): simplify background job creation. (#2045) * chore(webserver): extract layer mod * extract cinfo / cwarn * do not expose github / gitlab sync jobs * every backend job should log to db * simplify job construction * update --- ee/tabby-schema/src/schema/mod.rs | 9 +-- ee/tabby-webserver/src/lib.rs | 8 --- .../src/service/background_job/db.rs | 32 +++++---- .../src/service/background_job/github.rs | 45 +++++------- .../src/service/background_job/gitlab.rs | 49 +++++-------- .../background_job/{ => helper}/logger.rs | 45 +++++------- .../src/service/background_job/helper/mod.rs | 69 +++++++++++++++++++ .../src/service/background_job/mod.rs | 25 ++++++- .../src/service/background_job/scheduler.rs | 46 +++++++------ rules/use-basic-job.yml | 10 +++ 10 files changed, 197 insertions(+), 141 deletions(-) rename ee/tabby-webserver/src/service/background_job/{ => helper}/logger.rs (77%) create mode 100644 ee/tabby-webserver/src/service/background_job/helper/mod.rs create mode 100644 rules/use-basic-job.yml diff --git a/ee/tabby-schema/src/schema/mod.rs b/ee/tabby-schema/src/schema/mod.rs index dc817b62979d..99844e5fe1d0 100644 --- a/ee/tabby-schema/src/schema/mod.rs +++ b/ee/tabby-schema/src/schema/mod.rs @@ -421,14 +421,7 @@ impl Query { // FIXME(meng): This is a temporary solution to expose the list of jobs, we should consider switching to a enum based approach. async fn jobs() -> Result> { - Ok(vec![ - "scheduler", - "import_github_repositories", - "import_gitlab_repositories", - ] - .into_iter() - .map(Into::into) - .collect()) + Ok(vec!["scheduler"].into_iter().map(Into::into).collect()) } async fn daily_stats_in_past_year( diff --git a/ee/tabby-webserver/src/lib.rs b/ee/tabby-webserver/src/lib.rs index c220c8a9d8cf..1b6b58f1f8ca 100644 --- a/ee/tabby-webserver/src/lib.rs +++ b/ee/tabby-webserver/src/lib.rs @@ -36,11 +36,3 @@ macro_rules! bail { return std::result::Result::Err(anyhow::anyhow!($fmt, $($arg)*).into()) }; } - -#[macro_export] -macro_rules! warn_stderr { - ($ctx:expr, $($params:tt)+) => { - tracing::warn!($($params)+); - $ctx.stderr_writeline(format!($($params)+)).await; - } -} diff --git a/ee/tabby-webserver/src/service/background_job/db.rs b/ee/tabby-webserver/src/service/background_job/db.rs index 34a5af5c1d92..c66ee1c02491 100644 --- a/ee/tabby-webserver/src/service/background_job/db.rs +++ b/ee/tabby-webserver/src/service/background_job/db.rs @@ -1,14 +1,15 @@ -use std::str::FromStr; - use apalis::{ - cron::{CronStream, Schedule}, - prelude::{Data, Job, Monitor, WorkerBuilder, WorkerFactoryFn}, + prelude::{Data, Job, Monitor, WorkerFactoryFn}, utils::TokioExecutor, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tabby_db::DbConn; -use tracing::debug; + +use super::{ + cprintln, + helper::{CronJob, JobLogger}, +}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct DbMaintainanceJob; @@ -17,22 +18,23 @@ impl Job for DbMaintainanceJob { const NAME: &'static str = "db_maintainance"; } +impl CronJob for DbMaintainanceJob { + const SCHEDULE: &'static str = "@hourly"; +} + impl DbMaintainanceJob { - async fn cron(_now: DateTime, db: Data) -> tabby_schema::Result<()> { - debug!("Running db maintainance job"); + async fn cron( + _now: DateTime, + logger: Data, + db: Data, + ) -> tabby_schema::Result<()> { + cprintln!(logger, "Running db maintainance job"); db.delete_expired_token().await?; db.delete_expired_password_resets().await?; Ok(()) } pub fn register(monitor: Monitor, db: DbConn) -> Monitor { - let schedule = Schedule::from_str("@hourly").expect("unable to parse cron schedule"); - - monitor.register( - WorkerBuilder::new(DbMaintainanceJob::NAME) - .stream(CronStream::new(schedule).into_stream()) - .data(db) - .build_fn(DbMaintainanceJob::cron), - ) + monitor.register(Self::cron_worker(db.clone()).build_fn(DbMaintainanceJob::cron)) } } diff --git a/ee/tabby-webserver/src/service/background_job/github.rs b/ee/tabby-webserver/src/service/background_job/github.rs index 65c2daed3226..e0ab55287eb3 100644 --- a/ee/tabby-webserver/src/service/background_job/github.rs +++ b/ee/tabby-webserver/src/service/background_job/github.rs @@ -1,9 +1,6 @@ -use std::str::FromStr; - use anyhow::Result; use apalis::{ - cron::{CronStream, Schedule}, - prelude::{Data, Job, Monitor, Storage, WorkerBuilder, WorkerFactoryFn}, + prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn}, sqlite::{SqlitePool, SqliteStorage}, utils::TokioExecutor, }; @@ -11,11 +8,12 @@ use chrono::{DateTime, Utc}; use octocrab::{models::Repository, GitHubError, Octocrab}; use serde::{Deserialize, Serialize}; use tabby_db::{DbConn, GithubRepositoryProviderDAO}; -use tower::limit::ConcurrencyLimitLayer; use tracing::debug; -use super::logger::{JobLogLayer, JobLogger}; -use crate::warn_stderr; +use super::{ + ceprintln, cprintln, + helper::{BasicJob, CronJob, JobLogger}, +}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct SyncGithubJob { @@ -32,6 +30,10 @@ impl Job for SyncGithubJob { const NAME: &'static str = "import_github_repositories"; } +impl CronJob for SyncGithubJob { + const SCHEDULE: &'static str = "@hourly"; +} + impl SyncGithubJob { async fn run(self, logger: Data, db: Data) -> tabby_schema::Result<()> { refresh_repositories_for_provider((*logger).clone(), (*db).clone(), self.provider_id) @@ -66,21 +68,11 @@ impl SyncGithubJob { db: DbConn, ) -> (SqliteStorage, Monitor) { let storage = SqliteStorage::new(pool); - let schedule = Schedule::from_str("@hourly").expect("unable to parse cron schedule"); let monitor = monitor + .register(Self::basic_worker(storage.clone(), db.clone()).build_fn(Self::run)) .register( - WorkerBuilder::new(Self::NAME) - .with_storage(storage.clone()) - .layer(ConcurrencyLimitLayer::new(1)) - .layer(JobLogLayer::new(db.clone(), Self::NAME)) - .data(db.clone()) - .build_fn(Self::run), - ) - .register( - WorkerBuilder::new(format!("{}-cron", Self::NAME)) - .stream(CronStream::new(schedule).into_stream()) + Self::cron_worker(db.clone()) .data(storage.clone()) - .data(db.clone()) .build_fn(Self::cron), ); @@ -102,7 +94,7 @@ async fn refresh_repositories_for_provider( }) if source.status_code.is_client_error() => { db.update_github_provider_sync_status(provider_id, false) .await?; - warn_stderr!( + ceprintln!( context, "GitHub credentials for provider {} are expired or invalid", provider.display_name @@ -110,17 +102,16 @@ async fn refresh_repositories_for_provider( return Err(source.into()); } Err(e) => { - warn_stderr!(context, "Failed to fetch repositories from github: {e}"); + ceprintln!(context, "Failed to fetch repositories from github: {e}"); return Err(e.into()); } }; for repo in repos { - context - .stdout_writeline(format!( - "importing: {}", - repo.full_name.as_deref().unwrap_or(&repo.name) - )) - .await; + cprintln!( + context, + "importing: {}", + repo.full_name.as_deref().unwrap_or(&repo.name) + ); let id = repo.id.to_string(); let Some(url) = repo.git_url else { diff --git a/ee/tabby-webserver/src/service/background_job/gitlab.rs b/ee/tabby-webserver/src/service/background_job/gitlab.rs index 052dfdcc1174..7c41d7414c66 100644 --- a/ee/tabby-webserver/src/service/background_job/gitlab.rs +++ b/ee/tabby-webserver/src/service/background_job/gitlab.rs @@ -1,9 +1,6 @@ -use std::str::FromStr; - use anyhow::Result; use apalis::{ - cron::{CronStream, Schedule}, - prelude::{Data, Job, Monitor, Storage, WorkerBuilder, WorkerFactoryFn}, + prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn}, sqlite::{SqlitePool, SqliteStorage}, utils::TokioExecutor, }; @@ -14,11 +11,12 @@ use gitlab::{ }; use serde::{Deserialize, Serialize}; use tabby_db::{DbConn, GitlabRepositoryProviderDAO}; -use tower::limit::ConcurrencyLimitLayer; use tracing::debug; -use super::logger::{JobLogLayer, JobLogger}; -use crate::warn_stderr; +use super::{ + ceprintln, cprintln, + helper::{BasicJob, CronJob, JobLogger}, +}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct SyncGitlabJob { @@ -35,6 +33,10 @@ impl Job for SyncGitlabJob { const NAME: &'static str = "import_gitlab_repositories"; } +impl CronJob for SyncGitlabJob { + const SCHEDULE: &'static str = "@hourly"; +} + impl SyncGitlabJob { async fn run(self, logger: Data, db: Data) -> tabby_schema::Result<()> { refresh_repositories_for_provider((*logger).clone(), (*db).clone(), self.provider_id) @@ -69,21 +71,11 @@ impl SyncGitlabJob { db: DbConn, ) -> (SqliteStorage, Monitor) { let storage = SqliteStorage::new(pool); - let schedule = Schedule::from_str("@hourly").expect("unable to parse cron schedule"); let monitor = monitor + .register(Self::basic_worker(storage.clone(), db.clone()).build_fn(Self::run)) .register( - WorkerBuilder::new(Self::NAME) - .with_storage(storage.clone()) - .layer(ConcurrencyLimitLayer::new(1)) - .layer(JobLogLayer::new(db.clone(), Self::NAME)) - .data(db.clone()) - .build_fn(Self::run), - ) - .register( - WorkerBuilder::new(format!("{}-cron", Self::NAME)) - .stream(CronStream::new(schedule).into_stream()) + Self::cron_worker(db.clone()) .data(storage.clone()) - .data(db.clone()) .build_fn(Self::cron), ); @@ -93,18 +85,17 @@ impl SyncGitlabJob { async fn refresh_repositories_for_provider(logger: JobLogger, db: DbConn, id: i64) -> Result<()> { let provider = db.get_gitlab_provider(id).await?; - logger - .stdout_writeline(format!( - "Refreshing repositories for provider: {}\n", - provider.display_name - )) - .await; + cprintln!( + logger, + "Refreshing repositories for provider: {}", + provider.display_name + ); let start = Utc::now(); let repos = match fetch_all_repos(&provider).await { Ok(repos) => repos, Err(e) if e.is_client_error() => { db.update_gitlab_provider_sync_status(id, false).await?; - warn_stderr!( + ceprintln!( logger, "GitLab credentials for provider {} are expired or invalid", provider.display_name @@ -112,14 +103,12 @@ async fn refresh_repositories_for_provider(logger: JobLogger, db: DbConn, id: i6 return Err(e.into()); } Err(e) => { - warn_stderr!(logger, "Failed to fetch repositories from gitlab: {e}"); + ceprintln!(logger, "Failed to fetch repositories from gitlab: {e}"); return Err(e.into()); } }; for repo in repos { - logger - .stdout_writeline(format!("importing: {}", &repo.path_with_namespace)) - .await; + cprintln!(logger, "importing: {}", &repo.path_with_namespace); let id = repo.id.to_string(); let url = repo.http_url_to_repo; let url = url.strip_suffix(".git").unwrap_or(&url); diff --git a/ee/tabby-webserver/src/service/background_job/logger.rs b/ee/tabby-webserver/src/service/background_job/helper/logger.rs similarity index 77% rename from ee/tabby-webserver/src/service/background_job/logger.rs rename to ee/tabby-webserver/src/service/background_job/helper/logger.rs index 2eeeed8d186c..43c3ac76b74e 100644 --- a/ee/tabby-webserver/src/service/background_job/logger.rs +++ b/ee/tabby-webserver/src/service/background_job/helper/logger.rs @@ -16,7 +16,7 @@ pub struct JobLogger { } impl JobLogger { - async fn new(name: &'static str, db: DbConn) -> Self { + async fn new(name: &str, db: DbConn) -> Self { let id = db .create_job_run(name.to_owned()) .await @@ -24,7 +24,7 @@ impl JobLogger { Self { id, db } } - pub async fn stdout_writeline(&self, stdout: String) { + pub async fn r#internal_println(&self, stdout: String) { let stdout = stdout + "\n"; match self.db.update_job_stdout(self.id, stdout).await { Ok(_) => (), @@ -34,7 +34,7 @@ impl JobLogger { } } - pub async fn stderr_writeline(&self, stderr: String) { + pub async fn r#internal_eprintln(&self, stderr: String) { let stderr = stderr + "\n"; match self.db.update_job_stderr(self.id, stderr).await { Ok(_) => (), @@ -56,12 +56,15 @@ impl JobLogger { pub struct JobLogLayer { db: DbConn, - name: &'static str, + name: String, } impl JobLogLayer { - pub fn new(db: DbConn, name: &'static str) -> Self { - Self { db, name } + pub fn new(db: DbConn, name: &str) -> Self { + Self { + db, + name: name.to_owned(), + } } } @@ -71,7 +74,7 @@ impl Layer for JobLogLayer { fn layer(&self, service: S) -> Self::Service { JobLogService { db: self.db.clone(), - name: self.name, + name: self.name.clone(), service, } } @@ -80,33 +83,17 @@ impl Layer for JobLogLayer { #[derive(Clone)] pub struct JobLogService { db: DbConn, - name: &'static str, + name: String, service: S, } -pub trait ExitCode { - fn into_exit_code(self) -> i32; -} - -impl ExitCode for i32 { - fn into_exit_code(self) -> i32 { - self - } -} - -impl ExitCode for () { - fn into_exit_code(self) -> i32 { - 0 - } -} - impl Service> for JobLogService where S: Service> + Clone, Request: Send + 'static, S: Send + 'static, S::Future: Send + 'static, - S::Response: Send + ExitCode + 'static, + S::Response: Send + 'static, S::Error: Send + Debug + 'static, { type Response = (); @@ -119,16 +106,16 @@ where fn call(&mut self, mut request: Request) -> Self::Future { debug!("Starting job `{}`", self.name); - let name = self.name; let db = self.db.clone(); let mut service = self.service.clone(); + let name = self.name.clone(); let fut_with_log = async move { - let mut logger = JobLogger::new(name, db).await; + let mut logger = JobLogger::new(&name, db).await; request.insert(logger.clone()); match service.call(request).await { - Ok(res) => { + Ok(_) => { debug!("Job `{}` completed", name); - logger.complete(res.into_exit_code()).await; + logger.complete(0).await; Ok(()) } Err(e) => { diff --git a/ee/tabby-webserver/src/service/background_job/helper/mod.rs b/ee/tabby-webserver/src/service/background_job/helper/mod.rs new file mode 100644 index 000000000000..163a5bd2c027 --- /dev/null +++ b/ee/tabby-webserver/src/service/background_job/helper/mod.rs @@ -0,0 +1,69 @@ +mod logger; + +use std::{pin::Pin, str::FromStr}; + +use apalis::{ + cron::{CronStream, Schedule}, + prelude::{Data, Job, Storage, WorkerBuilder}, +}; +use chrono::{DateTime, Utc}; +use futures::Stream; +pub use logger::{JobLogLayer, JobLogger}; +use tabby_db::DbConn; +use tower::{ + layer::util::{Identity, Stack}, + limit::ConcurrencyLimitLayer, +}; + +type DefaultMiddleware = + Stack, Identity>>>; + +pub trait BasicJob: Job + Sized { + fn basic_worker( + storage: NS, + db: DbConn, + ) -> WorkerBuilder + where + NS: Storage, + { + WorkerBuilder::new(Self::NAME) + .with_storage(storage) + .data(db.clone()) + .layer(ConcurrencyLimitLayer::new(1)) + .layer(JobLogLayer::new(db, Self::NAME)) + } +} + +impl BasicJob for T {} + +pub trait CronJob: Job { + const SCHEDULE: &'static str; + + fn cron_worker( + db: DbConn, + ) -> WorkerBuilder< + DateTime, + Pin< + Box< + (dyn Stream< + Item = Result< + std::option::Option>>, + apalis::prelude::Error, + >, + > + std::marker::Send + + 'static), + >, + >, + DefaultMiddleware, + Serv, + > { + let schedule = Schedule::from_str(Self::SCHEDULE).expect("invalid cron schedule"); + let stream = CronStream::new(schedule).into_stream(); + let name = format!("{}-cron", Self::NAME); + WorkerBuilder::new(&name) + .data(db.clone()) + .stream(stream) + .layer(ConcurrencyLimitLayer::new(1)) + .layer(JobLogLayer::new(db, &name)) + } +} diff --git a/ee/tabby-webserver/src/service/background_job/mod.rs b/ee/tabby-webserver/src/service/background_job/mod.rs index e6a873c89538..f72905c3a42f 100644 --- a/ee/tabby-webserver/src/service/background_job/mod.rs +++ b/ee/tabby-webserver/src/service/background_job/mod.rs @@ -1,7 +1,7 @@ mod db; mod github; mod gitlab; -mod logger; +mod helper; mod scheduler; use std::sync::Arc; @@ -77,7 +77,7 @@ impl BackgroundJob for BackgroundJobImpl { async fn trigger_scheduler(&self) { self.scheduler .clone() - .push(SchedulerJob {}) + .push(SchedulerJob) .await .expect("unable to push job"); } @@ -98,3 +98,24 @@ impl BackgroundJob for BackgroundJobImpl { .expect("unable to push job"); } } + +macro_rules! ceprintln { + ($ctx:expr, $($params:tt)+) => { + { + tracing::warn!($($params)+); + $ctx.r#internal_eprintln(format!($($params)+)).await; + } + } +} + +macro_rules! cprintln { + ($ctx:expr, $($params:tt)+) => { + { + tracing::debug!($($params)+); + $ctx.r#internal_println(format!($($params)+)).await; + } + } +} + +use ceprintln; +use cprintln; diff --git a/ee/tabby-webserver/src/service/background_job/scheduler.rs b/ee/tabby-webserver/src/service/background_job/scheduler.rs index c377e0a79d48..d3b8505455f6 100644 --- a/ee/tabby-webserver/src/service/background_job/scheduler.rs +++ b/ee/tabby-webserver/src/service/background_job/scheduler.rs @@ -1,34 +1,40 @@ -use std::{process::Stdio, str::FromStr}; +use std::process::Stdio; use anyhow::Context; use apalis::{ - cron::{CronStream, Schedule}, - prelude::{Data, Job, Monitor, Storage, WorkerBuilder, WorkerFactoryFn}, + prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn}, sqlite::{SqlitePool, SqliteStorage}, utils::TokioExecutor, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tabby_db::DbConn; +use tabby_schema::bail; use tokio::io::AsyncBufReadExt; -use tower::limit::ConcurrencyLimitLayer; -use super::logger::{JobLogLayer, JobLogger}; +use super::{ + ceprintln, cprintln, + helper::{BasicJob, CronJob, JobLogger}, +}; #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct SchedulerJob {} +pub struct SchedulerJob; impl Job for SchedulerJob { const NAME: &'static str = "scheduler"; } +impl CronJob for SchedulerJob { + const SCHEDULE: &'static str = "@hourly"; +} + impl SchedulerJob { async fn run_impl( self, job_logger: Data, db: Data, local_port: Data, - ) -> anyhow::Result { + ) -> anyhow::Result<()> { let local_port = *local_port; let exe = std::env::current_exe()?; @@ -51,7 +57,7 @@ impl SchedulerJob { let stdout = tokio::io::BufReader::new(stdout); let mut stdout = stdout.lines(); while let Ok(Some(line)) = stdout.next_line().await { - let _ = logger.stdout_writeline(line).await; + cprintln!(logger, "{line}"); } }); } @@ -64,15 +70,17 @@ impl SchedulerJob { let stderr = tokio::io::BufReader::new(stderr); let mut stdout = stderr.lines(); while let Ok(Some(line)) = stdout.next_line().await { - let _ = logger.stderr_writeline(line).await; + ceprintln!(logger, "{line}"); } }); } if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) { - Ok(exit_code) - } else { - Ok(-1) + if exit_code != 0 { + bail!("scheduler exited with code {exit_code}") + } } + + Ok(()) } async fn run( @@ -80,7 +88,7 @@ impl SchedulerJob { logger: Data, db: Data, local_port: Data, - ) -> tabby_schema::Result { + ) -> tabby_schema::Result<()> { Ok(self.run_impl(logger, db, local_port).await?) } @@ -90,7 +98,7 @@ impl SchedulerJob { ) -> tabby_schema::Result<()> { let mut storage = (*storage).clone(); storage - .push(SchedulerJob {}) + .push(SchedulerJob) .await .expect("unable to push job"); Ok(()) @@ -103,20 +111,14 @@ impl SchedulerJob { local_port: u16, ) -> (SqliteStorage, Monitor) { let storage = SqliteStorage::new(pool); - let schedule = Schedule::from_str("@hourly").expect("unable to parse cron schedule"); let monitor = monitor .register( - WorkerBuilder::new(Self::NAME) - .with_storage(storage.clone()) - .layer(ConcurrencyLimitLayer::new(1)) - .layer(JobLogLayer::new(db.clone(), Self::NAME)) - .data(db.clone()) + Self::basic_worker(storage.clone(), db.clone()) .data(local_port) .build_fn(Self::run), ) .register( - WorkerBuilder::new(SchedulerJob::NAME) - .stream(CronStream::new(schedule).into_stream()) + Self::cron_worker(db.clone()) .data(storage.clone()) .build_fn(SchedulerJob::cron), ); diff --git a/rules/use-basic-job.yml b/rules/use-basic-job.yml new file mode 100644 index 000000000000..63a4bec7c525 --- /dev/null +++ b/rules/use-basic-job.yml @@ -0,0 +1,10 @@ +id: use-basic-job +message: Use BasicJob / CronJob for worker creation. +severity: error +language: rust +files: +- ./ee/tabby-webserver/src/service/background_job/** +ignores: +- ./ee/tabby-webserver/src/service/background_job/helper/mod.rs +rule: + pattern: WorkerBuilder \ No newline at end of file