diff --git a/crates/tabby-common/src/languages.rs b/crates/tabby-common/src/languages.rs index f2783d3abf1b..d71180722aee 100644 --- a/crates/tabby-common/src/languages.rs +++ b/crates/tabby-common/src/languages.rs @@ -111,7 +111,7 @@ pub fn get_language(language: &str) -> &'static Language { } } -pub fn get_language_by_ext(ext: &OsStr) -> Option<&'static Language> { +pub fn get_language_info_by_ext(ext: &OsStr) -> Option<&'static Language> { let ext = ext.to_str()?; EXTS_LANGUAGE_MAPPING.get(ext).map(|x| get_language(x)) } diff --git a/crates/tabby-common/src/lib.rs b/crates/tabby-common/src/lib.rs index 948fdde75bba..9d2cdb370964 100644 --- a/crates/tabby-common/src/lib.rs +++ b/crates/tabby-common/src/lib.rs @@ -12,7 +12,7 @@ pub mod usage; use std::{ fs::File, - io::{BufReader, Error}, + io::BufReader, ops::Range, path::{Path, PathBuf}, }; @@ -38,14 +38,14 @@ impl SourceFile { dataset_dir().join("files.jsonl") } - pub fn all() -> Result, Error> { + pub fn all() -> impl Iterator { let files = glob::glob(format!("{}*", Self::files_jsonl().display()).as_str()).unwrap(); - let iter = files.filter_map(|x| x.ok()).flat_map(|path| { + + files.filter_map(|x| x.ok()).flat_map(|path| { let fp = BufReader::new(File::open(path).unwrap()); let reader = JsonLinesReader::new(fp); reader.read_all::().filter_map(|x| x.ok()) - }); - Ok(iter) + }) } pub fn read_content(&self) -> std::io::Result { diff --git a/crates/tabby-scheduler/src/dataset/mod.rs b/crates/tabby-scheduler/src/dataset/mod.rs index 6cfd50a93299..5b6df1207b91 100644 --- a/crates/tabby-scheduler/src/dataset/mod.rs +++ b/crates/tabby-scheduler/src/dataset/mod.rs @@ -5,33 +5,29 @@ use std::{ io::{IsTerminal, Write}, }; -use anyhow::{anyhow, Result}; +use anyhow::Result; use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; -use ignore::{DirEntry, Walk}; +use ignore::Walk; use kdam::BarExt; use serde_jsonlines::WriteExt; use tabby_common::{ config::RepositoryConfig, - languages::get_language_by_ext, + languages::get_language_info_by_ext, path::{dataset_dir, dependency_file}, DependencyFile, SourceFile, }; -use tracing::error; +use tracing::{debug, error}; use crate::{code::CodeIntelligence, utils::tqdm}; trait RepositoryExt { - fn create_dataset(&self, writer: &mut impl Write) -> Result<()>; + fn create_dataset(&self, writer: &mut impl Write); } impl RepositoryExt for RepositoryConfig { - fn create_dataset(&self, writer: &mut impl Write) -> Result<()> { + fn create_dataset(&self, writer: &mut impl Write) { let basedir = self.dir(); - let walk_dir_iter = || { - Walk::new(basedir.as_path()) - .filter_map(Result::ok) - .filter(is_source_code) - }; + let walk_dir_iter = || Walk::new(basedir.as_path()).filter_map(Result::ok); let mut pb = std::io::stdout() .is_terminal() @@ -40,20 +36,27 @@ impl RepositoryExt for RepositoryConfig { let mut code = CodeIntelligence::default(); for entry in walk_dir { - pb.as_mut().map(|b| b.update(1)).transpose()?; - let relative_path = entry .path() .strip_prefix(basedir.as_path()) .expect("Paths always begin with the prefix"); - let language = get_language_by_ext( - relative_path - .extension() - .ok_or_else(|| anyhow!("Unknown file extension for {relative_path:?}"))?, - ) - .ok_or_else(|| anyhow!("Unknown language for {relative_path:?}"))? - .to_owned() - .language(); + + let Some(ext) = relative_path.extension() else { + debug!("No extension for {relative_path:?}"); + continue; + }; + + let Some(language_info) = get_language_info_by_ext(ext) else { + debug!("Unknown language for {relative_path:?}"); + continue; + }; + + pb.as_mut() + .map(|b| b.update(1)) + .transpose() + .expect("Failed to update progress bar"); + + let language = language_info.language(); match read_to_string(entry.path()) { Ok(file_content) => { let source_file = SourceFile { @@ -66,7 +69,9 @@ impl RepositoryExt for RepositoryConfig { tags: code.find_tags(language, &file_content), language: language.into(), }; - writer.write_json_lines([source_file.clone()])?; + writer + .write_json_lines([source_file.clone()]) + .expect("Failed to write dataset jsonl file"); } Err(e) => { error!( @@ -77,26 +82,12 @@ impl RepositoryExt for RepositoryConfig { } } } - - Ok(()) - } -} - -fn is_source_code(entry: &DirEntry) -> bool { - if entry.file_type().is_some_and(|x| x.is_file()) { - entry - .path() - .extension() - .and_then(get_language_by_ext) - .is_some() - } else { - false } } -pub fn create_dataset(config: &[RepositoryConfig]) -> Result<()> { +pub fn create_dataset(config: &[RepositoryConfig]) { fs::remove_dir_all(dataset_dir()).ok(); - fs::create_dir_all(dataset_dir())?; + fs::create_dir_all(dataset_dir()).expect("Failed to create dataset directory"); let mut writer = FileRotate::new( SourceFile::files_jsonl(), @@ -110,13 +101,13 @@ pub fn create_dataset(config: &[RepositoryConfig]) -> Result<()> { let mut deps = DependencyFile::default(); for repository in config { deps::collect(repository.dir().as_path(), &mut deps); - repository.create_dataset(&mut writer)?; + repository.create_dataset(&mut writer); } - serdeconv::to_json_file(&deps, dependency_file())?; + serdeconv::to_json_file(&deps, dependency_file()) + .expect("Failed to write dependencies json file"); - writer.flush()?; - Ok(()) + writer.flush().expect("Failed to flush writer"); } mod metrics { diff --git a/crates/tabby-scheduler/src/index.rs b/crates/tabby-scheduler/src/index.rs index c03ee7273c78..c3c5f9d367eb 100644 --- a/crates/tabby-scheduler/src/index.rs +++ b/crates/tabby-scheduler/src/index.rs @@ -1,12 +1,10 @@ -use std::{fs, io::IsTerminal}; +use std::{fs, io::IsTerminal, path::Path}; -use anyhow::Result; use kdam::BarExt; use tabby_common::{ config::RepositoryConfig, index::{register_tokenizers, CodeSearchSchema}, - path::index_dir, - SourceFile, + path, SourceFile, }; use tantivy::{directory::MmapDirectory, doc, Index}; use tracing::warn; @@ -17,19 +15,23 @@ use crate::{code::CodeIntelligence, utils::tqdm}; static MAX_LINE_LENGTH_THRESHOLD: usize = 300; static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32; -pub fn index_repositories(_config: &[RepositoryConfig]) -> Result<()> { +pub fn index_repositories(_config: &[RepositoryConfig]) { let code = CodeSearchSchema::new(); - fs::create_dir_all(index_dir())?; - let directory = MmapDirectory::open(index_dir())?; - let index = Index::open_or_create(directory, code.schema)?; + let index_dir = path::index_dir(); + fs::create_dir_all(&index_dir).expect("Failed to create index directory"); + let index = open_or_create_index(&code, &index_dir); register_tokenizers(&index); // Initialize the search index writer with an initial arena size of 150 MB. - let mut writer = index.writer(150_000_000)?; - writer.delete_all_documents()?; + let mut writer = index + .writer(150_000_000) + .expect("Failed to create index writer"); + writer + .delete_all_documents() + .expect("Failed to delete all documents"); - let total_file_size: usize = SourceFile::all()? + let total_file_size: usize = SourceFile::all() .filter(is_valid_file) .map(|x| x.read_file_size()) .sum(); @@ -39,7 +41,7 @@ pub fn index_repositories(_config: &[RepositoryConfig]) -> Result<()> { .then(|| tqdm(total_file_size)); let intelligence = CodeIntelligence::default(); - for file in SourceFile::all()?.filter(is_valid_file) { + for file in SourceFile::all().filter(is_valid_file) { let text = match file.read_content() { Ok(content) => content, Err(e) => { @@ -49,24 +51,49 @@ pub fn index_repositories(_config: &[RepositoryConfig]) -> Result<()> { }; for body in intelligence.chunks(&text) { - pb.as_mut().map(|b| b.update(body.len())).transpose()?; + pb.as_mut() + .map(|b| b.update(body.len())) + .transpose() + .expect("Failed to update progress bar"); - writer.add_document(doc!( - code.field_git_url => file.git_url.clone(), - code.field_filepath => file.filepath.clone(), - code.field_language => file.language.clone(), - code.field_body => body, - ))?; + writer + .add_document(doc!( + code.field_git_url => file.git_url.clone(), + code.field_filepath => file.filepath.clone(), + code.field_language => file.language.clone(), + code.field_body => body, + )) + .expect("Failed to add document"); } } - writer.commit()?; - writer.wait_merging_threads()?; - - Ok(()) + writer.commit().expect("Failed to commit index"); + writer + .wait_merging_threads() + .expect("Failed to wait for merging threads"); } fn is_valid_file(file: &SourceFile) -> bool { file.max_line_length <= MAX_LINE_LENGTH_THRESHOLD && file.avg_line_length <= AVG_LINE_LENGTH_THRESHOLD } + +fn open_or_create_index(code: &CodeSearchSchema, path: &Path) -> Index { + match open_or_create_index_impl(code, path) { + Ok(index) => index, + Err(err) => { + warn!( + "Failed to open index repositories: {}, removing index directory '{}'...", + err, + path.display() + ); + fs::remove_dir_all(path).expect("Failed to remove index directory"); + open_or_create_index_impl(code, path).expect("Failed to open index") + } + } +} + +fn open_or_create_index_impl(code: &CodeSearchSchema, path: &Path) -> tantivy::Result { + let directory = MmapDirectory::open(path).expect("Failed to open index directory"); + Index::open_or_create(directory, code.schema.clone()) +} diff --git a/crates/tabby-scheduler/src/lib.rs b/crates/tabby-scheduler/src/lib.rs index 02ebbb90f8d7..d46f2d0a01a1 100644 --- a/crates/tabby-scheduler/src/lib.rs +++ b/crates/tabby-scheduler/src/lib.rs @@ -6,88 +6,70 @@ mod index; mod repository; mod utils; -use std::{fs, sync::Arc}; +use std::sync::Arc; -use anyhow::Result; -use tabby_common::{ - config::{RepositoryAccess, RepositoryConfig}, - path, -}; +use tabby_common::config::{RepositoryAccess, RepositoryConfig}; use tokio_cron_scheduler::{Job, JobScheduler}; -use tracing::{error, info, warn}; +use tracing::{info, warn}; -pub async fn scheduler(now: bool, access: T) -> Result<()> { +pub async fn scheduler(now: bool, access: T) { if now { - let repositories = access.list_repositories().await?; - job_sync(&repositories)?; - job_index(&repositories)?; + let repositories = access + .list_repositories() + .await + .expect("Must be able to retrieve repositories for sync"); + job_sync(&repositories); + job_index(&repositories); } else { let access = Arc::new(access); - let scheduler = JobScheduler::new().await?; + let scheduler = JobScheduler::new() + .await + .expect("Failed to create scheduler"); let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(())); // Every 10 minutes scheduler - .add(Job::new_async("0 1/10 * * * *", move |_, _| { - let access = access.clone(); - let scheduler_mutex = scheduler_mutex.clone(); - Box::pin(async move { - let Ok(_guard) = scheduler_mutex.try_lock() else { - warn!("Scheduler job overlapped, skipping..."); - return; - }; + .add( + Job::new_async("0 1/10 * * * *", move |_, _| { + let access = access.clone(); + let scheduler_mutex = scheduler_mutex.clone(); + Box::pin(async move { + let Ok(_guard) = scheduler_mutex.try_lock() else { + warn!("Scheduler job overlapped, skipping..."); + return; + }; - let repositories = access - .list_repositories() - .await - .expect("Must be able to retrieve repositories for sync"); - if let Err(e) = job_sync(&repositories) { - error!("{e}"); - } - if let Err(e) = job_index(&repositories) { - error!("{e}") - } + let repositories = access + .list_repositories() + .await + .expect("Must be able to retrieve repositories for sync"); + + job_sync(&repositories); + job_index(&repositories); + }) }) - })?) - .await?; + .expect("Failed to create job"), + ) + .await + .expect("Failed to add job"); info!("Scheduler activated..."); - scheduler.start().await?; + scheduler.start().await.expect("Failed to start scheduler"); // Sleep 10 years (indefinitely) tokio::time::sleep(tokio::time::Duration::from_secs(3600 * 24 * 365 * 10)).await; } - - Ok(()) } -fn job_index(repositories: &[RepositoryConfig]) -> Result<()> { +fn job_index(repositories: &[RepositoryConfig]) { println!("Indexing repositories..."); - let ret = index::index_repositories(repositories); - if let Err(err) = ret { - let index_dir = path::index_dir(); - warn!( - "Failed to index repositories: {}, removing index directory '{}'...", - err, - index_dir.display() - ); - fs::remove_dir_all(index_dir)?; - return Err(err.context("Failed to index repositories")); - } - Ok(()) + index::index_repositories(repositories); } -fn job_sync(repositories: &[RepositoryConfig]) -> Result<()> { +fn job_sync(repositories: &[RepositoryConfig]) { println!("Syncing {} repositories...", repositories.len()); - let ret = repository::sync_repositories(repositories); - if let Err(err) = ret { - return Err(err.context("Failed to sync repositories")); - } + repository::sync_repositories(repositories); println!("Building dataset..."); - let ret = dataset::create_dataset(repositories); - if let Err(err) = ret { - return Err(err.context("Failed to build dataset")); - } - Ok(()) + dataset::create_dataset(repositories); } diff --git a/crates/tabby-scheduler/tests/integration_test.rs b/crates/tabby-scheduler/tests/integration_test.rs index 325534b8dcb2..de9dc9487aff 100644 --- a/crates/tabby-scheduler/tests/integration_test.rs +++ b/crates/tabby-scheduler/tests/integration_test.rs @@ -35,7 +35,6 @@ mod tests { )], }; - let res = tabby_scheduler::scheduler(true, config).await; - res.expect("Failed to run scheduler"); + tabby_scheduler::scheduler(true, config).await; } } diff --git a/crates/tabby/src/main.rs b/crates/tabby/src/main.rs index fc92e9399c4b..eb6327329786 100644 --- a/crates/tabby/src/main.rs +++ b/crates/tabby/src/main.rs @@ -156,14 +156,10 @@ async fn main() { token: Some(token), }) => { let client = tabby_webserver::public::create_scheduler_client(&url, &token).await; - tabby_scheduler::scheduler(now, client) - .await - .unwrap_or_else(|err| fatal!("Scheduler failed due to '{}'", err)) + tabby_scheduler::scheduler(now, client).await } Commands::Scheduler(SchedulerArgs { now, .. }) => { - tabby_scheduler::scheduler(now, ConfigRepositoryAccess) - .await - .unwrap_or_else(|err| fatal!("Scheduler failed due to '{}'", err)) + tabby_scheduler::scheduler(now, ConfigRepositoryAccess).await } #[cfg(feature = "ee")] Commands::WorkerCompletion(ref args) => {