Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(webserver): extract JobController #1993

Merged
merged 8 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/tabby-scheduler/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
if code != 0 {
warn!(
"Failed to clone `{}`. Please check your repository configuration.",
&self.git_url
self.canonical_git_url()

Check warning on line 34 in crates/tabby-scheduler/src/repository.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/repository.rs#L34

Added line #L34 was not covered by tests
);
fs::remove_dir_all(&dir).expect("Failed to remove directory");
}
Expand Down
1 change: 1 addition & 0 deletions ee/tabby-webserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tabby-search = { path = "../tabby-search" }
octocrab = "0.38.0"
fs_extra = "1.3.0"
gitlab = "0.1610.0"
rand = "0.8.5"

[dev-dependencies]
assert_matches = "1.5.0"
Expand Down
229 changes: 229 additions & 0 deletions ee/tabby-webserver/src/cron/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
use std::{pin::Pin, sync::Arc, time::Duration};

use futures::Future;
use juniper::ID;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, warn};

use crate::schema::job::JobService;

pub struct JobController {
scheduler: JobScheduler,
service: Arc<dyn JobService>,
is_oneshot: bool,
}

impl JobController {
pub async fn new(service: Arc<dyn JobService>) -> Self {
service.cleanup().await.expect("failed to cleanup jobs");
let scheduler = JobScheduler::new()
.await
.expect("failed to create job scheduler");
let is_oneshot = std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok();
if is_oneshot {
warn!(
"Running controller job as oneshot, this should only be used for debugging purpose..."
);
}
Self {
scheduler,
service,
is_oneshot,
}
}

Check warning on line 33 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L17-L33

Added lines #L17 - L33 were not covered by tests

pub async fn run(&self) {
self.scheduler
.start()
.await
.expect("failed to start job scheduler")
}

Check warning on line 40 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L35-L40

Added lines #L35 - L40 were not covered by tests

/// Register a new job with the scheduler, the job will be displayed in Jobs dashboard.
pub async fn register_public<T>(&mut self, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
self.register_impl(true, name, schedule, func).await;
}

Check warning on line 52 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L43-L52

Added lines #L43 - L52 were not covered by tests

/// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard.
pub async fn register<T>(&mut self, name: &str, schedule: &str, func: T)
where
T: FnMut() -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
self.register_impl(false, name, schedule, move |_| {
let mut func = func.clone();
Box::pin(async move {
func().await?;
Ok(0)
})
})
.await;
}

Check warning on line 71 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L55-L71

Added lines #L55 - L71 were not covered by tests

async fn register_impl<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
if self.is_oneshot {
self.run_oneshot(is_public, name, func).await;

Check warning on line 82 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L73-L82

Added lines #L73 - L82 were not covered by tests
} else {
self.run_schedule(is_public, name, schedule, func).await;

Check warning on line 84 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L84

Added line #L84 was not covered by tests
};
}

Check warning on line 86 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L86

Added line #L86 was not covered by tests

async fn run_oneshot<T>(&self, is_public: bool, name: &str, mut func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
let name = name.to_owned();
let context = JobContext::new(is_public, &name, self.service.clone()).await;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;

Check warning on line 99 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L88-L99

Added lines #L88 - L99 were not covered by tests

match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
context.complete(exit_code).await;

Check warning on line 104 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L101-L104

Added lines #L101 - L104 were not covered by tests
}
Err(e) => {
warn!("Job `{}` failed: {}", &name, e);
context.complete(-1).await;

Check warning on line 108 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L106-L108

Added lines #L106 - L108 were not covered by tests
}
}
});
}

Check warning on line 112 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L111-L112

Added lines #L111 - L112 were not covered by tests

async fn run_schedule<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
let job_mutex = Arc::new(tokio::sync::Mutex::new(()));
let service = self.service.clone();
let name = name.to_owned();
let func = func.clone();
let job = Job::new_async(schedule, move |uuid, mut scheduler| {
let job_mutex = job_mutex.clone();
let service = service.clone();
let name = name.clone();
let mut func = func.clone();
Box::pin(async move {
let Ok(_guard) = job_mutex.try_lock() else {
warn!("Job `{}` overlapped, skipping...", name);
return;

Check warning on line 134 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L114-L134

Added lines #L114 - L134 were not covered by tests
};

debug!("Running job `{}`", name);

Check warning on line 137 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L137

Added line #L137 was not covered by tests

let context = JobContext::new(is_public, &name, service.clone()).await;
match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
context.complete(exit_code).await;

Check warning on line 143 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L139-L143

Added lines #L139 - L143 were not covered by tests
}
Err(e) => {
warn!("Job `{}` failed: {}", &name, e);
context.complete(-1).await;

Check warning on line 147 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L145-L147

Added lines #L145 - L147 were not covered by tests
}
}

if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
debug!(
"Next time for job `{}` is {:?}",
&name,
next_tick.with_timezone(&chrono::Local)
);
}
})
})
.expect("failed to create job");

self.scheduler.add(job).await.expect("failed to add job");
}

Check warning on line 163 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L151-L163

Added lines #L151 - L163 were not covered by tests
}

#[derive(Clone)]
pub struct JobContext {
id: ID,
service: Arc<dyn JobService>,
}

