Skip to content

Commit

Permalink
fix: indexer returns an error when embedding fails
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Zhang <[email protected]>
  • Loading branch information
zwpaper committed Nov 28, 2024
1 parent e87ec84 commit d75a30e
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 36 deletions.
31 changes: 23 additions & 8 deletions crates/tabby-index/src/code/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::{bail, Result};
use async_stream::stream;
use async_trait::async_trait;
use futures::stream::BoxStream;
Expand Down Expand Up @@ -68,7 +69,7 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
async fn build_chunk_attributes<'a>(
&self,
source_code: &'a SourceCode,
) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>> {
) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
let text = match source_code.read_content() {
Ok(content) => content,
Err(e) => {
Expand All @@ -77,13 +78,22 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
source_code.filepath, e
);

return Box::pin(futures::stream::empty());
return Box::pin(stream! {
let path = source_code.filepath.clone();
yield tokio::spawn(async move {
bail!("Failed to read content of '{}': {}", path, e);
});
});
}
};

let Some(embedding) = self.embedding.clone() else {
warn!("No embedding service found for code indexing");
return Box::pin(futures::stream::empty());
return Box::pin(stream! {
yield tokio::spawn(async move {
bail!("No embedding service found for code indexing");
});
});
};

let source_code = source_code.clone();
Expand All @@ -100,8 +110,10 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
let embedding = embedding.clone();
let rewritten_body = format!("```{}\n{}\n```", source_code.filepath, body);
yield tokio::spawn(async move {
let tokens = build_binarize_embedding_tokens(embedding.clone(), &rewritten_body).await;
(tokens, attributes)
match build_binarize_embedding_tokens(embedding.clone(), &rewritten_body).await {
Ok(tokens) => Ok((tokens, attributes)),
Err(err) => Err(err),
}
});
}
};
Expand All @@ -110,12 +122,15 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
}
}

async fn build_binarize_embedding_tokens(embedding: Arc<dyn Embedding>, body: &str) -> Vec<String> {
async fn build_binarize_embedding_tokens(
embedding: Arc<dyn Embedding>,
body: &str,
) -> Result<Vec<String>> {
let embedding = match embedding.embed(body).await {
Ok(x) => x,
Err(err) => {
warn!("Failed to embed chunk text: {}", err);
return Vec::new();
bail!("Failed to embed chunk text: {}", err);
}
};

Expand All @@ -124,7 +139,7 @@ async fn build_binarize_embedding_tokens(embedding: Arc<dyn Embedding>, body: &s
tokens.push(token);
}

tokens
Ok(tokens)
}

pub fn create_code_builder(embedding: Option<Arc<dyn Embedding>>) -> TantivyDocBuilder<SourceCode> {
Expand Down
14 changes: 11 additions & 3 deletions crates/tabby-index/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;

use anyhow::bail;
use anyhow::{bail, Result};
use async_stream::stream;
use futures::{stream::BoxStream, Stream, StreamExt};
use serde_json::json;
Expand Down Expand Up @@ -43,7 +43,7 @@ pub trait IndexAttributeBuilder<T>: Send + Sync {
async fn build_chunk_attributes<'a>(
&self,
document: &'a T,
) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>>;
) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>>;
}

