From 40402ae066d09202efa1d271cd3c50abbfa63efa Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 1 Dec 2024 01:25:11 +0800 Subject: [PATCH] support pre sync --- crates/tabby-index/src/indexer_tests.rs | 30 +++++++------- .../tabby-index/src/structured_doc/public.rs | 40 +++++++++---------- .../background_job/third_party_integration.rs | 2 +- .../third_party_integration/issues.rs | 2 + .../third_party_integration/pulls.rs | 2 + .../src/service/background_job/web_crawler.rs | 20 +++++----- 6 files changed, 50 insertions(+), 46 deletions(-) diff --git a/crates/tabby-index/src/indexer_tests.rs b/crates/tabby-index/src/indexer_tests.rs index 989825491bb5..3cc485f6fa31 100644 --- a/crates/tabby-index/src/indexer_tests.rs +++ b/crates/tabby-index/src/indexer_tests.rs @@ -71,14 +71,13 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { let updated = indexer - .sync( - StructuredDocState { - updated_at, - deleted: false, - }, - doc, - ) - .await; + .presync(StructuredDocState { + id: doc.id().to_string(), + updated_at, + deleted: false, + }) + .await + && indexer.sync(doc).await; println!("{}", updated); updated }); @@ -123,14 +122,13 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { let updated = indexer - .sync( - StructuredDocState { - updated_at, - deleted: false, - }, - doc, - ) - .await; + .presync(StructuredDocState { + id: doc.id().to_string(), + updated_at, + deleted: false, + }) + .await + && indexer.sync(doc).await; println!("{}", updated); updated }); diff --git a/crates/tabby-index/src/structured_doc/public.rs b/crates/tabby-index/src/structured_doc/public.rs index aca3b6296b5a..0b3e1aee0afa 100644 --- a/crates/tabby-index/src/structured_doc/public.rs +++ b/crates/tabby-index/src/structured_doc/public.rs @@ -25,6 +25,7 @@ pub struct StructuredDocState { // For instance, a closed pull request will be marked as deleted, // prompting the indexer to remove it from the index. pub deleted: bool, + pub id: String, } pub struct StructuredDocIndexer { @@ -39,21 +40,34 @@ impl StructuredDocIndexer { Self { indexer, builder } } + // Runs pre-sync checks to determine if the document needs to be updated. + // Returns false if `sync` is not required to be called. + pub async fn presync(&self, state: StructuredDocState) -> bool { + if state.deleted { + self.indexer.delete(&state.id); + return false; + } + + if self.indexer.is_indexed_after(&state.id, state.updated_at) + && !self.indexer.has_failed_chunks(&state.id) + { + return false; + }; + + true + } + // The sync process updates the document in the indexer incrementally. // It first determines whether the document requires an update. // // If an update is needed, it checks the deletion state of the document. // If the document is marked as deleted, it will be removed. // Next, the document is rebuilt, the original is deleted, and the newly indexed document is added. - pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool { - if !self.require_updates(state.updated_at, &document) { + pub async fn sync(&self, document: StructuredDoc) -> bool { + if document.should_skip() { return false; } - if state.deleted { - return self.delete(document.id()).await; - } - stream! { let (id, s) = self.builder.build(document).await; self.indexer.delete(&id); @@ -79,18 +93,4 @@ impl StructuredDocIndexer { pub fn commit(self) { self.indexer.commit(); } - - fn require_updates(&self, updated_at: DateTime, document: &StructuredDoc) -> bool { - if document.should_skip() { - return false; - } - - if self.indexer.is_indexed_after(document.id(), updated_at) - && !self.indexer.has_failed_chunks(document.id()) - { - return false; - }; - - true - } } diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index 1a283a4b5365..115fff3f3bb5 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -142,7 +142,7 @@ impl SchedulerGithubGitlabJob { let mut count = 0; let mut num_updated = 0; for await (state, doc) in issue_stream.chain(pull_stream) { - if index.sync(state, doc).await { + if index.presync(state).await && index.sync(doc).await { num_updated += 1 } diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs index 6f8398a7a0b0..714cbb718bdf 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs @@ -66,6 +66,7 @@ pub async fn list_github_issues( }) }; yield (StructuredDocState { + id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, }, doc); @@ -125,6 +126,7 @@ pub async fn list_gitlab_issues( closed: issue.state == "closed", })}; yield (StructuredDocState { + id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, }, doc); diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 3eb5c5e44194..927191aae893 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -64,6 +64,7 @@ pub async fn list_github_pulls( if let Some(state) = pull.state { if state == IssueState::Closed && pull.merged_at.is_none() { yield (StructuredDocState{ + id: doc.id().to_string(), updated_at: pull.updated_at.unwrap(), deleted: true, }, doc); @@ -99,6 +100,7 @@ pub async fn list_github_pulls( yield (StructuredDocState{ + id: doc.id().to_string(), updated_at: pull.updated_at.unwrap(), deleted: false, }, doc); diff --git a/ee/tabby-webserver/src/service/background_job/web_crawler.rs b/ee/tabby-webserver/src/service/background_job/web_crawler.rs index da307f61e028..a0fdab9b11f8 100644 --- a/ee/tabby-webserver/src/service/background_job/web_crawler.rs +++ b/ee/tabby-webserver/src/service/background_job/web_crawler.rs @@ -54,15 +54,17 @@ impl WebCrawlerJob { }; num_docs += 1; - indexer - .sync( - StructuredDocState { - updated_at: Utc::now(), - deleted: false, - }, - source_doc, - ) - .await; + + if indexer + .presync(StructuredDocState { + id: source_doc.id().to_string(), + updated_at: Utc::now(), + deleted: false, + }) + .await + { + indexer.sync(source_doc).await; + } } logkit::info!("Crawled {} documents from '{}'", num_docs, self.url); indexer.commit();