Skip to content

Commit

Permalink
refactor(webserver): switch MemoryStorage for apalis background jobs (#…
Browse files Browse the repository at this point in the history
…2322)

* change to 0.5.3

* switch to memory storage
  • Loading branch information
wsxiaoys authored Jun 1, 2024
1 parent 98f1e32 commit 074dec1
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 89 deletions.
26 changes: 15 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/llama-cpp-server/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl LlamaCppSupervisor {
"llama-server exited with status code {}, restarting...",
status_code
);

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
});
Expand Down
18 changes: 9 additions & 9 deletions crates/tabby/src/services/code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{collections::HashMap, sync::Arc};
use anyhow::Result;
use async_trait::async_trait;
use cached::{CachedAsync, TimedCache};
use color_eyre::owo_colors::colors::css::Tan;
use parse_git_url::GitUrl;
use tabby_common::{
api::code::{
Expand All @@ -19,13 +18,13 @@ use tabby_common::{
};
use tabby_inference::Embedding;
use tantivy::{
collector::{Count, TopDocs},
collector::TopDocs,
schema::{self, Value},
IndexReader, TantivyDocument,
};
use tokio::sync::Mutex;

use super::{embedding, tantivy::IndexReaderProvider};
use super::tantivy::IndexReaderProvider;

struct CodeSearchImpl {
config_access: Arc<dyn ConfigAccess>,
Expand Down Expand Up @@ -118,10 +117,12 @@ fn merge_code_responses_by_rank(
) -> CodeSearchResponse {
let mut scored_hits: HashMap<String, (CodeSearchScores, TantivyDocument)> = HashMap::default();

for (rank, embedding, doc) in compute_rank_score(embedding_resp).into_iter() {
let mut scores = CodeSearchScores::default();
scores.combined_rank = rank;
scores.embedding = embedding;
for (combined_rank, embedding, doc) in compute_rank_score(embedding_resp).into_iter() {
let scores = CodeSearchScores {
combined_rank,
embedding,
..Default::default()
};

scored_hits.insert(get_chunk_id(&doc).to_owned(), (scores, doc));
}
Expand All @@ -137,7 +138,6 @@ fn merge_code_responses_by_rank(

let mut scored_hits: Vec<CodeSearchHit> = scored_hits
.into_values()
.into_iter()
.map(|(scores, doc)| create_hit(scores, doc))
.collect();
scored_hits.sort_unstable_by_key(|x| x.scores.combined_rank);
Expand All @@ -160,7 +160,7 @@ fn compute_rank_score(resp: Vec<(f32, TantivyDocument)>) -> Vec<(i32, f32, Tanti
.collect()
}

fn get_chunk_id<'a>(doc: &'a TantivyDocument) -> &'a str {
fn get_chunk_id(doc: &TantivyDocument) -> &str {
let schema = IndexSchema::instance();
get_text(doc, schema.field_chunk_id)
}
Expand Down
7 changes: 3 additions & 4 deletions ee/tabby-webserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ anyhow.workspace = true
argon2 = "0.5.1"
async-trait.workspace = true
axum = { workspace = true, features = ["ws"] }
axum-extra = {workspace = true, features = ["typed-header"]}
axum-extra = { workspace = true, features = ["typed-header"] }
bincode = "1.3.3"
chrono = { workspace = true, features = ["serde"] }
futures.workspace = true
Expand Down Expand Up @@ -48,15 +48,14 @@ tabby-git = { path = "../../crates/tabby-git" }
octocrab = "0.38.0"
fs_extra = "1.3.0"
gitlab = "0.1610.0"
apalis = { git = "https://github.com/geofmureithi/apalis", rev = "f63480c", features = ["sqlite", "cron" ] }
apalis-sql = { git = "https://github.com/geofmureithi/apalis", rev = "f63480c" }
apalis = { version = "0.5.3", features = ["cron"] }
uuid.workspace = true
strum.workspace = true

[dev-dependencies]
assert_matches.workspace = true
tokio = { workspace = true, features = ["macros"] }
tabby-db = { path = "../../ee/tabby-db", features = ["testutils"] }
tabby-common = { path = "../../crates/tabby-common", features = [ "testutils" ] }
tabby-common = { path = "../../crates/tabby-common", features = ["testutils"] }
serial_test = { workspace = true }
temp_testdir = { workspace = true }
8 changes: 0 additions & 8 deletions ee/tabby-webserver/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,3 @@ pub fn db_file() -> PathBuf {
tabby_ee_root().join("dev-db.sqlite")
}
}

pub fn job_db_file() -> PathBuf {
if cfg!(feature = "prod") {
tabby_ee_root().join("job.sqlite")
} else {
tabby_ee_root().join("dev-job.sqlite")
}
}
8 changes: 4 additions & 4 deletions ee/tabby-webserver/src/service/background_job/helper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{pin::Pin, str::FromStr};

use apalis::{
cron::{CronStream, Schedule},
prelude::{Data, Job, Storage, WorkerBuilder},
prelude::{Backend, Data, Job, Request, WorkerBuilder},
};
use chrono::{DateTime, Utc};
use futures::Stream;
Expand All @@ -20,14 +20,14 @@ type DefaultMiddleware =

pub trait BasicJob: Job + Sized {
fn basic_worker<NS, Serv>(
storage: NS,
backend: NS,
db: DbConn,
) -> WorkerBuilder<Self, NS, DefaultMiddleware, Serv>
where
NS: Storage<Job = Self>,
NS: Backend<Request<Self>>,
{
WorkerBuilder::new(Self::NAME)
.with_storage(storage)
.source(backend)
.data(db.clone())
.layer(ConcurrencyLimitLayer::new(1))
.layer(JobLogLayer::new(db, Self::NAME))
Expand Down
37 changes: 8 additions & 29 deletions ee/tabby-webserver/src/service/background_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ mod helper;
mod scheduler;
mod third_party_integration;

use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use apalis::{
prelude::{Monitor, Storage},
sqlite::{SqlitePool, SqliteStorage},
};
use apalis::prelude::{MemoryStorage, MessageQueue, Monitor};
use juniper::ID;
use tabby_common::config::{ConfigAccess, RepositoryConfig};
use tabby_db::DbConn;
Expand All @@ -18,16 +15,15 @@ use tabby_schema::{integration::IntegrationService, repository::ThirdPartyReposi
use self::{
db::DbMaintainanceJob, scheduler::SchedulerJob, third_party_integration::SyncIntegrationJob,
};
use crate::path::job_db_file;

pub enum BackgroundJobEvent {
Scheduler(RepositoryConfig),
SyncThirdPartyRepositories(ID),
}

struct BackgroundJobImpl {
scheduler: SqliteStorage<SchedulerJob>,
third_party_repository: SqliteStorage<SyncIntegrationJob>,
scheduler: MemoryStorage<SchedulerJob>,
third_party_repository: MemoryStorage<SyncIntegrationJob>,
}

pub async fn start(
Expand All @@ -38,30 +34,13 @@ pub async fn start(
embedding: Arc<dyn Embedding>,
mut receiver: tokio::sync::mpsc::UnboundedReceiver<BackgroundJobEvent>,
) {
let path = format!("sqlite://{}?mode=rwc", job_db_file().display());
let pool = SqlitePool::connect(&path)
.await
.expect("unable to create sqlite pool");
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");

let config = apalis_sql::Config::default().poll_interval(Duration::from_secs(5));
let monitor = Monitor::new();
let monitor = DbMaintainanceJob::register(monitor, db.clone());
let (scheduler, monitor) = SchedulerJob::register(
monitor,
pool.clone(),
db.clone(),
config.clone(),
config_access,
embedding,
);
let (scheduler, monitor) =
SchedulerJob::register(monitor, db.clone(), config_access, embedding);
let (third_party_repository, monitor) = SyncIntegrationJob::register(
monitor,
pool.clone(),
db.clone(),
config.clone(),
third_party_repository_service,
integration_service,
);
Expand All @@ -86,15 +65,15 @@ impl BackgroundJobImpl {
async fn trigger_scheduler(&self, repository: RepositoryConfig) {
self.scheduler
.clone()
.push(SchedulerJob::new(repository))
.enqueue(SchedulerJob::new(repository))
.await
.expect("unable to push job");
}

async fn trigger_sync_integration(&self, provider_id: ID) {
self.third_party_repository
.clone()
.push(SyncIntegrationJob::new(provider_id))
.enqueue(SyncIntegrationJob::new(provider_id))
.await
.expect("Unable to push job");
}
Expand Down
20 changes: 7 additions & 13 deletions ee/tabby-webserver/src/service/background_job/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::Arc;

use anyhow::Context;
use anyhow::{anyhow, Context};
use apalis::{
prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn},
sqlite::{SqlitePool, SqliteStorage},
prelude::{Data, Job, MemoryStorage, MessageQueue, Monitor, WorkerFactoryFn},
utils::TokioExecutor,
};
use apalis_sql::Config;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tabby_common::config::{ConfigAccess, RepositoryConfig};
Expand Down Expand Up @@ -62,7 +60,7 @@ impl SchedulerJob {
async fn cron(
_now: DateTime<Utc>,
config_access: Data<Arc<dyn ConfigAccess>>,
storage: Data<SqliteStorage<SchedulerJob>>,
storage: Data<MemoryStorage<SchedulerJob>>,
) -> tabby_schema::Result<()> {
let repositories = config_access
.repositories()
Expand All @@ -72,26 +70,22 @@ impl SchedulerJob {
let mut code = CodeIndexer::default();
code.garbage_collection(&repositories);

let mut storage = (*storage).clone();

for repository in repositories {
storage
.push(SchedulerJob::new(repository))
.enqueue(SchedulerJob::new(repository))
.await
.context("unable to push job")?;
.map_err(|_| anyhow!("Failed to enqueue scheduler job"))?;
}
Ok(())
}

pub fn register(
monitor: Monitor<TokioExecutor>,
pool: SqlitePool,
db: DbConn,
config: Config,
config_access: Arc<dyn ConfigAccess>,
embedding: Arc<dyn Embedding>,
) -> (SqliteStorage<SchedulerJob>, Monitor<TokioExecutor>) {
let storage = SqliteStorage::new_with_config(pool, config);
) -> (MemoryStorage<SchedulerJob>, Monitor<TokioExecutor>) {
let storage = MemoryStorage::default();
let monitor = monitor
.register(
Self::basic_worker(storage.clone(), db.clone())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::sync::Arc;

use anyhow::anyhow;
use apalis::{
prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn},
sqlite::{SqlitePool, SqliteStorage},
prelude::{Data, Job, MemoryStorage, MessageQueue, Monitor, WorkerFactoryFn},
utils::TokioExecutor,
};
use apalis_sql::Config;
use chrono::{DateTime, Utc};
use juniper::ID;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -45,33 +44,30 @@ impl SyncIntegrationJob {

async fn cron(
_now: DateTime<Utc>,
storage: Data<SqliteStorage<SyncIntegrationJob>>,
storage: Data<MemoryStorage<SyncIntegrationJob>>,
integration_service: Data<Arc<dyn IntegrationService>>,
) -> tabby_schema::Result<()> {
debug!("Syncing all third-party repositories");

let mut storage = (*storage).clone();
for integration in integration_service
.list_integrations(None, None, None, None, None, None)
.await?
{
storage
.push(SyncIntegrationJob::new(integration.id))
.enqueue(SyncIntegrationJob::new(integration.id))
.await
.expect("Unable to push job");
.map_err(|_| anyhow!("Failed to enqueue scheduler job"))?;
}
Ok(())
}

pub fn register(
monitor: Monitor<TokioExecutor>,
pool: SqlitePool,
db: DbConn,
config: Config,
repository_service: Arc<dyn ThirdPartyRepositoryService>,
integration_service: Arc<dyn IntegrationService>,
) -> (SqliteStorage<SyncIntegrationJob>, Monitor<TokioExecutor>) {
let storage = SqliteStorage::new_with_config(pool, config);
) -> (MemoryStorage<SyncIntegrationJob>, Monitor<TokioExecutor>) {
let storage = MemoryStorage::default();
let monitor = monitor
.register(
Self::basic_worker(storage.clone(), db.clone())
Expand Down

0 comments on commit 074dec1

Please sign in to comment.