diff --git a/crates/tabby-index/src/lib.rs b/crates/tabby-index/src/lib.rs index 063706427258..443c0b14fd8e 100644 --- a/crates/tabby-index/src/lib.rs +++ b/crates/tabby-index/src/lib.rs @@ -20,7 +20,7 @@ pub mod public { code::CodeIndexer, structured_doc::public::{ StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields, - StructuredDocPullDocumentFields, StructuredDocWebFields, + StructuredDocPullDocumentFields, StructuredDocState, StructuredDocWebFields, }, }; diff --git a/crates/tabby-index/src/structured_doc/public.rs b/crates/tabby-index/src/structured_doc/public.rs index 0832d48c8bb9..aca3b6296b5a 100644 --- a/crates/tabby-index/src/structured_doc/public.rs +++ b/crates/tabby-index/src/structured_doc/public.rs @@ -14,6 +14,19 @@ pub use super::types::{ use super::{create_structured_doc_builder, types::BuildStructuredDoc}; use crate::{indexer::TantivyDocBuilder, Indexer}; +/// StructuredDocState tracks the state of the document source. +/// It helps determine whether the document should be updated or deleted. +pub struct StructuredDocState { + // updated_at is the time when the document was last updated. + // when the updated_at is earlier than the document's index time, + // the update will be skipped. + pub updated_at: DateTime, + // deleted indicates whether the document should be removed from the indexer. + // For instance, a closed pull request will be marked as deleted, + // prompting the indexer to remove it from the index. + pub deleted: bool, +} + pub struct StructuredDocIndexer { builder: TantivyDocBuilder, indexer: Indexer, @@ -26,11 +39,21 @@ impl StructuredDocIndexer { Self { indexer, builder } } - pub async fn add(&self, updated_at: DateTime, document: StructuredDoc) -> bool { - if !self.require_updates(updated_at, &document) { + // The sync process updates the document in the indexer incrementally. + // It first determines whether the document requires an update. + // + // If an update is needed, it checks the deletion state of the document. + // If the document is marked as deleted, it will be removed. + // Next, the document is rebuilt, the original is deleted, and the newly indexed document is added. + pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool { + if !self.require_updates(state.updated_at, &document) { return false; } + if state.deleted { + return self.delete(document.id()).await; + } + stream! { let (id, s) = self.builder.build(document).await; self.indexer.delete(&id); @@ -44,6 +67,15 @@ impl StructuredDocIndexer { true } + pub async fn delete(&self, id: &str) -> bool { + if self.indexer.is_indexed(id) { + self.indexer.delete(id); + true + } else { + false + } + } + pub fn commit(self) { self.indexer.commit(); } diff --git a/crates/tabby-index/src/structured_doc_tests.rs b/crates/tabby-index/src/structured_doc_tests.rs index 0ac8fe9fdabf..9e1bfb1e3d53 100644 --- a/crates/tabby-index/src/structured_doc_tests.rs +++ b/crates/tabby-index/src/structured_doc_tests.rs @@ -35,6 +35,7 @@ mod structured_doc_tests { use super::mock_embedding::MockEmbedding; use crate::{ indexer::Indexer, + public::StructuredDocState, structured_doc::public::{ StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields, }, @@ -65,9 +66,17 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { - let added = indexer.add(updated_at, doc).await; - println!("{}", added); - added + let updated = indexer + .sync( + StructuredDocState { + updated_at, + deleted: false, + }, + doc, + ) + .await; + println!("{}", updated); + updated }); assert!(res); indexer.commit(); @@ -109,9 +118,17 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { - let added = indexer.add(updated_at, doc).await; - println!("{}", added); - added + let updated = indexer + .sync( + StructuredDocState { + updated_at, + deleted: false, + }, + doc, + ) + .await; + println!("{}", updated); + updated }); assert!(res); indexer.commit(); diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index 2bdd85244f38..03c984fa3d45 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -8,7 +8,7 @@ use juniper::ID; use pulls::list_github_pulls; use serde::{Deserialize, Serialize}; use tabby_common::config::CodeRepository; -use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer}; +use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState}; use tabby_inference::Embedding; use tabby_schema::{ integration::{Integration, IntegrationKind, IntegrationService}, @@ -140,8 +140,8 @@ impl SchedulerGithubGitlabJob { stream! { let mut count = 0; let mut num_updated = 0; - for await (updated_at, doc) in issue_stream.chain(pull_stream) { - if index.add(updated_at, doc).await { + for await (state, doc) in issue_stream.chain(pull_stream) { + if index.sync(state, doc).await { num_updated += 1 } @@ -182,8 +182,8 @@ impl SchedulerGithubGitlabJob { async fn fetch_all_issues( integration: &Integration, repository: &ProvidedRepository, -) -> tabby_schema::Result, StructuredDoc)>> { - let s: BoxStream<(DateTime, StructuredDoc)> = match &integration.kind { +) -> tabby_schema::Result> { + let s: BoxStream<(StructuredDocState, StructuredDoc)> = match &integration.kind { IntegrationKind::Github | IntegrationKind::GithubSelfHosted => list_github_issues( &repository.source_id(), integration.api_base(), @@ -207,8 +207,8 @@ async fn fetch_all_issues( async fn fetch_all_pulls( integration: &Integration, repository: &ProvidedRepository, -) -> tabby_schema::Result, StructuredDoc)>> { - let s: BoxStream<(DateTime, StructuredDoc)> = list_github_pulls( +) -> tabby_schema::Result> { + let s: BoxStream<(StructuredDocState, StructuredDoc)> = list_github_pulls( &repository.source_id(), integration.api_base(), &repository.display_name, diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs index d718d630e78e..ecf9d5fbbb92 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs @@ -5,7 +5,9 @@ use futures::Stream; use gitlab::api::{issues::ProjectIssues, AsyncQuery}; use octocrab::Octocrab; use serde::Deserialize; -use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocIssueFields}; +use tabby_index::public::{ + StructuredDoc, StructuredDocFields, StructuredDocIssueFields, StructuredDocState, +}; use crate::service::create_gitlab_client; @@ -14,7 +16,7 @@ pub async fn list_github_issues( api_base: &str, full_name: &str, access_token: &str, -) -> Result, StructuredDoc)>> { +) -> Result> { let octocrab = Octocrab::builder() .personal_token(access_token.to_string()) .base_uri(api_base)? @@ -62,7 +64,10 @@ pub async fn list_github_issues( closed: issue.state == octocrab::models::IssueState::Closed, }) }; - yield (issue.updated_at, doc); + yield (StructuredDocState { + updated_at: issue.updated_at, + deleted: false, + }, doc); } page += 1; @@ -89,7 +94,7 @@ pub async fn list_gitlab_issues( api_base: &str, full_name: &str, access_token: &str, -) -> Result, StructuredDoc)>> { +) -> Result> { let gitlab = create_gitlab_client(api_base, access_token).await?; let source_id = source_id.to_owned(); @@ -118,7 +123,10 @@ pub async fn list_gitlab_issues( body: issue.description.unwrap_or_default(), closed: issue.state == "closed", })}; - yield (issue.updated_at, doc); + yield (StructuredDocState { + updated_at: issue.updated_at, + deleted: false, + }, doc); } }; diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 7559773bf885..3101497cb0e6 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -1,16 +1,17 @@ use anyhow::{anyhow, Result}; use async_stream::stream; -use chrono::{DateTime, Utc}; use futures::Stream; use octocrab::{models::IssueState, Octocrab}; -use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields}; +use tabby_index::public::{ + StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState, +}; pub async fn list_github_pulls( source_id: &str, api_base: &str, full_name: &str, access_token: &str, -) -> Result, StructuredDoc)>> { +) -> Result> { let octocrab = Octocrab::builder() .personal_token(access_token.to_string()) .base_uri(api_base)? @@ -43,14 +44,32 @@ pub async fn list_github_pulls( let pages = response.number_of_pages().unwrap_or_default(); for pull in response.items { + let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url); + let title = pull.title.clone().unwrap_or_default(); + let body = pull.body.clone().unwrap_or_default(); + let doc = StructuredDoc { + source_id: source_id.to_string(), + fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields { + link: url.clone(), + title, + body, + merged: pull.merged_at.is_some(), + diff: String::new(), + }), + }; + // skip closed but not merged pulls if let Some(state) = pull.state { if state == IssueState::Closed && pull.merged_at.is_none() { - continue + yield (StructuredDocState{ + updated_at: pull.updated_at.unwrap(), + deleted: true, + }, doc); + continue; } } - let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url); + let diff = match octocrab.pulls(&owner, &repo).get_diff(pull.number).await { Ok(x) if x.len() < 1024*1024*10 => x, Ok(_) => { @@ -71,10 +90,13 @@ pub async fn list_github_pulls( body: pull.body.unwrap_or_default(), diff, merged: pull.merged_at.is_some(), - }) - }; + })}; + - yield (pull.updated_at.unwrap(), doc); + yield (StructuredDocState{ + updated_at: pull.updated_at.unwrap(), + deleted: false, + }, doc); } page += 1; diff --git a/ee/tabby-webserver/src/service/background_job/web_crawler.rs b/ee/tabby-webserver/src/service/background_job/web_crawler.rs index 8d4450310309..da307f61e028 100644 --- a/ee/tabby-webserver/src/service/background_job/web_crawler.rs +++ b/ee/tabby-webserver/src/service/background_job/web_crawler.rs @@ -5,7 +5,8 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tabby_crawler::crawl_pipeline; use tabby_index::public::{ - StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocWebFields, + StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocState, + StructuredDocWebFields, }; use tabby_inference::Embedding; @@ -53,7 +54,15 @@ impl WebCrawlerJob { }; num_docs += 1; - indexer.add(Utc::now(), source_doc).await; + indexer + .sync( + StructuredDocState { + updated_at: Utc::now(), + deleted: false, + }, + source_doc, + ) + .await; } logkit::info!("Crawled {} documents from '{}'", num_docs, self.url); indexer.commit();