pub struct TantivyDocBuilder<T> {
Expand Down Expand Up @@ -132,10 +132,18 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
// the document, and
// a flag indicating whether the tokens were created successfully.
yield tokio::spawn(async move {
let Ok((tokens, chunk_attributes)) = task.await else {
let Ok(built_chunk_attributes_result) = task.await else {
// Join error, there is no attr, return None and false
return (None, false);
};

let (tokens, chunk_attributes) = match built_chunk_attributes_result{
Ok((tokens, chunk_attributes)) => (tokens, chunk_attributes),
Err(e) => {
warn!("Failed to build chunk attributes for document '{}': {}", id, e);
return (None, false);
}
};
let mut doc = doc! {
schema.field_id => id,
schema.field_source_id => source_id,
Expand Down
22 changes: 13 additions & 9 deletions crates/tabby-index/src/indexer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ mod mock_embedding {

pub struct MockEmbedding {
result: Vec<f32>,
error: bool,
}

impl MockEmbedding {
pub fn new(result: Vec<f32>) -> Self {
Self { result }
pub fn new(result: Vec<f32>, error: bool) -> Self {
Self { result, error }
}
}

#[async_trait]
impl Embedding for MockEmbedding {
async fn embed(&self, prompt: &str) -> Result<Vec<f32>> {
if prompt.starts_with("error") {
Err(anyhow::anyhow!(prompt.to_owned()))
if self.error {
Err(anyhow::anyhow!(
"Mock error, prompt length {}",
prompt.len()
))
} else {
Ok(self.result.clone())
}
Expand Down Expand Up @@ -51,7 +55,7 @@ mod structured_doc_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let id = "structured_doc_empty_embedding";
let embedding = MockEmbedding::new(vec![]);
let embedding = MockEmbedding::new(vec![], true);
let embedding = Arc::new(embedding);
let indexer = StructuredDocIndexer::new(embedding.clone());
let doc = StructuredDoc {
Expand Down Expand Up @@ -103,7 +107,7 @@ mod structured_doc_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let id = "structured_doc_with_embedding";
let embedding = MockEmbedding::new(vec![1.0]);
let embedding = MockEmbedding::new(vec![1.0], false);
let embedding = Arc::new(embedding);
let indexer = StructuredDocIndexer::new(embedding.clone());
let doc = StructuredDoc {
Expand Down Expand Up @@ -179,7 +183,7 @@ mod builder_tests {
let origin_root = tabby_common::path::tabby_root();
tabby_common::path::set_tabby_root(get_tabby_root());

let embedding = MockEmbedding::new(vec![]);
let embedding = MockEmbedding::new(vec![], true);
let builder = Arc::new(create_code_builder(Some(Arc::new(embedding))));

let repo = get_repository_config();
Expand Down Expand Up @@ -226,7 +230,7 @@ mod builder_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let test_id = "builder_empty_embedding";
let embedding = MockEmbedding::new(vec![]);
let embedding = MockEmbedding::new(vec![], true);
let builder = StructuredDocBuilder::new(Arc::new(embedding));
let tantivy_builder = TantivyDocBuilder::new(corpus::STRUCTURED_DOC, builder);

Expand Down Expand Up @@ -281,7 +285,7 @@ mod builder_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let test_id = "builder_with_embedding";
let embedding = MockEmbedding::new(vec![1.0]);
let embedding = MockEmbedding::new(vec![1.0], false);
let builder = StructuredDocBuilder::new(Arc::new(embedding));
let tantivy_builder = TantivyDocBuilder::new(corpus::STRUCTURED_DOC, builder);

Expand Down
3 changes: 2 additions & 1 deletion crates/tabby-index/src/structured_doc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod types;

use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use futures::stream::BoxStream;
use serde_json::json;
Expand Down Expand Up @@ -37,7 +38,7 @@ impl IndexAttributeBuilder<StructuredDoc> for StructuredDocBuilder {
async fn build_chunk_attributes<'a>(
&self,
document: &'a StructuredDoc,
) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>> {
) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
let embedding = self.embedding.clone();
document.build_chunk_attributes(embedding).await
}
Expand Down
11 changes: 6 additions & 5 deletions crates/tabby-index/src/structured_doc/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod web;

use std::sync::Arc;

use anyhow::{bail, Result};
use async_trait::async_trait;
use futures::stream::BoxStream;
use tabby_inference::Embedding;
Expand Down Expand Up @@ -52,7 +53,7 @@ pub trait BuildStructuredDoc {
async fn build_chunk_attributes(
&self,
embedding: Arc<dyn Embedding>,
) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>>;
) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>>;
}

pub enum StructuredDocFields {
Expand Down Expand Up @@ -82,7 +83,7 @@ impl BuildStructuredDoc for StructuredDoc {
async fn build_chunk_attributes(
&self,
embedding: Arc<dyn Embedding>,
) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
match &self.fields {
StructuredDocFields::Web(doc) => doc.build_chunk_attributes(embedding).await,
StructuredDocFields::Issue(doc) => doc.build_chunk_attributes(embedding).await,
Expand All @@ -91,12 +92,12 @@ impl BuildStructuredDoc for StructuredDoc {
}
}

async fn build_tokens(embedding: Arc<dyn Embedding>, text: &str) -> Vec<String> {
async fn build_tokens(embedding: Arc<dyn Embedding>, text: &str) -> Result<Vec<String>> {
let embedding = match embedding.embed(text).await {
Ok(embedding) => embedding,
Err(err) => {
warn!("Failed to embed chunk text: {}", err);
return vec![];
bail!("Failed to embed chunk text: {}", err);
}
};

Expand All @@ -105,5 +106,5 @@ async fn build_tokens(embedding: Arc<dyn Embedding>, text: &str) -> Vec<String>
chunk_embedding_tokens.push(token);
}

chunk_embedding_tokens
Ok(chunk_embedding_tokens)
}
12 changes: 9 additions & 3 deletions crates/tabby-index/src/structured_doc/types/issue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::Result;
use async_stream::stream;
use async_trait::async_trait;
use futures::stream::BoxStream;
Expand Down Expand Up @@ -35,13 +36,18 @@ impl BuildStructuredDoc for IssueDocument {
async fn build_chunk_attributes(
&self,
embedding: Arc<dyn Embedding>,
) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
let text = format!("{}\n\n{}", self.title, self.body);
let s = stream! {
yield tokio::spawn(async move {
let tokens = build_tokens(embedding, &text).await;
let tokens = match build_tokens(embedding, &text).await{
Ok(tokens) => tokens,
Err(e) => {
return Err(anyhow::anyhow!("Failed to build tokens for text: {}", e));
}
};
let chunk_attributes = json!({});
(tokens, chunk_attributes)
Ok((tokens, chunk_attributes))
})
};

Expand Down
12 changes: 9 additions & 3 deletions crates/tabby-index/src/structured_doc/types/pull.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::Result;
use async_stream::stream;
use async_trait::async_trait;
use futures::stream::BoxStream;
Expand Down Expand Up @@ -42,14 +43,19 @@ impl BuildStructuredDoc for PullDocument {
async fn build_chunk_attributes(
&self,
embedding: Arc<dyn Embedding>,
) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
// currently not indexing the diff
let text = format!("{}\n\n{}", self.title, self.body);
let s = stream! {
yield tokio::spawn(async move {
let tokens = build_tokens(embedding, &text).await;
let tokens = match build_tokens(embedding, &text).await{
Ok(tokens) => tokens,
Err(e) => {
return Err(anyhow::anyhow!("Failed to build tokens for text: {}", e));
}
};
let chunk_attributes = json!({});
(tokens, chunk_attributes)
Ok((tokens, chunk_attributes))
})
};

Expand Down
23 changes: 19 additions & 4 deletions crates/tabby-index/src/structured_doc/types/web.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashSet, sync::Arc};

use anyhow::Result;
use async_stream::stream;
use async_trait::async_trait;
use futures::stream::BoxStream;
Expand Down Expand Up @@ -33,26 +34,40 @@ impl BuildStructuredDoc for WebDocument {
async fn build_chunk_attributes(
&self,
embedding: Arc<dyn Embedding>,
) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
let chunks: Vec<_> = TextSplitter::new(2048)
.chunks(&self.body)
.map(|x| x.to_owned())
.collect();

let title_embedding_tokens = build_tokens(embedding.clone(), &self.title).await;
let title_embedding_tokens = match build_tokens(embedding.clone(), &self.title).await {
Ok(tokens) => tokens,
Err(e) => {
return Box::pin(stream! {
yield tokio::spawn(async move {
Err(anyhow::anyhow!("Failed to build tokens for title: {}", e))
});
});
}
};
let s = stream! {
for chunk_text in chunks {
let title_embedding_tokens = title_embedding_tokens.clone();
let embedding = embedding.clone();
yield tokio::spawn(async move {
let chunk_embedding_tokens = build_tokens(embedding.clone(), &chunk_text).await;
let chunk_embedding_tokens = match build_tokens(embedding.clone(), &chunk_text).await {
Ok(tokens) => tokens,
Err(e) => {
return Err(anyhow::anyhow!("Failed to build tokens for chunk: {}", e));
}
};
let chunk = json!({
fields::web::CHUNK_TEXT: chunk_text,
});

// Title embedding tokens are merged with chunk embedding tokens to enhance the search results.
let tokens = merge_tokens(vec![title_embedding_tokens, chunk_embedding_tokens]);
(tokens, chunk)
Ok((tokens, chunk))
});
}
};
Expand Down

0 comments on commit d75a30e

Please sign in to comment.