From bd99a08fb43d72258bc5adbba1cac2a8f3055180 Mon Sep 17 00:00:00 2001
From: Alex Dunn
Date: Tue, 26 Nov 2024 15:10:47 -0800
Subject: [PATCH] Implement reindexer sweeper (#149)

* [skeleton] implement reindexer sweeper
* update metadata flag to use date instead of boolean
* [revert me] add test init code
* implement reindexer sweeper, with types resolved from the *-dd index, or as "keyword" if not present there

  Manual tests against docker registry:
  - logged counts are correct
  - missing mappings are added
  - types of missing mappings change according to the resolved typename
  - metadata updates are written to db
  - metadata updates are sufficient to trigger re-index, causing previously-unsearchable properties to become searchable.
  - presence of metadata attribute excludes document from document set on subsequent runs

* implement harvest-time filter to ensure that products harvested mid-sweep do not erroneously get flagged as processed
* implement logging of problematic harvest timestamp span and harvest software versions
* clean up code
* remove test code
* improve comment
* add mypy ignores - None-guard is provided by conditionals
* add registry-dd to allowed index types for resolve_multitenant_index_name()
* ensure reindexer sweeper captures all relevant documents with a single sweep
* improve logging
* implement batching approach in reindexer sweeper
* squash! implement batching approach in reindexer sweeper
* [weeeird bugfix] Patch apparent issues when paginating. See comments
* map special-case properties onto their types and incorporate them into the resolution logic
* implement stall while update indexing queue is backed up
* disable noisy log
* tweak stall time/log
* make reindexer hits count more human-friendly in logs
* clean up logging
* exclude ops:Provenance* properties from canonical_type_undefined_property_names
* bump hits_stall_tolerance from 5% to 10% of batch_size_limit

  this should prevent unintended continuous looping

* fix stall logic
* fix format_hits_count()
* change type hint to indicate that consumable iterators are not appropriate

  this is because the retry would pass the consumed iterator to subsequent calls

* Incorporate detection/log/retry of HTTP429 (circuit-breaking throttle)
* remove manual stall logic
* disable default typing, per jpadams
* re-enable generation of updates for docs having properties not in mappings

  protection against race condition is provided by harvest-time constraint to LT sweeper execution timestamp

* support all ISO-formatted harvest timestamp strings

  dateutil is the official third-party library for parsing

* correct erroneous log message
* bugfix edge-cases
* demote noisy log
* deduplicate missing sweepers property logs
* remove cruft
* flesh out static type mappings
* fix infinite loop when there are fewer hits than a full batch
* comment out log_filepath
* lint
* add explanation
---
 docker/sweepers_driver.py                     |   7 +-
 setup.cfg                                     |   1 +
 .../registrysweepers/ancestry/generation.py   |   2 +-
 .../registrysweepers/reindexer/__init__.py    |   0
 .../registrysweepers/reindexer/constants.py   |   1 +
 src/pds/registrysweepers/reindexer/main.py    | 383 ++++++++++++++++++
 src/pds/registrysweepers/utils/db/__init__.py |  36 +-
 src/pds/registrysweepers/utils/db/indexing.py |   6 +-
 .../registrysweepers/utils/db/multitenancy.py |   2 +-
 9 files changed, 432 insertions(+), 6 deletions(-)
 create mode 100644 src/pds/registrysweepers/reindexer/__init__.py
 create mode 100644 src/pds/registrysweepers/reindexer/constants.py
 create mode 100644 src/pds/registrysweepers/reindexer/main.py
diff --git a/docker/sweepers_driver.py b/docker/sweepers_driver.py
index c15e165..c8d288f 100755
--- a/docker/sweepers_driver.py
+++ b/docker/sweepers_driver.py
@@ -64,6 +64,7 @@ from typing import Callable
 
 from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync
+from pds.registrysweepers.reindexer import main as reindexer
 from pds.registrysweepers.utils import configure_logging, parse_log_level
 from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
 from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
@@ -85,7 +86,8 @@ def run_factory(sweeper_f: Callable) -> Callable:
     return functools.partial(
         sweeper_f,
         client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False),
-        log_filepath='registry-sweepers.log',
+        # enable for development if required - not necessary in production
+        # log_filepath='registry-sweepers.log',
         log_level=log_level
     )
@@ -107,7 +109,8 @@ def run_factory(sweeper_f: Callable) -> Callable:
 sweepers = [
     repairkit.run,
     provenance.run,
-    ancestry.run
+    ancestry.run,
+    reindexer.run
 ]
 
 for option, sweeper in optional_sweepers.items():
diff --git a/setup.cfg b/setup.cfg
index 5aa69aa..e48ba80 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -38,6 +38,7 @@ install_requires =
     botocore~=1.34.91
     botocore-stubs~=1.34.94
     requests-aws4auth~=1.2.3
+    python-dateutil~=2.9.0
 
 # Change this to False if you use things like __file__ or __path__—which you
 # shouldn't use anyway, because that's what ``pkg_resources`` is for 🙂
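The new python-dateutil dependency backs the harvest-timestamp parsing added in the reindexer sweeper. A quick illustrative check (timestamps invented) of the ISO-8601 variants dateutil.parser.isoparse accepts, including forms that datetime.fromisoformat rejected prior to Python 3.11:

    import dateutil.parser
    from datetime import timezone

    # Invented example timestamps; isoparse handles 'Z' suffixes and basic-format strings
    for ts in ["2024-11-26T15:10:47Z", "2024-11-26T15:10:47.123456-08:00", "20241126T151047Z"]:
        print(dateutil.parser.isoparse(ts).astimezone(timezone.utc).isoformat())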
diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py
index 3a49722..14c4cf3 100644
--- a/src/pds/registrysweepers/ancestry/generation.py
+++ b/src/pds/registrysweepers/ancestry/generation.py
@@ -187,7 +187,7 @@ def generate_nonaggregate_and_collection_records_iteratively(
 
     for lid, collections_records_for_lid in collection_records_by_lid.items():
         if all([record.skip_write for record in collections_records_for_lid]):
-            log.info(f"Skipping updates for up-to-date collection family: {str(lid)}")
+            log.debug(f"Skipping updates for up-to-date collection family: {str(lid)}")
             continue
         else:
             log.info(
diff --git a/src/pds/registrysweepers/reindexer/__init__.py b/src/pds/registrysweepers/reindexer/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/pds/registrysweepers/reindexer/constants.py b/src/pds/registrysweepers/reindexer/constants.py
new file mode 100644
index 0000000..ac8200e
--- /dev/null
+++ b/src/pds/registrysweepers/reindexer/constants.py
@@ -0,0 +1 @@
+REINDEXER_FLAG_METADATA_KEY = "ops:Provenance/ops:reindexed_at"
diff --git a/src/pds/registrysweepers/reindexer/main.py b/src/pds/registrysweepers/reindexer/main.py
new file mode 100644
index 0000000..0231f4c
--- /dev/null
+++ b/src/pds/registrysweepers/reindexer/main.py
@@ -0,0 +1,383 @@
+import logging
+import math
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
+from time import sleep
+from typing import Collection
+from typing import Dict
+from typing import Iterable
+from typing import Union
+
+import dateutil.parser
+from opensearchpy import OpenSearch
+from pds.registrysweepers.reindexer.constants import REINDEXER_FLAG_METADATA_KEY
+from pds.registrysweepers.utils import configure_logging
+from pds.registrysweepers.utils import parse_args
+from pds.registrysweepers.utils.db import get_query_hits_count
+from pds.registrysweepers.utils.db import query_registry_db_with_search_after
+from pds.registrysweepers.utils.db import write_updated_docs
+from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client
+from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
+from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
+from pds.registrysweepers.utils.db.update import Update
+from tqdm import tqdm
+
+log = logging.getLogger(__name__)
+
+
+def get_docs_query(filter_to_harvested_before: datetime):
+    """
+    Return a query to get all docs which haven't been reindexed by this sweeper and which haven't been harvested
+    since this sweeper process instance started running
+
+    i.e.
+    - Query all documents
+    - Exclude anything which has already been processed, to avoid redundant reprocessing
+    - Exclude anything which was harvested in the middle of this sweeper running, since this can cause erroneous
+      results due to inconsistency in the document set across query calls which are expected to be identical.
+    """
+    # TODO: Remove this once query_registry_db_with_search_after is modified to remove mutation side-effects
+    return {
+        "query": {
+            "bool": {
+                "must_not": [{"exists": {"field": REINDEXER_FLAG_METADATA_KEY}}],
+                "must": {
+                    "range": {
+                        "ops:Harvest_Info/ops:harvest_date_time": {
+                            "lt": filter_to_harvested_before.astimezone(timezone.utc).isoformat()
+                        }
+                    }
+                },
+            }
+        }
+    }
+
+
+def fetch_dd_field_types(client: OpenSearch) -> Dict[str, str]:
+    dd_index_name = resolve_multitenant_index_name("registry-dd")
+    name_key = "es_field_name"
+    type_key = "es_data_type"
+    dd_docs = query_registry_db_with_search_after(
+        client,
+        dd_index_name,
+        _source={"includes": ["es_field_name", "es_data_type"]},
+        query={"query": {"match_all": {}}},
+        sort_fields=[name_key],
+    )
+    doc_sources = iter(doc["_source"] for doc in dd_docs)
+    dd_types = {
+        source[name_key]: source[type_key] for source in doc_sources if name_key in source and type_key in source
+    }
+    return dd_types
+
+
+def get_mapping_field_types_by_field_name(client: OpenSearch, index_name: str) -> Dict[str, str]:
+    return {
+        k: v["type"] for k, v in client.indices.get_mapping(index_name)[index_name]["mappings"]["properties"].items()
+    }
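For context, the nested get_mapping response being flattened above has roughly this shape (index and field names illustrative), from which the function would yield {"lid": "keyword", "title": "text"}:

    # client.indices.get_mapping("registry") ->
    # {
    #     "registry": {
    #         "mappings": {
    #             "properties": {
    #                 "lid": {"type": "keyword"},
    #                 "title": {"type": "text"}
    #             }
    #         }
    #     }
    # }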
+
+
+def accumulate_missing_mappings(
+    dd_field_types_by_name: Dict[str, str], mapping_field_types_by_field_name: Dict[str, str], docs: Iterable[dict]
+) -> Dict[str, str]:
+    """
+    Iterate over all properties of all docs, test whether they are present in the given set of mapping keys, and
+    return a mapping of the missing properties onto their types.
+    @param dd_field_types_by_name: a mapping of document property names onto their types, derived from the data-dictionary db data
+    @param mapping_field_types_by_field_name: a mapping of document property names onto their types, derived from the existing index mappings
+    @param docs: an iterable collection of product documents
+    """
+
+    # Static mappings for fields not defined in the data dictionaries.
+    # NoneType indicates that the property is to be excluded.
+    # Anything with prefix 'ops:Provenance' is excluded, as these properties are the responsibility of their
+    # respective sweepers.
+    special_case_property_types_by_name = {
+        "@timestamp": None,
+        "@version": None,
+        "_package_id": None,
+        "description": "text",
+        "lid": "keyword",
+        "lidvid": "keyword",
+        "ops:Harvest_Info/ops:harvest_date_time": "date",
+        "ops:Label_File_Info/ops:json_blob": None,
+        "product_class": "keyword",
+        "ref_lid_associate": "keyword",
+        "ref_lid_collection": "keyword",
+        "ref_lid_collection_secondary": "keyword",
+        "ref_lid_data": "keyword",
+        "ref_lid_document": "keyword",
+        "ref_lid_facility": "keyword",
+        "ref_lid_instrument": "keyword",
+        "ref_lid_instrument_host": "keyword",
+        "ref_lid_investigation": "keyword",
+        "ref_lid_target": "keyword",
+        "ref_lid_telescope": "keyword",
+        "title": "text",
+        # 'vid'  # TODO: need to determine what this should be, as keyword lexical(?) sorting will be a problem
+    }
+
+    missing_mapping_updates: Dict[str, str] = {}
+
+    canonical_type_undefined_property_names = set()  # used to prevent duplicate WARN logs
+    bad_mapping_property_names = set()  # used to log mappings requiring manual attention
+    sweepers_missing_property_names = set()
+
+    earliest_problem_doc_harvested_at = None
+    latest_problem_doc_harvested_at = None
+    problematic_harvest_versions = set()
+    problem_docs_count = 0
+    total_docs_count = 0
+    for doc in docs:
+        problem_detected_in_document_already = False
+        total_docs_count += 1
+
+        for property_name, value in doc["_source"].items():
+            # Resolve canonical type from data dictionary or - failing that - from the hardcoded types
+            canonical_type = dd_field_types_by_name.get(property_name) or special_case_property_types_by_name.get(
+                property_name
+            )
+            current_mapping_type = mapping_field_types_by_field_name.get(property_name)
+
+            mapping_missing = property_name not in mapping_field_types_by_field_name
+            canonical_type_is_defined = canonical_type is not None
+            mapping_is_bad = (
+                canonical_type != current_mapping_type
+                and canonical_type is not None
+                and current_mapping_type is not None
+            )
+
+            if (
+                not canonical_type_is_defined
+                and property_name not in special_case_property_types_by_name
+                and not property_name.startswith("ops:Provenance")
+                and property_name not in canonical_type_undefined_property_names
+            ):
+                log.warning(
+                    f"Property {property_name} does not have an entry in the data dictionary index or hardcoded mappings - this may indicate a problem"
+                )
+                canonical_type_undefined_property_names.add(property_name)
+
+            if mapping_is_bad and property_name not in bad_mapping_property_names:
+                log.warning(
+                    f'Property {property_name} is defined in data dictionary index or hardcoded mappings as type "{canonical_type}" but exists in index mapping as type "{current_mapping_type}"'
+                )
+                bad_mapping_property_names.add(property_name)
+
+            if (mapping_missing or mapping_is_bad) and not problem_detected_in_document_already:
+                problem_detected_in_document_already = True
+                problem_docs_count += 1
+                attr_value = doc["_source"].get("ops:Harvest_Info/ops:harvest_date_time", None)
+                try:
+                    doc_harvest_time = dateutil.parser.isoparse(attr_value[0]).astimezone(timezone.utc)
+
+                    earliest_problem_doc_harvested_at = min(
+                        doc_harvest_time, earliest_problem_doc_harvested_at or doc_harvest_time
+                    )
+                    latest_problem_doc_harvested_at = max(
+                        doc_harvest_time, latest_problem_doc_harvested_at or doc_harvest_time
+                    )
+                except (KeyError, TypeError, ValueError) as err:
+                    log.warning(
+                        f'Unable to parse first element of "ops:Harvest_Info/ops:harvest_date_time" as ISO-formatted date from document {doc["_id"]}: {attr_value} ({err})'
+                    )
+
+                try:
+                    problematic_harvest_versions.update(doc["_source"]["ops:Harvest_Info/ops:harvest_version"])
+                except KeyError as err:
+                    # Noisy log temporarily disabled but may be re-enabled at jpadams' discretion
+                    # log.warning(f'Unable to extract harvest version from document {doc["_id"]}: {err}')
+                    pass
+
+            if mapping_missing and property_name not in missing_mapping_updates:
+                if canonical_type_is_defined:
+                    log.info(
+                        f'Property {property_name} will be updated to type "{canonical_type}" from data dictionary'
+                    )
+                    missing_mapping_updates[property_name] = canonical_type  # type: ignore
+                elif property_name.startswith(
+                    "ops:Provenance"
+                ):  # TODO: extract this to a constant, used by all metadata key definitions
+                    # mappings for registry-sweepers are the responsibility of their respective sweepers and should not
+                    # be touched by the reindexer sweeper
+                    if property_name not in sweepers_missing_property_names:
+                        log.warning(
+                            f"Property {property_name} is missing from the index mapping, but is a sweepers metadata attribute and will not be fixed here. Please run the full set of sweepers on this index"
+                        )
+                        sweepers_missing_property_names.add(property_name)
+                else:
+                    # if there is no canonical type and it is not a provenance metadata key, do nothing, per jpadams
+                    pass
+
+    log.info(
+        f"RESULT: Detected {format_hits_count(problem_docs_count)} docs with {len(missing_mapping_updates)} missing mappings and {len(bad_mapping_property_names)} mappings conflicting with the DD, out of a total of {format_hits_count(total_docs_count)} docs"
+    )
+
+    if problem_docs_count > 0:
+        log.warning(
+            f"RESULT: Problems were detected with docs having harvest timestamps between {earliest_problem_doc_harvested_at.isoformat()} and {latest_problem_doc_harvested_at.isoformat()}"  # type: ignore
+        )
+        log.warning(
+            f"RESULT: Problems were detected with docs having harvest versions {sorted(problematic_harvest_versions)}"
+        )
+
+    if len(missing_mapping_updates) > 0:
+        log.info(
+            f"RESULT: Mappings will be added for the following properties: {sorted(missing_mapping_updates.keys())}"
+        )
+
+    if len(canonical_type_undefined_property_names) > 0:
+        log.info(
+            f"RESULT: Mappings were not found in the DD or static types for the following properties: {sorted(canonical_type_undefined_property_names)}"
+        )
+
+    if len(bad_mapping_property_names) > 0:
+        log.error(
+            f"RESULT: The following mappings have a type which does not match the type described by the data dictionary: {bad_mapping_property_names} - in-place update is not possible, data will need to be manually reindexed with manual updates (or that functionality must be added to this sweeper)"
+        )
+
+    return missing_mapping_updates
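A toy walk-through of the contract above, with all names and values invented:

    # dd_field_types_by_name            = {"alpha": "keyword", "beta": "text"}
    # mapping_field_types_by_field_name = {"beta": "text"}
    # docs = [{"_id": "urn:nasa:pds:example::1.0", "_source": {"alpha": "a", "beta": "b", "gamma": "c"}}]
    #
    # -> returns {"alpha": "keyword"}   (missing from mapping, canonical type known)
    #    "beta" is untouched            (already mapped, consistent with the DD)
    #    "gamma" has no canonical type  (warned about and left unmapped, per jpadams)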
+
+
+def generate_updates(
+    timestamp: datetime, extant_mapping_keys: Collection[str], docs: Iterable[Dict]
+) -> Iterable[Update]:
+    extant_mapping_keys = set(extant_mapping_keys)
+    for document in docs:
+        id = document["_id"]
+        document_field_names = set(document["_source"].keys())
+        document_fields_missing_from_mappings = document_field_names.difference(extant_mapping_keys)
+        if len(document_fields_missing_from_mappings) > 0:
+            log.debug(
+                f"Missing mappings {document_fields_missing_from_mappings} detected when creating Update for doc with id {id} - update is generated anyway, as the harvest-time constraint protects against the race condition"
+            )
+
+        yield Update(id=id, content={REINDEXER_FLAG_METADATA_KEY: timestamp.isoformat()})
+
+
+def format_hits_count(count: int) -> str:
+    """Format hits count in a more human-friendly manner for logs"""
+    if count < 1e4:
+        return str(count)
+    elif count < 1e5:
+        adjusted_count = count / 1e3
+        return "{:,.1f}K".format(adjusted_count)
+    elif count < 1e6:
+        adjusted_count = count / 1e3
+        return "{:,.0f}K".format(adjusted_count)
+    else:
+        adjusted_count = count / 1e6
+        return "{:,.2f}M".format(adjusted_count)
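For reference, the thresholds above produce output like the following (illustrative values, shown doctest-style):

    >>> format_hits_count(9500)
    '9500'
    >>> format_hits_count(45200)
    '45.2K'
    >>> format_hits_count(450000)
    '450K'
    >>> format_hits_count(2345678)
    '2.35M'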
+
+
+def run(
+    client: OpenSearch,
+    log_filepath: Union[str, None] = None,
+    log_level: int = logging.INFO,
+):
+    configure_logging(filepath=log_filepath, log_level=log_level)
+
+    sweeper_start_timestamp = datetime.now()
+    products_index_name = resolve_multitenant_index_name("registry")
+    ensure_index_mapping(client, products_index_name, REINDEXER_FLAG_METADATA_KEY, "date")
+
+    dd_field_types_by_field_name = fetch_dd_field_types(client)
+
+    def get_updated_hits_count():
+        return get_query_hits_count(client, products_index_name, get_docs_query(sweeper_start_timestamp))
+
+    # AOSS was becoming overloaded during iteration while accumulating missing mappings on populous nodes, so it is
+    # necessary to impose a limit for how many products are iterated over before a batch of updates is created and
+    # written. This allows incremental progress to be made and limits the amount of work discarded in the event of an
+    # overload condition.
+    # Using the harvest timestamp as a sort field acts as a soft guarantee of consistency of query results between the
+    # searches performed during accumulate_missing_mappings() and generate_updates(), and generate_updates() logs any
+    # products picked up in the second stage (update generation) which weren't processed in the first stage (missing
+    # mapping accumulation)
+    batch_size_limit = 100000
+    sort_fields = ["ops:Harvest_Info/ops:harvest_date_time"]
+    total_outstanding_doc_count = get_updated_hits_count()
+
+    with tqdm(
+        total=total_outstanding_doc_count,
+        desc="Reindexer sweeper progress",
+    ) as pbar:
+        current_batch_size = min(batch_size_limit, total_outstanding_doc_count)
+        final_batch_is_processed = False
+        while not final_batch_is_processed:
+            mapping_field_types_by_field_name = get_mapping_field_types_by_field_name(client, products_index_name)
+
+            missing_mappings = accumulate_missing_mappings(
+                dd_field_types_by_field_name,
+                mapping_field_types_by_field_name,
+                query_registry_db_with_search_after(
+                    client,
+                    products_index_name,
+                    _source={},
+                    query=get_docs_query(sweeper_start_timestamp),
+                    limit=batch_size_limit,
+                    sort_fields=sort_fields,
+                ),
+            )
+            for property, mapping_typename in missing_mappings.items():
+                log.info(f"Updating index {products_index_name} with missing mapping ({property}, {mapping_typename})")
+                ensure_index_mapping(client, products_index_name, property, mapping_typename)
+
+            updated_mapping_keys = get_mapping_field_types_by_field_name(client, products_index_name).keys()
+            updates = generate_updates(
+                sweeper_start_timestamp,
+                updated_mapping_keys,
+                query_registry_db_with_search_after(
+                    client,
+                    products_index_name,
+                    _source={},
+                    query=get_docs_query(sweeper_start_timestamp),
+                    limit=batch_size_limit,
+                    sort_fields=sort_fields,
+                ),
+            )
+            log.info(
+                f"Updating newly-processed documents with {REINDEXER_FLAG_METADATA_KEY}={sweeper_start_timestamp.isoformat()}..."
+            )
+            write_updated_docs(
+                client,
+                updates,
+                index_name=products_index_name,
+            )
+
+            # If the current batch isn't a full page, it must be the last page and all updates are pending.
+            # Terminate loop on this basis to avoid lots of redundant updates.
+            final_batch_is_processed = current_batch_size < batch_size_limit
+            pbar.update(current_batch_size)
+
+            # Update batch size for next page of hits
+            current_batch_size = min(batch_size_limit, get_updated_hits_count())
+
+    log.info("Completed reindexer sweeper processing!")
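To make the termination condition concrete, a hypothetical trace (counts invented), assuming nothing new is harvested mid-sweep:

    # outstanding docs = 250,000; batch_size_limit = 100,000
    # pass 1: current_batch_size = 100,000 (full page)    -> final_batch_is_processed = False, continue
    # pass 2: current_batch_size = 100,000 (full page)    -> final_batch_is_processed = False, continue
    # pass 3: current_batch_size =  50,000 (partial page) -> final_batch_is_processed = True, loop exits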
+
+
+if __name__ == "__main__":
+    cli_description = f"""
+    Tests untested documents in registry index to ensure that all properties are present in the index mapping (i.e. that
+    they are searchable). Mapping types are derived from <<>>
+
+    When a document is tested, metadata attribute {REINDEXER_FLAG_METADATA_KEY} is given a value equal to the timestamp
+    at sweeper runtime. The presence of attribute {REINDEXER_FLAG_METADATA_KEY} indicates that the document has been
+    tested and may be skipped in future.
+
+    Writing a new value to this attribute triggers a re-index of the entire document, ensuring that the document is
+    fully-searchable.
+
+    """
+
+    args = parse_args(description=cli_description)
+    client = get_userpass_opensearch_client(
+        endpoint_url=args.base_URL, username=args.username, password=args.password, verify_certs=not args.insecure
+    )
+
+    run(
+        client=client,
+        log_level=args.log_level,
+        log_filepath=args.log_file,
+    )
diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
index 9f58f9e..521dd99 100644
--- a/src/pds/registrysweepers/utils/db/__init__.py
+++ b/src/pds/registrysweepers/utils/db/__init__.py
@@ -162,6 +162,18 @@ def query_registry_db_with_search_after(
 
     with tqdm(total=expected_hits, desc=f"Query {query_id}") as pbar:
         while more_data_exists:
+            # Manually set sort - this is required for subsequent calls, despite being passed in fetch_func's call to
+            # client.search as sort kwarg.
+            # It is unclear why this issue is only presenting now - edunn 20241023
+            # It appears that OpenSearch.search() sort kwarg behaves inconsistently if the values contain certain
+            # characters. It is unclear which of /: is the issue but it is suggested that :-+^ may be problematic - edunn 20241105
+            # Related: https://discuss.elastic.co/t/query-a-field-that-has-a-colon/323966
+            # https://discuss.elastic.co/t/problem-with-colon-in-fieldname-where-can-i-find-naming-guidelines/5437/4
+            # https://discuss.elastic.co/t/revisiting-colons-in-field-names/25005
+            # TODO: investigate and open ticket with opensearch-py if confirmed
+            special_characters = {"/", ":"}
+            query["sort"] = [f for f in sort_fields if any(c in f for c in special_characters)]
+
             if search_after_values is not None:
                 query["search_after"] = search_after_values
                 log.debug(
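For context (illustrative, not part of the patch), a follow-up page request issued by this loop ends up with the sort pinned in the request body itself rather than relying on the client's sort kwarg, roughly:

    # {
    #     "query": {...},
    #     "sort": ["ops:Harvest_Info/ops:harvest_date_time"],
    #     "search_after": ["2024-11-20T00:00:00Z"]
    # }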
@@ -205,6 +217,21 @@ def fetch_func():
 
                 # simpler to set the value after every hit than worry about OBO errors detecting the last hit in the page
                 search_after_values = [hit["_source"].get(field) for field in sort_fields]
 
+                # Flatten single-element search-after-values. Attempting to sort/search-after on MCP AOSS by
+                # ops:Harvest_Info/ops:harvest_date_time is throwing
+                # RequestError(400, 'parsing_exception', 'Expected [VALUE_STRING] or [VALUE_NUMBER] or
+                # [VALUE_BOOLEAN] or [VALUE_NULL] but found [START_ARRAY] inside search_after.')
+                # It is unclear why this issue is only presenting now - edunn 20241023
+                if search_after_values is not None:
+                    for idx, value in enumerate(search_after_values):
+                        if isinstance(value, list):
+                            if len(value) == 1:
+                                search_after_values[idx] = value[0]
+                            else:
+                                raise ValueError(
+                                    f"Failed to flatten array-like search-after value {value} into single element"
+                                )
+
             # This is a temporary, ad-hoc guard against empty/erroneous responses which do not return non-200 status codes.
             # Previously, this has caused infinite loops in production due to served_hits sticking and never reaching the
             # expected total hits value.
@@ -306,7 +333,7 @@ def update_as_statements(update: Update) -> Iterable[str]:
 
 
 @retry(tries=6, delay=15, backoff=2, logger=log)
-def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: Iterable[str]):
+def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: List[str]):
     bulk_data = "\n".join(bulk_updates) + "\n"
 
     request_timeout = 180
@@ -315,6 +342,13 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
     if response_content.get("errors"):
         warn_types = {"document_missing_exception"}  # these types represent bad data, not bad sweepers behaviour
         items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]
 
+        if any(
+            item["update"]["status"] == 429 and item["update"]["error"]["type"] == "circuit_breaking_exception"
+            for item in items_with_problems
+        ):
+            raise RuntimeWarning(
+                "Bulk updates response includes item with status HTTP429, circuit_breaking_exception/throttled - chunk will need to be resubmitted"
+            )
+
         def get_ids_list_str(ids: List[str]) -> str:
             max_display_ids = 50
diff --git a/src/pds/registrysweepers/utils/db/indexing.py b/src/pds/registrysweepers/utils/db/indexing.py
index ba305dc..cef411c 100644
--- a/src/pds/registrysweepers/utils/db/indexing.py
+++ b/src/pds/registrysweepers/utils/db/indexing.py
@@ -2,5 +2,9 @@
 
 
 def ensure_index_mapping(client: OpenSearch, index_name: str, property_name: str, property_type: str):
-    """Provides an easy-to-use wrapper for ensuring the presence of a given property name/type in a given index"""
+    """
+    Provides an easy-to-use wrapper for ensuring the presence of a given property name/type in a given index.
+    N.B. This cannot change the type of a mapping, as modification/deletion is impossible in ES/OS. If the mapping
+    already exists, matching type or not, the function will gracefully fail and log an HTTP400 error.
+    """
     client.indices.put_mapping(index=index_name, body={"properties": {property_name: {"type": property_type}}})
diff --git a/src/pds/registrysweepers/utils/db/multitenancy.py b/src/pds/registrysweepers/utils/db/multitenancy.py
index f343b0b..90bd454 100644
--- a/src/pds/registrysweepers/utils/db/multitenancy.py
+++ b/src/pds/registrysweepers/utils/db/multitenancy.py
@@ -2,7 +2,7 @@
 
 
 def resolve_multitenant_index_name(index_type: str):
-    supported_index_types = {"registry", "registry-refs"}
+    supported_index_types = {"registry", "registry-refs", "registry-dd"}
     node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip(" ")
 
     if node_id == "":