diff --git a/Cargo.lock b/Cargo.lock index cffea930671d..b9502880d0d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,8 +138,9 @@ checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3" [[package]] name = "apalis" -version = "0.5.2" -source = "git+https://github.com/geofmureithi/apalis?rev=f63480c#f63480c3b53682ae29ec7e6c28757709e8c6e6ac" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be13bf89e734a1ec4d44233429aafea5a9e693c98a4a126b00a29f321d4a2e03" dependencies = [ "apalis-core", "apalis-cron", @@ -157,8 +158,9 @@ dependencies = [ [[package]] name = "apalis-core" -version = "0.5.2" -source = "git+https://github.com/geofmureithi/apalis?rev=f63480c#f63480c3b53682ae29ec7e6c28757709e8c6e6ac" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fb0704a3274e289bebbe042d7adf2b1455a2afd084c7a835cfc2e918cad2eff" dependencies = [ "async-oneshot", "futures", @@ -173,8 +175,9 @@ dependencies = [ [[package]] name = "apalis-cron" -version = "0.5.2" -source = "git+https://github.com/geofmureithi/apalis?rev=f63480c#f63480c3b53682ae29ec7e6c28757709e8c6e6ac" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3918af413df3fb888bb662b7504ea16cbbabd20293a08f9e7548c57764612db" dependencies = [ "apalis-core", "async-stream", @@ -186,8 +189,9 @@ dependencies = [ [[package]] name = "apalis-redis" -version = "0.5.2" -source = "git+https://github.com/geofmureithi/apalis?rev=f63480c#f63480c3b53682ae29ec7e6c28757709e8c6e6ac" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8deabd06576b44f87e0fa709e44aa7edc47937b4325eac78384168df47ba30b" dependencies = [ "apalis-core", "async-stream", @@ -202,8 +206,9 @@ dependencies = [ [[package]] name = "apalis-sql" -version = "0.5.2" -source = "git+https://github.com/geofmureithi/apalis?rev=f63480c#f63480c3b53682ae29ec7e6c28757709e8c6e6ac" 
+version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb4df1ac2762e170a12a920d1f74207816341e5eed5870887cc3bcd9e8c59028" dependencies = [ "apalis-core", "async-stream", @@ -5567,7 +5572,6 @@ version = "0.12.0-dev.0" dependencies = [ "anyhow", "apalis", - "apalis-sql", "argon2", "assert_matches", "async-trait", diff --git a/crates/llama-cpp-server/src/supervisor.rs b/crates/llama-cpp-server/src/supervisor.rs index 2561cda74d03..9cd8bc4b7f50 100644 --- a/crates/llama-cpp-server/src/supervisor.rs +++ b/crates/llama-cpp-server/src/supervisor.rs @@ -89,6 +89,8 @@ impl LlamaCppSupervisor { "llama-server exited with status code {}, restarting...", status_code ); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } }); diff --git a/crates/tabby/src/services/code.rs b/crates/tabby/src/services/code.rs index c99568ce3a19..4b7a2bf10d59 100644 --- a/crates/tabby/src/services/code.rs +++ b/crates/tabby/src/services/code.rs @@ -3,7 +3,6 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::Result; use async_trait::async_trait; use cached::{CachedAsync, TimedCache}; -use color_eyre::owo_colors::colors::css::Tan; use parse_git_url::GitUrl; use tabby_common::{ api::code::{ @@ -19,13 +18,13 @@ use tabby_common::{ }; use tabby_inference::Embedding; use tantivy::{ - collector::{Count, TopDocs}, + collector::TopDocs, schema::{self, Value}, IndexReader, TantivyDocument, }; use tokio::sync::Mutex; -use super::{embedding, tantivy::IndexReaderProvider}; +use super::tantivy::IndexReaderProvider; struct CodeSearchImpl { config_access: Arc, @@ -118,10 +117,12 @@ fn merge_code_responses_by_rank( ) -> CodeSearchResponse { let mut scored_hits: HashMap = HashMap::default(); - for (rank, embedding, doc) in compute_rank_score(embedding_resp).into_iter() { - let mut scores = CodeSearchScores::default(); - scores.combined_rank = rank; - scores.embedding = embedding; + for (combined_rank, embedding, doc) in 
compute_rank_score(embedding_resp).into_iter() { + let scores = CodeSearchScores { + combined_rank, + embedding, + ..Default::default() + }; scored_hits.insert(get_chunk_id(&doc).to_owned(), (scores, doc)); } @@ -137,7 +138,6 @@ fn merge_code_responses_by_rank( let mut scored_hits: Vec = scored_hits .into_values() - .into_iter() .map(|(scores, doc)| create_hit(scores, doc)) .collect(); scored_hits.sort_unstable_by_key(|x| x.scores.combined_rank); @@ -160,7 +160,7 @@ fn compute_rank_score(resp: Vec<(f32, TantivyDocument)>) -> Vec<(i32, f32, Tanti .collect() } -fn get_chunk_id<'a>(doc: &'a TantivyDocument) -> &'a str { +fn get_chunk_id(doc: &TantivyDocument) -> &str { let schema = IndexSchema::instance(); get_text(doc, schema.field_chunk_id) } diff --git a/ee/tabby-webserver/Cargo.toml b/ee/tabby-webserver/Cargo.toml index c73d75d393dd..e31190d1dbcc 100644 --- a/ee/tabby-webserver/Cargo.toml +++ b/ee/tabby-webserver/Cargo.toml @@ -13,7 +13,7 @@ anyhow.workspace = true argon2 = "0.5.1" async-trait.workspace = true axum = { workspace = true, features = ["ws"] } -axum-extra = {workspace = true, features = ["typed-header"]} +axum-extra = { workspace = true, features = ["typed-header"] } bincode = "1.3.3" chrono = { workspace = true, features = ["serde"] } futures.workspace = true @@ -48,8 +48,7 @@ tabby-git = { path = "../../crates/tabby-git" } octocrab = "0.38.0" fs_extra = "1.3.0" gitlab = "0.1610.0" -apalis = { git = "https://github.com/geofmureithi/apalis", rev = "f63480c", features = ["sqlite", "cron" ] } -apalis-sql = { git = "https://github.com/geofmureithi/apalis", rev = "f63480c" } +apalis = { version = "0.5.3", features = ["cron"] } uuid.workspace = true strum.workspace = true @@ -57,6 +56,6 @@ strum.workspace = true assert_matches.workspace = true tokio = { workspace = true, features = ["macros"] } tabby-db = { path = "../../ee/tabby-db", features = ["testutils"] } -tabby-common = { path = "../../crates/tabby-common", features = [ "testutils" ] } +tabby-common 
= { path = "../../crates/tabby-common", features = ["testutils"] } serial_test = { workspace = true } temp_testdir = { workspace = true } diff --git a/ee/tabby-webserver/src/path.rs b/ee/tabby-webserver/src/path.rs index a0c1379da597..28a8d6136c8a 100644 --- a/ee/tabby-webserver/src/path.rs +++ b/ee/tabby-webserver/src/path.rs @@ -13,11 +13,3 @@ pub fn db_file() -> PathBuf { tabby_ee_root().join("dev-db.sqlite") } } - -pub fn job_db_file() -> PathBuf { - if cfg!(feature = "prod") { - tabby_ee_root().join("job.sqlite") - } else { - tabby_ee_root().join("dev-job.sqlite") - } -} diff --git a/ee/tabby-webserver/src/service/background_job/helper/mod.rs b/ee/tabby-webserver/src/service/background_job/helper/mod.rs index 27638608ac87..783b40ec04c4 100644 --- a/ee/tabby-webserver/src/service/background_job/helper/mod.rs +++ b/ee/tabby-webserver/src/service/background_job/helper/mod.rs @@ -4,7 +4,7 @@ use std::{pin::Pin, str::FromStr}; use apalis::{ cron::{CronStream, Schedule}, - prelude::{Data, Job, Storage, WorkerBuilder}, + prelude::{Backend, Data, Job, Request, WorkerBuilder}, }; use chrono::{DateTime, Utc}; use futures::Stream; @@ -20,14 +20,14 @@ type DefaultMiddleware = pub trait BasicJob: Job + Sized { fn basic_worker( - storage: NS, + backend: NS, db: DbConn, ) -> WorkerBuilder where - NS: Storage, + NS: Backend>, { WorkerBuilder::new(Self::NAME) - .with_storage(storage) + .source(backend) .data(db.clone()) .layer(ConcurrencyLimitLayer::new(1)) .layer(JobLogLayer::new(db, Self::NAME)) diff --git a/ee/tabby-webserver/src/service/background_job/mod.rs b/ee/tabby-webserver/src/service/background_job/mod.rs index 1bf211e74657..c97a5277039c 100644 --- a/ee/tabby-webserver/src/service/background_job/mod.rs +++ b/ee/tabby-webserver/src/service/background_job/mod.rs @@ -3,12 +3,9 @@ mod helper; mod scheduler; mod third_party_integration; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; -use apalis::{ - prelude::{Monitor, Storage}, - sqlite::{SqlitePool, 
SqliteStorage}, -}; +use apalis::prelude::{MemoryStorage, MessageQueue, Monitor}; use juniper::ID; use tabby_common::config::{ConfigAccess, RepositoryConfig}; use tabby_db::DbConn; @@ -18,7 +15,6 @@ use tabby_schema::{integration::IntegrationService, repository::ThirdPartyReposi use self::{ db::DbMaintainanceJob, scheduler::SchedulerJob, third_party_integration::SyncIntegrationJob, }; -use crate::path::job_db_file; pub enum BackgroundJobEvent { Scheduler(RepositoryConfig), @@ -26,8 +22,8 @@ pub enum BackgroundJobEvent { } struct BackgroundJobImpl { - scheduler: SqliteStorage, - third_party_repository: SqliteStorage, + scheduler: MemoryStorage, + third_party_repository: MemoryStorage, } pub async fn start( @@ -38,30 +34,13 @@ pub async fn start( embedding: Arc, mut receiver: tokio::sync::mpsc::UnboundedReceiver, ) { - let path = format!("sqlite://{}?mode=rwc", job_db_file().display()); - let pool = SqlitePool::connect(&path) - .await - .expect("unable to create sqlite pool"); - SqliteStorage::setup(&pool) - .await - .expect("unable to run migrations for sqlite"); - - let config = apalis_sql::Config::default().poll_interval(Duration::from_secs(5)); let monitor = Monitor::new(); let monitor = DbMaintainanceJob::register(monitor, db.clone()); - let (scheduler, monitor) = SchedulerJob::register( - monitor, - pool.clone(), - db.clone(), - config.clone(), - config_access, - embedding, - ); + let (scheduler, monitor) = + SchedulerJob::register(monitor, db.clone(), config_access, embedding); let (third_party_repository, monitor) = SyncIntegrationJob::register( monitor, - pool.clone(), db.clone(), - config.clone(), third_party_repository_service, integration_service, ); @@ -86,7 +65,7 @@ impl BackgroundJobImpl { async fn trigger_scheduler(&self, repository: RepositoryConfig) { self.scheduler .clone() - .push(SchedulerJob::new(repository)) + .enqueue(SchedulerJob::new(repository)) .await .expect("unable to push job"); } @@ -94,7 +73,7 @@ impl BackgroundJobImpl { async fn 
trigger_sync_integration(&self, provider_id: ID) { self.third_party_repository .clone() - .push(SyncIntegrationJob::new(provider_id)) + .enqueue(SyncIntegrationJob::new(provider_id)) .await .expect("Unable to push job"); } diff --git a/ee/tabby-webserver/src/service/background_job/scheduler.rs b/ee/tabby-webserver/src/service/background_job/scheduler.rs index 532caeccb832..92e1feb425fd 100644 --- a/ee/tabby-webserver/src/service/background_job/scheduler.rs +++ b/ee/tabby-webserver/src/service/background_job/scheduler.rs @@ -1,12 +1,10 @@ use std::sync::Arc; -use anyhow::Context; +use anyhow::{anyhow, Context}; use apalis::{ - prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn}, - sqlite::{SqlitePool, SqliteStorage}, + prelude::{Data, Job, MemoryStorage, MessageQueue, Monitor, WorkerFactoryFn}, utils::TokioExecutor, }; -use apalis_sql::Config; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tabby_common::config::{ConfigAccess, RepositoryConfig}; @@ -62,7 +60,7 @@ impl SchedulerJob { async fn cron( _now: DateTime, config_access: Data>, - storage: Data>, + storage: Data>, ) -> tabby_schema::Result<()> { let repositories = config_access .repositories() @@ -72,26 +70,22 @@ impl SchedulerJob { let mut code = CodeIndexer::default(); code.garbage_collection(&repositories); - let mut storage = (*storage).clone(); - for repository in repositories { storage - .push(SchedulerJob::new(repository)) + .enqueue(SchedulerJob::new(repository)) .await - .context("unable to push job")?; + .map_err(|_| anyhow!("Failed to enqueue scheduler job"))?; } Ok(()) } pub fn register( monitor: Monitor, - pool: SqlitePool, db: DbConn, - config: Config, config_access: Arc, embedding: Arc, - ) -> (SqliteStorage, Monitor) { - let storage = SqliteStorage::new_with_config(pool, config); + ) -> (MemoryStorage, Monitor) { + let storage = MemoryStorage::default(); let monitor = monitor .register( Self::basic_worker(storage.clone(), db.clone()) diff --git 
a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index 80d74fd0deb5..54a34f1c50b0 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -1,11 +1,10 @@ use std::sync::Arc; +use anyhow::anyhow; use apalis::{ - prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn}, - sqlite::{SqlitePool, SqliteStorage}, + prelude::{Data, Job, MemoryStorage, MessageQueue, Monitor, WorkerFactoryFn}, utils::TokioExecutor, }; -use apalis_sql::Config; use chrono::{DateTime, Utc}; use juniper::ID; use serde::{Deserialize, Serialize}; @@ -45,33 +44,30 @@ impl SyncIntegrationJob { async fn cron( _now: DateTime, - storage: Data>, + storage: Data>, integration_service: Data>, ) -> tabby_schema::Result<()> { debug!("Syncing all third-party repositories"); - let mut storage = (*storage).clone(); for integration in integration_service .list_integrations(None, None, None, None, None, None) .await? { storage - .push(SyncIntegrationJob::new(integration.id)) + .enqueue(SyncIntegrationJob::new(integration.id)) .await - .expect("Unable to push job"); + .map_err(|_| anyhow!("Failed to enqueue sync integration job"))?; } Ok(()) } pub fn register( monitor: Monitor, - pool: SqlitePool, db: DbConn, - config: Config, repository_service: Arc, integration_service: Arc, - ) -> (SqliteStorage, Monitor) { - let storage = SqliteStorage::new_with_config(pool, config); + ) -> (MemoryStorage, Monitor) { + let storage = MemoryStorage::default(); let monitor = monitor .register( Self::basic_worker(storage.clone(), db.clone())