diff --git a/crates/tabby-index/src/code/mod.rs b/crates/tabby-index/src/code/mod.rs
index 1be6daeaf8ef..17b1ff0b9926 100644
--- a/crates/tabby-index/src/code/mod.rs
+++ b/crates/tabby-index/src/code/mod.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use anyhow::{bail, Result};
 use async_stream::stream;
 use async_trait::async_trait;
 use futures::stream::BoxStream;
@@ -68,7 +69,7 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
     async fn build_chunk_attributes<'a>(
         &self,
         source_code: &'a SourceCode,
-    ) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>> {
+    ) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
         let text = match source_code.read_content() {
             Ok(content) => content,
             Err(e) => {
@@ -77,13 +78,22 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
                     source_code.filepath, e
                 );
 
-                return Box::pin(futures::stream::empty());
+                return Box::pin(stream! {
+                    let path = source_code.filepath.clone();
+                    yield tokio::spawn(async move {
+                        bail!("Failed to read content of '{}': {}", path, e);
+                    });
+                });
             }
         };
 
         let Some(embedding) = self.embedding.clone() else {
             warn!("No embedding service found for code indexing");
-            return Box::pin(futures::stream::empty());
+            return Box::pin(stream! {
+                yield tokio::spawn(async move {
+                    bail!("No embedding service found for code indexing");
+                });
+            });
         };
 
         let source_code = source_code.clone();
@@ -100,8 +110,10 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
                 let embedding = embedding.clone();
                 let rewritten_body = format!("```{}\n{}\n```", source_code.filepath, body);
                 yield tokio::spawn(async move {
-                    let tokens = build_binarize_embedding_tokens(embedding.clone(), &rewritten_body).await;
-                    (tokens, attributes)
+                    match build_binarize_embedding_tokens(embedding.clone(), &rewritten_body).await {
+                        Ok(tokens) => Ok((tokens, attributes)),
+                        Err(err) => Err(err),
+                    }
                 });
             }
         };
@@ -110,12 +122,15 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
     }
 }
 
