Classify/Split-Extract Document AI Processing Pipeline
evekhm committed Jul 13, 2024
1 parent 8a0d652 commit 9b9a44b
Showing 34 changed files with 2,489 additions and 0 deletions.
3 changes: 3 additions & 0 deletions classify-split-extract-workflow/.gitignore
@@ -0,0 +1,3 @@
/classify-extract.iml
/.env
/classify-job/config/config.json
346 changes: 346 additions & 0 deletions classify-split-extract-workflow/README.md

Large diffs are not rendered by default.

Empty file.
236 changes: 236 additions & 0 deletions classify-split-extract-workflow/classify-extract.yaml
@@ -0,0 +1,236 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START workflows_cloud_run_jobs_payload]
main:
  params: [event]
  steps:
    - init:
        assign:
          - results: {}  # result from each iteration, keyed by output table name
          - exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - event_bucket: ${event.bucket}
          - target_bucket: ${sys.get_env("CLASSIFY_INPUT_BUCKET")}
          - output_bucket: ${sys.get_env("CLASSIFY_OUTPUT_BUCKET")}
          - input_file: ${event.data.name}
          - job_name: ${sys.get_env("CLASSIFY_JOB_NAME")}
          - auto_extract_str: ${sys.get_env("AUTO_EXTRACT", "true")}
          - auto_extract: ${auto_extract_str == "true"}
          - data_synch_str: ${sys.get_env("DATA_SYNCH")}
          - data_synch: ${data_synch_str == "true"}
          - job_location: ${sys.get_env("REGION")}
          - database_root: ${"projects/" + project_id + "/databases/(default)/documents/classify/"}
          - file_name: ${text.substring(input_file, -14, len(input_file))}
          - splitter_output_dir: ${sys.get_env("SPLITTER_OUTPUT_DIR", "splitter_output")}
          - regex_pattern: ${"(.*/)" + splitter_output_dir + "(/|$)"}
          - is_splitter_output: ${text.match_regex(input_file, regex_pattern)}

    - log_event:
        call: sys.log
        args:
          data: ${"event_bucket=" + event_bucket + ", input_file=" + input_file + ", target_bucket=" + target_bucket + ", data_sync=" + string(data_synch) + ", file_name=" + file_name + ", is_splitter_output=" + string(is_splitter_output)}
    - check_input_file:
        switch:
          - condition: ${event_bucket == target_bucket and (file_name == "START_PIPELINE" or (data_synch and not is_splitter_output))}
            next: create_callback
          - condition: true
            steps:
              - log_no_execution:
                  call: sys.log
                  args:
                    text: Pipeline not triggered
                    severity: INFO
              - return_result_no_run:
                  assign:
                    - results["no_run"]: ${"No pipeline execution because the required condition [(1) and ((2) or (3))] is not met. (1) event_bucket==target_bucket is [" + string(event_bucket == target_bucket) + "], (2) file_name==START_PIPELINE is [" + string(file_name == "START_PIPELINE") + "], (3) data_synch = [" + string(data_synch) + "] and not is_splitter_output = [" + string(not is_splitter_output) + "]"}
              - complete_no_job_triggered:
                  next: return_results
    - create_callback:
        call: events.create_callback_endpoint
        args:
          http_callback_method: "POST"
        result: callback_details
    - log_callback_details:
        call: sys.log
        args:
          text: ${callback_details}
    - run_classify_job:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${"namespaces/" + project_id + "/jobs/" + job_name}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                env:
                  - name: CLASSIFY_INPUT_BUCKET
                    value: ${target_bucket}
                  - name: INPUT_FILE
                    value: ${input_file}
                  - name: CLASSIFY_OUTPUT_BUCKET
                    value: ${output_bucket}
                  - name: CALL_BACK_URL
                    value: ${callback_details.url}
        result: job_execution
    - print_callback_details:
        call: sys.log
        args:
          severity: "INFO"
          text: ${"Listening for callbacks on " + callback_details.url}
    - await_callback:
        call: events.await_callback
        args:
          callback: ${callback_details}
          timeout: 3600
        result: callback_request
    - log_callback_received:
        call: sys.log
        args:
          severity: "INFO"
          text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    - assign_extract:
        assign:
          - success: ${callback_request.http_request.body.success}
          - bucket: ${callback_request.http_request.body.bucket}
          - object: ${callback_request.http_request.body.object}
    - check_callback_result:
        switch:
          - condition: ${not success}
            raise: "Failed Classification Job Execution"
          - condition: ${auto_extract}
            next: get_classify_output
          - condition: true
            steps:
              - return_result_no_success:
                  assign:
                    - results["no_extraction"]: ${"No extraction performed because auto_extract = [" + string(auto_extract) + "]"}
              - complete_no_success_triggered:
                  next: return_results

    - get_classify_output:
        call: googleapis.storage.v1.objects.get
        args:
          bucket: ${bucket}
          object: ${object}
          alt: media
        result: groups
    - log_execution:
        call: sys.log
        args:
          data: ${groups}
    - process_classification_results:
        for:
          value: group
          in: ${groups}
          steps:
            - logTable:
                call: sys.log
                args:
                  text: ${"Running query for object_table " + group.object_table_name + " and model " + group.model_name + " and out_table " + group.out_table_name}
                  severity: INFO
            - extract_table_components:
                assign:
                  - tableComponents: ${text.split(group.out_table_name, ".")}  # Split the string using "." as the delimiter
                  - bq_projectId: ${tableComponents[0]}
                  - datasetId: ${tableComponents[1]}
                  - tableId: ${tableComponents[2]}
            - check_table_exists:
                try:
                  steps:
                    - get_table_info:
                        call: googleapis.bigquery.v2.tables.get
                        args:
                          projectId: ${bq_projectId}
                          datasetId: ${datasetId}
                          tableId: ${tableId}
                        result: table_info
                    - assign_query:
                        switch:
                          - condition: ${table_info != null}
                            assign:
                              - query_string: ${"INSERT INTO `" + group.out_table_name + "` SELECT * FROM ML.PROCESS_DOCUMENT(MODEL `" + group.model_name + "`, TABLE `" + group.object_table_name + "`); SELECT TO_JSON_STRING(t) AS json_row FROM `" + group.out_table_name + "` t;"}
                          - condition: true
                            steps:
                              - return_result_empty_insert:
                                  assign:
                                    - results[group.out_table_name]: No query execution (unknown error)
                              - complete_no_job_insert:
                                  next: end_job
                except:
                  as: e
                  steps:
                    - log_check_table_exists_error:
                        call: sys.log
                        args:
                          text: ${"Received " + json.encode_to_string(e)}
                          severity: INFO
                    - create_table:
                        switch:
                          - condition: ${e.code == 404}
                            assign:
                              - query_string: ${"CREATE TABLE `" + group.out_table_name + "` AS SELECT * FROM ML.PROCESS_DOCUMENT(MODEL `" + group.model_name + "`, TABLE `" + group.object_table_name + "`); SELECT TO_JSON_STRING(t) AS json_row FROM `" + group.out_table_name + "` t;"}
                          - condition: true
                            steps:
                              - return_result_empty:
                                  assign:
                                    - results[group.out_table_name]: No query execution
                              - complete_no_job_create:
                                  next: end_job
            - log_query:
                call: sys.log
                args:
                  text: ${"Running query " + query_string}
                  severity: INFO
            - run_query:
                try:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      useQueryCache: false
                      timeoutMs: 30000
                      query: ${query_string}
                  result: queryResult
                except:
                  as: e
                  steps:
                    - log_error_on_create:
                        call: sys.log
                        args:
                          severity: "ERROR"
                          text: ${"Received error " + e.message}
                    - raise_error:
                        raise: ${"Failed Query Execution with message " + e.message}
            - log_create_query_result:
                call: sys.log
                args:
                  text: ${"Result of query " + json.encode_to_string(queryResult)}
                  severity: INFO
            - return_insert_result:
                assign:
                  - results[group.out_table_name]: {}
                  - results[group.out_table_name].data: ${queryResult.rows[0].f[0].v}
            - end_job:
                call: sys.log
                args:
                  text: ${"Job completed for object_table " + group.object_table_name + " and model " + group.model_name + " and out_table " + group.out_table_name}
                  severity: INFO
    - return_results:
        return: ${results}



# [END workflows_cloud_run_jobs_payload]
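
The workflow blocks at await_callback until the Cloud Run job reports back: run_classify_job passes the callback endpoint to the container as CALL_BACK_URL, and assign_extract expects the callback body to carry success plus the bucket and object where the classification output was written. A minimal sketch of such a callback POST, assuming the job authenticates with its default service-account credentials via google-auth (illustrative only, not the classify.py shipped in this commit):

# Illustrative sketch -- not part of this commit.
import os

import google.auth
from google.auth.transport.requests import AuthorizedSession


def notify_workflow(success: bool, bucket: str, object_name: str) -> None:
    """POST the classification result to the Workflows callback endpoint."""
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    session = AuthorizedSession(credentials)
    # CALL_BACK_URL is injected by the run_classify_job container override.
    session.post(
        os.environ["CALL_BACK_URL"],
        json={"success": success, "bucket": bucket, "object": object_name},
        timeout=60,
    )

If success is false, check_callback_result raises and the execution fails; otherwise the bucket/object pair is handed to get_classify_output.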
14 changes: 14 additions & 0 deletions classify-split-extract-workflow/classify-job/Dockerfile
@@ -0,0 +1,14 @@
# https://hub.docker.com/_/python
FROM python:3.10-buster

ENV PYTHONUNBUFFERED=True

# Copy local code to the container image.
ENV APP_HOME=/app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install -r requirements.txt

CMD ["/usr/local/bin/python3", "classify.py"]
5 changes: 5 additions & 0 deletions classify-split-extract-workflow/classify-job/Procfile
@@ -0,0 +1,5 @@
# Buildpacks require a web process to be defined
# but this process will not be used.
web: echo "no web"

python: python
108 changes: 108 additions & 0 deletions classify-split-extract-workflow/classify-job/bq_mlops.py
@@ -0,0 +1,108 @@
#!/usr/bin/env python3

"""
Copyright 2024 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import List, Optional
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from google.cloud.documentai_v1 import Processor

from config import (
    PROJECT_ID, BQ_PROJECT_ID, BQ_REGION, BQ_GCS_CONNECTION_NAME,
    BQ_DATASET_ID_MLOPS, BQ_OBJECT_TABLE_RETENTION_DAYS
)
from logging_handler import Logger
from utils import get_utc_timestamp

# BigQuery client
bq = bigquery.Client(project=PROJECT_ID)
logger = Logger.get_logger(__file__)


def object_table_create(
    f_uris: List[str],
    document_type: str,
    table_suffix: Optional[str] = None,
    retention_days: int = BQ_OBJECT_TABLE_RETENTION_DAYS
) -> str:
    """
    Creates an external table in BigQuery to store document URIs.

    Args:
        f_uris (List[str]): List of file URIs.
        document_type (str): Type of the document.
        table_suffix (str, optional): Suffix for the table name. Defaults to the current UTC timestamp.
        retention_days (int, optional): Number of days before the table expires. Defaults to BQ_OBJECT_TABLE_RETENTION_DAYS.

    Returns:
        str: The name of the created BigQuery table.
    """
    # Compute the default suffix at call time, not at module import time.
    if table_suffix is None:
        table_suffix = get_utc_timestamp()
    uris = "', '".join(f_uris)
    object_table_name = f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}.{document_type.upper()}_DOCUMENTS_{table_suffix}"
    query = f"""
    CREATE OR REPLACE EXTERNAL TABLE `{object_table_name}`
    WITH CONNECTION `{BQ_PROJECT_ID}.{BQ_REGION}.{BQ_GCS_CONNECTION_NAME}`
    OPTIONS(
        object_metadata = 'SIMPLE',
        metadata_cache_mode = 'AUTOMATIC',
        uris = ['{uris}'],
        max_staleness = INTERVAL 1 DAY,
        expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL {retention_days} DAY)
    )
    """
    job = bq.query(query=query)
    job.result()
    logger.info(f"Created external table {object_table_name}")
    return object_table_name


def table_create(table_id: str) -> None:
    """
    Checks if a BigQuery table exists, and if not, creates it.

    Args:
        table_id (str): The BigQuery table ID.
    """
    try:
        bq.get_table(table_id)  # Make an API request.
        logger.info(f"Table {table_id} already exists.")
    except NotFound:
        table = bigquery.Table(table_id)
        table = bq.create_table(table)  # Make an API request.
        logger.info(f"Created table {table.table_id}.")


def remote_model_create(processor: Processor, model_name: Optional[str] = None) -> None:
    """
    Creates or replaces a remote model in BigQuery using a Document AI processor.

    Args:
        processor (Processor): Document AI processor.
        model_name (str, optional): The name of the model. Defaults to a name based on the processor.
    """
    if not model_name:
        model_name = f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}.{processor.name.upper()}_MODEL"
    query = f"""
    CREATE OR REPLACE MODEL `{model_name}`
    REMOTE WITH CONNECTION `{BQ_PROJECT_ID}.{BQ_REGION}.{BQ_GCS_CONNECTION_NAME}`
    OPTIONS(
        REMOTE_SERVICE_TYPE = 'cloud_ai_document_v1',
        DOCUMENT_PROCESSOR = "{processor.default_processor_version}"
    )
    """
    job = bq.query(query=query)
    job.result()
    logger.info(f"Created or replaced remote model {model_name}")