Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#205] New custom schema for redis index #258

Merged
merged 8 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 156 additions & 43 deletions hyperon_das_atomdb/adapters/redis_mongo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

import base64
import collections
import itertools
import pickle
import re
import sys
from copy import deepcopy
from enum import Enum
Expand Down Expand Up @@ -179,6 +181,7 @@ def __init__(self, **kwargs: Optional[dict[str, Any]]) -> None:
"""Initialize an instance of a custom class with Redis and MongoDB connections."""
super().__init__()
self.database_name = "das"
self.max_pos_size_custom_index_template = 4

self._setup_databases(**kwargs)

Expand All @@ -188,13 +191,16 @@ def __init__(self, **kwargs: Optional[dict[str, Any]]) -> None:
(MongoCollectionNames.ATOMS, self.mongo_atoms_collection),
(MongoCollectionNames.ATOM_TYPES, self.mongo_types_collection),
]
self.pattern_index_templates: dict[str, list[DocumentT]] | None = None
self.pattern_index_templates: list[dict[str, Any]] | None = None
self.pattern_templates: list | None = None
self.mongo_das_config_collection: Collection | None = None
if MongoCollectionNames.DAS_CONFIG in self.mongo_db.list_collection_names():
self.mongo_das_config_collection = self.mongo_db.get_collection(
MongoCollectionNames.DAS_CONFIG
)
if MongoCollectionNames.DAS_CONFIG not in self.mongo_db.list_collection_names():
self.mongo_db.create_collection(MongoCollectionNames.DAS_CONFIG)

self.mongo_das_config_collection = self.mongo_db.get_collection(
MongoCollectionNames.DAS_CONFIG
)
self._setup_indexes(kwargs)
self.wildcard_hash = ExpressionHasher.compute_hash(WILDCARD)
self.typedef_mark_hash = ExpressionHasher.compute_hash(":")
self.typedef_base_type_hash = ExpressionHasher.compute_hash("Type")
Expand All @@ -217,7 +223,6 @@ def __init__(self, **kwargs: Optional[dict[str, Any]]) -> None:
}
self.mongo_bulk_insertion_limit = 100000
self.max_mongo_db_document_size = 16000000
self._setup_indexes()
logger().info("Database setup finished")

