Error handling edge cases for bulk indexing lambda and added tests #728

Merged
@@ -14,12 +14,16 @@
get_s3_file,
get_secret_data,
)
from ..text_extraction import add_text_content
from ..text_extraction import TextExtractionStatus, add_text_content

logger = logging.getLogger()
logger.setLevel(logging.INFO)


class ConsignmentBulkIndexError(Exception):
pass


def bulk_index_consignment_from_aws(
consignment_reference: str, secret_id: str
) -> None:
@@ -79,13 +83,51 @@ def bulk_index_consignment(
"""
files = _fetch_files_in_consignment(consignment_reference, database_url)
documents_to_index = _construct_documents(files, bucket_name)
bulk_index_files_in_opensearch(
documents_to_index,
open_search_host_url,
open_search_http_auth,
open_search_bulk_index_timeout,
open_search_ca_certs,
)

document_text_extraction_exceptions_message = ""
for doc in documents_to_index:
if doc["document"]["text_extraction_status"] not in [
TextExtractionStatus.SKIPPED.value,
TextExtractionStatus.SUCCEEDED.value,
]:
if document_text_extraction_exceptions_message == "":
document_text_extraction_exceptions_message += (
"Text extraction failed on the following documents:"
)
document_text_extraction_exceptions_message += f"\n{doc['file_id']}"

bulk_indexing_exception_message = ""
try:
bulk_index_files_in_opensearch(
documents_to_index,
open_search_host_url,
open_search_http_auth,
open_search_bulk_index_timeout,
open_search_ca_certs,
)
except Exception as bulk_indexing_exception:
bulk_indexing_exception_message = str(bulk_indexing_exception)
logger.error("Bulk indexing of files resulted in some errors")

# Combine and raise all errors from failed attempts to extract text or index documents
if (
document_text_extraction_exceptions_message
or bulk_indexing_exception_message
):
consignment_bulk_index_error_message = (
"The following errors occurred when attempting to "
f"bulk index consignment reference: {consignment_reference}"
)
if document_text_extraction_exceptions_message:
consignment_bulk_index_error_message += (
f"\n{document_text_extraction_exceptions_message}"
)
if bulk_indexing_exception_message:
consignment_bulk_index_error_message += (
f"\n{bulk_indexing_exception_message}"
)

raise ConsignmentBulkIndexError(consignment_bulk_index_error_message)
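
For context only (not part of this diff), a minimal sketch of how a Lambda entry point could surface the combined error, assuming `bulk_index_consignment_from_aws` propagates `ConsignmentBulkIndexError`; the event keys and logger setup here are illustrative assumptions:

# Hypothetical Lambda handler; event keys are assumptions, not from this PR.
import logging

logger = logging.getLogger()

def handler(event, context):
    try:
        bulk_index_consignment_from_aws(
            consignment_reference=event["consignment_reference"],
            secret_id=event["secret_id"],
        )
    except ConsignmentBulkIndexError as error:
        # The single combined message lists every file whose text extraction
        # failed plus any bulk indexing error for this consignment.
        logger.error(str(error))
        raise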


def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]:
@@ -1,12 +1,20 @@
import logging
import tempfile
from enum import Enum
from typing import Dict

import textract

logger = logging.getLogger()
logger.setLevel(logging.INFO)


class TextExtractionStatus(Enum):
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
SKIPPED = "SKIPPED"


SUPPORTED_TEXTRACT_FORMATS = [
"csv",
"doc",
@@ -45,18 +53,20 @@ def add_text_content(file: Dict, file_stream: bytes) -> Dict:
f"Text extraction skipped for unsupported file type: {file_type}"
)
file["content"] = ""
file["text_extraction_status"] = "n/a"
file["text_extraction_status"] = TextExtractionStatus.SKIPPED.value
else:
try:
file["content"] = extract_text(file_stream, file_type)
logger.info(f"Text extraction succeeded for file {file['file_id']}")
file["text_extraction_status"] = "success"
file["text_extraction_status"] = (
TextExtractionStatus.SUCCEEDED.value
)
except Exception as e:
logger.error(
f"Text extraction failed for file {file['file_id']}: {e}"
)
file["content"] = ""
file["text_extraction_status"] = "failed"
file["text_extraction_status"] = TextExtractionStatus.FAILED.value

return file
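
As a quick illustration (not from this diff) of how the enum-backed status reads downstream; the keys on the `file` dict here are assumptions for the sketch:

# Sketch only: the input dict shape is assumed for illustration.
file = {"file_id": "0000-example", "file_name": "report.pdf"}
file = add_text_content(file, file_stream=b"%PDF-1.4 ...")
if file["text_extraction_status"] == TextExtractionStatus.FAILED.value:
    # On failure the content is left empty so indexing can still proceed.
    assert file["content"] == ""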

2 changes: 2 additions & 0 deletions data_management/opensearch_indexer/requirements.txt
@@ -4,3 +4,5 @@ requests-aws4auth==1.3.1
SQLAlchemy==2.0.32
pg8000==1.31.2
textract==1.6.5
testing-postgresql==1.3.0
psycopg2==2.9.10
98 changes: 98 additions & 0 deletions data_management/opensearch_indexer/tests/conftest.py
@@ -0,0 +1,98 @@
import tempfile

import pytest
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
String,
Text,
create_engine,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, relationship
from testing.postgresql import PostgresqlFactory

Base = declarative_base()


class Body(Base):
__tablename__ = "Body"
BodyId = Column(UUID(as_uuid=True), primary_key=True)
Name = Column(Text)
Description = Column(Text)


class Series(Base):
__tablename__ = "Series"
SeriesId = Column(UUID(as_uuid=True), primary_key=True)
BodyId = Column(UUID(as_uuid=True), ForeignKey("Body.BodyId"))
Name = Column(Text)
Description = Column(Text)
body = relationship("Body", foreign_keys="Series.BodyId")


class Consignment(Base):
__tablename__ = "Consignment"
ConsignmentId = Column(UUID(as_uuid=True), primary_key=True)
SeriesId = Column(UUID(as_uuid=True), ForeignKey("Series.SeriesId"))
BodyId = Column(UUID(as_uuid=True), ForeignKey("Body.BodyId"))
ConsignmentReference = Column(Text)
ConsignmentType = Column(String, nullable=False)
IncludeTopLevelFolder = Column(Boolean)
ContactName = Column(Text)
ContactEmail = Column(Text)
TransferStartDatetime = Column(DateTime)
TransferCompleteDatetime = Column(DateTime)
ExportDatetime = Column(DateTime)
CreatedDatetime = Column(DateTime)
series = relationship("Series", foreign_keys="Consignment.SeriesId")


class File(Base):
__tablename__ = "File"
FileId = Column(UUID(as_uuid=True), primary_key=True)
ConsignmentId = Column(
UUID(as_uuid=True), ForeignKey("Consignment.ConsignmentId")
)
FileReference = Column(Text, nullable=False)
FileType = Column(Text, nullable=False)
FileName = Column(Text, nullable=False)
FilePath = Column(Text, nullable=False)
CiteableReference = Column(Text)
Checksum = Column(Text)
CreatedDatetime = Column(DateTime)
consignment = relationship("Consignment", foreign_keys="File.ConsignmentId")


class FileMetadata(Base):
__tablename__ = "FileMetadata"
MetadataId = Column(UUID(as_uuid=True), primary_key=True)
FileId = Column(UUID(as_uuid=True), ForeignKey("File.FileId"))
PropertyName = Column(Text, nullable=False)
Value = Column(Text)
CreatedDatetime = Column(DateTime)
file = relationship("File", foreign_keys="FileMetadata.FileId")


@pytest.fixture()
def temp_db():
temp_db_file = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
temp_db_file.close()
database_url = f"sqlite:///{temp_db_file.name}"
engine = create_engine(database_url)
Base.metadata.create_all(engine)
return engine


@pytest.fixture(scope="session")
def database(request):
# Launch new PostgreSQL server
postgresql = PostgresqlFactory(cache_initialized_db=True)()
yield postgresql

# PostgreSQL server is terminated here
@request.addfinalizer
def drop_database():
postgresql.stop()
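
For context, a sketch of how a test could combine the session-scoped `database` fixture with the declarative models above; the imports from conftest and the specific assertion are illustrative assumptions:

# Illustrative test sketch; assumes Base and File are imported from conftest.
from sqlalchemy import create_engine
from sqlalchemy.orm import Session


def test_consignment_has_no_files_initially(database):
    engine = create_engine(database.url())  # testing.postgresql exposes url()
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        assert session.query(File).count() == 0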