fix: retry with full rebuild on errors
report back unchanged datasets
SimonThordal committed Aug 16, 2024
1 parent ab7c7c7 commit 450273c
Showing 4 changed files with 48 additions and 29 deletions.
6 changes: 4 additions & 2 deletions yente/provider/base.py
@@ -1,5 +1,5 @@
 from asyncio import Semaphore
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 from typing import AsyncIterator
 
 from yente import settings
@@ -60,6 +60,8 @@ async def search(
"""Search for entities in the index."""
raise NotImplementedError

async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
async def bulk_index(
self, entities: AsyncIterator[Dict[str, Any]]
) -> Tuple[int, int]:
"""Index a list of entities into the search index."""
raise NotImplementedError
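
To make the new contract concrete, here is a minimal sketch of a caller consuming the (indexed, errors) tuple; the reindex helper and its error policy are illustrative, not part of this commit:

from typing import Any, AsyncIterator, Dict

async def reindex(provider: "SearchProvider", docs: AsyncIterator[Dict[str, Any]]) -> bool:
    # bulk_index now reports (indexed, errors) instead of returning None.
    indexed, errors = await provider.bulk_index(docs)
    if errors:
        # Hypothetical policy: treat any failed document as a failed run.
        raise RuntimeError(f"{errors} documents failed to index")
    # Zero successful writes lets the caller report the dataset unchanged.
    return indexed > 0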
25 changes: 19 additions & 6 deletions yente/provider/elastic.py
@@ -1,7 +1,7 @@
 import json
 import asyncio
 import warnings
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, List, Optional, cast, Tuple
 from typing import AsyncIterator
 from elasticsearch import AsyncElasticsearch, ElasticsearchWarning
 from elasticsearch.helpers import async_bulk, BulkIndexError
@@ -229,15 +229,28 @@ async def search(
             )
             raise YenteIndexError(f"Could not search index: {ae}") from ae
 
-    async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
-        """Index a list of entities into the search index."""
+    async def bulk_index(
+        self, entities: AsyncIterator[Dict[str, Any]]
+    ) -> Tuple[int, int]:
+        """Index a list of entities into the search index.
+        Args:
+            entities: An async iterator of entities to index.
+        Returns:
+            A tuple of the number of entities indexed and the number of errors.
+        """
         try:
-            await async_bulk(
+            n, errors = await async_bulk(
                 self.client(),
                 entities,
                 chunk_size=1000,
-                yield_ok=False,
-                stats_only=True,
+                raise_on_error=False,
             )
+            errors = cast(List[Any], errors)
+            for error in errors:
+                log.error("Bulk index error", error=error)
+            return n, len(errors)
+
         except BulkIndexError as exc:
             raise YenteIndexError(f"Could not index entities: {exc}") from exc
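
What makes this work is the return shape of the bulk helper: with the old stats_only=True, async_bulk returns a pair of counts, while with raise_on_error=False and the default stats_only it returns the success count plus a list of per-document error items, which is what lets each failure be logged. Dropping yield_ok=False also matters, since the helper only counts items it yields, so suppressing successful items would leave the success count at zero. A short sketch of the new shape, with made-up documents and index name:

from typing import Any, List, Tuple, cast
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

async def index_some(client: AsyncElasticsearch) -> Tuple[int, int]:
    # Made-up documents; "demo" is a hypothetical index name.
    docs = [{"_index": "demo", "_id": str(i), "value": i} for i in range(100)]
    n, errors = await async_bulk(client, docs, chunk_size=50, raise_on_error=False)
    # With raise_on_error=False, failures come back as a list of item
    # dicts instead of a raised BulkIndexError, so they can be logged.
    errors = cast(List[Any], errors)
    return n, len(errors)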
15 changes: 10 additions & 5 deletions yente/provider/opensearch.py
@@ -1,7 +1,7 @@
 import json
 import asyncio
 import logging
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, List, Optional, cast, Tuple
 from typing import AsyncIterator
 from opensearchpy import AsyncOpenSearch, AWSV4SignerAuth
 from opensearchpy.helpers import async_bulk, BulkIndexError
@@ -219,17 +219,22 @@ async def search(
             )
             raise YenteIndexError(f"Could not search index: {ae}") from ae
 
-    async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
+    async def bulk_index(
+        self, entities: AsyncIterator[Dict[str, Any]]
+    ) -> Tuple[int, int]:
         """Index a list of entities into the search index."""
         try:
-            await async_bulk(
+            n, errors = await async_bulk(
                 self.client,
                 entities,
                 chunk_size=1000,
-                yield_ok=False,
-                stats_only=True,
                 max_retries=3,
                 initial_backoff=2,
+                raise_on_error=False,
             )
+            errors = cast(List[Any], errors)
+            for error in errors:
+                log.error("Bulk index error", error=error)
+            return n, len(errors)
         except BulkIndexError as exc:
             raise YenteIndexError(f"Could not index entities: {exc}") from exc
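
The OpenSearch variant keeps its retry settings: rejected chunks are retried up to max_retries times with exponentially growing waits. A quick sketch of the schedule these values produce, assuming the doubling behaviour documented for the bulk helpers:

# Backoff schedule for max_retries=3, initial_backoff=2, assuming the
# documented doubling (capped at the helpers' max_backoff, 600s by
# default): sleeps of 2s, 4s and 8s before the three retries.
MAX_BACKOFF, INITIAL_BACKOFF = 600, 2
for attempt in range(1, 4):
    print(f"retry {attempt}: sleep {min(MAX_BACKOFF, INITIAL_BACKOFF * 2 ** (attempt - 1))}s")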
31 changes: 15 additions & 16 deletions yente/search/indexer.py
@@ -134,17 +134,14 @@ async def index_entities(
     if not force and await provider.exists_index_alias(alias, next_index):
         log.info("Index is up to date.", index=next_index)
         return False
-
-    # await es.indices.delete(index=next_index)
     if updater.is_incremental and not force:
         base_index = construct_index_name(dataset.name, updater.base_version)
         await provider.clone_index(base_index, next_index)
     else:
         await provider.create_index(next_index)
-
     try:
         docs = iter_entity_docs(updater, next_index)
-        await provider.bulk_index(docs)
+        n_changed, _ = await provider.bulk_index(docs)
     except (
         YenteIndexError,
         KeyboardInterrupt,
@@ -153,27 +150,22 @@
         asyncio.TimeoutError,
         asyncio.CancelledError,
     ) as exc:
-        log.exception(
-            "Indexing error: %r" % exc,
-            dataset=dataset.name,
-            index=next_index,
-        )
         aliases = await provider.get_alias_indices(alias)
         if next_index not in aliases:
             log.warn("Deleting partial index", index=next_index)
             await provider.delete_index(next_index)
-        return False
+        if not force:
+            return await index_entities(provider, dataset, force=True)
+        raise exc
 
     await provider.refresh(index=next_index)
     dataset_prefix = construct_index_name(dataset.name)
     # FIXME: we're not actually deleting old indexes here any more!
     await provider.rollover_index(
         alias,
         next_index,
         prefix=dataset_prefix,
     )
     log.info("Index is now aliased to: %s" % alias, index=next_index)
-    return True
+    return n_changed > 0
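
The recursion here is bounded: a first failure reruns index_entities with force=True, which skips the incremental clone and rebuilds the index from scratch, and if the forced run fails too, raise exc propagates instead of recursing again. A toy reproduction of that control flow (names are stand-ins, not yente code):

import asyncio

async def index(log: list, force: bool = False) -> bool:
    # Stand-in for index_entities: always fails, to show the retry path.
    try:
        log.append("full rebuild" if force else "incremental")
        raise RuntimeError("bulk index failed")
    except RuntimeError:
        if not force:
            return await index(log, force=True)  # one escalating retry
        raise  # the forced rebuild failed too: surface the error

attempts: list = []
try:
    asyncio.run(index(attempts))
except RuntimeError:
    pass
print(attempts)  # ['incremental', 'full rebuild']; exactly one retry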


@@ -205,11 +197,18 @@ async def update_index(force: bool = False) -> bool:
     catalog = await get_catalog()
     log.info("Index update check")
     changed = False
+    exceptions = []
     for dataset in catalog.datasets:
-        _changed = await index_entities_rate_limit(provider, dataset, force)
-        changed = changed or _changed
-
+        try:
+            _changed = await index_entities_rate_limit(provider, dataset, force)
+            changed = changed or _changed
+        except Exception as exc:
+            exceptions.append({"exc": exc, "dataset": dataset.name})
     await delete_old_indices(provider, catalog)
+    if len(exceptions) > 0:
+        for e in exceptions:
+            log.error(f"{e['dataset']} update error: {e['exc']}")
+        raise Exception("One or more indices failed to update.")
     log.info("Index update complete.", changed=changed)
     return changed
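
The reworked update_index isolates failures per dataset: one broken dataset no longer aborts the loop, old indices are still cleaned up, and the collected errors are logged before a single summary exception is raised so schedulers still see the failure. A toy version of the pattern (dataset names are made up):

import asyncio

async def update(datasets: dict) -> bool:
    changed = False
    failures = []
    for name, ok in datasets.items():
        try:
            if not ok:  # stand-in for a failing index_entities_rate_limit
                raise ValueError("indexing failed")
            changed = True
        except Exception as exc:
            failures.append({"dataset": name, "exc": exc})
    # the real code still runs delete_old_indices here before raising
    if failures:
        for f in failures:
            print(f"{f['dataset']} update error: {f['exc']}")
        raise Exception("One or more indices failed to update.")
    return changed

try:
    asyncio.run(update({"dataset_a": True, "dataset_b": False}))
except Exception as exc:
    print(exc)  # raised only after every dataset was attempted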