def _setup_databases(self, **kwargs) -> None:
Expand Down Expand Up @@ -377,35 +382,129 @@ def _connection_redis(
else:
return Redis(**redis_connection) # type: ignore

def _setup_indexes(self) -> None:
"""
Set up the default and custom pattern index templates for the database.
def _validate_index_templates(self, templates):
validator = {
"field": r"(named_type|targets\[\d+\])",
"value": r".+",
"positions": r"\[(\d+(,\s*\d+)*)?\]",
"arity": r"[0-9]+",
}
for template in templates:
if not isinstance(template, dict):
raise ValueError("Templates values must be a dict")
for k in template.keys():
if re.search(validator[k], str(template[k])) is None:
raise ValueError(f"Value '{template[k]}' is not supported in '{k}'.")

def _save_pattern_index(self, pattern_index):
self._validate_index_templates(pattern_index)
if self.mongo_das_config_collection is not None:
self.mongo_das_config_collection.replace_one(
{"_id": "pattern_index_templates"},
{"_id": "pattern_index_templates", "templates": pattern_index},
upsert=True,
)
self.pattern_templates = pattern_index

def _load_pattern_index(self, options: dict[str, Any] | None) -> None:
"""
This method initializes the default pattern index templates based on various
combinations of named type and selected positions. If the DAS_CONFIG collection
exists in the MongoDB database, it retrieves the custom pattern index templates
from the collection. Otherwise, it sets the pattern index templates to None.
Additionally, it creates a field index for node names.
combinations of named type and selected positions. It retrieves the custom
pattern index templates from the collection the DAS_CONFIG collection in the
MongoDB database if exists. Otherwise, it sets the default pattern index templates
and save it in the MongoDB database.

Args:
options (dict | None): Dict containing the key 'pattern_index_templates', a list of
templates.

"""
self.default_pattern_index_templates = []
for named_type in [True, False]:
for pos0 in [True, False]:
for pos1 in [True, False]:
for pos2 in [True, False]:
if named_type and pos0 and pos1 and pos2:
# not a pattern but an actual atom
continue
template = {
FieldNames.TYPE_NAME: named_type,
"selected_positions": [
i for i, pos in enumerate([pos0, pos1, pos2]) if pos
],
}
self.default_pattern_index_templates.append(template)
default_templates = [
{"field": "named_type", "value": "*", "positions": [0, 1, 2], "arity": 3}
]
user_templates = None
found = None
if options is not None:
user_templates = options.get("pattern_index_templates", None)

if self.mongo_das_config_collection is not None:
found = self.mongo_das_config_collection.find_one({"_id": "pattern_index_templates"})
self.pattern_index_templates = found.get("templates", None) if found else None

if found is not None:
self.pattern_templates = found.get("templates", None) if found else None
if self.pattern_templates is not None:
if user_templates is not None and user_templates != self.pattern_templates:
raise ValueError(
"'pattern_index_templates' value doesn't match with found on database"
)
else:
self._save_pattern_index(user_templates if user_templates else default_templates)

def _validate_template_index_and_get_position(self, template):
if not isinstance(template["arity"], int) or template["arity"] < 0:
raise ValueError("'arity' must be an integer greater than or equal to zero.")

if len(template["positions"]) > self.max_pos_size_custom_index_template:
raise ValueError(
f"'positions' array should be less than {self.max_pos_size_custom_index_template}."
)

if any(pos >= template["arity"] for pos in template["positions"]):
raise ValueError("'positions' parameter must be in range of the arity.")

if template["field"] != "named_type" and (
found := re.search(r"targets\[(\d+)]", template["field"])
):
target_pos = int(found.group(1))
if target_pos >= template["arity"]:
raise ValueError("'target[]' index must be in range of arity.")
else:
target_pos = None

return target_pos

def _setup_indexes(self, options: dict[str, Any] | None) -> None:
"""
This method reads the template list and generate the index templates.
Additionally, it creates a field index for node names.

Args:
options (dict | None): Dict containing the key 'pattern_index_templates', a list of
templates.
"""
self._load_pattern_index(options)
if not self.pattern_templates:
raise ValueError("Index not loaded")

self.pattern_index_templates = []
for template in self.pattern_templates:
is_named_type = template["field"] == "named_type"
p_size = len(template["positions"])
arity = template["arity"]
i_size = p_size + 1
is_wild_card = template["value"] == "*"
target_pos = self._validate_template_index_and_get_position(template)

values = itertools.product([True, False], repeat=i_size)

for v in values:
if is_wild_card and all(v) and arity == p_size:
continue
if p_size == 0 and not is_wild_card:
continue
if not is_wild_card and not v[0] and is_named_type:
continue
t = {
FieldNames.TYPE_NAME: v[0]
if is_wild_card or not is_named_type
else ExpressionHasher.named_type_hash(template["value"]),
"target_position": target_pos,
"target_value": None if is_named_type else template["value"],
"selected_positions": [
template["positions"][i] for i, pos in enumerate(v[1:]) if pos
],
}
self.pattern_index_templates.append(t)
# NOTE creating index for name search
self.create_field_index("node", fields=["name"])

Expand Down Expand Up @@ -844,7 +943,7 @@ def _get_and_delete_links_by_handles(self, handles: HandleListT) -> list[Documen
@staticmethod
def _apply_index_template(
template: dict[str, Any], named_type: str, targets: HandleListT, arity: int
) -> str:
) -> str | None:
"""
Apply the index template to generate a Redis key.

Expand All @@ -862,10 +961,26 @@ def _apply_index_template(
Returns:
str: The generated Redis key after applying the index template.
"""
key = [WILDCARD] if template[FieldNames.TYPE_NAME] else [named_type]
key = None
if isinstance(template[FieldNames.TYPE_NAME], bool):
key = [WILDCARD] if template[FieldNames.TYPE_NAME] else [named_type]
else:
if named_type == template[FieldNames.TYPE_NAME]:
key = [template[FieldNames.TYPE_NAME]]

if key is None:
return None

target_selected_pos = template["selected_positions"]
for cursor in range(arity):
key.append(WILDCARD if cursor in target_selected_pos else targets[cursor])
if template["target_position"] is not None and len(targets) > template["target_position"]:
if targets[template["target_position"]] == template["target_value"]:
for cursor in range(arity):
key.append(WILDCARD if cursor in target_selected_pos else targets[cursor])
else:
return None
else:
for cursor in range(arity):
key.append(WILDCARD if cursor in target_selected_pos else targets[cursor])
return _build_redis_key(KeyPrefix.PATTERNS, ExpressionHasher.composite_hash(key))

def _retrieve_incoming_set(self, handle: str, **kwargs) -> HandleSetT:
Expand Down Expand Up @@ -1123,14 +1238,9 @@ def _update_link_index(self, document: DocumentT, **kwargs) -> None:
targets: HandleListT = self._get_document_keys(document)
targets_str: str = "".join(targets)
arity: int = len(targets)
named_type: str = document[FieldNames.TYPE_NAME]
named_type_hash: str = document[FieldNames.TYPE_NAME_HASH]

index_templates: list[dict[str, Any]]
if self.pattern_index_templates:
index_templates = self.pattern_index_templates.get(named_type, [])
else:
index_templates = self.default_pattern_index_templates
index_templates = self.pattern_index_templates or []

if kwargs.get("delete_atom", False):
links_handle = self._retrieve_and_delete_incoming_set(handle)
Expand All @@ -1153,7 +1263,8 @@ def _update_link_index(self, document: DocumentT, **kwargs) -> None:

for template in index_templates:
key = self._apply_index_template(template, named_type_hash, targets, arity)
self.redis.srem(key, handle)
if key:
self.redis.srem(key, handle)
else:
incoming_buffer: dict[str, HandleListT] = {}
key = _build_redis_key(KeyPrefix.OUTGOING_SET, handle)
Expand All @@ -1175,7 +1286,8 @@ def _update_link_index(self, document: DocumentT, **kwargs) -> None:

for template in index_templates:
key = self._apply_index_template(template, named_type_hash, targets, arity)
self.redis.sadd(key, handle)
if key:
self.redis.sadd(key, handle)

for handle in incoming_buffer:
key = _build_redis_key(KeyPrefix.INCOMING_SET, handle)
Expand Down Expand Up @@ -1287,8 +1399,9 @@ def _retrieve_documents_by_index(
raise ValueError(f"Index '{index_id}' does not exist in collection '{collection}'")

def reindex(self, pattern_index_templates: dict[str, list[DocumentT]] | None = None) -> None:
if pattern_index_templates is not None:
self.pattern_index_templates = deepcopy(pattern_index_templates)
if isinstance(pattern_index_templates, list):
self._save_pattern_index(deepcopy(pattern_index_templates))
self._setup_indexes({'pattern_index_templates': pattern_index_templates})
self.redis.flushall()
self._update_atom_indexes(self.mongo_atoms_collection.find({}))

Expand Down
Loading