diff --git a/crates/tabby-scheduler/src/lib.rs b/crates/tabby-scheduler/src/lib.rs
index e9c313b67727..0eea06b9533c 100644
--- a/crates/tabby-scheduler/src/lib.rs
+++ b/crates/tabby-scheduler/src/lib.rs
@@ -12,6 +12,7 @@ use doc::create_web_index;
 pub use doc::{DocIndexer, WebDocument};
 use futures::StreamExt;
 use indexer::{IndexAttributeBuilder, Indexer};
+use tabby_inference::Embedding;

 mod doc;
 use std::{env, sync::Arc};
@@ -83,10 +84,14 @@ async fn doc_index_pipeline(config: &tabby_common::config::Config) {
     };

     let embedding_config = &config.model.embedding;
-
-    debug!("Starting doc index pipeline...");
     let embedding = llama_cpp_server::create_embedding(embedding_config).await;
-    for url in &index_config.start_urls {
+
+    crawl_index_docs(&index_config.start_urls, embedding).await;
+}
+
+pub async fn crawl_index_docs(urls: &[String], embedding: Arc<dyn Embedding>) {
+    for url in urls {
+        debug!("Starting doc index pipeline for {url}");
         let embedding = embedding.clone();
         stream! {
             let mut num_docs = 0;
diff --git a/ee/tabby-webserver/src/service/background_job/mod.rs b/ee/tabby-webserver/src/service/background_job/mod.rs
index fb7922169ac8..ad8bd0b0cf63 100644
--- a/ee/tabby-webserver/src/service/background_job/mod.rs
+++ b/ee/tabby-webserver/src/service/background_job/mod.rs
@@ -2,6 +2,7 @@ mod db;
 mod git;
 mod helper;
 mod third_party_integration;
+mod web_crawler;

 use std::{str::FromStr, sync::Arc};

@@ -15,9 +16,11 @@ use tabby_inference::Embedding;
 use tabby_schema::{
     integration::IntegrationService,
     repository::{GitRepositoryService, ThirdPartyRepositoryService},
+    web_crawler::WebCrawlerService,
 };
 use third_party_integration::SchedulerGithubGitlabJob;
 use tracing::warn;
+use web_crawler::WebCrawlerJob;

 use self::{
     db::DbMaintainanceJob, git::SchedulerGitJob, third_party_integration::SyncIntegrationJob,
@@ -28,6 +31,7 @@ pub enum BackgroundJobEvent {
     SchedulerGitRepository(RepositoryConfig),
     SchedulerGithubGitlabRepository(ID),
     SyncThirdPartyRepositories(ID),
+    WebCrawler(String),
 }

 pub async fn start(
@@ -35,6 +39,7 @@ pub async fn start(
     db: DbConn,
     git_repository_service: Arc<dyn GitRepositoryService>,
     third_party_repository_service: Arc<dyn ThirdPartyRepositoryService>,
     integration_service: Arc<dyn IntegrationService>,
+    web_crawler_service: Arc<dyn WebCrawlerService>,
     embedding: Arc<dyn Embedding>,
     sender: tokio::sync::mpsc::UnboundedSender<BackgroundJobEvent>,
     mut receiver: tokio::sync::mpsc::UnboundedReceiver<BackgroundJobEvent>,
@@ -71,6 +76,10 @@ pub async fn start(
                             warn!("Index issues job failed: {err:?}");
                         }
                     }
+                    BackgroundJobEvent::WebCrawler(url) => {
+                        let job = WebCrawlerJob::new(url);
+                        job.run(embedding.clone()).await;
+                    }
                 }
             },
             Some(now) = hourly.next() => {
@@ -89,6 +98,10 @@ pub async fn start(
                 if let Err(err) = SchedulerGithubGitlabJob::cron(now, sender.clone(), third_party_repository_service.clone()).await {
                     warn!("Index issues job failed: {err:?}");
                 }
+
+                if let Err(err) = WebCrawlerJob::cron(now, sender.clone(), web_crawler_service.clone()).await {
+                    warn!("Web crawler job failed: {err:?}");
+                }
             },
             else => {
                 warn!("Background job channel closed");
diff --git a/ee/tabby-webserver/src/service/background_job/web_crawler.rs b/ee/tabby-webserver/src/service/background_job/web_crawler.rs
new file mode 100644
index 000000000000..0d07c5419297
--- /dev/null
+++ b/ee/tabby-webserver/src/service/background_job/web_crawler.rs
@@ -0,0 +1,42 @@
+use std::sync::Arc;
+
+use chrono::{DateTime, Utc};
+use tabby_inference::Embedding;
+use tabby_schema::{web_crawler::WebCrawlerService, Result};
+use tokio::sync::mpsc::UnboundedSender;
+
+use super::{helper::Job, BackgroundJobEvent};
+
+pub struct WebCrawlerJob {
+    url: String,
+}
+
+impl Job for WebCrawlerJob {
+    const NAME: &'static str = "web_crawler";
+}
+
+impl WebCrawlerJob {
+    pub fn new(url: String) -> Self {
+        Self { url }
+    }
+
+    pub async fn run(self, embedding: Arc<dyn Embedding>) {
+        tabby_scheduler::crawl_index_docs(&[self.url], embedding).await;
+    }
+
+    pub async fn cron(
+        _now: DateTime<Utc>,
+        sender: UnboundedSender<BackgroundJobEvent>,
+        web_crawler_service: Arc<dyn WebCrawlerService>,
+    ) -> Result<()> {
+        for url in web_crawler_service
+            .list_web_crawler_urls(None, None, None, None)
+            .await?
+        {
+            sender
+                .send(BackgroundJobEvent::WebCrawler(url.url))
+                .expect("Failed to enqueue web crawler job");
+        }
+        Ok(())
+    }
+}
diff --git a/ee/tabby-webserver/src/service/mod.rs b/ee/tabby-webserver/src/service/mod.rs
index b38c4b650a63..9d8b4bb6dd13 100644
--- a/ee/tabby-webserver/src/service/mod.rs
+++ b/ee/tabby-webserver/src/service/mod.rs
@@ -9,7 +9,7 @@ mod license;
 pub mod repository;
 mod setting;
 mod user_event;
-mod web_crawler;
+pub mod web_crawler;

 use std::sync::Arc;

@@ -71,6 +71,7 @@ impl ServerContext {
         code: Arc<dyn CodeSearch>,
         repository: Arc<dyn RepositoryService>,
         integration: Arc<dyn IntegrationService>,
+        web_crawler: Arc<dyn WebCrawlerService>,
         db_conn: DbConn,
         is_chat_enabled_locally: bool,
     ) -> Self {
@@ -96,7 +97,7 @@ impl ServerContext {
                 license.clone(),
                 setting.clone(),
             )),
-            web_crawler: Arc::new(web_crawler::create(db_conn.clone())),
+            web_crawler,
             license,
             repository,
             integration,
@@ -257,11 +258,21 @@ pub async fn create_service_locator(
     code: Arc<dyn CodeSearch>,
     repository: Arc<dyn RepositoryService>,
     integration: Arc<dyn IntegrationService>,
+    web_crawler: Arc<dyn WebCrawlerService>,
     db: DbConn,
     is_chat_enabled: bool,
 ) -> Arc<dyn ServiceLocator> {
     Arc::new(ArcServerContext::new(
-        ServerContext::new(logger, code, repository, integration, db, is_chat_enabled).await,
+        ServerContext::new(
+            logger,
+            code,
+            repository,
+            integration,
+            web_crawler,
+            db,
+            is_chat_enabled,
+        )
+        .await,
     ))
 }
diff --git a/ee/tabby-webserver/src/service/web_crawler.rs b/ee/tabby-webserver/src/service/web_crawler.rs
index 879daf2c3af6..bc3d2c81c2b5 100644
--- a/ee/tabby-webserver/src/service/web_crawler.rs
+++ b/ee/tabby-webserver/src/service/web_crawler.rs
@@ -5,15 +5,17 @@ use tabby_schema::{
     web_crawler::{WebCrawlerService, WebCrawlerUrl},
     AsID, AsRowid, Result,
 };
+use tokio::sync::mpsc::UnboundedSender;

-use super::graphql_pagination_to_filter;
+use super::{background_job::BackgroundJobEvent, graphql_pagination_to_filter};

-pub fn create(db: DbConn) -> impl WebCrawlerService {
-    WebCrawlerServiceImpl { db }
+pub fn create(db: DbConn, sender: UnboundedSender<BackgroundJobEvent>) -> impl WebCrawlerService {
+    WebCrawlerServiceImpl { db, sender }
 }

 struct WebCrawlerServiceImpl {
     db: DbConn,
+    sender: UnboundedSender<BackgroundJobEvent>,
 }

 #[async_trait]
@@ -34,7 +36,10 @@ impl WebCrawlerService for WebCrawlerServiceImpl {
     }

     async fn create_web_crawler_url(&self, url: String) -> Result<ID> {
-        let id = self.db.create_web_crawler_url(url).await?;
+        let id = self.db.create_web_crawler_url(url.clone()).await?;
+
+        let _ = self.sender.send(BackgroundJobEvent::WebCrawler(url));
+
         Ok(id.as_id())
     }

diff --git a/ee/tabby-webserver/src/webserver.rs b/ee/tabby-webserver/src/webserver.rs
index 955bde6d086d..089700aac6ea 100644
--- a/ee/tabby-webserver/src/webserver.rs
+++ b/ee/tabby-webserver/src/webserver.rs
@@ -10,14 +10,18 @@ use tabby_common::{
 };
 use tabby_db::DbConn;
 use tabby_inference::Embedding;
-use tabby_schema::{integration::IntegrationService, repository::RepositoryService};
+use tabby_schema::{
+    integration::IntegrationService, repository::RepositoryService, web_crawler::WebCrawlerService,
+};

 use crate::{
     path::db_file,
     routes,
     service::{
-        background_job, background_job::BackgroundJobEvent, create_service_locator,
-        event_logger::create_event_logger, integration, repository,
+        background_job::{self, BackgroundJobEvent},
+        create_service_locator,
+        event_logger::create_event_logger,
+        integration, repository, web_crawler,
     },
 };

@@ -26,6 +30,7 @@ pub struct Webserver {
     logger: Arc<dyn EventLogger>,
     repository: Arc<dyn RepositoryService>,
     integration: Arc<dyn IntegrationService>,
+    web_crawler: Arc<dyn WebCrawlerService>,
 }

 #[async_trait::async_trait]
@@ -54,6 +59,8 @@ impl Webserver {
         let integration = Arc::new(integration::create(db.clone(), sender.clone()));
         let repository = repository::create(db.clone(), integration.clone(), sender.clone());

+        let web_crawler = Arc::new(web_crawler::create(db.clone(), sender.clone()));
+
         let logger2 = create_event_logger(db.clone());
         let logger = Arc::new(ComposedLogger::new(logger1, logger2));
         let ws = Arc::new(Webserver {
@@ -61,6 +68,7 @@ impl Webserver {
             logger,
             repository: repository.clone(),
             integration: integration.clone(),
+            web_crawler: web_crawler.clone(),
         });

         background_job::start(
@@ -68,6 +76,7 @@ impl Webserver {
             repository.git(),
             repository.third_party(),
             integration.clone(),
+            web_crawler,
             embedding,
             sender,
             receiver,
@@ -93,6 +102,7 @@ impl Webserver {
             code,
             self.repository.clone(),
             self.integration.clone(),
+            self.web_crawler.clone(),
             self.db.clone(),
             is_chat_enabled,
         )
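
Note on the event flow this diff wires up: `WebCrawlerServiceImpl::create_web_crawler_url` enqueues a `BackgroundJobEvent::WebCrawler` immediately after the DB insert, while `WebCrawlerJob::cron` re-enqueues every stored URL on the hourly tick, and the `start` loop consumes both. A minimal, self-contained sketch of that producer/consumer wiring is below; the trimmed-down one-variant enum, the example URL, and the `println!` stand-in for the real crawl are illustrative assumptions, not code from this change.

```rust
use tokio::sync::mpsc;

// Trimmed to the single variant this sketch needs; the real enum has more.
#[derive(Debug)]
enum BackgroundJobEvent {
    WebCrawler(String),
}

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    // What create_web_crawler_url does right after the DB insert:
    // push the new URL so indexing starts without waiting for the cron tick.
    let _ = sender.send(BackgroundJobEvent::WebCrawler(
        "https://example.com/docs".into(), // hypothetical URL
    ));
    drop(sender); // close the channel so the consumer loop below terminates

    // What the receiver arm in background_job::start does with each event.
    while let Some(event) = receiver.recv().await {
        match event {
            BackgroundJobEvent::WebCrawler(url) => {
                // The real handler builds WebCrawlerJob::new(url) and awaits
                // job.run(embedding.clone()), which in turn calls
                // tabby_scheduler::crawl_index_docs(&[url], embedding).
                println!("would crawl and index {url}");
            }
        }
    }
}
```

Using a fire-and-forget `send` here means URL creation never blocks on indexing; the trade-off, visible in `create_web_crawler_url` ignoring the send result with `let _ =`, is that an event is silently dropped if the background loop has already shut down.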