diff --git a/ci/scripts/build-linux.sh b/ci/scripts/build-linux.sh index dba66edc..882ed01a 100755 --- a/ci/scripts/build-linux.sh +++ b/ci/scripts/build-linux.sh @@ -34,7 +34,7 @@ function setup_postgres() { # Fix pg_config (sometimes it points to wrong version) rm -f /usr/bin/pg_config && ln -s /usr/lib/postgresql/$PG_VERSION/bin/pg_config /usr/bin/pg_config # preload pg_cron, necessary for async tasks test - echo "shared_preload_libraries = 'pg_cron' " >> /etc/postgresql/$PG_VERSION/main/postgresql.conf + echo "shared_preload_libraries = 'pg_cron,lantern_extras' " >> /etc/postgresql/$PG_VERSION/main/postgresql.conf # Enable auth without password echo "local all all trust" > /etc/postgresql/$PG_VERSION/main/pg_hba.conf echo "host all all 127.0.0.1/32 trust" >> /etc/postgresql/$PG_VERSION/main/pg_hba.conf diff --git a/lantern_cli/Cargo.toml b/lantern_cli/Cargo.toml index 43ae7669..716cecf5 100644 --- a/lantern_cli/Cargo.toml +++ b/lantern_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lantern_cli" -version = "0.4.1" +version = "0.4.2" edition = "2021" [[bin]] @@ -37,7 +37,6 @@ image = { version = "0.25.4", features = ["jpeg", "png", "webp" ], optional = tr nvml-wrapper = { version = "0.10.0", optional = true } strum = { version = "0.26", features = ["derive"], optional = true } regex = { version = "1.11.1", optional = true } -postgres-types = { version = "0.2.8", features = ["derive"], optional = true } usearch = { git = "https://github.com/Ngalstyan4/usearch.git", rev = "aa4f91d21230fd611b6c7741fa06be8c20acc9a9", optional = true } actix-web = { version = "4.9.0", optional = true } env_logger = { version = "0.11.5", optional = true } @@ -55,13 +54,12 @@ glob = { version="0.3.1", optional=true } reqwest = { version = "0.12.9", default-features = false, features = ["json", "blocking", "rustls-tls"], optional = true } [features] -default = ["cli", "daemon", "http-server", "autotune", "pq", "external-index", "external-index-server", "external-index-status-server", "embeddings"] +default = ["cli", "daemon", "http-server", "autotune", "pq", "external-index-server", "external-index-status-server", "embeddings"] daemon = ["dep:tokio-postgres"] http-server = ["dep:deadpool-postgres", "dep:deadpool", "dep:bytes", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:actix-web", "dep:tokio-postgres", "dep:env_logger", "dep:actix-web-httpauth", "dep:regex"] autotune = [] pq = ["dep:gcp_auth", "dep:linfa", "dep:linfa-clustering", "dep:md5", "dep:rayon", "dep:reqwest", "dep:postgres", "dep:ndarray"] cli = [] -external-index = ["dep:postgres-types", "dep:usearch", "dep:postgres"] external-index-server = ["dep:bitvec", "dep:rustls", "dep:rustls-pemfile", "dep:glob", "dep:usearch"] external-index-status-server = ["dep:actix-web"] embeddings = ["dep:bytes", "dep:sysinfo", "dep:tiktoken-rs", "dep:url", "dep:num_cpus", "dep:ort", "dep:tokenizers", "dep:image", "dep:nvml-wrapper", "dep:strum", "dep:regex", "dep:reqwest", "dep:ndarray"] diff --git a/lantern_cli/README.md b/lantern_cli/README.md index 6037ce9f..6289b8a9 100644 --- a/lantern_cli/README.md +++ b/lantern_cli/README.md @@ -14,21 +14,31 @@ Run `cargo install --path lantern_cli` to install the binary ### Usage -Run `lantern-cli create-index --help` to show the cli options. +## Lantern External Index +Run `lantern-cli start-indexing-server --help` to show the CLI options.
```bash -Usage: lantern-cli create-index --uri <URI> --table <TABLE> --column <COLUMN> -m <M> --efc <EFC> --ef <EF> -d <DIMS> --metric-kind <METRIC_KIND> --out <OUT> --import -``` - -### Example - -```bash -lantern-cli create-index -u "postgresql://localhost/test" -t "small_world" -c "vec" -m 16 --ef 64 --efc 128 -d 3 --metric-kind cos --out /tmp/index.usearch --import +Usage: lantern-cli start-indexing-server [OPTIONS] + +Options: + --host <HOST> Host to bind [default: 0.0.0.0] + --tmp-dir <TMP_DIR> Temp directory to save intermediate files [default: /tmp] + --port <PORT> Port to bind [default: 8998] + --status-port <STATUS_PORT> Status Server Port to bind [default: 8999] + --cert <CERT> SSL Certificate path + --key <KEY> SSL Certificate key path + -h, --help Print help ``` ### Notes -The index should be created from the same database on which it will be loaded, so row tids will match later. +This will start the external indexing server, which will be used when creating an index with `external=true`. +```sql +SET lantern.external_index_host='127.0.0.1'; +SET lantern.external_index_port='8998'; +SET lantern.external_index_secure=false; +CREATE INDEX ON test_table USING lantern_hnsw(v) WITH (external=true); +``` ## Lantern Embeddings @@ -122,7 +132,7 @@ To get full list of arguments use `bash lantern-cli autotune-index -h` Lantern CLI can be used in daemon mode to continuously listen to postgres tables and generate embeddings, external indexes or autotune jobs. ```bash - lantern-cli start-daemon --uri 'postgres://postgres@localhost:5432/postgres' --embedding-table embedding_jobs --autotune-table index_autotune_jobs --autotune-results-table index_parameter_experiment_results --external-index-table external_index_jobs --schema public --log-level debug + lantern-cli start-daemon --uri 'postgres://postgres@localhost:5432/postgres' --embedding-table embedding_jobs --autotune-table index_autotune_jobs --autotune-results-table index_parameter_experiment_results --schema public --log-level debug ``` This will set up a trigger on the specified table (`embedding_jobs`), and when a new row is inserted it will start embedding generation based on the row data.
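Taken together, the README changes above describe the new flow: start the indexing server, then tell the extension to use it. Below is a minimal end-to-end sketch, not part of the diff; it assumes `lantern-cli` is installed, a local Postgres has the `lantern` and `lantern_extras` extensions loaded, and it reuses the `test_table`/`v` names from the example above.

```bash
# Start the indexing server on the default ports (8998 for indexing,
# 8999 for the status endpoint), keeping intermediate files in /tmp.
lantern-cli start-indexing-server --host 127.0.0.1 --port 8998 --status-port 8999 --tmp-dir /tmp &

# Point the extension at the server and build the index externally.
psql "postgresql://localhost/test" <<'SQL'
SET lantern.external_index_host='127.0.0.1';
SET lantern.external_index_port='8998';
SET lantern.external_index_secure=false;
CREATE INDEX ON test_table USING lantern_hnsw(v) WITH (external=true);
SQL

# The status server answers plain GET requests with a JSON status object
# (see the external_index_server_test changes further below).
curl http://127.0.0.1:8999
```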
@@ -133,12 +143,15 @@ The jobs table should have the following structure -- Embedding Jobs Table should have the following structure: CREATE TABLE "public"."embedding_jobs" ( "id" SERIAL PRIMARY KEY, - "database_id" text NOT NULL, - "db_connection" text NOT NULL, - "schema" text NOT NULL, + "schema" text NOT NULL DEFAULT 'public', "table" text NOT NULL, - "runtime" text NOT NULL, + "pk" text NOT NULL DEFAULT 'id', + "label" text NULL, + "job_type" text DEFAULT 'embedding_generation', + "column_type" text DEFAULT 'REAL[]', + "runtime" text NOT NULL DEFAULT 'ort', "runtime_params" jsonb, + "batch_size" int NULL, "src_column" text NOT NULL, "dst_column" text NOT NULL, "embedding_model" text NOT NULL, @@ -151,28 +164,6 @@ CREATE TABLE "public"."embedding_jobs" ( "init_failure_reason" text, "init_progress" int2 DEFAULT 0 ); --- External Index Jobs Table should have the following structure: -CREATE TABLE "public"."external_index_jobs" ( - "id" SERIAL PRIMARY KEY, - "database_id" text NOT NULL, - "db_connection" text NOT NULL, - "schema" text NOT NULL, - "table" text NOT NULL, - "column" text NOT NULL, - "index" text, - "operator" text NOT NULL, - "efc" INT NOT NULL, - "ef" INT NOT NULL, - "m" INT NOT NULL, - "created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - "updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - "canceled_at" timestamp, - "started_at" timestamp, - "finished_at" timestamp, - "failed_at" timestamp, - "failure_reason" text, - "progress" INT2 DEFAULT 0 -); -- Autotune Jobs Table should have the following structure: CREATE TABLE "public"."index_autotune_jobs" ( "id" SERIAL PRIMARY KEY, diff --git a/lantern_cli/src/cli.rs b/lantern_cli/src/cli.rs index c16aad02..885197a3 100644 --- a/lantern_cli/src/cli.rs +++ b/lantern_cli/src/cli.rs @@ -1,7 +1,6 @@ use clap::{Parser, Subcommand}; use lantern_cli::daemon::cli::DaemonArgs; use lantern_cli::embeddings::cli::{EmbeddingArgs, MeasureModelSpeedArgs, ShowModelsArgs}; -use lantern_cli::external_index::cli::CreateIndexArgs; use lantern_cli::external_index::cli::IndexServerArgs; use lantern_cli::http_server::cli::HttpServerArgs; use lantern_cli::index_autotune::cli::IndexAutotuneArgs; @@ -9,8 +8,6 @@ use lantern_cli::pq::cli::PQArgs; #[derive(Subcommand, Debug)] pub enum Commands { - /// Create external index - CreateIndex(CreateIndexArgs), /// Create embeddings CreateEmbeddings(EmbeddingArgs), /// Show embedding models diff --git a/lantern_cli/src/daemon/external_index_jobs.rs b/lantern_cli/src/daemon/external_index_jobs.rs deleted file mode 100644 index 9dda982e..00000000 --- a/lantern_cli/src/daemon/external_index_jobs.rs +++ /dev/null @@ -1,411 +0,0 @@ -use super::helpers::{ - cancel_all_jobs, cancellation_handler, collect_pending_index_jobs, db_notification_listener, - index_job_update_processor, startup_hook, -}; -use super::types::{ - ExternalIndexProcessorArgs, JobEvent, JobEventHandlersMap, JobInsertNotification, JobRunArgs, - JobTaskEventTx, JobUpdateNotification, -}; -use crate::daemon::helpers::anyhow_wrap_connection; -use crate::external_index::cli::{CreateIndexArgs, UMetricKind}; -use crate::logger::{LogLevel, Logger}; -use crate::types::*; -use crate::utils::get_full_table_name; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::SystemTime; -use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; -use tokio::sync::RwLock; -use tokio_postgres::{Client, NoTls, Row}; -use tokio_util::sync::CancellationToken; - -pub const JOB_TABLE_DEFINITION: &'static str = r#" -"id" 
SERIAL PRIMARY KEY, -"schema" text NOT NULL DEFAULT 'public', -"table" text NOT NULL, -"column" text NOT NULL, -"index" text, -"operator" text NOT NULL, -"efc" INT NOT NULL, -"ef" INT NOT NULL, -"m" INT NOT NULL, -"created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -"updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -"canceled_at" timestamp, -"started_at" timestamp, -"finished_at" timestamp, -"failed_at" timestamp, -"failure_reason" text, -"progress" INT2 DEFAULT 0 -"#; - -#[derive(Debug)] -pub struct ExternalIndexJob { - pub id: i32, - pub db_uri: String, - pub schema: String, - pub table: String, - pub column: String, - pub operator_class: String, - pub index_name: Option, - pub ef: usize, - pub efc: usize, - pub m: usize, -} - -impl ExternalIndexJob { - pub fn new(row: Row, db_uri: &str) -> ExternalIndexJob { - Self { - id: row.get::<&str, i32>("id"), - db_uri: db_uri.to_owned(), - schema: row.get::<&str, String>("schema"), - table: row.get::<&str, String>("table"), - column: row.get::<&str, String>("column"), - operator_class: row.get::<&str, String>("operator"), - index_name: row.get::<&str, Option>("index"), - ef: row.get::<&str, i32>("ef") as usize, - efc: row.get::<&str, i32>("efc") as usize, - m: row.get::<&str, i32>("m") as usize, - } - } -} - -async fn set_job_handle( - jobs_map: Arc, - job_id: i32, - handle: JobTaskEventTx, -) -> AnyhowVoidResult { - let mut jobs = jobs_map.write().await; - jobs.insert(job_id, handle); - Ok(()) -} - -async fn remove_job_handle(jobs_map: Arc, job_id: i32) -> AnyhowVoidResult { - let mut jobs = jobs_map.write().await; - jobs.remove(&job_id); - Ok(()) -} - -pub async fn external_index_job_processor( - mut rx: Receiver, - cancel_token: CancellationToken, -) -> AnyhowVoidResult { - loop { - tokio::select! 
{ - msg = rx.recv() => { - if msg.is_none() { - break - } - - let (external_index_args, response_tx, event_tx, progress_callback, is_canceled, task_logger) = msg.unwrap(); - let result = tokio::task::spawn_blocking(move || crate::external_index::create_usearch_index(&external_index_args, - progress_callback, Some(is_canceled), Some(task_logger))).await?; - event_tx.send(JobEvent::Done).await?; - response_tx.send(result).await?; - }, - _ = cancel_token.cancelled() => { - break; - } - } - } - - Ok(()) -} - -async fn external_index_worker( - mut job_queue_rx: UnboundedReceiver, - external_index_processor_tx: Sender, - client: Arc, - schema: String, - table: String, - jobs_map: Arc, - logger: Arc, -) -> AnyhowVoidResult { - let schema = Arc::new(schema); - let table = Arc::new(table); - let jobs_table_name = Arc::new(get_full_table_name(&schema, &table)); - - tokio::spawn(async move { - logger.info("External index worker started"); - while let Some(job) = job_queue_rx.recv().await { - logger.info(&format!("Starting execution of index creation job {}", job.id)); - let client_ref = client.clone(); - let client_ref2 = client.clone(); - let logger_ref = logger.clone(); - let job = Arc::new(job); - let job_ref = job.clone(); - let jobs_table_name_r1 = jobs_table_name.clone(); - - let progress_callback = move |progress: u8| { - // Passed progress in a format string to avoid type casts between - // different int types - let res = futures::executor::block_on(client_ref2.execute( - &format!( - "UPDATE {jobs_table_name_r1} SET progress={progress} WHERE id=$1" - ), - &[&job_ref.id], - )); - - if let Err(e) = res { - logger_ref.error(&format!( - "Error while updating progress for job {job_id}: {e}", - job_id = job_ref.id - )); - } - }; - - let progress_callback = - Some(Box::new(progress_callback) as ProgressCbFn); - - let task_logger = - Logger::new(&format!("{}|{}", logger.label, job.id), LogLevel::Info); - let job_clone = job.clone(); - - let (event_tx, mut event_rx) = mpsc::channel(1); - - // We will spawn 2 tasks - // The first one will run index creation job and as soon as it finish - // It will send the result via job_tx channel - // The second task will listen to cancel_rx channel, so if someone send a message - // via cancel_tx channel it will change is_canceled to true - // And index job will be cancelled on next cycle - // We will keep the cancel_tx in static hashmap, so we can cancel the job if - // canceled_at will be changed to true - - let is_canceled = Arc::new(std::sync::RwLock::new(false)); - let is_canceled_clone = is_canceled.clone(); - let metric_kind = UMetricKind::from_ops(&job_clone.operator_class); - - if let Err(e) = &metric_kind { - // update failure reason - client_ref.execute(&format!("UPDATE {jobs_table_name} SET failed_at=NOW(), updated_at=NOW(), failure_reason=$1 WHERE id=$2"), &[&e.to_string(), &job.id]).await?; - continue; - } - - let metric_kind = metric_kind.unwrap(); - let val: u32 = rand::random(); - let index_path = format!("/tmp/daemon-index-{val}.usearch"); - let (tx, mut rx) = mpsc::channel(1); - external_index_processor_tx.send(( - CreateIndexArgs { - schema: job_clone.schema.clone(), - uri: job_clone.db_uri.clone(), - table: job_clone.table.clone(), - column: job_clone.column.clone(), - metric_kind, - index_name: job_clone.index_name.clone(), - m: job_clone.m, - ef: job_clone.ef, - efc: job_clone.efc, - import: true, - dims: 0, - out: index_path, - remote_database: true, - pq: false, - }, - tx, - event_tx.clone(), - progress_callback, - is_canceled_clone, - 
task_logger - )).await?; - - set_job_handle(jobs_map.clone(), job.id, event_tx).await?; - - while let Some(event) = event_rx.recv().await { - if let JobEvent::Errored(msg) = event { - if msg == JOB_CANCELLED_MESSAGE.to_owned() { - *is_canceled.write().unwrap() = true; - } - } - break; - } - - - let result = rx.recv().await; - - if result.is_none() { - logger.error(&format!("No result received for job {}", job.id)); - } - - - match result.unwrap() { - Ok(_) => { - remove_job_handle(jobs_map.clone(), job.id).await?; - // mark success - client_ref.execute(&format!("UPDATE {jobs_table_name} SET finished_at=NOW(), updated_at=NOW() WHERE id=$1"), &[&job.id]).await?; - }, - Err(e) => { - logger.error(&format!("Error while executing job {job_id}: {e}", job_id=job.id)); - remove_job_handle(jobs_map.clone(), job.id).await?; - // update failure reason - client_ref.execute(&format!("UPDATE {jobs_table_name} SET failed_at=NOW(), updated_at=NOW(), failure_reason=$1 WHERE id=$2"), &[&e.to_string(), &job.id]).await?; - } - } - } - Ok(()) as AnyhowVoidResult - }) - .await??; - Ok(()) -} - -async fn job_insert_processor( - client: Arc, - mut notifications_rx: UnboundedReceiver, - job_tx: UnboundedSender, - db_uri: String, - schema: String, - table: String, - logger: Arc, -) -> AnyhowVoidResult { - // This function will handle newcoming jobs - // It will update started_at create external index job from the row - // And pass to external_index_worker - // On startup this function will also be called for unfinished jobs - - tokio::spawn(async move { - let full_table_name = Arc::new(get_full_table_name(&schema, &table)); - let job_query_sql = Arc::new(format!("SELECT id, \"column\", \"table\", \"schema\", operator, efc, ef, m, \"index\", finished_at FROM {0}", &full_table_name)); - while let Some(notification) = notifications_rx.recv().await { - let id = notification.id; - - let job_result = client - .query_one( - &format!("{job_query_sql} WHERE id=$1 AND canceled_at IS NULL"), - &[&id], - ) - .await; - - if let Ok(row) = job_result { - let is_init = row - .get::<&str, Option>("finished_at") - .is_none(); - - if is_init { - // Only update init time if this is the first time job is being executed - let updated_count = client.execute(&format!("UPDATE {0} SET started_at=NOW() WHERE started_at IS NULL AND id=$1", &full_table_name), &[&id]).await?; - if updated_count == 0 && !notification.generate_missing { - continue; - } - } - job_tx.send(ExternalIndexJob::new(row, &db_uri.clone()))?; - } else { - logger.error(&format!( - "Error while getting job {id}: {}", - job_result.err().unwrap() - )); - } - } - Ok(()) as AnyhowVoidResult - }).await??; - - Ok(()) -} - -pub async fn start( - args: JobRunArgs, - external_index_processor_tx: Sender, - logger: Arc, - cancel_token: CancellationToken, -) -> AnyhowVoidResult { - logger.info("Starting External Index Jobs"); - - let (mut main_db_client, connection) = tokio_postgres::connect(&args.uri, NoTls).await?; - - let connection_task = tokio::spawn(async move { connection.await }); - - let notification_channel = "lantern_cloud_index_jobs_v2"; - - let (insert_notification_queue_tx, insert_notification_queue_rx): ( - UnboundedSender, - UnboundedReceiver, - ) = mpsc::unbounded_channel(); - let (update_notification_queue_tx, update_notification_queue_rx): ( - UnboundedSender, - UnboundedReceiver, - ) = mpsc::unbounded_channel(); - - let (job_queue_tx, job_queue_rx): ( - UnboundedSender, - UnboundedReceiver, - ) = mpsc::unbounded_channel(); - - let table = args.table_name; - - 
startup_hook( - &mut main_db_client, - &table, - JOB_TABLE_DEFINITION, - &args.schema, - None, - None, - None, - None, - None, - None, - None, - None, - ¬ification_channel, - logger.clone(), - ) - .await?; - - connection_task.abort(); - let (main_db_client, connection) = tokio_postgres::connect(&args.uri, NoTls).await?; - let main_db_client = Arc::new(main_db_client); - - let jobs_map: Arc = Arc::new(RwLock::new(HashMap::new())); - let jobs_map_clone = jobs_map.clone(); - - tokio::try_join!( - anyhow_wrap_connection::(connection), - db_notification_listener( - args.uri.clone(), - ¬ification_channel, - insert_notification_queue_tx.clone(), - Some(update_notification_queue_tx.clone()), - cancel_token.clone(), - logger.clone(), - ), - job_insert_processor( - main_db_client.clone(), - insert_notification_queue_rx, - job_queue_tx, - args.uri.clone(), - args.schema.clone(), - table.clone(), - logger.clone(), - ), - index_job_update_processor( - main_db_client.clone(), - update_notification_queue_rx, - args.schema.clone(), - table.clone(), - jobs_map.clone() - ), - external_index_worker( - job_queue_rx, - external_index_processor_tx, - main_db_client.clone(), - args.schema.clone(), - table.clone(), - jobs_map.clone(), - logger.clone(), - ), - collect_pending_index_jobs( - main_db_client.clone(), - insert_notification_queue_tx.clone(), - get_full_table_name(&args.schema, &table), - ), - cancellation_handler( - cancel_token.clone(), - Some(move || async { - cancel_all_jobs(jobs_map_clone).await?; - - Ok::<(), anyhow::Error>(()) - }) - ) - )?; - - Ok(()) -} diff --git a/lantern_cli/src/daemon/helpers.rs b/lantern_cli/src/daemon/helpers.rs index 65cdb10b..de9127ee 100644 --- a/lantern_cli/src/daemon/helpers.rs +++ b/lantern_cli/src/daemon/helpers.rs @@ -305,7 +305,7 @@ pub async fn startup_hook( Ok(()) } -#[cfg(any(feature = "autotune", feature = "external-index"))] +#[cfg(feature = "autotune")] pub async fn collect_pending_index_jobs( client: Arc, insert_notification_tx: UnboundedSender, @@ -336,7 +336,7 @@ pub async fn collect_pending_index_jobs( Ok(()) } -#[cfg(any(feature = "autotune", feature = "external-index"))] +#[cfg(feature = "autotune")] pub async fn index_job_update_processor( client: Arc, mut update_queue_rx: tokio::sync::mpsc::UnboundedReceiver, @@ -377,7 +377,7 @@ pub async fn index_job_update_processor( Ok(()) } -#[cfg(any(feature = "autotune", feature = "external-index"))] +#[cfg(feature = "autotune")] pub async fn cancel_all_jobs(map: Arc) -> AnyhowVoidResult { let mut jobs_map = map.write().await; let jobs: Vec<(i32, JobTaskEventTx)> = jobs_map.drain().collect(); diff --git a/lantern_cli/src/daemon/mod.rs b/lantern_cli/src/daemon/mod.rs index da73acc8..a5506c9d 100644 --- a/lantern_cli/src/daemon/mod.rs +++ b/lantern_cli/src/daemon/mod.rs @@ -5,8 +5,6 @@ pub mod cli; mod client_embedding_jobs; #[cfg(feature = "embeddings")] pub mod embedding_jobs; -#[cfg(feature = "external-index")] -pub mod external_index_jobs; mod helpers; mod types; @@ -25,7 +23,7 @@ use crate::types::AnyhowVoidResult; use crate::{logger::Logger, utils::get_full_table_name}; use types::{DaemonJobHandlerMap, JobRunArgs, TargetDB}; -use types::{AutotuneProcessorArgs, EmbeddingProcessorArgs, ExternalIndexProcessorArgs, JobType}; +use types::{AutotuneProcessorArgs, EmbeddingProcessorArgs, JobType}; lazy_static! 
{ static ref JOBS: DaemonJobHandlerMap = RwLock::new(HashMap::new()); @@ -99,7 +97,6 @@ async fn spawn_job( let log_label = match job_type { JobType::Embeddings(_) => "embeddings", JobType::Autotune(_) => "autotune", - JobType::ExternalIndex(_) => "indexing", }; let logger = Arc::new(Logger::new( @@ -145,27 +142,6 @@ async fn spawn_job( JobType::Embeddings(_) => { anyhow::bail!("Embedding jobs are not enabled"); } - #[cfg(feature = "external-index")] - JobType::ExternalIndex(processor_tx) => { - external_index_jobs::start( - JobRunArgs { - label: args.label.clone(), - uri: target_db.uri.clone(), - schema: args.schema.clone(), - log_level: args.log_level.value(), - data_path: None, - table_name: "external_index_jobs".to_owned(), - }, - processor_tx.clone(), - logger.clone(), - cancel_token.clone(), - ) - .await - } - #[cfg(not(feature = "external-index"))] - JobType::ExternalIndex(_) => { - anyhow::bail!("External Index jobs are not enabled"); - } #[cfg(feature = "autotune")] JobType::Autotune(processor_tx) => { autotune_jobs::start( @@ -215,7 +191,6 @@ async fn spawn_jobs( args: Arc, embedding_tx: Sender, autotune_tx: Sender, - external_index_tx: Sender, cancel_token: CancellationToken, ) { let target_db = Arc::new(target_db); @@ -229,15 +204,6 @@ async fn spawn_jobs( )); } - if args.external_index { - tokio::spawn(spawn_job( - target_db.clone(), - args.clone(), - JobType::ExternalIndex(external_index_tx), - cancel_token.clone(), - )); - } - if args.autotune { tokio::spawn(spawn_job( target_db.clone(), @@ -252,7 +218,6 @@ async fn db_change_listener( args: Arc, embedding_tx: Sender, autotune_tx: Sender, - external_index_tx: Sender, logger: Arc, cancel_token: CancellationToken, ) -> AnyhowVoidResult { @@ -329,7 +294,7 @@ async fn db_change_listener( destroy_jobs(&target_db, logger.clone()).await; } "insert" => { - spawn_jobs(target_db, args.clone(), embedding_tx.clone(), autotune_tx.clone(), external_index_tx.clone(), cancel_token.clone()).await; + spawn_jobs(target_db, args.clone(), embedding_tx.clone(), autotune_tx.clone(), cancel_token.clone()).await; } _ => logger.error(&format!("Invalid action received: {action}")), } @@ -397,10 +362,6 @@ pub async fn start( Sender, Receiver, ) = mpsc::channel(1); - let external_index_channel: ( - Sender, - Receiver, - ) = mpsc::channel(1); #[cfg(feature = "embeddings")] if args_arc.embeddings { @@ -418,21 +379,12 @@ pub async fn start( )); } - #[cfg(feature = "external-index")] - if args_arc.external_index { - tokio::spawn(external_index_jobs::external_index_job_processor( - external_index_channel.1, - cancel_token.clone(), - )); - } - for target_db in target_databases { spawn_jobs( target_db, args_arc_clone.clone(), embedding_channel.0.clone(), autotune_channel.0.clone(), - external_index_channel.0.clone(), cancel_token.clone(), ) .await; @@ -443,7 +395,6 @@ pub async fn start( args_arc.clone(), embedding_channel.0.clone(), autotune_channel.0.clone(), - external_index_channel.0.clone(), logger.clone(), cancel_token.clone(), ) diff --git a/lantern_cli/src/daemon/types.rs b/lantern_cli/src/daemon/types.rs index 4bda413b..3178541e 100644 --- a/lantern_cli/src/daemon/types.rs +++ b/lantern_cli/src/daemon/types.rs @@ -96,22 +96,8 @@ pub type AutotuneProcessorArgs = ( #[cfg(not(feature = "autotune"))] pub type AutotuneProcessorArgs = (); -#[cfg(feature = "external-index")] -pub type ExternalIndexProcessorArgs = ( - crate::external_index::cli::CreateIndexArgs, - Sender, - JobTaskEventTx, - Option, - std::sync::Arc>, - crate::logger::Logger, -); 
-#[cfg(not(feature = "external-index"))] -pub type ExternalIndexProcessorArgs = (); - pub enum JobType { Embeddings(Sender), #[allow(dead_code)] - ExternalIndex(Sender), - #[allow(dead_code)] Autotune(Sender), } diff --git a/lantern_cli/src/external_index/cli.rs b/lantern_cli/src/external_index/cli.rs index 4e3c2e79..af040007 100644 --- a/lantern_cli/src/external_index/cli.rs +++ b/lantern_cli/src/external_index/cli.rs @@ -1,4 +1,4 @@ -use clap::{ArgAction, Parser, ValueEnum}; +use clap::{Parser, ValueEnum}; use usearch::ffi::*; #[derive(Debug, Clone, ValueEnum)] // ArgEnum here @@ -123,65 +123,6 @@ impl UMetricKind { } } -#[derive(Parser, Debug)] -pub struct CreateIndexArgs { - /// Fully associated database connection string including db name - #[arg(short, long)] - pub uri: String, - - /// Schema name - #[arg(short, long, default_value = "public")] - pub schema: String, - - /// Table name - #[arg(short, long)] - pub table: String, - - /// Column name - #[arg(short, long)] - pub column: String, - - /// Use already created codebook to create product-quantized binary index - #[arg(short, long, default_value_t = false)] - pub pq: bool, - - /// Number of neighbours for each vector - #[arg(short, default_value_t = 16)] - pub m: usize, - - /// The size of the dynamic list for the nearest neighbors in construction - #[arg(long, default_value_t = 128)] - pub efc: usize, - - /// The size of the dynamic list for the nearest neighbors in search - #[arg(long, default_value_t = 64)] - pub ef: usize, - - /// Dimensions of vector - #[arg(short, default_value_t = 0)] - pub dims: usize, - - /// Distance algorithm - #[arg(long, value_enum, default_value_t = UMetricKind::L2sq)] // arg_enum here - pub metric_kind: UMetricKind, - - /// Index output file - #[arg(short, long, default_value = "index.usearch")] // arg_enum here - pub out: String, - - /// Import index to database (should be run as db superuser to have access) - #[arg(short, long, default_value_t = false)] - pub import: bool, - - /// If database is not on the same server where the job is running - #[arg(short, long, default_value_t = true, action = ArgAction::Set)] - pub remote_database: bool, - - /// Index name to use when imporrting index to database - #[arg(long)] - pub index_name: Option, -} - #[derive(Parser, Debug, Clone)] pub struct IndexServerArgs { /// Host to bind diff --git a/lantern_cli/src/external_index/mod.rs b/lantern_cli/src/external_index/mod.rs index 227d8a4c..5a9b926e 100644 --- a/lantern_cli/src/external_index/mod.rs +++ b/lantern_cli/src/external_index/mod.rs @@ -1,463 +1,2 @@ -use rand::Rng; -use std::io::BufWriter; -use std::path::Path; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::mpsc::{Receiver, Sender, SyncSender}; -use std::sync::{mpsc, RwLock}; -use std::sync::{Arc, Mutex}; -use std::time::Instant; -use std::{fs, io}; -use usearch::ffi::{IndexOptions, ScalarKind}; -use usearch::Index; - -use crate::logger::{LogLevel, Logger}; -use crate::types::*; -use crate::utils::{get_full_table_name, quote_ident}; -use postgres::{Client, NoTls, Row}; -use postgres_large_objects::LargeObject; -use postgres_types::FromSql; - pub mod cli; -mod postgres_large_objects; -pub mod utils; - -#[cfg(feature = "external-index-server")] pub mod server; - -// Used to control chunk size when copying index file to postgres server -static COPY_BUFFER_CHUNK_SIZE: usize = 1024 * 1024 * 10; // 10MB - -#[derive(Debug)] -struct Tid { - label: u64, -} - -impl<'a> FromSql<'a> for Tid { - fn from_sql( - _: &postgres_types::Type, - raw: &'a 
[u8], - ) -> Result> { - let mut bytes: Vec = Vec::with_capacity(raw.len()); - - // Copy bytes of block_number->bi_hi first 2 bytes - for b in raw[..2].iter().rev() { - bytes.push(*b); - } - - // Copy bytes of block_number->bi_lo next 2 bytes - for b in raw[2..4].iter().rev() { - bytes.push(*b); - } - - // Copy bytes of index_number last 2 bytes - for b in raw[4..6].iter().rev() { - bytes.push(*b); - } - - let label: u64 = utils::bytes_to_integer_le(&bytes); - Ok(Tid { label }) - } - - fn accepts(ty: &postgres_types::Type) -> bool { - ty.name() == "tid" - } -} - -fn index_chunk(rows: Vec, index: Arc) -> AnyhowVoidResult { - for row in rows { - let ctid: Tid = row.get(0); - let vec: Vec = row.get(1); - index.add(ctid.label, &vec)?; - } - Ok(()) -} - -struct ThreadSafeIndex { - inner: Index, -} - -impl ThreadSafeIndex { - fn add(&self, label: u64, data: &[f32]) -> AnyhowVoidResult { - self.inner.add(label, data)?; - Ok(()) - } - fn save(&self, path: &str) -> AnyhowVoidResult { - self.inner.save(path)?; - Ok(()) - } -} - -unsafe impl Sync for ThreadSafeIndex {} -unsafe impl Send for ThreadSafeIndex {} - -fn report_progress(progress_cb: &Option, logger: &Logger, progress: u8) { - logger.info(&format!("Progress {progress}%")); - if progress_cb.is_some() { - let cb = progress_cb.as_ref().unwrap(); - cb(progress); - } -} - -pub fn create_usearch_index( - args: &cli::CreateIndexArgs, - progress_cb: Option, - is_canceled: Option>>, - logger: Option, -) -> Result<(), anyhow::Error> { - let logger = Arc::new(logger.unwrap_or(Logger::new("Lantern Index", LogLevel::Debug))); - let num_cores: usize = std::thread::available_parallelism().unwrap().into(); - let total_start_time = Instant::now(); - logger.info(&format!("Number of available CPU cores: {}", num_cores)); - - // get all row count - let mut client = Client::connect(&args.uri, NoTls)?; - let mut transaction = client.transaction()?; - let full_table_name = get_full_table_name(&args.schema, &args.table); - - transaction.execute("SET lock_timeout='5s'", &[])?; - transaction.execute( - &format!("LOCK TABLE ONLY {full_table_name} IN SHARE MODE"), - &[], - )?; - - let rows = transaction.query(&format!("SELECT ARRAY_LENGTH({col}, 1) as dim FROM {full_table_name} WHERE {col} IS NOT NULL LIMIT 1",col=quote_ident(&args.column)), &[])?; - - if rows.len() == 0 { - anyhow::bail!("Cannot create an external index on empty table"); - } - - let row = rows.first().unwrap(); - let infered_dimensions = row.try_get::(0)? 
as usize; - - if args.dims != 0 && infered_dimensions != args.dims { - // I didn't complitely remove the dimensions from args - // To have extra validation when reindexing external index - // This is invariant and should never be a case - anyhow::bail!("Infered dimensions ({infered_dimensions}) does not match with the provided dimensions ({dims})", dims=args.dims); - } - - let dimensions = infered_dimensions; - - logger.info(&format!( - "Creating index with parameters dimensions={} m={} ef={} ef_construction={}", - dimensions, args.m, args.ef, args.efc - )); - - let mut pq_codebook: *const f32 = std::ptr::null(); - let mut v: Vec = vec![]; - let mut num_centroids: usize = 0; - let mut num_subvectors: usize = 0; - - if args.pq { - let codebook_table_name = format!( - "pq_{table_name}_{column_name}", - table_name = &args.table, - column_name = &args.column - ); - let full_codebook_table_name = - get_full_table_name("_lantern_internal", &codebook_table_name); - - let rows_codebook_exists = transaction.query("SELECT true FROM information_schema.tables WHERE table_schema='_lantern_internal' AND table_name=$1;", &[&codebook_table_name])?; - - if rows_codebook_exists.len() == 0 { - anyhow::bail!("Codebook table {full_codebook_table_name} does not exist"); - } - - let rows_c = transaction.query( - &format!("SELECT COUNT(*) FROM {full_codebook_table_name} WHERE subvector_id = 0;"), - &[], - )?; - let rows_sv = transaction.query( - &format!("SELECT COUNT(*) FROM {full_codebook_table_name} WHERE centroid_id = 0;"), - &[], - )?; - - if rows_c.len() == 0 || rows_sv.len() == 0 { - anyhow::bail!("Invalid codebook table"); - } - - num_centroids = rows_c.first().unwrap().get::(0) as usize; - num_subvectors = rows_sv.first().unwrap().get::(0) as usize; - - v.resize(num_centroids * dimensions, 0.); - - let rows = transaction.query( - &format!("SELECT subvector_id, centroid_id, c FROM {full_codebook_table_name};",), - &[], - )?; - - logger.info(&format!( - "Codebook has {} rows - {num_centroids} centroids and {num_subvectors} subvectors", - rows.len() - )); - - for r in rows { - let subvector_id: i32 = r.get(0); - let centroid_id: i32 = r.get(1); - let subvector: Vec = r.get(2); - for i in 0..subvector.len() { - v[centroid_id as usize * dimensions - + subvector_id as usize * subvector.len() - + i] = subvector[i]; - } - } - pq_codebook = v.as_ptr(); - } - - let options = IndexOptions { - dimensions, - metric: args.metric_kind.value(), - quantization: ScalarKind::F32, - multi: false, - connectivity: args.m, - expansion_add: args.efc, - expansion_search: args.ef, - - num_threads: 0, // automatic - - // note: pq_construction and pq_output distinction is not yet implemented in usearch - // in the future, if pq_construction is false, we will use full vectors in memory (and - // require large memory for construction) but will output pq-quantized graph - // - // currently, regardless of pq_construction value, as long as pq_output is true, - // we construct a pq_quantized index using quantized values during construction - pq_construction: args.pq, - pq_output: args.pq, - num_centroids, - num_subvectors, - codebook: pq_codebook, - }; - let index = Index::new(&options)?; - - let start_time = Instant::now(); - let rows = transaction.query( - &format!( - "SELECT COUNT(*) FROM {full_table_name} WHERE {} IS NOT NULL;", - quote_ident(&args.column) - ), - &[], - )?; - logger.debug(&format!( - "Count estimation took {}s", - start_time.elapsed().as_secs() - )); - - let start_time = Instant::now(); - let count: i64 = 
rows[0].get(0); - // reserve enough memory on index - index.reserve(count as usize)?; - let thread_safe_index = ThreadSafeIndex { inner: index }; - - logger.info(&format!("Items to index {}", count)); - - let index_arc = Arc::new(thread_safe_index); - - // Create a vector to store thread handles - let mut handles = vec![]; - - let (tx, rx): (SyncSender>, Receiver>) = mpsc::sync_channel(num_cores); - let rx_arc = Arc::new(Mutex::new(rx)); - let is_canceled = is_canceled.unwrap_or(Arc::new(RwLock::new(false))); - let (progress_tx, progress_rx): (Sender, Receiver) = mpsc::channel(); - let progress_logger = logger.clone(); - let should_create_index = args.import; - // reserve 20% progress for index import - let count_for_progress = if should_create_index { - (count as f64 * 1.2).ceil() as i64 - } else { - count - }; - - std::thread::spawn(move || -> AnyhowVoidResult { - let mut prev_progress = 0; - for progress in progress_rx { - if progress == prev_progress { - continue; - } - prev_progress = progress; - report_progress(&progress_cb, &progress_logger, progress); - - if progress == 100 { - break; - } - } - Ok(()) - }); - - let processed_cnt = Arc::new(AtomicU64::new(0)); - for _ in 0..num_cores { - // spawn thread - let index_ref = index_arc.clone(); - let receiver = rx_arc.clone(); - let is_canceled = is_canceled.clone(); - let progress_tx = progress_tx.clone(); - let processed_cnt = processed_cnt.clone(); - - let handle = std::thread::spawn(move || -> AnyhowVoidResult { - loop { - let lock = receiver.lock(); - - if let Err(e) = lock { - anyhow::bail!("{e}"); - } - - let rx = lock.unwrap(); - let rows = rx.recv(); - // release the lock so other threads can take rows - drop(rx); - - if rows.is_err() { - // channel has been closed - break; - } - - if *is_canceled.read().unwrap() { - // This variable will be changed from outside to gracefully - // exit job on next chunk - anyhow::bail!(JOB_CANCELLED_MESSAGE); - } - - let rows = rows.unwrap(); - let rows_cnt = rows.len() as u64; - index_chunk(rows, index_ref.clone())?; - let all_count = processed_cnt.fetch_add(rows_cnt, Ordering::SeqCst) + rows_cnt; - let progress = (all_count as f64 / count_for_progress as f64 * 100.0) as u8; - - if progress > 0 { - progress_tx.send(progress)?; - } - } - Ok(()) - }); - handles.push(handle); - } - - // With portal we can execute a query and poll values from it in chunks - let portal = transaction.bind( - &format!( - "SELECT ctid, {col} FROM {table} WHERE {col} IS NOT NULL;", - col = quote_ident(&args.column), - table = get_full_table_name(&args.schema, &args.table) - ), - &[], - )?; - - loop { - // poll 2000 rows from portal and send it to worker threads via channel - let rows = transaction.query_portal(&portal, 2000)?; - if rows.len() == 0 { - break; - } - if *is_canceled.read().unwrap() { - // This variable will be changed from outside to gracefully - // exit job on next chunk - anyhow::bail!(JOB_CANCELLED_MESSAGE); - } - tx.send(rows)?; - } - - // Exit all channels - drop(tx); - - // Wait for all threads to finish processing - for handle in handles { - if let Err(e) = handle.join() { - logger.error("{e}"); - anyhow::bail!("{:?}", e); - } - } - logger.debug(&format!( - "Indexing took {}s", - start_time.elapsed().as_secs() - )); - - index_arc.save(&args.out)?; - logger.info(&format!( - "Index saved under {} in {}s", - &args.out, - start_time.elapsed().as_secs() - )); - - drop(index_arc); - drop(portal); - drop(rx_arc); - - if args.import { - let op_class = args.metric_kind.to_ops(); - if args.remote_database { - 
let start_time = Instant::now(); - logger.info("Copying index file into database server..."); - let mut rng = rand::thread_rng(); - let data_dir = transaction.query_one("SHOW data_directory", &[])?; - let data_dir: String = data_dir.try_get(0)?; - let index_path = format!("{data_dir}/ldb-index-{}.usearch", rng.gen_range(0..1000)); - let mut large_object = LargeObject::new(transaction, &index_path); - large_object.create()?; - let mut reader = fs::File::open(Path::new(&args.out))?; - let mut buf_writer = - BufWriter::with_capacity(COPY_BUFFER_CHUNK_SIZE, &mut large_object); - io::copy(&mut reader, &mut buf_writer)?; - logger.debug(&format!( - "Index copy to database took {}s", - start_time.elapsed().as_secs() - )); - progress_tx.send(90)?; - drop(reader); - drop(buf_writer); - logger.info("Creating index from file..."); - let start_time = Instant::now(); - large_object.finish( - &get_full_table_name(&args.schema, &args.table), - &quote_ident(&args.column), - args.index_name.as_deref(), - &op_class, - args.ef, - args.efc, - dimensions, - args.m, - args.pq, - )?; - logger.debug(&format!( - "Index import took {}s", - start_time.elapsed().as_secs() - )); - fs::remove_file(Path::new(&args.out))?; - } else { - // If job is run on the same server as database we can skip copying part - progress_tx.send(90)?; - logger.info("Creating index from file..."); - let start_time = Instant::now(); - - let mut idx_name = "".to_owned(); - - if let Some(name) = &args.index_name { - idx_name = quote_ident(name); - transaction.execute(&format!("DROP INDEX IF EXISTS {idx_name}"), &[])?; - } - - transaction.execute( - &format!("CREATE INDEX {idx_name} ON {table_name} USING lantern_hnsw({column_name} {op_class}) WITH (_experimental_index_path='{index_path}', pq={pq}, ef={ef}, dim={dim}, m={m}, ef_construction={ef_construction});", index_path=args.out, table_name=&get_full_table_name(&args.schema, &args.table), - column_name=&quote_ident(&args.column), pq=args.pq, m=args.m, ef=args.ef, ef_construction=args.efc, dim=dimensions), - &[], - )?; - - transaction.commit()?; - logger.debug(&format!( - "Index import took {}s", - start_time.elapsed().as_secs() - )); - fs::remove_file(Path::new(&args.out))?; - } - progress_tx.send(100)?; - logger.info(&format!( - "Index imported to table {} and removed from filesystem", - &args.table - )); - logger.debug(&format!( - "Total indexing took {}s", - total_start_time.elapsed().as_secs() - )); - } - - Ok(()) -} diff --git a/lantern_cli/src/external_index/postgres_large_objects.rs b/lantern_cli/src/external_index/postgres_large_objects.rs deleted file mode 100644 index 2a1ec50d..00000000 --- a/lantern_cli/src/external_index/postgres_large_objects.rs +++ /dev/null @@ -1,97 +0,0 @@ -use crate::types::AnyhowVoidResult; -use crate::utils::quote_ident; -use postgres::Transaction; -use postgres_types::Oid; -use std::{cmp, io}; - -pub struct LargeObject<'a> { - pub transaction: Option<Transaction<'a>>, - fd: Option<i32>, - pub oid: Option<Oid>, - index_path: String, -} - -impl<'a> LargeObject<'a> { - pub fn new(transaction: Transaction<'a>, index_path: &str) -> LargeObject<'a> { - LargeObject { - transaction: Some(transaction), - oid: None, - fd: None, - index_path: index_path.to_owned(), - } - } - - pub fn create(&mut self) -> AnyhowVoidResult { - let transaction = self.transaction.as_mut().unwrap(); - let lo_oid = transaction.query_one("SELECT pg_catalog.lo_create(0)", &[])?; - let lo_oid: Oid = lo_oid.get(0); - let fd = transaction.query_one("SELECT pg_catalog.lo_open($1, 131072)", &[&lo_oid])?; - let fd: i32 = fd.get(0);
- self.fd = Some(fd); - self.oid = Some(lo_oid); - Ok(()) - } - - pub fn finish( - self, - table_name: &str, - column_name: &str, - index_name: Option<&str>, - op_class: &str, - ef: usize, - ef_construction: usize, - dim: usize, - m: usize, - pq: bool, - ) -> AnyhowVoidResult { - let mut transaction = self.transaction.unwrap(); - transaction.execute( - "SELECT pg_catalog.lo_export($1, $2)", - &[&self.oid.unwrap(), &self.index_path], - )?; - transaction.execute("SELECT pg_catalog.lo_unlink($1)", &[&self.oid.unwrap()])?; - - let mut idx_name = "".to_owned(); - - if let Some(name) = index_name { - idx_name = quote_ident(name); - transaction.execute(&format!("DROP INDEX IF EXISTS {idx_name}"), &[])?; - } - - transaction.execute( - &format!("CREATE INDEX {idx_name} ON {table_name} USING lantern_hnsw({column_name} {op_class}) WITH (_experimental_index_path='{index_path}', pq={pq}, ef={ef}, dim={dim}, m={m}, ef_construction={ef_construction});", - index_path=self.index_path), - &[], - )?; - - transaction.batch_execute(&format!( - " - CREATE TEMPORARY TABLE _rm_lantern_index_output(output TEXT); - COPY _rm_lantern_index_output FROM PROGRAM 'rm -rf {path}'", - path = &self.index_path - ))?; - - transaction.commit()?; - Ok(()) - } -} - -impl<'a> io::Write for LargeObject<'a> { - fn write(&mut self, buf: &[u8]) -> io::Result { - let cap = cmp::min(buf.len(), i32::MAX as usize); - let transaction = self.transaction.as_mut().unwrap(); - let res = transaction.execute( - "SELECT pg_catalog.lowrite($1, $2)", - &[&self.fd.unwrap(), &&buf[..cap]], - ); - - if let Err(e) = res { - return Err(io::Error::new(io::ErrorKind::Other, e)); - } - Ok(cap) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} diff --git a/lantern_cli/src/external_index/utils.rs b/lantern_cli/src/external_index/utils.rs deleted file mode 100644 index 6b785c40..00000000 --- a/lantern_cli/src/external_index/utils.rs +++ /dev/null @@ -1,12 +0,0 @@ -pub fn bytes_to_integer_le(bytes: &[u8]) -> T -where - T: From + std::ops::Shl + std::ops::BitOr + Default, -{ - let mut result: T = Default::default(); - - for &byte in bytes.iter().rev() { - result = (result << 8) | T::from(byte); - } - - result -} diff --git a/lantern_cli/src/http_server/cli.rs b/lantern_cli/src/http_server/cli.rs index b68d25e5..d8f6aa8d 100644 --- a/lantern_cli/src/http_server/cli.rs +++ b/lantern_cli/src/http_server/cli.rs @@ -7,10 +7,6 @@ pub struct HttpServerArgs { #[arg(short, long)] pub db_uri: String, - /// Indicates if this is remote or local connection - #[arg(short, long, default_value_t = false)] - pub remote_database: bool, - /// Host to listen #[arg(long, default_value = "0.0.0.0")] pub host: String, diff --git a/lantern_cli/src/http_server/index.rs b/lantern_cli/src/http_server/index.rs index f4bfcdc2..f64805dc 100644 --- a/lantern_cli/src/http_server/index.rs +++ b/lantern_cli/src/http_server/index.rs @@ -5,15 +5,8 @@ use actix_web::{ post, web, HttpResponse, Responder, Result, }; -use crate::{ - external_index::{ - cli::{CreateIndexArgs, UMetricKind}, - create_usearch_index, - }, - utils::quote_ident, -}; +use crate::{external_index::cli::UMetricKind, utils::quote_ident}; -use rand::Rng; use serde::Deserialize; use super::AppState; @@ -73,11 +66,10 @@ async fn create_index( let metric_kind = UMetricKind::from(&metric).map_err(ErrorBadRequest)?; let client = data.pool.get().await?; - if !external { - client + client .execute( &format!( - "CREATE INDEX {index_name} ON {name} USING lantern_hnsw({column} {op_class}) WITH (m={m}, ef={ef}, 
ef_construction={ef_construction}, pq={pq})", + "CREATE INDEX {index_name} ON {name} USING lantern_hnsw({column} {op_class}) WITH (m={m}, ef={ef}, ef_construction={ef_construction}, pq={pq}, external={external})", index_name = quote_ident(&index_name), name = quote_ident(&name), column = quote_ident(&column), @@ -86,54 +78,6 @@ async fn create_index( &[], ) .await.map_err(ErrorInternalServerError)?; - } else { - let data_dir = if !data.is_remote_database { - match client - .query_one( - "SELECT setting::text FROM pg_settings WHERE name = 'data_directory'", - &[], - ) - .await - { - Err(e) => { - return Err(ErrorInternalServerError(e)); - } - Ok(row) => row.get::(0), - } - } else { - "/tmp".to_owned() - }; - - let mut rng = rand::thread_rng(); - let index_path = format!("{data_dir}/ldb-index-{}.usearch", rng.gen_range(0..1000)); - let body_index_name = body.name.clone(); - tokio::task::spawn_blocking(move || { - create_usearch_index( - &CreateIndexArgs { - column: column.to_owned(), - metric_kind, - efc: ef_construction, - ef, - m, - dims: 0, - import: true, - index_name: body_index_name, - uri: data.db_uri.clone(), - out: index_path, - schema: "public".to_owned(), - table: name.clone(), - pq, - remote_database: data.is_remote_database, - }, - None, - None, - None, - ) - }) - .await - .map_err(ErrorInternalServerError)? - .map_err(ErrorInternalServerError)?; - } Ok(HttpResponse::new(StatusCode::from_u16(200).unwrap())) } diff --git a/lantern_cli/src/http_server/mod.rs b/lantern_cli/src/http_server/mod.rs index 72a3b2f6..74c22712 100644 --- a/lantern_cli/src/http_server/mod.rs +++ b/lantern_cli/src/http_server/mod.rs @@ -52,7 +52,6 @@ pub struct AppState { db_uri: String, auth_credentials: Option, pool: AppPool, - is_remote_database: bool, #[allow(dead_code)] logger: crate::logger::Logger, } @@ -140,7 +139,6 @@ pub async fn start( let state = web::Data::new(AppState { auth_credentials, db_uri: args.db_uri.clone(), - is_remote_database: args.remote_database, pool: AppPool::new(pool), logger, }); diff --git a/lantern_cli/src/index_autotune/mod.rs b/lantern_cli/src/index_autotune/mod.rs index f1f47c44..50c52b9e 100644 --- a/lantern_cli/src/index_autotune/mod.rs +++ b/lantern_cli/src/index_autotune/mod.rs @@ -4,7 +4,6 @@ use std::{ time::Instant, }; -use crate::external_index::cli::CreateIndexArgs; use crate::logger::{LogLevel, Logger}; use crate::types::*; use crate::utils::{append_params_to_uri, get_full_table_name, quote_ident}; @@ -42,7 +41,7 @@ fn create_test_table( src_table_name: &str, column_name: &str, test_data_size: usize, -) -> Result<(usize, i32), anyhow::Error> { +) -> Result { client.batch_execute(&format!( " CREATE SCHEMA IF NOT EXISTS {INTERNAL_SCHEMA_NAME}; @@ -64,7 +63,7 @@ fn create_test_table( let sample_size = client.query_one(&format!("SELECT COUNT(*) FROM {tmp_table_name}"), &[])?; let sample_size: i64 = sample_size.get(0); - Ok((dims as usize, sample_size as i32)) + Ok(sample_size as i32) } fn export_results( @@ -304,7 +303,7 @@ pub fn autotune_index( // Create table where we will create intermediate index results // This temp table will contain random subset of rows in size of test_data_size from source table - let (column_dims, sample_size) = create_test_table( + let sample_size = create_test_table( &mut client, &tmp_table_full_name, &src_table_name, @@ -361,7 +360,6 @@ pub fn autotune_index( // Create random index file name and job_id if not provided let mut rng = rand::thread_rng(); - let index_path = format!("/tmp/index-autotune-{}.usearch", rng.gen_range(0..1000)); 
let index_name = format!("lantern_autotune_idx_{}", rng.gen_range(0..1000)); let uuid = rng.gen_range(0..1000000); let job_id = args.job_id.as_ref().unwrap_or(&uuid); @@ -431,27 +429,18 @@ pub fn autotune_index( } let start = Instant::now(); - crate::external_index::create_usearch_index( - &CreateIndexArgs { - import: true, - out: index_path.clone(), - table: tmp_table_name.clone(), - schema: INTERNAL_SCHEMA_NAME.to_owned(), - metric_kind: args.metric_kind.clone(), - efc: variant.ef_construction, - ef: variant.ef, - m: variant.m, - uri: uri.clone(), - column: "v".to_owned(), - dims: column_dims as usize, - index_name: Some(index_name.clone()), - remote_database: true, - pq: false, - }, - None, - Some(is_canceled.clone()), - Some(Logger::new(&logger.label, LogLevel::Info)), - )?; + client.batch_execute(&format!( + " + SET lantern.external_index_host='127.0.0.1'; + SET lantern.external_index_port=8998; + SET lantern.external_index_secure=false; + CREATE INDEX {index_name} ON {tmp_table_full_name} USING lantern_hnsw(v {op_class}) WITH (m={m}, ef={ef}, ef_construction={ef_construction}, external=true) + ", + op_class=args.metric_kind.to_ops(), + m=variant.m, + ef=variant.ef, + ef_construction=variant.ef_construction + ))?; let mut indexing_duration = start.elapsed().as_secs() as f64; indexing_duration = f64::trunc(indexing_duration * 100.0) / 100.0; // round to 2 decimal points @@ -517,27 +506,17 @@ pub fn autotune_index( "Creating index with the best result for job {job_id}" )); let start = Instant::now(); - crate::external_index::create_usearch_index( - &CreateIndexArgs { - import: true, - out: index_path.clone(), - table: args.table.clone(), - schema: args.schema.clone(), - metric_kind: args.metric_kind.clone(), - efc: best_result.ef_construction as usize, - ef: best_result.ef as usize, - m: best_result.m as usize, - uri: uri.clone(), - column: args.column.clone(), - dims: column_dims as usize, - index_name: None, - remote_database: true, - pq: false, - }, - None, - Some(is_canceled.clone()), - Some(Logger::new(&logger.label, LogLevel::Info)), - )?; + client.batch_execute(&format!( + " + CREATE INDEX {index_name} ON {full_table_name} USING lantern_hnsw({column_name} {op_class}) WITH (m={m}, ef={ef}, ef_construction={ef_construction}, external=true) + ", + full_table_name=get_full_table_name(&args.schema, &args.table), + column_name=quote_ident(&args.column), + op_class=args.metric_kind.to_ops(), + m=best_result.m, + ef=best_result.ef, + ef_construction=best_result.ef_construction + ))?; let duration = start.elapsed().as_secs(); logger.debug(&format!("Index for job {job_id} created in {duration}s")); } diff --git a/lantern_cli/src/lib.rs b/lantern_cli/src/lib.rs index fe9fb622..fda07511 100644 --- a/lantern_cli/src/lib.rs +++ b/lantern_cli/src/lib.rs @@ -2,7 +2,7 @@ pub mod daemon; #[cfg(feature = "embeddings")] pub mod embeddings; -#[cfg(feature = "external-index")] +#[cfg(feature = "external-index-server")] pub mod external_index; #[cfg(feature = "http-server")] pub mod http_server; diff --git a/lantern_cli/src/main.rs b/lantern_cli/src/main.rs index d9229242..9f6f8701 100644 --- a/lantern_cli/src/main.rs +++ b/lantern_cli/src/main.rs @@ -18,15 +18,6 @@ async fn main() { let cli = cli::Cli::parse(); let mut _main_logger = None; let res = match cli.command { - cli::Commands::CreateIndex(args) => { - let logger = Logger::new("Lantern Index", LogLevel::Debug); - _main_logger = Some(logger.clone()); - tokio::task::spawn_blocking(move || { - external_index::create_usearch_index(&args, None, 
None, Some(logger)) - }) - .await - .unwrap() - } cli::Commands::CreateEmbeddings(args) => { let logger = Logger::new("Lantern Embeddings", LogLevel::Debug); _main_logger = Some(logger.clone()); diff --git a/lantern_cli/src/utils/test_utils.rs b/lantern_cli/src/utils/test_utils.rs index ba282e67..daeec40a 100644 --- a/lantern_cli/src/utils/test_utils.rs +++ b/lantern_cli/src/utils/test_utils.rs @@ -11,11 +11,6 @@ pub mod daemon_test_utils { #[cfg(feature = "autotune")] pub static AUTOTUNE_JOB_TABLE_DEF: &'static str = crate::daemon::autotune_jobs::JOB_TABLE_DEFINITION; - #[cfg(not(feature = "external-index"))] - pub static EXTERNAL_INDEX_JOB_TABLE_DEF: &'static str = "(id INT)"; - #[cfg(feature = "external-index")] - pub static EXTERNAL_INDEX_JOB_TABLE_DEF: &'static str = - crate::daemon::external_index_jobs::JOB_TABLE_DEFINITION; #[cfg(not(feature = "embeddings"))] pub static EMBEDDING_JOB_TABLE_DEF: &'static str = "(id INT)"; #[cfg(feature = "embeddings")] @@ -71,12 +66,10 @@ pub mod daemon_test_utils { CREATE TABLE _lantern_extras_internal.embedding_generation_jobs ({embedding_job_table_def}); CREATE TABLE _lantern_extras_internal.autotune_jobs ({autotune_job_table_def}); - CREATE TABLE _lantern_extras_internal.external_index_jobs ({indexing_job_table_def}); "#, embedding_job_table_def = EMBEDDING_JOB_TABLE_DEF, autotune_job_table_def = AUTOTUNE_JOB_TABLE_DEF, - indexing_job_table_def = EXTERNAL_INDEX_JOB_TABLE_DEF, )) .await?; diff --git a/lantern_cli/tests/daemon_index_test_with_db.rs b/lantern_cli/tests/daemon_index_test_with_db.rs deleted file mode 100644 index be2e7b9e..00000000 --- a/lantern_cli/tests/daemon_index_test_with_db.rs +++ /dev/null @@ -1,185 +0,0 @@ -use lantern_cli::{ - daemon::{ - self, - cli::{DaemonArgs, LogLevel}, - }, - utils::test_utils::daemon_test_utils::{setup_test, wait_for_completion}, -}; -use tokio_util::sync::CancellationToken; - -static CLIENT_TABLE_NAME: &'static str = "_lantern_cloud_client1"; - -#[tokio::test] -async fn test_daemon_external_index_create_on_small_table() { - let (new_connection_uri, mut new_db_client) = - setup_test("test_daemon_external_index_create_on_small_table") - .await - .unwrap(); - new_db_client - .batch_execute(&format!( - r#" - INSERT INTO {CLIENT_TABLE_NAME} (title, title_embedding) - VALUES ('Test1', '{{0,0,0}}'), - ('Test2', '{{0,0,1}}'), - ('Test5', '{{0,0,4}}'); - - INSERT INTO _lantern_extras_internal.external_index_jobs ("id", "table", "column", "operator", "index", efc, ef, m) - VALUES (1, '{CLIENT_TABLE_NAME}', 'title_embedding', 'dist_cos_ops', 'test_idx1', 32, 32, 10); - "# - )) - .await - .unwrap(); - let cancel_token = CancellationToken::new(); - let cancel_token_clone = cancel_token.clone(); - - tokio::spawn(async { - daemon::start( - DaemonArgs { - label: None, - master_db: None, - master_db_schema: String::new(), - embeddings: false, - autotune: false, - external_index: true, - databases_table: String::new(), - schema: "_lantern_extras_internal".to_owned(), - target_db: Some(vec![new_connection_uri]), - log_level: LogLevel::Debug, - data_path: None, - inside_postgres: false, - }, - None, - cancel_token_clone, - ) - .await - .unwrap(); - }); - - wait_for_completion( - &mut new_db_client, - &format!("SELECT COUNT(*)=1 FROM pg_indexes WHERE indexname='test_idx1'"), - 30, - ) - .await - .unwrap(); - - cancel_token.cancel(); -} -#[tokio::test] -async fn test_daemon_external_index_create() { - let (new_connection_uri, mut new_db_client) = setup_test("test_daemon_external_index_create") - .await - .unwrap(); - 
new_db_client - .batch_execute(&format!( - r#" - INSERT INTO {CLIENT_TABLE_NAME} (title, title_embedding) - VALUES ('Test1', '{{0,0,0}}'), - ('Test2', '{{0,0,1}}'), - ('Test3', '{{0,0,2}}'), - ('Test4', '{{0,0,3}}'), - ('Test5', '{{0,0,4}}'); - - INSERT INTO _lantern_extras_internal.external_index_jobs ("id", "table", "column", "operator", "index", efc, ef, m) - VALUES (2, '{CLIENT_TABLE_NAME}', 'title_embedding', 'dist_cos_ops', 'test_idx1', 32, 32, 10); - "# - )) - .await - .unwrap(); - let cancel_token = CancellationToken::new(); - let cancel_token_clone = cancel_token.clone(); - - tokio::spawn(async { - daemon::start( - DaemonArgs { - label: None, - master_db: None, - master_db_schema: String::new(), - embeddings: false, - autotune: false, - external_index: true, - databases_table: String::new(), - schema: "_lantern_extras_internal".to_owned(), - target_db: Some(vec![new_connection_uri]), - log_level: LogLevel::Debug, - data_path: None, - inside_postgres: false, - }, - None, - cancel_token_clone, - ) - .await - .unwrap(); - }); - - wait_for_completion( - &mut new_db_client, - &format!("SELECT COUNT(*)=1 FROM pg_indexes WHERE indexname='test_idx1'"), - 30, - ) - .await - .unwrap(); - - cancel_token.cancel(); -} - -#[tokio::test] -async fn test_daemon_external_index_wrong_ops() { - let (new_connection_uri, mut new_db_client) = - setup_test("test_daemon_external_index_wrong_ops") - .await - .unwrap(); - new_db_client - .batch_execute(&format!( - r#" - INSERT INTO {CLIENT_TABLE_NAME} (title, title_embedding) - VALUES ('Test1', '{{0,0,0}}'), - ('Test2', '{{0,0,1}}'), - ('Test3', '{{0,0,2}}'), - ('Test4', '{{0,0,3}}'), - ('Test5', '{{0,0,4}}'); - - INSERT INTO _lantern_extras_internal.external_index_jobs ("id", "table", "column", "operator", "index", efc, ef, m) - VALUES (2, '{CLIENT_TABLE_NAME}', 'title_embedding', 'cos', 'test_idx1', 32, 32, 10); - "# - )) - .await - .unwrap(); - let cancel_token = CancellationToken::new(); - let cancel_token_clone = cancel_token.clone(); - - tokio::spawn(async { - daemon::start( - DaemonArgs { - label: None, - master_db: None, - master_db_schema: String::new(), - embeddings: false, - autotune: false, - external_index: true, - databases_table: String::new(), - schema: "_lantern_extras_internal".to_owned(), - target_db: Some(vec![new_connection_uri]), - log_level: LogLevel::Debug, - data_path: None, - inside_postgres: false, - }, - None, - cancel_token_clone, - ) - .await - .unwrap(); - }); - - wait_for_completion( - &mut new_db_client, - &format!( - "SELECT COUNT(*)=1 FROM _lantern_extras_internal.external_index_jobs WHERE failed_at IS NOT NULL" - ), - 30, - ) - .await - .unwrap(); - - cancel_token.cancel(); -} diff --git a/lantern_cli/tests/external_index_server_test.rs b/lantern_cli/tests/external_index_server_test.rs index 10f1552c..372c5161 100644 --- a/lantern_cli/tests/external_index_server_test.rs +++ b/lantern_cli/tests/external_index_server_test.rs @@ -83,8 +83,8 @@ fn initialize() { external_index::server::start_tcp_server( IndexServerArgs { host: "127.0.0.1".to_owned(), - port: 8998, - status_port: 8999, + port: 7998, + status_port: 7999, tmp_dir: "/tmp".to_owned(), cert: None, key: None, @@ -141,7 +141,7 @@ fn initialize_ssl() { #[tokio::test] async fn test_external_index_server_invalid_header() { initialize(); - let mut stream = TcpStream::connect("127.0.0.1:8998").unwrap(); + let mut stream = TcpStream::connect("127.0.0.1:7998").unwrap(); let mut uint32_buf = [0; 4]; stream.read_exact(&mut uint32_buf).unwrap(); 
assert_eq!(u32::from_le_bytes(uint32_buf), PROTOCOL_VERSION); @@ -169,7 +169,7 @@ async fn test_external_index_server_invalid_header() { #[tokio::test] async fn test_external_index_server_short_message() { initialize(); - let mut stream = TcpStream::connect("127.0.0.1:8998").unwrap(); + let mut stream = TcpStream::connect("127.0.0.1:7998").unwrap(); let mut uint32_buf = [0; 4]; stream.read_exact(&mut uint32_buf).unwrap(); assert_eq!(u32::from_le_bytes(uint32_buf), PROTOCOL_VERSION); @@ -231,7 +231,7 @@ async fn test_external_index_server_indexing() { (13, vec![1.0, 1.0, 1.0]), ]; - let mut stream = TcpStream::connect("127.0.0.1:8998").unwrap(); + let mut stream = TcpStream::connect("127.0.0.1:7998").unwrap(); let mut uint32_buf = [0; 4]; stream.read_exact(&mut uint32_buf).unwrap(); assert_eq!(u32::from_le_bytes(uint32_buf), PROTOCOL_VERSION); @@ -261,7 +261,7 @@ async fn test_external_index_server_indexing() { assert_eq!(buf[0], 0); - let request = reqwest::get(&format!("http://127.0.0.1:8999")) + let request = reqwest::get(&format!("http://127.0.0.1:7999")) .await .unwrap(); let body_json: ServerStatusResponse = @@ -317,7 +317,7 @@ async fn test_external_index_server_indexing() { drop(stream); std::thread::sleep(Duration::from_secs(1)); - let request = reqwest::get(&format!("http://127.0.0.1:8999")) + let request = reqwest::get(&format!("http://127.0.0.1:7999")) .await .unwrap(); let body_json: ServerStatusResponse = @@ -488,7 +488,7 @@ async fn test_external_index_server_indexing_scalar_quantization() { (13, vec![1.0, 1.0, 1.0]), ]; - let mut stream = TcpStream::connect("127.0.0.1:8998").unwrap(); + let mut stream = TcpStream::connect("127.0.0.1:7998").unwrap(); let mut uint32_buf = [0; 4]; stream.read_exact(&mut uint32_buf).unwrap(); assert_eq!(u32::from_le_bytes(uint32_buf), PROTOCOL_VERSION); @@ -601,7 +601,7 @@ async fn test_external_index_server_indexing_hamming_distance() { (13, vec![1, 1, 1]), ]; - let mut stream = TcpStream::connect("127.0.0.1:8998").unwrap(); + let mut stream = TcpStream::connect("127.0.0.1:7998").unwrap(); let mut uint32_buf = [0; 4]; stream.read_exact(&mut uint32_buf).unwrap(); assert_eq!(u32::from_le_bytes(uint32_buf), PROTOCOL_VERSION); @@ -721,7 +721,7 @@ async fn test_external_index_server_indexing_pq() { (13, vec![1.0, 1.0, 1.0]), ]; - let mut stream = TcpStream::connect("127.0.0.1:8998").unwrap(); + let mut stream = TcpStream::connect("127.0.0.1:7998").unwrap(); let mut uint32_buf = [0; 4]; stream.read_exact(&mut uint32_buf).unwrap(); assert_eq!(u32::from_le_bytes(uint32_buf), PROTOCOL_VERSION); diff --git a/lantern_cli/tests/http_server_test_with_db.rs b/lantern_cli/tests/http_server_test_with_db.rs index c3f1d1b8..69266555 100644 --- a/lantern_cli/tests/http_server_test_with_db.rs +++ b/lantern_cli/tests/http_server_test_with_db.rs @@ -44,7 +44,6 @@ fn start_server(db_uri: String) -> Sender<()> { http_server::start( HttpServerArgs { db_uri, - remote_database: true, host: "127.0.0.1".to_owned(), port: 7777, username: Some("test".to_owned()), @@ -284,7 +283,6 @@ async fn test_index_create() -> AnyhowVoidResult { assert_eq!(indexes[0].get("m").unwrap(), "16"); assert_eq!(indexes[0].get("ef_construction").unwrap(), "128"); assert_eq!(indexes[0].get("ef").unwrap(), "64"); - assert_eq!(indexes[0].get("dim").unwrap(), "3"); assert_eq!(indexes[0].get("metric").unwrap(), "cos"); Ok(()) } diff --git a/lantern_extras/Cargo.toml b/lantern_extras/Cargo.toml index 3eca579c..0c0cefa7 100644 --- a/lantern_extras/Cargo.toml +++ b/lantern_extras/Cargo.toml @@ -26,7 
+26,6 @@ itertools = "0.13" backtrace = "0.3" url = "2.5" lantern_cli = { path = "../lantern_cli", default-features = false, features = [ - "external-index", "external-index-server", "embeddings", "daemon",
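For anyone migrating off the removed `create-index` subcommand, here is a rough equivalent of the old README example under the new flow. This is a sketch under the same assumptions as the earlier one; the `dist_cos_ops` operator class mirrors the cos metric used in the deleted daemon tests, and the `WITH` options follow the autotune SQL above.

```bash
# Formerly:
#   lantern-cli create-index -u "postgresql://localhost/test" -t "small_world" -c "vec" \
#     -m 16 --ef 64 --efc 128 -d 3 --metric-kind cos --out /tmp/index.usearch --import
lantern-cli start-indexing-server --port 8998 --status-port 8999 &

# The dimension and output-file arguments are gone: the server infers the
# dimensions and streams the finished index back to Postgres itself.
psql "postgresql://localhost/test" <<'SQL'
SET lantern.external_index_host='127.0.0.1';
SET lantern.external_index_port='8998';
SET lantern.external_index_secure=false;
CREATE INDEX ON small_world USING lantern_hnsw(vec dist_cos_ops)
    WITH (m=16, ef=64, ef_construction=128, external=true);
SQL
```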