Skip to content

Commit

Permalink
remove importing external index from file functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed Nov 4, 2024
1 parent 4d6e4a4 commit cad8cb3
Show file tree
Hide file tree
Showing 17 changed files with 69 additions and 731 deletions.
2 changes: 1 addition & 1 deletion lantern_extras/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub static LLM_DEPLOYMENT_URL: GucSetting<Option<&'static CStr>> =
pub static OPENAI_AZURE_ENTRA_TOKEN: GucSetting<Option<&'static CStr>> =
GucSetting::<Option<&'static CStr>>::new(None);
pub static ENABLE_DAEMON: GucSetting<bool> = GucSetting::<bool>::new(false);
pub static ENABLE_INDEXING_SERVER: GucSetting<bool> = GucSetting::<bool>::new(false);
pub static ENABLE_INDEXING_SERVER: GucSetting<bool> = GucSetting::<bool>::new(true);

pub static DAEMON_DATABASES: GucSetting<Option<&'static CStr>> =
GucSetting::<Option<&'static CStr>>::new(None);
Expand Down
9 changes: 0 additions & 9 deletions lantern_hnsw/scripts/run_all_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ if [[ -n "$FILTER" || -n "$EXCLUDE" ]]; then
if [[ "$pg_cron_installed" == "1" ]]; then
TEST_FILES="${TEST_FILES}${NEWLINE}$(cat $SCHEDULE | grep -E '^(test_pg_cron:)' | sed -e 's/^test_pg_cron:/test:/' | tr " " "\n" | sed -e '/^$/d')"
fi

if [[ "$lantern_extras_installed" ]]; then
TEST_FILES="${TEST_FILES}${NEWLINE}$(cat $SCHEDULE | grep -E '^(test_extras:)' | sed -e 's/^test_extras:/test:/' | tr " " "\n" | sed -e '/^$/d')"
fi
fi

if [ $PG_VERSION -lt 13 ]; then
Expand Down Expand Up @@ -205,11 +201,6 @@ else
if [ "$pg_cron_installed" == "1" ]; then
echo "test: $test_name" >> $TMP_OUTDIR/schedule.txt
fi
elif [[ "$line" =~ ^test_extras: ]]; then
test_name=$(echo "$line" | sed -e 's/test_extras://')
if [ "$lantern_extras_installed" == "1" ]; then
echo "test: $test_name" >> $TMP_OUTDIR/schedule.txt
fi
elif [[ "$line" =~ ^test_begin: ]]; then
test_name=$(echo "$line" | sed -e 's/test_begin:/test:/')
echo "$test_name" >> $TMP_OUTDIR/schedule.txt
Expand Down
24 changes: 0 additions & 24 deletions lantern_hnsw/sql/lantern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
CREATE FUNCTION hnsw_handler(internal) RETURNS index_am_handler
AS 'MODULE_PATHNAME' LANGUAGE C;

CREATE FUNCTION lantern_reindex_external_index(index regclass) RETURNS VOID
AS 'MODULE_PATHNAME', 'lantern_reindex_external_index' LANGUAGE C STABLE STRICT PARALLEL UNSAFE;

-- functions
CREATE FUNCTION ldb_generic_dist(real[], real[]) RETURNS real
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
Expand Down Expand Up @@ -161,27 +158,6 @@ END;
$BODY$
LANGUAGE plpgsql;

-- Database updates
-- Must be run in update scripts every time index storage format changes and a finer-grained update
-- method is not shipped for the format change
CREATE OR REPLACE FUNCTION _lantern_internal.reindex_lantern_indexes()
RETURNS VOID AS $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT indexname, indexdef FROM pg_indexes
WHERE indexdef ILIKE '%USING lantern_hnsw%'
LOOP
RAISE NOTICE 'Reindexing index: %', r.indexname;
IF POSITION('_experimental_index_path' in r.indexdef) > 0 THEN
PERFORM lantern_reindex_external_index(r.indexname::regclass);
ELSE
EXECUTE 'REINDEX INDEX ' || quote_ident(r.indexname) || ';';
END IF;
RAISE NOTICE 'Reindexed index: %', r.indexname;
END LOOP;
END $$ LANGUAGE plpgsql VOLATILE;

-------------------------------------
-------- Product Quantization -------
-------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions lantern_hnsw/sql/updates/0.4.1--0.4.2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ BEGIN
RETURN COALESCE(type_oid, 0);
END;
$$ LANGUAGE plpgsql;

DROP FUNCTION lantern_reindex_external_index;
DROP FUNCTION _lantern_internal.reindex_lantern_indexes;
16 changes: 0 additions & 16 deletions lantern_hnsw/src/hnsw.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,22 +425,6 @@ Datum lantern_internal_failure_point_enable(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}

// todo:: remove in 0.3.1
PGDLLEXPORT PG_FUNCTION_INFO_V1(lantern_internal_continue_blockmap_group_initialization);
Datum lantern_internal_continue_blockmap_group_initialization(PG_FUNCTION_ARGS)
{
LDB_UNUSED(fcinfo);
PG_RETURN_NULL();
}

