feat(webserver): Reload repository cache when scheduler job finishes (#1585)

* feat(webserver): Reload repository cache when scheduler job finishes

* [autofix.ci] apply automated fixes

* Make RepositoryCache initialize its own reload listener

* Restructure events

* Remove SchedulerJobCompleteEvent

* Apply suggestion

* Apply suggestions

* Apply suggestions

* [autofix.ci] apply automated fixes

* Make start_reload_listener private

* Add test for start_listener

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
boxbeam and autofix-ci[bot] authored Mar 6, 2024
1 parent d701830 commit 77d7fe9
Showing 4 changed files with 112 additions and 52 deletions.
101 changes: 80 additions & 21 deletions ee/tabby-webserver/src/cron/mod.rs
@@ -3,11 +3,45 @@ mod scheduler;

use std::sync::Arc;

use futures::Future;
use tokio::sync::broadcast::{self, error::RecvError, Receiver};
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::error;

use crate::schema::{auth::AuthenticationService, job::JobService, worker::WorkerService};

pub(crate) struct CronEvents {
pub scheduler_job_succeeded: Receiver<()>,
}

pub trait StartListener<E> {
fn start_listener<F, Fut>(&self, handler: F)
where
F: Fn(E) -> Fut + Send + 'static,
Fut: Future + Send,
E: Clone + Send + 'static;
}

impl<E> StartListener<E> for Receiver<E> {
fn start_listener<F, Fut>(&self, handler: F)
where
F: Fn(E) -> Fut + Send + 'static,
Fut: Future + Send,
E: Clone + Send + 'static,
{
let mut recv = self.resubscribe();
tokio::spawn(async move {
loop {
let event = match recv.recv().await {
Ok(event) => event,
Err(RecvError::Closed) => break,
Err(_) => continue,
};
handler(event).await;
}
});
}
}

async fn new_job_scheduler(jobs: Vec<Job>) -> anyhow::Result<JobScheduler> {
let scheduler = JobScheduler::new().await?;
for job in jobs {
@@ -22,34 +56,59 @@ pub async fn run_cron(
job: Arc<dyn JobService>,
worker: Arc<dyn WorkerService>,
local_port: u16,
) {
) -> CronEvents {
let mut jobs = vec![];
let (send_scheduler_complete, receive_scheduler_complete) = broadcast::channel::<()>(2);

let Ok(job1) = db::refresh_token_job(auth.clone()).await else {
error!("failed to create refresh token cleanup job");
return;
};
let job1 = db::refresh_token_job(auth.clone())
.await
.expect("failed to create refresh token cleanup job");
jobs.push(job1);

let Ok(job2) = db::password_reset_job(auth).await else {
error!("failed to create password reset token cleanup job");
return;
};
let job2 = db::password_reset_job(auth)
.await
.expect("failed to create password reset token cleanup job");
jobs.push(job2);

let Ok(job3) = scheduler::scheduler_job(job.clone(), worker, local_port).await else {
error!("failed to create scheduler job");
return;
};
let job3 = scheduler::scheduler_job(job.clone(), worker, send_scheduler_complete, local_port)
.await
.expect("failed to create scheduler job");
jobs.push(job3);

let Ok(job4) = db::stale_job_runs_job(job).await else {
error!("failed to create stale job runs cleanup job");
return;
};
let job4 = db::stale_job_runs_job(job)
.await
.expect("failed to create stale job runs cleanup job");
jobs.push(job4);

if new_job_scheduler(jobs).await.is_err() {
error!("failed to start job scheduler");
};
new_job_scheduler(jobs)
.await
.expect("failed to start job scheduler");
CronEvents {
scheduler_job_succeeded: receive_scheduler_complete,
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use tokio::sync::Mutex;

use super::*;

#[tokio::test]
async fn test_receiver_events() {
let (send, receive) = broadcast::channel(1);
let counter = Arc::new(Mutex::new(0));
let clone = counter.clone();
receive.start_listener(move |_| {
let clone = clone.clone();
async move {
*clone.lock().await += 1;
}
});
send.send(()).unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(*counter.lock().await, 1);
}
}
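
A note on the listener plumbing above: start_listener calls resubscribe() on the receiver and spawns a detached tokio task, so the Receiver stored in CronEvents stays usable after a listener is registered; the loop exits on RecvError::Closed and skips lagged-receiver errors. A minimal sketch of how a caller might hook into the events returned by run_cron — the logging handler is illustrative and not part of this commit, while run_cron, CronEvents, and start_listener are:

    use tracing::info;

    use crate::cron::{CronEvents, StartListener};

    fn wire_up(events: CronEvents) {
        // start_listener resubscribes internally, so `events` can still be
        // handed to other consumers afterwards.
        events.scheduler_job_succeeded.start_listener(|_| async {
            info!("scheduler job succeeded; dependent caches can refresh");
        });
    }
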
6 changes: 5 additions & 1 deletion ee/tabby-webserver/src/cron/scheduler.rs
@@ -1,7 +1,7 @@
use std::{process::Stdio, sync::Arc};

use anyhow::{Context, Result};
use tokio::io::AsyncBufReadExt;
use tokio::{io::AsyncBufReadExt, sync::broadcast};
use tokio_cron_scheduler::Job;
use tracing::{error, info, warn};

@@ -10,6 +10,7 @@ use crate::schema::{job::JobService, worker::WorkerService};
pub async fn scheduler_job(
job: Arc<dyn JobService>,
worker: Arc<dyn WorkerService>,
events: broadcast::Sender<()>,
local_port: u16,
) -> anyhow::Result<Job> {
let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(()));
@@ -18,6 +19,7 @@ pub async fn scheduler_job(
let worker = worker.clone();
let job = job.clone();
let scheduler_mutex = scheduler_mutex.clone();
let events = events.clone();
Box::pin(async move {
let Ok(_guard) = scheduler_mutex.try_lock() else {
warn!("Scheduler job overlapped, skipping...");
@@ -26,6 +28,8 @@

if let Err(err) = run_scheduler_now(job, worker, local_port).await {
error!("Failed to run scheduler job, reason: `{}`", err);
} else {
let _ = events.send(());
}

if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
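
Note the success-path change: tokio's broadcast::Sender::send returns Err only when no receiver is currently subscribed, so `let _ = events.send(());` deliberately discards the result instead of treating it as a failure. A standalone sketch of the same fire-and-forget pattern (notify_success is a hypothetical name, not from this commit):

    use tokio::sync::broadcast;

    fn notify_success(tx: &broadcast::Sender<()>) {
        // send() errs only if there are no active receivers; for a
        // fire-and-forget notification that case is harmless.
        let _ = tx.send(());
    }
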
9 changes: 4 additions & 5 deletions ee/tabby-webserver/src/handler.rs
@@ -12,7 +12,8 @@ use tabby_common::api::{code::CodeSearch, event::RawEventLogger, server_setting:
use tracing::warn;

use crate::{
cron, hub, oauth,
cron::{self},
hub, oauth,
repositories::{self, RepositoryCache},
schema::{create_schema, Schema, ServiceLocator},
service::create_service_locator,
@@ -28,10 +29,8 @@ pub async fn attach_webserver(
local_port: u16,
) -> (Router, Router) {
let ctx = create_service_locator(logger, code, is_chat_enabled).await;
cron::run_cron(ctx.auth(), ctx.job(), ctx.worker(), local_port).await;

let repository_cache = Arc::new(RepositoryCache::new_initialized(ctx.repository()).await);
repository_cache.start_reload_job().await;
let events = cron::run_cron(ctx.auth(), ctx.job(), ctx.worker(), local_port).await;
let repository_cache = RepositoryCache::new_initialized(ctx.repository(), &events).await;

let schema = Arc::new(create_schema());
let rs = Arc::new(repository_cache);
48 changes: 23 additions & 25 deletions ee/tabby-webserver/src/repositories/resolve.rs
@@ -16,12 +16,14 @@ use axum::{
use hyper::Body;
use serde::{Deserialize, Serialize};
use tabby_common::{config::RepositoryConfig, SourceFile, Tag};
use tokio_cron_scheduler::{Job, JobScheduler};
use tower::ServiceExt;
use tower_http::services::ServeDir;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::schema::repository::RepositoryService;
use crate::{
cron::{CronEvents, StartListener},
schema::repository::RepositoryService,
};

pub struct RepositoryCache {
repository_lookup: RwLock<HashMap<RepositoryKey, RepositoryMeta>>,
@@ -37,14 +39,19 @@ impl std::fmt::Debug for RepositoryCache {
}

impl RepositoryCache {
pub async fn new_initialized(service: Arc<dyn RepositoryService>) -> RepositoryCache {
pub async fn new_initialized(
service: Arc<dyn RepositoryService>,
events: &CronEvents,
) -> Arc<RepositoryCache> {
let cache = RepositoryCache {
repository_lookup: Default::default(),
service,
};
if let Err(e) = cache.reload().await {
error!("Failed to load repositories: {e}");
};
let cache = Arc::new(cache);
cache.start_reload_listener(events);
cache
}

@@ -62,6 +69,18 @@ impl RepositoryCache {
Ok(())
}

fn start_reload_listener(self: &Arc<Self>, events: &CronEvents) {
let clone = self.clone();
events.scheduler_job_succeeded.start_listener(move |_| {
let clone = clone.clone();
async move {
if let Err(e) = clone.reload().await {
warn!("Error when reloading repository cache: {e}");
};
}
});
}

fn repositories(&self) -> impl Deref<Target = HashMap<RepositoryKey, RepositoryMeta>> + '_ {
self.repository_lookup.read().unwrap()
}
@@ -265,27 +284,6 @@ impl RepositoryCache {
Ok(resp)
}

pub async fn start_reload_job(self: &Arc<Self>) {
let cache = self.clone();
let scheduler = JobScheduler::new().await.unwrap();
scheduler
.add(
// Reload every 5 minutes
Job::new_async("0 1/5 * * * * *", move |_, _| {
let cache = cache.clone();
Box::pin(async move {
if let Err(e) = cache.reload().await {
error!("Failed to load repositories: {e}");
};
})
})
.unwrap(),
)
.await
.unwrap();
scheduler.start().await.unwrap();
}

pub fn find_repository(&self, name: &str) -> Option<RepositoryConfig> {
let repository_lookup = self.repository_lookup.read().unwrap();
let key = repository_lookup
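
Taken together, the changes in this file swap time-based polling for event-driven reloads: the removed start_reload_job re-read repositories on the cron expression "0 1/5 * * * * *" (every five minutes), while the new start_reload_listener reloads only after a scheduler run actually succeeds. A sketch of the resulting construction path — build_cache is a hypothetical wrapper; RepositoryCache::new_initialized and CronEvents come from this commit:

    use std::sync::Arc;

    use crate::{cron::CronEvents, repositories::RepositoryCache,
        schema::repository::RepositoryService};

    async fn build_cache(
        service: Arc<dyn RepositoryService>,
        events: &CronEvents,
    ) -> Arc<RepositoryCache> {
        // new_initialized performs the initial reload itself and registers
        // the event-driven reload listener before returning the Arc.
        RepositoryCache::new_initialized(service, events).await
    }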
