Skip to content

Commit

Permalink
simplify register/register_public implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
wsxiaoys committed Apr 28, 2024
1 parent d877573 commit 4868c91
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 74 deletions.
114 changes: 63 additions & 51 deletions ee/tabby-webserver/src/cron/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,68 +37,43 @@ impl JobController {
+ Clone
+ 'static,
{
let job_mutex = Arc::new(tokio::sync::Mutex::new(()));
let service = self.service.clone();
let name = name.to_owned();
let func = func.clone();
let job = Job::new_async(schedule, move |uuid, mut scheduler| {
let job_mutex = job_mutex.clone();
let service = service.clone();
let name = name.clone();
self.register_impl(true, name, schedule, func).await;
}

/// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard.
pub async fn register<T>(&mut self, name: &str, schedule: &str, func: T)
where
T: FnMut() -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
self.register_impl(false, name, schedule, move |_| {
let mut func = func.clone();
Box::pin(async move {
let Ok(_guard) = job_mutex.try_lock() else {
warn!("Job `{}` overlapped, skipping...", name);
return;
};

debug!("Running public job `{}`", name);

let Ok(id) = service.start(name.clone()).await else {
warn!("failed to create job `{}`", &name);
return;
};

let context = JobContext::new(id.clone(), service.clone());
match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
let _ = service.complete(&id, exit_code).await;
}
Err(e) => {
warn!("Job `{}` failed: {}", &name, e);
let _ = service.complete(&id, -1).await;
}
}

if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
debug!(
"Next time for job `{}` is {:?}",
&name,
next_tick.with_timezone(&chrono::Local)
);
}
func().await?;
Ok(0)
})
})
.expect("failed to create job");

self.scheduler.add(job).await.expect("failed to add job");
.await;
}

/// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard.
pub async fn register<T>(&mut self, name: &str, schedule: &str, func: T)
async fn register_impl<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
where
T: FnMut() -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
let job_mutex = Arc::new(tokio::sync::Mutex::new(()));
let func = func.clone();
let service = self.service.clone();
let name = name.to_owned();
let func = func.clone();
let job = Job::new_async(schedule, move |uuid, mut scheduler| {
let job_mutex = job_mutex.clone();
let service = service.clone();
let name = name.clone();
let mut func = func.clone();
Box::pin(async move {
Expand All @@ -108,12 +83,16 @@ impl JobController {
};

debug!("Running job `{}`", name);
match func().await {
Ok(_) => {
debug!("Job `{}` completed", name);

let context = JobContext::new(is_public, &name, service.clone()).await;
match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
context.complete(exit_code).await;
}
Err(e) => {
warn!("Job `{}` failed: {}", name, e);
warn!("Job `{}` failed: {}", &name, e);
context.complete(-1).await;
}
}

Expand All @@ -139,11 +118,27 @@ pub struct JobContext {
}

impl JobContext {
pub fn new(id: ID, service: Arc<dyn JobService>) -> Self {
async fn new(public: bool, name: &str, service: Arc<dyn JobService>) -> Self {
let id = if public {
service
.start(name.to_owned())
.await
.expect("failed to create job")
} else {
ID::from("".to_owned())
};
Self { id, service }
}

fn is_private(&self) -> bool {
self.id.is_empty()
}

pub async fn stdout_writeline(&self, stdout: String) {
if self.is_private() {
return;
}

let stdout = stdout + "\n";
match self.service.update_stdout(&self.id, stdout).await {
Ok(_) => (),
Expand All @@ -154,6 +149,10 @@ impl JobContext {
}

pub async fn stderr_writeline(&self, stderr: String) {
if self.is_private() {
return;
}

let stderr = stderr + "\n";
match self.service.update_stderr(&self.id, stderr).await {
Ok(_) => (),
Expand All @@ -162,4 +161,17 @@ impl JobContext {
}
}
}

async fn complete(&self, exit_code: i32) {
if self.is_private() {
return;
}

match self.service.complete(&self.id, exit_code).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to complete job `{}`", self.id);
}
}
}
}
56 changes: 35 additions & 21 deletions ee/tabby-webserver/src/cron/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ mod gitlab;

use std::sync::Arc;

use rand::Rng;

use super::{controller::JobController, every_ten_minutes, every_two_hours};
use crate::schema::{
auth::AuthenticationService,
Expand All @@ -21,33 +19,49 @@ pub async fn register_jobs(
) {
let cloned_auth = auth.clone();
controller
.register("remove_staled_refresh_token", &every_two_hours(), move || {
let auth = cloned_auth.clone();
Box::pin(async move { Ok(auth.delete_expired_token().await?) })
})
.register(
"remove_staled_refresh_token",
&every_two_hours(),
move || {
let auth = cloned_auth.clone();
Box::pin(async move { Ok(auth.delete_expired_token().await?) })
},
)
.await;

let cloned_auth = auth.clone();
controller
.register("remove_staled_password_reset", &every_two_hours(), move || {
let auth = cloned_auth.clone();
Box::pin(async move { Ok(auth.delete_expired_password_resets().await?) })
})
.register(
"remove_staled_password_reset",
&every_two_hours(),
move || {
let auth = cloned_auth.clone();
Box::pin(async move { Ok(auth.delete_expired_password_resets().await?) })
},
)
.await;

controller
.register_public("github_repositories", &every_ten_minutes(), move |context| {
let context = context.clone();
let github = github.clone();
Box::pin(async move { github::refresh_all_repositories(context, github).await })
})
.register_public(
"github_repositories",
&every_ten_minutes(),
move |context| {
let context = context.clone();
let github = github.clone();
Box::pin(async move { github::refresh_all_repositories(context, github).await })
},
)
.await;

controller
.register_public("gitlab_repositories", &every_ten_minutes(), move |context| {
let gitlab = gitlab.clone();
let context = context.clone();
Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await })
})
.register_public(
"gitlab_repositories",
&every_ten_minutes(),
move |context| {
let gitlab = gitlab.clone();
let context = context.clone();
Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await })
},
)
.await;
}
}
8 changes: 6 additions & 2 deletions ee/tabby-webserver/src/cron/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ pub async fn run_cron(

fn every_two_hours() -> String {
let mut rng = rand::thread_rng();
format!("{} {} */2 * * *", rng.gen_range(0..59), rng.gen_range(0..59))
format!(
"{} {} */2 * * *",
rng.gen_range(0..59),
rng.gen_range(0..59)
)
}

fn every_ten_minutes() -> String {
let mut rng = rand::thread_rng();
format!("{} */10 * * * *", rng.gen_range(0..59))
}
}

0 comments on commit 4868c91

Please sign in to comment.