Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webserver): trigger backend job when github / gitlab provider got updated #2009

Merged
merged 6 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ee/tabby-db/src/job_runs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@

/// db read/write operations for `job_runs` table
impl DbConn {
pub async fn create_job_run(&self, job: String) -> Result<i32> {
pub async fn create_job_run(&self, job: String) -> Result<i64> {

Check warning on line 33 in ee/tabby-db/src/job_runs.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-db/src/job_runs.rs#L33

Added line #L33 was not covered by tests
let rowid = query!(
r#"INSERT INTO job_runs (job, start_ts, stdout, stderr) VALUES (?, DATETIME('now'), '', '')"#,
job,
).execute(&self.pool).await?.last_insert_rowid();

Ok(rowid as i32)
Ok(rowid)

Check warning on line 39 in ee/tabby-db/src/job_runs.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-db/src/job_runs.rs#L39

Added line #L39 was not covered by tests
}

pub async fn update_job_stdout(&self, job_id: i64, stdout: String) -> Result<()> {
Expand Down
204 changes: 105 additions & 99 deletions ee/tabby-webserver/src/cron/controller.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,71 @@
use std::{pin::Pin, sync::Arc, time::Duration};
use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};

use futures::Future;
use juniper::ID;
use rand::Rng;
use tabby_db::DbConn;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, warn};

use crate::schema::job::JobService;

pub struct JobController {
scheduler: JobScheduler,
service: Arc<dyn JobService>,
is_oneshot: bool,
db: DbConn,
job_registry: HashMap<
&'static str,
Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>,
>,
}

impl JobController {
pub async fn new(service: Arc<dyn JobService>) -> Self {
service.cleanup().await.expect("failed to cleanup jobs");
pub async fn new(db: DbConn) -> Self {
db.finalize_stale_job_runs()
.await
.expect("failed to cleanup stale jobs");

Check warning on line 22 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L19-L22

Added lines #L19 - L22 were not covered by tests
let scheduler = JobScheduler::new()
.await
.expect("failed to create job scheduler");
let is_oneshot = std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok();
if is_oneshot {
warn!(
"Running controller job as oneshot, this should only be used for debugging purpose..."
);
}
Self {
scheduler,
service,
is_oneshot,
db,
job_registry: HashMap::default(),
}
}

Check warning on line 31 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L28-L31

Added lines #L28 - L31 were not covered by tests

pub fn schedule(&self, name: &str) {
let func = self
.job_registry
.get(name)
.expect("failed to get job")
.clone();
let mut rng = rand::thread_rng();
let delay = rng.gen_range(1..5);
let _ = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5 + delay)).await;
func().await;
});
}

Check warning on line 45 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L33-L45

Added lines #L33 - L45 were not covered by tests

fn run_oneshot(&self) {
warn!(
"Running controller job as oneshot, this should only be used for debugging purpose..."
);
for name in self.job_registry.keys() {
self.schedule(name);

Check warning on line 52 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L47-L52

Added lines #L47 - L52 were not covered by tests
}
}

pub async fn run(&self) {
self.scheduler
.start()
.await
.expect("failed to start job scheduler")
if std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok() {
self.run_oneshot();
} else {
self.scheduler
.start()
.await
.expect("failed to start job scheduler")

Check warning on line 63 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L57-L63

Added lines #L57 - L63 were not covered by tests
}
}

/// Register a new job with the scheduler, the job will be displayed in Jobs dashboard.
pub async fn register_public<T>(&mut self, name: &str, schedule: &str, func: T)
pub async fn register_public<T>(&mut self, name: &'static str, schedule: &str, func: T)

Check warning on line 68 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L68

Added line #L68 was not covered by tests
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
Expand All @@ -52,7 +77,7 @@
}

/// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard.
pub async fn register<T>(&mut self, name: &str, schedule: &str, func: T)
pub async fn register<T>(&mut self, name: &'static str, schedule: &str, func: T)

Check warning on line 80 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L80

Added line #L80 was not covered by tests
where
T: FnMut() -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+ Send
Expand All @@ -70,84 +95,66 @@
.await;
}

async fn register_impl<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
async fn register_impl<F>(
&mut self,
is_public: bool,
name: &'static str,
schedule: &str,
func: F,
) where
F: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>

Check warning on line 105 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L98-L105

Added lines #L98 - L105 were not covered by tests
+ Send
+ Sync
+ Clone
+ 'static,
{
if self.is_oneshot {
self.run_oneshot(is_public, name, func).await;
} else {
self.run_schedule(is_public, name, schedule, func).await;
};
}
let job_mutex = Arc::new(tokio::sync::Mutex::new(()));
let db = self.db.clone();
self.job_registry.insert(
name,
Arc::new(move || {
let job_mutex = job_mutex.clone();
let db = db.clone();
let mut func = func.clone();

Box::pin(async move {
let Ok(_guard) = job_mutex.try_lock() else {
warn!("Job `{}` overlapped, skipping...", name);
return;

Check warning on line 123 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L111-L123

Added lines #L111 - L123 were not covered by tests
};

debug!("Running job `{}`", name);
let context = JobContext::new(is_public, name, db.clone()).await;
match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", name, exit_code);
context.complete(exit_code).await;

Check warning on line 131 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L126-L131

Added lines #L126 - L131 were not covered by tests
}
Err(e) => {
warn!("Job `{}` failed: {}", name, e);
context.complete(-1).await;

Check warning on line 135 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L133-L135

Added lines #L133 - L135 were not covered by tests
}
};
})
}),
);

Check warning on line 140 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L138-L140

Added lines #L138 - L140 were not covered by tests