-async fn build_binarize_embedding_tokens(embedding: Arc<dyn Embedding>, body: &str) -> Vec<String> {
+async fn build_binarize_embedding_tokens(
+    embedding: Arc<dyn Embedding>,
+    body: &str,
+) -> Result<Vec<String>> {
     let embedding = match embedding.embed(body).await {
         Ok(x) => x,
         Err(err) => {
             warn!("Failed to embed chunk text: {}", err);
-            return Vec::new();
+            bail!("Failed to embed chunk text: {}", err);
         }
     };
 
@@ -124,7 +139,7 @@ async fn build_binarize_embedding_tokens(embedding: Arc<dyn Embedding>, body: &s
         tokens.push(token);
     }
 
-    tokens
+    Ok(tokens)
 }
 
 pub fn create_code_builder(embedding: Option<Arc<dyn Embedding>>) -> TantivyDocBuilder<SourceCode> {
diff --git a/crates/tabby-index/src/indexer.rs b/crates/tabby-index/src/indexer.rs
index d152e07c3510..796e1585f265 100644
--- a/crates/tabby-index/src/indexer.rs
+++ b/crates/tabby-index/src/indexer.rs
@@ -1,6 +1,6 @@
 use std::collections::HashSet;
 
-use anyhow::bail;
+use anyhow::{bail, Result};
 use async_stream::stream;
 use futures::{stream::BoxStream, Stream, StreamExt};
 use serde_json::json;
@@ -43,7 +43,7 @@ pub trait IndexAttributeBuilder<T>: Send + Sync {
     async fn build_chunk_attributes<'a>(
        &self,
         document: &'a T,
-    ) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>>;
+    ) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>>;
 }
 
 pub struct TantivyDocBuilder<T> {
@@ -132,10 +132,18 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
             // The spawned task returns a tuple containing
             // the document, and
             // a flag indicating whether the tokens were created successfully.
             yield tokio::spawn(async move {
-                let Ok((tokens, chunk_attributes)) = task.await else {
+                let Ok(built_chunk_attributes_result) = task.await else {
+                    // Join error, there is no attr, return None and false
                     return (None, false);
                 };
+                let (tokens, chunk_attributes) = match built_chunk_attributes_result{
+                    Ok((tokens, chunk_attributes)) => (tokens, chunk_attributes),
+                    Err(e) => {
+                        warn!("Failed to build chunk attributes for document '{}': {}", id, e);
+                        return (None, false);
+                    }
+                };
                 let mut doc = doc! {
                     schema.field_id => id,
                     schema.field_source_id => source_id,
diff --git a/crates/tabby-index/src/indexer_tests.rs b/crates/tabby-index/src/indexer_tests.rs
index b16134006684..77b966f3716e 100644
--- a/crates/tabby-index/src/indexer_tests.rs
+++ b/crates/tabby-index/src/indexer_tests.rs
@@ -5,19 +5,23 @@ mod mock_embedding {
 
     pub struct MockEmbedding {
         result: Vec<f32>,
+        error: bool,
     }
 
     impl MockEmbedding {
-        pub fn new(result: Vec<f32>) -> Self {
-            Self { result }
+        pub fn new(result: Vec<f32>, error: bool) -> Self {
+            Self { result, error }
         }
     }
 
     #[async_trait]
     impl Embedding for MockEmbedding {
         async fn embed(&self, prompt: &str) -> Result<Vec<f32>> {
-            if prompt.starts_with("error") {
-                Err(anyhow::anyhow!(prompt.to_owned()))
+            if self.error {
+                Err(anyhow::anyhow!(
+                    "Mock error, prompt length {}",
+                    prompt.len()
+                ))
             } else {
                 Ok(self.result.clone())
             }
@@ -51,7 +55,7 @@ mod structured_doc_tests {
         tabby_common::path::set_tabby_root(temp_dir.to_owned());
 
         let id = "structured_doc_empty_embedding";
-        let embedding = MockEmbedding::new(vec![]);
+        let embedding = MockEmbedding::new(vec![], true);
         let embedding = Arc::new(embedding);
         let indexer = StructuredDocIndexer::new(embedding.clone());
         let doc = StructuredDoc {
@@ -103,7 +107,7 @@ mod structured_doc_tests {
         tabby_common::path::set_tabby_root(temp_dir.to_owned());
 
         let id = "structured_doc_with_embedding";
-        let embedding = MockEmbedding::new(vec![1.0]);
+        let embedding = MockEmbedding::new(vec![1.0], false);
         let embedding = Arc::new(embedding);
         let indexer = StructuredDocIndexer::new(embedding.clone());
         let doc = StructuredDoc {
@@ -179,7 +183,7 @@ mod builder_tests {
         let origin_root = tabby_common::path::tabby_root();
         tabby_common::path::set_tabby_root(get_tabby_root());
 
-        let embedding = MockEmbedding::new(vec![]);
+        let embedding = MockEmbedding::new(vec![], true);
         let builder = Arc::new(create_code_builder(Some(Arc::new(embedding))));
 
         let repo = get_repository_config();
@@ -226,7 +230,7 @@ mod builder_tests {
         tabby_common::path::set_tabby_root(temp_dir.to_owned());
 
         let test_id = "builder_empty_embedding";
-        let embedding = MockEmbedding::new(vec![]);
+        let embedding = MockEmbedding::new(vec![], true);
         let builder = StructuredDocBuilder::new(Arc::new(embedding));
         let tantivy_builder = TantivyDocBuilder::new(corpus::STRUCTURED_DOC, builder);
 
@@ -281,7 +285,7 @@ mod builder_tests {
         tabby_common::path::set_tabby_root(temp_dir.to_owned());
 
         let test_id = "builder_with_embedding";
-        let embedding = MockEmbedding::new(vec![1.0]);
+        let embedding = MockEmbedding::new(vec![1.0], false);
         let builder = StructuredDocBuilder::new(Arc::new(embedding));
         let tantivy_builder = TantivyDocBuilder::new(corpus::STRUCTURED_DOC, builder);
 
diff --git a/crates/tabby-index/src/structured_doc/mod.rs b/crates/tabby-index/src/structured_doc/mod.rs
index 7fd92dc54cb5..8501734cdaae 100644
--- a/crates/tabby-index/src/structured_doc/mod.rs
+++ b/crates/tabby-index/src/structured_doc/mod.rs
@@ -3,6 +3,7 @@ mod types;
 
 use std::sync::Arc;
 
+use anyhow::Result;
 use async_trait::async_trait;
 use futures::stream::BoxStream;
 use serde_json::json;
@@ -37,7 +38,7 @@ impl IndexAttributeBuilder<StructuredDoc> for StructuredDocBuilder {
     async fn build_chunk_attributes<'a>(
         &self,
         document: &'a StructuredDoc,
-    ) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>> {
+    ) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
         let embedding = self.embedding.clone();
         document.build_chunk_attributes(embedding).await
     }
diff --git a/crates/tabby-index/src/structured_doc/types.rs b/crates/tabby-index/src/structured_doc/types.rs
index 69172139e62c..be95b1e589e5 100644
--- a/crates/tabby-index/src/structured_doc/types.rs
+++ b/crates/tabby-index/src/structured_doc/types.rs
@@ -4,6 +4,7 @@ pub mod web;
 
 use std::sync::Arc;
 
+use anyhow::{bail, Result};
 use async_trait::async_trait;
 use futures::stream::BoxStream;
 use tabby_inference::Embedding;
@@ -52,7 +53,7 @@ pub trait BuildStructuredDoc {
     async fn build_chunk_attributes(
         &self,
         embedding: Arc<dyn Embedding>,
-    ) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>>;
+    ) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>>;
 }
 
 pub enum StructuredDocFields {
@@ -82,7 +83,7 @@ impl BuildStructuredDoc for StructuredDoc {
     async fn build_chunk_attributes(
         &self,
         embedding: Arc<dyn Embedding>,
-    ) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
+    ) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
         match &self.fields {
             StructuredDocFields::Web(doc) => doc.build_chunk_attributes(embedding).await,
             StructuredDocFields::Issue(doc) => doc.build_chunk_attributes(embedding).await,
@@ -91,12 +92,12 @@ impl BuildStructuredDoc for StructuredDoc {
     }
 }
 
-async fn build_tokens(embedding: Arc<dyn Embedding>, text: &str) -> Vec<String> {
+async fn build_tokens(embedding: Arc<dyn Embedding>, text: &str) -> Result<Vec<String>> {
     let embedding = match embedding.embed(text).await {
         Ok(embedding) => embedding,
         Err(err) => {
             warn!("Failed to embed chunk text: {}", err);
-            return vec![];
+            bail!("Failed to embed chunk text: {}", err);
         }
     };
 
@@ -105,5 +106,5 @@ async fn build_tokens(embedding: Arc<dyn Embedding>, text: &str) -> Vec<String>
         chunk_embedding_tokens.push(token);
     }
 
-    chunk_embedding_tokens
+    Ok(chunk_embedding_tokens)
 }
diff --git a/crates/tabby-index/src/structured_doc/types/issue.rs b/crates/tabby-index/src/structured_doc/types/issue.rs
index d760ad17f309..030ad7ea946c 100644
--- a/crates/tabby-index/src/structured_doc/types/issue.rs
+++ b/crates/tabby-index/src/structured_doc/types/issue.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use anyhow::Result;
 use async_stream::stream;
 use async_trait::async_trait;
 use futures::stream::BoxStream;
@@ -35,13 +36,18 @@ impl BuildStructuredDoc for IssueDocument {
     async fn build_chunk_attributes(
         &self,
         embedding: Arc<dyn Embedding>,
-    ) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
+    ) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
         let text = format!("{}\n\n{}", self.title, self.body);
         let s = stream! {
             yield tokio::spawn(async move {
-                let tokens = build_tokens(embedding, &text).await;
+                let tokens = match build_tokens(embedding, &text).await{
+                    Ok(tokens) => tokens,
+                    Err(e) => {
+                        return Err(anyhow::anyhow!("Failed to build tokens for text: {}", e));
+                    }
+                };
                 let chunk_attributes = json!({});
-                (tokens, chunk_attributes)
+                Ok((tokens, chunk_attributes))
             })
         };
 
diff --git a/crates/tabby-index/src/structured_doc/types/pull.rs b/crates/tabby-index/src/structured_doc/types/pull.rs
index 67b7bf4ea2e2..0a80f847c972 100644
--- a/crates/tabby-index/src/structured_doc/types/pull.rs
+++ b/crates/tabby-index/src/structured_doc/types/pull.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use anyhow::Result;
 use async_stream::stream;
 use async_trait::async_trait;
 use futures::stream::BoxStream;
@@ -42,14 +43,19 @@ impl BuildStructuredDoc for PullDocument {
     async fn build_chunk_attributes(
         &self,
         embedding: Arc<dyn Embedding>,
-    ) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
+    ) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
         // currently not indexing the diff
         let text = format!("{}\n\n{}", self.title, self.body);
         let s = stream! {
             yield tokio::spawn(async move {
-                let tokens = build_tokens(embedding, &text).await;
+                let tokens = match build_tokens(embedding, &text).await{
+                    Ok(tokens) => tokens,
+                    Err(e) => {
+                        return Err(anyhow::anyhow!("Failed to build tokens for text: {}", e));
+                    }
+                };
                 let chunk_attributes = json!({});
-                (tokens, chunk_attributes)
+                Ok((tokens, chunk_attributes))
             })
         };
 
diff --git a/crates/tabby-index/src/structured_doc/types/web.rs b/crates/tabby-index/src/structured_doc/types/web.rs
index 5565258a6149..3dc2e4d3312e 100644
--- a/crates/tabby-index/src/structured_doc/types/web.rs
+++ b/crates/tabby-index/src/structured_doc/types/web.rs
@@ -1,5 +1,6 @@
 use std::{collections::HashSet, sync::Arc};
 
+use anyhow::Result;
 use async_stream::stream;
 use async_trait::async_trait;
 use futures::stream::BoxStream;
@@ -33,26 +34,40 @@ impl BuildStructuredDoc for WebDocument {
     async fn build_chunk_attributes(
         &self,
         embedding: Arc<dyn Embedding>,
-    ) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
+    ) -> BoxStream<JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
         let chunks: Vec<_> = TextSplitter::new(2048)
             .chunks(&self.body)
             .map(|x| x.to_owned())
             .collect();
 
-        let title_embedding_tokens = build_tokens(embedding.clone(), &self.title).await;
+        let title_embedding_tokens = match build_tokens(embedding.clone(), &self.title).await {
+            Ok(tokens) => tokens,
+            Err(e) => {
+                return Box::pin(stream! {
+                    yield tokio::spawn(async move {
+                        Err(anyhow::anyhow!("Failed to build tokens for title: {}", e))
+                    });
+                });
+            }
+        };
         let s = stream! {
             for chunk_text in chunks {
                 let title_embedding_tokens = title_embedding_tokens.clone();
                 let embedding = embedding.clone();
                 yield tokio::spawn(async move {
-                    let chunk_embedding_tokens = build_tokens(embedding.clone(), &chunk_text).await;
+                    let chunk_embedding_tokens = match build_tokens(embedding.clone(), &chunk_text).await {
+                        Ok(tokens) => tokens,
+                        Err(e) => {
+                            return Err(anyhow::anyhow!("Failed to build tokens for chunk: {}", e));
+                        }
+                    };
                     let chunk = json!({
                         fields::web::CHUNK_TEXT: chunk_text,
                     });
 
                     // Title embedding tokens are merged with chunk embedding tokens to enhance the search results.
                     let tokens = merge_tokens(vec![title_embedding_tokens, chunk_embedding_tokens]);
-                    (tokens, chunk)
+                    Ok((tokens, chunk))
                 });
             }
         };