How to modify main.py to process all files with a .pdf extension in a specified bucket #505

Open
noopur100 opened this issue May 9, 2023 · 1 comment
noopur100 commented May 9, 2023

Hi team, can someone help me modify the code so that it processes every document with a .pdf extension in the specified bucket, runs each one through Document AI, and loads the results into BigQuery?

I tried the code below, but when I run python main.py, nothing happens and no error appears:

import argparse  # needed for the argument parsing in _load_data_from_pdf below
import json
import logging
import os
import traceback
from datetime import datetime
from google.api_core import retry
from google.cloud import bigquery
from google.cloud import storage

# NOTE: DocAIBQConnector and BqMetadataMappingInfo are used further down; they are
# defined in the Document AI BQ Connector project and must be imported here as well.


# Configuration values (hardcoded here; the script does not actually read environment variables)
PROCESSOR_PROJECT_ID="dev-amer-agcg-scfc-svc-6b"    # The project id for the processor to be used
PROCESSOR_ID="dev-amer-agcg-scfc-svc-6b"           # The id of the processor to be used  BSC
bucket_name="docai_bucket10"             # The Google Cloud Storage bucket name for the source document. Example: 'split-docs'
FILE_NAME="$file_name"           # NOTE: "$file_name" is a shell-style placeholder; Python treats it as the literal string "$file_name"
DESTINATION_PROJECT_ID="dev-amer-agcg-scfc-svc-6b"    # The BigQuery project id for the destination
DESTINATION_DATASET_ID="docai"   # The BigQuery dataset id for the destination
DESTINATION_TABLE_ID="doc_reference"     # The BigQuery table id for the destination
CONTENT_TYPE="application/pdf"          # The MIME type of the document to be processed
PROCESSOR_LOCATION="us"                 # The location of the processor to be used
PARSING_METHODOLOGY="entities"          # entities,form,normalized_values   default="entities"
EXTRACTION_OUTPUT_BUCKET="docai-output"
ASYNC_OUTPUT_FOLDER_GCS_URI="docai-output10"
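# Sketch (assumption): if these values are meant to come from environment variables,
# they could be read with os.environ instead of being hardcoded, e.g.:
#   bucket_name = os.environ.get("BUCKET_NAME", "docai_bucket10")
#   PROCESSOR_ID = os.environ.get("PROCESSOR_ID", "")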


def bq_load(data, context):
    '''Triggered whenever a file is added to the Cloud Storage landing bucket.'''

    file_name = data['name']
    # Use rsplit so file names containing more than one dot still resolve correctly
    table_name = file_name.rsplit(".", 1)[0]
    file_extension = file_name.rsplit(".", 1)[-1]

    # Check the file extension
    if file_extension.lower() == "pdf":
        message = 'PDF detected, file: \'%s\'' % (file_name)
        logging.info(message)
        _load_data_from_pdf(file_name)

    else:
        message = 'Unsupported file format, file: \'%s\'' % (file_name)
        logging.info(message)
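
# Example of the Cloud Storage finalize event payload that would trigger bq_load
# (assumption; only a couple of the event fields are shown):
#   bq_load({'bucket': 'docai_bucket10', 'name': 'my-document-12.pdf'}, context=None)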
        




def _load_data_from_pdf(file_name):
    # NOTE: the file_name argument is currently ignored; everything below is
    # re-parsed from the command line, and args.file_name overwrites it.

    arg_parser = argparse.ArgumentParser(
        description="Document AI BQ Connector process input arguments",
        allow_abbrev=False,
    )

    doc_options_group = arg_parser.add_argument_group("document arguments")
    doc_options_group.add_argument(
        "--bucket_name",
        type=str,
        help="The Google Cloud Storage bucket name for the "
        "source document. Example: 'split-docs'",
    )
    doc_options_group.add_argument(
        "--file_name",
        type=str,
        help="The file name for the source document within the bucket. Example: "
        "'my-document-12.pdf'",
    )
    doc_options_group.add_argument(
        "--content_type", type=str, help="The MIME type of the document to be processed"
    )
    doc_options_group.add_argument(
        "--processing_type_override",
        choices=["sync", "async"],
        default=None,
        help="If specified, overrides the default async/sync processing logic",
    )
    doc_options_group.add_argument(
        "--processor_project_id",
        type=str,
        help="The project id for the processor to be used",
    )
    doc_options_group.add_argument(
        "--processor_location",
        type=str,
        help="The location of the processor to be used",
    )
    doc_options_group.add_argument(
        "--processor_id", type=str, help="The id of the processor to be used"
    )
    doc_options_group.add_argument("--async_output_folder_gcs_uri", type=str, help="")
    doc_options_group.add_argument(
        "--max_sync_page_count",
        type=int,
        default=5,
        help="The maximum number of pages "
        "that will be supported for "
        "sync processing. If page count "
        "is larger, async processing "
        "will be used.",
    )
    doc_options_group.add_argument(
        "--write_extraction_result",
        action="store_true",
        help="Indicates if raw results of " "extraction should be " "written " "to GCS",
    )
    doc_options_group.add_argument("--extraction_output_bucket", type=str, help="")
    doc_options_group.add_argument(
        "--custom_fields",
        type=json.loads,
        help="Custom field json dictionary to union "
        "with the "
        "resulting dictionary for BigQuery. "
        'Example: \'{"event_id": 1, '
        '"document_type": "my_document"}\'',
    )
    doc_options_group.add_argument(
        "--metadata_mapping_info",
        type=json.loads,
        help="Json object holding information on how to map document "
        "metadata to BigQuery. If column name or value not provided, "
        "defaults will be used if possible. "
        'Example: \'{"file_name": {"bq_column_name": "doc_file_name", '
        '               "metadata_value": "my_file.pdf", '
        '               "skip_map": "false"  }\'',
    )
    doc_options_group.add_argument(
        "--should_async_wait",
        type=bool,
        default=True,
        help="Specifies if the CLI should "
        "block and wait until async "
        "document operation is "
        "completed and process result "
        "into BigQuery",
    )
    doc_options_group.add_argument(
        "--operation_id",
        type=str,
        help="An existing operation id for which to complete " "BQ processing",
    )
    doc_options_group.add_argument(
        "--parsing_methodology",
        choices=["entities", "form", "normalized_values"],
        default="entities",
        help="The parsing methodology",
    )

    timeout_filter_group = doc_options_group.add_mutually_exclusive_group()
    timeout_filter_group.add_argument(
        "--doc_ai_sync_timeout",
        type=int,
        default=900,
        help="The sync processor timeout",
    )
    timeout_filter_group.add_argument(
        "--doc_ai_async_timeout",
        type=int,
        default=900,
        help="The async processor timeout",
    )

    bigquery_options_group = arg_parser.add_argument_group("bigquery arguments")
    bigquery_options_group.add_argument(
        "--destination_project_id", help="The BigQuery project id for the destination"
    )
    bigquery_options_group.add_argument(
        "--destination_dataset_id", help="The BigQuery dataset id for the destination"
    )
    bigquery_options_group.add_argument(
        "--destination_table_id", help="The BigQuery table id for the destination"
    )
    bigquery_options_group.add_argument(
        "--include_raw_entities",
        action="store_true",
        help="If raw_entities field should be outputted to the specified table",
    )
    bigquery_options_group.add_argument(
        "--include_error_fields",
        action="store_true",
        help="If 'has_errors' and 'errors' fields should be outputted to the "
        "specified table",
    )

    arg_parser.add_argument(
        "--retry_count",
        type=int,
        default=1,
        help="The retry attempt count if continue_on_error "
        "is True. Default is 1. If "
        "there are no retries, a final insert attempt "
        "will still be made excluding the parsed "
        "document fields",
    )
    arg_parser.add_argument(
        "--continue_on_error",
        action="store_true",
        help="Indicates if processing should continue " "upon errors",
    )
    arg_parser.add_argument(
        "--log",
        choices=["notset", "debug", "info", "warning", "error", "critical"],
        default="info",
        help="The default logging level.",
    )
    arg_parser.add_argument(
        "-q", "--quiet", action="store_true", help="Suppress message output to console."
    )
    arg_parser.add_argument(
        "-v", "--version", action="version", version="Document AI BQ Connector 1.0.0"
    )
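    # Example invocation (assumption: the values below are placeholders for illustration):
    #   python main.py --bucket_name docai_bucket10 --file_name my-document-12.pdf \
    #       --content_type application/pdf --processor_project_id my-project-id \
    #       --processor_location us --processor_id my-processor-id \
    #       --destination_project_id my-project-id --destination_dataset_id docai \
    #       --destination_table_id doc_reference --parsing_methodology entities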

    args = arg_parser.parse_args()

    logging.basicConfig(level=args.log.upper())
    logging.debug(args)

    bucket_name = args.bucket_name
    file_name = args.file_name
    content_type = args.content_type
    processing_type_override = args.processing_type_override
    processor_project_id = args.processor_project_id
    processor_location = args.processor_location
    processor_id = args.processor_id
    async_output_folder_gcs_uri = args.async_output_folder_gcs_uri
    should_async_wait = args.should_async_wait
    should_write_extraction_result = args.write_extraction_result
    extraction_result_output_bucket = args.extraction_output_bucket
    operation_id = args.operation_id
    doc_ai_sync_timeout = args.doc_ai_sync_timeout
    doc_ai_async_timeout = args.doc_ai_async_timeout
    destination_project_id = args.destination_project_id
    destination_dataset_id = args.destination_dataset_id
    destination_table_id = args.destination_table_id
    include_raw_entities = args.include_raw_entities
    include_error_fields = args.include_error_fields
    retry_count = args.retry_count
    continue_on_error = args.continue_on_error
    custom_fields = args.custom_fields
    max_sync_page_count = args.max_sync_page_count
    parsing_methodology = args.parsing_methodology

    my_metadata_mapping_info = None
    if args.metadata_mapping_info is not None:
        my_metadata_mapping_info = {}
        for (
            cur_metadata_name,
            cur_metadata_mapping_info,
        ) in args.metadata_mapping_info.items():
            my_metadata_mapping_info[cur_metadata_name] = BqMetadataMappingInfo(
                bq_column_name=cur_metadata_mapping_info.get("bq_column_name"),
                metadata_value=cur_metadata_mapping_info.get("metadata_value"),
                skip_map=cur_metadata_mapping_info.get("skip_map"),
            )

    connector = DocAIBQConnector(
        bucket_name=bucket_name,
        file_name=file_name,
        content_type=content_type,
        processing_type_override=processing_type_override,
        processor_project_id=processor_project_id,
        processor_location=processor_location,
        processor_id=processor_id,
        async_output_folder_gcs_uri=async_output_folder_gcs_uri,
        should_async_wait=should_async_wait,
        extraction_result_output_bucket=extraction_result_output_bucket,
        should_write_extraction_result=should_write_extraction_result,
        operation_id=operation_id,
        destination_project_id=destination_project_id,
        destination_dataset_id=destination_dataset_id,
        destination_table_id=destination_table_id,
        doc_ai_sync_timeout=doc_ai_sync_timeout,
        doc_ai_async_timeout=doc_ai_async_timeout,
        custom_fields=custom_fields,
        metadata_mapping_info=my_metadata_mapping_info,
        include_raw_entities=include_raw_entities,
        include_error_fields=include_error_fields,
        retry_count=retry_count,
        continue_on_error=continue_on_error,
        max_sync_page_count=max_sync_page_count,
        parsing_methodology=parsing_methodology,
    )

    connector.run()
    print(
        f"Finished processing document - Extracted fields using parsing methodology '{parsing_methodology}' "
        "and saved results to BigQuery"
        ""
    )  # noqa: E127


# The guard below must compare against "__main__"; the original string
# "___load_data_from_pdf__" never matches, which is why `python main.py`
# exits silently without doing anything.
if __name__ == "__main__":
    _load_data_from_pdf(FILE_NAME)

    print("Job finished.")
anguillanneuf added the type: question (Further information is requested) label on Jul 31, 2023
anguillanneuf (Member) commented

friendly ping to both @holtskinner @mservidio

cc: @gericdong or @telpirion for DocAI triaging?
