Skip to content

Commit

Permalink
feat(webserver): Delete job runs with null exit code on startup (#1492)
Browse files Browse the repository at this point in the history
* feat(webserver): Delete job runs with null exit code on startup

* Apply suggested changes
  • Loading branch information
boxbeam authored Feb 22, 2024
1 parent d5f9483 commit c66f726
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 11 deletions.
7 changes: 7 additions & 0 deletions ee/tabby-db/src/job_runs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,11 @@ impl DbConn {
let runs = sqlx::query_as(&query).fetch_all(&self.pool).await?;
Ok(runs)
}

pub async fn cleanup_stale_job_runs(&self) -> Result<()> {
query!("DELETE FROM job_runs WHERE exit_code IS NULL;")
.execute(&self.pool)
.await?;
Ok(())
}
}
30 changes: 20 additions & 10 deletions ee/tabby-webserver/src/cron/db.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
//! db maintenance jobs
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use futures::Future;
use tokio_cron_scheduler::Job;
use tracing::error;

use crate::schema::auth::AuthenticationService;
use crate::schema::{auth::AuthenticationService, job::JobService};

async fn auth_job<F>(
auth: Arc<dyn AuthenticationService>,
job: fn(Arc<dyn AuthenticationService>) -> F,
) -> Result<Job>
async fn service_job<F, S>(service: Arc<S>, job: fn(Arc<S>) -> F) -> Result<Job>
where
F: Future<Output = Result<()>> + 'static + Send,
S: Send + Sync + 'static + ?Sized,
{
// job is run every 2 hours
let job = Job::new_async("0 0 1/2 * * * *", move |_, _| {
let auth = auth.clone();
let auth = service.clone();
Box::pin(async move {
let res = job(auth.clone()).await;
if let Err(e) = res {
error!("failed to delete expired token: {}", e);
error!("failed to run cleanup job: {}", e);
}
})
})?;
Expand All @@ -31,16 +29,28 @@ where
}

pub async fn refresh_token_job(auth: Arc<dyn AuthenticationService>) -> Result<Job> {
auth_job(
service_job(
auth,
|auth| async move { auth.delete_expired_token().await },
)
.await
}

pub async fn password_reset_job(auth: Arc<dyn AuthenticationService>) -> Result<Job> {
auth_job(auth, |auth| async move {
service_job(auth, |auth| async move {
auth.delete_expired_password_resets().await
})
.await
}

pub async fn stale_job_runs_job(jobs: Arc<dyn JobService>) -> Result<Job> {
let job_res = Job::new_one_shot_async(Duration::from_secs(0), move |_, _| {
let jobs = jobs.clone();
Box::pin(async move {
if let Err(e) = jobs.cleanup_stale_job_runs().await {
error!("failed to cleanup stale job runs: {e}");
}
})
});
Ok(job_res?)
}
8 changes: 7 additions & 1 deletion ee/tabby-webserver/src/cron/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@ pub async fn run_cron(
};
jobs.push(job2);

let Ok(job3) = scheduler::scheduler_job(job, worker, local_port).await else {
let Ok(job3) = scheduler::scheduler_job(job.clone(), worker, local_port).await else {
error!("failed to create scheduler job");
return;
};
jobs.push(job3);

let Ok(job4) = db::stale_job_runs_job(job).await else {
error!("failed to create stale job runs cleanup job");
return;
};
jobs.push(job4);

if new_job_scheduler(jobs).await.is_err() {
error!("failed to start job scheduler");
};
Expand Down
1 change: 1 addition & 0 deletions ee/tabby-webserver/src/schema/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub trait JobService: Send + Sync {
async fn update_job_stdout(&self, id: &ID, stdout: String) -> Result<()>;
async fn update_job_stderr(&self, id: &ID, stderr: String) -> Result<()>;
async fn complete_job_run(&self, id: &ID, exit_code: i32) -> Result<()>;
async fn cleanup_stale_job_runs(&self) -> Result<()>;

async fn list_job_runs(
&self,
Expand Down
5 changes: 5 additions & 0 deletions ee/tabby-webserver/src/service/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ impl JobService for DbConn {
Ok(())
}

async fn cleanup_stale_job_runs(&self) -> Result<()> {
(self as &DbConn).cleanup_stale_job_runs().await?;
Ok(())
}

async fn list_job_runs(
&self,
ids: Option<Vec<ID>>,
Expand Down

0 comments on commit c66f726

Please sign in to comment.