feat(webserver): implement doc crawler job (#2431)
* feat(webserver): implement doc crawler job

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
boxbeam and autofix-ci[bot] authored Jun 18, 2024
1 parent 3ef1bc4 commit b485d12
Showing 6 changed files with 99 additions and 13 deletions.
11 changes: 8 additions & 3 deletions crates/tabby-scheduler/src/lib.rs
@@ -12,6 +12,7 @@ use doc::create_web_index;
pub use doc::{DocIndexer, WebDocument};
use futures::StreamExt;
use indexer::{IndexAttributeBuilder, Indexer};
use tabby_inference::Embedding;

mod doc;
use std::{env, sync::Arc};
@@ -83,10 +84,14 @@ async fn doc_index_pipeline(config: &tabby_common::config::Config) {
    };

    let embedding_config = &config.model.embedding;

    debug!("Starting doc index pipeline...");
    let embedding = llama_cpp_server::create_embedding(embedding_config).await;
    for url in &index_config.start_urls {

    crawl_index_docs(&index_config.start_urls, embedding).await;
}

pub async fn crawl_index_docs(urls: &[String], embedding: Arc<dyn Embedding>) {
    for url in urls {
        debug!("Starting doc index pipeline for {url}");
        let embedding = embedding.clone();
        stream! {
            let mut num_docs = 0;
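This hunk extracts the per-URL crawl loop into a public `crawl_index_docs` entry point so callers outside the scheduler can reuse it. A minimal sketch of such a caller, assuming the crate APIs exactly as shown in the diff; the function name and URL are illustrative only:

```rust
use std::sync::Arc;

use tabby_inference::Embedding;

// Hypothetical external caller of the extracted entry point.
async fn crawl_example(embedding: Arc<dyn Embedding>) {
    // Illustrative placeholder URL, not from the commit.
    let urls = vec!["https://example.com/docs".to_string()];
    tabby_scheduler::crawl_index_docs(&urls, embedding).await;
}
```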
13 changes: 13 additions & 0 deletions ee/tabby-webserver/src/service/background_job/mod.rs
@@ -2,6 +2,7 @@ mod db;
mod git;
mod helper;
mod third_party_integration;
mod web_crawler;

use std::{str::FromStr, sync::Arc};

@@ -15,9 +16,11 @@ use tabby_inference::Embedding;
use tabby_schema::{
    integration::IntegrationService,
    repository::{GitRepositoryService, ThirdPartyRepositoryService},
    web_crawler::WebCrawlerService,
};
use third_party_integration::SchedulerGithubGitlabJob;
use tracing::warn;
use web_crawler::WebCrawlerJob;

use self::{
    db::DbMaintainanceJob, git::SchedulerGitJob, third_party_integration::SyncIntegrationJob,
@@ -28,13 +31,15 @@ pub enum BackgroundJobEvent {
    SchedulerGitRepository(RepositoryConfig),
    SchedulerGithubGitlabRepository(ID),
    SyncThirdPartyRepositories(ID),
    WebCrawler(String),
}

pub async fn start(
    db: DbConn,
    git_repository_service: Arc<dyn GitRepositoryService>,
    third_party_repository_service: Arc<dyn ThirdPartyRepositoryService>,
    integration_service: Arc<dyn IntegrationService>,
    web_crawler_service: Arc<dyn WebCrawlerService>,
    embedding: Arc<dyn Embedding>,
    sender: tokio::sync::mpsc::UnboundedSender<BackgroundJobEvent>,
    mut receiver: tokio::sync::mpsc::UnboundedReceiver<BackgroundJobEvent>,
@@ -71,6 +76,10 @@ pub async fn start(
warn!("Index issues job failed: {err:?}");
}
}
BackgroundJobEvent::WebCrawler(url) => {
let job = WebCrawlerJob::new(url);
job.run(embedding.clone()).await;
}
}
},
Some(now) = hourly.next() => {
@@ -89,6 +98,10 @@ pub async fn start(
                if let Err(err) = SchedulerGithubGitlabJob::cron(now, sender.clone(), third_party_repository_service.clone()).await {
                    warn!("Index issues job failed: {err:?}");
                }

                if let Err(err) = WebCrawlerJob::cron(now, sender.clone(), web_crawler_service.clone()).await {
                    warn!("Web crawler job failed: {err:?}");
                }
            },
            else => {
                warn!("Background job channel closed");
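The dispatch contract this file introduces: any holder of the channel sender can enqueue a crawl, and the `select!` loop above consumes the event and runs the job with the shared embedding. A minimal producer-side sketch, assuming the module's types are in scope; the helper name is hypothetical:

```rust
use tokio::sync::mpsc::UnboundedSender;

// Hypothetical producer-side helper; BackgroundJobEvent is the enum defined above.
fn enqueue_crawl(sender: &UnboundedSender<BackgroundJobEvent>, url: String) {
    // send() on an unbounded channel fails only if the receiver was dropped.
    if sender.send(BackgroundJobEvent::WebCrawler(url)).is_err() {
        tracing::warn!("background job channel closed; crawl not enqueued");
    }
}
```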
42 changes: 42 additions & 0 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
@@ -0,0 +1,42 @@
use std::sync::Arc;

use chrono::{DateTime, Utc};
use tabby_inference::Embedding;
use tabby_schema::{web_crawler::WebCrawlerService, Result};
use tokio::sync::mpsc::UnboundedSender;

use super::{helper::Job, BackgroundJobEvent};

pub struct WebCrawlerJob {
    url: String,
}

impl Job for WebCrawlerJob {
    const NAME: &'static str = "web_crawler";
}

impl WebCrawlerJob {
    pub fn new(url: String) -> Self {
        Self { url }
    }

    pub async fn run(self, embedding: Arc<dyn Embedding>) {
        tabby_scheduler::crawl_index_docs(&[self.url], embedding).await;
    }

    pub async fn cron(
        _now: DateTime<Utc>,
        sender: UnboundedSender<BackgroundJobEvent>,
        web_crawler_service: Arc<dyn WebCrawlerService>,
    ) -> Result<()> {
        for url in web_crawler_service
            .list_web_crawler_urls(None, None, None, None)
            .await?
        {
            sender
                .send(BackgroundJobEvent::WebCrawler(url.url))
                .expect("Failed to enqueue web crawler job");
        }
        Ok(())
    }
}
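The new job has two entry points: `run` crawls a single URL immediately, while `cron` re-enqueues every stored URL through the event channel on each hourly tick. A sketch exercising both, assuming the imports at the top of this file are in scope; the driver function, its wiring, and the URL are illustrative:

```rust
use chrono::Utc;

// Hypothetical driver; `svc`, `sender`, and `embedding` are assumed to be
// wired as in the surrounding files.
async fn demo(
    svc: Arc<dyn WebCrawlerService>,
    sender: UnboundedSender<BackgroundJobEvent>,
    embedding: Arc<dyn Embedding>,
) -> Result<()> {
    // Hourly path: re-enqueue every stored URL as a WebCrawler event.
    WebCrawlerJob::cron(Utc::now(), sender, svc).await?;
    // Direct path: crawl one URL immediately (placeholder URL).
    WebCrawlerJob::new("https://example.com/docs".into())
        .run(embedding)
        .await;
    Ok(())
}
```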
17 changes: 14 additions & 3 deletions ee/tabby-webserver/src/service/mod.rs
@@ -9,7 +9,7 @@ mod license;
pub mod repository;
mod setting;
mod user_event;
mod web_crawler;
pub mod web_crawler;

use std::sync::Arc;

@@ -71,6 +71,7 @@ impl ServerContext {
        code: Arc<dyn CodeSearch>,
        repository: Arc<dyn RepositoryService>,
        integration: Arc<dyn IntegrationService>,
        web_crawler: Arc<dyn WebCrawlerService>,
        db_conn: DbConn,
        is_chat_enabled_locally: bool,
    ) -> Self {
@@ -96,7 +97,7 @@ impl ServerContext {
                license.clone(),
                setting.clone(),
            )),
            web_crawler: Arc::new(web_crawler::create(db_conn.clone())),
            web_crawler,
            license,
            repository,
            integration,
@@ -257,11 +258,21 @@ pub async fn create_service_locator(
    code: Arc<dyn CodeSearch>,
    repository: Arc<dyn RepositoryService>,
    integration: Arc<dyn IntegrationService>,
    web_crawler: Arc<dyn WebCrawlerService>,
    db: DbConn,
    is_chat_enabled: bool,
) -> Arc<dyn ServiceLocator> {
    Arc::new(ArcServerContext::new(
        ServerContext::new(logger, code, repository, integration, db, is_chat_enabled).await,
        ServerContext::new(
            logger,
            code,
            repository,
            integration,
            web_crawler,
            db,
            is_chat_enabled,
        )
        .await,
    ))
}

13 changes: 9 additions & 4 deletions ee/tabby-webserver/src/service/web_crawler.rs
@@ -5,15 +5,17 @@ use tabby_schema::{
    web_crawler::{WebCrawlerService, WebCrawlerUrl},
    AsID, AsRowid, Result,
};
use tokio::sync::mpsc::UnboundedSender;

use super::graphql_pagination_to_filter;
use super::{background_job::BackgroundJobEvent, graphql_pagination_to_filter};

pub fn create(db: DbConn) -> impl WebCrawlerService {
    WebCrawlerServiceImpl { db }
pub fn create(db: DbConn, sender: UnboundedSender<BackgroundJobEvent>) -> impl WebCrawlerService {
    WebCrawlerServiceImpl { db, sender }
}

struct WebCrawlerServiceImpl {
    db: DbConn,
    sender: UnboundedSender<BackgroundJobEvent>,
}

#[async_trait]
@@ -34,7 +36,10 @@ impl WebCrawlerService for WebCrawlerServiceImpl {
    }

    async fn create_web_crawler_url(&self, url: String) -> Result<ID> {
        let id = self.db.create_web_crawler_url(url).await?;
        let id = self.db.create_web_crawler_url(url.clone()).await?;

        let _ = self.sender.send(BackgroundJobEvent::WebCrawler(url));

        Ok(id.as_id())
    }

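With the sender injected, registering a URL now persists it and immediately schedules a crawl. The `let _ =` deliberately ignores a failed send (for example, if the job loop is not yet running); a dropped event should be recovered by the hourly cron pass, which re-enqueues every stored URL. A hypothetical caller, assuming only the service trait shown above:

```rust
// Hypothetical caller (e.g., a GraphQL mutation resolver).
async fn register_url(svc: &dyn WebCrawlerService) -> Result<()> {
    // Persists the URL, then immediately fires BackgroundJobEvent::WebCrawler.
    let _id = svc
        .create_web_crawler_url("https://example.com/docs".into())
        .await?;
    Ok(())
}
```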
16 changes: 13 additions & 3 deletions ee/tabby-webserver/src/webserver.rs
@@ -10,14 +10,18 @@ use tabby_common::{
};
use tabby_db::DbConn;
use tabby_inference::Embedding;
use tabby_schema::{integration::IntegrationService, repository::RepositoryService};
use tabby_schema::{
    integration::IntegrationService, repository::RepositoryService, web_crawler::WebCrawlerService,
};

use crate::{
    path::db_file,
    routes,
    service::{
        background_job, background_job::BackgroundJobEvent, create_service_locator,
        event_logger::create_event_logger, integration, repository,
        background_job::{self, BackgroundJobEvent},
        create_service_locator,
        event_logger::create_event_logger,
        integration, repository, web_crawler,
    },
};

@@ -26,6 +30,7 @@ pub struct Webserver {
    logger: Arc<dyn EventLogger>,
    repository: Arc<dyn RepositoryService>,
    integration: Arc<dyn IntegrationService>,
    web_crawler: Arc<dyn WebCrawlerService>,
}

#[async_trait::async_trait]
@@ -54,20 +59,24 @@ impl Webserver {
        let integration = Arc::new(integration::create(db.clone(), sender.clone()));
        let repository = repository::create(db.clone(), integration.clone(), sender.clone());

        let web_crawler = Arc::new(web_crawler::create(db.clone(), sender.clone()));

        let logger2 = create_event_logger(db.clone());
        let logger = Arc::new(ComposedLogger::new(logger1, logger2));
        let ws = Arc::new(Webserver {
            db: db.clone(),
            logger,
            repository: repository.clone(),
            integration: integration.clone(),
            web_crawler: web_crawler.clone(),
        });

        background_job::start(
            db.clone(),
            repository.git(),
            repository.third_party(),
            integration.clone(),
            web_crawler,
            embedding,
            sender,
            receiver,
@@ -93,6 +102,7 @@ impl Webserver {
            code,
            self.repository.clone(),
            self.integration.clone(),
            self.web_crawler.clone(),
            self.db.clone(),
            is_chat_enabled,
        )
