diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index e02c401d..dffc6f02 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -143,6 +143,7 @@ pathy==0.10.1
     # via spacy
 pdftotext==2.2.2
     # via sec-certs (./../pyproject.toml)
+pymupdf==1.23.4
 pexpect==4.8.0
     # via ipython
 pickleshare==0.7.5
diff --git a/src/sec_certs/utils/ocr.py b/src/sec_certs/utils/ocr.py
new file mode 100644
index 00000000..010e114e
--- /dev/null
+++ b/src/sec_certs/utils/ocr.py
@@ -0,0 +1,175 @@
+import abc
+import glob
+import logging
+import os
+import subprocess
+from io import BytesIO
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from typing import Any, Dict, Optional
+
+import fitz
+import pytesseract
+from PIL import Image
+
+logger = logging.getLogger(__name__)
+
+INVALID_PYMUPDF_CHARACTER = chr(0xFFFD)
+
+
+class OCREngineBase(abc.ABC):
+    """Abstract base class for all OCR engines."""
+
+    @abc.abstractmethod
+    def extract_text(self, image):
+        """Extract text from the given image using the OCR engine."""
+
+
+class TesseractOCREngine(OCREngineBase):
+    """
+    Implementation of the OCR engine using Tesseract.
+
+    Args:
+        lang (str): Languages to be used by Tesseract for OCR.
+        tesseract_cmd (str): Optional path to the Tesseract binary; if not given, the system default is used.
+    """
+
+    def __init__(self, lang: str = "eng+deu+fra", tesseract_cmd: Optional[str] = None) -> None:
+        self._lang = lang
+        if tesseract_cmd:
+            pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
+
+    def extract_text(self, image) -> str:
+        """Extract text from the given image using Tesseract."""
+        return pytesseract.image_to_string(image, lang=self._lang)
+
+
+class EasyOCROCREngine(OCREngineBase):
+    pass
+
+
+class PaddleOCREngine(OCREngineBase):
+    pass
+
+
+class TrOCREngine(OCREngineBase):
+    pass
+
+
+def build_ocr_engine(engine_choice: str, engine_kwargs: Optional[Dict[str, str]] = None) -> OCREngineBase:
+    """
+    Factory method to build and return an instance of the desired OCR engine.
+
+    Args:
+        engine_choice (str): Name of the desired OCR engine.
+        engine_kwargs (dict): Additional arguments for the OCR engine.
+
+    Returns:
+        An instance of the chosen OCR engine.
+    """
+    engine_kwargs = engine_kwargs or {}
+    if engine_choice == "TesseractOCR":
+        os.environ["OMP_THREAD_LIMIT"] = "1"  # do not parallelize inside a single tesseract process
+        return TesseractOCREngine(**engine_kwargs)
+    if engine_choice == "PaddleOCR":
+        raise NotImplementedError("The PaddleOCR engine is not implemented yet")
+    if engine_choice == "EasyOCR":
+        raise NotImplementedError("The EasyOCR engine is not implemented yet")
+    if engine_choice == "TrOCR":
+        raise NotImplementedError("The TrOCR engine is not implemented yet")
+    raise ValueError(f"Unknown OCR engine: {engine_choice}")
+
+
+def ocr_segment(page, old_text: Optional[str], bbox, ocr_engine: OCREngineBase) -> str:
+    """
+    Perform OCR on a particular segment of a page.
+
+    Args:
+        page: The PDF page containing the segment.
+        old_text (str): The previous text from the segment.
+        bbox: Bounding box of the segment.
+        ocr_engine: The OCR engine to use.
+
+    Returns:
+        The extracted text from the segment.
+ """ + logging.debug("Performing OCR on a segment of the page.") + pix = page.get_pixmap( + colorspace=fitz.csGRAY, # we need no color + matrix=fitz.Matrix(5, 5), + clip=bbox, + ) + if old_text is None: + old_text = "" + image_data = pix.tobytes("png") + image = Image.open(BytesIO(image_data)) + new_text = ocr_engine.extract_text(image) + left_spaces = " " * (len(old_text) - len(old_text.lstrip())) + right_spaces = " " * (len(old_text) - len(old_text.rstrip())) + + return left_spaces + new_text + right_spaces + + +def ocr_segments_with_garbage_text(page: fitz.Page, page_content: Dict[str, Any], ocr_engine: OCREngineBase) -> int: + """ + Perform OCR on segments of a page that have text which couldn't be read properly. + + Args: + page: The PDF page. + page_content (dict): The content extracted from the page. + ocr_engine: The OCR engine to use. + """ + ocr_count = 0 + for block in page_content["blocks"]: + if block["type"] == "figure": + continue + if block["type"] == "text": + for line in block["lines"]: + for span in line["spans"]: + text, bbox = span["text"], span["bbox"] + if text is None or INVALID_PYMUPDF_CHARACTER in text: + span["text"] = ocr_segment(page, text, bbox, ocr_engine) + span["ocr"] = True + ocr_count += 1 + elif block["type"] == "table": + for row in block["rows"]: # we also need to iterate over header + text_cells = row["text"] + bbox_cells = row["cells"] + if len(text_cells) > len(bbox_cells): + logging.warning(f"Skipping OCR correction. There is more text cells ({len(text_cells)}) than bbox cells ({len(bbox_cells)}).") + continue + if len(text_cells) < len(bbox_cells): + logging.warning(f"There is less text cells ({len(text_cells)}) than bbox cells ({len(bbox_cells)}).") + text_cells.extend([None] * (len(bbox_cells) - len(text_cells))) + for i in range(len(bbox_cells)): + text, bbox = text_cells[i], bbox_cells[i] + if text is None or INVALID_PYMUPDF_CHARACTER in text: + text_cells[i] = ocr_segment(page, text, bbox, ocr_engine) + row["ocr"] = True + ocr_count += 1 + + return ocr_count + +def ocr_pdf_file(pdf_path: Path) -> str: + """ + OCR a PDF file and return its text contents, uses `pdftoppm` and `tesseract`. + + :param pdf_path: The PDF file to OCR. + :return: The text contents. 
+ """ + with TemporaryDirectory() as tmpdir: + tmppath = Path(tmpdir) + ppm = subprocess.run( + ["pdftoppm", pdf_path, tmppath / "image"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + if ppm.returncode != 0: + raise ValueError(f"pdftoppm failed: {ppm.returncode}") + for ppm_path in map(Path, glob.glob(str(tmppath / "image*.ppm"))): + base = ppm_path.with_suffix("") + tes = subprocess.run( + ["tesseract", "-l", "eng+deu+fra", ppm_path, base], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + if tes.returncode != 0: + raise ValueError(f"tesseract failed: {tes.returncode}") + + contents = "" + + txt_paths = [x for x in tmppath.iterdir() if x.is_file() and "image-" in x.stem and x.suffix == ".txt"] + txt_paths = sorted(txt_paths, key=lambda txt_path: int(txt_path.stem.split("-")[1])) + + for txt_path in txt_paths: + with txt_path.open("r", encoding="utf-8") as f: + contents += f.read() + return contents diff --git a/src/sec_certs/utils/pdf.py b/src/sec_certs/utils/pdf.py index 7736b5d0..d94398d3 100644 --- a/src/sec_certs/utils/pdf.py +++ b/src/sec_certs/utils/pdf.py @@ -1,18 +1,13 @@ from __future__ import annotations -import glob import logging -import subprocess from datetime import datetime, timedelta, timezone from functools import reduce from pathlib import Path -from tempfile import TemporaryDirectory from typing import Any -import pdftotext +import fitz # PyMuPDF import pikepdf -import pytesseract -from PIL import Image from sec_certs import constants from sec_certs.constants import ( @@ -22,10 +17,15 @@ GARBAGE_LINES_THRESHOLD, GARBAGE_SIZE_THRESHOLD, ) +from sec_certs.utils.ocr import OCREngineBase, build_ocr_engine, ocr_pdf_file, ocr_segments_with_garbage_text logger = logging.getLogger(__name__) logging.getLogger("pypdf").setLevel(logging.ERROR) +PYMUPDF_TYPES = { + 0: "text", + 1: "figure" +} def repair_pdf(file: Path) -> None: """ @@ -37,42 +37,117 @@ def repair_pdf(file: Path) -> None: """ pdf = pikepdf.Pdf.open(file, allow_overwriting_input=True) pdf.save(file) - - -def ocr_pdf_file(pdf_path: Path) -> str: + +def extract_texts_and_figures(pdf_page: fitz.Page) -> dict[str, Any]: """ - OCR a PDF file and return its text contents, uses `pdftoppm` and `tesseract`. - - :param pdf_path: The PDF file to OCR. - :return: The text contents. + Extract text and figures from a given PDF page. + + Args: + pdf_page: The page from which to extract the content. + + Returns: + A dictionary containing extracted texts and figures. + """ + page_dict = pdf_page.get_text("dict", sort=True) + for block in page_dict.get("blocks", []): + block["type"] = PYMUPDF_TYPES[block["type"]] + if block["type"] == "figure": + block.pop("image") # this is too big and useless in byte format + return page_dict + +def extract_tables_from_page(pdf_page: fitz.Page, logging_metadata: dict[str, str]) -> list[dict[str, Any]]: """ - with TemporaryDirectory() as tmpdir: - tmppath = Path(tmpdir) - ppm = subprocess.run( - ["pdftoppm", pdf_path, tmppath / "image"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL - ) - if ppm.returncode != 0: - raise ValueError(f"pdftoppm failed: {ppm.returncode}") - - for ppm_path in map(Path, glob.glob(str(tmppath / "image*.ppm"))): - base = ppm_path.with_suffix("") - content = pytesseract.image_to_string(Image.open(ppm_path), lang="eng+deu+fra") - - if content: - with Path(base.with_suffix(".txt")).open("w") as file: - file.write(content) - else: - raise ValueError(f"OCR failed for document {ppm_path}. 
Check document manually") + Extract tables from a given PDF page. + + Args: + pdf_page: The page from which to extract tables. + + Returns: + A list of dictionaries, each representing a table. + """ + + if not hasattr(fitz.Page, "find_tables"): + raise Exception("This PyMuPDF version does not support the table feature") + + tables = None + try: + tables = pdf_page.find_tables() + except Exception as e: # can fail when table is detected but it is actually empty + logging.error(f"Extract tables for {logging_metadata}: {e}") + return [] + out_tables = [] + for table in tables: + rows_text = table.extract() + bbox = table.bbox + rows = table.rows + + out_table = table.__dict__.copy() + # remove useless + out_table.pop("page") + out_table.pop("cells") + # add header while renaming strings to text + out_table["header"] = out_table["header"].__dict__.copy() + out_table["header"]["text"] = out_table["header"].pop("names") + + out_table["bbox"] = bbox + out_table["rows"] = [ + {"text": rows_text[i], **rows[i].__dict__} + for i in + range(len(rows)) + ] + out_table["type"] = "table" + out_table["df"] = table.to_pandas() + + out_tables.append(out_table) + + logging.debug(f"Extracted {len(out_tables)} tables from page.") + return out_tables + + +def extract_from_page(pdf_page: fitz.Page, extract_tables: bool, logging_metadata: dict[str, str]) -> dict[str, Any]: + """ + Extract all relevant information (text, figures, tables) from a given PDF page. + + Args: + pdf_page: The page from which to extract content. + + Returns: + A dictionary containing the extracted content. + """ + texts_and_figures = extract_texts_and_figures(pdf_page) + if extract_tables: + tables = extract_tables_from_page(pdf_page, logging_metadata) + texts_and_figures["blocks"].extend(tables) + # sort just in case + texts_and_figures["blocks"] = sorted( + texts_and_figures["blocks"], + # bbox is x0, y0, x1, y1, we sort by y1 and x0 as in PyMuPDF + key=lambda block: (block["bbox"][3], block["bbox"][0]) + ) + return texts_and_figures - contents = "" - txt_paths = [x for x in tmppath.iterdir() if x.is_file() and "image-" in x.stem and x.suffix == ".txt"] - txt_paths = sorted(txt_paths, key=lambda txt_path: int(txt_path.stem.split("-")[1])) +def segment_pdf(pdf, ocr_engine: OCREngineBase, extract_tables: bool, logging_metadata: dict[str, str]) -> list[dict[str, Any]]: + """ + Segment a PDF into its constituent parts (texts, tables, figures). + + Args: + pdf: The PDF document. + ocr_engine: The OCR engine to use for text extraction. + + Returns: + A list of dictionaries, each representing content from a page. + """ + pages = [] + ocr_count = 0 + for i, page in enumerate(pdf): + page_content = extract_from_page(page, extract_tables, {"page_index": str(i), **logging_metadata}) + ocr_count += ocr_segments_with_garbage_text(page, page_content, ocr_engine) + pages.append(page_content) + if ocr_count > 0: + logging.debug(f"Used OCR for {logging_metadata} in {ocr_count} cases") + return pages - for txt_path in txt_paths: - with txt_path.open("r", encoding="utf-8") as f: - contents += f.read() - return contents def convert_pdf_file(pdf_path: Path, txt_path: Path) -> tuple[bool, bool]: @@ -84,16 +159,67 @@ def convert_pdf_file(pdf_path: Path, txt_path: Path) -> tuple[bool, bool]: :return: A tuple of two results, whether OCR was done and what the complete result was (OK/NOK). 
""" + + def segmented_pdf_to_text(segmented_pdf: list[dict[str, Any]]) -> str: + + pdf_page_texts = [] + for pdf_page in segmented_pdf: + block_texts = [] + for block in pdf_page["blocks"]: + # skip figure + if block["type"] == "figure": + continue + # deal with text which is composed of lines composed of spans + if block["type"] == "text": + lines = [] + for line in block["lines"]: + spans = [] + for span in line["spans"]: + spans.append(span["text"].strip()) + line = " ".join(spans) + if len(line.strip()) > 0: + lines.append(line) + block_text = "\n".join(lines) # TODO maybe change to " ", depends how we wanna view it + if len(block_text.strip()) > 0: + block_texts.append(block_text) # lines are separated by "\n" + # deal with table which has header and rows + elif block["type"] == "table": + row_texts = [] + for row in [block["header"]] + block["rows"]: # iterate both header and rows + row_text = "\t".join( + [ + cell_text.strip() if cell_text is not None else "" + for cell_text + in row["text"] + ] + ) + row_texts.append(row_text) + block_texts.append("\n".join(row_texts)) + + pdf_page_texts.append("\n\n".join(block_texts)) # free line between blocks + + return "\n\n".join(pdf_page_texts) # create free line between pages + txt = None ok = False ocr = False - try: - with pdf_path.open("rb") as pdf_handle: - pdf = pdftotext.PDF(pdf_handle, "", True) # No password, Raw=True - txt = "".join(pdf) - except Exception as e: - logger.error(f"Error when converting pdf->txt: {e}") + # TODO move these things outside the function... + ocr_engine = build_ocr_engine("TesseractOCR") + extract_tables = False # SET THIS TO TRUE TO EXTRACT TABLES + + # parse structure of the document + try: + doc = fitz.open(pdf_path) + segmented_doc = segment_pdf(doc, ocr_engine, extract_tables, {}) # last argument is logging metadata, empty in this PoC + doc.close() + if not doc.is_closed: + logging.warning("There was issue closing the doc.") + txt = segmented_pdf_to_text(segmented_doc) + except Exception: + logger.error("Error when parsing pdf using PyMuPDF") + + # TODO this check should be revisited (changed or fully removed) as now OCR is done inside `segment_pdf` if txt is None or text_is_garbage(txt): logger.warning(f"Detected garbage during conversion of {pdf_path}") ocr = True