feat: persist tabby scheduler run output into db (#1064)
darknight authored Dec 19, 2023
1 parent 6ac52a1 commit 73aa937
Showing 16 changed files with 289 additions and 93 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 1 addition & 7 deletions crates/aim-downloader/src/io.rs
@@ -8,13 +8,7 @@ fn get_output_file(path: &str, silent: bool) -> (Option<std::fs::File>, u64) {
if !silent {
println!("File exists. Resuming.");
}
-file = Some(
-    std::fs::OpenOptions::new()
-        .write(true)
-        .append(true)
-        .open(path)
-        .unwrap(),
-);
+file = Some(std::fs::OpenOptions::new().append(true).open(path).unwrap());

let file_size = std::fs::metadata(path).unwrap().len();
transferred = file_size;
2 changes: 2 additions & 0 deletions crates/tabby-scheduler/Cargo.toml
@@ -4,6 +4,8 @@ version = "0.7.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
ee = []

[dependencies]
anyhow = { workspace = true }
18 changes: 17 additions & 1 deletion crates/tabby-scheduler/src/dataset.rs
@@ -22,7 +22,7 @@ use tabby_common::{
use tracing::error;
use tree_sitter_tags::TagsContext;

-use crate::utils::tqdm;
+use crate::{repository, utils::tqdm};

trait RepositoryExt {
fn create_dataset(&self, writer: &mut impl Write) -> Result<()>;
@@ -83,6 +83,22 @@ fn is_source_code(entry: &DirEntry) -> bool {
}
}

pub fn sync_repository(config: &Config) {
println!("Syncing repositories...");
let ret = repository::sync_repositories(config);
if let Err(err) = ret {
error!("Failed to sync repositories, err: '{}'", err);
return;
}

println!("Building dataset...");
let ret = create_dataset(config);
if let Err(err) = ret {
error!("Failed to build dataset, err: '{}'", err);
}
println!();
}

pub fn create_dataset(config: &Config) -> Result<()> {
fs::remove_dir_all(dataset_dir()).ok();
fs::create_dir_all(dataset_dir())?;
10 changes: 10 additions & 0 deletions crates/tabby-scheduler/src/index.rs
@@ -9,6 +9,7 @@ use tabby_common::{
SourceFile,
};
use tantivy::{directory::MmapDirectory, doc, Index};
use tracing::error;

use crate::utils::tqdm;

@@ -17,6 +18,15 @@ static MAX_LINE_LENGTH_THRESHOLD: usize = 300;
static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32;
static MAX_BODY_LINES_THRESHOLD: usize = 15;

pub fn index_repository(config: &Config) {
println!("Indexing repositories...");
let ret = index_repositories(config);
if let Err(err) = ret {
error!("Failed to index repositories, err: '{}'", err);
}
println!();
}

pub fn index_repositories(_config: &Config) -> Result<()> {
let code = CodeSearchSchema::new();

39 changes: 20 additions & 19 deletions crates/tabby-scheduler/src/lib.rs
@@ -13,28 +13,11 @@ pub async fn scheduler(now: bool) -> Result<()> {
let mut scheduler = JobScheduler::new();

let job1 = || {
println!("Syncing repositories...");
let ret = repository::sync_repositories(&config);
if let Err(err) = ret {
error!("Failed to sync repositories, err: '{}'", err);
return;
}

println!("Building dataset...");
let ret = dataset::create_dataset(&config);
if let Err(err) = ret {
error!("Failed to build dataset, err: '{}'", err);
}
println!();
dataset::sync_repository(&config);
};

let job2 = || {
println!("Indexing repositories...");
let ret = index::index_repositories(&config);
if let Err(err) = ret {
error!("Failed to index repositories, err: '{}'", err);
}
println!()
index::index_repository(&config);
};

if now {
@@ -58,3 +41,21 @@ pub async fn scheduler(now: bool) -> Result<()> {

Ok(())
}

#[cfg(feature = "ee")]
pub fn job_sync() {
let Ok(config) = Config::load() else {
error!("Scheduler job failed to load config");
return;
};
dataset::sync_repository(&config)
}

#[cfg(feature = "ee")]
pub fn job_index() {
let Ok(config) = Config::load() else {
error!("Scheduler job failed to load config");
return;
};
index::index_repository(&config)
}
2 changes: 1 addition & 1 deletion crates/tabby/Cargo.toml
@@ -5,7 +5,7 @@ edition = "2021"

[features]
default = ["ee"]
-ee = ["dep:tabby-webserver"]
+ee = ["dep:tabby-webserver", "tabby-scheduler/ee"]
cuda = ["llama-cpp-bindings/cuda"]
rocm = ["llama-cpp-bindings/rocm"]
experimental-http = ["dep:http-api-bindings"]
14 changes: 14 additions & 0 deletions crates/tabby/src/main.rs
@@ -40,6 +40,16 @@ pub enum Commands {
/// Run scheduler process for cron jobs integrating external code repositories.
Scheduler(SchedulerArgs),

/// Run repository sync in background
#[cfg(feature = "ee")]
#[clap(name = "job::sync", hide = true)]
JobSync,

/// Run repository index in background
#[cfg(feature = "ee")]
#[clap(name = "job::index", hide = true)]
JobIndex,

/// Run completion model as worker
#[cfg(feature = "ee")]
#[clap(name = "worker::completion")]
@@ -122,6 +132,10 @@ async fn main() {
.await
.unwrap_or_else(|err| fatal!("Scheduler failed due to '{}'", err)),
#[cfg(feature = "ee")]
Commands::JobSync => tabby_scheduler::job_sync(),
#[cfg(feature = "ee")]
Commands::JobIndex => tabby_scheduler::job_index(),
#[cfg(feature = "ee")]
Commands::WorkerCompletion(args) => {
worker::main(tabby_webserver::public::WorkerKind::Completion, args).await
}
4 changes: 2 additions & 2 deletions ee/tabby-webserver/Cargo.toml
@@ -11,7 +11,7 @@ argon2 = "0.5.1"
async-trait.workspace = true
axum = { workspace = true, features = ["ws", "headers"] }
bincode = "1.3.3"
chrono = "0.4"
chrono = { version = "0.4", features = ["serde"] }
futures.workspace = true
hyper = { workspace = true, features=["client"]}
include_dir = "0.7.3"
@@ -50,4 +50,4 @@ features = [

[dev-dependencies]
assert_matches = "1.5.0"
-tokio = { workspace = true, features = ["macros"] }
+tokio = { workspace = true, features = ["macros", "process"] }
1 change: 1 addition & 0 deletions ee/tabby-webserver/migrations/05-job-runs-table/down.sql
@@ -0,0 +1 @@
DROP TABLE job_runs;
11 changes: 11 additions & 0 deletions ee/tabby-webserver/migrations/05-job-runs-table/up.sql
@@ -0,0 +1,11 @@
CREATE TABLE job_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job VARCHAR(255) NOT NULL,
start_ts TIMESTAMP NOT NULL,
end_ts TIMESTAMP,
exit_code INTEGER,
stdout TEXT NOT NULL,
stderr TEXT NOT NULL,
created_at TIMESTAMP DEFAULT (DATETIME('now')),
updated_at TIMESTAMP DEFAULT (DATETIME('now'))
);
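
The new table lines up with the `JobRun` record that `cron/mod.rs` (further down) fills in and hands to `db_conn.create_job_run`. That struct's actual definition lives in the webserver's db module, which is not part of this excerpt; a rough sketch of the shape implied by the columns above and the call site below:

```rust
use chrono::{DateTime, Utc};

/// Sketch only: mirrors the `job_runs` columns and the fields populated in
/// `repository_job` (see cron/mod.rs below); the real definition may differ.
pub struct JobRun {
    pub id: i32,                            // AUTOINCREMENT primary key
    pub job_name: String,                   // "sync" or "index" -> `job` column
    pub start_time: DateTime<Utc>,          // `start_ts`
    pub finish_time: Option<DateTime<Utc>>, // `end_ts`
    pub exit_code: Option<i32>,             // `ExitStatus::code()` of the child process
    pub stdout: String,
    pub stderr: String,
}
```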
61 changes: 0 additions & 61 deletions ee/tabby-webserver/src/service/cron.rs

This file was deleted.

22 changes: 22 additions & 0 deletions ee/tabby-webserver/src/service/cron/db.rs
@@ -0,0 +1,22 @@
//! db maintenance jobs
use anyhow::Result;
use tokio_cron_scheduler::Job;
use tracing::error;

use crate::service::db::DbConn;

pub async fn refresh_token_job(db_conn: DbConn) -> Result<Job> {
// job is run every 2 hours
let job = Job::new_async("0 0 1/2 * * * *", move |_, _| {
let db_conn = db_conn.clone();
Box::pin(async move {
let res = db_conn.delete_expired_token().await;
if let Err(e) = res {
error!("failed to delete expired token: {}", e);
}
})
})?;

Ok(job)
}
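
tokio_cron_scheduler takes a seven-field cron expression (seconds, minutes, hours, day of month, month, day of week, year), so `0 0 1/2 * * * *` fires at second 0, minute 0 of every second hour. A minimal, self-contained sketch for experimenting with such a schedule, using the same `Job::new_async`/`JobScheduler` API as this change; the tokio `macros` feature and `anyhow` it relies on are assumptions, not part of this diff:

```rust
use std::time::Duration;

use tokio_cron_scheduler::{Job, JobScheduler};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Fires at second 0 of every minute; swap in "0 0 1/2 * * * *" to get
    // the two-hourly token-cleanup cadence used above.
    let job = Job::new_async("0 * * * * * *", |_uuid, _lock| {
        Box::pin(async move {
            println!("tick");
        })
    })?;

    let scheduler = JobScheduler::new().await?;
    scheduler.add(job).await?;
    scheduler.start().await?;

    // Keep the runtime alive long enough to observe a couple of ticks.
    tokio::time::sleep(Duration::from_secs(130)).await;
    Ok(())
}
```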
97 changes: 97 additions & 0 deletions ee/tabby-webserver/src/service/cron/mod.rs
@@ -0,0 +1,97 @@
mod db;

use std::time::Duration;

use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::error;

use crate::service::db::{DbConn, JobRun};

async fn new_job_scheduler(jobs: Vec<Job>) -> anyhow::Result<JobScheduler> {
let scheduler = JobScheduler::new().await?;
for job in jobs {
scheduler.add(job).await?;
}
scheduler.start().await?;
Ok(scheduler)
}

pub fn run_cron(db_conn: DbConn) {
tokio::spawn(async move {
let Ok(job1) = db::refresh_token_job(db_conn.clone()).await else {
error!("failed to create db job");
return;
};
// run every 5 minutes
let Ok(job2) = repository_job(db_conn.clone(), "sync".to_owned(), "0 1/5 * * * * *").await
else {
error!("failed to create sync job");
return;
};
// run every 5 hours
let Ok(job3) = repository_job(db_conn.clone(), "index".to_owned(), "0 0 1/5 * * * *").await
else {
error!("failed to create index job");
return;
};

let Ok(mut scheduler) = new_job_scheduler(vec![job1, job2, job3]).await else {
error!("failed to start job scheduler");
return;
};

loop {
match scheduler.time_till_next_job().await {
Ok(Some(duration)) => {
tokio::time::sleep(duration).await;
}
Ok(None) => {
// wait until scheduler increases jobs' tick
tokio::time::sleep(Duration::from_millis(500)).await;
}
Err(e) => {
error!("failed to get job sleep time: {}, re-try in 1 second", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
}

async fn repository_job(db_conn: DbConn, job_name: String, schedule: &str) -> anyhow::Result<Job> {
let job = Job::new_async(schedule, move |_, _| {
let job_name = job_name.clone();
let db_conn = db_conn.clone();
Box::pin(async move {
// run command as a child process
let start_time = chrono::Utc::now();
let exe = std::env::current_exe().unwrap();
let output = tokio::process::Command::new(exe)
.arg(&format!("job::{}", &job_name))
.output()
.await;
let Ok(output) = output else {
error!("`{}` failed: {:?}", &job_name, output.unwrap_err());
return;
};
let finish_time = chrono::Utc::now();

// save run result to db
let run = JobRun {
id: 0,
job_name,
start_time,
finish_time: Some(finish_time),
exit_code: output.status.code(),
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
};
let res = db_conn.create_job_run(run).await;
if let Err(e) = res {
error!("failed to save job run result: {}", e);
}
})
})?;

Ok(job)
}
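
`repository_job` re-invokes the current executable with one of the hidden `job::sync` / `job::index` subcommands and records whatever the child prints. A stripped-down sketch of that spawn-and-capture pattern, substituting a plain `echo` so it runs on its own (Unix-only assumption; tokio's `process` and `macros` features plus `anyhow` are needed and are not implied by this diff):

```rust
use tokio::process::Command;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Same shape as `repository_job` above, which runs `current_exe()` with
    // `job::sync` / `job::index`; a plain `echo` stands in here.
    let output = Command::new("echo").arg("hello from child").output().await?;

    println!("exit code: {:?}", output.status.code());
    println!("stdout: {}", String::from_utf8_lossy(&output.stdout));
    println!("stderr: {}", String::from_utf8_lossy(&output.stderr));
    Ok(())
}
```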