Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(index): delete skipped documents for StructuredDoc #3463

Merged
merged 9 commits into from
Nov 25, 2024
2 changes: 1 addition & 1 deletion crates/tabby-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod public {
code::CodeIndexer,
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
StructuredDocPullDocumentFields, StructuredDocWebFields,
StructuredDocPullDocumentFields, StructuredDocState, StructuredDocWebFields,
},
};

Expand Down
36 changes: 34 additions & 2 deletions crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ pub use super::types::{
use super::{create_structured_doc_builder, types::BuildStructuredDoc};
use crate::{indexer::TantivyDocBuilder, Indexer};

/// StructuredDocState tracks the state of the document source.
/// It helps determine whether the document should be updated or deleted.
pub struct StructuredDocState {
// updated_at is the time when the document was last updated.
wsxiaoys marked this conversation as resolved.
Show resolved Hide resolved
// when the updated_at is earlier than the document's index time,
// the update will be skipped.
pub updated_at: DateTime<Utc>,
// deleted indicates whether the document should be removed from the indexer.
// For instance, a closed pull request will be marked as deleted,
// prompting the indexer to remove it from the index.
pub deleted: bool,
}

pub struct StructuredDocIndexer {
builder: TantivyDocBuilder<StructuredDoc>,
indexer: Indexer,
Expand All @@ -26,11 +39,21 @@ impl StructuredDocIndexer {
Self { indexer, builder }
}

pub async fn add(&self, updated_at: DateTime<Utc>, document: StructuredDoc) -> bool {
if !self.require_updates(updated_at, &document) {
// The sync process updates the document in the indexer incrementally.
// It first determines whether the document requires an update; if not,
// nothing is changed and `false` is returned.
//
// If an update is needed, it checks the deletion state of the document.
// If the document is marked as deleted, it is removed from the index.
// Otherwise, the document is rebuilt, the original is deleted, and the newly indexed document is added.
pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool {
if !self.require_updates(state.updated_at, &document) {
return false;
}

if state.deleted {
return self.delete(document.id()).await;
}

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);
Expand All @@ -44,6 +67,15 @@ impl StructuredDocIndexer {
true
}

/// Removes the document identified by `id` from the index, if it is indexed.
///
/// Returns `true` when the document was indexed and a delete was issued,
/// and `false` when nothing was indexed under `id`.
pub async fn delete(&self, id: &str) -> bool {
    let was_indexed = self.indexer.is_indexed(id);
    if was_indexed {
        self.indexer.delete(id);
    }
    was_indexed
}

/// Consumes this `StructuredDocIndexer` and calls `commit` on the wrapped indexer.
pub fn commit(self) {
self.indexer.commit();
}
Expand Down
29 changes: 23 additions & 6 deletions crates/tabby-index/src/structured_doc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod structured_doc_tests {
use super::mock_embedding::MockEmbedding;
use crate::{
indexer::Indexer,
public::StructuredDocState,
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
},
Expand Down Expand Up @@ -65,9 +66,17 @@ mod structured_doc_tests {

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let added = indexer.add(updated_at, doc).await;
println!("{}", added);
added
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
println!("{}", updated);
updated
});
assert!(res);
indexer.commit();
Expand Down Expand Up @@ -109,9 +118,17 @@ mod structured_doc_tests {

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let added = indexer.add(updated_at, doc).await;
println!("{}", added);
added
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
println!("{}", updated);
updated
});
assert!(res);
indexer.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use juniper::ID;
use pulls::list_github_pulls;
use serde::{Deserialize, Serialize};
use tabby_common::config::CodeRepository;
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer};
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState};
use tabby_inference::Embedding;
use tabby_schema::{
integration::{Integration, IntegrationKind, IntegrationService},
Expand Down Expand Up @@ -140,8 +140,8 @@ impl SchedulerGithubGitlabJob {
stream! {
let mut count = 0;
let mut num_updated = 0;
for await (updated_at, doc) in issue_stream.chain(pull_stream) {
if index.add(updated_at, doc).await {
for await (state, doc) in issue_stream.chain(pull_stream) {
if index.sync(state, doc).await {
num_updated += 1
}

Expand Down Expand Up @@ -182,8 +182,8 @@ impl SchedulerGithubGitlabJob {
async fn fetch_all_issues(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = match &integration.kind {
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = match &integration.kind {
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => list_github_issues(
&repository.source_id(),
integration.api_base(),
Expand All @@ -207,8 +207,8 @@ async fn fetch_all_issues(
async fn fetch_all_pulls(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = list_github_pulls(
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = list_github_pulls(
&repository.source_id(),
integration.api_base(),
&repository.display_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use futures::Stream;
use gitlab::api::{issues::ProjectIssues, AsyncQuery};
use octocrab::Octocrab;
use serde::Deserialize;
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocIssueFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIssueFields, StructuredDocState,
};

use crate::service::create_gitlab_client;

Expand All @@ -14,7 +16,7 @@ pub async fn list_github_issues(
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -62,7 +64,10 @@ pub async fn list_github_issues(
closed: issue.state == octocrab::models::IssueState::Closed,
})
};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);
}

page += 1;
Expand All @@ -89,7 +94,7 @@ pub async fn list_gitlab_issues(
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let gitlab = create_gitlab_client(api_base, access_token).await?;

let source_id = source_id.to_owned();
Expand Down Expand Up @@ -118,7 +123,10 @@ pub async fn list_gitlab_issues(
body: issue.description.unwrap_or_default(),
closed: issue.state == "closed",
})};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::{anyhow, Result};
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::Stream;
use octocrab::{models::IssueState, Octocrab};
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState,
};

pub async fn list_github_pulls(
source_id: &str,
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -43,14 +44,32 @@ pub async fn list_github_pulls(
let pages = response.number_of_pages().unwrap_or_default();

for pull in response.items {
let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);
let title = pull.title.clone().unwrap_or_default();
let body = pull.body.clone().unwrap_or_default();
let doc = StructuredDoc {
source_id: source_id.to_string(),
fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
link: url.clone(),
title,
body,
merged: pull.merged_at.is_some(),
diff: String::new(),
}),
};

// closed but not merged pulls are reported as deleted so the indexer removes them
if let Some(state) = pull.state {
if state == IssueState::Closed && pull.merged_at.is_none() {
continue
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: true,
}, doc);
continue;
}
}

let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);

let diff = match octocrab.pulls(&owner, &repo).get_diff(pull.number).await {
Ok(x) if x.len() < 1024*1024*10 => x,
Ok(_) => {
Expand All @@ -71,10 +90,13 @@ pub async fn list_github_pulls(
body: pull.body.unwrap_or_default(),
diff,
merged: pull.merged_at.is_some(),
})
};
})};


yield (pull.updated_at.unwrap(), doc);
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: false,
}, doc);
}

page += 1;
Expand Down
13 changes: 11 additions & 2 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tabby_crawler::crawl_pipeline;
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocWebFields,
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocState,
StructuredDocWebFields,
};
use tabby_inference::Embedding;

Expand Down Expand Up @@ -53,7 +54,15 @@ impl WebCrawlerJob {
};

num_docs += 1;
indexer.add(Utc::now(), source_doc).await;
indexer
.sync(
StructuredDocState {
updated_at: Utc::now(),
deleted: false,
},
source_doc,
)
.await;
}
logkit::info!("Crawled {} documents from '{}'", num_docs, self.url);
indexer.commit();
Expand Down
Loading