Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(index): delete skipped documents for StructuredDoc #3463

Merged
merged 9 commits into from
Nov 25, 2024
2 changes: 1 addition & 1 deletion crates/tabby-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod public {
code::CodeIndexer,
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
StructuredDocPullDocumentFields, StructuredDocWebFields,
StructuredDocPullDocumentFields, StructuredDocState, StructuredDocWebFields,
},
};

Expand Down
28 changes: 26 additions & 2 deletions crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@
use super::{create_structured_doc_builder, types::BuildStructuredDoc};
use crate::{indexer::TantivyDocBuilder, Indexer};

/// StructuredDocState is used to track the state of the document source.
/// It is used to determine whether the document should be updated or deleted.
pub struct StructuredDocState {
// updated_at is the time when the document was last updated.
wsxiaoys marked this conversation as resolved.
Show resolved Hide resolved
pub updated_at: DateTime<Utc>,
// deleted indecates whether the document should be deleted in indexer
wsxiaoys marked this conversation as resolved.
Show resolved Hide resolved
// for example, a closed pull request will be marked as deleted, and
// the indexer will remove it from the index.
pub deleted: bool,
}

pub struct StructuredDocIndexer {
builder: TantivyDocBuilder<StructuredDoc>,
indexer: Indexer,
Expand All @@ -26,11 +37,15 @@
Self { indexer, builder }
}

pub async fn add(&self, updated_at: DateTime<Utc>, document: StructuredDoc) -> bool {
if !self.require_updates(updated_at, &document) {
pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool {
if !self.require_updates(state.updated_at, &document) {
return false;
}

if state.deleted {
return self.delete(document.id()).await;

Check warning on line 46 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L46

Added line #L46 was not covered by tests
}

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);
Expand All @@ -44,6 +59,15 @@
true
}

pub async fn delete(&self, id: &str) -> bool {
if self.indexer.is_indexed(id) {
self.indexer.delete(id);
true

Check warning on line 65 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L62-L65

Added lines #L62 - L65 were not covered by tests
} else {
false

Check warning on line 67 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L67

Added line #L67 was not covered by tests
}
}

Check warning on line 69 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L69

Added line #L69 was not covered by tests

pub fn commit(self) {
self.indexer.commit();
}
Expand Down
29 changes: 23 additions & 6 deletions crates/tabby-index/src/structured_doc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod structured_doc_tests {
use super::mock_embedding::MockEmbedding;
use crate::{
indexer::Indexer,
public::StructuredDocState,
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
},
Expand Down Expand Up @@ -65,9 +66,17 @@ mod structured_doc_tests {

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let added = indexer.add(updated_at, doc).await;
println!("{}", added);
added
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
println!("{}", updated);
updated
});
assert!(res);
indexer.commit();
Expand Down Expand Up @@ -109,9 +118,17 @@ mod structured_doc_tests {

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let added = indexer.add(updated_at, doc).await;
println!("{}", added);
added
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
println!("{}", updated);
updated
});
assert!(res);
indexer.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use pulls::list_github_pulls;
use serde::{Deserialize, Serialize};
use tabby_common::config::CodeRepository;
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer};
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState};
use tabby_inference::Embedding;
use tabby_schema::{
integration::{Integration, IntegrationKind, IntegrationService},
Expand Down Expand Up @@ -140,8 +140,8 @@
stream! {
let mut count = 0;
let mut num_updated = 0;
for await (updated_at, doc) in issue_stream.chain(pull_stream) {
if index.add(updated_at, doc).await {
for await (state, doc) in issue_stream.chain(pull_stream) {
if index.sync(state, doc).await {

Check warning on line 144 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L143-L144

Added lines #L143 - L144 were not covered by tests
num_updated += 1
}

Expand Down Expand Up @@ -182,8 +182,8 @@
async fn fetch_all_issues(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = match &integration.kind {
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = match &integration.kind {

Check warning on line 186 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L185-L186

Added lines #L185 - L186 were not covered by tests
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => list_github_issues(
&repository.source_id(),
integration.api_base(),
Expand All @@ -207,8 +207,8 @@
async fn fetch_all_pulls(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = list_github_pulls(
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = list_github_pulls(

Check warning on line 211 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L210-L211

Added lines #L210 - L211 were not covered by tests
&repository.source_id(),
integration.api_base(),
&repository.display_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
use gitlab::api::{issues::ProjectIssues, AsyncQuery};
use octocrab::Octocrab;
use serde::Deserialize;
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocIssueFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIssueFields, StructuredDocState,
};

use crate::service::create_gitlab_client;

Expand All @@ -14,7 +16,7 @@
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {

Check warning on line 19 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L19

Added line #L19 was not covered by tests
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -62,7 +64,10 @@
closed: issue.state == octocrab::models::IssueState::Closed,
})
};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);

Check warning on line 70 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L67-L70

Added lines #L67 - L70 were not covered by tests
}

page += 1;
Expand All @@ -89,7 +94,7 @@
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {

Check warning on line 97 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L97

Added line #L97 was not covered by tests
let gitlab = create_gitlab_client(api_base, access_token).await?;

let source_id = source_id.to_owned();
Expand Down Expand Up @@ -118,7 +123,10 @@
body: issue.description.unwrap_or_default(),
closed: issue.state == "closed",
})};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);

Check warning on line 129 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L126-L129

Added lines #L126 - L129 were not covered by tests
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::{anyhow, Result};
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::Stream;
use octocrab::{models::IssueState, Octocrab};
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState,
};

pub async fn list_github_pulls(
source_id: &str,
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {

Check warning on line 14 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L14

Added line #L14 was not covered by tests
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -43,14 +44,32 @@
let pages = response.number_of_pages().unwrap_or_default();

for pull in response.items {
let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);
let title = pull.title.clone().unwrap_or_default();
let body = pull.body.clone().unwrap_or_default();
let doc = StructuredDoc {
source_id: source_id.to_string(),
fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
link: url.clone(),
title,
body,
merged: pull.merged_at.is_some(),
diff: String::new(),
}),
};

Check warning on line 60 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L47-L60

Added lines #L47 - L60 were not covered by tests
// skip closed but not merged pulls
if let Some(state) = pull.state {
if state == IssueState::Closed && pull.merged_at.is_none() {
continue
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: true,
}, doc);
continue;

Check warning on line 68 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L64-L68

Added lines #L64 - L68 were not covered by tests
}
}

let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);

