Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(index): delete skipped documents for StructuredDoc #3463

Merged
merged 9 commits into from
Nov 25, 2024
2 changes: 1 addition & 1 deletion crates/tabby-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod public {
code::CodeIndexer,
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
StructuredDocPullDocumentFields, StructuredDocWebFields,
StructuredDocPullDocumentFields, StructuredDocState, StructuredDocWebFields,
},
};

Expand Down
22 changes: 20 additions & 2 deletions crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ pub use super::types::{
use super::{create_structured_doc_builder, types::BuildStructuredDoc};
use crate::{indexer::TantivyDocBuilder, Indexer};

/// Per-document state handed to `StructuredDocIndexer::add` alongside the
/// `StructuredDoc` it describes.
pub struct StructuredDocState {
/// Timestamp of the document's last update. `add` forwards it to
/// `require_updates` and returns `false` without touching the index when
/// no update is required. Callers pass the issue/pull `updated_at`, or
/// `Utc::now()` for crawled web pages.
pub updated_at: DateTime<Utc>,
/// When `true`, `add` removes the document from the index (by its id)
/// instead of building and indexing it. This check only runs after the
/// `require_updates` check has passed.
pub deleted: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add field level comments to explain the behavior.

}

pub struct StructuredDocIndexer {
builder: TantivyDocBuilder<StructuredDoc>,
indexer: Indexer,
Expand All @@ -26,11 +31,15 @@ impl StructuredDocIndexer {
Self { indexer, builder }
}

pub async fn add(&self, updated_at: DateTime<Utc>, document: StructuredDoc) -> bool {
if !self.require_updates(updated_at, &document) {
pub async fn add(&self, state: StructuredDocState, document: StructuredDoc) -> bool {
if !self.require_updates(state.updated_at, &document) {
return false;
}

if state.deleted {
return self.delete(document.id()).await;
}

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);
Expand All @@ -44,6 +53,15 @@ impl StructuredDocIndexer {
true
}

/// Removes the document identified by `id` from the index, if present.
///
/// Returns `true` when `id` was indexed and a delete was issued, and
/// `false` when `id` was not indexed (in which case nothing is done).
pub async fn delete(&self, id: &str) -> bool {
    // Look the id up once; the result doubles as the return value.
    let was_indexed = self.indexer.is_indexed(id);
    if was_indexed {
        self.indexer.delete(id);
    }
    was_indexed
}

/// Consumes this `StructuredDocIndexer` and calls `commit` on the wrapped
/// indexer.
///
/// NOTE(review): presumably this is what persists the adds/deletes issued
/// earlier — confirm against `Indexer::commit`.
pub fn commit(self) {
self.indexer.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use juniper::ID;
use pulls::list_github_pulls;
use serde::{Deserialize, Serialize};
use tabby_common::config::CodeRepository;
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer};
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState};
use tabby_inference::Embedding;
use tabby_schema::{
integration::{Integration, IntegrationKind, IntegrationService},
Expand Down Expand Up @@ -140,8 +140,8 @@ impl SchedulerGithubGitlabJob {
stream! {
let mut count = 0;
let mut num_updated = 0;
for await (updated_at, doc) in issue_stream.chain(pull_stream) {
if index.add(updated_at, doc).await {
for await (state, doc) in issue_stream.chain(pull_stream) {
if index.add(state, doc).await {
num_updated += 1
}

Expand Down Expand Up @@ -182,8 +182,8 @@ impl SchedulerGithubGitlabJob {
async fn fetch_all_issues(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = match &integration.kind {
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = match &integration.kind {
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => list_github_issues(
&repository.source_id(),
integration.api_base(),
Expand All @@ -207,8 +207,8 @@ async fn fetch_all_issues(
async fn fetch_all_pulls(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = list_github_pulls(
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = list_github_pulls(
&repository.source_id(),
integration.api_base(),
&repository.display_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use futures::Stream;
use gitlab::api::{issues::ProjectIssues, AsyncQuery};
use octocrab::Octocrab;
use serde::Deserialize;
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocIssueFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIssueFields, StructuredDocState,
};

use crate::service::create_gitlab_client;

Expand All @@ -14,7 +16,7 @@ pub async fn list_github_issues(
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -62,7 +64,10 @@ pub async fn list_github_issues(
closed: issue.state == octocrab::models::IssueState::Closed,
})
};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);
}

page += 1;
Expand All @@ -89,7 +94,7 @@ pub async fn list_gitlab_issues(
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let gitlab = create_gitlab_client(api_base, access_token).await?;

let source_id = source_id.to_owned();
Expand Down Expand Up @@ -118,7 +123,10 @@ pub async fn list_gitlab_issues(
body: issue.description.unwrap_or_default(),
closed: issue.state == "closed",
})};
yield (issue.updated_at, doc);
yield (StructuredDocState {
updated_at: issue.updated_at,
deleted: false,
}, doc);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::{anyhow, Result};
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::Stream;
use octocrab::{models::IssueState, Octocrab};
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields};
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState,
};

pub async fn list_github_pulls(
source_id: &str,
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -43,14 +44,32 @@ pub async fn list_github_pulls(
let pages = response.number_of_pages().unwrap_or_default();

for pull in response.items {
let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);
let title = pull.title.clone().unwrap_or_default();
let body = pull.body.clone().unwrap_or_default();
let doc = StructuredDoc {
source_id: source_id.to_string(),
fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
link: url.clone(),
title,
body,
merged: pull.merged_at.is_some(),
diff: String::new(),
}),
};

// skip closed but not merged pulls
if let Some(state) = pull.state {
if state == IssueState::Closed && pull.merged_at.is_none() {
continue
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: true,
}, doc);
continue;
}
}

let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);

let diff = match octocrab.pulls(&owner, &repo).get_diff(pull.number).await {
Ok(x) if x.len() < 1024*1024*10 => x,
Ok(_) => {
Expand All @@ -71,10 +90,13 @@ pub async fn list_github_pulls(
body: pull.body.unwrap_or_default(),
diff,
merged: pull.merged_at.is_some(),
})
};
})};


yield (pull.updated_at.unwrap(), doc);
yield (StructuredDocState{
updated_at: pull.updated_at.unwrap(),
deleted: false,
}, doc);
}

page += 1;
Expand Down
13 changes: 11 additions & 2 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tabby_crawler::crawl_pipeline;
use tabby_index::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocWebFields,
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocState,
StructuredDocWebFields,
};
use tabby_inference::Embedding;

Expand Down Expand Up @@ -53,7 +54,15 @@ impl WebCrawlerJob {
};

num_docs += 1;
indexer.add(Utc::now(), source_doc).await;
indexer
.add(
StructuredDocState {
updated_at: Utc::now(),
deleted: false,
},
source_doc,
)
.await;
}
logkit::info!("Crawled {} documents from '{}'", num_docs, self.url);
indexer.commit();
Expand Down
Loading