Skip to content

Commit

Permalink
refactor(webserver): simplify background job creation. (#2045)
Browse files Browse the repository at this point in the history
* chore(webserver): extract layer mod

* extract cinfo / cwarn

* do not expose github / gitlab sync jobs

* every backend job should log to db

* simplify job construction

* update
  • Loading branch information
wsxiaoys authored May 4, 2024
1 parent fee3982 commit 185fe74
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 141 deletions.
9 changes: 1 addition & 8 deletions ee/tabby-schema/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,7 @@ impl Query {

// FIXME(meng): This is a temporary solution to expose the list of jobs; we should consider switching to an enum-based approach.
async fn jobs() -> Result<Vec<String>> {
Ok(vec![
"scheduler",
"import_github_repositories",
"import_gitlab_repositories",
]
.into_iter()
.map(Into::into)
.collect())
Ok(vec!["scheduler"].into_iter().map(Into::into).collect())
}

async fn daily_stats_in_past_year(
Expand Down
8 changes: 0 additions & 8 deletions ee/tabby-webserver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,3 @@ macro_rules! bail {
return std::result::Result::Err(anyhow::anyhow!($fmt, $($arg)*).into())
};
}

/// Emits a warning both to the process log (via `tracing::warn!`) and to the
/// given job context's stderr stream (via `$ctx.stderr_writeline`), so the
/// message is visible in server logs and in the persisted job output.
///
/// NOTE(review): the expansion contains an `.await`, so this macro can only
/// be invoked inside an async context — confirm at call sites. The format
/// arguments are expanded twice ($($params)+ appears in both calls), so any
/// side-effecting expressions passed as arguments would run twice.
#[macro_export]
macro_rules! warn_stderr {
($ctx:expr, $($params:tt)+) => {
tracing::warn!($($params)+);
$ctx.stderr_writeline(format!($($params)+)).await;
}
}
32 changes: 17 additions & 15 deletions ee/tabby-webserver/src/service/background_job/db.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::str::FromStr;

use apalis::{
cron::{CronStream, Schedule},
prelude::{Data, Job, Monitor, WorkerBuilder, WorkerFactoryFn},
prelude::{Data, Job, Monitor, WorkerFactoryFn},
utils::TokioExecutor,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tabby_db::DbConn;
use tracing::debug;

use super::{
cprintln,
helper::{CronJob, JobLogger},
};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DbMaintainanceJob;
Expand All @@ -17,22 +18,23 @@ impl Job for DbMaintainanceJob {
const NAME: &'static str = "db_maintainance";
}

impl CronJob for DbMaintainanceJob {
const SCHEDULE: &'static str = "@hourly";
}

impl DbMaintainanceJob {
async fn cron(_now: DateTime<Utc>, db: Data<DbConn>) -> tabby_schema::Result<()> {
debug!("Running db maintainance job");
async fn cron(
_now: DateTime<Utc>,
logger: Data<JobLogger>,
db: Data<DbConn>,
) -> tabby_schema::Result<()> {
cprintln!(logger, "Running db maintainance job");
db.delete_expired_token().await?;
db.delete_expired_password_resets().await?;
Ok(())
}

pub fn register(monitor: Monitor<TokioExecutor>, db: DbConn) -> Monitor<TokioExecutor> {
let schedule = Schedule::from_str("@hourly").expect("unable to parse cron schedule");

monitor.register(
WorkerBuilder::new(DbMaintainanceJob::NAME)
.stream(CronStream::new(schedule).into_stream())
.data(db)
.build_fn(DbMaintainanceJob::cron),
)
monitor.register(Self::cron_worker(db.clone()).build_fn(DbMaintainanceJob::cron))
}
}
45 changes: 18 additions & 27 deletions ee/tabby-webserver/src/service/background_job/github.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use std::str::FromStr;

use anyhow::Result;
use apalis::{
cron::{CronStream, Schedule},
prelude::{Data, Job, Monitor, Storage, WorkerBuilder, WorkerFactoryFn},
prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn},
sqlite::{SqlitePool, SqliteStorage},
utils::TokioExecutor,
};
use chrono::{DateTime, Utc};
use octocrab::{models::Repository, GitHubError, Octocrab};
use serde::{Deserialize, Serialize};
use tabby_db::{DbConn, GithubRepositoryProviderDAO};
use tower::limit::ConcurrencyLimitLayer;
use tracing::debug;

use super::logger::{JobLogLayer, JobLogger};
use crate::warn_stderr;
use super::{
ceprintln, cprintln,
helper::{BasicJob, CronJob, JobLogger},
};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SyncGithubJob {
Expand All @@ -32,6 +30,10 @@ impl Job for SyncGithubJob {
const NAME: &'static str = "import_github_repositories";
}

impl CronJob for SyncGithubJob {
const SCHEDULE: &'static str = "@hourly";
}

impl SyncGithubJob {
async fn run(self, logger: Data<JobLogger>, db: Data<DbConn>) -> tabby_schema::Result<()> {
refresh_repositories_for_provider((*logger).clone(), (*db).clone(), self.provider_id)
Expand Down Expand Up @@ -66,21 +68,11 @@ impl SyncGithubJob {
db: DbConn,
) -> (SqliteStorage<SyncGithubJob>, Monitor<TokioExecutor>) {
let storage = SqliteStorage::new(pool);
let schedule = Schedule::from_str("@hourly").expect("unable to parse cron schedule");
let monitor = monitor
.register(Self::basic_worker(storage.clone(), db.clone()).build_fn(Self::run))
.register(
WorkerBuilder::new(Self::NAME)
.with_storage(storage.clone())
.layer(ConcurrencyLimitLayer::new(1))
.layer(JobLogLayer::new(db.clone(), Self::NAME))
.data(db.clone())
.build_fn(Self::run),
)
.register(
WorkerBuilder::new(format!("{}-cron", Self::NAME))
.stream(CronStream::new(schedule).into_stream())
Self::cron_worker(db.clone())
.data(storage.clone())
.data(db.clone())
.build_fn(Self::cron),
);

Expand All @@ -102,25 +94,24 @@ async fn refresh_repositories_for_provider(
}) if source.status_code.is_client_error() => {
db.update_github_provider_sync_status(provider_id, false)
.await?;
warn_stderr!(
ceprintln!(
context,
"GitHub credentials for provider {} are expired or invalid",
provider.display_name
);
return Err(source.into());
}
Err(e) => {
warn_stderr!(context, "Failed to fetch repositories from github: {e}");
ceprintln!(context, "Failed to fetch repositories from github: {e}");
return Err(e.into());
}
};
for repo in repos {
context
.stdout_writeline(format!(
"importing: {}",
repo.full_name.as_deref().unwrap_or(&repo.name)
))
.await;
cprintln!(
context,
"importing: {}",
repo.full_name.as_deref().unwrap_or(&repo.name)
);

let id = repo.id.to_string();
let Some(url) = repo.git_url else {
Expand Down
49 changes: 19 additions & 30 deletions ee/tabby-webserver/src/service/background_job/gitlab.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::str::FromStr;

use anyhow::Result;
use apalis::{
cron::{CronStream, Schedule},
prelude::{Data, Job, Monitor, Storage, WorkerBuilder, WorkerFactoryFn},
prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn},
sqlite::{SqlitePool, SqliteStorage},
utils::TokioExecutor,
};
Expand All @@ -14,11 +11,12 @@ use gitlab::{
};
use serde::{Deserialize, Serialize};
use tabby_db::{DbConn, GitlabRepositoryProviderDAO};
use tower::limit::ConcurrencyLimitLayer;
use tracing::debug;

use super::logger::{JobLogLayer, JobLogger};
use crate::warn_stderr;
use super::{
ceprintln, cprintln,
helper::{BasicJob, CronJob, JobLogger},
};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SyncGitlabJob {
Expand All @@ -35,6 +33,10 @@ impl Job for SyncGitlabJob {
const NAME: &'static str = "import_gitlab_repositories";
}

impl CronJob for SyncGitlabJob {
const SCHEDULE: &'static str = "@hourly";
}

impl SyncGitlabJob {
async fn run(self, logger: Data<JobLogger>, db: Data<DbConn>) -> tabby_schema::Result<()> {
refresh_repositories_for_provider((*logger).clone(), (*db).clone(), self.provider_id)
Expand Down Expand Up @@ -69,21 +71,11 @@ impl SyncGitlabJob {
db: DbConn,
) -> (SqliteStorage<SyncGitlabJob>, Monitor<TokioExecutor>) {
let storage = SqliteStorage::new(pool);
let schedule = Schedule::from_str("@hourly").expect("unable to parse cron schedule");
let monitor = monitor
.register(Self::basic_worker(storage.clone(), db.clone()).build_fn(Self::run))
.register(
WorkerBuilder::new(Self::NAME)
.with_storage(storage.clone())
.layer(ConcurrencyLimitLayer::new(1))
.layer(JobLogLayer::new(db.clone(), Self::NAME))
.data(db.clone())
.build_fn(Self::run),
)
.register(
WorkerBuilder::new(format!("{}-cron", Self::NAME))
.stream(CronStream::new(schedule).into_stream())
Self::cron_worker(db.clone())
.data(storage.clone())
.data(db.clone())
.build_fn(Self::cron),
);

Expand All @@ -93,33 +85,30 @@ impl SyncGitlabJob {

async fn refresh_repositories_for_provider(logger: JobLogger, db: DbConn, id: i64) -> Result<()> {
let provider = db.get_gitlab_provider(id).await?;
logger
.stdout_writeline(format!(
"Refreshing repositories for provider: {}\n",
provider.display_name
))
.await;
cprintln!(
logger,
"Refreshing repositories for provider: {}",
provider.display_name
);
let start = Utc::now();
let repos = match fetch_all_repos(&provider).await {
Ok(repos) => repos,
Err(e) if e.is_client_error() => {
db.update_gitlab_provider_sync_status(id, false).await?;
warn_stderr!(
ceprintln!(
logger,
"GitLab credentials for provider {} are expired or invalid",
provider.display_name
);
return Err(e.into());
}
Err(e) => {
warn_stderr!(logger, "Failed to fetch repositories from gitlab: {e}");
ceprintln!(logger, "Failed to fetch repositories from gitlab: {e}");
return Err(e.into());
}
};
for repo in repos {
logger
.stdout_writeline(format!("importing: {}", &repo.path_with_namespace))
.await;
cprintln!(logger, "importing: {}", &repo.path_with_namespace);
let id = repo.id.to_string();
let url = repo.http_url_to_repo;
let url = url.strip_suffix(".git").unwrap_or(&url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ pub struct JobLogger {
}

impl JobLogger {
async fn new(name: &'static str, db: DbConn) -> Self {
async fn new(name: &str, db: DbConn) -> Self {
let id = db
.create_job_run(name.to_owned())
.await
.expect("failed to create job");
Self { id, db }
}

pub async fn stdout_writeline(&self, stdout: String) {
pub async fn r#internal_println(&self, stdout: String) {
let stdout = stdout + "\n";
match self.db.update_job_stdout(self.id, stdout).await {
Ok(_) => (),
Expand All @@ -34,7 +34,7 @@ impl JobLogger {
}
}

pub async fn stderr_writeline(&self, stderr: String) {
pub async fn r#internal_eprintln(&self, stderr: String) {
let stderr = stderr + "\n";
match self.db.update_job_stderr(self.id, stderr).await {
Ok(_) => (),
Expand All @@ -56,12 +56,15 @@ impl JobLogger {

pub struct JobLogLayer {
db: DbConn,
name: &'static str,
name: String,
}

impl JobLogLayer {
pub fn new(db: DbConn, name: &'static str) -> Self {
Self { db, name }
pub fn new(db: DbConn, name: &str) -> Self {
Self {
db,
name: name.to_owned(),
}
}
}

Expand All @@ -71,7 +74,7 @@ impl<S> Layer<S> for JobLogLayer {
fn layer(&self, service: S) -> Self::Service {
JobLogService {
db: self.db.clone(),
name: self.name,
name: self.name.clone(),
service,
}
}
Expand All @@ -80,33 +83,17 @@ impl<S> Layer<S> for JobLogLayer {
#[derive(Clone)]
pub struct JobLogService<S> {
db: DbConn,
name: &'static str,
name: String,
service: S,
}

pub trait ExitCode {
fn into_exit_code(self) -> i32;
}

impl ExitCode for i32 {
fn into_exit_code(self) -> i32 {
self
}
}

impl ExitCode for () {
fn into_exit_code(self) -> i32 {
0
}
}

impl<S, Req> Service<Request<Req>> for JobLogService<S>
where
S: Service<Request<Req>> + Clone,
Request<Req>: Send + 'static,
S: Send + 'static,
S::Future: Send + 'static,
S::Response: Send + ExitCode + 'static,
S::Response: Send + 'static,
S::Error: Send + Debug + 'static,
{
type Response = ();
Expand All @@ -119,16 +106,16 @@ where

fn call(&mut self, mut request: Request<Req>) -> Self::Future {
debug!("Starting job `{}`", self.name);
let name = self.name;
let db = self.db.clone();
let mut service = self.service.clone();
let name = self.name.clone();
let fut_with_log = async move {
let mut logger = JobLogger::new(name, db).await;
let mut logger = JobLogger::new(&name, db).await;
request.insert(logger.clone());
match service.call(request).await {
Ok(res) => {
Ok(_) => {
debug!("Job `{}` completed", name);
logger.complete(res.into_exit_code()).await;
logger.complete(0).await;
Ok(())
}
Err(e) => {
Expand Down
Loading

0 comments on commit 185fe74

Please sign in to comment.