Check warning on line 72 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L72

Added line #L72 was not covered by tests
let diff = match octocrab.pulls(&owner, &repo).get_diff(pull.number).await {
Ok(x) if x.len() < 1024*1024*10 => x,
Ok(_) => {
Expand All @@ -71,10 +90,13 @@
body: pull.body.unwrap_or_default(),
diff,
merged: pull.merged_at.is_some(),
})
};
})};

Check warning on line 94 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L93-L94

Added lines #L93 - L94 were not covered by tests

yield (pull.updated_at.unwrap(), doc);
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: false,
}, doc);

Check warning on line 99 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L96-L99

Added lines #L96 - L99 were not covered by tests
}

page += 1;
Expand Down
13 changes: 11 additions & 2 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
use serde::{Deserialize, Serialize};
use tabby_crawler::crawl_pipeline;
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocWebFields,
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocState,
StructuredDocWebFields,
};
use tabby_inference::Embedding;

Expand Down Expand Up @@ -53,7 +54,15 @@
};

num_docs += 1;
indexer.add(Utc::now(), source_doc).await;
indexer
.sync(
StructuredDocState {
updated_at: Utc::now(),
deleted: false,
},
source_doc,
)
.await;

Check warning on line 65 in ee/tabby-webserver/src/service/background_job/web_crawler.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/web_crawler.rs#L57-L65

Added lines #L57 - L65 were not covered by tests
}
logkit::info!("Crawled {} documents from '{}'", num_docs, self.url);
indexer.commit();
Expand Down
Loading