async fn run_oneshot<T>(&self, is_public: bool, name: &str, mut func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
let name = name.to_owned();
let context = JobContext::new(is_public, &name, self.service.clone()).await;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;

match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
context.complete(exit_code).await;
}
Err(e) => {
warn!("Job `{}` failed: {}", &name, e);
context.complete(-1).await;
}
}
});
self.add_to_schedule(name, schedule).await

Check warning on line 142 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L142

Added line #L142 was not covered by tests
}

async fn run_schedule<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
let job_mutex = Arc::new(tokio::sync::Mutex::new(()));
let service = self.service.clone();
let name = name.to_owned();
let func = func.clone();
async fn add_to_schedule(&mut self, name: &'static str, schedule: &str) {
let func = self
.job_registry
.get_mut(name)
.expect("failed to get job")
.clone();

Check warning on line 151 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L145-L151

Added lines #L145 - L151 were not covered by tests
let job = Job::new_async(schedule, move |uuid, mut scheduler| {
let job_mutex = job_mutex.clone();
let service = service.clone();
let name = name.clone();
let mut func = func.clone();
let func = func.clone();

Check warning on line 153 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L153

Added line #L153 was not covered by tests
Box::pin(async move {
let Ok(_guard) = job_mutex.try_lock() else {
warn!("Job `{}` overlapped, skipping...", name);
return;
};

debug!("Running job `{}`", name);

let context = JobContext::new(is_public, &name, service.clone()).await;
match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
context.complete(exit_code).await;
}
Err(e) => {
warn!("Job `{}` failed: {}", &name, e);
context.complete(-1).await;
}
}

(*func)().await;

Check warning on line 157 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L157

Added line #L157 was not covered by tests
if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
debug!(
"Next time for job `{}` is {:?}",
Expand All @@ -165,25 +172,24 @@

#[derive(Clone)]
pub struct JobContext {
id: ID,
service: Arc<dyn JobService>,
id: i64,
db: DbConn,
}

impl JobContext {
async fn new(public: bool, name: &str, service: Arc<dyn JobService>) -> Self {
async fn new(public: bool, name: &'static str, db: DbConn) -> Self {

Check warning on line 180 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L180

Added line #L180 was not covered by tests
let id = if public {
service
.start(name.to_owned())
db.create_job_run(name.to_owned())

Check warning on line 182 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L182

Added line #L182 was not covered by tests
.await
.expect("failed to create job")
} else {
ID::from("".to_owned())
-1

Check warning on line 186 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L186

Added line #L186 was not covered by tests
};
Self { id, service }
Self { id, db }

Check warning on line 188 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L188

Added line #L188 was not covered by tests
}

fn is_private(&self) -> bool {
self.id.is_empty()
self.id < 0

Check warning on line 192 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L192

Added line #L192 was not covered by tests
}

pub async fn stdout_writeline(&self, stdout: String) {
Expand All @@ -192,7 +198,7 @@
}

let stdout = stdout + "\n";
match self.service.update_stdout(&self.id, stdout).await {
match self.db.update_job_stdout(self.id, stdout).await {

Check warning on line 201 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L201

Added line #L201 was not covered by tests
Ok(_) => (),
Err(_) => {
warn!("Failed to write stdout to job `{}`", self.id);
Expand All @@ -206,7 +212,7 @@
}

let stderr = stderr + "\n";
match self.service.update_stderr(&self.id, stderr).await {
match self.db.update_job_stderr(self.id, stderr).await {

Check warning on line 215 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L215

Added line #L215 was not covered by tests
Ok(_) => (),
Err(_) => {
warn!("Failed to write stderr to job `{}`", self.id);
Expand All @@ -219,7 +225,7 @@
return;
}

match self.service.complete(&self.id, exit_code).await {
match self.db.update_job_status(self.id, exit_code).await {

Check warning on line 228 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L228

Added line #L228 was not covered by tests
Ok(_) => (),
Err(_) => {
warn!("Failed to complete job `{}`", self.id);
Expand Down
30 changes: 11 additions & 19 deletions ee/tabby-webserver/src/cron/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::sync::Arc;

use super::{controller::JobController, every_ten_minutes, every_two_hours};
use super::{controller::JobController, every_two_hours};
use crate::schema::{
auth::AuthenticationService,
repository::{GithubRepositoryService, GitlabRepositoryService},
Expand Down Expand Up @@ -42,26 +42,18 @@
.await;

controller
.register_public(
"github_repositories",
&every_ten_minutes(),
move |context| {
let context = context.clone();
let github = github.clone();
Box::pin(async move { github::refresh_all_repositories(context, github).await })
},
)
.register_public("github_repositories", &every_two_hours(), move |context| {
let context = context.clone();
let github = github.clone();
Box::pin(async move { github::refresh_all_repositories(context, github).await })
})

Check warning on line 49 in ee/tabby-webserver/src/cron/db/mod.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/mod.rs#L45-L49

Added lines #L45 - L49 were not covered by tests
.await;

controller
.register_public(
"gitlab_repositories",
&every_ten_minutes(),
move |context| {
let gitlab = gitlab.clone();
let context = context.clone();
Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await })
},
)
.register_public("gitlab_repositories", &every_two_hours(), move |context| {
let gitlab = gitlab.clone();
let context = context.clone();
Box::pin(async move { gitlab::refresh_all_repositories(context, gitlab).await })
})

Check warning on line 57 in ee/tabby-webserver/src/cron/db/mod.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/mod.rs#L53-L57

Added lines #L53 - L57 were not covered by tests
.await;
}
Loading
Loading