PGDLLEXPORT PG_FUNCTION_INFO_V1(lantern_reindex_external_index);
Datum lantern_reindex_external_index(PG_FUNCTION_ARGS)
{
Oid indrelid = PG_GETARG_OID(0);
ldb_reindex_external_index(indrelid);
PG_RETURN_VOID();
}

/*
* Get data type for give oid
* */
Expand Down
1 change: 0 additions & 1 deletion lantern_hnsw/src/hnsw.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ PGDLLEXPORT Datum hamming_dist_with_guard(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum cos_dist(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum cos_dist_with_guard(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum vector_cos_dist(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum lantern_reindex_external_index(PG_FUNCTION_ARGS);

HnswColumnType GetColumnTypeFromOid(Oid oid);
HnswColumnType GetIndexColumnType(Relation index);
Expand Down
187 changes: 30 additions & 157 deletions lantern_hnsw/src/hnsw/build.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ static void BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo, ldb_

InitBuildState(buildstate, heap, index, indexInfo);

if(buildstate->index_file_path) {
elog(ERROR,
"Importing index from file is deprecated.\n"
"If you want to use external indexing pass `external=true` in index options");
}

opts.dimensions = buildstate->dimensions;

PopulateUsearchOpts(index, &opts);
Expand All @@ -512,76 +518,46 @@ static void BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo, ldb_
elog(INFO, "done init usearch index");
assert(error == NULL);

if(buildstate->index_file_path) {
if(access(buildstate->index_file_path, F_OK) != 0) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Invalid index file path. "
"If this is REINDEX operation call `SELECT "
"lantern_reindex_external_index('%s')` to recreate index",
RelationGetRelationName(index))));
}
usearch_load(buildstate->usearch_index, buildstate->index_file_path, &error);
if(error != NULL) {
elog(ERROR, "%s", error);
}
elog(INFO, "done loading usearch index");
uint32_t estimated_row_count = EstimateRowCount(heap);

metadata = usearch_index_metadata(buildstate->usearch_index, &error);
assert(error == NULL);
opts.connectivity = metadata.connectivity;
opts.dimensions = metadata.dimensions;
opts.expansion_add = metadata.expansion_add;
opts.expansion_search = metadata.expansion_search;
opts.metric_kind = metadata.init_options.metric_kind;
opts.pq = metadata.init_options.pq;
opts.num_centroids = metadata.init_options.num_centroids;
opts.num_subvectors = metadata.init_options.num_subvectors;
if(buildstate->external) {
buildstate->external_socket = palloc0(sizeof(external_index_socket_t));
create_external_index_session(ldb_external_index_host,
ldb_external_index_port,
ldb_external_index_secure,
&opts,
buildstate,
estimated_row_count);
} else {
uint32_t estimated_row_count = EstimateRowCount(heap);
CheckMem(maintenance_work_mem,
index,
buildstate->usearch_index,
estimated_row_count,
"index size exceeded maintenance_work_mem during index construction, consider increasing "
"index size exceeded maintenance_work_mem during index construction, consider increasing"
"maintenance_work_mem");

if(buildstate->external) {
buildstate->external_socket = palloc0(sizeof(external_index_socket_t));
create_external_index_session(ldb_external_index_host,
ldb_external_index_port,
ldb_external_index_secure,
&opts,
buildstate,
estimated_row_count);
} else {
usearch_reserve(buildstate->usearch_index, estimated_row_count, &error);

if(error != NULL) {
// There's not much we can do if free throws an error, but we want to preserve the contents of the first
// one in case it does
usearch_error_t local_error = NULL;
usearch_free(buildstate->usearch_index, &local_error);
elog(ERROR, "Error reserving space for index: %s", error);
}
usearch_reserve(buildstate->usearch_index, estimated_row_count, &error);

if(error != NULL) {
// There's not much we can do if free throws an error, but we want to preserve the contents of the first
// one in case it does
usearch_error_t local_error = NULL;
usearch_free(buildstate->usearch_index, &local_error);
elog(ERROR, "Error reserving space for index: %s", error);
}
}

UpdateProgress(PROGRESS_CREATEIDX_PHASE, LDB_PROGRESS_HNSW_PHASE_IN_MEMORY_INSERT);
LanternBench("build hnsw index", ScanTable(buildstate));
UpdateProgress(PROGRESS_CREATEIDX_PHASE, LDB_PROGRESS_HNSW_PHASE_IN_MEMORY_INSERT);
LanternBench("build hnsw index", ScanTable(buildstate));

if(!buildstate->external) {
elog(INFO, "inserted %ld elements", usearch_size(buildstate->usearch_index, &error));
}
assert(error == NULL);
if(!buildstate->external) {
elog(INFO, "inserted %ld elements", usearch_size(buildstate->usearch_index, &error));
}

metadata = usearch_index_metadata(buildstate->usearch_index, &error);
assert(error == NULL);

if(buildstate->index_file_path) {
buildstate->index_file_fd = open(buildstate->index_file_path, O_RDONLY);
assert(buildstate->index_file_fd > 0);
} else if(buildstate->external) {
if(buildstate->external) {
buildstate->index_buffer = palloc(USEARCH_HEADER_SIZE);
external_index_receive_metadata(
buildstate->external_socket, &num_added_vectors, &buildstate->index_buffer_size);
Expand Down Expand Up @@ -745,106 +721,3 @@ void ldb_ambuildunlogged(Relation index)
IndexInfo *indexInfo = BuildIndexInfo(index);
BuildEmptyIndex(index, indexInfo, &buildstate);
}

void ldb_reindex_external_index(Oid indrelid)
{
HnswIndexHeaderPage *headerp;
FmgrInfo reindex_finfo = {0};
BlockNumber HEADER_BLOCK = 0;
Relation index_rel;
Buffer buf;
Page page;
Oid lantern_extras_namespace_oid = InvalidOid;
Oid function_oid;
Oid function_argtypes_oid[ 7 ];
oidvector *function_argtypes;
char *metric_kind;
const char *lantern_extras_schema = "lantern_extras";
uint32_t dim = 0;
uint32_t m = 0;
uint32_t ef_construction = 0;
uint32_t ef = 0;
bool pq = false;
char *ext_not_found_err = "Please install 'lantern_extras' extension or update it to the latest version";

lantern_extras_namespace_oid = get_namespace_oid(lantern_extras_schema, true);

if(!OidIsValid(lantern_extras_namespace_oid)) {
elog(ERROR, "%s", ext_not_found_err);
}

// Check if _reindex_external_index function exists in lantern schema
function_argtypes_oid[ 0 ] = REGCLASSOID;
function_argtypes_oid[ 1 ] = TEXTOID;
function_argtypes_oid[ 2 ] = INT4OID;
function_argtypes_oid[ 3 ] = INT4OID;
function_argtypes_oid[ 4 ] = INT4OID;
function_argtypes_oid[ 5 ] = INT4OID;
function_argtypes_oid[ 6 ] = BOOLOID;
function_argtypes = buildoidvector(function_argtypes_oid, 7);

function_oid = GetSysCacheOid(PROCNAMEARGSNSP,
#if PG_VERSION_NUM >= 120000
Anum_pg_proc_oid,
#endif
CStringGetDatum("_reindex_external_index"),
PointerGetDatum(function_argtypes),
ObjectIdGetDatum(lantern_extras_namespace_oid),
0);

if(!OidIsValid(function_oid)) {
elog(ERROR, "%s", ext_not_found_err);
}
// Get index params from index header page
index_rel = relation_open(indrelid, AccessShareLock);
buf = ReadBuffer(index_rel, HEADER_BLOCK);
LockBuffer(buf, BUFFER_LOCK_SHARE);
page = BufferGetPage(buf);
headerp = (HnswIndexHeaderPage *)PageGetContents(page);

assert(headerp->magicNumber == LDB_WAL_MAGIC_NUMBER);

// Convert metric_kind enum to string representation
switch(headerp->metric_kind) {
case usearch_metric_l2sq_k:
metric_kind = "l2sq";
break;
case usearch_metric_cos_k:
metric_kind = "cos";
break;
case usearch_metric_hamming_k:
metric_kind = "hamming";
break;
default:
metric_kind = NULL;
ldb_invariant(true, "Unsupported metric kind");
}

dim = headerp->vector_dim;
m = headerp->m;
ef = headerp->ef;
ef_construction = headerp->ef_construction;
pq = headerp->pq;

UnlockReleaseBuffer(buf);
relation_close(index_rel, AccessShareLock);

// We can not have external index without knowing dimensions
if(dim <= 0) {
elog(ERROR, "Column does not have dimensions: can not create external index on empty table");
}

// Get _reindex_external_index function info to do direct call into it
fmgr_info(function_oid, &reindex_finfo);

assert(reindex_finfo.fn_addr != NULL);

DirectFunctionCall7(reindex_finfo.fn_addr,
ObjectIdGetDatum(indrelid),
CStringGetTextDatum(metric_kind),
Int32GetDatum(dim),
Int32GetDatum(m),
Int32GetDatum(ef_construction),
Int32GetDatum(ef),
BoolGetDatum(pq));
}
1 change: 0 additions & 1 deletion lantern_hnsw/src/hnsw/build.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,5 @@ IndexBuildResult *ldb_ambuild(Relation heap, Relation index, IndexInfo *indexInf
void ldb_ambuildunlogged(Relation index);
int GetHnswIndexDimensions(Relation index, IndexInfo *indexInfo);
void CheckHnswIndexDimensions(Relation index, Datum arrayDatum, int deimensions);
void ldb_reindex_external_index(Oid indrelid);
// todo: does this render my check unnecessary
#endif // LDB_HNSW_BUILD_H
4 changes: 2 additions & 2 deletions lantern_hnsw/src/hnsw/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,9 @@ void _PG_init(void)

DefineCustomBoolVariable("lantern.external_index_secure",
"Use SSL connection when connecting to external index socket",
"Set this to 0 to disable secure connection",
"Set this to false to disable secure connection",
&ldb_external_index_secure,
true,
false,
PGC_USERSET,
0,
NULL,
Expand Down
Loading

0 comments on commit cad8cb3

Please sign in to comment.