impl JobContext {
async fn new(public: bool, name: &str, service: Arc<dyn JobService>) -> Self {
let id = if public {
service
.start(name.to_owned())
.await
.expect("failed to create job")

Check warning on line 178 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L173-L178

Added lines #L173 - L178 were not covered by tests
} else {
ID::from("".to_owned())

Check warning on line 180 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L180

Added line #L180 was not covered by tests
};
Self { id, service }
}

Check warning on line 183 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L182-L183

Added lines #L182 - L183 were not covered by tests

fn is_private(&self) -> bool {
self.id.is_empty()
}

Check warning on line 187 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L185-L187

Added lines #L185 - L187 were not covered by tests

pub async fn stdout_writeline(&self, stdout: String) {
if self.is_private() {
return;
}

let stdout = stdout + "\n";
match self.service.update_stdout(&self.id, stdout).await {
Ok(_) => (),

Check warning on line 196 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L189-L196

Added lines #L189 - L196 were not covered by tests
Err(_) => {
warn!("Failed to write stdout to job `{}`", self.id);

Check warning on line 198 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L198

Added line #L198 was not covered by tests
}
}
}

Check warning on line 201 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L201

Added line #L201 was not covered by tests

pub async fn stderr_writeline(&self, stderr: String) {
if self.is_private() {
return;
}

let stderr = stderr + "\n";
match self.service.update_stderr(&self.id, stderr).await {
Ok(_) => (),

Check warning on line 210 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L203-L210

Added lines #L203 - L210 were not covered by tests
Err(_) => {
warn!("Failed to write stderr to job `{}`", self.id);

Check warning on line 212 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L212

Added line #L212 was not covered by tests
}
}
}

Check warning on line 215 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L215

Added line #L215 was not covered by tests

async fn complete(&self, exit_code: i32) {
if self.is_private() {
return;
}

match self.service.complete(&self.id, exit_code).await {
Ok(_) => (),

Check warning on line 223 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L217-L223

Added lines #L217 - L223 were not covered by tests
Err(_) => {
warn!("Failed to complete job `{}`", self.id);

Check warning on line 225 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L225

Added line #L225 was not covered by tests
}
}
}

Check warning on line 228 in ee/tabby-webserver/src/cron/controller.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/controller.rs#L228

Added line #L228 was not covered by tests
}
44 changes: 34 additions & 10 deletions ee/tabby-webserver/src/cron/db/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,38 @@
use chrono::Utc;
use juniper::ID;
use octocrab::{models::Repository, GitHubError, Octocrab};
use tracing::warn;

use crate::schema::repository::{GithubRepositoryProvider, GithubRepositoryService};
use crate::{
cron::controller::JobContext,
schema::repository::{GithubRepositoryProvider, GithubRepositoryService},
};

pub async fn refresh_all_repositories(service: Arc<dyn GithubRepositoryService>) -> Result<()> {
pub async fn refresh_all_repositories(
context: JobContext,
service: Arc<dyn GithubRepositoryService>,
) -> Result<i32> {

Check warning on line 16 in ee/tabby-webserver/src/cron/db/github.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/github.rs#L13-L16

Added lines #L13 - L16 were not covered by tests
for provider in service
.list_providers(vec![], None, None, None, None)
.await?
{
let start = Utc::now();
refresh_repositories_for_provider(service.clone(), provider.id.clone()).await?;
context
.stdout_writeline(format!(
"Refreshing repositories for provider: {}\n",
provider.display_name
))
.await;
refresh_repositories_for_provider(context.clone(), service.clone(), provider.id.clone())
.await?;

Check warning on line 29 in ee/tabby-webserver/src/cron/db/github.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/github.rs#L22-L29

Added lines #L22 - L29 were not covered by tests
service
.delete_outdated_repositories(provider.id, start)
.await?;
}
Ok(())
Ok(0)

Check warning on line 34 in ee/tabby-webserver/src/cron/db/github.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/github.rs#L34

Added line #L34 was not covered by tests
}

async fn refresh_repositories_for_provider(
context: JobContext,

Check warning on line 38 in ee/tabby-webserver/src/cron/db/github.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/github.rs#L38

Added line #L38 was not covered by tests
service: Arc<dyn GithubRepositoryService>,
provider_id: ID,
) -> Result<()> {
Expand All @@ -36,18 +49,29 @@
service
.update_provider_status(provider.id.clone(), false)
.await?;
warn!(
"GitHub credentials for provider {} are expired or invalid",
provider.display_name
);
context
.stderr_writeline(format!(
"GitHub credentials for provider {} are expired or invalid",
provider.display_name
))
.await;

Check warning on line 57 in ee/tabby-webserver/src/cron/db/github.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/github.rs#L52-L57

Added lines #L52 - L57 were not covered by tests
return Err(source.into());
}
Err(e) => {
warn!("Failed to fetch repositories from github: {e}");
context
.stderr_writeline(format!("Failed to fetch repositories from github: {}", e))
.await;

Check warning on line 63 in ee/tabby-webserver/src/cron/db/github.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/github.rs#L61-L63

Added lines #L61 - L63 were not covered by tests
return Err(e.into());
}
};
for repo in repos {
context
.stdout_writeline(format!(
"Importing: {}",
repo.full_name.as_deref().unwrap_or(&repo.name)
))
.await;

Check warning on line 73 in ee/tabby-webserver/src/cron/db/github.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/cron/db/github.rs#L68-L73

Added lines #L68 - L73 were not covered by tests

let id = repo.id.to_string();
let Some(url) = repo.git_url else {
continue;
Expand Down
Loading
Loading