diff --git a/yente/provider/base.py b/yente/provider/base.py
index d20e2bba..ae1c68e7 100644
--- a/yente/provider/base.py
+++ b/yente/provider/base.py
@@ -1,5 +1,5 @@
 from asyncio import Semaphore
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 from typing import AsyncIterator
 
 from yente import settings
@@ -60,6 +60,8 @@ async def search(
         """Search for entities in the index."""
         raise NotImplementedError
 
-    async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
+    async def bulk_index(
+        self, entities: AsyncIterator[Dict[str, Any]]
+    ) -> Tuple[int, int]:
         """Index a list of entities into the search index."""
         raise NotImplementedError
diff --git a/yente/provider/elastic.py b/yente/provider/elastic.py
index 93de7b0d..bf7fef6f 100644
--- a/yente/provider/elastic.py
+++ b/yente/provider/elastic.py
@@ -1,7 +1,7 @@
 import json
 import asyncio
 import warnings
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, List, Optional, cast, Tuple
 from typing import AsyncIterator
 from elasticsearch import AsyncElasticsearch, ElasticsearchWarning
 from elasticsearch.helpers import async_bulk, BulkIndexError
@@ -229,15 +229,28 @@ async def search(
             )
             raise YenteIndexError(f"Could not search index: {ae}") from ae
 
-    async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
-        """Index a list of entities into the search index."""
+    async def bulk_index(
+        self, entities: AsyncIterator[Dict[str, Any]]
+    ) -> Tuple[int, int]:
+        """Index a list of entities into the search index.
+
+        Args:
+            entities: An async iterator of entities to index.
+
+        Returns:
+            A tuple of the number of entities indexed and the number of errors.
+        """
         try:
-            await async_bulk(
+            n, errors = await async_bulk(
                 self.client(),
                 entities,
                 chunk_size=1000,
-                yield_ok=False,
-                stats_only=True,
+                raise_on_error=False,
             )
+            errors = cast(List[Any], errors)
+            for error in errors:
+                log.error("Bulk index error", error=error)
+            return n, len(errors)
+
         except BulkIndexError as exc:
             raise YenteIndexError(f"Could not index entities: {exc}") from exc
diff --git a/yente/provider/opensearch.py b/yente/provider/opensearch.py
index 6e3d94a8..760ecd7a 100644
--- a/yente/provider/opensearch.py
+++ b/yente/provider/opensearch.py
@@ -1,7 +1,7 @@
 import json
 import asyncio
 import logging
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, List, Optional, cast, Tuple
 from typing import AsyncIterator
 from opensearchpy import AsyncOpenSearch, AWSV4SignerAuth
 from opensearchpy.helpers import async_bulk, BulkIndexError
@@ -219,17 +219,22 @@ async def search(
             )
             raise YenteIndexError(f"Could not search index: {ae}") from ae
 
-    async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
+    async def bulk_index(
+        self, entities: AsyncIterator[Dict[str, Any]]
+    ) -> Tuple[int, int]:
         """Index a list of entities into the search index."""
         try:
-            await async_bulk(
+            n, errors = await async_bulk(
                 self.client,
                 entities,
                 chunk_size=1000,
-                yield_ok=False,
-                stats_only=True,
                 max_retries=3,
                 initial_backoff=2,
+                raise_on_error=False,
             )
+            errors = cast(List[Any], errors)
+            for error in errors:
+                log.error("Bulk index error", error=error)
+            return n, len(errors)
         except BulkIndexError as exc:
             raise YenteIndexError(f"Could not index entities: {exc}") from exc
diff --git a/yente/search/indexer.py b/yente/search/indexer.py
index 8db6b05c..c204b976 100644
--- a/yente/search/indexer.py
+++ b/yente/search/indexer.py
@@ -134,17 +134,14 @@ async def index_entities(
     if not force and await provider.exists_index_alias(alias, next_index):
         log.info("Index is up to date.", index=next_index)
         return False
-
-    # await es.indices.delete(index=next_index)
     if updater.is_incremental and not force:
         base_index = construct_index_name(dataset.name, updater.base_version)
         await provider.clone_index(base_index, next_index)
     else:
         await provider.create_index(next_index)
-
     try:
         docs = iter_entity_docs(updater, next_index)
-        await provider.bulk_index(docs)
+        n_changed, _ = await provider.bulk_index(docs)
     except (
         YenteIndexError,
         KeyboardInterrupt,
@@ -153,27 +150,22 @@
         asyncio.TimeoutError,
         asyncio.CancelledError,
     ) as exc:
-        log.exception(
-            "Indexing error: %r" % exc,
-            dataset=dataset.name,
-            index=next_index,
-        )
         aliases = await provider.get_alias_indices(alias)
         if next_index not in aliases:
             log.warn("Deleting partial index", index=next_index)
             await provider.delete_index(next_index)
-        return False
+        if not force:
+            return await index_entities(provider, dataset, force=True)
+        raise exc
 
     await provider.refresh(index=next_index)
     dataset_prefix = construct_index_name(dataset.name)
-    # FIXME: we're not actually deleting old indexes here any more!
     await provider.rollover_index(
         alias,
         next_index,
         prefix=dataset_prefix,
     )
-    log.info("Index is now aliased to: %s" % alias, index=next_index)
-    return True
+    return n_changed > 0
 
 
 async def delete_old_indices(provider: SearchProvider, catalog: Catalog) -> None:
@@ -205,11 +197,18 @@ async def update_index(force: bool = False) -> bool:
         catalog = await get_catalog()
         log.info("Index update check")
         changed = False
+        exceptions = []
         for dataset in catalog.datasets:
-            _changed = await index_entities_rate_limit(provider, dataset, force)
-            changed = changed or _changed
-
+            try:
+                _changed = await index_entities_rate_limit(provider, dataset, force)
+                changed = changed or _changed
+            except Exception as exc:
+                exceptions.append({"exc": exc, "dataset": dataset.name})
         await delete_old_indices(provider, catalog)
+        if len(exceptions) > 0:
+            for e in exceptions:
+                log.error(f"{e['dataset']} update error: {e['exc']}")
+            raise Exception("One or more indices failed to update.")
 
         log.info("Index update complete.", changed=changed)
         return changed