Skip to content

Commit

Permalink
fix(indexer): fan out when index code chunks (#3496)
Browse files Browse the repository at this point in the history
* fix(indexer): fan out when build index chunks

Signed-off-by: Wei Zhang <[email protected]>

* test: fix for fan out

Signed-off-by: Wei Zhang <[email protected]>

* [autofix.ci] apply automated fixes

* chore: use tokio channel

Signed-off-by: Wei Zhang <[email protected]>

---------

Signed-off-by: Wei Zhang <[email protected]>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
zwpaper and autofix-ci[bot] authored Dec 2, 2024
1 parent 5c15a7c commit 08e6262
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
57 changes: 39 additions & 18 deletions crates/tabby-index/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tantivy::{
schema::{self, Value},
DocAddress, DocSet, IndexWriter, Searcher, TantivyDocument, Term, TERMINATED,
};
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{debug, warn};

use crate::tantivy_utils::open_or_create_index;
Expand Down Expand Up @@ -76,35 +76,56 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
let doc_id = id.clone();
let doc_attributes = self.builder.build_attributes(&document).await;
let s = stream! {
let mut failed_count: u64 = 0;
let (tx, mut rx) = mpsc::channel(32);

for await chunk_doc in self.build_chunks(cloned_id, source_id.clone(), updated_at, document).await {
match chunk_doc.await {
Ok(Ok(doc)) => {
yield tokio::spawn(async move { Some(doc) });
}
Ok(Err(e)) => {
warn!("Failed to build chunk for document '{}': {}", doc_id, e);
failed_count += 1;
}
Err(e) => {
warn!("Failed to call build chunk '{}': {}", doc_id, e);
failed_count += 1;
let tx = tx.clone();
let doc_id = doc_id.clone();
yield tokio::spawn(async move {
match chunk_doc.await {
Ok(Ok(doc)) => {
Some(doc)
}
Ok(Err(e)) => {
warn!("Failed to build chunk for document '{}': {}", doc_id, e);
tx.send(()).await.unwrap_or_else(|e| {
warn!("Failed to send error signal for document '{}': {}", doc_id, e);
});
None
}
Err(e) => {
warn!("Failed to call build chunk '{}': {}", doc_id, e);
tx.send(()).await.unwrap_or_else(|e| {
warn!("Failed to send error signal for document '{}': {}", doc_id, e);
});
None
}
}
}
});
};

// drop tx to signal the end of the stream
// the cloned is dropped in its own thread
drop(tx);

let mut doc = doc! {
schema.field_id => doc_id,
schema.field_source_id => source_id,
schema.field_corpus => self.corpus,
schema.field_attributes => doc_attributes,
schema.field_updated_at => updated_at,
};
if failed_count > 0 {
doc.add_u64(schema.field_failed_chunks_count, failed_count);
}

yield tokio::spawn(async move { Some(doc) });
yield tokio::spawn(async move {
let mut failed_count = 0;
while let Some(_) = rx.recv().await {
failed_count += 1;
}
if failed_count > 0 {
doc.add_u64(schema.field_failed_chunks_count, failed_count as u64);
}
Some(doc)
});
};

(id, s)
Expand Down
12 changes: 10 additions & 2 deletions crates/tabby-index/src/indexer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,18 @@ mod builder_tests {
std::thread::available_parallelism().unwrap().get() * 2,
32,
))
.map(|handler| handler.unwrap())
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>()
});

// the chunks should be failed as no embedding is provided
// the last element is the document itself
assert_eq!(res.len(), 1);
let doc = res.last().unwrap().as_ref().unwrap().as_ref().unwrap();
let doc = res.last().unwrap();

let schema = IndexSchema::instance();
let failed_count = doc
Expand Down Expand Up @@ -250,16 +254,20 @@ mod builder_tests {
std::thread::available_parallelism().unwrap().get() * 2,
32,
))
.map(|handler| handler.unwrap())
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>()
});

// The last element is the document itself,
// while the preceding elements are the chunks.
// Given that the embedding is empty,
// all chunks should be considered failed and skipped.
assert_eq!(res.len(), 1);
let doc = res.last().unwrap().as_ref().unwrap().as_ref().unwrap();
let doc = res.last().unwrap();

let schema = IndexSchema::instance();
let failed_count = doc
Expand Down

0 comments on commit 08e6262

Please sign in to comment.