Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(structured_doc): split pre-sync stage to optimize pull request indexing speed #3538

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Fixed and Improvements
body: Refactors the pull request indexing process to enhance the speed of incremental indexing for pull docs.
time: 2024-12-12T09:57:38.860665+08:00
45 changes: 17 additions & 28 deletions crates/tabby-index/src/indexer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,13 @@ mod structured_doc_tests {
let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
.presync(&StructuredDocState {
id: doc.id().to_string(),
updated_at,
deleted: false,
})
.await
&& indexer.sync(doc).await;
println!("{}", updated);
updated
});
Expand Down Expand Up @@ -119,14 +118,13 @@ mod structured_doc_tests {
let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
.presync(&StructuredDocState {
id: doc.id().to_string(),
updated_at,
deleted: false,
})
.await
&& indexer.sync(doc).await;
println!("{}", updated);
updated
});
Expand Down Expand Up @@ -163,18 +161,9 @@ mod structured_doc_tests {
}),
};

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await
});
let res = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { indexer.sync(doc).await });
assert!(res);
indexer.commit();

Expand Down
38 changes: 25 additions & 13 deletions crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
/// StructuredDocState tracks the state of the document source.
/// It helps determine whether the document should be updated or deleted.
pub struct StructuredDocState {
// id is the unique identifier of the document.
// It is used to track the document in the indexer.
pub id: String,

// updated_at is the time when the document was last updated.
// when the updated_at is earlier than the document's index time,
// the update will be skipped.
pub updated_at: DateTime<Utc>,

// deleted indicates whether the document should be removed from the indexer.
// For instance, a closed pull request will be marked as deleted,
// prompting the indexer to remove it from the index.
Expand All @@ -39,21 +44,34 @@
Self { indexer, builder }
}

// Runs pre-sync checks to determine if the document needs to be updated.
// Returns false if `sync` is not required to be called.
pub async fn presync(&self, state: &StructuredDocState) -> bool {
if state.deleted {
self.indexer.delete(&state.id);
return false;

Check warning on line 52 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
}

if self.indexer.is_indexed_after(&state.id, state.updated_at)
&& !self.indexer.has_failed_chunks(&state.id)

Check warning on line 56 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L56

Added line #L56 was not covered by tests
{
return false;

Check warning on line 58 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L58

Added line #L58 was not covered by tests
};

true
}

// The sync process updates the document in the indexer incrementally.
// It first determines whether the document requires an update.
//
// If an update is needed, it checks the deletion state of the document.
// If the document is marked as deleted, it will be removed.
// Next, the document is rebuilt, the original is deleted, and the newly indexed document is added.
pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool {
if !self.require_updates(state.updated_at, &document) {
pub async fn sync(&self, document: StructuredDoc) -> bool {
if !self.require_updates(&document) {
return false;
}

if state.deleted {
return self.delete(document.id()).await;
}

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);
Expand All @@ -80,7 +98,7 @@
self.indexer.commit();
}

fn require_updates(&self, updated_at: DateTime<Utc>, document: &StructuredDoc) -> bool {
fn require_updates(&self, document: &StructuredDoc) -> bool {
if document.should_skip() {
return false;
}
Expand All @@ -89,12 +107,6 @@
return true;
}

if self.indexer.is_indexed_after(document.id(), updated_at)
&& !self.indexer.has_failed_chunks(document.id())
{
return false;
};

true
}

Expand Down
2 changes: 1 addition & 1 deletion ee/tabby-schema/src/schema/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,5 @@ pub trait IntegrationService: Send + Sync {
) -> Result<Vec<Integration>>;

async fn get_integration(&self, id: ID) -> Result<Integration>;
async fn update_integration_sync_status(&self, id: ID, error: Option<String>) -> Result<()>;
async fn update_integration_sync_status(&self, id: &ID, error: Option<String>) -> Result<()>;
}
2 changes: 1 addition & 1 deletion ee/tabby-schema/src/schema/repository/third_party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub trait ThirdPartyRepositoryService: Send + Sync + RepositoryProvider {
last: Option<usize>,
) -> Result<Vec<ProvidedRepository>>;

