chore: dedupe issues / PRs indexing by updated_at (#2576)
* chore: dedupe issues / PRs indexing by updated_at

* add dedup logic with updated_at comparison

* refactor: extract doc.public

* refactor: combine SourceDocument and WebDocument

* add a unit test verifying the updated_at dedup logic works

* use a range query to compare dates instead of reading the stored date back (see the sketch below)
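The resulting flow, as a minimal caller-side sketch (the fetch loop, `fetched_issues`, and `into_web_document` are hypothetical; `DocIndexer::add` and `commit` come from the new crates/tabby-scheduler/src/doc/public.rs below):

    // Hypothetical indexing loop showing the dedup behavior.
    let indexer = DocIndexer::new(embedding);
    for issue in fetched_issues {
        // `add` returns false, skipping the embedding and indexing work,
        // when a document with the same id is already indexed with an
        // `updated_at` at or after this one.
        indexer.add(issue.updated_at, issue.into_web_document()).await;
    }
    indexer.commit();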
wsxiaoys authored Jul 5, 2024
1 parent 2e97f45 commit ac49f8e
Showing 12 changed files with 224 additions and 97 deletions.
@@ -0,0 +1,3 @@
kind: Fixed and Improvements
body: dedupe issues / PRs indexing by updated_at
time: 2024-07-04T12:55:08.732282+09:00
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/tabby-common/Cargo.toml
@@ -22,6 +22,7 @@ url.workspace = true
derive_builder.workspace = true
hash-ids.workspace = true
tracing.workspace = true
chrono.workspace = true

[dev-dependencies]
temp_testdir = { workspace = true }
43 changes: 40 additions & 3 deletions crates/tabby-common/src/index/mod.rs
@@ -5,12 +5,12 @@ use std::borrow::Cow;

use lazy_static::lazy_static;
use tantivy::{
query::{BooleanQuery, ConstScoreQuery, ExistsQuery, Occur, Query, TermQuery},
query::{BooleanQuery, ConstScoreQuery, ExistsQuery, Occur, Query, RangeQuery, TermQuery},
schema::{
Field, IndexRecordOption, JsonObjectOptions, Schema, TextFieldIndexing, FAST, INDEXED,
STORED, STRING,
},
Term,
DateTime, Term,
};

pub struct IndexSchema {
@@ -46,6 +46,7 @@ pub struct IndexSchema {
}

const FIELD_CHUNK_ID: &str = "chunk_id";
const FIELD_UPDATED_AT: &str = "updated_at";

pub mod corpus {
pub const CODE: &str = "code";
@@ -64,7 +65,7 @@ impl IndexSchema {
let field_source_id = builder.add_text_field("source_id", STRING | FAST);
let field_id = builder.add_text_field("id", STRING | STORED);

let field_updated_at = builder.add_date_field("updated_at", INDEXED);
let field_updated_at = builder.add_date_field(FIELD_UPDATED_AT, INDEXED);
let field_attributes = builder.add_text_field("attributes", STORED);

let field_chunk_id = builder.add_text_field(FIELD_CHUNK_ID, STRING | FAST | STORED);
@@ -132,6 +133,42 @@ impl IndexSchema {
])
}

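/// Build a query to find the non-chunk document with the given `doc_id`,
/// indexed with an `updated_at` at or after the given timestamp.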
pub fn doc_indexed_after(
&self,
corpus: &str,
doc_id: &str,
updated_at: chrono::DateTime<chrono::Utc>,
) -> impl Query {
let doc_id_query = TermQuery::new(
Term::from_field_text(self.field_id, doc_id),
tantivy::schema::IndexRecordOption::Basic,
);

let updated_at = DateTime::from_timestamp_nanos(
updated_at.timestamp_nanos_opt().expect("valid timestamp"),
);

BooleanQuery::new(vec![
// Must match the corpus
(Occur::Must, self.corpus_query(corpus)),
// Must match the doc id
(Occur::Must, Box::new(doc_id_query)),
// Must match the updated_at
(
Occur::Must,
Box::new(RangeQuery::new_date(
FIELD_UPDATED_AT.to_owned(),
updated_at..DateTime::MAX,
)),
),
// Exclude chunk documents
(
Occur::MustNot,
Box::new(ExistsQuery::new_exists_query(FIELD_CHUNK_ID.into())),
),
])
}

/// Build a query to find the document with the given `doc_id`, include chunks.
pub fn doc_query_with_chunks(&self, corpus: &str, doc_id: &str) -> impl Query {
let doc_id_query = TermQuery::new(
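Because `RangeQuery::new_date` receives the half-open range `updated_at..DateTime::MAX`, this query matches exactly when a non-chunk document with the given id is stored with an `updated_at` at or after the supplied timestamp, so freshness can be decided from the hit count alone, without reading the stored date back. A sketch of such a check (not the repository's actual `is_indexed_after` implementation; assumes an open tantivy `Searcher` over the same index):

    use tantivy::collector::Count;

    // More than zero hits means the document is already indexed and fresh.
    let schema = IndexSchema::instance(); // assumes the lazy_static-backed accessor
    let query = schema.doc_indexed_after(corpus::WEB, doc_id, updated_at);
    let already_fresh = searcher.search(&query, &Count)? > 0;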
2 changes: 2 additions & 0 deletions crates/tabby-scheduler/Cargo.toml
@@ -39,6 +39,7 @@ git2.workspace = true
insta.workspace = true
async-trait.workspace = true
logkit.workspace = true
chrono.workspace = true

[dev-dependencies]
temp_testdir = { workspace = true }
@@ -48,3 +49,4 @@ tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing-subscriber = { workspace = true }
serial_test = { workspace = true }
2 changes: 2 additions & 0 deletions crates/tabby-scheduler/src/code/intelligence.rs
@@ -241,6 +241,7 @@ mod metrics {
mod tests {
use std::path::PathBuf;

use serial_test::serial;
use tabby_common::path::set_tabby_root;
use tracing_test::traced_test;

@@ -266,6 +267,7 @@ mod tests {

#[test]
#[traced_test]
#[serial(set_tabby_root)]
fn test_create_source_file() {
set_tabby_root(get_tabby_root());
let config = get_repository_config();
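The new `#[serial(set_tabby_root)]` attribute uses serial_test's keyed serialization: tests sharing a key never run concurrently. Both this test and the new one in crates/tabby-scheduler/src/doc/public.rs mutate process-global state via `set_tabby_root`, hence the shared key. A minimal illustration (hypothetical test name and body):

    use serial_test::serial;

    #[test]
    #[serial(set_tabby_root)] // serialized with every other test keyed on `set_tabby_root`
    fn another_test_touching_tabby_root() {
        // set_tabby_root(...) and assertions would go here.
    }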
83 changes: 9 additions & 74 deletions crates/tabby-scheduler/src/doc/mod.rs
@@ -1,8 +1,11 @@
pub mod public;

use std::{collections::HashSet, sync::Arc};

use async_stream::stream;
use async_trait::async_trait;
use futures::{stream::BoxStream, StreamExt};
use public::WebDocument;
use serde_json::json;
use tabby_common::index::{self, corpus, doc};
use tabby_inference::Embedding;
@@ -11,27 +14,7 @@ use text_splitter::TextSplitter;
use tokio::task::JoinHandle;
use tracing::warn;

use crate::{
indexer::{IndexId, TantivyDocBuilder, ToIndexId},
IndexAttributeBuilder, Indexer,
};

pub struct SourceDocument {
pub source_id: String,
pub id: String,
pub title: String,
pub link: String,
pub body: String,
}

impl ToIndexId for SourceDocument {
fn to_index_id(&self) -> IndexId {
IndexId {
source_id: self.source_id.clone(),
id: self.id.clone(),
}
}
}
use crate::{indexer::TantivyDocBuilder, IndexAttributeBuilder};

const CHUNK_SIZE: usize = 2048;

@@ -46,8 +29,8 @@ impl DocBuilder {
}

#[async_trait]
impl IndexAttributeBuilder<SourceDocument> for DocBuilder {
async fn build_attributes(&self, document: &SourceDocument) -> serde_json::Value {
impl IndexAttributeBuilder<WebDocument> for DocBuilder {
async fn build_attributes(&self, document: &WebDocument) -> serde_json::Value {
json!({
doc::fields::TITLE: document.title,
doc::fields::LINK: document.link,
@@ -58,7 +41,7 @@ impl IndexAttributeBuilder<SourceDocument> for DocBuilder {
/// into binarized tokens by thresholding on zero.
async fn build_chunk_attributes(
&self,
document: &SourceDocument,
document: &WebDocument,
) -> BoxStream<JoinHandle<(Vec<String>, serde_json::Value)>> {
let embedding = self.embedding.clone();
let chunks: Vec<_> = TextSplitter::new(CHUNK_SIZE)
@@ -105,7 +88,8 @@ async fn build_tokens(embedding: Arc<dyn Embedding>, text: &str) -> Vec<String>
chunk_embedding_tokens
}

pub fn create_web_builder(embedding: Arc<dyn Embedding>) -> TantivyDocBuilder<SourceDocument> {
// FIXME(meng): make this a private interface; always prefer public::DocIndexer for web doc building.
pub fn create_web_builder(embedding: Arc<dyn Embedding>) -> TantivyDocBuilder<WebDocument> {
let builder = DocBuilder::new(embedding);
TantivyDocBuilder::new(corpus::WEB, builder)
}
@@ -114,52 +98,3 @@ pub fn merge_tokens(tokens: Vec<Vec<String>>) -> Vec<String> {
let tokens = tokens.into_iter().flatten().collect::<HashSet<_>>();
tokens.into_iter().collect()
}

pub struct DocIndexer {
builder: TantivyDocBuilder<SourceDocument>,
indexer: Indexer,
}

pub struct WebDocument {
pub id: String,
pub source_id: String,
pub link: String,
pub title: String,
pub body: String,
}

impl From<WebDocument> for SourceDocument {
fn from(value: WebDocument) -> Self {
Self {
id: value.id,
source_id: value.source_id,
link: value.link,
title: value.title,
body: value.body,
}
}
}

impl DocIndexer {
pub fn new(embedding: Arc<dyn Embedding>) -> Self {
let builder = create_web_builder(embedding);
let indexer = Indexer::new(corpus::WEB);
Self { indexer, builder }
}

pub async fn add(&self, document: WebDocument) {
stream! {
let (id, s) = self.builder.build(document.into()).await;
self.indexer.delete(&id);
for await doc in s.buffer_unordered(std::cmp::max(std::thread::available_parallelism().unwrap().get() * 2, 32)) {
if let Ok(Some(doc)) = doc {
self.indexer.add(doc).await;
}
}
}.collect::<()>().await;
}

pub fn commit(self) {
self.indexer.commit();
}
}
124 changes: 124 additions & 0 deletions crates/tabby-scheduler/src/doc/public.rs
@@ -0,0 +1,124 @@
use std::sync::Arc;

use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use tabby_common::index::corpus;
use tabby_inference::Embedding;

use super::create_web_builder;
use crate::{
indexer::{IndexId, TantivyDocBuilder, ToIndexId},
Indexer,
};

pub struct DocIndexer {
builder: TantivyDocBuilder<WebDocument>,
indexer: Indexer,
}

pub struct WebDocument {
pub id: String,
pub source_id: String,
pub link: String,
pub title: String,
pub body: String,
}

impl ToIndexId for WebDocument {
fn to_index_id(&self) -> IndexId {
IndexId {
source_id: self.source_id.clone(),
id: self.id.clone(),
}
}
}

impl DocIndexer {
pub fn new(embedding: Arc<dyn Embedding>) -> Self {
let builder = create_web_builder(embedding);
let indexer = Indexer::new(corpus::WEB);
Self { indexer, builder }
}

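/// Adds a document to the index, returning false without indexing when a
/// document with the same id is already indexed with an `updated_at` at or
/// after the given one.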
pub async fn add(&self, updated_at: DateTime<Utc>, document: WebDocument) -> bool {
if self.indexer.is_indexed_after(&document.id, updated_at) {
return false;
};

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);
for await doc in s.buffer_unordered(std::cmp::max(std::thread::available_parallelism().unwrap().get() * 2, 32)) {
if let Ok(Some(doc)) = doc {
self.indexer.add(doc).await;
}
}
}.count().await;

true
}

pub fn commit(self) {
self.indexer.commit();
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use async_trait::async_trait;
use serial_test::serial;
use tabby_common::path::set_tabby_root;
use temp_testdir::TempDir;

use super::*;

struct FakeEmbedding;

#[async_trait]
impl Embedding for FakeEmbedding {
async fn embed(&self, _prompt: &str) -> anyhow::Result<Vec<f32>> {
Ok(vec![0.0; 16])
}
}

fn create_testing_document() -> WebDocument {
WebDocument {
id: "1".to_string(),
source_id: "1".to_string(),
link: "https://example.com".to_string(),
title: "Example".to_string(),
body: "Hello, world!".to_string(),
}
}

#[tokio::test]
#[serial(set_tabby_root)]
async fn test_add() {
let tmp_dir = TempDir::default();
set_tabby_root(tmp_dir.to_path_buf());
let embedding = Arc::new(FakeEmbedding);
let indexer = DocIndexer::new(embedding.clone());
let updated_at = Utc::now();

// Insert a new document
assert!(indexer.add(updated_at, create_testing_document()).await);
indexer.commit();

// A document with the same id whose updated_at is not newer should not be re-added.
let indexer = DocIndexer::new(embedding);
assert!(!indexer.add(updated_at, create_testing_document()).await);

// A document with the same id and a newer updated_at should be re-added.
assert!(
indexer
.add(
updated_at + chrono::Duration::seconds(1),
create_testing_document()
)
.await
);
}
}
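The assertions pin down the boundary semantics: with the half-open range `updated_at..DateTime::MAX`, a stored `updated_at` equal to the incoming one already counts as indexed, and only a strictly newer timestamp triggers re-indexing. The rule in isolation (an illustrative predicate, not code from the commit):

    use chrono::{DateTime, Duration, Utc};

    // Mirrors the range-query semantics: skip re-indexing iff stored >= incoming.
    fn should_skip(stored: DateTime<Utc>, incoming: DateTime<Utc>) -> bool {
        stored >= incoming
    }

    let t = Utc::now();
    assert!(should_skip(t, t)); // same updated_at: skipped
    assert!(!should_skip(t, t + Duration::seconds(1))); // newer: re-indexed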