Skip to content

Commit

Permalink
fix(index): delete skipped documents for StructuredDoc (#3463)
Browse files Browse the repository at this point in the history
* chore: delete index when pr closed

* [autofix.ci] apply automated fixes

* chore: not count as deleted when not existed

* chore: fix duplicated add

* chore: use structuredDocState

* chore: rename index add to sync

* doc(structured_doc): add description to structuredDocState

* chore: deleted accidently added file

* doc(structured_doc): add sync logic comments

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
zwpaper and autofix-ci[bot] authored Nov 25, 2024
1 parent 8bcaf05 commit bf8c5d7
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 31 deletions.
2 changes: 1 addition & 1 deletion crates/tabby-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod public {
code::CodeIndexer,
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
StructuredDocPullDocumentFields, StructuredDocWebFields,
StructuredDocPullDocumentFields, StructuredDocState, StructuredDocWebFields,
},
};

Expand Down
36 changes: 34 additions & 2 deletions crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ pub use super::types::{
use super::{create_structured_doc_builder, types::BuildStructuredDoc};
use crate::{indexer::TantivyDocBuilder, Indexer};

/// StructuredDocState tracks the state of the document source.
/// It helps determine whether the document should be updated or deleted.
pub struct StructuredDocState {
// updated_at is the time when the document was last updated.
// when the updated_at is earlier than the document's index time,
// the update will be skipped.
pub updated_at: DateTime<Utc>,
// deleted indicates whether the document should be removed from the indexer.
// For instance, a closed pull request will be marked as deleted,
// prompting the indexer to remove it from the index.
pub deleted: bool,
}

pub struct StructuredDocIndexer {
builder: TantivyDocBuilder<StructuredDoc>,
indexer: Indexer,
Expand All @@ -26,11 +39,21 @@ impl StructuredDocIndexer {
Self { indexer, builder }
}

pub async fn add(&self, updated_at: DateTime<Utc>, document: StructuredDoc) -> bool {
if !self.require_updates(updated_at, &document) {
// The sync process updates the document in the indexer incrementally.
// It first determines whether the document requires an update.
//
// If an update is needed, it checks the deletion state of the document.
// If the document is marked as deleted, it will be removed.
// Next, the document is rebuilt, the original is deleted, and the newly indexed document is added.
pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool {
if !self.require_updates(state.updated_at, &document) {
return false;
}

if state.deleted {
return self.delete(document.id()).await;
}

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);
Expand All @@ -44,6 +67,15 @@ impl StructuredDocIndexer {
true
}

pub async fn delete(&self, id: &str) -> bool {
if self.indexer.is_indexed(id) {
self.indexer.delete(id);
true
} else {
false
}
}

pub fn commit(self) {
self.indexer.commit();
}
Expand Down
29 changes: 23 additions & 6 deletions crates/tabby-index/src/structured_doc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod structured_doc_tests {
use super::mock_embedding::MockEmbedding;
use crate::{
indexer::Indexer,
public::StructuredDocState,
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
},
Expand Down Expand Up @@ -65,9 +66,17 @@ mod structured_doc_tests {

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let added = indexer.add(updated_at, doc).await;
println!("{}", added);
added
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
println!("{}", updated);
updated
});
assert!(res);
indexer.commit();
Expand Down Expand Up @@ -109,9 +118,17 @@ mod structured_doc_tests {

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let added = indexer.add(updated_at, doc).await;
println!("{}", added);
added
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
println!("{}", updated);
updated
});
assert!(res);
indexer.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use juniper::ID;
use pulls::list_github_pulls;
use serde::{Deserialize, Serialize};
use tabby_common::config::CodeRepository;
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer};
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState};
use tabby_inference::Embedding;
use tabby_schema::{
integration::{Integration, IntegrationKind, IntegrationService},
Expand Down Expand Up @@ -140,8 +140,8 @@ impl SchedulerGithubGitlabJob {
stream! {
let mut count = 0;
let mut num_updated = 0;
for await (updated_at, doc) in issue_stream.chain(pull_stream) {
if index.add(updated_at, doc).await {
for await (state, doc) in issue_stream.chain(pull_stream) {
if index.sync(state, doc).await {
num_updated += 1
}

Expand Down Expand Up @@ -182,8 +182,8 @@ impl SchedulerGithubGitlabJob {
async fn fetch_all_issues(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = match &integration.kind {
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = match &integration.kind {
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => list_github_issues(
&repository.source_id(),
integration.api_base(),
Expand All @@ -207,8 +207,8 @@ async fn fetch_all_issues(
async fn fetch_all_pulls(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = list_github_pulls(
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = list_github_pulls(
&repository.source_id(),
integration.api_base(),
&repository.display_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use futures::Stream;
use gitlab::api::{issues::ProjectIssues, AsyncQuery};
use octocrab::Octocrab;
use serde::Deserialize;
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocIssueFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIssueFields, StructuredDocState,
};

use crate::service::create_gitlab_client;

Expand All @@ -14,7 +16,7 @@ pub async fn list_github_issues(
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -62,7 +64,10 @@ pub async fn list_github_issues(
closed: issue.state == octocrab::models::IssueState::Closed,
})
};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);
}

page += 1;
Expand All @@ -89,7 +94,7 @@ pub async fn list_gitlab_issues(
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let gitlab = create_gitlab_client(api_base, access_token).await?;

let source_id = source_id.to_owned();
Expand Down Expand Up @@ -118,7 +123,10 @@ pub async fn list_gitlab_issues(
body: issue.description.unwrap_or_default(),
closed: issue.state == "closed",
})};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::{anyhow, Result};
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::Stream;
use octocrab::{models::IssueState, Octocrab};
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState,
};

pub async fn list_github_pulls(
source_id: &str,
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -43,14 +44,32 @@ pub async fn list_github_pulls(
let pages = response.number_of_pages().unwrap_or_default();

for pull in response.items {
let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);
let title = pull.title.clone().unwrap_or_default();
let body = pull.body.clone().unwrap_or_default();
let doc = StructuredDoc {
source_id: source_id.to_string(),
fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
link: url.clone(),
title,
body,
merged: pull.merged_at.is_some(),
diff: String::new(),
}),
};

// skip closed but not merged pulls
if let Some(state) = pull.state {
if state == IssueState::Closed && pull.merged_at.is_none() {
continue
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: true,
}, doc);
continue;
}
}

let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);

let diff = match octocrab.pulls(&owner, &repo).get_diff(pull.number).await {
Ok(x) if x.len() < 1024*1024*10 => x,
Ok(_) => {
Expand All @@ -71,10 +90,13 @@ pub async fn list_github_pulls(
body: pull.body.unwrap_or_default(),
diff,
merged: pull.merged_at.is_some(),
})
};
})};


yield (pull.updated_at.unwrap(), doc);
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: false,
}, doc);
}

page += 1;
Expand Down
13 changes: 11 additions & 2 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tabby_crawler::crawl_pipeline;
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocWebFields,
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocState,
StructuredDocWebFields,
};
use tabby_inference::Embedding;

Expand Down Expand Up @@ -53,7 +54,15 @@ impl WebCrawlerJob {
};

num_docs += 1;
indexer.add(Utc::now(), source_doc).await;
indexer
.sync(
StructuredDocState {
updated_at: Utc::now(),
deleted: false,
},
source_doc,
)
.await;
}
logkit::info!("Crawled {} documents from '{}'", num_docs, self.url);
indexer.commit();
Expand Down

0 comments on commit bf8c5d7

Please sign in to comment.