async fn get_provided_repository(&self, id: ID) -> Result<ProvidedRepository>;
async fn get_provided_repository(&self, id: &ID) -> Result<ProvidedRepository>;

async fn update_repository_active(&self, id: ID, active: bool) -> Result<()>;
async fn upsert_repository(
Expand Down
139 changes: 107 additions & 32 deletions ee/tabby-webserver/src/service/background_job/third_party_integration.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::sync::Arc;

use anyhow::Result;
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use issues::{list_github_issues, list_gitlab_issues};
use juniper::ID;
use pulls::list_github_pulls;
use pulls::{get_github_pull_doc, list_github_pull_states};
use serde::{Deserialize, Serialize};
use tabby_common::config::CodeRepository;
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState};
Expand Down Expand Up @@ -90,7 +91,7 @@
integration_service: Arc<dyn IntegrationService>,
) -> tabby_schema::Result<()> {
let repository = repository_service
.get_provided_repository(self.repository_id)
.get_provided_repository(&self.repository_id)

Check warning on line 94 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L94

Added line #L94 was not covered by tests
.await?;
let integration = integration_service
.get_integration(repository.integration_id.clone())
Expand All @@ -116,50 +117,105 @@
"Indexing documents for repository {}",
repository.display_name
);
let index = StructuredDocIndexer::new(embedding);
let issue_stream = match fetch_all_issues(&integration, &repository).await {

self.sync_issues(
&integration,
integration_service.clone(),
&repository,
embedding.clone(),
)
.await?;

Check warning on line 127 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L121-L127

Added lines #L121 - L127 were not covered by tests

self.sync_pulls(&integration, integration_service, &repository, embedding)
.await?;

Check warning on line 130 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L129-L130

Added lines #L129 - L130 were not covered by tests

Ok(())
}

Check warning on line 133 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L132-L133

Added lines #L132 - L133 were not covered by tests

