Linting
evekhm committed Jul 18, 2024
1 parent 00a4485 commit 1634311
Showing 6 changed files with 188 additions and 121 deletions.
6 changes: 4 additions & 2 deletions classify-split-extract-workflow/classify-job/bq_mlops.py
@@ -42,14 +42,16 @@ def object_table_create(
         f_uris (List[str]): List of file URIs.
         document_type (str): Type of the document.
         table_suffix (str, optional): Suffix for the table name. Defaults to current UTC timestamp.
-        retention_days (int, optional): Number of days before the table expires. Defaults to BQ_OBJECT_TABLE_RETENTION_DAYS.
+        retention_days (int, optional): Number of days before the table expires.
+            Defaults to BQ_OBJECT_TABLE_RETENTION_DAYS.
     Returns:
         str: The name of the created BigQuery table.
     """

     uris = "', '".join(f_uris)
-    object_table_name = f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}.{document_type.upper()}_DOCUMENTS_{table_suffix}"
+    object_table_name = f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}." \
+                        f"{document_type.upper()}_DOCUMENTS_{table_suffix}"
     query = f"""
     CREATE OR REPLACE EXTERNAL TABLE `{object_table_name}`
     WITH CONNECTION `{BQ_PROJECT_ID}.{BQ_REGION}.{BQ_GCS_CONNECTION_NAME}`
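For orientation, a minimal usage sketch of object_table_create as it stands after this change; the bucket URIs, document type, and table suffix below are illustrative, not taken from the repository:

# Hypothetical call; argument values are made up for illustration.
table_name = object_table_create(
    f_uris=["gs://example-bucket/a.pdf", "gs://example-bucket/b.pdf"],
    document_type="invoice",
    table_suffix="20240718T000000",
)
# The wrapped f-string builds the same fully qualified name as before, e.g.
# "<BQ_PROJECT_ID>.<BQ_DATASET_ID_MLOPS>.INVOICE_DOCUMENTS_20240718T000000".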
201 changes: 138 additions & 63 deletions classify-split-extract-workflow/classify-job/config.py
@@ -12,11 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from google.cloud import run_v2, storage
 import json
 import os
-from typing import Optional, Dict
-
+from typing import Optional, Dict, Any, Union
+from google.cloud import run_v2, storage
 import google.auth
 from logging_handler import Logger
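The import shuffle follows the grouping that pylint's wrong-import-order check (and PEP 8) expects: standard library first, then third-party packages, then local modules. A minimal sketch of the convention:

# Standard library
import json
import os

# Third-party packages
from google.cloud import storage

# Local application modules
from logging_handler import Logger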
@@ -52,57 +51,57 @@
 PDF_MIME_TYPE = "application/pdf"
 MIME_TYPES = [
     PDF_MIME_TYPE,
     # "image/gif",  # TODO: Add/Check support for all these types
     # "image/tiff",
     # "image/jpeg",
     # "image/png",
     # "image/bmp",
     # "image/webp"
 ]

 OTHER_MIME_TYPES_TO_SUPPORT = [
     "image/gif",
     "image/tiff",
     "image/jpeg",
     "image/png",
     "image/bmp",
     "image/webp"
 ]

 NO_CLASSIFIER_LABEL = "No Classifier"
 METADATA_CONFIDENCE = "confidence"
 METADATA_DOCUMENT_TYPE = "type"


 CONFIG_JSON_DOCUMENT_TYPES_CONFIG = "document_types_config"
 FULL_JOB_NAME = run_v2.ExecutionsClient.job_path(PROJECT_ID, REGION, "classify-job")

 # Global variables
-gcs = None
-bucket = None
-last_modified_time_of_object = None
-config_data = None
+BUCKET = None
+LAST_MODIFIED_TIME_OF_CONFIG = None
+CONFIG_DATA = None

 logger.info(
     f"Settings used: CLASSIFY_INPUT_BUCKET=gs://{CLASSIFY_INPUT_BUCKET}, INPUT_FILE={INPUT_FILE}, "
     f"CLASSIFY_OUTPUT_BUCKET=gs://{CLASSIFY_OUTPUT_BUCKET}, OUTPUT_FILE_JSON={OUTPUT_FILE_JSON}, "
     f"OUTPUT_FILE_CSV={OUTPUT_FILE_CSV}, CALL_BACK_URL={CALL_BACK_URL}, "
-    f"BQ_DATASET_ID_PROCESSED_DOCS={BQ_DATASET_ID_PROCESSED_DOCS}, BQ_DATASET_ID_MLOPS={BQ_DATASET_ID_MLOPS}, "
+    f"BQ_DATASET_ID_PROCESSED_DOCS={BQ_DATASET_ID_PROCESSED_DOCS}, "
+    f"BQ_DATASET_ID_MLOPS={BQ_DATASET_ID_MLOPS}, "
     f"BQ_PROJECT_ID={BQ_PROJECT_ID}, BQ_GCS_CONNECTION_NAME={BQ_GCS_CONNECTION_NAME}, "
     f"DOCAI_OUTPUT_BUCKET={DOCAI_OUTPUT_BUCKET}"
 )


-def init_bucket(bucket_name: str) -> None:
+def init_bucket(bucket_name: str) -> Optional[storage.Bucket]:
     """
     Initializes the GCS bucket.
     Args:
         bucket_name (str): The name of the bucket.
     """
-    global gcs, bucket
-    if not gcs:
-        gcs = storage.Client()
-
-    if not bucket:
-        bucket = gcs.bucket(bucket_name)
-        if not bucket.exists():
-            logger.error(f"Bucket does not exist: gs://{bucket_name}")
-            bucket = None
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    if not bucket.exists():
+        logger.error(f"Bucket does not exist: gs://{bucket_name}")
+        return None  # Return None to indicate failure
+    return bucket


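The refactored init_bucket returns the bucket (or None) instead of mutating module globals; a minimal caller sketch, mirroring how load_config below consumes it (bucket and file names are illustrative):

# Hypothetical usage; names are made up.
bucket = init_bucket("example-config-bucket")
if bucket is None:
    raise RuntimeError("Configuration bucket is unavailable")
blob = bucket.get_blob("config.json")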
-def get_config(config_name: Optional[str] = None, element_path: str = None) -> Optional[Dict]:
+def get_config(config_name: Optional[str] = None,
+               element_path: Optional[str] = None) -> Optional[Dict[Any, Any]]:
     """
     Retrieves the configuration data.
@@ -113,81 +112,122 @@ def get_config(config_name: Optional[str] = None, element_path: str = None) -> O
     Returns:
         Dict: The configuration data.
     """
-    global config_data, last_modified_time_of_object
-    if not config_data:
-        config_data = load_config(CONFIG_BUCKET, CONFIG_FILE_NAME)
-        assert config_data, "Unable to load configuration data"
+    global CONFIG_DATA, LAST_MODIFIED_TIME_OF_CONFIG
+    if not CONFIG_DATA:
+        CONFIG_DATA = load_config(CONFIG_BUCKET, CONFIG_FILE_NAME)
+        assert CONFIG_DATA, "Unable to load configuration data"

-    config_data_loaded = config_data.get(config_name, {}) if config_name else config_data
+    config_data_loaded = CONFIG_DATA.get(config_name, {}) if config_name else CONFIG_DATA

     if element_path:
         keys = element_path.split('.')
         for key in keys:
             if isinstance(config_data_loaded, dict):
                 config_data_loaded_new = config_data_loaded.get(key)
                 if config_data_loaded_new is None:
-                    logger.error(f"Key '{key}' not present in the configuration {json.dumps(config_data_loaded, indent=4)}")
+                    logger.error(f"Key '{key}' not present in the "
+                                 f"configuration {json.dumps(config_data_loaded, indent=4)}")
                     return None
                 config_data_loaded = config_data_loaded_new
             else:
-                logger.error(f"Expected a dictionary at '{key}' but found a {type(config_data_loaded).__name__}")
+                logger.error(f"Expected a dictionary at '{key}' but found a "
+                             f"{type(config_data_loaded).__name__}")
                 return None

     return config_data_loaded

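A sketch of the dotted element_path traversal that get_config performs, under a hypothetical configuration (the keys below are illustrative, not the repository's actual config file):

# Suppose the loaded CONFIG_DATA were:
#   {"document_types_config": {"invoice": {"parser": "invoice_parser"}}}
get_config("document_types_config", "invoice.parser")        # -> "invoice_parser"
get_config("document_types_config", "invoice.display_name")  # -> None; logs "Key 'display_name' not present ..."
get_config("document_types_config", "invoice.parser.x")      # -> None; logs "Expected a dictionary at 'x' but found a str"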

 def get_parser_name_by_doc_type(doc_type: str) -> Optional[str]:
+    """Retrieves the parser name based on the document type.
+    Args:
+        doc_type (str): The document type.
+    Returns:
+        Optional[str]: The parser name, or None if not found.
+    """
     return get_config(CONFIG_JSON_DOCUMENT_TYPES_CONFIG, f"{doc_type}.parser")


-def get_document_types_config() -> Dict:
+def get_document_types_config() -> Dict[Any, Any]:
+    """
+    Retrieves the document types configuration.
+    Returns:
+        Dict: The document types configuration.
+    """
     return get_config(CONFIG_JSON_DOCUMENT_TYPES_CONFIG)


-def get_parser_by_doc_type(doc_type: str) -> Optional[Dict]:
+def get_parser_by_doc_type(doc_type: str) -> Optional[Dict[Any, Any]]:
+    """
+    Retrieves the parser by document type.
+    Args:
+        doc_type (str): The document type.
+    Returns:
+        Optional[Dict]: The parser configuration.
+    """
     parser_name = get_parser_name_by_doc_type(doc_type)
     if parser_name:
         return get_config("parser_config", parser_name)

     return None


-def load_config(bucket_name: str, filename: str) -> Optional[Dict]:
-    global bucket, last_modified_time_of_object, config_data
+def load_config(bucket_name: str, filename: str) -> Optional[Dict[Any, Any]]:
+    """
+    Loads the configuration data from a GCS bucket or local file.
+    Args:
+        bucket_name (str): The GCS bucket name.
+        filename (str): The configuration file name.
+    Returns:
+        Optional[Dict]: The configuration data.
+    """
+    global BUCKET, LAST_MODIFIED_TIME_OF_CONFIG, CONFIG_DATA

-    if not bucket:
-        init_bucket(bucket_name)
+    if not BUCKET:
+        BUCKET = init_bucket(bucket_name)

-    if not bucket:
+    if not BUCKET:
         return None

-    blob = bucket.get_blob(filename)
+    blob = BUCKET.get_blob(filename)
     if not blob:
         logger.error(f"Error: file does not exist gs://{bucket_name}/{filename}")
         return None

     last_modified_time = blob.updated
-    if last_modified_time == last_modified_time_of_object:
-        return config_data
+    if last_modified_time == LAST_MODIFIED_TIME_OF_CONFIG:
+        return CONFIG_DATA

     logger.info(f"Reloading config from: {filename}")
     try:
-        config_data = json.loads(blob.download_as_text(encoding="utf-8"))
-        last_modified_time_of_object = last_modified_time
+        CONFIG_DATA = json.loads(blob.download_as_text(encoding="utf-8"))
+        LAST_MODIFIED_TIME_OF_CONFIG = last_modified_time
     except Exception as e:
         logger.error(f"Error while obtaining file from GCS gs://{bucket_name}/{filename}: {e}")
         logger.warning(f"Using local {filename}")
         try:
-            with open(os.path.join(os.path.dirname(__file__), "config", filename)) as json_file:
-                config_data = json.load(json_file)
-        except Exception as e:
-            logger.error(f"Error loading local config file {filename}: {e}")
+            with open(os.path.join(os.path.dirname(__file__), "config", filename),
+                      encoding='utf-8') as json_file:
+                CONFIG_DATA = json.load(json_file)
+        except (FileNotFoundError, json.JSONDecodeError) as exc:
+            logger.error(f"Error loading local config file {filename}: {exc}")
             return None

-    return config_data
+    return CONFIG_DATA

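A sketch of the reload-on-change behavior: the blob's updated timestamp is cached in LAST_MODIFIED_TIME_OF_CONFIG, so repeated calls are served from memory until the object changes in GCS (bucket and file names are illustrative):

cfg = load_config("example-config-bucket", "config.json")  # downloads and caches the JSON
cfg = load_config("example-config-bucket", "config.json")  # returned from CONFIG_DATA, no download
# Overwriting gs://example-config-bucket/config.json bumps blob.updated,
# so the next call re-downloads and re-caches the configuration.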

-def get_docai_settings() -> Dict:
+def get_docai_settings() -> Dict[Any, Any]:
+    """
+    Retrieves the Document AI settings configuration.
+    Returns:
+        Dict: The Document AI settings configuration.
+    """
     return get_config("settings_config")


Expand All @@ -212,41 +252,76 @@ def get_classification_default_class() -> str:
"""

settings = get_docai_settings()
classification_default_class = settings.get("classification_default_class", CLASSIFICATION_UNDETECTABLE)
classification_default_class = settings.get("classification_default_class",
CLASSIFICATION_UNDETECTABLE)
parser = get_parser_by_doc_type(classification_default_class)
if parser:
return classification_default_class

logger.warning(
f"Classification default label {classification_default_class} is not a valid Label or missing a corresponding "
f"Classification default label {classification_default_class}"
f" is not a valid Label or missing a corresponding "
f"parser in parser_config"
)
return CLASSIFICATION_UNDETECTABLE

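In effect, the function degrades gracefully: if settings_config names no classification_default_class, or names one without a usable parser, the warning fires and CLASSIFICATION_UNDETECTABLE is returned. A sketch, with assumed config contents:

# Assuming settings_config = {"classification_default_class": "invoice"} and a
# parser configured for "invoice": returns "invoice". With no such entry, or a
# class lacking a parser: logs the warning and returns CLASSIFICATION_UNDETECTABLE.
default_class = get_classification_default_class()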

 def get_document_class_by_classifier_label(label_name: str) -> Optional[str]:
+    """
+    Retrieves the document class by classifier label.
+    Args:
+        label_name (str): The classifier label name.
+    Returns:
+        Optional[str]: The document class.
+    """
     for k, v in get_document_types_config().items():
         if v.get("classifier_label") == label_name:
             return k
     logger.error(f"classifier_label={label_name} is not assigned to any document in the config")
     return None

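A sketch of the reverse lookup, under an assumed document_types_config (illustrative keys only):

# Suppose document_types_config were:
#   {"invoice": {"classifier_label": "Invoice", "parser": "invoice_parser"}}
get_document_class_by_classifier_label("Invoice")  # -> "invoice"
get_document_class_by_classifier_label("Receipt")  # -> None; logs an error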

-def get_parser_by_name(parser_name: str) -> Optional[Dict]:
+def get_parser_by_name(parser_name: str) -> Optional[Dict[Any, Any]]:
+    """
+    Retrieves the parser configuration by parser name.
+    Args:
+        parser_name (str): The parser name.
+    Returns:
+        Optional[Dict]: The parser configuration.
+    """
     return get_config("parser_config", parser_name)


-def get_model_name_table_name(document_type: str) -> tuple[Optional[str], Optional[str]]:
-    parser = get_parser_by_doc_type(document_type)
-    if parser:
-        parser_name = get_parser_name_by_doc_type(document_type)
-        model_name = f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}.{parser.get('model_name', parser_name.upper() + '_MODEL')}"
-        out_table_name = f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_PROCESSED_DOCS}." \
-                         f"{parser.get('out_table_name', parser_name.upper() + '_DOCUMENTS')}"
+def get_model_name_table_name(document_type: str) -> \
+        Union[tuple[Optional[str], Optional[str]], tuple[None, None]]:
+    """
+    Retrieves the output table name and model name by document type.
+    Args:
+        document_type (str): The document type.
+    Returns:
+        Union[tuple[Optional[str], Optional[str]], tuple[None, None]]: The output table name and
+            model name.
+    """
+    parser_name = get_parser_name_by_doc_type(document_type)
+    if parser_name:
+        parser = get_parser_by_doc_type(document_type)
+        model_name = (
+            f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}."
+            f"{parser.get('model_name', parser_name.upper() + '_MODEL')}"
+        )
+        out_table_name = (
+            f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_PROCESSED_DOCS}."
+            f"{parser.get('out_table_name', parser_name.upper() + '_DOCUMENTS')}"
+        )
     else:
         logger.warning(f"No parser found for document type {document_type}")
         return None, None

     logger.info(f"model_name={model_name}, out_table_name={out_table_name}")
     return model_name, out_table_name

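Finally, a hedged sketch of the values the refactored helper assembles, assuming illustrative project/dataset IDs and a parser named invoice_parser with no model_name or out_table_name overrides in parser_config:

# Hypothetical values: BQ_PROJECT_ID="my-project", BQ_DATASET_ID_MLOPS="mlops",
# BQ_DATASET_ID_PROCESSED_DOCS="processed_docs".
model_name, out_table_name = get_model_name_table_name("invoice")
# model_name     -> "my-project.mlops.INVOICE_PARSER_MODEL"
# out_table_name -> "my-project.processed_docs.INVOICE_PARSER_DOCUMENTS"
# With no parser configured for the type, it logs a warning and returns (None, None).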