async fn sync_pulls(
&self,
integration: &Integration,
integration_service: Arc<dyn IntegrationService>,
repository: &ProvidedRepository,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
let mut pull_state_stream = match fetch_all_pull_states(integration, repository).await {

Check warning on line 142 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L135-L142

Added lines #L135 - L142 were not covered by tests
Ok(s) => s,
Err(e) => {
integration_service
.update_integration_sync_status(integration.id, Some(e.to_string()))
.update_integration_sync_status(&integration.id, Some(e.to_string()))

Check warning on line 146 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L146

Added line #L146 was not covered by tests
.await?;
logkit::error!("Failed to fetch issues: {}", e);
return Err(e);
logkit::error!("Failed to fetch pulls: {}", e);
return Ok(());

Check warning on line 149 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L148-L149

Added lines #L148 - L149 were not covered by tests
}
};

let pull_stream = match fetch_all_pulls(&integration, &repository).await {
Ok(s) => Some(s),
let mut count = 0;
let mut num_updated = 0;

let index = StructuredDocIndexer::new(embedding);
while let Some((pull, state)) = pull_state_stream.next().await {
count += 1;
if count % 100 == 0 {
logkit::info!(
"{} pull docs seen, {} pull docs updated",

Check warning on line 161 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L153-L161

Added lines #L153 - L161 were not covered by tests
count,
num_updated
);
}

Check warning on line 165 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L165

Added line #L165 was not covered by tests

if !index.presync(&state).await {
continue;
}

Check warning on line 169 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L167-L169

Added lines #L167 - L169 were not covered by tests

let pull_doc = fetch_pull_structured_doc(integration, repository, pull).await?;

Check warning on line 171 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L171

Added line #L171 was not covered by tests

index.sync(pull_doc).await;
num_updated += 1;

Check warning on line 174 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L173-L174

Added lines #L173 - L174 were not covered by tests
}
logkit::info!(
"{} pull docs seen, {} pull docs updated",

Check warning on line 177 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L176-L177

Added lines #L176 - L177 were not covered by tests
count,
num_updated
);
index.commit();

Ok(())
}

Check warning on line 184 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L181-L184

Added lines #L181 - L184 were not covered by tests

async fn sync_issues(
&self,
integration: &Integration,
integration_service: Arc<dyn IntegrationService>,
repository: &ProvidedRepository,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
let issue_stream = match fetch_all_issues(integration, repository).await {
Ok(s) => s,

Check warning on line 194 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L186-L194

Added lines #L186 - L194 were not covered by tests
Err(e) => {
integration_service
.update_integration_sync_status(integration.id, Some(e.to_string()))
.update_integration_sync_status(&integration.id, Some(e.to_string()))

Check warning on line 197 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L197

Added line #L197 was not covered by tests
.await?;
logkit::warn!("Failed to fetch pulls: {}", e);
None
logkit::error!("Failed to fetch issues: {}", e);
return Err(e);

Check warning on line 200 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L199-L200

Added lines #L199 - L200 were not covered by tests
}
};

let index = StructuredDocIndexer::new(embedding);

Check warning on line 204 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L204

Added line #L204 was not covered by tests
stream! {
let mut count = 0;
let mut num_updated = 0;
let combined_stream = if let Some(pull_stream) = pull_stream {
issue_stream.chain(pull_stream).boxed()
} else {
issue_stream.boxed()
};

for await (state, doc) in combined_stream {
if index.sync(state, doc).await {
num_updated += 1
for await (state, doc) in issue_stream {
if index.presync(&state).await && index.sync(doc).await {
num_updated += 1
}
count += 1;
if count % 100 == 0 {
logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated);
};

Check warning on line 215 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L208-L215

Added lines #L208 - L215 were not covered by tests
}

count += 1;
if count % 100 == 0 {
logkit::info!("{} docs seen, {} docs updated", count, num_updated);
};
}

logkit::info!("{} docs seen, {} docs updated", count, num_updated);
logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated);

Check warning on line 218 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L218

Added line #L218 was not covered by tests
index.commit();
}
.count()
Expand Down Expand Up @@ -212,13 +268,13 @@

Ok(s)
}
async fn fetch_all_pulls(

async fn fetch_all_pull_states(

Check warning on line 272 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L272

Added line #L272 was not covered by tests
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
) -> tabby_schema::Result<BoxStream<'static, (pulls::Pull, StructuredDocState)>> {

Check warning on line 275 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L275

Added line #L275 was not covered by tests
match &integration.kind {
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pulls(
&repository.source_id(),
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pull_states(

Check warning on line 277 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L277

Added line #L277 was not covered by tests
integration.api_base(),
&repository.display_name,
&integration.access_token,
Expand All @@ -230,3 +286,22 @@
)),
}
}

async fn fetch_pull_structured_doc(
integration: &Integration,
repository: &ProvidedRepository,
pull: pulls::Pull,
) -> Result<StructuredDoc> {
match pull {
pulls::Pull::GitHub(pull) => {
get_github_pull_doc(
&repository.source_id(),
pull,
integration.api_base(),
&repository.display_name,
&integration.access_token,
)
.await

Check warning on line 304 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L290-L304

Added lines #L290 - L304 were not covered by tests
}
}
}

Check warning on line 307 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L307

Added line #L307 was not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
})
};
yield (StructuredDocState {
id: doc.id().to_string(),

Check warning on line 70 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L70

Added line #L70 was not covered by tests
updated_at: issue.updated_at,
deleted: false,
}, doc);
Expand Down Expand Up @@ -133,6 +134,7 @@
closed: issue.state == "closed",
})};
yield (StructuredDocState {
id: doc.id().to_string(),

Check warning on line 137 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L137

Added line #L137 was not covered by tests
updated_at: issue.updated_at,
deleted: false,
}, doc);
Expand Down
Loading
Loading